assemble_op.hpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_ASSEMBLE_OP_HPP
  8. #define BOOST_MQTT5_ASSEMBLE_OP_HPP
  9. #include <boost/mqtt5/error.hpp>
  10. #include <boost/mqtt5/detail/control_packet.hpp>
  11. #include <boost/mqtt5/detail/internal_types.hpp>
  12. #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
  13. #include <boost/asio/append.hpp>
  14. #include <boost/asio/associated_allocator.hpp>
  15. #include <boost/asio/buffer.hpp>
  16. #include <boost/asio/completion_condition.hpp>
  17. #include <boost/asio/post.hpp>
  18. #include <boost/asio/prepend.hpp>
  19. #include <boost/assert.hpp>
  20. #include <boost/system/error_code.hpp>
  21. #include <chrono>
  22. #include <cstdint>
  23. #include <string>
  24. #include <utility>
  25. namespace boost::mqtt5::detail {
  26. namespace asio = boost::asio;
  27. class data_span : private std::pair<byte_citer, byte_citer> {
  28. using base = std::pair<byte_citer, byte_citer>;
  29. public:
  30. using base::base;
  31. auto first() const {
  32. return base::first;
  33. }
  34. auto last() const {
  35. return base::second;
  36. }
  37. void expand_suffix(size_t num_chars) {
  38. base::second += num_chars;
  39. }
  40. void remove_prefix(size_t num_chars) {
  41. base::first += num_chars;
  42. }
  43. size_t size() const {
  44. return std::distance(base::first, base::second);
  45. }
  46. };
  47. template <typename ClientService, typename Handler>
  48. class assemble_op {
  49. using client_service = ClientService;
  50. using handler_type = Handler;
  51. struct on_read {};
  52. client_service& _svc;
  53. handler_type _handler;
  54. std::string& _read_buff;
  55. data_span& _data_span;
  56. public:
  57. assemble_op(
  58. client_service& svc, handler_type&& handler,
  59. std::string& read_buff, data_span& active_span
  60. ) :
  61. _svc(svc),
  62. _handler(std::move(handler)),
  63. _read_buff(read_buff), _data_span(active_span)
  64. {}
  65. assemble_op(assemble_op&&) noexcept = default;
  66. assemble_op(const assemble_op&) = delete;
  67. assemble_op& operator=(assemble_op&&) noexcept = default;
  68. assemble_op& operator=(const assemble_op&) = delete;
  69. using allocator_type = asio::associated_allocator_t<handler_type>;
  70. allocator_type get_allocator() const noexcept {
  71. return asio::get_associated_allocator(_handler);
  72. }
  73. using executor_type = asio::associated_executor_t<handler_type>;
  74. executor_type get_executor() const noexcept {
  75. return asio::get_associated_executor(_handler);
  76. }
  77. template <typename CompletionCondition>
  78. void perform(CompletionCondition cc) {
  79. if (cc(error_code {}, 0) == 0 && _data_span.size()) {
  80. return asio::post(
  81. _svc.get_executor(),
  82. asio::prepend(
  83. std::move(*this), on_read {}, error_code {},
  84. 0, std::move(cc)
  85. )
  86. );
  87. }
  88. prepare_buffer(1);
  89. // Must be evaluated before this is moved
  90. auto store_begin = _read_buff.data()
  91. + std::distance(_read_buff.cbegin(), _data_span.last());
  92. auto store_size = std::distance(_data_span.last(), _read_buff.cend());
  93. _svc._stream.async_read_some(
  94. asio::buffer(store_begin, store_size), compute_read_timeout(),
  95. asio::prepend(
  96. asio::append(std::move(*this), std::move(cc)),
  97. on_read {}
  98. )
  99. );
  100. }
  101. template <typename CompletionCondition>
  102. void operator()(
  103. on_read, error_code ec, size_t bytes_read,
  104. CompletionCondition cc
  105. ) {
  106. if (ec == asio::error::try_again) {
  107. _svc.update_session_state();
  108. _svc._async_sender.resend();
  109. _data_span = { _read_buff.cbegin(), _read_buff.cbegin() };
  110. return perform(std::move(cc));
  111. }
  112. if (ec)
  113. return complete(ec, 0, {}, {});
  114. _data_span.expand_suffix(bytes_read);
  115. BOOST_ASSERT(_data_span.size());
  116. auto control_byte = uint8_t(*_data_span.first());
  117. if ((control_byte & 0b11110000) == 0)
  118. // close the connection, cancel
  119. return complete(client::error::malformed_packet, 0, {}, {});
  120. auto first = _data_span.first() + 1;
  121. auto varlen = decoders::type_parse(
  122. first, _data_span.last(), decoders::basic::varint_
  123. );
  124. if (!varlen) {
  125. if (_data_span.size() < 5)
  126. return perform(asio::transfer_at_least(1));
  127. return complete(client::error::malformed_packet, 0, {}, {});
  128. }
  129. if (
  130. static_cast<uint32_t>(*varlen)
  131. > max_recv_size() - std::distance(_data_span.first(), first)
  132. )
  133. return complete(client::error::packet_too_large, 0, {}, {});
  134. if (std::distance(first, _data_span.last()) < *varlen) {
  135. prepare_buffer(*varlen - std::distance(first, _data_span.last()));
  136. return perform(asio::transfer_at_least(1));
  137. }
  138. _data_span.remove_prefix(
  139. std::distance(_data_span.first(), first) + *varlen
  140. );
  141. dispatch(control_byte, first, first + *varlen);
  142. }
  143. private:
  144. void prepare_buffer(std::ptrdiff_t extra_len) {
  145. if (std::distance(_data_span.last(), _read_buff.cend()) >= extra_len)
  146. return;
  147. // make room for the packet by erasing bytes we already parsed from the
  148. // beginning of the read buffer
  149. const auto data_span_size = _data_span.size();
  150. _read_buff.erase(_read_buff.cbegin(), _data_span.first());
  151. _read_buff.resize(max_recv_size());
  152. _data_span = {
  153. _read_buff.cbegin(),
  154. _read_buff.cbegin() + data_span_size
  155. };
  156. }
  157. uint32_t max_recv_size() const {
  158. return (std::min)(
  159. _svc.connect_property(prop::maximum_packet_size)
  160. .value_or(default_max_recv_size),
  161. static_cast<uint32_t>(default_max_send_size)
  162. );
  163. }
  164. duration compute_read_timeout() const {
  165. auto negotiated_ka = _svc.negotiated_keep_alive();
  166. return negotiated_ka ?
  167. std::chrono::milliseconds(3 * negotiated_ka * 1000 / 2) :
  168. duration((std::numeric_limits<duration::rep>::max)());
  169. }
  170. static bool valid_header(uint8_t control_byte) {
  171. auto code = control_code_e(control_byte & 0b11110000);
  172. if (code == control_code_e::publish)
  173. return true;
  174. auto res = control_byte & 0b00001111;
  175. if (code == control_code_e::pubrel)
  176. return res == 0b00000010;
  177. return res == 0b00000000;
  178. }
  179. void dispatch(
  180. uint8_t control_byte, byte_citer first, byte_citer last
  181. ) {
  182. using namespace decoders;
  183. if (!valid_header(control_byte))
  184. return complete(client::error::malformed_packet, 0, {}, {});
  185. auto code = control_code_e(control_byte & 0b11110000);
  186. if (code == control_code_e::pingresp)
  187. return perform(asio::transfer_at_least(0));
  188. bool is_reply = code != control_code_e::publish &&
  189. code != control_code_e::auth &&
  190. code != control_code_e::disconnect;
  191. if (is_reply) {
  192. auto packet_id = decoders::decode_packet_id(first).value();
  193. _svc._replies.dispatch(error_code {}, code, packet_id, first, last);
  194. return perform(asio::transfer_at_least(0));
  195. }
  196. complete(error_code {}, control_byte, first, last);
  197. }
  198. void complete(
  199. error_code ec, uint8_t control_code,
  200. byte_citer first, byte_citer last
  201. ) {
  202. if (ec)
  203. _data_span = { _read_buff.cbegin(), _read_buff.cbegin() };
  204. std::move(_handler)(ec, control_code, first, last);
  205. }
  206. };
  207. } // end namespace boost::mqtt5::detail
  208. #endif // !BOOST_MQTT5_ASSEMBLE_OP_HPP