multiplexer.hpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_MULTIPLEXER_HPP
  7. #define BOOST_REDIS_MULTIPLEXER_HPP
  8. #include <boost/redis/adapter/adapt.hpp>
  9. #include <boost/redis/adapter/any_adapter.hpp>
  10. #include <boost/redis/config.hpp>
  11. #include <boost/redis/detail/read_buffer.hpp>
  12. #include <boost/redis/resp3/node.hpp>
  13. #include <boost/redis/resp3/parser.hpp>
  14. #include <boost/redis/resp3/type.hpp>
  15. #include <boost/redis/usage.hpp>
  16. #include <boost/system/error_code.hpp>
  17. #include <cstddef>
  18. #include <deque>
  19. #include <functional>
  20. #include <memory>
  21. #include <string_view>
  22. #include <utility>
  23. namespace boost::redis {
  24. class request;
  25. namespace detail {
  26. // Return type of the multiplexer::consume_next function
  27. enum class consume_result
  28. {
  29. needs_more, // consume_next didn't have enough data
  30. got_response, // got a response to a regular command, vs. a push
  31. got_push, // got a response to a push
  32. };
  33. class multiplexer {
  34. public:
  35. struct elem {
  36. public:
  37. explicit elem(request const& req, any_adapter adapter);
  38. void set_done_callback(std::function<void()> f) noexcept { done_ = std::move(f); };
  39. auto notify_done() noexcept -> void
  40. {
  41. status_ = status::done;
  42. done_();
  43. }
  44. auto notify_error(system::error_code ec) noexcept -> void;
  45. [[nodiscard]]
  46. auto is_waiting() const noexcept
  47. {
  48. return status_ == status::waiting;
  49. }
  50. [[nodiscard]]
  51. auto is_written() const noexcept
  52. {
  53. return status_ == status::written;
  54. }
  55. [[nodiscard]]
  56. auto is_staged() const noexcept
  57. {
  58. return status_ == status::staged;
  59. }
  60. [[nodiscard]]
  61. bool is_done() const noexcept
  62. {
  63. return status_ == status::done;
  64. }
  65. void mark_written() noexcept { status_ = status::written; }
  66. void mark_staged() noexcept { status_ = status::staged; }
  67. void mark_waiting() noexcept { status_ = status::waiting; }
  68. auto get_error() const -> system::error_code const& { return ec_; }
  69. auto get_request() const -> request const& { return *req_; }
  70. auto get_read_size() const -> std::size_t { return read_size_; }
  71. auto get_remaining_responses() const -> std::size_t { return remaining_responses_; }
  72. auto commit_response(std::size_t read_size) -> void;
  73. auto get_adapter() -> any_adapter& { return adapter_; }
  74. // Marks the element as an abandoned request. An abandoned request
  75. // won't cause problems when its response arrives, but that response will be ignored.
  76. void mark_abandoned();
  77. [[nodiscard]]
  78. bool is_abandoned() const
  79. {
  80. return !req_;
  81. }
  82. private:
  83. enum class status
  84. {
  85. waiting, // the request hasn't been written yet
  86. staged, // we've issued the write for this request, but it hasn't finished yet
  87. written, // the request has been written successfully
  88. done, // the request has completed and the done callback has been invoked
  89. };
  90. request const* req_;
  91. any_adapter adapter_;
  92. std::function<void()> done_;
  93. // Contains the number of commands that haven't been read yet.
  94. std::size_t remaining_responses_;
  95. status status_;
  96. system::error_code ec_;
  97. std::size_t read_size_;
  98. };
  99. multiplexer();
  100. // To be called before a write operation. Coalesces all available requests
  101. // into a single buffer. Returns the number of coalesced requests.
  102. // Must be called before cancel_on_conn_lost() because it might change
  103. // request status.
  104. [[nodiscard]]
  105. auto prepare_write() -> std::size_t;
  106. // To be called after a write operation.
  107. // Returns true once all the bytes in the buffer generated by prepare_write
  108. // have been written.
  109. // Must be called before cancel_on_conn_lost() because it might change
  110. // request status.
  111. auto commit_write(std::size_t bytes_written) -> bool;
  112. // To be called after a successful read operation.
  113. // Must be called before cancel_on_conn_lost() because it might change
  114. // request status.
  115. [[nodiscard]]
  116. auto consume(system::error_code& ec) -> std::pair<consume_result, std::size_t>;
  117. auto add(std::shared_ptr<elem> const& ptr) -> void;
  118. void cancel(std::shared_ptr<elem> const& ptr);
  119. auto reset() -> void;
  120. [[nodiscard]]
  121. auto const& get_parser() const noexcept
  122. {
  123. return parser_;
  124. }
  125. auto cancel_waiting() -> std::size_t;
  126. // To be called exactly once to clean up state after a connection becomes unhealthy.
  127. // Requests are canceled or returned to the waiting state to be re-sent to the server,
  128. // depending on their configuration. After this function is called, prepare_write,
  129. // commit_write and consume_next must not be called until a reset() happens.
  130. // Otherwise, race conditions like the following might happen
  131. // (see https://github.com/boostorg/redis/pull/309 and https://github.com/boostorg/redis/issues/181):
  132. //
  133. // - This function runs and cancels a request, then consume_next runs. It tries to access
  134. // a request and adapter that might have been destroyed.
  135. // - This function runs and returns a request to waiting, then prepare_write runs.
  136. // It incorrectly sets the request state to staged, causing de synchronization between requests and responses.
  137. void cancel_on_conn_lost();
  138. [[nodiscard]]
  139. auto get_write_buffer() const noexcept -> std::string_view
  140. {
  141. return std::string_view{write_buffer_}.substr(write_offset_);
  142. }
  143. [[nodiscard]]
  144. auto get_prepared_read_buffer() noexcept -> read_buffer::span_type;
  145. [[nodiscard]]
  146. auto prepare_read() noexcept -> system::error_code;
  147. void commit_read(std::size_t read_size);
  148. [[nodiscard]]
  149. auto get_read_buffer_size() const noexcept -> std::size_t;
  150. void set_receive_adapter(any_adapter adapter);
  151. [[nodiscard]]
  152. auto get_usage() const noexcept -> usage
  153. {
  154. return usage_;
  155. }
  156. void set_config(config const& cfg);
  157. private:
  158. void commit_usage(bool is_push, read_buffer::consume_result res);
  159. [[nodiscard]]
  160. auto is_next_push(std::string_view data) const noexcept -> bool;
  161. // Completes requests that don't expect a response
  162. void release_push_requests();
  163. [[nodiscard]]
  164. consume_result consume_impl(system::error_code& ec);
  165. read_buffer read_buffer_;
  166. std::string write_buffer_;
  167. std::size_t write_offset_{}; // how many bytes of the write buffer have been written?
  168. std::deque<std::shared_ptr<elem>> reqs_;
  169. resp3::parser parser_{};
  170. bool on_push_ = false;
  171. bool cancel_run_called_ = false;
  172. usage usage_;
  173. any_adapter receive_adapter_;
  174. };
  175. auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>;
  176. } // namespace detail
  177. } // namespace boost::redis
  178. #endif // BOOST_REDIS_MULTIPLEXER_HPP