redis_stream.hpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
  2. * Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. *
  4. * Distributed under the Boost Software License, Version 1.0. (See
  5. * accompanying file LICENSE.txt)
  6. */
  7. #ifndef BOOST_REDIS_REDIS_STREAM_HPP
  8. #define BOOST_REDIS_REDIS_STREAM_HPP
  9. #include <boost/redis/config.hpp>
  10. #include <boost/redis/detail/connect_fsm.hpp>
  11. #include <boost/redis/error.hpp>
  12. #include <boost/redis/logger.hpp>
  13. #include <boost/asio/basic_waitable_timer.hpp>
  14. #include <boost/asio/cancel_after.hpp>
  15. #include <boost/asio/compose.hpp>
  16. #include <boost/asio/connect.hpp>
  17. #include <boost/asio/coroutine.hpp>
  18. #include <boost/asio/ip/basic_resolver.hpp>
  19. #include <boost/asio/ip/tcp.hpp>
  20. #include <boost/asio/local/stream_protocol.hpp>
  21. #include <boost/asio/ssl/context.hpp>
  22. #include <boost/asio/ssl/stream.hpp>
  23. #include <boost/asio/ssl/stream_base.hpp>
  24. #include <boost/asio/steady_timer.hpp>
  25. #include <boost/system/error_code.hpp>
  26. #include <utility>
  27. namespace boost {
  28. namespace redis {
  29. namespace detail {
  30. template <class Executor>
  31. class redis_stream {
  32. asio::ssl::context ssl_ctx_;
  33. asio::ip::basic_resolver<asio::ip::tcp, Executor> resolv_;
  34. asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>> stream_;
  35. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  36. asio::basic_stream_socket<asio::local::stream_protocol, Executor> unix_socket_;
  37. #endif
  38. typename asio::steady_timer::template rebind_executor<Executor>::other timer_;
  39. redis_stream_state st_;
  40. void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }
  41. struct connect_op {
  42. redis_stream& obj_;
  43. connect_fsm fsm_;
  44. template <class Self>
  45. void execute_action(Self& self, connect_action act)
  46. {
  47. auto& obj = this->obj_; // prevent use-after-move errors
  48. const auto& cfg = fsm_.get_config();
  49. switch (act.type) {
  50. case connect_action_type::unix_socket_close:
  51. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  52. {
  53. system::error_code ec;
  54. obj.unix_socket_.close(ec);
  55. (*this)(self, ec); // This is a sync action
  56. }
  57. #else
  58. BOOST_ASSERT(false);
  59. #endif
  60. return;
  61. case connect_action_type::unix_socket_connect:
  62. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  63. obj.unix_socket_.async_connect(
  64. cfg.unix_socket,
  65. asio::cancel_after(obj.timer_, cfg.connect_timeout, std::move(self)));
  66. #else
  67. BOOST_ASSERT(false);
  68. #endif
  69. return;
  70. case connect_action_type::tcp_resolve:
  71. obj.resolv_.async_resolve(
  72. cfg.addr.host,
  73. cfg.addr.port,
  74. asio::cancel_after(obj.timer_, cfg.resolve_timeout, std::move(self)));
  75. return;
  76. case connect_action_type::ssl_stream_reset:
  77. obj.reset_stream();
  78. // this action does not require yielding. Execute the next action immediately
  79. (*this)(self);
  80. return;
  81. case connect_action_type::ssl_handshake:
  82. obj.stream_.async_handshake(
  83. asio::ssl::stream_base::client,
  84. asio::cancel_after(obj.timer_, cfg.ssl_handshake_timeout, std::move(self)));
  85. return;
  86. case connect_action_type::done: self.complete(act.ec); break;
  87. // Connect should use the specialized handler, where resolver results are available
  88. case connect_action_type::tcp_connect:
  89. default: BOOST_ASSERT(false);
  90. }
  91. }
  92. // This overload will be used for connects
  93. template <class Self>
  94. void operator()(
  95. Self& self,
  96. system::error_code ec,
  97. const asio::ip::tcp::endpoint& selected_endpoint)
  98. {
  99. auto act = fsm_.resume(
  100. ec,
  101. selected_endpoint,
  102. obj_.st_,
  103. self.get_cancellation_state().cancelled());
  104. execute_action(self, act);
  105. }
  106. // This overload will be used for resolves
  107. template <class Self>
  108. void operator()(
  109. Self& self,
  110. system::error_code ec,
  111. asio::ip::tcp::resolver::results_type endpoints)
  112. {
  113. auto act = fsm_.resume(ec, endpoints, obj_.st_, self.get_cancellation_state().cancelled());
  114. if (act.type == connect_action_type::tcp_connect) {
  115. auto& obj = this->obj_; // prevent use-after-free errors
  116. asio::async_connect(
  117. obj.stream_.next_layer(),
  118. std::move(endpoints),
  119. asio::cancel_after(obj.timer_, fsm_.get_config().connect_timeout, std::move(self)));
  120. } else {
  121. execute_action(self, act);
  122. }
  123. }
  124. template <class Self>
  125. void operator()(Self& self, system::error_code ec = {})
  126. {
  127. auto act = fsm_.resume(ec, obj_.st_, self.get_cancellation_state().cancelled());
  128. execute_action(self, act);
  129. }
  130. };
  131. public:
  132. explicit redis_stream(Executor ex, asio::ssl::context&& ssl_ctx)
  133. : ssl_ctx_{std::move(ssl_ctx)}
  134. , resolv_{ex}
  135. , stream_{ex, ssl_ctx_}
  136. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  137. , unix_socket_{ex}
  138. #endif
  139. , timer_{std::move(ex)}
  140. { }
  141. // Executor. Required to satisfy the AsyncStream concept
  142. using executor_type = Executor;
  143. executor_type get_executor() noexcept { return resolv_.get_executor(); }
  144. // Accessors
  145. const auto& get_ssl_context() const noexcept { return ssl_ctx_; }
  146. bool is_open() const
  147. {
  148. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  149. if (st_.type == transport_type::unix_socket)
  150. return unix_socket_.is_open();
  151. #endif
  152. return stream_.next_layer().is_open();
  153. }
  154. auto& next_layer() { return stream_; }
  155. const auto& next_layer() const { return stream_; }
  156. // I/O
  157. template <class CompletionToken>
  158. auto async_connect(const config& cfg, buffered_logger& l, CompletionToken&& token)
  159. {
  160. return asio::async_compose<CompletionToken, void(system::error_code)>(
  161. connect_op{*this, connect_fsm(cfg, l)},
  162. token);
  163. }
  164. // These functions should only be used with callbacks (e.g. within async_compose function bodies)
  165. template <class ConstBufferSequence, class CompletionToken>
  166. void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token)
  167. {
  168. switch (st_.type) {
  169. case transport_type::tcp:
  170. {
  171. stream_.next_layer().async_write_some(buffers, std::forward<CompletionToken>(token));
  172. break;
  173. }
  174. case transport_type::tcp_tls:
  175. {
  176. stream_.async_write_some(buffers, std::forward<CompletionToken>(token));
  177. break;
  178. }
  179. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  180. case transport_type::unix_socket:
  181. {
  182. unix_socket_.async_write_some(buffers, std::forward<CompletionToken>(token));
  183. break;
  184. }
  185. #endif
  186. default: BOOST_ASSERT(false);
  187. }
  188. }
  189. template <class MutableBufferSequence, class CompletionToken>
  190. void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token)
  191. {
  192. switch (st_.type) {
  193. case transport_type::tcp:
  194. {
  195. return stream_.next_layer().async_read_some(
  196. buffers,
  197. std::forward<CompletionToken>(token));
  198. break;
  199. }
  200. case transport_type::tcp_tls:
  201. {
  202. return stream_.async_read_some(buffers, std::forward<CompletionToken>(token));
  203. break;
  204. }
  205. #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
  206. case transport_type::unix_socket:
  207. {
  208. unix_socket_.async_read_some(buffers, std::forward<CompletionToken>(token));
  209. break;
  210. }
  211. #endif
  212. default: BOOST_ASSERT(false);
  213. }
  214. }
  215. // Cancels resolve operations. Resolve operations don't support per-operation
  216. // cancellation, but resolvers have a cancel() function. Resolve operations are
  217. // in general blocking and run in a separate thread. cancel() has effect only
  218. // if the operation hasn't started yet. Still, trying is better than nothing
  219. void cancel_resolve() { resolv_.cancel(); }
  220. };
  221. } // namespace detail
  222. } // namespace redis
  223. } // namespace boost
  224. #endif