#include "../pch/pch.h"
#include "CServer.h"
#include "../helper/CAliyunMNS.h"

#include <climits>
#include <cstdlib>
#include <ctime>
#include <initializer_list>
#include <mutex>
#include <string>

namespace
{
    /// Binds `value` as text to the 1-based parameter `index` of `stmt`.
    /// SQLITE_TRANSIENT makes SQLite take its own copy of the bytes.
    bool BindTextParam(sqlite3_stmt* stmt, int index, const std::string& value)
    {
        return sqlite3_bind_text(stmt, index, value.c_str(),
                                 static_cast<int>(value.size()), SQLITE_TRANSIENT) == SQLITE_OK;
    }

    /// Returns column `column` of the current row as a string; a NULL column yields "".
    std::string ColumnAsString(sqlite3_stmt* stmt, int column)
    {
        const unsigned char* text = sqlite3_column_text(stmt, column);
        return text != nullptr ? std::string(reinterpret_cast<const char*>(text)) : std::string();
    }

    /// Prepares `sql`, binds `params` (in order, as text) and runs it to completion.
    /// @return true only when prepare, every bind and the step all succeeded.
    bool RunWithTextParams(sqlite3* db, const char* sql,
                           std::initializer_list<const std::string*> params)
    {
        if (db == nullptr)
        {
            return false;
        }

        sqlite3_stmt* stmt = nullptr;
        bool ok = sqlite3_prepare_v2(db, sql, -1, &stmt, nullptr) == SQLITE_OK;

        int index = 1;
        for (const std::string* param : params)
        {
            if (!ok)
            {
                break;
            }
            ok = BindTextParam(stmt, index++, *param);
        }

        ok = ok && sqlite3_step(stmt) == SQLITE_DONE;
        sqlite3_finalize(stmt); // finalizing a NULL statement is a harmless no-op
        return ok;
    }

    /// Deletes the pos_message row whose primary key is `id`.
    bool DeleteMessageById(sqlite3* db, sqlite3_int64 id)
    {
        sqlite3_stmt* stmt = nullptr;
        bool ok = sqlite3_prepare_v2(db, "DELETE FROM pos_message WHERE id = ?1;", -1, &stmt, nullptr) == SQLITE_OK;
        ok = ok && sqlite3_bind_int64(stmt, 1, id) == SQLITE_OK;
        ok = ok && sqlite3_step(stmt) == SQLITE_DONE;
        sqlite3_finalize(stmt);
        return ok;
    }
}

CServer::CServer()
{
}

/// Releases the acceptor and closes the SQLite connection.
/// NOTE(review): the worker threads started by Init() are detached and keep using
/// `this` and m_db; destroying the server while they run is unsafe — confirm the
/// object lives for the whole process.
CServer::~CServer()
{
    LOG_ERROR("start ~CServer");

    delete acceptor_;

    if (m_db != NULL)
    {
        sqlite3_close(m_db);
    }

    LOG_ERROR("end ~CServer");
}

/// Opens the database, starts listening on TCP port 9001, launches the worker
/// threads and then blocks in io_context_.run() until the io_context stops.
void CServer::Init()
{
    LOG_ERROR("start Init()");

    // Initialise the database.
    InitSqlLite();

    // Listen on port 9001.
    boost::asio::ip::tcp::endpoint listen_endpoint(boost::asio::ip::tcp::v4(), 9001);
    acceptor_ = new tcp::acceptor(io_context_, listen_endpoint);

    // Start accepting client connections.
    start_accept();

    const int n_cpu = CSystem::get_CPU_core_num();

    // Starts `count` detached threads, each running the given member function.
    const auto spawn_workers = [this](void (CServer::*worker)(), int count)
    {
        for (int i = 0; i < count; ++i)
        {
            std::thread(worker, this).detach();
        }
    };

    // Receive and process MNS messages.
    spawn_workers(&CServer::ReceiveMNSMessage, n_cpu * 2);

    // Task queue: deliver messages to clients.
    spawn_workers(&CServer::SendMessageToClient, n_cpu * 2);

    // Queue that replays offline messages.
    spawn_workers(&CServer::HandleOfflineMessage, n_cpu * 2);

    // One thread that purges expired messages every 30 minutes.
    spawn_workers(&CServer::HandleCleanupDueMessage, 1);

    LOG_ERROR("ready to run");

    try
    {
        // Run the asynchronous event loop.
        io_context_.run();
    }
    catch (const std::exception& e)
    {
        LOG_ERROR("io_context Exception: " << e.what());
    }

    LOG_ERROR("stop run");
}

/// Opens (creating if necessary) the pos.db SQLite database and makes sure the
/// pos_message table exists. Failures are logged and leave the function early.
void CServer::InitSqlLite()
{
#ifdef _WIN32
    std::wstring folderPath = CSystem::GetProgramDir() + L"\\db";
    if (!CSystem::IsDirExist(folderPath))
    {
        if (!CreateDirectory(folderPath.c_str(), NULL))
        {
            LOG_ERROR("Can't create database directory");
        }
    }

    // sqlite3_open creates the file when it does not exist yet.
    std::wstring path = CSystem::GetProgramDir() + L"\\db\\pos.db";
    std::string s_path = CLewaimaiString::UnicodeToUTF8(path);
    m_rc = sqlite3_open(s_path.c_str(), &m_db);
#else
    std::string db_path = CSystem::getAbsopath() + "../db/pos.db";
    //LOG_INFO("db path:" << db_path.c_str());
    m_rc = sqlite3_open(db_path.c_str(), &m_db);
#endif

    if (m_rc)
    {
        LOG_ERROR("Can't open database: " << sqlite3_errmsg(m_db));
        return;
    }

    // Check whether the message table already exists.
    const char* const check_sql =
        "SELECT COUNT(*) FROM sqlite_master where type = 'table' and name = 'pos_message';";

    sqlite3_stmt* stmt = nullptr;
    if (sqlite3_prepare_v2(m_db, check_sql, -1, &stmt, nullptr) != SQLITE_OK)
    {
        LOG_ERROR("check table prepare fail: " << sqlite3_errmsg(m_db));
        sqlite3_finalize(stmt);
        return;
    }

    const bool has_row = (sqlite3_step(stmt) == SQLITE_ROW);
    const int table_count = has_row ? sqlite3_column_int(stmt, 0) : 0;
    // Always release the statement, including when step() did not yield a row.
    sqlite3_finalize(stmt);
    stmt = nullptr;

    if (!has_row)
    {
        LOG_ERROR("check table fail: " << sqlite3_errmsg(m_db));
        return;
    }

    if (table_count != 0)
    {
        // The table already exists; nothing to create.
        return;
    }

    const char* const create_sql =
        "CREATE TABLE pos_message("
        "id INTEGER PRIMARY KEY AUTOINCREMENT,"
        "username CHAR(100) NOT NULL,"
        "due_time CHAR(100) NOT NULL,"
        "data CHAR(2000) NOT NULL);";

    if (sqlite3_prepare_v2(m_db, create_sql, -1, &stmt, nullptr) != SQLITE_OK)
    {
        LOG_INFO("create table prepare fail: " << sqlite3_errmsg(m_db));
        sqlite3_finalize(stmt);
        return;
    }

    if (sqlite3_step(stmt) != SQLITE_DONE)
    {
        std::string err = sqlite3_errmsg(m_db);
        LOG_INFO("create table fail: " << err.c_str());
        sqlite3_finalize(stmt);
        return;
    }

    LOG_INFO("create table success");
    sqlite3_finalize(stmt);
}

/// Stores one undelivered message for `username`.
/// Values are bound as parameters, so quotes in the payload cannot break or
/// inject SQL. A single INSERT runs in autocommit mode, which avoids
/// BEGIN/COMMIT pairs from different threads interleaving on the shared connection.
/// @return true when the row was inserted.
bool CServer::AddMessageToDB(std::string username, std::string due_time, std::string data)
{
    return RunWithTextParams(
        m_db,
        "INSERT INTO pos_message (username, due_time, data) VALUES (?1, ?2, ?3);",
        { &username, &due_time, &data });
}

/*
 * Queues a username whose offline messages should be replayed.
 **/
void CServer::CheckOfflineMessage(std::string username)
{
    std::lock_guard<decltype(m_offlineMsg_mutex)> lock(m_offlineMsg_mutex);
    m_offlineMsg.push(username);
}

/// Worker loop: takes usernames from the offline queue, purges their expired
/// messages and sends the remaining ones to the connected session, deleting
/// each row once it has been sent successfully.
void CServer::HandleOfflineMessage()
{
    while (true)
    {
        std::string username;
        bool has_work = false;
        {
            std::lock_guard<decltype(m_offlineMsg_mutex)> lock(m_offlineMsg_mutex);
            if (!m_offlineMsg.empty())
            {
                username = m_offlineMsg.front();
                m_offlineMsg.pop();
                has_work = true;
            }
        }

        if (!has_work)
        {
            CSystem::my_sleep(1);
            continue;
        }

        // First drop everything that has already expired for this user.
        const std::string curTime = CLewaimaiTime::DatetimeToString(time(NULL));
        RunWithTextParams(
            m_db,
            "DELETE FROM pos_message WHERE username = ?1 AND due_time <= ?2;",
            { &username, &curTime });

        // Then replay whatever is still pending.
        sqlite3_stmt* stmt = nullptr;
        const bool prepared =
            sqlite3_prepare_v2(m_db,
                               "SELECT id, data FROM pos_message WHERE username = ?1 AND due_time > ?2;",
                               -1, &stmt, nullptr) == SQLITE_OK
            && BindTextParam(stmt, 1, username)
            && BindTextParam(stmt, 2, curTime);

        if (prepared)
        {
            while (sqlite3_step(stmt) == SQLITE_ROW)
            {
                const sqlite3_int64 id = sqlite3_column_int64(stmt, 0);
                const std::string data = ColumnAsString(stmt, 1);

                CClientSession* session = nullptr;
                {
                    std::lock_guard<decltype(m_map_mutex)> lock(m_map_mutex);
                    const auto found = m_clients_map.find(username);
                    if (found != m_clients_map.end())
                    {
                        session = found->second;
                    }
                }

                if (session == nullptr)
                {
                    // The socket is gone; retry on the next login.
                    break;
                }

                // NOTE(review): the session pointer is used after the map lock is
                // released; its lifetime is managed outside this file — confirm.
                if (!session->send_message(data))
                {
                    // Sending failed; keep the row for the next attempt.
                    break;
                }

                // Delivered: remove this row.
                DeleteMessageById(m_db, id);
            }
        }

        sqlite3_finalize(stmt);
    }
}

/// Creates a session for the next client and starts an asynchronous accept on it.
void CServer::start_accept()
{
    CClientSession* new_session = new CClientSession(io_context_);
    new_session->SetServer(this);

    // Record the order in which clients connect.
    m_nClientCount++;
    new_session->SetNum(m_nClientCount);

    acceptor_->async_accept(new_session->socket(),
                            boost::bind(&CServer::handle_accept, this, new_session,
                                        boost::asio::placeholders::error));
}

/// Completion handler for async_accept: starts the session on success, deletes
/// it on error, and in both cases re-arms the acceptor.
void CServer::handle_accept(CClientSession* new_session, const boost::system::error_code& error)
{
    LOG_INFO("new client, handle_accept!");

    if (!error)
    {
        new_session->start();
    }
    else
    {
        LOG_ERROR("handle_accept error");
        delete new_session;
    }

    // Keep listening for the next client.
    start_accept();
}

/*
 * Binds a username to a session. Each username maps to exactly one session;
 * a later login stops the session that was bound before it.
 **/
void CServer::BindUsername(std::string username, CClientSession* session)
{
    size_t client_count = 0;
    {
        std::lock_guard<decltype(m_map_mutex)> lock(m_map_mutex);

        const auto existing = m_clients_map.find(username);
        if (existing != m_clients_map.end())
        {
            if (existing->second->GetNum() < session->GetNum())
            {
                // An older session exists: close it (server-initiated close).
                existing->second->stop();
            }
            else
            {
                LOG_ERROR("异常情况,不应该出现");
            }
        }

        m_clients_map[username] = session;
        // Read the size while the lock is still held.
        client_count = m_clients_map.size();
    }

    LOG_INFO("绑定新客户端成功,当前客户端数量:" << client_count);
}

/// Removes the session bound to `username`, if any.
void CServer::DeleteClient(std::string username)
{
    size_t client_count = 0;
    {
        std::lock_guard<decltype(m_map_mutex)> lock(m_map_mutex);
        m_clients_map.erase(username); // no-op when the key is absent
        client_count = m_clients_map.size();
    }

    LOG_INFO("删除客户端成功,当前客户端数量:" << client_count);
}

/*
 * Worker loop: receives messages from Aliyun MNS, validates them and pushes
 * the ones that have not expired onto the outgoing message queue.
 **/
void CServer::ReceiveMNSMessage()
{
    CAliyunMNS mns(CConfigReader::getTaskQueue());
    if (mns.getIsInit() == false)
    {
        // The message queue could not be initialised; this thread exits.
        LOG_INFO("Init AliyunMNS Failed!");
        return;
    }

    while (true)
    {
        const std::string message = mns.getMessage();
        if (message == "error!")
        {
            LOG_INFO("message error!");
            continue;
        }

        LOG_INFO("new msn message:" << message.c_str());

        rapidjson::Document document;
        document.Parse(message.c_str());
        if (!document.IsObject())
        {
            LOG_INFO("message 非法!");
            LOG_INFO("get new message:" << message.c_str());
            continue;
        }

        // Look every field up explicitly: indexing a missing member is not allowed.
        const auto username_it = document.FindMember("username");
        const auto timestamp_it = document.FindMember("timestamp");
        const auto use_time_it = document.FindMember("use_time");
        const auto data_it = document.FindMember("data");
        const auto no_member = document.MemberEnd();

        if (username_it == no_member || !username_it->value.IsString()
            || timestamp_it == no_member || !timestamp_it->value.IsString()
            || use_time_it == no_member || data_it == no_member)
        {
            continue;
        }

        // use_time may arrive either as a number or as a numeric string.
        long long ttl_seconds = 0;
        if (use_time_it->value.IsInt64())
        {
            ttl_seconds = use_time_it->value.GetInt64();
        }
        else if (use_time_it->value.IsString())
        {
            ttl_seconds = std::strtoll(use_time_it->value.GetString(), nullptr, 10);
        }
        else
        {
            continue;
        }

        const std::string username = username_it->value.GetString();
        const long long start_seconds = std::strtoll(timestamp_it->value.GetString(), nullptr, 10);

        // Reject malformed values and anything whose sum would overflow.
        if (start_seconds < 0 || ttl_seconds < 0 || start_seconds > LLONG_MAX - ttl_seconds)
        {
            continue;
        }
        const long long end_seconds = start_seconds + ttl_seconds;

        // Serialise the "data" member back to a JSON string.
        rapidjson::StringBuffer sbBuf;
        rapidjson::Writer<rapidjson::StringBuffer> writer(sbBuf);
        data_it->value.Accept(writer);
        const std::string strData = std::string(sbBuf.GetString());

        // Messages past their validity period are dropped.
        if (static_cast<long long>(time(NULL)) > end_seconds)
        {
            continue;
        }

        CClientMessage newMessage;
        newMessage.m_username = username;
        newMessage.m_due_time = CLewaimaiTime::DatetimeToString(static_cast<time_t>(end_seconds));
        newMessage.m_data = strData;

        std::lock_guard<decltype(m_queue_mutex)> lock(m_queue_mutex);
        m_message_queue.push(newMessage);
    }
}

/*
 * Worker loop: delivers queued messages to their clients. Messages for
 * clients that are offline, or whose send fails, are stored in the database.
 **/
void CServer::SendMessageToClient()
{
    while (true)
    {
        CClientMessage msg;
        bool has_work = false;
        {
            std::lock_guard<decltype(m_queue_mutex)> lock(m_queue_mutex);
            if (!m_message_queue.empty())
            {
                msg = m_message_queue.front();
                m_message_queue.pop();
                has_work = true;
            }
        }

        if (!has_work)
        {
            CSystem::my_sleep(1);
            continue;
        }

        const std::string username = msg.m_username;
        const std::string due_time = msg.m_due_time;
        const std::string data = msg.m_data;

        // Is the target user online?
        CClientSession* session = nullptr;
        {
            std::lock_guard<decltype(m_map_mutex)> lock(m_map_mutex);
            const auto found = m_clients_map.find(username);
            if (found != m_clients_map.end())
            {
                session = found->second;
            }
        }

        // NOTE(review): the original text of this branch was partly lost in the
        // source; storing the message for an offline user is reconstructed from
        // the surviving comments and the failure branch below — confirm.
        if (session == nullptr)
        {
            AddMessageToDB(username, due_time, data);
            continue;
        }

        if (!session->send_message(data))
        {
            // Sending failed: put the message back into the database.
            AddMessageToDB(username, due_time, data);
        }
    }
}

/*
 * Worker loop: purges expired messages from the database every 30 minutes.
 **/
void CServer::HandleCleanupDueMessage()
{
    while (true)
    {
        const std::string curTime = CLewaimaiTime::DatetimeToString(time(NULL));
        RunWithTextParams(m_db, "DELETE FROM pos_message WHERE due_time <= ?1;", { &curTime });

        CSystem::my_sleep(60 * 30);
    }
}