CServer.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #include "CServer.h"
  2. #include "../helper/CAliyunMNS.h"
  3. void CServer::Init()
  4. {
  5. //TCP服务器默认使用9001端口
  6. m_port = 9001;
  7. acceptor_ = new tcp::acceptor(io_context_, tcp::endpoint(tcp::v4(), m_port));
  8. //开始监听客户端的消息
  9. start_accept();
  10. //开始接受和处理mns的消息
  11. start_aliyun_mns();
  12. //任务队列,发送消息给客户端
  13. std::thread t(&CServer::SendMessageToClient, this);
  14. t.detach();
  15. //开始异步执行
  16. io_context_.run();
  17. }
  18. void CServer::start_accept()
  19. {
  20. //新建一个客户端会话
  21. CClientSession* new_session = new CClientSession(io_context_);
  22. new_session->SetServer(this);
  23. //记录客户端的顺序
  24. m_nClientCount++;
  25. new_session->SetNum(m_nClientCount);
  26. acceptor_->async_accept(new_session->socket(),
  27. boost::bind(&CServer::handle_accept, this, new_session,
  28. boost::asio::placeholders::error));
  29. }
  30. void CServer::handle_accept(CClientSession* new_session,
  31. const boost::system::error_code& error)
  32. {
  33. if (!error)
  34. {
  35. new_session->start();
  36. }
  37. else
  38. {
  39. delete new_session;
  40. }
  41. //继续监听下一个客户端
  42. start_accept();
  43. }
  44. /*
  45. *将客户端的用户名和socket进行绑定,每个用户名只能绑定一个socket,后登陆的会删掉前面先登录的
  46. **/
  47. void CServer::BindUsername(std::string username, CClientSession* session)
  48. {
  49. m_map_mutex.lock();
  50. if (m_clients_map.find(username) != m_clients_map.end() && m_clients_map[username]->GetNum() < session->GetNum())
  51. {
  52. //之前已经存在了一个,先把直接的关闭掉
  53. m_clients_map[username]->stop();
  54. }
  55. m_clients_map[username] = session;
  56. m_map_mutex.unlock();
  57. }
  58. void CServer::DeleteClient(std::string username)
  59. {
  60. m_map_mutex.lock();
  61. if (m_clients_map.find(username) != m_clients_map.end())
  62. {
  63. m_clients_map.erase(username);
  64. }
  65. m_map_mutex.unlock();
  66. }
  67. void CServer::start_aliyun_mns()
  68. {
  69. //开启线程,监听消息
  70. int n_cpu = CSystem::get_CPU_core_num();
  71. LOG_INFO("cpu num:" << n_cpu);
  72. //创建跟CPU核数2倍的线程数量来处理消息队列的消息
  73. for(int i = 0; i < n_cpu * 2; i++)
  74. {
  75. std::thread t(&CServer::ReceiveMNSMessage, this);
  76. t.detach();
  77. }
  78. }
  79. /*
  80. *接收消息,并且对消息进行处理
  81. **/
  82. void CServer::ReceiveMNSMessage()
  83. {
  84. CAliyunMNS mns(CConfigReader::getTaskQueue());
  85. if(mns.getIsInit() == false)
  86. {
  87. //初始化消息队列失败了,线程直接退出
  88. LOG_INFO("Init AliyunMNS Failed!");
  89. return;
  90. }
  91. while(true)
  92. {
  93. std::string message = mns.getMessage();
  94. if(message == "error!")
  95. {
  96. continue;
  97. }
  98. LOG_INFO("get new message:" << message.c_str());
  99. //获取到了新的消息,开始进行处理
  100. rapidjson::Document document;
  101. document.Parse(message.c_str());
  102. if(!document.IsObject())
  103. {
  104. LOG_INFO("message 非法!");
  105. continue;
  106. }
  107. //处理消息类型
  108. int type = document["msg_type"].GetInt();
  109. std::string timestamp = document["timestamp"].GetString();
  110. std::string use_time = document["use_time"].GetString();
  111. std::string username = document["username"].GetString();
  112. std::string waimai_order_id = document["waimai_order_id"].GetString();
  113. std::string waimai_order_no = document["waimai_order_no"].GetString();
  114. //判断是否过期
  115. if (abs(time(NULL) - atoi(timestamp.c_str())) > 1800)
  116. {
  117. //如果消息已经超过了30分钟,直接丢弃
  118. continue;
  119. }
  120. //把消息放进队列
  121. CClientMessage newMessage;
  122. newMessage.msg_type = type;
  123. newMessage.timestamp = to_string((NULL));
  124. newMessage.m_username = username;
  125. newMessage.m_order_id = waimai_order_id;
  126. newMessage.m_order_no = waimai_order_no;
  127. m_queue_mutex.lock();
  128. m_message_queue.push(newMessage);
  129. m_queue_mutex.unlock();
  130. }
  131. }
  132. /*
  133. *负责把队列中的消息发给客户端
  134. **/
  135. void CServer::SendMessageToClient()
  136. {
  137. while (1)
  138. {
  139. m_queue_mutex.lock();
  140. if (m_message_queue.empty())
  141. {
  142. m_queue_mutex.unlock();
  143. CSystem::my_sleep(1);
  144. continue;
  145. }
  146. CClientMessage msg = m_message_queue.front();
  147. m_message_queue.pop();
  148. m_queue_mutex.unlock();
  149. std::string username = msg.m_username;
  150. //判断消息对应的用户是否在线,如果不在线就消息暂存在数据库
  151. m_map_mutex.lock();
  152. if (m_clients_map.find(username) == m_clients_map.end())
  153. {
  154. m_map_mutex.unlock();
  155. //客户端不在线,操作存数据库
  156. }
  157. else
  158. {
  159. CClientSession* session = m_clients_map[username];
  160. m_map_mutex.unlock();
  161. //返回给接入层的消息
  162. std::string res_msg = msg.toJson();
  163. session->send_message(res_msg);
  164. }
  165. }
  166. }