read_message_op.hpp 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_READ_MESSAGE_OP_HPP
  8. #define BOOST_MQTT5_READ_MESSAGE_OP_HPP
  9. #include <boost/mqtt5/reason_codes.hpp>
  10. #include <boost/mqtt5/types.hpp>
  11. #include <boost/mqtt5/detail/control_packet.hpp>
  12. #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
  13. #include <boost/mqtt5/impl/disconnect_op.hpp>
  14. #include <boost/mqtt5/impl/publish_rec_op.hpp>
  15. #include <boost/mqtt5/impl/re_auth_op.hpp>
  16. #include <boost/asio/error.hpp>
  17. #include <boost/asio/prepend.hpp>
  18. #include <boost/asio/recycling_allocator.hpp>
  19. #include <boost/assert.hpp>
  20. #include <cstdint>
  21. #include <memory>
  22. namespace boost::mqtt5::detail {
  23. namespace asio = boost::asio;
  24. template <typename ClientService, typename Handler>
  25. class read_message_op {
  26. using client_service = ClientService;
  27. using handler_type = Handler;
  28. struct on_message {};
  29. struct on_disconnect {};
  30. std::shared_ptr<client_service> _svc_ptr;
  31. handler_type _handler;
  32. public:
  33. read_message_op(std::shared_ptr<client_service> svc_ptr, Handler&& handler)
  34. : _svc_ptr(std::move(svc_ptr)), _handler(std::move(handler))
  35. {}
  36. read_message_op(read_message_op&&) noexcept = default;
  37. read_message_op(const read_message_op&) = delete;
  38. read_message_op& operator=(read_message_op&&) noexcept = default;
  39. read_message_op& operator=(const read_message_op&) = delete;
  40. using allocator_type = asio::associated_allocator_t<handler_type>;
  41. allocator_type get_allocator() const noexcept {
  42. return asio::get_associated_allocator(_handler);
  43. }
  44. using executor_type = typename client_service::executor_type;
  45. executor_type get_executor() const noexcept {
  46. return _svc_ptr->get_executor();
  47. }
  48. void perform() {
  49. _svc_ptr->async_assemble(
  50. asio::prepend(std::move(*this), on_message {})
  51. );
  52. }
  53. void operator()(
  54. on_message, error_code ec,
  55. uint8_t control_code,
  56. byte_citer first, byte_citer last
  57. ) {
  58. if (ec == client::error::malformed_packet)
  59. return on_malformed_packet(
  60. disconnect_rc_e::malformed_packet,
  61. "Malformed Packet received from the Server"
  62. );
  63. else if (ec == client::error::packet_too_large)
  64. return on_malformed_packet(
  65. disconnect_rc_e::packet_too_large,
  66. "The packet size is greater than Maximum Packet Size"
  67. );
  68. if (ec == asio::error::no_recovery)
  69. _svc_ptr->cancel();
  70. if (ec)
  71. return complete();
  72. dispatch(control_code, first, last);
  73. }
  74. void operator()(on_disconnect, error_code ec) {
  75. if (ec)
  76. return complete();
  77. perform();
  78. }
  79. private:
  80. void dispatch(
  81. uint8_t control_byte,
  82. byte_citer first, byte_citer last
  83. ) {
  84. auto code = control_code_e(control_byte & 0b11110000);
  85. switch (code) {
  86. case control_code_e::publish: {
  87. auto msg = decoders::decode_publish(
  88. control_byte, static_cast<uint32_t>(std::distance(first, last)), first
  89. );
  90. if (!msg.has_value())
  91. return on_malformed_packet(
  92. disconnect_rc_e::malformed_packet,
  93. "Malformed PUBLISH received: cannot decode"
  94. );
  95. publish_rec_op { _svc_ptr }.perform(std::move(*msg));
  96. }
  97. break;
  98. case control_code_e::disconnect: {
  99. auto rv = decoders::decode_disconnect(
  100. static_cast<uint32_t>(std::distance(first, last)), first
  101. );
  102. if (!rv.has_value())
  103. return on_malformed_packet(
  104. disconnect_rc_e::malformed_packet,
  105. "Malformed DISCONNECT received: cannot decode"
  106. );
  107. const auto& [rc, props] = *rv;
  108. _svc_ptr->log().at_disconnect(
  109. to_reason_code<reason_codes::category::disconnect>(rc)
  110. .value_or(reason_codes::unspecified_error),
  111. props
  112. );
  113. return _svc_ptr->async_shutdown(
  114. asio::prepend(std::move(*this), on_disconnect {})
  115. );
  116. }
  117. break;
  118. case control_code_e::auth: {
  119. auto rv = decoders::decode_auth(
  120. static_cast<uint32_t>(std::distance(first, last)), first
  121. );
  122. if (!rv.has_value())
  123. return on_malformed_packet(
  124. disconnect_rc_e::malformed_packet,
  125. "Malformed AUTH received: cannot decode"
  126. );
  127. re_auth_op { _svc_ptr }.perform(std::move(*rv));
  128. }
  129. break;
  130. default:
  131. BOOST_ASSERT(false);
  132. }
  133. perform();
  134. }
  135. void on_malformed_packet(disconnect_rc_e rc, const std::string& reason) {
  136. auto props = disconnect_props {};
  137. props[prop::reason_string] = reason;
  138. auto svc_ptr = _svc_ptr; // copy before this is moved
  139. async_disconnect(
  140. rc, props, svc_ptr,
  141. asio::prepend(std::move(*this), on_disconnect {})
  142. );
  143. }
  144. void complete() {
  145. return std::move(_handler)();
  146. }
  147. };
  148. } // end namespace boost::mqtt5::detail
  149. #endif // !BOOST_MQTT5_READ_MESSAGE_OP_HPP