| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- #include "CServer.h"
- #include "../helper/CAliyunMNS.h"
- void CServer::Init()
- {
- //TCP服务器默认使用9001端口
- m_port = 9001;
- acceptor_ = new tcp::acceptor(io_context_, tcp::endpoint(tcp::v4(), m_port));
- //开始监听客户端的消息
- start_accept();
- //开始接受和处理mns的消息
- start_aliyun_mns();
- //任务队列,发送消息给客户端
- std::thread t(&CServer::SendMessageToClient, this);
- t.detach();
- //开始异步执行
- io_context_.run();
- }
- void CServer::start_accept()
- {
- //新建一个客户端会话
- CClientSession* new_session = new CClientSession(io_context_);
- new_session->SetServer(this);
- //记录客户端的顺序
- m_nClientCount++;
- new_session->SetNum(m_nClientCount);
- acceptor_->async_accept(new_session->socket(),
- boost::bind(&CServer::handle_accept, this, new_session,
- boost::asio::placeholders::error));
- }
- void CServer::handle_accept(CClientSession* new_session,
- const boost::system::error_code& error)
- {
- if (!error)
- {
- new_session->start();
- }
- else
- {
- delete new_session;
- }
- //继续监听下一个客户端
- start_accept();
- }
- /*
- *将客户端的用户名和socket进行绑定,每个用户名只能绑定一个socket,后登陆的会删掉前面先登录的
- **/
- void CServer::BindUsername(std::string username, CClientSession* session)
- {
- m_map_mutex.lock();
- if (m_clients_map.find(username) != m_clients_map.end() && m_clients_map[username]->GetNum() < session->GetNum())
- {
- //之前已经存在了一个,先把直接的关闭掉
- m_clients_map[username]->stop();
- }
- m_clients_map[username] = session;
- m_map_mutex.unlock();
- }
- void CServer::DeleteClient(std::string username)
- {
- m_map_mutex.lock();
- if (m_clients_map.find(username) != m_clients_map.end())
- {
- m_clients_map.erase(username);
- }
- m_map_mutex.unlock();
- }
- void CServer::start_aliyun_mns()
- {
- //开启线程,监听消息
- int n_cpu = CSystem::get_CPU_core_num();
- LOG_INFO("cpu num:" << n_cpu);
- //创建跟CPU核数2倍的线程数量来处理消息队列的消息
- for(int i = 0; i < n_cpu * 2; i++)
- {
- std::thread t(&CServer::ReceiveMNSMessage, this);
- t.detach();
- }
- }
- /*
- *接收消息,并且对消息进行处理
- **/
- void CServer::ReceiveMNSMessage()
- {
- CAliyunMNS mns(CConfigReader::getTaskQueue());
- if(mns.getIsInit() == false)
- {
- //初始化消息队列失败了,线程直接退出
- LOG_INFO("Init AliyunMNS Failed!");
- return;
- }
- while(true)
- {
- std::string message = mns.getMessage();
- if(message == "error!")
- {
- continue;
- }
- LOG_INFO("get new message:" << message.c_str());
- //获取到了新的消息,开始进行处理
- rapidjson::Document document;
- document.Parse(message.c_str());
- if(!document.IsObject())
- {
- LOG_INFO("message 非法!");
- continue;
- }
- //处理消息类型
- int type = document["msg_type"].GetInt();
- std::string timestamp = document["timestamp"].GetString();
- std::string use_time = document["use_time"].GetString();
- std::string username = document["username"].GetString();
- std::string waimai_order_id = document["waimai_order_id"].GetString();
- std::string waimai_order_no = document["waimai_order_no"].GetString();
- //判断是否过期
- if (abs(time(NULL) - atoi(timestamp.c_str())) > 1800)
- {
- //如果消息已经超过了30分钟,直接丢弃
- continue;
- }
- //把消息放进队列
- CClientMessage newMessage;
- newMessage.msg_type = type;
- newMessage.timestamp = to_string((NULL));
- newMessage.m_username = username;
- newMessage.m_order_id = waimai_order_id;
- newMessage.m_order_no = waimai_order_no;
- m_queue_mutex.lock();
- m_message_queue.push(newMessage);
- m_queue_mutex.unlock();
- }
- }
- /*
- *负责把队列中的消息发给客户端
- **/
- void CServer::SendMessageToClient()
- {
- while (1)
- {
- m_queue_mutex.lock();
- if (m_message_queue.empty())
- {
- m_queue_mutex.unlock();
- CSystem::my_sleep(1);
- continue;
- }
- CClientMessage msg = m_message_queue.front();
- m_message_queue.pop();
- m_queue_mutex.unlock();
- std::string username = msg.m_username;
- //判断消息对应的用户是否在线,如果不在线就消息暂存在数据库
- m_map_mutex.lock();
- if (m_clients_map.find(username) == m_clients_map.end())
- {
- m_map_mutex.unlock();
- //客户端不在线,操作存数据库
- }
- else
- {
- CClientSession* session = m_clients_map[username];
- m_map_mutex.unlock();
- //返回给接入层的消息
- std::string res_msg = msg.toJson();
- session->send_message(res_msg);
- }
- }
-
- }
|