reader_fsm.ipp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #include <boost/redis/detail/connection_state.hpp>
  7. #include <boost/redis/detail/coroutine.hpp>
  8. #include <boost/redis/detail/multiplexer.hpp>
  9. #include <boost/redis/detail/reader_fsm.hpp>
  10. #include <boost/redis/impl/is_terminal_cancel.hpp>
  11. #include <boost/redis/impl/log_utils.hpp>
  12. #include <boost/asio/cancellation_type.hpp>
  13. #include <boost/asio/error.hpp>
  14. namespace boost::redis::detail {
  15. reader_fsm::action reader_fsm::resume(
  16. connection_state& st,
  17. std::size_t bytes_read,
  18. system::error_code ec,
  19. asio::cancellation_type_t cancel_state)
  20. {
  21. switch (resume_point_) {
  22. BOOST_REDIS_CORO_INITIAL
  23. for (;;) {
  24. // Prepare the buffer for the read operation
  25. ec = st.mpx.prepare_read();
  26. if (ec) {
  27. log_debug(st.logger, "Reader task: error in prepare_read: ", ec);
  28. return {ec};
  29. }
  30. // Read. The connection might spend health_check_interval without writing data.
  31. // Give it another health_check_interval for the response to arrive.
  32. // If we don't get anything in this time, consider the connection as dead
  33. log_debug(st.logger, "Reader task: issuing read");
  34. BOOST_REDIS_YIELD(resume_point_, 1, action::read_some(2 * st.cfg.health_check_interval))
  35. // Check for cancellations
  36. if (is_terminal_cancel(cancel_state)) {
  37. log_debug(st.logger, "Reader task: cancelled (1)");
  38. return system::error_code(asio::error::operation_aborted);
  39. }
  40. // Translate timeout errors caused by operation_aborted to more legible ones.
  41. // A timeout here means that we didn't receive data in time.
  42. // Note that cancellation is already handled by the above statement.
  43. if (ec == asio::error::operation_aborted) {
  44. ec = error::pong_timeout;
  45. }
  46. // Log what we read
  47. if (ec) {
  48. log_debug(st.logger, "Reader task: ", bytes_read, " bytes read, error: ", ec);
  49. } else {
  50. log_debug(st.logger, "Reader task: ", bytes_read, " bytes read");
  51. }
  52. // Process the bytes read, even if there was an error
  53. st.mpx.commit_read(bytes_read);
  54. // Check for read errors
  55. if (ec) {
  56. // TODO: If an error occurred but data was read (i.e.
  57. // bytes_read != 0) we should try to process that data and
  58. // deliver it to the user before calling cancel_run.
  59. return ec;
  60. }
  61. // Process the data that we've read
  62. while (st.mpx.get_read_buffer_size() != 0) {
  63. res_ = st.mpx.consume(ec);
  64. if (ec) {
  65. // TODO: Perhaps log what has not been consumed to aid
  66. // debugging.
  67. log_debug(st.logger, "Reader task: error processing message: ", ec);
  68. return ec;
  69. }
  70. if (res_.first == consume_result::needs_more) {
  71. log_debug(st.logger, "Reader task: incomplete message received");
  72. break;
  73. }
  74. if (res_.first == consume_result::got_push) {
  75. BOOST_REDIS_YIELD(resume_point_, 2, action::notify_push_receiver(res_.second))
  76. // Check for cancellations
  77. if (is_terminal_cancel(cancel_state)) {
  78. log_debug(st.logger, "Reader task: cancelled (2)");
  79. return system::error_code(asio::error::operation_aborted);
  80. }
  81. // Check for other errors
  82. if (ec) {
  83. log_debug(st.logger, "Reader task: error notifying push receiver: ", ec);
  84. return ec;
  85. }
  86. } else {
  87. // TODO: Here we should notify the exec operation that
  88. // it can be completed. This will improve log clarity
  89. // and will make this code symmetrical in how it
  90. // handles pushes and other messages. The new action
  91. // type can be named notify_exec. To do that we need to
  92. // refactor the multiplexer.
  93. }
  94. }
  95. }
  96. }
  97. BOOST_ASSERT(false);
  98. return system::error_code();
  99. }
  100. } // namespace boost::redis::detail