connection.hpp 43 KB


  1. /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
  2. *
  3. * Distributed under the Boost Software License, Version 1.0. (See
  4. * accompanying file LICENSE.txt)
  5. */
  6. #ifndef BOOST_REDIS_CONNECTION_HPP
  7. #define BOOST_REDIS_CONNECTION_HPP
  8. #include <boost/redis/adapter/adapt.hpp>
  9. #include <boost/redis/adapter/any_adapter.hpp>
  10. #include <boost/redis/config.hpp>
  11. #include <boost/redis/detail/connection_state.hpp>
  12. #include <boost/redis/detail/exec_fsm.hpp>
  13. #include <boost/redis/detail/multiplexer.hpp>
  14. #include <boost/redis/detail/reader_fsm.hpp>
  15. #include <boost/redis/detail/redis_stream.hpp>
  16. #include <boost/redis/detail/run_fsm.hpp>
  17. #include <boost/redis/detail/writer_fsm.hpp>
  18. #include <boost/redis/error.hpp>
  19. #include <boost/redis/logger.hpp>
  20. #include <boost/redis/operation.hpp>
  21. #include <boost/redis/request.hpp>
  22. #include <boost/redis/resp3/type.hpp>
  23. #include <boost/redis/response.hpp>
  24. #include <boost/redis/usage.hpp>
  25. #include <boost/asio/any_completion_handler.hpp>
  26. #include <boost/asio/any_io_executor.hpp>
  27. #include <boost/asio/associated_cancellation_slot.hpp>
  28. #include <boost/asio/basic_stream_socket.hpp>
  29. #include <boost/asio/bind_cancellation_slot.hpp>
  30. #include <boost/asio/bind_executor.hpp>
  31. #include <boost/asio/buffer.hpp>
  32. #include <boost/asio/cancel_at.hpp>
  33. #include <boost/asio/cancellation_signal.hpp>
  34. #include <boost/asio/cancellation_type.hpp>
  35. #include <boost/asio/deferred.hpp>
  36. #include <boost/asio/error.hpp>
  37. #include <boost/asio/experimental/cancellation_condition.hpp>
  38. #include <boost/asio/experimental/channel.hpp>
  39. #include <boost/asio/experimental/parallel_group.hpp>
  40. #include <boost/asio/immediate.hpp>
  41. #include <boost/asio/io_context.hpp>
  42. #include <boost/asio/ip/tcp.hpp>
  43. #include <boost/asio/ssl/stream.hpp>
  44. #include <boost/asio/steady_timer.hpp>
  45. #include <boost/assert.hpp>
  46. #include <boost/config.hpp>
  47. #include <array>
  48. #include <chrono>
  49. #include <cstddef>
  50. #include <memory>
  51. #include <string>
  52. #include <utility>
  53. namespace boost::redis {
  54. namespace detail {
  55. // Given a timeout value, compute the expiry time. A zero timeout is considered to mean "no timeout"
  56. inline std::chrono::steady_clock::time_point compute_expiry(
  57. std::chrono::steady_clock::duration timeout)
  58. {
  59. return timeout.count() == 0 ? (std::chrono::steady_clock::time_point::max)()
  60. : std::chrono::steady_clock::now() + timeout;
  61. }
  62. template <class Executor>
  63. struct connection_impl {
  64. using clock_type = std::chrono::steady_clock;
  65. using clock_traits_type = asio::wait_traits<clock_type>;
  66. using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, Executor>;
  67. using receive_channel_type = asio::experimental::channel<
  68. Executor,
  69. void(system::error_code, std::size_t)>;
  70. using exec_notifier_type = asio::experimental::channel<
  71. Executor,
  72. void(system::error_code, std::size_t)>;
  73. redis_stream<Executor> stream_;
  74. timer_type writer_timer_; // timer used for write timeouts
  75. timer_type writer_cv_; // condition variable, cancelled when there is new data to write
  76. timer_type reader_timer_; // timer used for read timeouts
  77. timer_type reconnect_timer_; // to wait the reconnection period
  78. timer_type ping_timer_; // to wait between pings
  79. receive_channel_type receive_channel_;
  80. asio::cancellation_signal run_signal_;
  81. connection_state st_;
  82. using executor_type = Executor;
  83. executor_type get_executor() noexcept { return writer_cv_.get_executor(); }
  84. struct exec_op {
  85. connection_impl* obj_ = nullptr;
  86. std::shared_ptr<exec_notifier_type> notifier_ = nullptr;
  87. exec_fsm fsm_;
  88. template <class Self>
  89. void operator()(Self& self, system::error_code = {}, std::size_t = 0)
  90. {
  91. while (true) {
  92. // Invoke the state machine
  93. auto act = fsm_.resume(obj_->is_open(), self.get_cancellation_state().cancelled());
  94. // Do what the FSM said
  95. switch (act.type()) {
  96. case exec_action_type::setup_cancellation:
  97. self.reset_cancellation_state(asio::enable_total_cancellation());
  98. continue; // this action does not require yielding
  99. case exec_action_type::immediate:
  100. asio::async_immediate(self.get_io_executor(), std::move(self));
  101. return;
  102. case exec_action_type::notify_writer:
  103. obj_->writer_cv_.cancel();
  104. continue; // this action does not require yielding
  105. case exec_action_type::wait_for_response:
  106. notifier_->async_receive(std::move(self));
  107. return;
  108. case exec_action_type::done:
  109. notifier_.reset();
  110. self.complete(act.error(), act.bytes_read());
  111. return;
  112. }
  113. }
  114. }
  115. };
  116. connection_impl(Executor&& ex, asio::ssl::context&& ctx, logger&& lgr)
  117. : stream_{ex, std::move(ctx)}
  118. , writer_timer_{ex}
  119. , writer_cv_{ex}
  120. , reader_timer_{ex}
  121. , reconnect_timer_{ex}
  122. , ping_timer_{ex}
  123. , receive_channel_{ex, 256}
  124. , st_{{std::move(lgr)}}
  125. {
  126. set_receive_adapter(any_adapter{ignore});
  127. writer_cv_.expires_at((std::chrono::steady_clock::time_point::max)());
  128. }
  129. void cancel(operation op)
  130. {
  131. switch (op) {
  132. case operation::exec: st_.mpx.cancel_waiting(); break;
  133. case operation::receive: receive_channel_.cancel(); break;
  134. case operation::reconnection:
  135. st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero();
  136. break;
  137. case operation::run:
  138. case operation::resolve:
  139. case operation::connect:
  140. case operation::ssl_handshake:
  141. case operation::health_check: cancel_run(); break;
  142. case operation::all:
  143. st_.mpx.cancel_waiting(); // exec
  144. receive_channel_.cancel(); // receive
  145. st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect
  146. cancel_run(); // run
  147. break;
  148. default: /* ignore */;
  149. }
  150. }
  151. void cancel_run()
  152. {
  153. // Individual operations should see a terminal cancellation, regardless
  154. // of what we got requested. We take enough actions to ensure that this
  155. // doesn't prevent the object from being re-used (e.g. we reset the TLS stream).
  156. run_signal_.emit(asio::cancellation_type_t::terminal);
  157. // Name resolution doesn't support per-operation cancellation
  158. stream_.cancel_resolve();
  159. // Receive is technically not part of run, but we also cancel it for
  160. // backwards compatibility.
  161. receive_channel_.cancel();
  162. }
  163. bool is_open() const noexcept { return stream_.is_open(); }
  164. bool will_reconnect() const noexcept
  165. {
  166. return st_.cfg.reconnect_wait_interval != std::chrono::seconds::zero();
  167. }
  168. template <class CompletionToken>
  169. auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
  170. {
  171. auto notifier = std::make_shared<exec_notifier_type>(get_executor(), 1);
  172. auto info = make_elem(req, std::move(adapter));
  173. info->set_done_callback([notifier]() {
  174. notifier->try_send(std::error_code{}, 0);
  175. });
  176. return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
  177. exec_op{this, notifier, exec_fsm(st_.mpx, std::move(info))},
  178. token,
  179. writer_cv_);
  180. }
  181. void set_receive_adapter(any_adapter adapter)
  182. {
  183. st_.mpx.set_receive_adapter(std::move(adapter));
  184. }
  185. };
  186. template <class Executor>
  187. struct writer_op {
  188. connection_impl<Executor>* conn_;
  189. writer_fsm fsm_;
  190. explicit writer_op(connection_impl<Executor>& conn) noexcept
  191. : conn_(&conn)
  192. { }
  193. template <class Self>
  194. void operator()(Self& self, system::error_code ec = {}, std::size_t bytes_written = 0u)
  195. {
  196. auto* conn = conn_; // Prevent potential use-after-move errors with cancel_after
  197. auto act = fsm_.resume(
  198. conn->st_,
  199. ec,
  200. bytes_written,
  201. self.get_cancellation_state().cancelled());
  202. switch (act.type()) {
  203. case writer_action_type::done: self.complete(act.error()); return;
  204. case writer_action_type::write_some:
  205. conn->stream_.async_write_some(
  206. asio::buffer(conn->st_.mpx.get_write_buffer()),
  207. asio::cancel_at(
  208. conn->writer_timer_,
  209. compute_expiry(act.timeout()),
  210. std::move(self)));
  211. return;
  212. case writer_action_type::wait:
  213. conn->writer_cv_.expires_at(compute_expiry(act.timeout()));
  214. conn->writer_cv_.async_wait(std::move(self));
  215. return;
  216. }
  217. }
  218. };
  219. template <class Executor>
  220. struct reader_op {
  221. connection_impl<Executor>* conn_;
  222. reader_fsm fsm_;
  223. public:
  224. reader_op(connection_impl<Executor>& conn) noexcept
  225. : conn_{&conn}
  226. { }
  227. template <class Self>
  228. void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
  229. {
  230. for (;;) {
  231. auto* conn = conn_; // Prevent potential use-after-move errors with cancel_after
  232. auto act = fsm_.resume(conn->st_, n, ec, self.get_cancellation_state().cancelled());
  233. switch (act.get_type()) {
  234. case reader_fsm::action::type::read_some:
  235. conn->stream_.async_read_some(
  236. asio::buffer(conn->st_.mpx.get_prepared_read_buffer()),
  237. asio::cancel_at(
  238. conn->reader_timer_,
  239. compute_expiry(act.timeout()),
  240. std::move(self)));
  241. return;
  242. case reader_fsm::action::type::notify_push_receiver:
  243. if (conn->receive_channel_.try_send(ec, act.push_size())) {
  244. continue;
  245. } else {
  246. conn->receive_channel_.async_send(ec, act.push_size(), std::move(self));
  247. }
  248. return;
  249. case reader_fsm::action::type::done: self.complete(act.error()); return;
  250. }
  251. }
  252. }
  253. };
  254. template <class Executor>
  255. class run_op {
  256. private:
  257. connection_impl<Executor>* conn_;
  258. run_fsm fsm_{};
  259. template <class CompletionToken>
  260. auto reader(CompletionToken&& token)
  261. {
  262. return asio::async_compose<CompletionToken, void(system::error_code)>(
  263. reader_op<Executor>{*conn_},
  264. std::forward<CompletionToken>(token),
  265. conn_->writer_cv_);
  266. }
  267. template <class CompletionToken>
  268. auto writer(CompletionToken&& token)
  269. {
  270. return asio::async_compose<CompletionToken, void(system::error_code)>(
  271. writer_op<Executor>{*conn_},
  272. std::forward<CompletionToken>(token),
  273. conn_->writer_cv_);
  274. }
  275. public:
  276. run_op(connection_impl<Executor>* conn) noexcept
  277. : conn_{conn}
  278. { }
  279. // Called after the parallel group finishes
  280. template <class Self>
  281. void operator()(
  282. Self& self,
  283. std::array<std::size_t, 2u> order,
  284. system::error_code reader_ec,
  285. system::error_code writer_ec)
  286. {
  287. (*this)(self, order[0u] == 0u ? reader_ec : writer_ec);
  288. }
  289. template <class Self>
  290. void operator()(Self& self, system::error_code ec = {})
  291. {
  292. auto act = fsm_.resume(conn_->st_, ec, self.get_cancellation_state().cancelled());
  293. switch (act.type) {
  294. case run_action_type::done: self.complete(act.ec); return;
  295. case run_action_type::immediate:
  296. asio::async_immediate(self.get_io_executor(), std::move(self));
  297. return;
  298. case run_action_type::connect:
  299. conn_->stream_.async_connect(conn_->st_.cfg, conn_->st_.logger, std::move(self));
  300. return;
  301. case run_action_type::parallel_group:
  302. asio::experimental::make_parallel_group(
  303. [this](auto token) {
  304. return this->reader(token);
  305. },
  306. [this](auto token) {
  307. return this->writer(token);
  308. })
  309. .async_wait(asio::experimental::wait_for_one(), std::move(self));
  310. return;
  311. case run_action_type::cancel_receive:
  312. conn_->receive_channel_.cancel();
  313. (*this)(self); // this action does not require suspending
  314. return;
  315. case run_action_type::wait_for_reconnection:
  316. conn_->reconnect_timer_.expires_after(conn_->st_.cfg.reconnect_wait_interval);
  317. conn_->reconnect_timer_.async_wait(std::move(self));
  318. return;
  319. default: BOOST_ASSERT(false);
  320. }
  321. }
  322. };
  323. logger make_stderr_logger(logger::level lvl, std::string prefix);
  324. template <class Executor>
  325. class run_cancel_handler {
  326. connection_impl<Executor>* conn_;
  327. public:
  328. explicit run_cancel_handler(connection_impl<Executor>& conn) noexcept
  329. : conn_(&conn)
  330. { }
  331. void operator()(asio::cancellation_type_t cancel_type) const
  332. {
  333. // We support terminal and partial cancellation
  334. constexpr auto mask = asio::cancellation_type_t::terminal |
  335. asio::cancellation_type_t::partial;
  336. if ((cancel_type & mask) != asio::cancellation_type_t::none) {
  337. conn_->cancel(operation::run);
  338. }
  339. }
  340. };
  341. } // namespace detail
  342. /** @brief A SSL connection to the Redis server.
  343. *
  344. * This class keeps a healthy connection to the Redis instance where
  345. * commands can be sent at any time. For more details, please see the
  346. * documentation of each individual function.
  347. *
  348. * @tparam Executor The executor type used to create any required I/O objects.
  349. */
  350. template <class Executor>
  351. class basic_connection {
  352. public:
  353. using this_type = basic_connection<Executor>;
  354. /// (Deprecated) Type of the next layer
  355. BOOST_DEPRECATED("This typedef is deprecated, and will be removed with next_layer().")
  356. typedef asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>> next_layer_type;
  357. /// The type of the executor associated to this object.
  358. using executor_type = Executor;
  359. /// Rebinds the socket type to another executor.
  360. template <class Executor1>
  361. struct rebind_executor {
  362. /// The connection type when rebound to the specified executor.
  363. using other = basic_connection<Executor1>;
  364. };
  365. /** @brief Constructor from an executor.
  366. *
  367. * @param ex Executor used to create all internal I/O objects.
  368. * @param ctx SSL context.
  369. * @param lgr Logger configuration. It can be used to filter messages by level
  370. * and customize logging. By default, `logger::level::info` messages
  371. * and higher are logged to `stderr`.
  372. */
  373. explicit basic_connection(
  374. executor_type ex,
  375. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  376. logger lgr = {})
  377. : impl_(
  378. std::make_unique<detail::connection_impl<Executor>>(
  379. std::move(ex),
  380. std::move(ctx),
  381. std::move(lgr)))
  382. { }
  383. /** @brief Constructor from an executor and a logger.
  384. *
  385. * @param ex Executor used to create all internal I/O objects.
  386. * @param lgr Logger configuration. It can be used to filter messages by level
  387. * and customize logging. By default, `logger::level::info` messages
  388. * and higher are logged to `stderr`.
  389. *
  390. * An SSL context with default settings will be created.
  391. */
  392. basic_connection(executor_type ex, logger lgr)
  393. : basic_connection(
  394. std::move(ex),
  395. asio::ssl::context{asio::ssl::context::tlsv12_client},
  396. std::move(lgr))
  397. { }
  398. /**
  399. * @brief Constructor from an `io_context`.
  400. *
  401. * @param ioc I/O context used to create all internal I/O objects.
  402. * @param ctx SSL context.
  403. * @param lgr Logger configuration. It can be used to filter messages by level
  404. * and customize logging. By default, `logger::level::info` messages
  405. * and higher are logged to `stderr`.
  406. */
  407. explicit basic_connection(
  408. asio::io_context& ioc,
  409. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  410. logger lgr = {})
  411. : basic_connection(ioc.get_executor(), std::move(ctx), std::move(lgr))
  412. { }
  413. /**
  414. * @brief Constructor from an `io_context` and a logger.
  415. *
  416. * @param ioc I/O context used to create all internal I/O objects.
  417. * @param lgr Logger configuration. It can be used to filter messages by level
  418. * and customize logging. By default, `logger::level::info` messages
  419. * and higher are logged to `stderr`.
  420. */
  421. basic_connection(asio::io_context& ioc, logger lgr)
  422. : basic_connection(
  423. ioc.get_executor(),
  424. asio::ssl::context{asio::ssl::context::tlsv12_client},
  425. std::move(lgr))
  426. { }
  427. /// Returns the associated executor.
  428. executor_type get_executor() noexcept { return impl_->writer_cv_.get_executor(); }
  429. /** @brief Starts the underlying connection operations.
  430. *
  431. * This function establishes a connection to the Redis server and keeps
  432. * it healthy by performing the following operations:
  433. *
  434. * @li For TCP connections, resolves the server hostname passed in
  435. * @ref boost::redis::config::addr.
  436. * @li Establishes a physical connection to the server. For TCP connections,
  437. * connects to one of the endpoints obtained during name resolution.
  438. * For UNIX domain socket connections, it connects to @ref boost::redis::config::unix_sockets.
  439. * @li If @ref boost::redis::config::use_ssl is `true`, performs the TLS handshake.
  440. * @li Executes the setup request, as defined by the passed @ref config object.
  441. * By default, this is a `HELLO` command, but it can contain any other arbitrary
  442. * commands. See the @ref config::setup docs for more info.
  443. * @li Starts a health-check operation where `PING` commands are sent
  444. * at intervals specified by
  445. * @ref config::health_check_interval when the connection is idle.
  446. * See the documentation of @ref config::health_check_interval for more info.
  447. * @li Starts read and write operations. Requests issued using @ref async_exec
  448. * before `async_run` is called will be written to the server immediately.
  449. *
  450. * When a connection is lost for any reason, a new one is
  451. * established automatically. To disable reconnection
  452. * set @ref boost::redis::config::reconnect_wait_interval to zero.
  453. *
  454. * The completion token must have the following signature
  455. *
  456. * @code
  457. * void f(system::error_code);
  458. * @endcode
  459. *
  460. * @par Per-operation cancellation
  461. * This operation supports the following cancellation types:
  462. *
  463. * @li `asio::cancellation_type_t::terminal`.
  464. * @li `asio::cancellation_type_t::partial`.
  465. *
  466. * In both cases, cancellation is equivalent to calling @ref basic_connection::cancel
  467. * passing @ref operation::run as argument.
  468. *
  469. * After the operation completes, the token's associated cancellation slot
  470. * may still have a cancellation handler associated to this connection.
  471. * You should make sure to not invoke it after the connection has been destroyed.
  472. * This is consistent with what other Asio I/O objects do.
  473. *
  474. * For example on how to call this function refer to
  475. * cpp20_intro.cpp or any other example.
  476. *
  477. * @param cfg Configuration parameters.
  478. * @param token Completion token.
  479. */
  480. template <class CompletionToken = asio::default_completion_token_t<executor_type>>
  481. auto async_run(config const& cfg, CompletionToken&& token = {})
  482. {
  483. return asio::async_initiate<CompletionToken, void(system::error_code)>(
  484. run_initiation{impl_.get()},
  485. token,
  486. &cfg);
  487. }
  488. /**
  489. * @brief (Deprecated) Starts the underlying connection operations.
  490. * @copydetail async_run
  491. *
  492. * This function accepts an extra logger parameter. The passed `logger::lvl`
  493. * will be used, but `logger::fn` will be ignored. Instead, a function
  494. * that logs to `stderr` using `config::prefix` will be used.
  495. * This keeps backwards compatibility with previous versions.
  496. * Any logger configured in the constructor will be overriden.
  497. *
  498. * @par Deprecated
  499. * The logger should be passed to the connection's constructor instead of using this
  500. * function. Use the overload without a logger parameter, instead. This function is
  501. * deprecated and will be removed in subsequent releases.
  502. *
  503. * @param cfg Configuration parameters.
  504. * @param l Logger.
  505. * @param token Completion token.
  506. */
  507. template <class CompletionToken = asio::default_completion_token_t<executor_type>>
  508. BOOST_DEPRECATED(
  509. "The async_run overload taking a logger argument is deprecated. "
  510. "Please pass the logger to the connection's constructor, instead, "
  511. "and use the other async_run overloads.")
  512. auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
  513. {
  514. set_stderr_logger(l.lvl, cfg);
  515. return async_run(cfg, std::forward<CompletionToken>(token));
  516. }
  517. /**
  518. * @brief (Deprecated) Starts the underlying connection operations.
  519. * @copydetail async_run
  520. *
  521. * Uses a default-constructed config object to run the connection.
  522. *
  523. * @par Deprecated
  524. * This function is deprecated and will be removed in subsequent releases.
  525. * Use the overload taking an explicit config object, instead.
  526. *
  527. * @param token Completion token.
  528. */
  529. template <class CompletionToken = asio::default_completion_token_t<executor_type>>
  530. BOOST_DEPRECATED(
  531. "Running without an explicit config object is deprecated."
  532. "Please create a config object and pass it to async_run.")
  533. auto async_run(CompletionToken&& token = {})
  534. {
  535. return async_run(config{}, std::forward<CompletionToken>(token));
  536. }
  537. /** @brief Receives server side pushes asynchronously.
  538. *
  539. * When pushes arrive and there is no `async_receive` operation in
  540. * progress, pushed data, requests, and responses will be paused
  541. * until `async_receive` is called again. Apps will usually want
  542. * to call `async_receive` in a loop.
  543. *
  544. * For an example see cpp20_subscriber.cpp. The completion token must
  545. * have the following signature
  546. *
  547. * @code
  548. * void f(system::error_code, std::size_t);
  549. * @endcode
  550. *
  551. * Where the second parameter is the size of the push received in
  552. * bytes.
  553. *
  554. * @par Per-operation cancellation
  555. * This operation supports the following cancellation types:
  556. *
  557. * @li `asio::cancellation_type_t::terminal`.
  558. * @li `asio::cancellation_type_t::partial`.
  559. * @li `asio::cancellation_type_t::total`.
  560. *
  561. * Calling `basic_connection::cancel(operation::receive)` will
  562. * also cancel any ongoing receive operations.
  563. *
  564. * @param token Completion token.
  565. */
  566. template <class CompletionToken = asio::default_completion_token_t<executor_type>>
  567. auto async_receive(CompletionToken&& token = {})
  568. {
  569. return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
  570. }
  571. /** @brief Receives server pushes synchronously without blocking.
  572. *
  573. * Receives a server push synchronously by calling `try_receive` on
  574. * the underlying channel. If the operation fails because
  575. * `try_receive` returns `false`, `ec` will be set to
  576. * @ref boost::redis::error::sync_receive_push_failed.
  577. *
  578. * @param ec Contains the error if any occurred.
  579. * @returns The number of bytes read from the socket.
  580. */
  581. std::size_t receive(system::error_code& ec)
  582. {
  583. std::size_t size = 0;
  584. auto f = [&](system::error_code const& ec2, std::size_t n) {
  585. ec = ec2;
  586. size = n;
  587. };
  588. auto const res = impl_->receive_channel_.try_receive(f);
  589. if (ec)
  590. return 0;
  591. if (!res)
  592. ec = error::sync_receive_push_failed;
  593. return size;
  594. }
  595. /** @brief Executes commands on the Redis server asynchronously.
  596. *
  597. * This function sends a request to the Redis server and waits for
  598. * the responses to each individual command in the request. If the
  599. * request contains only commands that don't expect a response,
  600. * the completion occurs after it has been written to the
  601. * underlying stream. Multiple concurrent calls to this function
  602. * will be automatically queued by the implementation.
  603. *
  604. * For an example see cpp20_echo_server.cpp.
  605. *
  606. * The completion token must have the following signature:
  607. *
  608. * @code
  609. * void f(system::error_code, std::size_t);
  610. * @endcode
  611. *
  612. * Where the second parameter is the size of the response received
  613. * in bytes.
  614. *
  615. * @par Per-operation cancellation
  616. * This operation supports per-operation cancellation. Depending on the state of the request
  617. * when cancellation is requested, we can encounter two scenarios:
  618. *
  619. * @li If the request hasn't been sent to the server yet, cancellation will prevent it
  620. * from being sent to the server. In this situation, all cancellation types are supported
  621. * (`asio::cancellation_type_t::terminal`, `asio::cancellation_type_t::partial` and
  622. * `asio::cancellation_type_t::total`).
  623. * @li If the request has been sent to the server but the response hasn't arrived yet,
  624. * cancellation will cause `async_exec` to complete immediately. When the response
  625. * arrives from the server, it will be ignored. In this situation, only
  626. * `asio::cancellation_type_t::terminal` and `asio::cancellation_type_t::partial`
  627. * are supported. Cancellation requests specifying `asio::cancellation_type_t::total`
  628. * only will be ignored.
  629. *
  630. * In any case, connections can be safely used after cancelling `async_exec` operations.
  631. *
  632. * @par Object lifetimes
  633. * Both `req` and `res` should be kept alive until the operation completes.
  634. * No copies of the request object are made.
  635. *
  636. * @param req The request to be executed.
  637. * @param resp The response object to parse data into.
  638. * @param token Completion token.
  639. */
  640. template <
  641. class Response = ignore_t,
  642. class CompletionToken = asio::default_completion_token_t<executor_type>>
  643. auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
  644. {
  645. return this->async_exec(req, any_adapter{resp}, std::forward<CompletionToken>(token));
  646. }
  647. /** @brief Executes commands on the Redis server asynchronously.
  648. *
  649. * This function sends a request to the Redis server and waits for
  650. * the responses to each individual command in the request. If the
  651. * request contains only commands that don't expect a response,
  652. * the completion occurs after it has been written to the
  653. * underlying stream. Multiple concurrent calls to this function
  654. * will be automatically queued by the implementation.
  655. *
  656. * For an example see cpp20_echo_server.cpp.
  657. *
  658. * The completion token must have the following signature:
  659. *
  660. * @code
  661. * void f(system::error_code, std::size_t);
  662. * @endcode
  663. *
  664. * Where the second parameter is the size of the response received
  665. * in bytes.
  666. *
  667. * @par Per-operation cancellation
  668. * This operation supports per-operation cancellation. Depending on the state of the request
  669. * when cancellation is requested, we can encounter two scenarios:
  670. *
  671. * @li If the request hasn't been sent to the server yet, cancellation will prevent it
  672. * from being sent to the server. In this situation, all cancellation types are supported
  673. * (`asio::cancellation_type_t::terminal`, `asio::cancellation_type_t::partial` and
  674. * `asio::cancellation_type_t::total`).
  675. * @li If the request has been sent to the server but the response hasn't arrived yet,
  676. * cancellation will cause `async_exec` to complete immediately. When the response
  677. * arrives from the server, it will be ignored. In this situation, only
  678. * `asio::cancellation_type_t::terminal` and `asio::cancellation_type_t::partial`
  679. * are supported. Cancellation requests specifying `asio::cancellation_type_t::total`
  680. * only will be ignored.
  681. *
  682. * In any case, connections can be safely used after cancelling `async_exec` operations.
  683. *
  684. * @par Object lifetimes
  685. * Both `req` and any response object referenced by `adapter`
  686. * should be kept alive until the operation completes.
  687. * No copies of the request object are made.
  688. *
  689. * @param req The request to be executed.
  690. * @param adapter An adapter object referencing a response to place data into.
  691. * @param token Completion token.
  692. */
  693. template <class CompletionToken = asio::default_completion_token_t<executor_type>>
  694. auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {})
  695. {
  696. return impl_->async_exec(req, std::move(adapter), std::forward<CompletionToken>(token));
  697. }
  698. /** @brief Cancel operations.
  699. *
  700. * @li `operation::exec`: cancels operations started with
  701. * `async_exec`. Affects only requests that haven't been written
  702. * yet.
  703. * @li `operation::run`: cancels the `async_run` operation.
  704. * @li `operation::receive`: cancels any ongoing calls to `async_receive`.
  705. * @li `operation::all`: cancels all operations listed above.
  706. *
  707. * @param op The operation to be cancelled.
  708. */
  709. void cancel(operation op = operation::all) { impl_->cancel(op); }
  710. /// Returns true if the connection will try to reconnect if an error is encountered.
  711. bool will_reconnect() const noexcept { return impl_->will_reconnect(); }
  712. /**
  713. * @brief (Deprecated) Returns the ssl context.
  714. *
  715. * `ssl::context` has no const methods, so this function should not be called.
  716. * Any TLS configuration should be set up by passing an `ssl::context`
  717. * to the connection's constructor.
  718. *
  719. * @returns The SSL context.
  720. */
  721. BOOST_DEPRECATED(
  722. "ssl::context has no const methods, so this function should not be called. Set up any "
  723. "required TLS configuration before passing the ssl::context to the connection's constructor.")
  724. asio::ssl::context const& get_ssl_context() const noexcept
  725. {
  726. return impl_->stream_.get_ssl_context();
  727. }
  728. /**
  729. * @brief (Deprecated) Resets the underlying stream.
  730. *
  731. * This function is no longer necessary and is currently a no-op.
  732. */
  733. BOOST_DEPRECATED(
  734. "This function is no longer necessary and is currently a no-op. connection resets the stream "
  735. "internally as required. This function will be removed in subsequent releases")
  736. void reset_stream() { }
  737. /**
  738. * @brief (Deprecated) Returns a reference to the next layer.
  739. *
  740. * This function returns a dummy object for connections using UNIX domain sockets.
  741. *
  742. * @par Deprecated
  743. * Accessing the underlying stream is deprecated and will be removed in the next release.
  744. * Use the other member functions to interact with the connection.
  745. *
  746. * @returns A reference to the underlying SSL stream object.
  747. */
  748. BOOST_DEPRECATED(
  749. "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
  750. "the other member functions to interact with the connection.")
  751. auto& next_layer() noexcept { return impl_->stream_.next_layer(); }
  752. /**
  753. * @brief (Deprecated) Returns a reference to the next layer.
  754. *
  755. * This function returns a dummy object for connections using UNIX domain sockets.
  756. *
  757. * @par Deprecated
  758. * Accessing the underlying stream is deprecated and will be removed in the next release.
  759. * Use the other member functions to interact with the connection.
  760. *
  761. * @returns A reference to the underlying SSL stream object.
  762. */
  763. BOOST_DEPRECATED(
  764. "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
  765. "the other member functions to interact with the connection.")
  766. auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
  767. /// Sets the response object of @ref async_receive operations.
  768. template <class Response>
  769. void set_receive_response(Response& resp)
  770. {
  771. impl_->set_receive_adapter(any_adapter{resp});
  772. }
  773. /// Returns connection usage information.
  774. usage get_usage() const noexcept { return impl_->st_.mpx.get_usage(); }
  775. private:
  776. using clock_type = std::chrono::steady_clock;
  777. using clock_traits_type = asio::wait_traits<clock_type>;
  778. using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
  779. using receive_channel_type = asio::experimental::channel<
  780. executor_type,
  781. void(system::error_code, std::size_t)>;
  782. auto use_ssl() const noexcept { return impl_->cfg_.use_ssl; }
  783. // Used by both this class and connection
  784. void set_stderr_logger(logger::level lvl, const config& cfg)
  785. {
  786. impl_->st_.logger.lgr = detail::make_stderr_logger(lvl, cfg.log_prefix);
  787. }
  788. // Initiation for async_run. This is required because we need access
  789. // to the final handler (rather than the completion token) within the initiation,
  790. // to modify the handler's cancellation slot.
  791. struct run_initiation {
  792. detail::connection_impl<Executor>* self;
  793. using executor_type = Executor;
  794. executor_type get_executor() const noexcept { return self->get_executor(); }
  795. template <class Handler>
  796. void operator()(Handler&& handler, config const* cfg)
  797. {
  798. self->st_.cfg = *cfg;
  799. self->st_.mpx.set_config(*cfg);
  800. // If the token's slot has cancellation enabled, it should just emit
  801. // the cancellation signal in our connection. This lets us unify the cancel()
  802. // function and per-operation cancellation
  803. auto slot = asio::get_associated_cancellation_slot(handler);
  804. if (slot.is_connected()) {
  805. slot.template emplace<detail::run_cancel_handler<Executor>>(*self);
  806. }
  807. // Overwrite the token's cancellation slot: the composed operation
  808. // should use the signal's slot so we can generate cancellations in cancel()
  809. auto token_with_slot = asio::bind_cancellation_slot(
  810. self->run_signal_.slot(),
  811. std::forward<Handler>(handler));
  812. asio::async_compose<decltype(token_with_slot), void(system::error_code)>(
  813. detail::run_op<Executor>{self},
  814. token_with_slot,
  815. self->writer_cv_);
  816. }
  817. };
  818. friend class connection;
  819. std::unique_ptr<detail::connection_impl<Executor>> impl_;
  820. };
  821. /** @brief A basic_connection that type erases the executor.
  822. *
  823. * This connection type uses `asio::any_io_executor` and
  824. * `asio::any_completion_token` to reduce compilation times.
  825. *
  826. * For documentation of each member function see
  827. * @ref boost::redis::basic_connection.
  828. */
  829. class connection {
  830. public:
  831. /// Executor type.
  832. using executor_type = asio::any_io_executor;
  833. /** @brief Constructor from an executor.
  834. *
  835. * @param ex Executor used to create all internal I/O objects.
  836. * @param ctx SSL context.
  837. * @param lgr Logger configuration. It can be used to filter messages by level
  838. * and customize logging. By default, `logger::level::info` messages
  839. * and higher are logged to `stderr`.
  840. */
  841. explicit connection(
  842. executor_type ex,
  843. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  844. logger lgr = {});
  845. /** @brief Constructor from an executor and a logger.
  846. *
  847. * @param ex Executor used to create all internal I/O objects.
  848. * @param lgr Logger configuration. It can be used to filter messages by level
  849. * and customize logging. By default, `logger::level::info` messages
  850. * and higher are logged to `stderr`.
  851. *
  852. * An SSL context with default settings will be created.
  853. */
  854. connection(executor_type ex, logger lgr)
  855. : connection(
  856. std::move(ex),
  857. asio::ssl::context{asio::ssl::context::tlsv12_client},
  858. std::move(lgr))
  859. { }
  860. /**
  861. * @brief Constructor from an `io_context`.
  862. *
  863. * @param ioc I/O context used to create all internal I/O objects.
  864. * @param ctx SSL context.
  865. * @param lgr Logger configuration. It can be used to filter messages by level
  866. * and customize logging. By default, `logger::level::info` messages
  867. * and higher are logged to `stderr`.
  868. */
  869. explicit connection(
  870. asio::io_context& ioc,
  871. asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
  872. logger lgr = {})
  873. : connection(ioc.get_executor(), std::move(ctx), std::move(lgr))
  874. { }
  875. /**
  876. * @brief Constructor from an `io_context` and a logger.
  877. *
  878. * @param ioc I/O context used to create all internal I/O objects.
  879. * @param lgr Logger configuration. It can be used to filter messages by level
  880. * and customize logging. By default, `logger::level::info` messages
  881. * and higher are logged to `stderr`.
  882. */
  883. connection(asio::io_context& ioc, logger lgr)
  884. : connection(
  885. ioc.get_executor(),
  886. asio::ssl::context{asio::ssl::context::tlsv12_client},
  887. std::move(lgr))
  888. { }
  889. /// Returns the underlying executor.
  890. executor_type get_executor() noexcept { return impl_.get_executor(); }
  891. /**
  892. * @brief Calls @ref boost::redis::basic_connection::async_run.
  893. *
  894. * @param cfg Configuration parameters.
  895. * @param token Completion token.
  896. */
  897. template <class CompletionToken = asio::deferred_t>
  898. auto async_run(config const& cfg, CompletionToken&& token = {})
  899. {
  900. return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
  901. initiation{this},
  902. token,
  903. &cfg);
  904. }
  905. /**
  906. * @brief (Deprecated) Calls @ref boost::redis::basic_connection::async_run.
  907. *
  908. * This function accepts an extra logger parameter. The passed logger
  909. * will be used by the connection, overwriting any logger passed to the connection's
  910. * constructor.
  911. *
  912. * @par Deprecated
  913. * The logger should be passed to the connection's constructor instead of using this
  914. * function. Use the overload without a logger parameter, instead. This function is
  915. * deprecated and will be removed in subsequent releases.
  916. *
  917. * @param cfg Configuration parameters.
  918. * @param l Logger.
  919. * @param token Completion token.
  920. */
  921. template <class CompletionToken = asio::deferred_t>
  922. BOOST_DEPRECATED(
  923. "The async_run overload taking a logger argument is deprecated. "
  924. "Please pass the logger to the connection's constructor, instead, "
  925. "and use the other async_run overloads.")
  926. auto async_run(config const& cfg, logger l, CompletionToken&& token = {})
  927. {
  928. return asio::async_initiate<CompletionToken, void(boost::system::error_code)>(
  929. initiation{this},
  930. token,
  931. &cfg,
  932. std::move(l));
  933. }
  934. /// @copydoc basic_connection::async_receive
  935. template <class CompletionToken = asio::deferred_t>
  936. auto async_receive(CompletionToken&& token = {})
  937. {
  938. return impl_.async_receive(std::forward<CompletionToken>(token));
  939. }
  940. /// @copydoc basic_connection::receive
  941. std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
  942. /**
  943. * @brief Calls @ref boost::redis::basic_connection::async_exec.
  944. *
  945. * @param req The request to be executed.
  946. * @param resp The response object to parse data into.
  947. * @param token Completion token.
  948. */
  949. template <class Response = ignore_t, class CompletionToken = asio::deferred_t>
  950. auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
  951. {
  952. return async_exec(req, any_adapter{resp}, std::forward<CompletionToken>(token));
  953. }
  954. /**
  955. * @brief Calls @ref boost::redis::basic_connection::async_exec.
  956. *
  957. * @param req The request to be executed.
  958. * @param adapter An adapter object referencing a response to place data into.
  959. * @param token Completion token.
  960. */
  961. template <class CompletionToken = asio::deferred_t>
  962. auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {})
  963. {
  964. return asio::async_initiate<CompletionToken, void(boost::system::error_code, std::size_t)>(
  965. initiation{this},
  966. token,
  967. &req,
  968. std::move(adapter));
  969. }
  970. /// @copydoc basic_connection::cancel
  971. void cancel(operation op = operation::all);
  972. /// @copydoc basic_connection::will_reconnect
  973. bool will_reconnect() const noexcept { return impl_.will_reconnect(); }
  974. /// (Deprecated) Calls @ref boost::redis::basic_connection::next_layer.
  975. BOOST_DEPRECATED(
  976. "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
  977. "the other member functions to interact with the connection.")
  978. asio::ssl::stream<asio::ip::tcp::socket>& next_layer() noexcept
  979. {
  980. return impl_.impl_->stream_.next_layer();
  981. }
  982. /// (Deprecated) Calls @ref boost::redis::basic_connection::next_layer.
  983. BOOST_DEPRECATED(
  984. "Accessing the underlying stream is deprecated and will be removed in the next release. Use "
  985. "the other member functions to interact with the connection.")
  986. asio::ssl::stream<asio::ip::tcp::socket> const& next_layer() const noexcept
  987. {
  988. return impl_.impl_->stream_.next_layer();
  989. }
  990. /// @copydoc basic_connection::reset_stream
  991. BOOST_DEPRECATED(
  992. "This function is no longer necessary and is currently a no-op. connection resets the stream "
  993. "internally as required. This function will be removed in subsequent releases")
  994. void reset_stream() { }
  995. /// @copydoc basic_connection::set_receive_response
  996. template <class Response>
  997. void set_receive_response(Response& response)
  998. {
  999. impl_.set_receive_response(response);
  1000. }
  1001. /// @copydoc basic_connection::get_usage
  1002. usage get_usage() const noexcept { return impl_.get_usage(); }
  1003. /// @copydoc basic_connection::get_ssl_context
  1004. BOOST_DEPRECATED(
  1005. "ssl::context has no const methods, so this function should not be called. Set up any "
  1006. "required TLS configuration before passing the ssl::context to the connection's constructor.")
  1007. asio::ssl::context const& get_ssl_context() const noexcept
  1008. {
  1009. return impl_.impl_->stream_.get_ssl_context();
  1010. }
  1011. private:
  1012. // Function object to initiate the async ops that use asio::any_completion_handler.
  1013. // Required for asio::cancel_after to work.
  1014. // Since all ops have different arguments, a single struct with different overloads is enough.
  1015. struct initiation {
  1016. connection* self;
  1017. using executor_type = asio::any_io_executor;
  1018. executor_type get_executor() const noexcept { return self->get_executor(); }
  1019. template <class Handler>
  1020. void operator()(Handler&& handler, config const* cfg, logger l)
  1021. {
  1022. self->async_run_impl(*cfg, std::move(l), std::forward<Handler>(handler));
  1023. }
  1024. template <class Handler>
  1025. void operator()(Handler&& handler, config const* cfg)
  1026. {
  1027. self->async_run_impl(*cfg, std::forward<Handler>(handler));
  1028. }
  1029. template <class Handler>
  1030. void operator()(Handler&& handler, request const* req, any_adapter&& adapter)
  1031. {
  1032. self->async_exec_impl(*req, std::move(adapter), std::forward<Handler>(handler));
  1033. }
  1034. };
  1035. void async_run_impl(
  1036. config const& cfg,
  1037. logger&& l,
  1038. asio::any_completion_handler<void(boost::system::error_code)> token);
  1039. void async_run_impl(
  1040. config const& cfg,
  1041. asio::any_completion_handler<void(boost::system::error_code)> token);
  1042. void async_exec_impl(
  1043. request const& req,
  1044. any_adapter&& adapter,
  1045. asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);
  1046. basic_connection<executor_type> impl_;
  1047. };
  1048. } // namespace boost::redis
  1049. #endif // BOOST_REDIS_CONNECTION_HPP