run_op.hpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_RUN_OP_HPP
  8. #define BOOST_MQTT5_RUN_OP_HPP
  9. #include <boost/mqtt5/detail/cancellable_handler.hpp>
  10. #include <boost/mqtt5/detail/control_packet.hpp>
  11. #include <boost/mqtt5/detail/internal_types.hpp>
  12. #include <boost/mqtt5/impl/ping_op.hpp>
  13. #include <boost/mqtt5/impl/read_message_op.hpp>
  14. #include <boost/mqtt5/impl/sentry_op.hpp>
  15. #include <boost/asio/associated_allocator.hpp>
  16. #include <boost/asio/associated_cancellation_slot.hpp>
  17. #include <boost/asio/associated_executor.hpp>
  18. #include <boost/asio/experimental/parallel_group.hpp>
  19. #include <memory>
  20. namespace boost::mqtt5::detail {
  21. namespace asio = boost::asio;
  22. template <typename ClientService, typename Handler>
  23. class run_op {
  24. using client_service = ClientService;
  25. std::shared_ptr<client_service> _svc_ptr;
  26. using handler_type = cancellable_handler<
  27. Handler,
  28. typename client_service::executor_type
  29. >;
  30. handler_type _handler;
  31. public:
  32. run_op(
  33. std::shared_ptr<client_service> svc_ptr,
  34. Handler&& handler
  35. ) :
  36. _svc_ptr(std::move(svc_ptr)),
  37. _handler(std::move(handler), _svc_ptr->get_executor())
  38. {
  39. auto slot = asio::get_associated_cancellation_slot(_handler);
  40. if (slot.is_connected())
  41. slot.assign([&svc = *_svc_ptr](asio::cancellation_type_t) {
  42. svc.cancel();
  43. });
  44. }
  45. run_op(run_op&&) = default;
  46. run_op(const run_op&) = delete;
  47. run_op& operator=(run_op&&) = default;
  48. run_op& operator=(const run_op&) = delete;
  49. using allocator_type = asio::associated_allocator_t<handler_type>;
  50. allocator_type get_allocator() const noexcept {
  51. return asio::get_associated_allocator(_handler);
  52. }
  53. using executor_type = typename client_service::executor_type;
  54. executor_type get_executor() const noexcept {
  55. return _svc_ptr->get_executor();
  56. }
  57. void perform() {
  58. namespace asioex = boost::asio::experimental;
  59. _svc_ptr->_stream.open();
  60. _svc_ptr->_rec_channel.reset();
  61. auto init_read_message_op = [](
  62. auto handler, std::shared_ptr<client_service> svc_ptr
  63. ) {
  64. return read_message_op { std::move(svc_ptr), std::move(handler) }
  65. .perform();
  66. };
  67. auto init_ping_op = [](
  68. auto handler, std::shared_ptr<client_service> svc_ptr
  69. ) {
  70. return ping_op { std::move(svc_ptr), std::move(handler) }
  71. .perform();
  72. };
  73. auto init_senty_op = [](
  74. auto handler, std::shared_ptr<client_service> svc_ptr
  75. ) {
  76. return sentry_op { std::move(svc_ptr), std::move(handler) }
  77. .perform();
  78. };
  79. asioex::make_parallel_group(
  80. asio::async_initiate<const asio::deferred_t, void ()>(
  81. init_read_message_op, asio::deferred, _svc_ptr
  82. ),
  83. asio::async_initiate<const asio::deferred_t, void ()>(
  84. init_ping_op, asio::deferred, _svc_ptr
  85. ),
  86. asio::async_initiate<const asio::deferred_t, void ()>(
  87. init_senty_op, asio::deferred, _svc_ptr
  88. )
  89. ).async_wait(asioex::wait_for_all(), std::move(*this));
  90. }
  91. void operator()(std::array<std::size_t, 3> /* ord */) {
  92. _handler.complete(make_error_code(asio::error::operation_aborted));
  93. }
  94. };
  95. template <typename ClientService>
  96. class initiate_async_run {
  97. std::shared_ptr<ClientService> _svc_ptr;
  98. public:
  99. explicit initiate_async_run(std::shared_ptr<ClientService> svc_ptr) :
  100. _svc_ptr(std::move(svc_ptr))
  101. {}
  102. using executor_type = typename ClientService::executor_type;
  103. executor_type get_executor() const noexcept {
  104. return _svc_ptr->get_executor();
  105. }
  106. template <typename Handler>
  107. void operator()(Handler&& handler) {
  108. run_op<ClientService, Handler> {
  109. _svc_ptr, std::move(handler)
  110. }.perform();
  111. }
  112. };
  113. } // end namespace boost::mqtt5::detail
  114. #endif // !BOOST_MQTT5_RUN_OP_HPP