CServer.cpp 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. #include "CServer.h"
  2. #include "../helper/CAliyunMNS.h"
  3. void CServer::start_accept()
  4. {
  5. //新建一个客户端会话
  6. CClientSession* new_session = new CClientSession(io_context_);
  7. new_session->SetServer(this);
  8. acceptor_.async_accept(new_session->socket(),
  9. boost::bind(&CServer::handle_accept, this, new_session,
  10. boost::asio::placeholders::error));
  11. }
  12. void CServer::handle_accept(CClientSession* new_session,
  13. const boost::system::error_code& error)
  14. {
  15. if (!error)
  16. {
  17. new_session->start();
  18. }
  19. else
  20. {
  21. delete new_session;
  22. }
  23. start_accept();
  24. }
  25. void CServer::BindUsername(std::string username, CClientSession* session)
  26. {
  27. m_clients_map[username] = session;
  28. }
  29. void CServer::start_aliyun_mns()
  30. {
  31. //开启线程,监听消息
  32. int n_cpu = CSystem::get_CPU_core_num();
  33. LOG_INFO("cpu num:" << n_cpu);
  34. //创建跟CPU核数2倍的线程数量来处理消息队列的消息
  35. for(int i = 0; i < n_cpu * 2; i++)
  36. {
  37. std::thread t(&CServer::ReceiveMNSMessage, this);
  38. t.detach();
  39. }
  40. }
  41. /*
  42. *接收消息,并且对消息进行处理
  43. **/
  44. void CServer::ReceiveMNSMessage()
  45. {
  46. CAliyunMNS mns(CConfigReader::getTaskQueue());
  47. if(mns.getIsInit() == false)
  48. {
  49. //初始化消息队列失败了,线程直接退出
  50. LOG_INFO("Init AliyunMNS Failed!");
  51. return;
  52. }
  53. while(true)
  54. {
  55. std::string message = mns.getMessage();
  56. if(message == "error!")
  57. {
  58. continue;
  59. }
  60. LOG_INFO("get new message:" << message.c_str());
  61. //获取到了新的消息,开始进行处理
  62. rapidjson::Document document;
  63. document.Parse(message.c_str());
  64. if(!document.IsObject())
  65. {
  66. LOG_INFO("message 非法!");
  67. continue;
  68. }
  69. //处理消息类型 { "username":"zhangyang","order_id":"86425730"}
  70. int type = document["type"].GetInt();
  71. std::string username = document["username"].GetString();
  72. std::string order_id = document["order_id"].GetString();
  73. std::string order_no = document["order_no"].GetString();
  74. //把消息放进队列,队列发给客户端
  75. CClientMessage newMessage;
  76. newMessage.m_type = type;
  77. newMessage.m_username = username;
  78. newMessage.m_order_id = order_id;
  79. newMessage.m_order_no = order_no;
  80. m_queue_mutex.lock();
  81. m_message_queue.push(newMessage);
  82. m_queue_mutex.unlock();
  83. }
  84. }
  85. /*
  86. *负责把队列中的消息发给客户端
  87. **/
  88. void CServer::SendMessageToClient()
  89. {
  90. while (1)
  91. {
  92. m_queue_mutex.lock();
  93. if (m_message_queue.empty())
  94. {
  95. m_queue_mutex.unlock();
  96. CSystem::my_sleep(1);
  97. continue;
  98. }
  99. CClientMessage msg = m_message_queue.front();
  100. m_message_queue.pop();
  101. m_queue_mutex.unlock();
  102. std::string username = msg.m_username;
  103. //返回给接入层的消息
  104. std::string res_msg = msg.toJson();
  105. //找到username对应的socket,然后发消息过去
  106. if (m_clients_map.find(username) != m_clients_map.end())
  107. {
  108. m_clients_map[username]->send_message(res_msg);
  109. }
  110. }
  111. }