connect_fsm.ipp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. //
  2. // Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
  3. // Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  4. //
  5. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  6. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  7. //
  8. #include <boost/redis/config.hpp>
  9. #include <boost/redis/detail/connect_fsm.hpp>
  10. #include <boost/redis/detail/coroutine.hpp>
  11. #include <boost/redis/error.hpp>
  12. #include <boost/redis/impl/log_utils.hpp>
  13. #include <boost/asio/cancellation_type.hpp>
  14. #include <boost/asio/error.hpp>
  15. #include <boost/asio/ip/tcp.hpp>
  16. #include <boost/assert.hpp>
  17. #include <string>
  18. namespace boost::redis::detail {
  19. // Logging
  20. inline void format_tcp_endpoint(const asio::ip::tcp::endpoint& ep, std::string& to)
  21. {
  22. // This formatting is inspired by Asio's endpoint operator<<
  23. const auto& addr = ep.address();
  24. if (addr.is_v6())
  25. to += '[';
  26. to += addr.to_string();
  27. if (addr.is_v6())
  28. to += ']';
  29. to += ':';
  30. to += std::to_string(ep.port());
  31. }
  32. template <>
  33. struct log_traits<asio::ip::tcp::endpoint> {
  34. static inline void log(std::string& to, const asio::ip::tcp::endpoint& value)
  35. {
  36. format_tcp_endpoint(value, to);
  37. }
  38. };
  39. template <>
  40. struct log_traits<asio::ip::tcp::resolver::results_type> {
  41. static inline void log(std::string& to, const asio::ip::tcp::resolver::results_type& value)
  42. {
  43. auto iter = value.cbegin();
  44. auto end = value.cend();
  45. if (iter != end) {
  46. format_tcp_endpoint(iter->endpoint(), to);
  47. ++iter;
  48. for (; iter != end; ++iter) {
  49. to += ", ";
  50. format_tcp_endpoint(iter->endpoint(), to);
  51. }
  52. }
  53. }
  54. };
  55. inline transport_type transport_from_config(const config& cfg)
  56. {
  57. if (cfg.unix_socket.empty()) {
  58. if (cfg.use_ssl) {
  59. return transport_type::tcp_tls;
  60. } else {
  61. return transport_type::tcp;
  62. }
  63. } else {
  64. BOOST_ASSERT(!cfg.use_ssl);
  65. return transport_type::unix_socket;
  66. }
  67. }
  68. inline system::error_code translate_timeout_error(
  69. system::error_code io_ec,
  70. asio::cancellation_type_t cancel_state,
  71. error code_if_cancelled)
  72. {
  73. // Translates cancellations and timeout errors into a single error_code.
  74. // - Cancellation state set, and an I/O error: the entire operation was cancelled.
  75. // The I/O code (probably operation_aborted) is appropriate.
  76. // - Cancellation state set, and no I/O error: same as above, but the cancellation
  77. // arrived after the operation completed and before the handler was called. Set the code here.
  78. // - No cancellation state set, I/O error set to operation_aborted: since we use cancel_after,
  79. // this means a timeout.
  80. // - Otherwise, respect the I/O error.
  81. if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) {
  82. return io_ec ? io_ec : asio::error::operation_aborted;
  83. }
  84. return io_ec == asio::error::operation_aborted ? code_if_cancelled : io_ec;
  85. }
  86. connect_action connect_fsm::resume(
  87. system::error_code ec,
  88. const asio::ip::tcp::resolver::results_type& resolver_results,
  89. redis_stream_state& st,
  90. asio::cancellation_type_t cancel_state)
  91. {
  92. // Translate error codes
  93. ec = translate_timeout_error(ec, cancel_state, error::resolve_timeout);
  94. // Log it
  95. if (ec) {
  96. log_info(*lgr_, "Error resolving the server hostname: ", ec);
  97. } else {
  98. log_info(*lgr_, "Resolve results: ", resolver_results);
  99. }
  100. // Delegate to the regular resume function
  101. return resume(ec, st, cancel_state);
  102. }
  103. connect_action connect_fsm::resume(
  104. system::error_code ec,
  105. const asio::ip::tcp::endpoint& selected_endpoint,
  106. redis_stream_state& st,
  107. asio::cancellation_type_t cancel_state)
  108. {
  109. // Translate error codes
  110. ec = translate_timeout_error(ec, cancel_state, error::connect_timeout);
  111. // Log it
  112. if (ec) {
  113. log_info(*lgr_, "Failed to connect to the server: ", ec);
  114. } else {
  115. log_info(*lgr_, "Connected to ", selected_endpoint);
  116. }
  117. // Delegate to the regular resume function
  118. return resume(ec, st, cancel_state);
  119. }
  120. connect_action connect_fsm::resume(
  121. system::error_code ec,
  122. redis_stream_state& st,
  123. asio::cancellation_type_t cancel_state)
  124. {
  125. switch (resume_point_) {
  126. BOOST_REDIS_CORO_INITIAL
  127. // Record the transport that we will be using
  128. st.type = transport_from_config(*cfg_);
  129. if (st.type == transport_type::unix_socket) {
  130. // Reset the socket, to discard any previous state. Ignore any errors
  131. BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_close)
  132. // Connect to the socket
  133. BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::unix_socket_connect)
  134. // Fix error codes. If we were cancelled and the code is operation_aborted,
  135. // it is because per-operation cancellation was activated. If we were not cancelled
  136. // but the operation failed with operation_aborted, it's a timeout.
  137. // Also check for cancellations that didn't cause a failure
  138. ec = translate_timeout_error(ec, cancel_state, error::connect_timeout);
  139. // Log it
  140. if (ec) {
  141. log_info(*lgr_, "Failed to connect to the server: ", ec);
  142. } else {
  143. log_info(*lgr_, "Connected to ", cfg_->unix_socket);
  144. }
  145. // If this failed, we can't continue
  146. if (ec) {
  147. return ec;
  148. }
  149. // Done
  150. return system::error_code();
  151. } else {
  152. // ssl::stream doesn't support being re-used. If we're to use
  153. // TLS and the stream has been used, re-create it.
  154. // Must be done before anything else is done on the stream.
  155. // We don't need to close the TCP socket if using plaintext TCP
  156. // because range-connect closes open sockets, while individual connect doesn't
  157. if (cfg_->use_ssl && st.ssl_stream_used) {
  158. BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::ssl_stream_reset)
  159. }
  160. // Resolve names. The continuation needs access to the returned
  161. // endpoints, and is a specialized resume() that will call this function
  162. BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_resolve)
  163. // If this failed, we can't continue (error code translation already performed here)
  164. if (ec) {
  165. return ec;
  166. }
  167. // Now connect to the endpoints returned by the resolver.
  168. // This has a specialized resume(), too
  169. BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::tcp_connect)
  170. // If this failed, we can't continue (error code translation already performed here)
  171. if (ec) {
  172. return ec;
  173. }
  174. if (cfg_->use_ssl) {
  175. // Mark the SSL stream as used
  176. st.ssl_stream_used = true;
  177. // Perform the TLS handshake
  178. BOOST_REDIS_YIELD(resume_point_, 6, connect_action_type::ssl_handshake)
  179. // Translate error codes
  180. ec = translate_timeout_error(ec, cancel_state, error::ssl_handshake_timeout);
  181. // Log it
  182. if (ec) {
  183. log_info(*lgr_, "Failed to perform SSL handshake: ", ec);
  184. } else {
  185. log_info(*lgr_, "Successfully performed SSL handshake");
  186. }
  187. // If this failed, we can't continue
  188. if (ec) {
  189. return ec;
  190. }
  191. }
  192. // Done
  193. return system::error_code();
  194. }
  195. }
  196. BOOST_ASSERT(false);
  197. return system::error_code();
  198. }
  199. } // namespace boost::redis::detail