publish_rec_op.hpp 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_PUBLISH_REC_OP_HPP
  8. #define BOOST_MQTT5_PUBLISH_REC_OP_HPP
  9. #include <boost/mqtt5/error.hpp>
  10. #include <boost/mqtt5/property_types.hpp>
  11. #include <boost/mqtt5/reason_codes.hpp>
  12. #include <boost/mqtt5/types.hpp>
  13. #include <boost/mqtt5/detail/control_packet.hpp>
  14. #include <boost/mqtt5/detail/internal_types.hpp>
  15. #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
  16. #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
  17. #include <boost/mqtt5/impl/disconnect_op.hpp>
  18. #include <boost/asio/consign.hpp>
  19. #include <boost/asio/detached.hpp>
  20. #include <boost/asio/prepend.hpp>
  21. #include <boost/asio/recycling_allocator.hpp>
  22. #include <cstdint>
  23. #include <memory>
  24. #include <string>
  25. namespace boost::mqtt5::detail {
  26. namespace asio = boost::asio;
  27. template <typename ClientService>
  28. class publish_rec_op {
  29. using client_service = ClientService;
  30. struct on_puback {};
  31. struct on_pubrec {};
  32. struct on_pubrel {};
  33. struct on_pubcomp {};
  34. std::shared_ptr<client_service> _svc_ptr;
  35. decoders::publish_message _message;
  36. public:
  37. explicit publish_rec_op(std::shared_ptr<client_service> svc_ptr) :
  38. _svc_ptr(std::move(svc_ptr))
  39. {}
  40. publish_rec_op(publish_rec_op&&) noexcept = default;
  41. publish_rec_op(const publish_rec_op&) = delete;
  42. publish_rec_op& operator=(publish_rec_op&&) noexcept = default;
  43. publish_rec_op& operator=(const publish_rec_op&) = delete;
  44. using allocator_type = asio::recycling_allocator<void>;
  45. allocator_type get_allocator() const noexcept {
  46. return allocator_type {};
  47. }
  48. using executor_type = typename client_service::executor_type;
  49. executor_type get_executor() const noexcept {
  50. return _svc_ptr->get_executor();
  51. }
  52. void perform(decoders::publish_message message) {
  53. auto flags = std::get<2>(message);
  54. auto qos_bits = (flags >> 1) & 0b11;
  55. if (qos_bits == 0b11)
  56. return on_malformed_packet(
  57. "Malformed PUBLISH received: QoS bits set to 0b11"
  58. );
  59. auto qos = qos_e(qos_bits);
  60. _message = std::move(message);
  61. if (qos == qos_e::at_most_once)
  62. return complete();
  63. auto packet_id = std::get<1>(_message);
  64. if (qos == qos_e::at_least_once) {
  65. auto puback = control_packet<allocator_type>::of(
  66. with_pid, get_allocator(),
  67. encoders::encode_puback, *packet_id,
  68. uint8_t(0), puback_props {}
  69. );
  70. return send_puback(std::move(puback));
  71. }
  72. // qos == qos_e::exactly_once
  73. auto pubrec = control_packet<allocator_type>::of(
  74. with_pid, get_allocator(),
  75. encoders::encode_pubrec, *packet_id,
  76. uint8_t(0), pubrec_props {}
  77. );
  78. return send_pubrec(std::move(pubrec));
  79. }
  80. void send_puback(control_packet<allocator_type> puback) {
  81. auto wire_data = puback.wire_data();
  82. _svc_ptr->async_send(
  83. wire_data,
  84. no_serial, send_flag::none,
  85. asio::consign(
  86. asio::prepend(std::move(*this), on_puback {}),
  87. std::move(puback)
  88. )
  89. );
  90. }
  91. void operator()(on_puback, error_code ec) {
  92. if (ec)
  93. return;
  94. complete();
  95. }
  96. void send_pubrec(control_packet<allocator_type> pubrec) {
  97. auto wire_data = pubrec.wire_data();
  98. _svc_ptr->async_send(
  99. wire_data,
  100. no_serial, send_flag::none,
  101. asio::prepend(std::move(*this), on_pubrec {}, std::move(pubrec))
  102. );
  103. }
  104. void operator()(
  105. on_pubrec, control_packet<allocator_type> packet,
  106. error_code ec
  107. ) {
  108. if (ec)
  109. return;
  110. wait_pubrel(packet.packet_id());
  111. }
  112. void wait_pubrel(uint16_t packet_id) {
  113. _svc_ptr->async_wait_reply(
  114. control_code_e::pubrel, packet_id,
  115. asio::prepend(std::move(*this), on_pubrel {}, packet_id)
  116. );
  117. }
  118. void operator()(
  119. on_pubrel, uint16_t packet_id,
  120. error_code ec, byte_citer first, byte_citer last
  121. ) {
  122. if (ec == asio::error::try_again) // "resend unanswered"
  123. return wait_pubrel(packet_id);
  124. if (ec)
  125. return;
  126. auto pubrel = decoders::decode_pubrel(static_cast<uint32_t>(std::distance(first, last)), first);
  127. if (!pubrel.has_value()) {
  128. on_malformed_packet("Malformed PUBREL received: cannot decode");
  129. return wait_pubrel(packet_id);
  130. }
  131. auto& [reason_code, props] = *pubrel;
  132. auto rc = to_reason_code<reason_codes::category::pubrel>(reason_code);
  133. if (!rc) {
  134. on_malformed_packet("Malformed PUBREL received: invalid Reason Code");
  135. return wait_pubrel(packet_id);
  136. }
  137. auto pubcomp = control_packet<allocator_type>::of(
  138. with_pid, get_allocator(),
  139. encoders::encode_pubcomp, packet_id,
  140. uint8_t(0), pubcomp_props{}
  141. );
  142. send_pubcomp(std::move(pubcomp));
  143. }
  144. void send_pubcomp(control_packet<allocator_type> pubcomp) {
  145. auto wire_data = pubcomp.wire_data();
  146. _svc_ptr->async_send(
  147. wire_data,
  148. no_serial, send_flag::none,
  149. asio::prepend(std::move(*this), on_pubcomp {}, std::move(pubcomp))
  150. );
  151. }
  152. void operator()(
  153. on_pubcomp, control_packet<allocator_type> packet,
  154. error_code ec
  155. ) {
  156. if (ec == asio::error::try_again)
  157. return wait_pubrel(packet.packet_id());
  158. if (ec)
  159. return;
  160. complete();
  161. }
  162. private:
  163. void on_malformed_packet(const std::string& reason) {
  164. auto props = disconnect_props {};
  165. props[prop::reason_string] = reason;
  166. return async_disconnect(
  167. disconnect_rc_e::malformed_packet, props,
  168. _svc_ptr, asio::detached
  169. );
  170. }
  171. void complete() {
  172. /* auto rv = */_svc_ptr->channel_store(std::move(_message));
  173. }
  174. };
  175. } // end namespace boost::mqtt5::detail
  176. #endif // !BOOST_MQTT5_PUBLISH_REC_OP_HPP