Explorar el Código

TCP重连机制

zhangyang hace 6 años
padre
commit
47a3cabea2

BIN
bin/Win32/Debug/zhipuzi_pos_windows/db/pos.db


BIN
bin/Win32/Debug/zhipuzi_pos_windows/zhipuzi_pos_windows.exe


BIN
bin/Win32/Debug/zhipuzi_pos_windows_server/zhipuzi_pos_windows_server.exe


+ 143 - 137
lewaimai_dispatch/network/CMessagePush.cpp

@@ -13,10 +13,14 @@ CMessagePush::~CMessagePush()
 
 void CMessagePush::Start()
 {
-    //开始处理接收消息
-    std::thread t_1(&CMessagePush::Run, this);
+    //心跳包
+    std::thread t_1(&CMessagePush::KeepAlive, this);
     t_1.detach();
 
+	//接收消息
+	std::thread t_7(&CMessagePush::ReceiveMessage, this);
+	t_7.detach();
+
     //处理声音提醒
     std::thread t_2(&CMessagePush::HandleVoice, this);
     t_2.detach();
@@ -37,145 +41,147 @@ void CMessagePush::Start()
     t_6.detach();
 }
 
-void CMessagePush::Run()
+void CMessagePush::KeepAlive()
 {
-    try
-    {
-        char host[] = "127.0.0.1";
-        char port[] = "9001";
-
-        tcp::resolver resolver(m_io_context);
-        tcp::resolver::results_type endpoints =
-            resolver.resolve(tcp::v4(), host, port);
-
-        boost::asio::connect(socket_, endpoints);
-
-        //发送身份信息
-        rapidjson::Document doc;
-        doc.SetObject();
-        rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); //获取分配器
-
-        doc.AddMember("username", "zhangyang", allocator);
-
-        rapidjson::StringBuffer buffer;
-        rapidjson::Writer<StringBuffer> writer(buffer);
-        doc.Accept(writer);
-
-        //返回给接入层的消息
-        std::string m_login_msg = buffer.GetString();
-
-        //异步发送消息
-        PushMessage(m_login_msg);
-
-        //同时启动一个异步接受消息
-        socket_.async_read_some(boost::asio::buffer(data_, max_length),
-                                boost::bind(&CMessagePush::handle_read, this,
-                                            boost::asio::placeholders::error,
-                                            boost::asio::placeholders::bytes_transferred));
-
-        m_io_context.run();
-    }
-    catch(std::exception& e)
-    {
-        std::cerr << "Exception: " << e.what() << "\n";
-    }
-}
-
-void CMessagePush::PushMessage(std::string msg)
-{
-    boost::asio::async_write(socket_,
-                             boost::asio::buffer(msg.c_str(), msg.length()),
-                             boost::bind(&CMessagePush::handle_write, this,
-                                         boost::asio::placeholders::error));
+	while (m_is_work)
+	{
+		//生成心跳包
+		rapidjson::Document doc;
+		doc.SetObject();
+		rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); //获取分配器
+
+		doc.AddMember("username", "zhangyang", allocator);
+
+		rapidjson::StringBuffer buffer;
+		rapidjson::Writer<StringBuffer> writer(buffer);
+		doc.Accept(writer);
+
+		//返回给接入层的消息
+		std::string m_keepalive_msg = buffer.GetString();
+
+		try
+		{
+			socket_.write_some(boost::asio::buffer(m_keepalive_msg.c_str(), m_keepalive_msg.length()));
+		}
+		catch (const std::exception& e)
+		{
+			//走到这里来说明心跳包发送失败了,socket失效了
+			std::string err = e.what();
+			LOG_INFO("write err info:" << err.c_str());
+
+			//先把socket关闭掉
+			socket_.close();
+
+			try
+			{
+				//发送失败,重新建立连接
+				char host[] = "127.0.0.1";
+				char port[] = "9001";
+
+				tcp::resolver resolver(m_io_context);
+				tcp::resolver::results_type endpoints =
+					resolver.resolve(tcp::v4(), host, port);
+
+				boost::asio::connect(socket_, endpoints);
+
+				socket_.write_some(boost::asio::buffer(m_keepalive_msg.c_str(), m_keepalive_msg.length()));
+			}
+			catch (const std::exception& e)
+			{
+				//重新连接或者重新发送又失败了,可能是网络断了
+				std::string err = e.what();
+				LOG_INFO("write err info:" << err.c_str());
+
+				//关闭无效的连接
+				socket_.close();
+
+				//10秒后再重试
+				CSystem::my_sleep(10);
+
+				continue;
+			}
+		}
+
+		//走到这里,说明心跳包发送成功了,socket是连通的
+
+		//休眠10秒钟,之后再发心跳包
+		CSystem::my_sleep(10);
+	}
+
+	socket_.close();
 }
 
-void CMessagePush::HandleMessage(std::string msg)
+void CMessagePush::ReceiveMessage()
 {
-    try
-    {
-        //收到服务器的消息,对服务器的消息进行处理
-        rapidjson::Document document;
-        document.Parse(msg.c_str());
-        if(!document.IsObject())
-        {
-            LOG_INFO("message 非法!");
-            return;
-        }
-
-        int type = document["type"].GetInt();
-
-        if(type == 0)
-        {
-            std::string status = document["status"].GetString();
-
-            if(status == "ok")
-            {
-                //表示连接服务器成功了
-            }
-        }
-        else if(type == 1)
-        {
-            std::string username = document["username"].GetString();
-            std::string order_id = document["order_id"].GetString();
-            std::string order_no = document["order_no"].GetString();
-
-            //新订单来了,首先判断是否要语音提醒
-            if(CSetting::GetParam("setting_is_new_waimai_voice") == "1")
-            {
-                AddVoice(order_id);
-            }
-
-            //判断是否要自动确认
-            if(CSetting::GetParam("setting_is_new_waimai_autoconfirm") == "1")
-            {
-                AddConfirm(order_id);
-            }
-
-            //判断是否右下角弹框提醒
-            if(CSetting::GetParam("setting_is_new_waimai_dialog") == "1")
-            {
-
-            }
-
-            AddPinter(order_id, order_no);
-        }
-    }
-    catch(std::exception& e)
-    {
-        std::cerr << "Exception: " << e.what() << "\n";
-    }
-}
-
-void CMessagePush::handle_read(const boost::system::error_code& error,
-                               size_t bytes_transferred)
-{
-    if(!error)
-    {
-        string s_reply = data_;
-        HandleMessage(s_reply);
-
-        memset(data_, 0, max_length);
-        socket_.async_read_some(boost::asio::buffer(data_, max_length),
-                                boost::bind(&CMessagePush::handle_read, this,
-                                            boost::asio::placeholders::error,
-                                            boost::asio::placeholders::bytes_transferred));
-    }
-    else
-    {
-        //这里有时会出现,看看是什么错误,可能是断开了跟服务器的连接
-    }
-}
-
-void CMessagePush::handle_write(const boost::system::error_code& error)
-{
-    if(!error)
-    {
-        LOG_INFO("send success!");
-    }
-    else
-    {
-        //这里有时会出现,看看是什么错误,可能是断开了跟服务器的连接
-    }
+	while (true)
+	{
+		try
+		{
+			memset(data_, 0, max_length);
+			socket_.read_some(boost::asio::buffer(data_, max_length));
+
+			std::string msg = data_;
+
+			//收到服务器的消息,对服务器的消息进行处理
+			rapidjson::Document document;
+			document.Parse(msg.c_str());
+			if (!document.IsObject())
+			{
+				LOG_INFO("message 非法!");
+				return;
+			}
+
+			int type = document["type"].GetInt();
+
+			if (type == 0)
+			{
+				std::string status = document["status"].GetString();
+
+				if (status == "ok")
+				{
+					//表示连接服务器成功了
+				}
+			}
+			else if (type == 1)
+			{
+				std::string username = document["username"].GetString();
+				std::string order_id = document["order_id"].GetString();
+				std::string order_no = document["order_no"].GetString();
+
+				//新订单来了,首先判断是否要语音提醒
+				if (CSetting::GetParam("setting_is_new_waimai_voice") == "1")
+				{
+					AddVoice(order_id);
+				}
+
+				//判断是否要自动确认
+				if (CSetting::GetParam("setting_is_new_waimai_autoconfirm") == "1")
+				{
+					AddConfirm(order_id);
+				}
+
+				//判断是否右下角弹框提醒
+				if (CSetting::GetParam("setting_is_new_waimai_dialog") == "1")
+				{
+
+				}
+
+				AddPinter(order_id, order_no);
+			}
+
+			//处理完了,接着处理下一条
+			continue;
+		}
+		catch (std::exception& e)
+		{
+			std::string err = e.what();
+			LOG_INFO("read err:" << err.c_str());
+
+			//如果这里异常了,说明socket失效了,等2秒重新读
+			CSystem::my_sleep(2);
+			continue;
+		}
+	}
 }
 
 void CMessagePush::AddVoice(std::string order_id)

+ 7 - 17
lewaimai_dispatch/network/CMessagePush.h

@@ -33,19 +33,19 @@ public:
 
 	~CMessagePush();
 
+	//开始工作
 	void Start();
 
-	void Run();
-
 	void StopWork()
 	{
 		m_is_work = false;
 	}
 
-	void PushMessage(std::string msg);
+	//发送心跳包
+	void KeepAlive();
 
 	//专门处理推送消息
-	void HandleMessage(std::string msg);
+	void ReceiveMessage();
 
 	//队列处理
 	void HandleVoice();
@@ -54,22 +54,12 @@ public:
 	void HandleShouyinPrinter();
 	void HandleChufangPrinter();
 
-	void AddPinter(std::string order_id, std::string order_no);
 
+	void AddVoice(std::string order_id);
+	void AddConfirm(std::string order_id);
+	void AddPinter(std::string order_id, std::string order_no);
 	void AddShouyinPrinter(CWaimaiOrder order);
-
 	void AddChufangPrinter(CWaimaiOrder order);
-
-private:
-	void handle_read(const boost::system::error_code& error,
-		size_t bytes_transferred);
-
-	void handle_write(const boost::system::error_code& error);
-
-	void AddVoice(std::string order_id);
-
-	void AddConfirm(std::string order_id);	
-
 private:
 	bool m_is_work = true;