publish_send_op.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_PUBLISH_SEND_OP_HPP
  8. #define BOOST_MQTT5_PUBLISH_SEND_OP_HPP
  9. #include <boost/mqtt5/error.hpp>
  10. #include <boost/mqtt5/reason_codes.hpp>
  11. #include <boost/mqtt5/types.hpp>
  12. #include <boost/mqtt5/detail/cancellable_handler.hpp>
  13. #include <boost/mqtt5/detail/control_packet.hpp>
  14. #include <boost/mqtt5/detail/internal_types.hpp>
  15. #include <boost/mqtt5/detail/topic_validation.hpp>
  16. #include <boost/mqtt5/detail/utf8_mqtt.hpp>
  17. #include <boost/mqtt5/impl/codecs/message_decoders.hpp>
  18. #include <boost/mqtt5/impl/codecs/message_encoders.hpp>
  19. #include <boost/mqtt5/impl/disconnect_op.hpp>
  20. #include <boost/asio/associated_allocator.hpp>
  21. #include <boost/asio/associated_executor.hpp>
  22. #include <boost/asio/cancellation_type.hpp>
  23. #include <boost/asio/detached.hpp>
  24. #include <boost/asio/error.hpp>
  25. #include <boost/asio/prepend.hpp>
  26. #include <cstdint>
  27. #include <memory>
  28. #include <string>
  29. #include <type_traits>
  30. namespace boost::mqtt5::detail {
  31. namespace asio = boost::asio;
  32. template <qos_e qos_type>
  33. using on_publish_signature = std::conditional_t<
  34. qos_type == qos_e::at_most_once,
  35. void (error_code),
  36. std::conditional_t<
  37. qos_type == qos_e::at_least_once,
  38. void (error_code, reason_code, puback_props),
  39. void (error_code, reason_code, pubcomp_props)
  40. >
  41. >;
  42. template <qos_e qos_type>
  43. using on_publish_props_type = std::conditional_t<
  44. qos_type == qos_e::at_most_once,
  45. void,
  46. std::conditional_t<
  47. qos_type == qos_e::at_least_once,
  48. puback_props,
  49. pubcomp_props
  50. >
  51. >;
  52. template <typename ClientService, typename Handler, qos_e qos_type>
  53. class publish_send_op {
  54. using client_service = ClientService;
  55. struct on_publish {};
  56. struct on_puback {};
  57. struct on_pubrec {};
  58. struct on_pubrel {};
  59. struct on_pubcomp {};
  60. std::shared_ptr<client_service> _svc_ptr;
  61. using handler_type = cancellable_handler<
  62. Handler,
  63. typename client_service::executor_type
  64. >;
  65. handler_type _handler;
  66. serial_num_t _serial_num;
  67. public:
  68. publish_send_op(
  69. std::shared_ptr<client_service> svc_ptr,
  70. Handler&& handler
  71. ) :
  72. _svc_ptr(std::move(svc_ptr)),
  73. _handler(std::move(handler), _svc_ptr->get_executor())
  74. {
  75. auto slot = asio::get_associated_cancellation_slot(_handler);
  76. if (slot.is_connected())
  77. slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
  78. svc.cancel();
  79. });
  80. }
  81. publish_send_op(publish_send_op&&) = default;
  82. publish_send_op(const publish_send_op&) = delete;
  83. publish_send_op& operator=(publish_send_op&&) = default;
  84. publish_send_op& operator=(const publish_send_op&) = delete;
  85. using allocator_type = asio::associated_allocator_t<handler_type>;
  86. allocator_type get_allocator() const noexcept {
  87. return asio::get_associated_allocator(_handler);
  88. }
  89. using executor_type = typename client_service::executor_type;
  90. executor_type get_executor() const noexcept {
  91. return _svc_ptr->get_executor();
  92. }
  93. void perform(
  94. std::string topic, std::string payload,
  95. retain_e retain, const publish_props& props
  96. ) {
  97. uint16_t packet_id = 0;
  98. if constexpr (qos_type != qos_e::at_most_once) {
  99. packet_id = _svc_ptr->allocate_pid();
  100. if (packet_id == 0)
  101. return complete_immediate(client::error::pid_overrun, packet_id);
  102. }
  103. auto ec = validate_publish(topic, payload, retain, props);
  104. if (ec)
  105. return complete_immediate(ec, packet_id);
  106. _serial_num = _svc_ptr->next_serial_num();
  107. auto publish = control_packet<allocator_type>::of(
  108. with_pid, get_allocator(),
  109. encoders::encode_publish, packet_id,
  110. std::move(topic), std::move(payload),
  111. qos_type, retain, dup_e::no, props
  112. );
  113. auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
  114. .value_or(default_max_send_size);
  115. if (publish.size() > max_packet_size)
  116. return complete_immediate(client::error::packet_too_large, packet_id);
  117. send_publish(std::move(publish));
  118. }
  119. void send_publish(control_packet<allocator_type> publish) {
  120. auto wire_data = publish.wire_data();
  121. _svc_ptr->async_send(
  122. wire_data,
  123. _serial_num,
  124. send_flag::throttled * (qos_type != qos_e::at_most_once),
  125. asio::prepend(std::move(*this), on_publish {}, std::move(publish))
  126. );
  127. }
  128. void resend_publish(control_packet<allocator_type> publish) {
  129. if (_handler.cancelled() != asio::cancellation_type_t::none)
  130. return complete(
  131. asio::error::operation_aborted, publish.packet_id()
  132. );
  133. send_publish(std::move(publish));
  134. }
  135. void operator()(
  136. on_publish, control_packet<allocator_type> publish,
  137. error_code ec
  138. ) {
  139. if (ec == asio::error::try_again)
  140. return resend_publish(std::move(publish));
  141. if constexpr (qos_type == qos_e::at_most_once)
  142. return complete(ec);
  143. else {
  144. auto packet_id = publish.packet_id();
  145. if (ec)
  146. return complete(ec, packet_id);
  147. if constexpr (qos_type == qos_e::at_least_once)
  148. _svc_ptr->async_wait_reply(
  149. control_code_e::puback, packet_id,
  150. asio::prepend(
  151. std::move(*this), on_puback {}, std::move(publish)
  152. )
  153. );
  154. else if constexpr (qos_type == qos_e::exactly_once)
  155. _svc_ptr->async_wait_reply(
  156. control_code_e::pubrec, packet_id,
  157. asio::prepend(
  158. std::move(*this), on_pubrec {}, std::move(publish)
  159. )
  160. );
  161. }
  162. }
  163. template <
  164. qos_e q = qos_type,
  165. std::enable_if_t<q == qos_e::at_least_once, bool> = true
  166. >
  167. void operator()(
  168. on_puback, control_packet<allocator_type> publish,
  169. error_code ec, byte_citer first, byte_citer last
  170. ) {
  171. if (ec == asio::error::try_again) // "resend unanswered"
  172. return resend_publish(std::move(publish.set_dup()));
  173. uint16_t packet_id = publish.packet_id();
  174. if (ec)
  175. return complete(ec, packet_id);
  176. auto puback = decoders::decode_puback(
  177. static_cast<uint32_t>(std::distance(first, last)), first
  178. );
  179. if (!puback.has_value()) {
  180. on_malformed_packet("Malformed PUBACK: cannot decode");
  181. return resend_publish(std::move(publish.set_dup()));
  182. }
  183. auto& [reason_code, props] = *puback;
  184. auto rc = to_reason_code<reason_codes::category::puback>(reason_code);
  185. if (!rc) {
  186. on_malformed_packet("Malformed PUBACK: invalid Reason Code");
  187. return resend_publish(std::move(publish.set_dup()));
  188. }
  189. complete(ec, packet_id, *rc, std::move(props));
  190. }
  191. template <
  192. qos_e q = qos_type,
  193. std::enable_if_t<q == qos_e::exactly_once, bool> = true
  194. >
  195. void operator()(
  196. on_pubrec, control_packet<allocator_type> publish,
  197. error_code ec, byte_citer first, byte_citer last
  198. ) {
  199. if (ec == asio::error::try_again) // "resend unanswered"
  200. return resend_publish(std::move(publish.set_dup()));
  201. uint16_t packet_id = publish.packet_id();
  202. if (ec)
  203. return complete(ec, packet_id);
  204. auto pubrec = decoders::decode_pubrec(
  205. static_cast<uint32_t>(std::distance(first, last)), first
  206. );
  207. if (!pubrec.has_value()) {
  208. on_malformed_packet("Malformed PUBREC: cannot decode");
  209. return resend_publish(std::move(publish.set_dup()));
  210. }
  211. auto& [reason_code, props] = *pubrec;
  212. auto rc = to_reason_code<reason_codes::category::pubrec>(reason_code);
  213. if (!rc) {
  214. on_malformed_packet("Malformed PUBREC: invalid Reason Code");
  215. return resend_publish(std::move(publish.set_dup()));
  216. }
  217. if (*rc)
  218. return complete(ec, packet_id, *rc);
  219. auto pubrel = control_packet<allocator_type>::of(
  220. with_pid, get_allocator(),
  221. encoders::encode_pubrel, packet_id,
  222. 0, pubrel_props {}
  223. );
  224. send_pubrel(std::move(pubrel), false);
  225. }
  226. void send_pubrel(control_packet<allocator_type> pubrel, bool throttled) {
  227. auto wire_data = pubrel.wire_data();
  228. _svc_ptr->async_send(
  229. wire_data,
  230. _serial_num,
  231. (send_flag::throttled * throttled) | send_flag::prioritized,
  232. asio::prepend(std::move(*this), on_pubrel {}, std::move(pubrel))
  233. );
  234. }
  235. template <
  236. qos_e q = qos_type,
  237. std::enable_if_t<q == qos_e::exactly_once, bool> = true
  238. >
  239. void operator()(
  240. on_pubrel, control_packet<allocator_type> pubrel, error_code ec
  241. ) {
  242. if (ec == asio::error::try_again)
  243. return send_pubrel(std::move(pubrel), true);
  244. uint16_t packet_id = pubrel.packet_id();
  245. if (ec)
  246. return complete(ec, packet_id);
  247. _svc_ptr->async_wait_reply(
  248. control_code_e::pubcomp, packet_id,
  249. asio::prepend(std::move(*this), on_pubcomp {}, std::move(pubrel))
  250. );
  251. }
  252. template <
  253. qos_e q = qos_type,
  254. std::enable_if_t<q == qos_e::exactly_once, bool> = true
  255. >
  256. void operator()(
  257. on_pubcomp, control_packet<allocator_type> pubrel,
  258. error_code ec,
  259. byte_citer first, byte_citer last
  260. ) {
  261. if (ec == asio::error::try_again) // "resend unanswered"
  262. return send_pubrel(std::move(pubrel), true);
  263. uint16_t packet_id = pubrel.packet_id();
  264. if (ec)
  265. return complete(ec, packet_id);
  266. auto pubcomp = decoders::decode_pubcomp(
  267. static_cast<uint32_t>(std::distance(first, last)), first
  268. );
  269. if (!pubcomp.has_value()) {
  270. on_malformed_packet("Malformed PUBCOMP: cannot decode");
  271. return send_pubrel(std::move(pubrel), true);
  272. }
  273. auto& [reason_code, props] = *pubcomp;
  274. auto rc = to_reason_code<reason_codes::category::pubcomp>(reason_code);
  275. if (!rc) {
  276. on_malformed_packet("Malformed PUBCOMP: invalid Reason Code");
  277. return send_pubrel(std::move(pubrel), true);
  278. }
  279. return complete(ec, pubrel.packet_id(), *rc);
  280. }
  281. private:
  282. error_code validate_publish(
  283. const std::string& topic, const std::string& payload,
  284. retain_e retain, const publish_props& props
  285. ) const {
  286. constexpr uint8_t default_retain_available = 1;
  287. constexpr uint8_t default_maximum_qos = 2;
  288. constexpr uint8_t default_payload_format_ind = 0;
  289. auto topic_name_valid = props[prop::topic_alias].has_value() ?
  290. validate_topic_alias_name(topic) == validation_result::valid :
  291. validate_topic_name(topic) == validation_result::valid
  292. ;
  293. if (!topic_name_valid)
  294. return client::error::invalid_topic;
  295. auto max_qos = _svc_ptr->connack_property(prop::maximum_qos)
  296. .value_or(default_maximum_qos);
  297. auto retain_available = _svc_ptr->connack_property(prop::retain_available)
  298. .value_or(default_retain_available);
  299. if (uint8_t(qos_type) > max_qos)
  300. return client::error::qos_not_supported;
  301. if (retain_available == 0 && retain == retain_e::yes)
  302. return client::error::retain_not_available;
  303. auto payload_format_ind = props[prop::payload_format_indicator]
  304. .value_or(default_payload_format_ind);
  305. if (
  306. payload_format_ind == 1 &&
  307. validate_mqtt_utf8(payload) != validation_result::valid
  308. )
  309. return client::error::malformed_packet;
  310. return validate_props(props);
  311. }
  312. error_code validate_props(const publish_props& props) const {
  313. constexpr uint16_t default_topic_alias_max = 0;
  314. const auto& topic_alias = props[prop::topic_alias];
  315. if (topic_alias) {
  316. auto topic_alias_max = _svc_ptr->connack_property(prop::topic_alias_maximum)
  317. .value_or(default_topic_alias_max);
  318. if (topic_alias_max == 0 || *topic_alias > topic_alias_max)
  319. return client::error::topic_alias_maximum_reached;
  320. if (*topic_alias == 0 )
  321. return client::error::malformed_packet;
  322. }
  323. const auto& response_topic = props[prop::response_topic];
  324. if (
  325. response_topic &&
  326. validate_topic_name(*response_topic) != validation_result::valid
  327. )
  328. return client::error::malformed_packet;
  329. const auto& user_properties = props[prop::user_property];
  330. for (const auto& user_property: user_properties)
  331. if (!is_valid_string_pair(user_property))
  332. return client::error::malformed_packet;
  333. if (!props[prop::subscription_identifier].empty())
  334. return client::error::malformed_packet;
  335. const auto& content_type = props[prop::content_type];
  336. if (
  337. content_type &&
  338. validate_mqtt_utf8(*content_type) != validation_result::valid
  339. )
  340. return client::error::malformed_packet;
  341. return error_code {};
  342. }
  343. void on_malformed_packet(const std::string& reason) {
  344. auto props = disconnect_props {};
  345. props[prop::reason_string] = reason;
  346. async_disconnect(
  347. disconnect_rc_e::malformed_packet, props, _svc_ptr,
  348. asio::detached
  349. );
  350. }
  351. template <
  352. qos_e q = qos_type,
  353. std::enable_if_t<q == qos_e::at_most_once, bool> = true
  354. >
  355. void complete(error_code ec, uint16_t = 0) {
  356. _handler.complete(ec);
  357. }
  358. template <
  359. qos_e q = qos_type,
  360. std::enable_if_t<q == qos_e::at_most_once, bool> = true
  361. >
  362. void complete_immediate(error_code ec, uint16_t) {
  363. _handler.complete_immediate(ec);
  364. }
  365. template <
  366. typename Props = on_publish_props_type<qos_type>,
  367. std::enable_if_t<
  368. std::is_same_v<Props, puback_props> ||
  369. std::is_same_v<Props, pubcomp_props>,
  370. bool
  371. > = true
  372. >
  373. void complete(
  374. error_code ec, uint16_t packet_id,
  375. reason_code rc = reason_codes::empty, Props&& props = Props {}
  376. ) {
  377. _svc_ptr->free_pid(packet_id, true);
  378. _handler.complete(ec, rc, std::forward<Props>(props));
  379. }
  380. template <
  381. typename Props = on_publish_props_type<qos_type>,
  382. std::enable_if_t<
  383. std::is_same_v<Props, puback_props> ||
  384. std::is_same_v<Props, pubcomp_props>,
  385. bool
  386. > = true
  387. >
  388. void complete_immediate(error_code ec, uint16_t packet_id) {
  389. if (packet_id != 0)
  390. _svc_ptr->free_pid(packet_id, false);
  391. _handler.complete_immediate(ec, reason_codes::empty, Props {});
  392. }
  393. };
  394. template <typename ClientService, qos_e qos_type>
  395. class initiate_async_publish {
  396. std::shared_ptr<ClientService> _svc_ptr;
  397. public:
  398. explicit initiate_async_publish(std::shared_ptr<ClientService> svc_ptr) :
  399. _svc_ptr(std::move(svc_ptr))
  400. {}
  401. using executor_type = typename ClientService::executor_type;
  402. executor_type get_executor() const noexcept {
  403. return _svc_ptr->get_executor();
  404. }
  405. template <typename Handler>
  406. void operator()(
  407. Handler&& handler,
  408. std::string topic, std::string payload,
  409. retain_e retain, const publish_props& props
  410. ) {
  411. detail::publish_send_op<ClientService, Handler, qos_type> {
  412. _svc_ptr, std::move(handler)
  413. }.perform(
  414. std::move(topic), std::move(payload), retain, props
  415. );
  416. }
  417. };
  418. } // end namespace boost::mqtt5::detail
  419. #endif // !BOOST_MQTT5_PUBLISH_SEND_OP_HPP