async_sender.hpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_ASYNC_SENDER_HPP
  8. #define BOOST_MQTT5_ASYNC_SENDER_HPP
  9. #include <boost/mqtt5/detail/internal_types.hpp>
  10. #include <boost/asio/any_completion_handler.hpp>
  11. #include <boost/asio/any_io_executor.hpp>
  12. #include <boost/asio/bind_allocator.hpp>
  13. #include <boost/asio/bind_executor.hpp>
  14. #include <boost/asio/buffer.hpp>
  15. #include <boost/asio/error.hpp>
  16. #include <boost/asio/post.hpp>
  17. #include <boost/asio/prepend.hpp>
  18. #include <boost/asio/recycling_allocator.hpp>
  19. #include <boost/system/error_code.hpp>
  20. #include <algorithm>
  21. #include <cstdint>
  22. #include <utility>
  23. #include <vector>
  24. namespace boost::mqtt5::detail {
  25. namespace asio = boost::asio;
  26. class write_req {
  27. static constexpr unsigned SERIAL_BITS = sizeof(serial_num_t) * 8;
  28. asio::const_buffer _buffer;
  29. serial_num_t _serial_num;
  30. unsigned _flags;
  31. using handler_type = asio::any_completion_handler<void (error_code)>;
  32. handler_type _handler;
  33. public:
  34. write_req(
  35. asio::const_buffer buffer,
  36. serial_num_t serial_num, unsigned flags,
  37. handler_type handler
  38. ) :
  39. _buffer(buffer), _serial_num(serial_num), _flags(flags),
  40. _handler(std::move(handler))
  41. {}
  42. write_req(write_req&&) = default;
  43. write_req(const write_req&) = delete;
  44. write_req& operator=(write_req&&) = default;
  45. write_req& operator=(const write_req&) = delete;
  46. static serial_num_t next_serial_num(serial_num_t last) {
  47. return last + 1;
  48. }
  49. asio::const_buffer buffer() const {
  50. return _buffer;
  51. }
  52. void complete(error_code ec) {
  53. std::move(_handler)(ec);
  54. }
  55. void complete_post(const asio::any_io_executor& ex, error_code ec) {
  56. asio::post(
  57. ex,
  58. asio::prepend(std::move(_handler), ec)
  59. );
  60. }
  61. bool empty() const {
  62. return !_handler;
  63. }
  64. bool throttled() const {
  65. return _flags & send_flag::throttled;
  66. }
  67. bool terminal() const {
  68. return _flags & send_flag::terminal;
  69. }
  70. bool operator<(const write_req& other) const {
  71. if (prioritized() != other.prioritized())
  72. return prioritized();
  73. auto s1 = _serial_num;
  74. auto s2 = other._serial_num;
  75. if (s1 < s2)
  76. return (s2 - s1) < (1u << (SERIAL_BITS - 1));
  77. return (s1 - s2) >= (1u << (SERIAL_BITS - 1));
  78. }
  79. private:
  80. bool prioritized() const {
  81. return _flags & send_flag::prioritized;
  82. }
  83. };
  84. template <typename ClientService>
  85. class async_sender {
  86. using self_type = async_sender<ClientService>;
  87. using client_service = ClientService;
  88. using queue_allocator_type = asio::recycling_allocator<write_req>;
  89. using write_queue_t = std::vector<write_req, queue_allocator_type>;
  90. ClientService& _svc;
  91. write_queue_t _write_queue;
  92. bool _write_in_progress { false };
  93. static constexpr uint16_t MAX_LIMIT = 65535;
  94. uint16_t _limit { MAX_LIMIT };
  95. uint16_t _quota { MAX_LIMIT };
  96. serial_num_t _last_serial_num { 0 };
  97. public:
  98. explicit async_sender(ClientService& svc) : _svc(svc) {}
  99. async_sender(async_sender&&) = default;
  100. async_sender(const async_sender&) = delete;
  101. async_sender& operator=(async_sender&&) = default;
  102. async_sender& operator=(const async_sender&) = delete;
  103. using allocator_type = queue_allocator_type;
  104. allocator_type get_allocator() const noexcept {
  105. return allocator_type {};
  106. }
  107. using executor_type = typename client_service::executor_type;
  108. executor_type get_executor() const noexcept {
  109. return _svc.get_executor();
  110. }
  111. serial_num_t next_serial_num() {
  112. return _last_serial_num = write_req::next_serial_num(_last_serial_num);
  113. }
  114. template <typename CompletionToken, typename BufferType>
  115. decltype(auto) async_send(
  116. const BufferType& buffer,
  117. serial_num_t serial_num, unsigned flags,
  118. CompletionToken&& token
  119. ) {
  120. using Signature = void (error_code);
  121. auto initiation = [](
  122. auto handler, self_type& self, const BufferType& buffer,
  123. serial_num_t serial_num, unsigned flags
  124. ) {
  125. self._write_queue.emplace_back(
  126. asio::buffer(buffer), serial_num, flags, std::move(handler)
  127. );
  128. self.do_write();
  129. };
  130. return asio::async_initiate<CompletionToken, Signature>(
  131. initiation, token, std::ref(*this),
  132. buffer, serial_num, flags
  133. );
  134. }
  135. void cancel() {
  136. auto ops = std::move(_write_queue);
  137. for (auto& op : ops)
  138. op.complete_post(_svc.get_executor(), asio::error::operation_aborted);
  139. }
  140. void resend() {
  141. if (_write_in_progress)
  142. return;
  143. // The _write_in_progress flag is set to true to prevent any write
  144. // operations executing before the _write_queue is filled with
  145. // all the packets that require resending.
  146. _write_in_progress = true;
  147. auto new_limit = _svc._stream_context.connack_property(prop::receive_maximum);
  148. _limit = new_limit.value_or(MAX_LIMIT);
  149. _quota = _limit;
  150. auto write_queue = std::move(_write_queue);
  151. _svc._replies.resend_unanswered();
  152. for (auto& op : write_queue)
  153. op.complete(asio::error::try_again);
  154. std::stable_sort(_write_queue.begin(), _write_queue.end());
  155. _write_in_progress = false;
  156. do_write();
  157. }
  158. void operator()(write_queue_t write_queue, error_code ec, size_t) {
  159. _write_in_progress = false;
  160. if (ec == asio::error::try_again) {
  161. _svc.update_session_state();
  162. _write_queue.insert(
  163. _write_queue.begin(),
  164. std::make_move_iterator(write_queue.begin()),
  165. std::make_move_iterator(write_queue.end())
  166. );
  167. return resend();
  168. }
  169. if (ec == asio::error::no_recovery)
  170. _svc.cancel();
  171. // errors, if any, are propagated to ops
  172. for (auto& op : write_queue)
  173. op.complete(ec);
  174. if (
  175. ec == asio::error::operation_aborted ||
  176. ec == asio::error::no_recovery
  177. )
  178. return;
  179. do_write();
  180. }
  181. void throttled_op_done() {
  182. if (_limit == MAX_LIMIT)
  183. return;
  184. ++_quota;
  185. do_write();
  186. }
  187. private:
  188. void do_write() {
  189. if (_write_in_progress || _write_queue.empty())
  190. return;
  191. _write_in_progress = true;
  192. write_queue_t write_queue;
  193. auto terminal_req = std::find_if(
  194. _write_queue.begin(), _write_queue.end(),
  195. [](const auto& op) { return op.terminal(); }
  196. );
  197. if (terminal_req != _write_queue.end()) {
  198. write_queue.push_back(std::move(*terminal_req));
  199. _write_queue.erase(terminal_req);
  200. }
  201. else if (_limit == MAX_LIMIT) {
  202. write_queue = std::move(_write_queue);
  203. }
  204. else {
  205. for (write_req& req : _write_queue)
  206. if (!req.throttled())
  207. write_queue.push_back(std::move(req));
  208. else if (_quota > 0) {
  209. --_quota;
  210. write_queue.push_back(std::move(req));
  211. }
  212. if (write_queue.empty()) {
  213. _write_in_progress = false;
  214. return;
  215. }
  216. auto it = std::remove_if(
  217. _write_queue.begin(), _write_queue.end(),
  218. [](const write_req& req) { return req.empty(); }
  219. );
  220. _write_queue.erase(it, _write_queue.end());
  221. }
  222. std::vector<asio::const_buffer> buffers;
  223. buffers.reserve(write_queue.size());
  224. for (const auto& op : write_queue)
  225. buffers.push_back(op.buffer());
  226. _svc._replies.clear_fast_replies();
  227. _svc._stream.async_write(
  228. buffers,
  229. asio::prepend(std::ref(*this), std::move(write_queue))
  230. );
  231. }
  232. };
  233. } // end namespace boost::mqtt5::detail
  234. #endif // !BOOST_MQTT5_ASYNC_SENDER_HPP