#include "CServer.h" #include "../helper/CAliyunMNS.h" void CServer::start_accept() { //新建一个客户端会话 CClientSession* new_session = new CClientSession(io_context_); new_session->SetServer(this); acceptor_.async_accept(new_session->socket(), boost::bind(&CServer::handle_accept, this, new_session, boost::asio::placeholders::error)); } void CServer::handle_accept(CClientSession* new_session, const boost::system::error_code& error) { if (!error) { new_session->start(); } else { delete new_session; } start_accept(); } void CServer::BindUsername(std::string username, CClientSession* session) { m_clients_map[username] = session; } void CServer::start_aliyun_mns() { //开启线程,监听消息 int n_cpu = CSystem::get_CPU_core_num(); LOG_INFO("cpu num:" << n_cpu); //创建跟CPU核数2倍的线程数量来处理消息队列的消息 for(int i = 0; i < n_cpu * 2; i++) { std::thread t(&CServer::ReceiveMNSMessage, this); t.detach(); } } /* *接收消息,并且对消息进行处理 **/ void CServer::ReceiveMNSMessage() { CAliyunMNS mns(CConfigReader::getTaskQueue()); if(mns.getIsInit() == false) { //初始化消息队列失败了,线程直接退出 LOG_INFO("Init AliyunMNS Failed!"); return; } while(true) { std::string message = mns.getMessage(); if(message == "error!") { continue; } LOG_INFO("get new message:" << message.c_str()); //获取到了新的消息,开始进行处理 rapidjson::Document document; document.Parse(message.c_str()); if(!document.IsObject()) { LOG_INFO("message 非法!"); continue; } //处理消息类型 { "username":"zhangyang","order_id":"86425730"} int type = document["type"].GetInt(); std::string username = document["username"].GetString(); std::string order_id = document["order_id"].GetString(); std::string order_no = document["order_no"].GetString(); //把消息放进队列,队列发给客户端 CClientMessage newMessage; newMessage.m_type = type; newMessage.m_username = username; newMessage.m_order_id = order_id; newMessage.m_order_no = order_no; m_queue_mutex.lock(); m_message_queue.push(newMessage); m_queue_mutex.unlock(); } } /* *负责把队列中的消息发给客户端 **/ void CServer::SendMessageToClient() { while (1) { m_queue_mutex.lock(); if (m_message_queue.empty()) { m_queue_mutex.unlock(); CSystem::my_sleep(1); continue; } CClientMessage msg = m_message_queue.front(); m_message_queue.pop(); m_queue_mutex.unlock(); std::string username = msg.m_username; //返回给接入层的消息 std::string res_msg = msg.toJson(); //找到username对应的socket,然后发消息过去 if (m_clients_map.find(username) != m_clients_map.end()) { m_clients_map[username]->send_message(res_msg); } } }