|
@@ -31,13 +31,29 @@ void CServer::Init()
|
|
|
//开始监听客户端的消息
|
|
//开始监听客户端的消息
|
|
|
start_accept();
|
|
start_accept();
|
|
|
|
|
|
|
|
|
|
+ int n_cpu = CSystem::get_CPU_core_num();
|
|
|
|
|
+ LOG_INFO("cpu num:" << n_cpu);
|
|
|
|
|
+
|
|
|
//开始接受和处理mns的消息
|
|
//开始接受和处理mns的消息
|
|
|
- start_aliyun_mns();
|
|
|
|
|
|
|
+ for (int i = 0; i < n_cpu * 2; i++)
|
|
|
|
|
+ {
|
|
|
|
|
+ std::thread t(&CServer::ReceiveMNSMessage, this);
|
|
|
|
|
+ t.detach();
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
//任务队列,发送消息给客户端
|
|
//任务队列,发送消息给客户端
|
|
|
- std::thread(&CServer::SendMessageToClient, this).detach();
|
|
|
|
|
|
|
+ for (int i = 0; i < n_cpu * 2; i++)
|
|
|
|
|
+ {
|
|
|
|
|
+ std::thread t(&CServer::SendMessageToClient, this);
|
|
|
|
|
+ t.detach();
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- std::thread(&CServer::HandleOfflineMessage, this).detach();
|
|
|
|
|
|
|
+ //处理离线消息的队列
|
|
|
|
|
+ for (int i = 0; i < n_cpu * 2; i++)
|
|
|
|
|
+ {
|
|
|
|
|
+ std::thread t(&CServer::HandleOfflineMessage, this);
|
|
|
|
|
+ t.detach();
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
//开始异步执行
|
|
//开始异步执行
|
|
|
io_context_.run();
|
|
io_context_.run();
|
|
@@ -192,6 +208,7 @@ void CServer::HandleOfflineMessage()
|
|
|
{
|
|
{
|
|
|
while (sqlite3_step(stmt) == SQLITE_ROW)
|
|
while (sqlite3_step(stmt) == SQLITE_ROW)
|
|
|
{
|
|
{
|
|
|
|
|
+ std::string id = (char*)sqlite3_column_text(stmt, 0);
|
|
|
std::string username = (char*)sqlite3_column_text(stmt, 1);
|
|
std::string username = (char*)sqlite3_column_text(stmt, 1);
|
|
|
std::string due_time = (char*)sqlite3_column_text(stmt, 2);
|
|
std::string due_time = (char*)sqlite3_column_text(stmt, 2);
|
|
|
std::string data = (char*)sqlite3_column_text(stmt, 3);
|
|
std::string data = (char*)sqlite3_column_text(stmt, 3);
|
|
@@ -209,7 +226,18 @@ void CServer::HandleOfflineMessage()
|
|
|
CClientSession* session = m_clients_map[username];
|
|
CClientSession* session = m_clients_map[username];
|
|
|
m_map_mutex.unlock();
|
|
m_map_mutex.unlock();
|
|
|
|
|
|
|
|
- session->send_message(data);
|
|
|
|
|
|
|
+ bool ret = session->send_message(data);
|
|
|
|
|
+ if (ret == false)
|
|
|
|
|
+ {
|
|
|
|
|
+ //发送失败,把数据写会数据库,等下次发送
|
|
|
|
|
+ AddMessageToDB(username, due_time, data);
|
|
|
|
|
+
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //成功发送一条,这里就删除一条
|
|
|
|
|
+ sql = "DELETE FROM pos_message WHERE id = '" + id + "';";
|
|
|
|
|
+ sqlite3_exec(m_db, sql.c_str(), 0, 0, 0);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -221,10 +249,6 @@ void CServer::HandleOfflineMessage()
|
|
|
//异常情况
|
|
//异常情况
|
|
|
sqlite3_finalize(stmt);
|
|
sqlite3_finalize(stmt);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- //把所有数据库的离线消息都删掉
|
|
|
|
|
- //sql = "DELETE FROM pos_message WHERE username = '" + username + "';";
|
|
|
|
|
- //sqlite3_exec(m_db, sql.c_str(), 0, 0, 0);
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -268,7 +292,7 @@ void CServer::BindUsername(std::string username, CClientSession* session)
|
|
|
|
|
|
|
|
if (m_clients_map.find(username) != m_clients_map.end() && m_clients_map[username]->GetNum() < session->GetNum())
|
|
if (m_clients_map.find(username) != m_clients_map.end() && m_clients_map[username]->GetNum() < session->GetNum())
|
|
|
{
|
|
{
|
|
|
- //之前已经存在了一个,先把直接的关闭掉
|
|
|
|
|
|
|
+ //之前已经存在了一个,先把直接的关闭掉(这个是服务器端主动的关闭)
|
|
|
m_clients_map[username]->stop();
|
|
m_clients_map[username]->stop();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -289,20 +313,6 @@ void CServer::DeleteClient(std::string username)
|
|
|
m_map_mutex.unlock();
|
|
m_map_mutex.unlock();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-void CServer::start_aliyun_mns()
|
|
|
|
|
-{
|
|
|
|
|
- //开启线程,监听消息
|
|
|
|
|
- int n_cpu = CSystem::get_CPU_core_num();
|
|
|
|
|
- LOG_INFO("cpu num:" << n_cpu);
|
|
|
|
|
-
|
|
|
|
|
- //创建跟CPU核数2倍的线程数量来处理消息队列的消息
|
|
|
|
|
- for(int i = 0; i < n_cpu * 2; i++)
|
|
|
|
|
- {
|
|
|
|
|
- std::thread t(&CServer::ReceiveMNSMessage, this);
|
|
|
|
|
- t.detach();
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
/*
|
|
/*
|
|
|
*接收消息,并且对消息进行处理
|
|
*接收消息,并且对消息进行处理
|
|
|
**/
|
|
**/
|
|
@@ -414,7 +424,12 @@ void CServer::SendMessageToClient()
|
|
|
m_map_mutex.unlock();
|
|
m_map_mutex.unlock();
|
|
|
|
|
|
|
|
//直接把消息发给客户端
|
|
//直接把消息发给客户端
|
|
|
- session->send_message(data);
|
|
|
|
|
|
|
+ bool ret = session->send_message(data);
|
|
|
|
|
+ if (ret == false)
|
|
|
|
|
+ {
|
|
|
|
|
+ //如果发送失败了,把消息存回到数据库
|
|
|
|
|
+ AddMessageToDB(username, due_time, data);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|