run_pipeline.hpp 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. //
  2. // Copyright (c) 2019-2025 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
  8. #define BOOST_MYSQL_IMPL_INTERNAL_SANSIO_RUN_PIPELINE_HPP
  9. #include <boost/mysql/character_set.hpp>
  10. #include <boost/mysql/diagnostics.hpp>
  11. #include <boost/mysql/error_code.hpp>
  12. #include <boost/mysql/is_fatal_error.hpp>
  13. #include <boost/mysql/pipeline.hpp>
  14. #include <boost/mysql/detail/access.hpp>
  15. #include <boost/mysql/detail/algo_params.hpp>
  16. #include <boost/mysql/detail/next_action.hpp>
  17. #include <boost/mysql/detail/pipeline.hpp>
  18. #include <boost/mysql/impl/internal/sansio/connection_state_data.hpp>
  19. #include <boost/mysql/impl/internal/sansio/execute.hpp>
  20. #include <boost/mysql/impl/internal/sansio/ping.hpp>
  21. #include <boost/mysql/impl/internal/sansio/prepare_statement.hpp>
  22. #include <boost/mysql/impl/internal/sansio/reset_connection.hpp>
  23. #include <boost/mysql/impl/internal/sansio/set_character_set.hpp>
  24. #include <boost/assert.hpp>
  25. #include <boost/core/span.hpp>
  26. #include <cstddef>
  27. #include <vector>
  28. namespace boost {
  29. namespace mysql {
  30. namespace detail {
  31. class run_pipeline_algo
  32. {
  33. union any_read_algo
  34. {
  35. std::nullptr_t nothing;
  36. read_execute_response_algo execute;
  37. read_prepare_statement_response_algo prepare_statement;
  38. read_reset_connection_response_algo reset_connection;
  39. read_ping_response_algo ping;
  40. read_set_character_set_response_algo set_character_set;
  41. any_read_algo() noexcept : nothing{} {}
  42. };
  43. span<const std::uint8_t> request_buffer_;
  44. span<const pipeline_request_stage> stages_;
  45. std::vector<stage_response>* response_;
  46. int resume_point_{0};
  47. std::size_t current_stage_index_{0};
  48. error_code pipeline_ec_; // Result of the entire operation
  49. bool has_hatal_error_{}; // If true, fail further stages with pipeline_ec_
  50. any_read_algo read_response_algo_;
  51. diagnostics temp_diag_;
  52. void setup_response()
  53. {
  54. if (response_)
  55. {
  56. // Create as many response items as request stages
  57. response_->resize(stages_.size());
  58. // Setup them
  59. for (std::size_t i = 0u; i < stages_.size(); ++i)
  60. {
  61. // Execution stages need to be initialized to results objects.
  62. // Otherwise, clear any previous content
  63. auto& impl = access::get_impl((*response_)[i]);
  64. if (stages_[i].kind == pipeline_stage_kind::execute)
  65. impl.emplace_results();
  66. else
  67. impl.emplace_error();
  68. }
  69. }
  70. }
  71. void setup_current_stage(const connection_state_data& st)
  72. {
  73. // Reset previous data
  74. temp_diag_.clear();
  75. // Setup read algo
  76. auto stage = stages_[current_stage_index_];
  77. switch (stage.kind)
  78. {
  79. case pipeline_stage_kind::execute:
  80. {
  81. BOOST_ASSERT(response_ != nullptr); // we don't support execution ignoring the response
  82. auto& processor = access::get_impl((*response_)[current_stage_index_]).get_processor();
  83. processor.reset(stage.stage_specific.enc, st.meta_mode);
  84. processor.sequence_number() = stage.seqnum;
  85. read_response_algo_.execute = {&processor};
  86. break;
  87. }
  88. case pipeline_stage_kind::prepare_statement:
  89. read_response_algo_.prepare_statement = {stage.seqnum};
  90. break;
  91. case pipeline_stage_kind::close_statement:
  92. // Close statement doesn't have a response
  93. read_response_algo_.nothing = nullptr;
  94. break;
  95. case pipeline_stage_kind::set_character_set:
  96. read_response_algo_.set_character_set = {stage.stage_specific.charset, stage.seqnum};
  97. break;
  98. case pipeline_stage_kind::reset_connection:
  99. read_response_algo_.reset_connection = {stage.seqnum};
  100. break;
  101. case pipeline_stage_kind::ping: read_response_algo_.ping = {stage.seqnum}; break;
  102. default: BOOST_ASSERT(false); // LCOV_EXCL_LINE
  103. }
  104. }
  105. void set_stage_error(error_code ec, diagnostics&& diag)
  106. {
  107. if (response_)
  108. {
  109. access::get_impl((*response_)[current_stage_index_]).set_error(ec, std::move(diag));
  110. }
  111. }
  112. void on_stage_finished(const connection_state_data& st, diagnostics& output_diag, error_code stage_ec)
  113. {
  114. if (stage_ec)
  115. {
  116. // If the error was fatal, fail successive stages.
  117. // This error is the result of the operation
  118. if (is_fatal_error(stage_ec))
  119. {
  120. pipeline_ec_ = stage_ec;
  121. output_diag = temp_diag_;
  122. has_hatal_error_ = true;
  123. }
  124. else if (!pipeline_ec_)
  125. {
  126. // In the absence of fatal errors, the first error we encounter is the result of the operation
  127. pipeline_ec_ = stage_ec;
  128. output_diag = temp_diag_;
  129. }
  130. // Propagate the error
  131. if (response_ != nullptr)
  132. {
  133. set_stage_error(stage_ec, std::move(temp_diag_));
  134. }
  135. }
  136. else
  137. {
  138. if (stages_[current_stage_index_].kind == pipeline_stage_kind::prepare_statement)
  139. {
  140. // Propagate results. We don't support prepare statements ignoring the response
  141. BOOST_ASSERT(response_ != nullptr);
  142. access::get_impl((*response_)[current_stage_index_])
  143. .set_result(read_response_algo_.prepare_statement.result(st));
  144. }
  145. }
  146. }
  147. next_action resume_read_algo(connection_state_data& st, error_code ec)
  148. {
  149. switch (stages_[current_stage_index_].kind)
  150. {
  151. case pipeline_stage_kind::execute: return read_response_algo_.execute.resume(st, temp_diag_, ec);
  152. case pipeline_stage_kind::prepare_statement:
  153. return read_response_algo_.prepare_statement.resume(st, temp_diag_, ec);
  154. case pipeline_stage_kind::reset_connection:
  155. return read_response_algo_.reset_connection.resume(st, temp_diag_, ec);
  156. case pipeline_stage_kind::set_character_set:
  157. return read_response_algo_.set_character_set.resume(st, temp_diag_, ec);
  158. case pipeline_stage_kind::ping: return read_response_algo_.ping.resume(st, temp_diag_, ec);
  159. case pipeline_stage_kind::close_statement: return next_action(); // has no response
  160. default: BOOST_ASSERT(false); return next_action(); // LCOV_EXCL_LINE
  161. }
  162. }
  163. public:
  164. run_pipeline_algo(run_pipeline_algo_params params) noexcept
  165. : request_buffer_(params.request_buffer), stages_(params.request_stages), response_(params.response)
  166. {
  167. }
  168. next_action resume(connection_state_data& st, diagnostics& diag, error_code ec)
  169. {
  170. next_action act;
  171. switch (resume_point_)
  172. {
  173. case 0:
  174. // Clear previous state
  175. setup_response();
  176. // If the request is empty, don't do anything
  177. if (stages_.empty())
  178. break;
  179. // Check status
  180. ec = st.check_status_ready();
  181. if (ec)
  182. return ec;
  183. // Write the request. use_ssl is attached by top_level_algo
  184. BOOST_MYSQL_YIELD(resume_point_, 1, next_action::write({request_buffer_, false}))
  185. // If writing the request failed, fail all the stages with the given error code
  186. if (ec)
  187. {
  188. pipeline_ec_ = ec;
  189. has_hatal_error_ = true;
  190. }
  191. // For each stage
  192. for (; current_stage_index_ < stages_.size(); ++current_stage_index_)
  193. {
  194. // If there was a fatal error, just set the error and move forward
  195. if (has_hatal_error_)
  196. {
  197. set_stage_error(pipeline_ec_, diagnostics(diag));
  198. continue;
  199. }
  200. // Setup the stage
  201. setup_current_stage(st);
  202. // Run it until completion
  203. ec.clear();
  204. while (!(act = resume_read_algo(st, ec)).is_done())
  205. BOOST_MYSQL_YIELD(resume_point_, 2, act)
  206. // Process the stage's result
  207. on_stage_finished(st, diag, act.error());
  208. }
  209. }
  210. return pipeline_ec_;
  211. }
  212. };
  213. } // namespace detail
  214. } // namespace mysql
  215. } // namespace boost
  216. #endif