run_fsm.ipp 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. //
  2. // Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
  3. // Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  4. //
  5. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  6. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  7. //
  8. #include <boost/redis/adapter/any_adapter.hpp>
  9. #include <boost/redis/config.hpp>
  10. #include <boost/redis/detail/connection_state.hpp>
  11. #include <boost/redis/detail/coroutine.hpp>
  12. #include <boost/redis/detail/multiplexer.hpp>
  13. #include <boost/redis/detail/run_fsm.hpp>
  14. #include <boost/redis/impl/is_terminal_cancel.hpp>
  15. #include <boost/redis/impl/log_utils.hpp>
  16. #include <boost/redis/impl/setup_request_utils.hpp>
  17. #include <boost/asio/cancellation_type.hpp>
  18. #include <boost/asio/error.hpp>
  19. #include <boost/asio/local/basic_endpoint.hpp> // for BOOST_ASIO_HAS_LOCAL_SOCKETS
  20. #include <boost/system/error_code.hpp>
  21. namespace boost::redis::detail {
  22. inline system::error_code check_config(const config& cfg)
  23. {
  24. if (!cfg.unix_socket.empty()) {
  25. if (cfg.use_ssl)
  26. return error::unix_sockets_ssl_unsupported;
  27. #ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
  28. return error::unix_sockets_unsupported;
  29. #endif
  30. }
  31. return system::error_code{};
  32. }
  33. inline void compose_ping_request(const config& cfg, request& to)
  34. {
  35. to.clear();
  36. to.push("PING", cfg.health_check_id);
  37. }
  38. inline void process_setup_node(
  39. connection_state& st,
  40. resp3::basic_node<std::string_view> const& nd,
  41. system::error_code& ec)
  42. {
  43. switch (nd.data_type) {
  44. case resp3::type::simple_error:
  45. case resp3::type::blob_error:
  46. case resp3::type::null:
  47. ec = redis::error::resp3_hello;
  48. st.setup_diagnostic = nd.value;
  49. break;
  50. default:;
  51. }
  52. }
  53. inline any_adapter make_setup_adapter(connection_state& st)
  54. {
  55. return any_adapter{
  56. [&st](any_adapter::parse_event evt, resp3::node_view const& nd, system::error_code& ec) {
  57. if (evt == any_adapter::parse_event::node)
  58. process_setup_node(st, nd, ec);
  59. }};
  60. }
  61. inline void on_setup_done(const multiplexer::elem& elm, connection_state& st)
  62. {
  63. const auto ec = elm.get_error();
  64. if (ec) {
  65. if (st.setup_diagnostic.empty()) {
  66. log_info(st.logger, "Setup request execution: ", ec);
  67. } else {
  68. log_info(st.logger, "Setup request execution: ", ec, " (", st.setup_diagnostic, ")");
  69. }
  70. } else {
  71. log_info(st.logger, "Setup request execution: success");
  72. }
  73. }
  74. run_action run_fsm::resume(
  75. connection_state& st,
  76. system::error_code ec,
  77. asio::cancellation_type_t cancel_state)
  78. {
  79. switch (resume_point_) {
  80. BOOST_REDIS_CORO_INITIAL
  81. // Check config
  82. ec = check_config(st.cfg);
  83. if (ec) {
  84. log_err(st.logger, "Invalid configuration: ", ec);
  85. stored_ec_ = ec;
  86. BOOST_REDIS_YIELD(resume_point_, 1, run_action_type::immediate)
  87. return stored_ec_;
  88. }
  89. // Compose the setup request. This only depends on the config, so it can be done just once
  90. compose_setup_request(st.cfg);
  91. // Compose the PING request. Same as above
  92. compose_ping_request(st.cfg, st.ping_req);
  93. for (;;) {
  94. // Try to connect
  95. BOOST_REDIS_YIELD(resume_point_, 2, run_action_type::connect)
  96. // Check for cancellations
  97. if (is_terminal_cancel(cancel_state)) {
  98. log_debug(st.logger, "Run: cancelled (1)");
  99. return system::error_code(asio::error::operation_aborted);
  100. }
  101. // If we were successful, run all the connection tasks
  102. if (!ec) {
  103. // Initialization
  104. st.mpx.reset();
  105. st.setup_diagnostic.clear();
  106. // Add the setup request to the multiplexer
  107. if (st.cfg.setup.get_commands() != 0u) {
  108. auto elm = make_elem(st.cfg.setup, make_setup_adapter(st));
  109. elm->set_done_callback([&elem_ref = *elm, &st] {
  110. on_setup_done(elem_ref, st);
  111. });
  112. st.mpx.add(elm);
  113. }
  114. // Run the tasks
  115. BOOST_REDIS_YIELD(resume_point_, 3, run_action_type::parallel_group)
  116. // Store any error yielded by the tasks for later
  117. stored_ec_ = ec;
  118. // We've lost connection or otherwise been cancelled.
  119. // Remove from the multiplexer the required requests.
  120. st.mpx.cancel_on_conn_lost();
  121. // The receive operation must be cancelled because channel
  122. // subscription does not survive a reconnection but requires
  123. // re-subscription.
  124. BOOST_REDIS_YIELD(resume_point_, 4, run_action_type::cancel_receive)
  125. // Restore the error
  126. ec = stored_ec_;
  127. }
  128. // Check for cancellations
  129. if (is_terminal_cancel(cancel_state)) {
  130. log_debug(st.logger, "Run: cancelled (2)");
  131. return system::error_code(asio::error::operation_aborted);
  132. }
  133. // If we are not going to try again, we're done
  134. if (st.cfg.reconnect_wait_interval.count() == 0) {
  135. return ec;
  136. }
  137. // Wait for the reconnection interval
  138. BOOST_REDIS_YIELD(resume_point_, 5, run_action_type::wait_for_reconnection)
  139. // Check for cancellations
  140. if (is_terminal_cancel(cancel_state)) {
  141. log_debug(st.logger, "Run: cancelled (3)");
  142. return system::error_code(asio::error::operation_aborted);
  143. }
  144. }
  145. }
  146. // We should never get here
  147. BOOST_ASSERT(false);
  148. return system::error_code();
  149. }
  150. } // namespace boost::redis::detail