stream.hpp 11 KB


  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
  11. #include <boost/beast/core/bind_handler.hpp>
  12. #include <boost/beast/core/buffer_traits.hpp>
  13. #include <boost/beast/core/buffers_prefix.hpp>
  14. #include <boost/beast/core/detail/service_base.hpp>
  15. #include <boost/beast/core/detail/type_traits.hpp>
  16. #include <mutex>
  17. #include <stdexcept>
  18. #include <vector>
  19. namespace boost {
  20. namespace beast {
  21. namespace test {
  22. //------------------------------------------------------------------------------
  23. struct stream::service_impl
  24. {
  25. std::mutex m_;
  26. std::vector<state*> v_;
  27. BOOST_BEAST_DECL
  28. void
  29. remove(state& impl);
  30. };
  31. class stream::service
  32. : public beast::detail::service_base<service>
  33. {
  34. boost::shared_ptr<service_impl> sp_;
  35. BOOST_BEAST_DECL
  36. void
  37. shutdown() override;
  38. public:
  39. BOOST_BEAST_DECL
  40. explicit
  41. service(net::execution_context& ctx);
  42. BOOST_BEAST_DECL
  43. static
  44. auto
  45. make_impl(
  46. net::io_context& ctx,
  47. test::fail_count* fc) ->
  48. boost::shared_ptr<state>;
  49. };
  50. //------------------------------------------------------------------------------
  51. template<class Handler, class Buffers>
  52. class stream::read_op : public stream::read_op_base
  53. {
  54. using ex1_type =
  55. net::io_context::executor_type;
  56. using ex2_type
  57. = net::associated_executor_t<Handler, ex1_type>;
  58. struct lambda
  59. {
  60. Handler h_;
  61. boost::weak_ptr<state> wp_;
  62. Buffers b_;
  63. net::executor_work_guard<ex2_type> wg2_;
  64. lambda(lambda&&) = default;
  65. lambda(lambda const&) = default;
  66. template<class Handler_>
  67. lambda(
  68. Handler_&& h,
  69. boost::shared_ptr<state> const& s,
  70. Buffers const& b)
  71. : h_(std::forward<Handler_>(h))
  72. , wp_(s)
  73. , b_(b)
  74. , wg2_(net::get_associated_executor(
  75. h_, s->ioc.get_executor()))
  76. {
  77. }
  78. void
  79. operator()(error_code ec)
  80. {
  81. std::size_t bytes_transferred = 0;
  82. auto sp = wp_.lock();
  83. if(! sp)
  84. ec = net::error::operation_aborted;
  85. if(! ec)
  86. {
  87. std::lock_guard<std::mutex> lock(sp->m);
  88. BOOST_ASSERT(! sp->op);
  89. if(sp->b.size() > 0)
  90. {
  91. bytes_transferred =
  92. net::buffer_copy(
  93. b_, sp->b.data(), sp->read_max);
  94. sp->b.consume(bytes_transferred);
  95. }
  96. else if (buffer_bytes(b_) > 0)
  97. {
  98. ec = net::error::eof;
  99. }
  100. }
  101. auto alloc = net::get_associated_allocator(h_);
  102. wg2_.get_executor().dispatch(
  103. beast::bind_front_handler(std::move(h_),
  104. ec, bytes_transferred), alloc);
  105. wg2_.reset();
  106. }
  107. };
  108. lambda fn_;
  109. net::executor_work_guard<ex1_type> wg1_;
  110. public:
  111. template<class Handler_>
  112. read_op(
  113. Handler_&& h,
  114. boost::shared_ptr<state> const& s,
  115. Buffers const& b)
  116. : fn_(std::forward<Handler_>(h), s, b)
  117. , wg1_(s->ioc.get_executor())
  118. {
  119. }
  120. void
  121. operator()(error_code ec) override
  122. {
  123. auto alloc = net::get_associated_allocator(fn_.h_);
  124. wg1_.get_executor().post(
  125. beast::bind_front_handler(std::move(fn_), ec), alloc);
  126. wg1_.reset();
  127. }
  128. };
  129. struct stream::run_read_op
  130. {
  131. template<
  132. class ReadHandler,
  133. class MutableBufferSequence>
  134. void
  135. operator()(
  136. ReadHandler&& h,
  137. boost::shared_ptr<state> const& in,
  138. MutableBufferSequence const& buffers)
  139. {
  140. // If you get an error on the following line it means
  141. // that your handler does not meet the documented type
  142. // requirements for the handler.
  143. static_assert(
  144. beast::detail::is_invocable<ReadHandler,
  145. void(error_code, std::size_t)>::value,
  146. "ReadHandler type requirements not met");
  147. initiate_read(
  148. in,
  149. std::unique_ptr<read_op_base>{
  150. new read_op<
  151. typename std::decay<ReadHandler>::type,
  152. MutableBufferSequence>(
  153. std::move(h),
  154. in,
  155. buffers)},
  156. buffer_bytes(buffers));
  157. }
  158. };
  159. struct stream::run_write_op
  160. {
  161. template<
  162. class WriteHandler,
  163. class ConstBufferSequence>
  164. void
  165. operator()(
  166. WriteHandler&& h,
  167. boost::shared_ptr<state> in_,
  168. boost::weak_ptr<state> out_,
  169. ConstBufferSequence const& buffers)
  170. {
  171. // If you get an error on the following line it means
  172. // that your handler does not meet the documented type
  173. // requirements for the handler.
  174. static_assert(
  175. beast::detail::is_invocable<WriteHandler,
  176. void(error_code, std::size_t)>::value,
  177. "WriteHandler type requirements not met");
  178. ++in_->nwrite;
  179. auto const upcall = [&](error_code ec, std::size_t n)
  180. {
  181. net::post(
  182. in_->ioc.get_executor(),
  183. beast::bind_front_handler(std::move(h), ec, n));
  184. };
  185. // test failure
  186. error_code ec;
  187. std::size_t n = 0;
  188. if(in_->fc && in_->fc->fail(ec))
  189. return upcall(ec, n);
  190. // A request to write 0 bytes to a stream is a no-op.
  191. if(buffer_bytes(buffers) == 0)
  192. return upcall(ec, n);
  193. // connection closed
  194. auto out = out_.lock();
  195. if(! out)
  196. return upcall(net::error::connection_reset, n);
  197. // copy buffers
  198. n = std::min<std::size_t>(
  199. buffer_bytes(buffers), in_->write_max);
  200. {
  201. std::lock_guard<std::mutex> lock(out->m);
  202. n = net::buffer_copy(out->b.prepare(n), buffers);
  203. out->b.commit(n);
  204. out->notify_read();
  205. }
  206. BOOST_ASSERT(! ec);
  207. upcall(ec, n);
  208. }
  209. };
  210. //------------------------------------------------------------------------------
  211. template<class MutableBufferSequence>
  212. std::size_t
  213. stream::
  214. read_some(MutableBufferSequence const& buffers)
  215. {
  216. static_assert(net::is_mutable_buffer_sequence<
  217. MutableBufferSequence>::value,
  218. "MutableBufferSequence type requirements not met");
  219. error_code ec;
  220. auto const n = read_some(buffers, ec);
  221. if(ec)
  222. BOOST_THROW_EXCEPTION(system_error{ec});
  223. return n;
  224. }
  225. template<class MutableBufferSequence>
  226. std::size_t
  227. stream::
  228. read_some(MutableBufferSequence const& buffers,
  229. error_code& ec)
  230. {
  231. static_assert(net::is_mutable_buffer_sequence<
  232. MutableBufferSequence>::value,
  233. "MutableBufferSequence type requirements not met");
  234. ++in_->nread;
  235. // test failure
  236. if(in_->fc && in_->fc->fail(ec))
  237. return 0;
  238. // A request to read 0 bytes from a stream is a no-op.
  239. if(buffer_bytes(buffers) == 0)
  240. {
  241. ec = {};
  242. return 0;
  243. }
  244. std::unique_lock<std::mutex> lock{in_->m};
  245. BOOST_ASSERT(! in_->op);
  246. in_->cv.wait(lock,
  247. [&]()
  248. {
  249. return
  250. in_->b.size() > 0 ||
  251. in_->code != status::ok;
  252. });
  253. // deliver bytes before eof
  254. if(in_->b.size() > 0)
  255. {
  256. auto const n = net::buffer_copy(
  257. buffers, in_->b.data(), in_->read_max);
  258. in_->b.consume(n);
  259. return n;
  260. }
  261. // deliver error
  262. BOOST_ASSERT(in_->code != status::ok);
  263. ec = net::error::eof;
  264. return 0;
  265. }
  266. template<class MutableBufferSequence, class ReadHandler>
  267. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  268. stream::
  269. async_read_some(
  270. MutableBufferSequence const& buffers,
  271. ReadHandler&& handler)
  272. {
  273. static_assert(net::is_mutable_buffer_sequence<
  274. MutableBufferSequence>::value,
  275. "MutableBufferSequence type requirements not met");
  276. return net::async_initiate<
  277. ReadHandler,
  278. void(error_code, std::size_t)>(
  279. run_read_op{},
  280. handler,
  281. in_,
  282. buffers);
  283. }
  284. template<class ConstBufferSequence>
  285. std::size_t
  286. stream::
  287. write_some(ConstBufferSequence const& buffers)
  288. {
  289. static_assert(net::is_const_buffer_sequence<
  290. ConstBufferSequence>::value,
  291. "ConstBufferSequence type requirements not met");
  292. error_code ec;
  293. auto const bytes_transferred =
  294. write_some(buffers, ec);
  295. if(ec)
  296. BOOST_THROW_EXCEPTION(system_error{ec});
  297. return bytes_transferred;
  298. }
  299. template<class ConstBufferSequence>
  300. std::size_t
  301. stream::
  302. write_some(
  303. ConstBufferSequence const& buffers, error_code& ec)
  304. {
  305. static_assert(net::is_const_buffer_sequence<
  306. ConstBufferSequence>::value,
  307. "ConstBufferSequence type requirements not met");
  308. ++in_->nwrite;
  309. // test failure
  310. if(in_->fc && in_->fc->fail(ec))
  311. return 0;
  312. // A request to write 0 bytes to a stream is a no-op.
  313. if(buffer_bytes(buffers) == 0)
  314. {
  315. ec = {};
  316. return 0;
  317. }
  318. // connection closed
  319. auto out = out_.lock();
  320. if(! out)
  321. {
  322. ec = net::error::connection_reset;
  323. return 0;
  324. }
  325. // copy buffers
  326. auto n = std::min<std::size_t>(
  327. buffer_bytes(buffers), in_->write_max);
  328. {
  329. std::lock_guard<std::mutex> lock(out->m);
  330. n = net::buffer_copy(out->b.prepare(n), buffers);
  331. out->b.commit(n);
  332. out->notify_read();
  333. }
  334. return n;
  335. }
  336. template<class ConstBufferSequence, class WriteHandler>
  337. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  338. stream::
  339. async_write_some(
  340. ConstBufferSequence const& buffers,
  341. WriteHandler&& handler)
  342. {
  343. static_assert(net::is_const_buffer_sequence<
  344. ConstBufferSequence>::value,
  345. "ConstBufferSequence type requirements not met");
  346. return net::async_initiate<
  347. WriteHandler,
  348. void(error_code, std::size_t)>(
  349. run_write_op{},
  350. handler,
  351. in_,
  352. out_,
  353. buffers);
  354. }
  355. //------------------------------------------------------------------------------
  356. template<class TeardownHandler>
  357. void
  358. async_teardown(
  359. role_type,
  360. stream& s,
  361. TeardownHandler&& handler)
  362. {
  363. error_code ec;
  364. if( s.in_->fc &&
  365. s.in_->fc->fail(ec))
  366. return net::post(
  367. s.get_executor(),
  368. beast::bind_front_handler(
  369. std::move(handler), ec));
  370. s.close();
  371. if( s.in_->fc &&
  372. s.in_->fc->fail(ec))
  373. ec = net::error::eof;
  374. else
  375. ec = {};
  376. net::post(
  377. s.get_executor(),
  378. beast::bind_front_handler(
  379. std::move(handler), ec));
  380. }
  381. //------------------------------------------------------------------------------
  382. template<class Arg1, class... ArgN>
  383. stream
  384. connect(stream& to, Arg1&& arg1, ArgN&&... argn)
  385. {
  386. stream from{
  387. std::forward<Arg1>(arg1),
  388. std::forward<ArgN>(argn)...};
  389. from.connect(to);
  390. return from;
  391. }
  392. } // test
  393. } // beast
  394. } // boost
  395. #endif