writer_fsm.ipp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. //
  2. // Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
  3. // Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  4. //
  5. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  6. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  7. //
  8. #ifndef BOOST_REDIS_WRITER_FSM_IPP
  9. #define BOOST_REDIS_WRITER_FSM_IPP
  10. #include <boost/redis/adapter/any_adapter.hpp>
  11. #include <boost/redis/detail/connection_state.hpp>
  12. #include <boost/redis/detail/coroutine.hpp>
  13. #include <boost/redis/detail/multiplexer.hpp>
  14. #include <boost/redis/detail/writer_fsm.hpp>
  15. #include <boost/redis/impl/is_terminal_cancel.hpp>
  16. #include <boost/redis/impl/log_utils.hpp>
  17. #include <boost/redis/logger.hpp>
  18. #include <boost/asio/cancellation_type.hpp>
  19. #include <boost/asio/error.hpp>
  20. #include <boost/assert.hpp>
  21. #include <boost/system/error_code.hpp>
  22. #include <cstddef>
  23. namespace boost::redis::detail {
  24. inline void process_ping_node(
  25. buffered_logger& lgr,
  26. resp3::basic_node<std::string_view> const& nd,
  27. system::error_code& ec)
  28. {
  29. switch (nd.data_type) {
  30. case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
  31. case resp3::type::blob_error: ec = redis::error::resp3_blob_error; break;
  32. default: ;
  33. }
  34. if (ec) {
  35. log_info(lgr, "Health checker: server answered ping with an error: ", nd.value);
  36. }
  37. }
  38. inline any_adapter make_ping_adapter(buffered_logger& lgr)
  39. {
  40. return any_adapter{
  41. [&lgr](any_adapter::parse_event evt, resp3::node_view const& nd, system::error_code& ec) {
  42. if (evt == any_adapter::parse_event::node)
  43. process_ping_node(lgr, nd, ec);
  44. }};
  45. }
  46. writer_action writer_fsm::resume(
  47. connection_state& st,
  48. system::error_code ec,
  49. std::size_t bytes_written,
  50. asio::cancellation_type_t cancel_state)
  51. {
  52. switch (resume_point_) {
  53. BOOST_REDIS_CORO_INITIAL
  54. for (;;) {
  55. // Attempt to write while we have requests ready to send
  56. while (st.mpx.prepare_write() != 0u) {
  57. // Write an entire message. We can't use asio::async_write because we want
  58. // to apply timeouts to individual write operations
  59. for (;;) {
  60. // Write what we can. If nothing has been written for the health check
  61. // interval, we consider the connection as failed
  62. BOOST_REDIS_YIELD(
  63. resume_point_,
  64. 1,
  65. writer_action::write_some(st.cfg.health_check_interval))
  66. // Commit the received bytes. This accounts for partial success
  67. bool finished = st.mpx.commit_write(bytes_written);
  68. log_debug(st.logger, "Writer task: ", bytes_written, " bytes written.");
  69. // Check for cancellations and translate error codes
  70. if (is_terminal_cancel(cancel_state))
  71. ec = asio::error::operation_aborted;
  72. else if (ec == asio::error::operation_aborted)
  73. ec = error::write_timeout;
  74. // Check for errors
  75. if (ec) {
  76. if (ec == asio::error::operation_aborted) {
  77. log_debug(st.logger, "Writer task: cancelled (1).");
  78. } else {
  79. log_debug(st.logger, "Writer task error: ", ec);
  80. }
  81. return ec;
  82. }
  83. // Are we done yet?
  84. if (finished)
  85. break;
  86. }
  87. }
  88. // No more requests ready to be written. Wait for more, or until we need to send a PING
  89. BOOST_REDIS_YIELD(resume_point_, 2, writer_action::wait(st.cfg.health_check_interval))
  90. // Check for cancellations
  91. if (is_terminal_cancel(cancel_state)) {
  92. log_debug(st.logger, "Writer task: cancelled (2).");
  93. return system::error_code(asio::error::operation_aborted);
  94. }
  95. // If we weren't notified, it's because there is no data and we should send a health check
  96. if (!ec) {
  97. auto elem = make_elem(st.ping_req, make_ping_adapter(st.logger));
  98. elem->set_done_callback([] { });
  99. st.mpx.add(elem);
  100. }
  101. }
  102. }
  103. // We should never reach here
  104. BOOST_ASSERT(false);
  105. return system::error_code();
  106. }
  107. } // namespace boost::redis::detail
  108. #endif