autoconnect_stream.hpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_AUTOCONNECT_STREAM_HPP
  8. #define BOOST_MQTT5_AUTOCONNECT_STREAM_HPP
  9. #include <boost/mqtt5/detail/async_mutex.hpp>
  10. #include <boost/mqtt5/detail/async_traits.hpp>
  11. #include <boost/mqtt5/detail/log_invoke.hpp>
  12. #include <boost/mqtt5/impl/endpoints.hpp>
  13. #include <boost/mqtt5/impl/read_op.hpp>
  14. #include <boost/mqtt5/impl/reconnect_op.hpp>
  15. #include <boost/mqtt5/impl/shutdown_op.hpp>
  16. #include <boost/mqtt5/impl/write_op.hpp>
  17. #include <boost/asio/async_result.hpp>
  18. #include <boost/asio/ip/tcp.hpp>
  19. #include <boost/asio/steady_timer.hpp>
  20. #include <boost/system/error_code.hpp>
  21. #include <cstdint>
  22. #include <memory>
  23. #include <string>
  24. #include <variant> // std::monostate
  25. namespace boost::mqtt5::detail {
  26. namespace asio = boost::asio;
  27. using error_code = boost::system::error_code;
  28. template <
  29. typename StreamType,
  30. typename StreamContext = std::monostate,
  31. typename LoggerType = noop_logger
  32. >
  33. class autoconnect_stream {
  34. public:
  35. using self_type = autoconnect_stream<StreamType, StreamContext, LoggerType>;
  36. using stream_type = StreamType;
  37. using stream_context_type = StreamContext;
  38. using logger_type = LoggerType;
  39. using executor_type = typename stream_type::executor_type;
  40. private:
  41. using stream_ptr = std::shared_ptr<stream_type>;
  42. executor_type _stream_executor;
  43. async_mutex _conn_mtx;
  44. asio::steady_timer _read_timer, _connect_timer;
  45. endpoints<logger_type> _endpoints;
  46. stream_ptr _stream_ptr;
  47. stream_context_type& _stream_context;
  48. log_invoke<logger_type>& _log;
  49. template <typename Owner, typename Handler>
  50. friend class read_op;
  51. template <typename Owner, typename Handler>
  52. friend class write_op;
  53. template <typename Owner>
  54. friend class reconnect_op;
  55. template <typename Owner>
  56. friend class shutdown_op;
  57. public:
  58. autoconnect_stream(
  59. const executor_type& ex, stream_context_type& context,
  60. log_invoke<logger_type>& log
  61. ) :
  62. _stream_executor(ex),
  63. _conn_mtx(_stream_executor),
  64. _read_timer(_stream_executor), _connect_timer(_stream_executor),
  65. _endpoints(_stream_executor, _connect_timer, log),
  66. _stream_context(context),
  67. _log(log)
  68. {
  69. replace_next_layer(construct_next_layer());
  70. }
  71. autoconnect_stream(const autoconnect_stream&) = delete;
  72. autoconnect_stream& operator=(const autoconnect_stream&) = delete;
  73. using next_layer_type = stream_type;
  74. next_layer_type& next_layer() {
  75. return *_stream_ptr;
  76. }
  77. const next_layer_type& next_layer() const {
  78. return *_stream_ptr;
  79. }
  80. executor_type get_executor() const noexcept {
  81. return _stream_executor;
  82. }
  83. void brokers(std::string hosts, uint16_t default_port) {
  84. _endpoints.brokers(std::move(hosts), default_port);
  85. }
  86. void clone_endpoints(const autoconnect_stream& other) {
  87. _endpoints.clone_servers(other._endpoints);
  88. }
  89. bool is_open() const noexcept {
  90. return lowest_layer(*_stream_ptr).is_open();
  91. }
  92. void open() {
  93. open_lowest_layer(_stream_ptr, asio::ip::tcp::v4());
  94. }
  95. void cancel() {
  96. _conn_mtx.cancel();
  97. _connect_timer.cancel();
  98. }
  99. void close() {
  100. error_code ec;
  101. lowest_layer(*_stream_ptr).close(ec);
  102. }
  103. template <typename CompletionToken>
  104. void async_shutdown(CompletionToken&& token) {
  105. using Signature = void (error_code);
  106. auto initiation = [](auto handler, self_type& self) {
  107. shutdown_op { self, std::move(handler) }.perform();
  108. };
  109. return asio::async_initiate<CompletionToken, Signature>(
  110. initiation, token, std::ref(*this)
  111. );
  112. }
  113. bool was_connected() const {
  114. error_code ec;
  115. lowest_layer(*_stream_ptr).remote_endpoint(ec);
  116. return ec == boost::system::errc::success;
  117. }
  118. template <typename BufferType, typename CompletionToken>
  119. decltype(auto) async_read_some(
  120. const BufferType& buffer, duration wait_for, CompletionToken&& token
  121. ) {
  122. using Signature = void (error_code, size_t);
  123. auto initiation = [](
  124. auto handler, self_type& self,
  125. const BufferType& buffer, duration wait_for
  126. ) {
  127. read_op { self, std::move(handler) }.perform(buffer, wait_for);
  128. };
  129. return asio::async_initiate<CompletionToken, Signature>(
  130. initiation, token, std::ref(*this), buffer, wait_for
  131. );
  132. }
  133. template <typename BufferType, typename CompletionToken>
  134. decltype(auto) async_write(
  135. const BufferType& buffer, CompletionToken&& token
  136. ) {
  137. using Signature = void (error_code, size_t);
  138. auto initiation = [](
  139. auto handler, self_type& self, const BufferType& buffer
  140. ) {
  141. write_op { self, std::move(handler) }.perform(buffer);
  142. };
  143. return asio::async_initiate<CompletionToken, Signature>(
  144. initiation, token, std::ref(*this), buffer
  145. );
  146. }
  147. private:
  148. log_invoke<logger_type>& log() {
  149. return _log;
  150. }
  151. static void open_lowest_layer(const stream_ptr& sptr, asio::ip::tcp protocol) {
  152. error_code ec;
  153. auto& layer = lowest_layer(*sptr);
  154. layer.open(protocol, ec);
  155. layer.set_option(asio::socket_base::reuse_address(true), ec);
  156. layer.set_option(asio::ip::tcp::no_delay(true), ec);
  157. }
  158. stream_ptr construct_next_layer() const {
  159. stream_ptr sptr;
  160. if constexpr (has_tls_context<StreamContext>)
  161. sptr = std::make_shared<stream_type>(
  162. _stream_executor, _stream_context.tls_context()
  163. );
  164. else
  165. sptr = std::make_shared<stream_type>(_stream_executor);
  166. return sptr;
  167. }
  168. stream_ptr construct_and_open_next_layer(asio::ip::tcp protocol) const {
  169. auto sptr = construct_next_layer();
  170. open_lowest_layer(sptr, protocol);
  171. return sptr;
  172. }
  173. void replace_next_layer(stream_ptr sptr) {
  174. // close() will cancel all outstanding async operations on
  175. // _stream_ptr; cancelling posts operation_aborted to handlers
  176. // but handlers will be executed after std::exchange below;
  177. // handlers should therefore treat (operation_aborted && is_open())
  178. // equivalent to try_again.
  179. if (_stream_ptr)
  180. close();
  181. std::exchange(_stream_ptr, std::move(sptr));
  182. }
  183. template <typename CompletionToken>
  184. decltype(auto) async_reconnect(stream_ptr s, CompletionToken&& token) {
  185. using Signature = void (error_code);
  186. auto initiation = [](auto handler, self_type& self, stream_ptr s) {
  187. reconnect_op { self, std::move(handler) }.perform(std::move(s));
  188. };
  189. return asio::async_initiate<CompletionToken, Signature>(
  190. initiation, token, std::ref(*this), std::move(s)
  191. );
  192. }
  193. };
  194. } // end namespace boost::mqtt5::detail
  195. #endif // !BOOST_MQTT5_AUTOCONNECT_STREAM_HPP