reconnect_op.hpp 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_RECONNECT_OP_HPP
  8. #define BOOST_MQTT5_RECONNECT_OP_HPP
  #include <boost/mqtt5/types.hpp>
  #include <boost/mqtt5/detail/async_traits.hpp>
  #include <boost/mqtt5/impl/connect_op.hpp>
  #include <boost/asio/any_completion_handler.hpp>
  #include <boost/asio/associated_allocator.hpp>
  #include <boost/asio/associated_cancellation_slot.hpp>
  #include <boost/asio/associated_executor.hpp>
  #include <boost/asio/async_result.hpp>
  #include <boost/asio/deferred.hpp>
  #include <boost/asio/dispatch.hpp>
  #include <boost/asio/error.hpp>
  #include <boost/asio/experimental/parallel_group.hpp>
  #include <boost/asio/ip/tcp.hpp>
  #include <boost/asio/prepend.hpp>
  #include <boost/random/linear_congruential.hpp>
  #include <boost/random/uniform_smallint.hpp>
  #include <array>
  #include <chrono>
  #include <cstddef>
  #include <cstdint>
  #include <ctime>
  #include <functional>
  #include <memory>
  #include <string>
  #include <utility>
  29. namespace boost::mqtt5::detail {
  30. class exponential_backoff {
  31. int _curr_exp { 0 };
  32. static constexpr int _base_mulptilier = 1000;
  33. static constexpr int _max_exp = 4;
  34. // sizeof(_generator) = 8
  35. boost::random::rand48 _generator { uint32_t(std::time(0)) };
  36. boost::random::uniform_smallint<> _distribution { -500, 500 };
  37. public:
  38. exponential_backoff() = default;
  39. duration generate() {
  40. int exponent = _curr_exp < _max_exp ? _curr_exp++ : _max_exp;
  41. int base = 1 << exponent;
  42. return std::chrono::milliseconds(
  43. base * _base_mulptilier + _distribution(_generator) /* noise */
  44. );
  45. }
  46. };
  47. namespace asio = boost::asio;
  48. template <typename Owner>
  49. class reconnect_op {
  50. struct on_locked {};
  51. struct on_next_endpoint {};
  52. struct on_connect {};
  53. struct on_backoff {};
  54. Owner& _owner;
  55. using handler_type = asio::any_completion_handler<void (error_code)>;
  56. handler_type _handler;
  57. std::unique_ptr<std::string> _buffer_ptr;
  58. exponential_backoff _generator;
  59. using endpoint = asio::ip::tcp::endpoint;
  60. using epoints = asio::ip::tcp::resolver::results_type;
  61. public:
  62. template <typename Handler>
  63. reconnect_op(Owner& owner, Handler&& handler) :
  64. _owner(owner), _handler(std::move(handler))
  65. {}
  66. reconnect_op(reconnect_op&&) = default;
  67. reconnect_op(const reconnect_op&) = delete;
  68. reconnect_op& operator=(reconnect_op&&) = default;
  69. reconnect_op& operator=(const reconnect_op&) = delete;
  70. using allocator_type = asio::associated_allocator_t<handler_type>;
  71. allocator_type get_allocator() const noexcept {
  72. return asio::get_associated_allocator(_handler);
  73. }
  74. using cancellation_slot_type =
  75. asio::associated_cancellation_slot_t<handler_type>;
  76. cancellation_slot_type get_cancellation_slot() const noexcept {
  77. return asio::get_associated_cancellation_slot(_handler);
  78. }
  79. using executor_type = asio::associated_executor_t<handler_type>;
  80. executor_type get_executor() const noexcept {
  81. return asio::get_associated_executor(_handler);
  82. }
  83. void perform(typename Owner::stream_ptr s) {
  84. _owner._conn_mtx.lock(
  85. asio::prepend(std::move(*this), on_locked {}, s)
  86. );
  87. }
  88. void operator()(on_locked, typename Owner::stream_ptr s, error_code ec) {
  89. if (ec == asio::error::operation_aborted)
  90. // cancelled without acquiring the lock (by calling client.cancel())
  91. return std::move(_handler)(ec);
  92. if (!_owner.is_open())
  93. return complete(asio::error::operation_aborted);
  94. if (s != _owner._stream_ptr)
  95. return complete(asio::error::try_again);
  96. do_reconnect();
  97. }
  98. void do_reconnect() {
  99. _owner._endpoints.async_next_endpoint(
  100. asio::prepend(std::move(*this), on_next_endpoint {})
  101. );
  102. }
  103. void backoff_and_reconnect() {
  104. _owner._connect_timer.expires_after(_generator.generate());
  105. _owner._connect_timer.async_wait(
  106. asio::prepend(std::move(*this), on_backoff {})
  107. );
  108. }
  109. void operator()(on_backoff, error_code ec) {
  110. if (ec == asio::error::operation_aborted || !_owner.is_open())
  111. return complete(asio::error::operation_aborted);
  112. do_reconnect();
  113. }
  114. void operator()(
  115. on_next_endpoint, error_code ec,
  116. epoints eps, authority_path ap
  117. ) {
  118. // the three error codes below are the only possible codes
  119. // that may be returned from async_next_endpont
  120. if (ec == asio::error::operation_aborted || !_owner.is_open())
  121. return complete(asio::error::operation_aborted);
  122. if (ec == asio::error::try_again)
  123. return backoff_and_reconnect();
  124. if (ec == asio::error::host_not_found)
  125. return complete(asio::error::no_recovery);
  126. connect(eps.cbegin(), std::move(ap));
  127. }
  128. void connect(epoints::const_iterator eps, authority_path ap) {
  129. namespace asioex = boost::asio::experimental;
  130. const auto& ep = eps->endpoint();
  131. auto sptr = _owner.construct_and_open_next_layer(ep.protocol());
  132. if constexpr (has_tls_context<typename Owner::stream_context_type>)
  133. setup_tls_sni(
  134. ap, _owner._stream_context.tls_context(), *sptr
  135. );
  136. // wait max 5 seconds for the connect (handshake) op to finish
  137. _owner._connect_timer.expires_after(std::chrono::seconds(5));
  138. auto init_connect = [](
  139. auto handler, typename Owner::stream_type& stream,
  140. mqtt_ctx& context, log_invoke<typename Owner::logger_type>& log,
  141. endpoint ep, authority_path ap
  142. ) {
  143. connect_op { stream, context, log, std::move(handler) }
  144. .perform(ep, std::move(ap));
  145. };
  146. auto timed_connect = asioex::make_parallel_group(
  147. asio::async_initiate<const asio::deferred_t, void(error_code)>(
  148. init_connect, asio::deferred, std::ref(*sptr),
  149. std::ref(_owner._stream_context.mqtt_context()),
  150. std::ref(_owner.log()),
  151. ep, ap
  152. ),
  153. _owner._connect_timer.async_wait(asio::deferred)
  154. );
  155. timed_connect.async_wait(
  156. asioex::wait_for_one(),
  157. asio::prepend(
  158. std::move(*this), on_connect {},
  159. std::move(sptr), std::move(eps), std::move(ap)
  160. )
  161. );
  162. }
  163. void operator()(
  164. on_connect,
  165. typename Owner::stream_ptr sptr, epoints::const_iterator eps, authority_path ap,
  166. std::array<std::size_t, 2> ord,
  167. error_code connect_ec, error_code timer_ec
  168. ) {
  169. // connect_ec may be any of:
  170. // 1) async_connect error codes
  171. // 2) async_handshake (TLS) error codes
  172. // 3) async_handshake (WebSocket) error codes
  173. // 4) async_write error codes
  174. // 5) async_read error codes
  175. // 5) client::error::malformed_packet
  176. if (
  177. (ord[0] == 0 && connect_ec == asio::error::operation_aborted) ||
  178. (ord[0] == 1 && timer_ec == asio::error::operation_aborted) ||
  179. !_owner.is_open()
  180. )
  181. return complete(asio::error::operation_aborted);
  182. // retry for operation timed out and any other error_code or client::error::malformed_packet
  183. if (ord[0] == 1 || connect_ec) {
  184. // if the hostname resolved into more endpoints, try the next one
  185. if (++eps != epoints::const_iterator())
  186. return connect(std::move(eps), std::move(ap));
  187. // try next server
  188. return do_reconnect();
  189. }
  190. _owner.replace_next_layer(std::move(sptr));
  191. complete(error_code {});
  192. }
  193. private:
  194. void complete(error_code ec) {
  195. _owner._conn_mtx.unlock();
  196. std::move(_handler)(ec);
  197. }
  198. };
  199. } // end namespace boost::mqtt5::detail
  200. #endif // !BOOST_MQTT5_RECONNECT_OP_HPP