#include "CServer.h" #include "../helper/CAliyunMNS.h" void CServer::Init() { //TCP服务器默认使用9001端口 m_port = 9001; acceptor_ = new tcp::acceptor(io_context_, tcp::endpoint(tcp::v4(), m_port)); //开始监听客户端的消息 start_accept(); //开始接受和处理mns的消息 start_aliyun_mns(); //任务队列,发送消息给客户端 std::thread t(&CServer::SendMessageToClient, this); t.detach(); //开始异步执行 io_context_.run(); } void CServer::start_accept() { //新建一个客户端会话 CClientSession* new_session = new CClientSession(io_context_); new_session->SetServer(this); //记录客户端的顺序 m_nClientCount++; new_session->SetNum(m_nClientCount); acceptor_->async_accept(new_session->socket(), boost::bind(&CServer::handle_accept, this, new_session, boost::asio::placeholders::error)); } void CServer::handle_accept(CClientSession* new_session, const boost::system::error_code& error) { if (!error) { new_session->start(); } else { delete new_session; } //继续监听下一个客户端 start_accept(); } /* *将客户端的用户名和socket进行绑定,每个用户名只能绑定一个socket,后登陆的会删掉前面先登录的 **/ void CServer::BindUsername(std::string username, CClientSession* session) { m_map_mutex.lock(); if (m_clients_map.find(username) != m_clients_map.end() && m_clients_map[username]->GetNum() < session->GetNum()) { //之前已经存在了一个,先把直接的关闭掉 m_clients_map[username]->stop(); } m_clients_map[username] = session; m_map_mutex.unlock(); } void CServer::DeleteClient(std::string username) { m_map_mutex.lock(); if (m_clients_map.find(username) != m_clients_map.end()) { m_clients_map.erase(username); } m_map_mutex.unlock(); } void CServer::start_aliyun_mns() { //开启线程,监听消息 int n_cpu = CSystem::get_CPU_core_num(); LOG_INFO("cpu num:" << n_cpu); //创建跟CPU核数2倍的线程数量来处理消息队列的消息 for(int i = 0; i < n_cpu * 2; i++) { std::thread t(&CServer::ReceiveMNSMessage, this); t.detach(); } } /* *接收消息,并且对消息进行处理 **/ void CServer::ReceiveMNSMessage() { CAliyunMNS mns(CConfigReader::getTaskQueue()); if(mns.getIsInit() == false) { //初始化消息队列失败了,线程直接退出 LOG_INFO("Init AliyunMNS Failed!"); return; } while(true) { std::string message = mns.getMessage(); if(message == "error!") { continue; } LOG_INFO("get new message:" << message.c_str()); //获取到了新的消息,开始进行处理 rapidjson::Document document; document.Parse(message.c_str()); if(!document.IsObject()) { LOG_INFO("message 非法!"); continue; } //处理消息类型 int type = document["msg_type"].GetInt(); std::string timestamp = document["timestamp"].GetString(); std::string use_time = document["use_time"].GetString(); std::string username = document["username"].GetString(); std::string waimai_order_id = document["waimai_order_id"].GetString(); std::string waimai_order_no = document["waimai_order_no"].GetString(); //判断是否过期 if (abs(time(NULL) - atoi(timestamp.c_str())) > 1800) { //如果消息已经超过了30分钟,直接丢弃 continue; } //把消息放进队列 CClientMessage newMessage; newMessage.msg_type = type; newMessage.timestamp = to_string((NULL)); newMessage.m_username = username; newMessage.m_order_id = waimai_order_id; newMessage.m_order_no = waimai_order_no; m_queue_mutex.lock(); m_message_queue.push(newMessage); m_queue_mutex.unlock(); } } /* *负责把队列中的消息发给客户端 **/ void CServer::SendMessageToClient() { while (1) { m_queue_mutex.lock(); if (m_message_queue.empty()) { m_queue_mutex.unlock(); CSystem::my_sleep(1); continue; } CClientMessage msg = m_message_queue.front(); m_message_queue.pop(); m_queue_mutex.unlock(); std::string username = msg.m_username; //判断消息对应的用户是否在线,如果不在线就消息暂存在数据库 m_map_mutex.lock(); if (m_clients_map.find(username) == m_clients_map.end()) { m_map_mutex.unlock(); //客户端不在线,操作存数据库 } else { CClientSession* session = m_clients_map[username]; m_map_mutex.unlock(); //返回给接入层的消息 std::string res_msg = msg.toJson(); session->send_message(res_msg); } } }