engine_impl.hpp 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. //
  2. // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
  8. #define BOOST_MYSQL_DETAIL_ENGINE_IMPL_HPP
  9. #include <boost/mysql/error_code.hpp>
  10. #include <boost/mysql/detail/any_resumable_ref.hpp>
  11. #include <boost/mysql/detail/engine.hpp>
  12. #include <boost/mysql/detail/next_action.hpp>
  13. #include <boost/mysql/impl/internal/coroutine.hpp>
  14. #include <boost/asio/any_io_executor.hpp>
  15. #include <boost/asio/buffer.hpp>
  16. #include <boost/asio/cancellation_type.hpp>
  17. #include <boost/asio/compose.hpp>
  18. #include <boost/asio/error.hpp>
  19. #include <boost/asio/immediate.hpp>
  20. #include <boost/asio/post.hpp>
  21. #include <boost/assert.hpp>
  22. #include <cstddef>
  23. #include <utility>
  24. namespace boost {
  25. namespace mysql {
  26. namespace detail {
  27. inline asio::mutable_buffer to_buffer(span<std::uint8_t> buff) noexcept
  28. {
  29. return asio::mutable_buffer(buff.data(), buff.size());
  30. }
  31. inline bool has_terminal_cancellation(asio::cancellation_type_t cancel_type)
  32. {
  33. return static_cast<bool>(cancel_type & asio::cancellation_type_t::terminal);
  34. }
  35. template <class EngineStream>
  36. struct run_algo_op
  37. {
  38. int resume_point_{0};
  39. EngineStream& stream_;
  40. any_resumable_ref resumable_;
  41. bool has_done_io_{false};
  42. error_code stored_ec_;
  43. run_algo_op(EngineStream& stream, any_resumable_ref algo) noexcept : stream_(stream), resumable_(algo) {}
  44. template <class Self>
  45. void operator()(Self& self, error_code io_ec = {}, std::size_t bytes_transferred = 0)
  46. {
  47. next_action act;
  48. switch (resume_point_)
  49. {
  50. case 0:
  51. while (true)
  52. {
  53. // If we were cancelled, but the last operation completed successfully,
  54. // set a cancelled error code so the algorithm exits. This might happen
  55. // if a cancellation signal is emitted after an intermediate operation succeeded
  56. // but before the handler was called.
  57. if (!io_ec && has_terminal_cancellation(self.cancelled()))
  58. io_ec = asio::error::operation_aborted;
  59. // Run the op
  60. act = resumable_.resume(io_ec, bytes_transferred);
  61. if (act.is_done())
  62. {
  63. stored_ec_ = act.error();
  64. if (!has_done_io_)
  65. {
  66. BOOST_MYSQL_YIELD(
  67. resume_point_,
  68. 1,
  69. asio::async_immediate(stream_.get_executor(), std::move(self))
  70. )
  71. }
  72. self.complete(stored_ec_);
  73. return;
  74. }
  75. else if (act.type() == next_action_type::read)
  76. {
  77. BOOST_MYSQL_YIELD(
  78. resume_point_,
  79. 2,
  80. stream_.async_read_some(
  81. to_buffer(act.read_args().buffer),
  82. act.read_args().use_ssl,
  83. std::move(self)
  84. )
  85. )
  86. has_done_io_ = true;
  87. }
  88. else if (act.type() == next_action_type::write)
  89. {
  90. BOOST_MYSQL_YIELD(
  91. resume_point_,
  92. 3,
  93. stream_.async_write_some(
  94. asio::buffer(act.write_args().buffer),
  95. act.write_args().use_ssl,
  96. std::move(self)
  97. )
  98. )
  99. has_done_io_ = true;
  100. }
  101. else if (act.type() == next_action_type::ssl_handshake)
  102. {
  103. BOOST_MYSQL_YIELD(resume_point_, 4, stream_.async_ssl_handshake(std::move(self)))
  104. has_done_io_ = true;
  105. }
  106. else if (act.type() == next_action_type::ssl_shutdown)
  107. {
  108. BOOST_MYSQL_YIELD(resume_point_, 5, stream_.async_ssl_shutdown(std::move(self)))
  109. has_done_io_ = true;
  110. }
  111. else if (act.type() == next_action_type::connect)
  112. {
  113. BOOST_MYSQL_YIELD(
  114. resume_point_,
  115. 6,
  116. stream_.async_connect(act.connect_endpoint(), std::move(self))
  117. )
  118. has_done_io_ = true;
  119. }
  120. else
  121. {
  122. BOOST_ASSERT(act.type() == next_action_type::close);
  123. stream_.close(io_ec);
  124. }
  125. }
  126. }
  127. }
  128. };
  129. // EngineStream is an "extended" stream concept, with the following operations:
  130. // using executor_type = asio::any_io_executor;
  131. // executor_type get_executor();
  132. // bool supports_ssl() const;
  133. // std::size_t read_some(asio::mutable_buffer, bool use_ssl, error_code&);
  134. // void async_read_some(asio::mutable_buffer, bool use_ssl, CompletionToken&&);
  135. // std::size_t write_some(asio::const_buffer, bool use_ssl, error_code&);
  136. // void async_write_some(asio::const_buffer, bool use_ssl, CompletionToken&&);
  137. // void ssl_handshake(error_code&);
  138. // void async_ssl_handshake(CompletionToken&&);
  139. // void ssl_shutdown(error_code&);
  140. // void async_ssl_shutdown(CompletionToken&&);
  141. // void connect(const void* server_address, error_code&);
  142. // void async_connect(const void* server_address, CompletionToken&&);
  143. // void close(error_code&);
  144. // Async operations are only required to support callback types
  145. // See stream_adaptor for an implementation
  146. template <class EngineStream>
  147. class engine_impl final : public engine
  148. {
  149. EngineStream stream_;
  150. public:
  151. template <class... Args>
  152. engine_impl(Args&&... args) : stream_(std::forward<Args>(args)...)
  153. {
  154. }
  155. EngineStream& stream() { return stream_; }
  156. const EngineStream& stream() const { return stream_; }
  157. using executor_type = asio::any_io_executor;
  158. executor_type get_executor() override final { return stream_.get_executor(); }
  159. bool supports_ssl() const override final { return stream_.supports_ssl(); }
  160. void run(any_resumable_ref resumable, error_code& ec) override final
  161. {
  162. ec.clear();
  163. error_code io_ec;
  164. std::size_t bytes_transferred = 0;
  165. while (true)
  166. {
  167. // Run the op
  168. auto act = resumable.resume(io_ec, bytes_transferred);
  169. // Apply the next action
  170. bytes_transferred = 0;
  171. if (act.is_done())
  172. {
  173. ec = act.error();
  174. return;
  175. }
  176. else if (act.type() == next_action_type::read)
  177. {
  178. bytes_transferred = stream_.read_some(
  179. to_buffer(act.read_args().buffer),
  180. act.read_args().use_ssl,
  181. io_ec
  182. );
  183. }
  184. else if (act.type() == next_action_type::write)
  185. {
  186. bytes_transferred = stream_.write_some(
  187. asio::buffer(act.write_args().buffer),
  188. act.write_args().use_ssl,
  189. io_ec
  190. );
  191. }
  192. else if (act.type() == next_action_type::ssl_handshake)
  193. {
  194. stream_.ssl_handshake(io_ec);
  195. }
  196. else if (act.type() == next_action_type::ssl_shutdown)
  197. {
  198. stream_.ssl_shutdown(io_ec);
  199. }
  200. else if (act.type() == next_action_type::connect)
  201. {
  202. stream_.connect(act.connect_endpoint(), io_ec);
  203. }
  204. else
  205. {
  206. BOOST_ASSERT(act.type() == next_action_type::close);
  207. stream_.close(io_ec);
  208. }
  209. }
  210. }
  211. void async_run(any_resumable_ref resumable, asio::any_completion_handler<void(error_code)> h)
  212. override final
  213. {
  214. return asio::async_compose<asio::any_completion_handler<void(error_code)>, void(error_code)>(
  215. run_algo_op<EngineStream>(stream_, resumable),
  216. h,
  217. stream_
  218. );
  219. }
  220. };
  221. } // namespace detail
  222. } // namespace mysql
  223. } // namespace boost
  224. #endif