stream.hpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
  11. #include <boost/beast/core/buffer_traits.hpp>
  12. #include <boost/beast/core/detail/service_base.hpp>
  13. #include <boost/beast/core/detail/is_invocable.hpp>
  14. #include <boost/asio/any_io_executor.hpp>
  15. #include <boost/asio/append.hpp>
  16. #include <boost/asio/associated_cancellation_slot.hpp>
  17. #include <boost/asio/dispatch.hpp>
  18. #include <boost/asio/executor_work_guard.hpp>
  19. #include <boost/asio/post.hpp>
  20. #include <mutex>
  21. #include <stdexcept>
  22. #include <vector>
  23. namespace boost {
  24. namespace beast {
  25. namespace test {
  26. namespace detail
  27. {
  28. template<class To>
  29. struct extract_executor_op
  30. {
  31. To operator()(net::any_io_executor& ex) const
  32. {
  33. assert(ex.template target<To>());
  34. return *ex.template target<To>();
  35. }
  36. };
  37. template<>
  38. struct extract_executor_op<net::any_io_executor>
  39. {
  40. net::any_io_executor operator()(net::any_io_executor& ex) const
  41. {
  42. return ex;
  43. }
  44. };
  45. } // detail
  46. template<class Executor>
  47. template<class Handler, class Buffers>
  48. class basic_stream<Executor>::read_op : public detail::stream_read_op_base
  49. {
  50. struct lambda
  51. {
  52. Handler h_;
  53. boost::weak_ptr<detail::stream_state> wp_;
  54. Buffers b_;
  55. net::executor_work_guard<
  56. net::associated_executor_t<Handler, net::any_io_executor>> wg2_;
  57. lambda(lambda&&) = default;
  58. lambda(lambda const&) = default;
  59. template<class Handler_>
  60. lambda(
  61. Handler_&& h,
  62. boost::shared_ptr<detail::stream_state> const& s,
  63. Buffers const& b)
  64. : h_(std::forward<Handler_>(h))
  65. , wp_(s)
  66. , b_(b)
  67. , wg2_(net::get_associated_executor(h_, s->exec))
  68. {
  69. }
  70. using allocator_type = net::associated_allocator_t<Handler>;
  71. allocator_type get_allocator() const noexcept
  72. {
  73. return net::get_associated_allocator(h_);
  74. }
  75. using cancellation_slot_type =
  76. net::associated_cancellation_slot_t<Handler>;
  77. cancellation_slot_type
  78. get_cancellation_slot() const noexcept
  79. {
  80. return net::get_associated_cancellation_slot(h_,
  81. net::cancellation_slot());
  82. }
  83. void
  84. operator()(error_code ec)
  85. {
  86. std::size_t bytes_transferred = 0;
  87. auto sp = wp_.lock();
  88. if(! sp)
  89. {
  90. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  91. }
  92. if(! ec)
  93. {
  94. std::lock_guard<std::mutex> lock(sp->m);
  95. BOOST_ASSERT(! sp->op);
  96. if(sp->b.size() > 0)
  97. {
  98. bytes_transferred =
  99. net::buffer_copy(
  100. b_, sp->b.data(), sp->read_max);
  101. sp->b.consume(bytes_transferred);
  102. sp->nread_bytes += bytes_transferred;
  103. }
  104. else if (buffer_bytes(b_) > 0)
  105. {
  106. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  107. }
  108. }
  109. net::dispatch(wg2_.get_executor(),
  110. net::append(std::move(h_), ec, bytes_transferred));
  111. wg2_.reset();
  112. }
  113. };
  114. lambda fn_;
  115. net::executor_work_guard<net::any_io_executor> wg1_;
  116. public:
  117. template<class Handler_>
  118. read_op(
  119. Handler_&& h,
  120. boost::shared_ptr<detail::stream_state> const& s,
  121. Buffers const& b)
  122. : fn_(std::forward<Handler_>(h), s, b)
  123. , wg1_(s->exec)
  124. {
  125. }
  126. void
  127. operator()(error_code ec) override
  128. {
  129. net::post(wg1_.get_executor(), net::append(std::move(fn_), ec));
  130. wg1_.reset();
  131. }
  132. };
  133. template<class Executor>
  134. struct basic_stream<Executor>::run_read_op
  135. {
  136. boost::shared_ptr<detail::stream_state> const& in;
  137. using executor_type = typename basic_stream::executor_type;
  138. executor_type
  139. get_executor() const noexcept
  140. {
  141. return detail::extract_executor_op<Executor>()(in->exec);
  142. }
  143. template<
  144. class ReadHandler,
  145. class MutableBufferSequence>
  146. void
  147. operator()(
  148. ReadHandler&& h,
  149. MutableBufferSequence const& buffers)
  150. {
  151. // If you get an error on the following line it means
  152. // that your handler does not meet the documented type
  153. // requirements for the handler.
  154. static_assert(
  155. beast::detail::is_invocable<ReadHandler,
  156. void(error_code, std::size_t)>::value,
  157. "ReadHandler type requirements not met");
  158. initiate_read(
  159. in,
  160. std::unique_ptr<detail::stream_read_op_base>{
  161. new read_op<
  162. typename std::decay<ReadHandler>::type,
  163. MutableBufferSequence>(
  164. std::move(h),
  165. in,
  166. buffers)},
  167. buffer_bytes(buffers));
  168. }
  169. };
  170. template<class Executor>
  171. struct basic_stream<Executor>::run_write_op
  172. {
  173. boost::shared_ptr<detail::stream_state> const& in_;
  174. using executor_type = typename basic_stream::executor_type;
  175. executor_type
  176. get_executor() const noexcept
  177. {
  178. return detail::extract_executor_op<Executor>()(in_->exec);
  179. }
  180. template<
  181. class WriteHandler,
  182. class ConstBufferSequence>
  183. void
  184. operator()(
  185. WriteHandler&& h,
  186. boost::weak_ptr<detail::stream_state> out_,
  187. ConstBufferSequence const& buffers)
  188. {
  189. // If you get an error on the following line it means
  190. // that your handler does not meet the documented type
  191. // requirements for the handler.
  192. static_assert(
  193. beast::detail::is_invocable<WriteHandler,
  194. void(error_code, std::size_t)>::value,
  195. "WriteHandler type requirements not met");
  196. ++in_->nwrite;
  197. auto const upcall = [&](error_code ec, std::size_t n)
  198. {
  199. net::post(in_->exec, net::append(std::move(h), ec, n));
  200. };
  201. // test failure
  202. error_code ec;
  203. std::size_t n = 0;
  204. if(in_->fc && in_->fc->fail(ec))
  205. return upcall(ec, n);
  206. // A request to write 0 bytes to a stream is a no-op.
  207. if(buffer_bytes(buffers) == 0)
  208. return upcall(ec, n);
  209. // connection closed
  210. auto out = out_.lock();
  211. if(! out)
  212. return upcall(net::error::connection_reset, n);
  213. // copy buffers
  214. n = std::min<std::size_t>(
  215. buffer_bytes(buffers), in_->write_max);
  216. {
  217. std::lock_guard<std::mutex> lock(out->m);
  218. n = net::buffer_copy(out->b.prepare(n), buffers);
  219. out->b.commit(n);
  220. out->nwrite_bytes += n;
  221. out->notify_read();
  222. }
  223. BOOST_ASSERT(! ec);
  224. upcall(ec, n);
  225. }
  226. };
  227. //------------------------------------------------------------------------------
  228. template<class Executor>
  229. template<class MutableBufferSequence>
  230. std::size_t
  231. basic_stream<Executor>::
  232. read_some(MutableBufferSequence const& buffers)
  233. {
  234. static_assert(net::is_mutable_buffer_sequence<
  235. MutableBufferSequence>::value,
  236. "MutableBufferSequence type requirements not met");
  237. error_code ec;
  238. auto const n = read_some(buffers, ec);
  239. if(ec)
  240. BOOST_THROW_EXCEPTION(system_error{ec});
  241. return n;
  242. }
  243. template<class Executor>
  244. template<class MutableBufferSequence>
  245. std::size_t
  246. basic_stream<Executor>::
  247. read_some(MutableBufferSequence const& buffers,
  248. error_code& ec)
  249. {
  250. static_assert(net::is_mutable_buffer_sequence<
  251. MutableBufferSequence>::value,
  252. "MutableBufferSequence type requirements not met");
  253. ++in_->nread;
  254. // test failure
  255. if(in_->fc && in_->fc->fail(ec))
  256. return 0;
  257. // A request to read 0 bytes from a stream is a no-op.
  258. if(buffer_bytes(buffers) == 0)
  259. {
  260. ec = {};
  261. return 0;
  262. }
  263. std::unique_lock<std::mutex> lock{in_->m};
  264. BOOST_ASSERT(! in_->op);
  265. in_->cv.wait(lock,
  266. [&]()
  267. {
  268. return
  269. in_->b.size() > 0 ||
  270. in_->code != detail::stream_status::ok;
  271. });
  272. // deliver bytes before eof
  273. if(in_->b.size() > 0)
  274. {
  275. auto const n = net::buffer_copy(
  276. buffers, in_->b.data(), in_->read_max);
  277. in_->b.consume(n);
  278. in_->nread_bytes += n;
  279. return n;
  280. }
  281. // deliver error
  282. BOOST_ASSERT(in_->code != detail::stream_status::ok);
  283. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  284. return 0;
  285. }
  286. template<class Executor>
  287. template<class MutableBufferSequence,
  288. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler>
  289. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler, void(error_code, std::size_t))
  290. basic_stream<Executor>::
  291. async_read_some(
  292. MutableBufferSequence const& buffers,
  293. ReadHandler&& handler)
  294. {
  295. static_assert(net::is_mutable_buffer_sequence<
  296. MutableBufferSequence>::value,
  297. "MutableBufferSequence type requirements not met");
  298. return net::async_initiate<
  299. ReadHandler,
  300. void(error_code, std::size_t)>(
  301. run_read_op{in_},
  302. handler,
  303. buffers);
  304. }
  305. template<class Executor>
  306. template<class ConstBufferSequence>
  307. std::size_t
  308. basic_stream<Executor>::
  309. write_some(ConstBufferSequence const& buffers)
  310. {
  311. static_assert(net::is_const_buffer_sequence<
  312. ConstBufferSequence>::value,
  313. "ConstBufferSequence type requirements not met");
  314. error_code ec;
  315. auto const bytes_transferred =
  316. write_some(buffers, ec);
  317. if(ec)
  318. BOOST_THROW_EXCEPTION(system_error{ec});
  319. return bytes_transferred;
  320. }
  321. template<class Executor>
  322. template<class ConstBufferSequence>
  323. std::size_t
  324. basic_stream<Executor>::
  325. write_some(
  326. ConstBufferSequence const& buffers, error_code& ec)
  327. {
  328. static_assert(net::is_const_buffer_sequence<
  329. ConstBufferSequence>::value,
  330. "ConstBufferSequence type requirements not met");
  331. ++in_->nwrite;
  332. // test failure
  333. if(in_->fc && in_->fc->fail(ec))
  334. return 0;
  335. // A request to write 0 bytes to a stream is a no-op.
  336. if(buffer_bytes(buffers) == 0)
  337. {
  338. ec = {};
  339. return 0;
  340. }
  341. // connection closed
  342. auto out = out_.lock();
  343. if(! out)
  344. {
  345. BOOST_BEAST_ASSIGN_EC(ec, net::error::connection_reset);
  346. return 0;
  347. }
  348. // copy buffers
  349. auto n = std::min<std::size_t>(
  350. buffer_bytes(buffers), in_->write_max);
  351. {
  352. std::lock_guard<std::mutex> lock(out->m);
  353. n = net::buffer_copy(out->b.prepare(n), buffers);
  354. out->b.commit(n);
  355. out->nwrite_bytes += n;
  356. out->notify_read();
  357. }
  358. return n;
  359. }
  360. template<class Executor>
  361. template<class ConstBufferSequence,
  362. BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler>
  363. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(WriteHandler, void(error_code, std::size_t))
  364. basic_stream<Executor>::
  365. async_write_some(
  366. ConstBufferSequence const& buffers,
  367. WriteHandler&& handler)
  368. {
  369. static_assert(net::is_const_buffer_sequence<
  370. ConstBufferSequence>::value,
  371. "ConstBufferSequence type requirements not met");
  372. return net::async_initiate<
  373. WriteHandler,
  374. void(error_code, std::size_t)>(
  375. run_write_op{in_},
  376. handler,
  377. out_,
  378. buffers);
  379. }
  380. //------------------------------------------------------------------------------
  381. template<class Executor, class TeardownHandler>
  382. void
  383. async_teardown(
  384. role_type,
  385. basic_stream<Executor>& s,
  386. TeardownHandler&& handler)
  387. {
  388. error_code ec;
  389. if( s.in_->fc &&
  390. s.in_->fc->fail(ec))
  391. return net::post(
  392. s.get_executor(),
  393. net::append(std::move(handler), ec));
  394. s.close();
  395. if( s.in_->fc &&
  396. s.in_->fc->fail(ec))
  397. {
  398. BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
  399. }
  400. else
  401. ec = {};
  402. net::post(s.get_executor(), net::append(std::move(handler), ec));
  403. }
  404. //------------------------------------------------------------------------------
  405. template<class Executor, class Arg1, class... ArgN>
  406. basic_stream<Executor>
  407. connect(stream& to, Arg1&& arg1, ArgN&&... argn)
  408. {
  409. stream from{
  410. std::forward<Arg1>(arg1),
  411. std::forward<ArgN>(argn)...};
  412. from.connect(to);
  413. return from;
  414. }
  415. template<class Executor>
  416. auto basic_stream<Executor>::get_executor() noexcept -> executor_type
  417. {
  418. return detail::extract_executor_op<Executor>()(in_->exec);
  419. }
  420. } // test
  421. } // beast
  422. } // boost
  423. #endif