endpoints.hpp 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_ENDPOINTS_HPP
  8. #define BOOST_MQTT5_ENDPOINTS_HPP
  9. #include <boost/mqtt5/detail/log_invoke.hpp>
  10. #include <boost/mqtt5/detail/internal_types.hpp>
  11. #include <boost/asio/append.hpp>
  12. #include <boost/asio/associated_allocator.hpp>
  13. #include <boost/asio/associated_cancellation_slot.hpp>
  14. #include <boost/asio/associated_executor.hpp>
  15. #include <boost/asio/async_result.hpp>
  16. #include <boost/asio/deferred.hpp>
  17. #include <boost/asio/error.hpp>
  18. #include <boost/asio/experimental/parallel_group.hpp>
  19. #include <boost/asio/ip/tcp.hpp>
  20. #include <boost/asio/post.hpp>
  21. #include <boost/asio/prepend.hpp>
  22. #include <array>
  23. #include <chrono>
  24. #include <string>
  25. namespace boost::mqtt5::detail {
  26. namespace asio = boost::asio;
  27. using epoints = asio::ip::tcp::resolver::results_type;
  28. template <typename Owner, typename Handler>
  29. class resolve_op {
  30. struct on_resolve {};
  31. Owner& _owner;
  32. using handler_type = Handler;
  33. handler_type _handler;
  34. public:
  35. resolve_op(Owner& owner, Handler&& handler) :
  36. _owner(owner), _handler(std::move(handler))
  37. {}
  38. resolve_op(resolve_op&&) = default;
  39. resolve_op(const resolve_op&) = delete;
  40. resolve_op& operator=(resolve_op&&) = default;
  41. resolve_op& operator=(const resolve_op&) = delete;
  42. using allocator_type = asio::associated_allocator_t<handler_type>;
  43. allocator_type get_allocator() const noexcept {
  44. return asio::get_associated_allocator(_handler);
  45. }
  46. using cancellation_slot_type =
  47. asio::associated_cancellation_slot_t<handler_type>;
  48. cancellation_slot_type get_cancellation_slot() const noexcept {
  49. return asio::get_associated_cancellation_slot(_handler);
  50. }
  51. using executor_type = asio::associated_executor_t<handler_type>;
  52. executor_type get_executor() const noexcept {
  53. return asio::get_associated_executor(_handler);
  54. }
  55. void perform() {
  56. namespace asioex = boost::asio::experimental;
  57. if (_owner._servers.empty())
  58. return complete_post(asio::error::host_not_found, {}, {});
  59. _owner._current_host++;
  60. if (_owner._current_host + 1 > static_cast<int>(_owner._servers.size())) {
  61. _owner._current_host = -1;
  62. return complete_post(asio::error::try_again, {}, {});
  63. }
  64. authority_path ap = _owner._servers[_owner._current_host];
  65. _owner._connect_timer.expires_after(std::chrono::seconds(5));
  66. auto timed_resolve = asioex::make_parallel_group(
  67. _owner._resolver.async_resolve(ap.host, ap.port, asio::deferred),
  68. _owner._connect_timer.async_wait(asio::deferred)
  69. );
  70. timed_resolve.async_wait(
  71. asioex::wait_for_one(),
  72. asio::append(
  73. asio::prepend(std::move(*this), on_resolve {}),
  74. std::move(ap)
  75. )
  76. );
  77. }
  78. void operator()(
  79. on_resolve, std::array<std::size_t, 2> ord,
  80. error_code resolve_ec, epoints epts,
  81. error_code timer_ec, authority_path ap
  82. ) {
  83. if (
  84. (ord[0] == 0 && resolve_ec == asio::error::operation_aborted) ||
  85. (ord[0] == 1 && timer_ec == asio::error::operation_aborted)
  86. )
  87. return complete(asio::error::operation_aborted, {}, {});
  88. resolve_ec = timer_ec ? resolve_ec : asio::error::timed_out;
  89. _owner._log.at_resolve(resolve_ec, ap.host, ap.port, epts);
  90. if (!resolve_ec)
  91. return complete(error_code {}, std::move(epts), std::move(ap));
  92. perform();
  93. }
  94. private:
  95. void complete(error_code ec, epoints eps, authority_path ap) {
  96. std::move(_handler)(ec, std::move(eps), std::move(ap));
  97. }
  98. void complete_post(error_code ec, epoints eps, authority_path ap) {
  99. asio::post(
  100. _owner.get_executor(),
  101. asio::prepend(
  102. std::move(_handler), ec,
  103. std::move(eps), std::move(ap)
  104. )
  105. );
  106. }
  107. };
  108. template <typename LoggerType>
  109. class endpoints {
  110. using logger_type = LoggerType;
  111. asio::ip::tcp::resolver _resolver;
  112. asio::steady_timer& _connect_timer;
  113. std::vector<authority_path> _servers;
  114. int _current_host { -1 };
  115. log_invoke<logger_type>& _log;
  116. template <typename Owner, typename Handler>
  117. friend class resolve_op;
  118. public:
  119. template <typename Executor>
  120. endpoints(
  121. Executor ex, asio::steady_timer& timer,
  122. log_invoke<logger_type>& log
  123. ) :
  124. _resolver(std::move(ex)), _connect_timer(timer),
  125. _log(log)
  126. {}
  127. endpoints(const endpoints&) = delete;
  128. endpoints& operator=(const endpoints&) = delete;
  129. void clone_servers(const endpoints& other) {
  130. _servers = other._servers;
  131. }
  132. using executor_type = asio::ip::tcp::resolver::executor_type;
  133. // NOTE: asio::ip::basic_resolver returns executor by value
  134. executor_type get_executor() noexcept {
  135. return _resolver.get_executor();
  136. }
  137. template <typename CompletionToken>
  138. decltype(auto) async_next_endpoint(CompletionToken&& token) {
  139. using Signature = void (error_code, epoints, authority_path);
  140. auto initiation = [](auto handler, endpoints& self) {
  141. resolve_op { self, std::move(handler) }.perform();
  142. };
  143. return asio::async_initiate<CompletionToken, Signature>(
  144. initiation, token, std::ref(*this)
  145. );
  146. }
  147. void brokers(std::string hosts, uint16_t default_port) {
  148. _servers.clear();
  149. // loosely based on RFC 3986
  150. for (auto it = hosts.cbegin(), end = hosts.cend(); it != end;) {
  151. skip_spaces(it, end);
  152. auto host = std::string(match_while("A-Za-z0-9._~-", it, end));
  153. if (host.empty()) break;
  154. std::string port;
  155. if (it < end && *it == ':') {
  156. port = std::string(match_while("0-9", ++it, end));
  157. if (port.empty()) break;
  158. }
  159. std::string path;
  160. if (it < end && *it == '/')
  161. path = std::string(match_while("/A-Za-z0-9._~-", it, end));
  162. _servers.push_back({
  163. std::move(host),
  164. port.empty()
  165. ? std::to_string(default_port)
  166. : std::move(port),
  167. std::move(path)
  168. });
  169. skip_spaces(it, end);
  170. if (it == end || *it++ != ',') break;
  171. }
  172. }
  173. private:
  174. static constexpr void skip_spaces(
  175. byte_citer& it, const byte_citer& end
  176. ) {
  177. match_while(" \t\r\n\f\v", it, end);
  178. };
  179. static constexpr std::string_view match_while(
  180. const char* chs, byte_citer& it, const byte_citer& end
  181. ) {
  182. if (it == end)
  183. return {};
  184. auto beg = it;
  185. while (it < end && is_any_of(chs, *it)) ++it;
  186. return { &*beg, static_cast<size_t>(it - beg) };
  187. }
  188. static constexpr bool is_any_of(const char* chs, char c) {
  189. while (*chs)
  190. if (*(chs + 1) == '-' && *(chs + 2)) {
  191. if (c >= *chs && c <= *(chs + 2))
  192. return true;
  193. chs += 3;
  194. }
  195. else {
  196. if (c == *chs)
  197. return true;
  198. ++chs;
  199. }
  200. return false;
  201. }
  202. };
  203. } // end namespace boost::mqtt5::detail
  204. #endif // !BOOST_MQTT5_ENDPOINTS_HPP