replies.hpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_REPLIES_HPP
  8. #define BOOST_MQTT5_REPLIES_HPP
  9. #include <boost/mqtt5/types.hpp>
  10. #include <boost/mqtt5/detail/control_packet.hpp>
  11. #include <boost/mqtt5/detail/internal_types.hpp>
  12. #include <boost/asio/any_completion_handler.hpp>
  13. #include <boost/asio/any_io_executor.hpp>
  14. #include <boost/asio/async_result.hpp>
  15. #include <boost/asio/consign.hpp>
  16. #include <boost/asio/dispatch.hpp>
  17. #include <boost/asio/error.hpp>
  18. #include <boost/asio/post.hpp>
  19. #include <boost/asio/prepend.hpp>
  20. #include <algorithm>
  21. #include <chrono>
  22. #include <cstdint>
  23. #include <memory>
  24. #include <string>
  25. #include <vector>
  26. namespace boost::mqtt5::detail {
  27. namespace asio = boost::asio;
  28. class replies {
  29. public:
  30. using executor_type = asio::any_io_executor;
  31. private:
  32. using Signature = void (error_code, byte_citer, byte_citer);
  33. static constexpr auto max_reply_time = std::chrono::seconds(20);
  34. class reply_handler {
  35. asio::any_completion_handler<Signature> _handler;
  36. control_code_e _code;
  37. uint16_t _packet_id;
  38. std::chrono::time_point<std::chrono::system_clock> _ts;
  39. public:
  40. template <typename H>
  41. reply_handler(control_code_e code, uint16_t pid, H&& handler) :
  42. _handler(std::forward<H>(handler)), _code(code), _packet_id(pid),
  43. _ts(std::chrono::system_clock::now())
  44. {}
  45. reply_handler(reply_handler&&) = default;
  46. reply_handler(const reply_handler&) = delete;
  47. reply_handler& operator=(reply_handler&&) = default;
  48. reply_handler& operator=(const reply_handler&) = delete;
  49. void complete(
  50. error_code ec,
  51. byte_citer first = byte_citer {}, byte_citer last = byte_citer {}
  52. ) {
  53. std::move(_handler)(ec, first, last);
  54. }
  55. void complete_post(const executor_type& ex, error_code ec) {
  56. asio::post(
  57. ex,
  58. asio::prepend(
  59. std::move(_handler), ec, byte_citer {}, byte_citer {}
  60. )
  61. );
  62. }
  63. uint16_t packet_id() const noexcept {
  64. return _packet_id;
  65. }
  66. control_code_e code() const noexcept {
  67. return _code;
  68. }
  69. auto time() const noexcept {
  70. return _ts;
  71. }
  72. };
  73. executor_type _ex;
  74. using handlers = std::vector<reply_handler>;
  75. handlers _handlers;
  76. struct fast_reply {
  77. control_code_e code;
  78. uint16_t packet_id;
  79. std::unique_ptr<std::string> packet;
  80. };
  81. using fast_replies = std::vector<fast_reply>;
  82. fast_replies _fast_replies;
  83. public:
  84. template <typename Executor>
  85. explicit replies(Executor ex) : _ex(std::move(ex)) {}
  86. replies(replies&&) = default;
  87. replies(const replies&) = delete;
  88. replies& operator=(replies&&) = default;
  89. replies& operator=(const replies&) = delete;
  90. template <typename CompletionToken>
  91. decltype(auto) async_wait_reply(
  92. control_code_e code, uint16_t packet_id, CompletionToken&& token
  93. ) {
  94. auto dup_handler_ptr = find_handler(code, packet_id);
  95. if (dup_handler_ptr != _handlers.end()) {
  96. dup_handler_ptr->complete_post(_ex, asio::error::operation_aborted);
  97. _handlers.erase(dup_handler_ptr);
  98. }
  99. auto freply = find_fast_reply(code, packet_id);
  100. if (freply == _fast_replies.end()) {
  101. auto initiation = [](
  102. auto handler, replies& self,
  103. control_code_e code, uint16_t packet_id
  104. ) {
  105. self._handlers.emplace_back(
  106. code, packet_id, std::move(handler)
  107. );
  108. };
  109. return asio::async_initiate<CompletionToken, Signature>(
  110. initiation, token, std::ref(*this), code, packet_id
  111. );
  112. }
  113. auto fdata = std::move(*freply);
  114. _fast_replies.erase(freply);
  115. auto initiation = [](
  116. auto handler, std::unique_ptr<std::string> packet,
  117. const executor_type& ex
  118. ) {
  119. byte_citer first = packet->cbegin();
  120. byte_citer last = packet->cend();
  121. asio::post(
  122. ex,
  123. asio::consign(
  124. asio::prepend(
  125. std::move(handler), error_code {}, first, last
  126. ),
  127. std::move(packet)
  128. )
  129. );
  130. };
  131. return asio::async_initiate<CompletionToken, Signature>(
  132. initiation, token, std::move(fdata.packet), _ex
  133. );
  134. }
  135. void dispatch(
  136. error_code ec, control_code_e code, uint16_t packet_id,
  137. byte_citer first, byte_citer last
  138. ) {
  139. auto handler_ptr = find_handler(code, packet_id);
  140. if (handler_ptr == _handlers.end()) {
  141. _fast_replies.push_back({
  142. code, packet_id,
  143. std::make_unique<std::string>(first, last)
  144. });
  145. return;
  146. }
  147. auto handler = std::move(*handler_ptr);
  148. _handlers.erase(handler_ptr);
  149. handler.complete(ec, first, last);
  150. }
  151. void resend_unanswered() {
  152. auto ua = std::move(_handlers);
  153. for (auto& h : ua)
  154. h.complete(asio::error::try_again);
  155. }
  156. void cancel_unanswered() {
  157. auto ua = std::move(_handlers);
  158. for (auto& h : ua)
  159. h.complete_post(_ex, asio::error::operation_aborted);
  160. }
  161. bool any_expired() {
  162. auto now = std::chrono::system_clock::now();
  163. return std::any_of(
  164. _handlers.begin(), _handlers.end(),
  165. [now](const auto& h) {
  166. return now - h.time() > max_reply_time;
  167. }
  168. );
  169. }
  170. void clear_fast_replies() {
  171. _fast_replies.clear();
  172. }
  173. void clear_pending_pubrels() {
  174. for (auto it = _handlers.begin(); it != _handlers.end();) {
  175. if (it->code() == control_code_e::pubrel) {
  176. it->complete(asio::error::operation_aborted);
  177. it = _handlers.erase(it);
  178. }
  179. else
  180. ++it;
  181. }
  182. }
  183. private:
  184. handlers::iterator find_handler(control_code_e code, uint16_t packet_id) {
  185. return std::find_if(
  186. _handlers.begin(), _handlers.end(),
  187. [code, packet_id](const auto& h) {
  188. return h.code() == code && h.packet_id() == packet_id;
  189. }
  190. );
  191. }
  192. fast_replies::iterator find_fast_reply(
  193. control_code_e code, uint16_t packet_id
  194. ) {
  195. return std::find_if(
  196. _fast_replies.begin(), _fast_replies.end(),
  197. [code, packet_id](const auto& f) {
  198. return f.code == code && f.packet_id == packet_id;
  199. }
  200. );
  201. }
  202. };
  203. } // end namespace boost::mqtt5::detail
  204. #endif // !BOOST_MQTT5_REPLIES_HPP