| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
#include "CServer.h"

#include <mutex>

#include "../helper/CAliyunMNS.h"
- void CServer::start_accept()
- {
- //新建一个客户端会话
- CClientSession* new_session = new CClientSession(io_context_);
- new_session->SetServer(this);
- acceptor_.async_accept(new_session->socket(),
- boost::bind(&CServer::handle_accept, this, new_session,
- boost::asio::placeholders::error));
- }
- void CServer::handle_accept(CClientSession* new_session,
- const boost::system::error_code& error)
- {
- if (!error)
- {
- new_session->start();
- }
- else
- {
- delete new_session;
- }
- start_accept();
- }
- void CServer::BindUsername(std::string username, CClientSession* session)
- {
- m_clients_map[username] = session;
- }
- void CServer::start_aliyun_mns()
- {
- //开启线程,监听消息
- int n_cpu = CSystem::get_CPU_core_num();
- LOG_INFO("cpu num:" << n_cpu);
- //创建跟CPU核数2倍的线程数量来处理消息队列的消息
- for(int i = 0; i < n_cpu * 2; i++)
- {
- std::thread t(&CServer::ReceiveMNSMessage, this);
- t.detach();
- }
- }
- /*
- *接收消息,并且对消息进行处理
- **/
- void CServer::ReceiveMNSMessage()
- {
- CAliyunMNS mns(CConfigReader::getTaskQueue());
- if(mns.getIsInit() == false)
- {
- //初始化消息队列失败了,线程直接退出
- LOG_INFO("Init AliyunMNS Failed!");
- return;
- }
- while(true)
- {
- std::string message = mns.getMessage();
- if(message == "error!")
- {
- continue;
- }
- LOG_INFO("get new message:" << message.c_str());
- //获取到了新的消息,开始进行处理
- rapidjson::Document document;
- document.Parse(message.c_str());
- if(!document.IsObject())
- {
- LOG_INFO("message 非法!");
- continue;
- }
- //处理消息类型 { "username":"zhangyang","order_id":"86425730"}
- int type = document["type"].GetInt();
- std::string username = document["username"].GetString();
- std::string order_id = document["order_id"].GetString();
- std::string order_no = document["order_no"].GetString();
- //把消息放进队列,队列发给客户端
- CClientMessage newMessage;
- newMessage.m_type = type;
- newMessage.m_username = username;
- newMessage.m_order_id = order_id;
- newMessage.m_order_no = order_no;
- m_queue_mutex.lock();
- m_message_queue.push(newMessage);
- m_queue_mutex.unlock();
- }
- }
- /*
- *负责把队列中的消息发给客户端
- **/
- void CServer::SendMessageToClient()
- {
- while (1)
- {
- m_queue_mutex.lock();
- if (m_message_queue.empty())
- {
- m_queue_mutex.unlock();
- CSystem::my_sleep(1);
- continue;
- }
- CClientMessage msg = m_message_queue.front();
- m_message_queue.pop();
- m_queue_mutex.unlock();
- std::string username = msg.m_username;
- //返回给接入层的消息
- std::string res_msg = msg.toJson();
- //找到username对应的socket,然后发消息过去
- if (m_clients_map.find(username) != m_clients_map.end())
- {
- m_clients_map[username]->send_message(res_msg);
- }
- }
-
- }
|