stream.ipp 14 KB


  1. //
  2. // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_IPP
  11. #include <boost/beast/core/buffers_prefix.hpp>
  12. namespace boost {
  13. namespace beast {
  14. namespace test {
  15. inline
  16. stream::
  17. ~stream()
  18. {
  19. {
  20. std::unique_lock<std::mutex> lock{in_->m};
  21. in_->op.reset();
  22. }
  23. auto out = out_.lock();
  24. if(out)
  25. {
  26. std::unique_lock<std::mutex> lock{out->m};
  27. if(out->code == status::ok)
  28. {
  29. out->code = status::reset;
  30. out->on_write();
  31. }
  32. }
  33. }
  34. inline
  35. stream::
  36. stream(stream&& other)
  37. {
  38. auto in = std::make_shared<state>(
  39. other.in_->ioc, other.in_->fc);
  40. in_ = std::move(other.in_);
  41. out_ = std::move(other.out_);
  42. other.in_ = in;
  43. }
  44. inline
  45. stream&
  46. stream::
  47. operator=(stream&& other)
  48. {
  49. auto in = std::make_shared<state>(
  50. other.in_->ioc, other.in_->fc);
  51. in_ = std::move(other.in_);
  52. out_ = std::move(other.out_);
  53. other.in_ = in;
  54. return *this;
  55. }
  56. inline
  57. stream::
  58. stream(boost::asio::io_context& ioc)
  59. : in_(std::make_shared<state>(ioc, nullptr))
  60. {
  61. }
  62. inline
  63. stream::
  64. stream(
  65. boost::asio::io_context& ioc,
  66. fail_count& fc)
  67. : in_(std::make_shared<state>(ioc, &fc))
  68. {
  69. }
  70. inline
  71. stream::
  72. stream(
  73. boost::asio::io_context& ioc,
  74. string_view s)
  75. : in_(std::make_shared<state>(ioc, nullptr))
  76. {
  77. using boost::asio::buffer;
  78. using boost::asio::buffer_copy;
  79. in_->b.commit(buffer_copy(
  80. in_->b.prepare(s.size()),
  81. buffer(s.data(), s.size())));
  82. }
  83. inline
  84. stream::
  85. stream(
  86. boost::asio::io_context& ioc,
  87. fail_count& fc,
  88. string_view s)
  89. : in_(std::make_shared<state>(ioc, &fc))
  90. {
  91. using boost::asio::buffer;
  92. using boost::asio::buffer_copy;
  93. in_->b.commit(buffer_copy(
  94. in_->b.prepare(s.size()),
  95. buffer(s.data(), s.size())));
  96. }
  97. inline
  98. void
  99. stream::
  100. connect(stream& remote)
  101. {
  102. BOOST_ASSERT(! out_.lock());
  103. BOOST_ASSERT(! remote.out_.lock());
  104. out_ = remote.in_;
  105. remote.out_ = in_;
  106. }
  107. inline
  108. string_view
  109. stream::
  110. str() const
  111. {
  112. auto const bs = in_->b.data();
  113. if(boost::asio::buffer_size(bs) == 0)
  114. return {};
  115. auto const b = buffers_front(bs);
  116. return {static_cast<char const*>(b.data()), b.size()};
  117. }
  118. inline
  119. void
  120. stream::
  121. append(string_view s)
  122. {
  123. using boost::asio::buffer;
  124. using boost::asio::buffer_copy;
  125. std::lock_guard<std::mutex> lock{in_->m};
  126. in_->b.commit(buffer_copy(
  127. in_->b.prepare(s.size()),
  128. buffer(s.data(), s.size())));
  129. }
  130. inline
  131. void
  132. stream::
  133. clear()
  134. {
  135. std::lock_guard<std::mutex> lock{in_->m};
  136. in_->b.consume(in_->b.size());
  137. }
  138. inline
  139. void
  140. stream::
  141. close()
  142. {
  143. BOOST_ASSERT(! in_->op);
  144. auto out = out_.lock();
  145. if(! out)
  146. return;
  147. std::lock_guard<std::mutex> lock{out->m};
  148. if(out->code == status::ok)
  149. {
  150. out->code = status::eof;
  151. out->on_write();
  152. }
  153. }
  154. inline
  155. void
  156. stream::
  157. close_remote()
  158. {
  159. std::lock_guard<std::mutex> lock{in_->m};
  160. if(in_->code == status::ok)
  161. {
  162. in_->code = status::eof;
  163. in_->on_write();
  164. }
  165. }
  166. template<class MutableBufferSequence>
  167. std::size_t
  168. stream::
  169. read_some(MutableBufferSequence const& buffers)
  170. {
  171. static_assert(boost::asio::is_mutable_buffer_sequence<
  172. MutableBufferSequence>::value,
  173. "MutableBufferSequence requirements not met");
  174. error_code ec;
  175. auto const n = read_some(buffers, ec);
  176. if(ec)
  177. BOOST_THROW_EXCEPTION(system_error{ec});
  178. return n;
  179. }
  180. template<class MutableBufferSequence>
  181. std::size_t
  182. stream::
  183. read_some(MutableBufferSequence const& buffers,
  184. error_code& ec)
  185. {
  186. static_assert(boost::asio::is_mutable_buffer_sequence<
  187. MutableBufferSequence>::value,
  188. "MutableBufferSequence requirements not met");
  189. using boost::asio::buffer_copy;
  190. using boost::asio::buffer_size;
  191. if(in_->fc && in_->fc->fail(ec))
  192. return 0;
  193. if(buffer_size(buffers) == 0)
  194. {
  195. ec.clear();
  196. return 0;
  197. }
  198. std::unique_lock<std::mutex> lock{in_->m};
  199. BOOST_ASSERT(! in_->op);
  200. in_->cv.wait(lock,
  201. [&]()
  202. {
  203. return
  204. in_->b.size() > 0 ||
  205. in_->code != status::ok;
  206. });
  207. std::size_t bytes_transferred;
  208. if(in_->b.size() > 0)
  209. {
  210. ec.assign(0, ec.category());
  211. bytes_transferred = buffer_copy(
  212. buffers, in_->b.data(), in_->read_max);
  213. in_->b.consume(bytes_transferred);
  214. }
  215. else
  216. {
  217. BOOST_ASSERT(in_->code != status::ok);
  218. bytes_transferred = 0;
  219. if(in_->code == status::eof)
  220. ec = boost::asio::error::eof;
  221. else if(in_->code == status::reset)
  222. ec = boost::asio::error::connection_reset;
  223. }
  224. ++in_->nread;
  225. return bytes_transferred;
  226. }
  227. template<class MutableBufferSequence, class ReadHandler>
  228. BOOST_ASIO_INITFN_RESULT_TYPE(
  229. ReadHandler, void(error_code, std::size_t))
  230. stream::
  231. async_read_some(
  232. MutableBufferSequence const& buffers,
  233. ReadHandler&& handler)
  234. {
  235. static_assert(boost::asio::is_mutable_buffer_sequence<
  236. MutableBufferSequence>::value,
  237. "MutableBufferSequence requirements not met");
  238. using boost::asio::buffer_copy;
  239. using boost::asio::buffer_size;
  240. BOOST_BEAST_HANDLER_INIT(
  241. ReadHandler, void(error_code, std::size_t));
  242. if(in_->fc)
  243. {
  244. error_code ec;
  245. if(in_->fc->fail(ec))
  246. return boost::asio::post(
  247. in_->ioc.get_executor(),
  248. bind_handler(
  249. std::move(init.completion_handler),
  250. ec,
  251. 0));
  252. }
  253. {
  254. std::unique_lock<std::mutex> lock{in_->m};
  255. BOOST_ASSERT(! in_->op);
  256. if(buffer_size(buffers) == 0 ||
  257. buffer_size(in_->b.data()) > 0)
  258. {
  259. auto const bytes_transferred = buffer_copy(
  260. buffers, in_->b.data(), in_->read_max);
  261. in_->b.consume(bytes_transferred);
  262. lock.unlock();
  263. ++in_->nread;
  264. boost::asio::post(
  265. in_->ioc.get_executor(),
  266. bind_handler(
  267. std::move(init.completion_handler),
  268. error_code{},
  269. bytes_transferred));
  270. }
  271. else if(in_->code != status::ok)
  272. {
  273. lock.unlock();
  274. ++in_->nread;
  275. error_code ec;
  276. if(in_->code == status::eof)
  277. ec = boost::asio::error::eof;
  278. else if(in_->code == status::reset)
  279. ec = boost::asio::error::connection_reset;
  280. boost::asio::post(
  281. in_->ioc.get_executor(),
  282. bind_handler(
  283. std::move(init.completion_handler),
  284. ec,
  285. 0));
  286. }
  287. else
  288. {
  289. in_->op.reset(new read_op<BOOST_ASIO_HANDLER_TYPE(
  290. ReadHandler, void(error_code, std::size_t)),
  291. MutableBufferSequence>{*in_, buffers,
  292. std::move(init.completion_handler)});
  293. }
  294. }
  295. return init.result.get();
  296. }
  297. template<class ConstBufferSequence>
  298. std::size_t
  299. stream::
  300. write_some(ConstBufferSequence const& buffers)
  301. {
  302. static_assert(boost::asio::is_const_buffer_sequence<
  303. ConstBufferSequence>::value,
  304. "ConstBufferSequence requirements not met");
  305. error_code ec;
  306. auto const bytes_transferred =
  307. write_some(buffers, ec);
  308. if(ec)
  309. BOOST_THROW_EXCEPTION(system_error{ec});
  310. return bytes_transferred;
  311. }
  312. template<class ConstBufferSequence>
  313. std::size_t
  314. stream::
  315. write_some(
  316. ConstBufferSequence const& buffers, error_code& ec)
  317. {
  318. static_assert(boost::asio::is_const_buffer_sequence<
  319. ConstBufferSequence>::value,
  320. "ConstBufferSequence requirements not met");
  321. using boost::asio::buffer_copy;
  322. using boost::asio::buffer_size;
  323. auto out = out_.lock();
  324. if(! out)
  325. {
  326. ec = boost::asio::error::connection_reset;
  327. return 0;
  328. }
  329. BOOST_ASSERT(out->code == status::ok);
  330. if(in_->fc && in_->fc->fail(ec))
  331. return 0;
  332. auto const n = (std::min)(
  333. buffer_size(buffers), in_->write_max);
  334. std::unique_lock<std::mutex> lock{out->m};
  335. auto const bytes_transferred =
  336. buffer_copy(out->b.prepare(n), buffers);
  337. out->b.commit(bytes_transferred);
  338. out->on_write();
  339. lock.unlock();
  340. ++in_->nwrite;
  341. ec.assign(0, ec.category());
  342. return bytes_transferred;
  343. }
  344. template<class ConstBufferSequence, class WriteHandler>
  345. BOOST_ASIO_INITFN_RESULT_TYPE(
  346. WriteHandler, void(error_code, std::size_t))
  347. stream::
  348. async_write_some(ConstBufferSequence const& buffers,
  349. WriteHandler&& handler)
  350. {
  351. static_assert(boost::asio::is_const_buffer_sequence<
  352. ConstBufferSequence>::value,
  353. "ConstBufferSequence requirements not met");
  354. using boost::asio::buffer_copy;
  355. using boost::asio::buffer_size;
  356. BOOST_BEAST_HANDLER_INIT(
  357. WriteHandler, void(error_code, std::size_t));
  358. auto out = out_.lock();
  359. if(! out)
  360. return boost::asio::post(
  361. in_->ioc.get_executor(),
  362. bind_handler(
  363. std::move(init.completion_handler),
  364. boost::asio::error::connection_reset,
  365. 0));
  366. BOOST_ASSERT(out->code == status::ok);
  367. if(in_->fc)
  368. {
  369. error_code ec;
  370. if(in_->fc->fail(ec))
  371. return boost::asio::post(
  372. in_->ioc.get_executor(),
  373. bind_handler(
  374. std::move(init.completion_handler),
  375. ec,
  376. 0));
  377. }
  378. auto const n =
  379. (std::min)(buffer_size(buffers), in_->write_max);
  380. std::unique_lock<std::mutex> lock{out->m};
  381. auto const bytes_transferred =
  382. buffer_copy(out->b.prepare(n), buffers);
  383. out->b.commit(bytes_transferred);
  384. out->on_write();
  385. lock.unlock();
  386. ++in_->nwrite;
  387. boost::asio::post(
  388. in_->ioc.get_executor(),
  389. bind_handler(
  390. std::move(init.completion_handler),
  391. error_code{},
  392. bytes_transferred));
  393. return init.result.get();
  394. }
  395. inline
  396. void
  397. teardown(
  398. websocket::role_type,
  399. stream& s,
  400. boost::system::error_code& ec)
  401. {
  402. if( s.in_->fc &&
  403. s.in_->fc->fail(ec))
  404. return;
  405. s.close();
  406. if( s.in_->fc &&
  407. s.in_->fc->fail(ec))
  408. ec = boost::asio::error::eof;
  409. else
  410. ec.assign(0, ec.category());
  411. }
  412. template<class TeardownHandler>
  413. inline
  414. void
  415. async_teardown(
  416. websocket::role_type,
  417. stream& s,
  418. TeardownHandler&& handler)
  419. {
  420. error_code ec;
  421. if( s.in_->fc &&
  422. s.in_->fc->fail(ec))
  423. return boost::asio::post(
  424. s.get_executor(),
  425. bind_handler(std::move(handler), ec));
  426. s.close();
  427. if( s.in_->fc &&
  428. s.in_->fc->fail(ec))
  429. ec = boost::asio::error::eof;
  430. else
  431. ec.assign(0, ec.category());
  432. boost::asio::post(
  433. s.get_executor(),
  434. bind_handler(std::move(handler), ec));
  435. }
  436. //------------------------------------------------------------------------------
  437. template<class Handler, class Buffers>
  438. class stream::read_op : public stream::read_op_base
  439. {
  440. class lambda
  441. {
  442. state& s_;
  443. Buffers b_;
  444. Handler h_;
  445. boost::asio::executor_work_guard<
  446. boost::asio::io_context::executor_type> work_;
  447. public:
  448. lambda(lambda&&) = default;
  449. lambda(lambda const&) = default;
  450. template<class DeducedHandler>
  451. lambda(state& s, Buffers const& b, DeducedHandler&& h)
  452. : s_(s)
  453. , b_(b)
  454. , h_(std::forward<DeducedHandler>(h))
  455. , work_(s_.ioc.get_executor())
  456. {
  457. }
  458. void
  459. post()
  460. {
  461. boost::asio::post(
  462. s_.ioc.get_executor(),
  463. std::move(*this));
  464. work_.reset();
  465. }
  466. void
  467. operator()()
  468. {
  469. using boost::asio::buffer_copy;
  470. using boost::asio::buffer_size;
  471. std::unique_lock<std::mutex> lock{s_.m};
  472. BOOST_ASSERT(! s_.op);
  473. if(s_.b.size() > 0)
  474. {
  475. auto const bytes_transferred = buffer_copy(
  476. b_, s_.b.data(), s_.read_max);
  477. s_.b.consume(bytes_transferred);
  478. auto& s = s_;
  479. Handler h{std::move(h_)};
  480. lock.unlock();
  481. ++s.nread;
  482. boost::asio::post(
  483. s.ioc.get_executor(),
  484. bind_handler(
  485. std::move(h),
  486. error_code{},
  487. bytes_transferred));
  488. }
  489. else
  490. {
  491. BOOST_ASSERT(s_.code != status::ok);
  492. auto& s = s_;
  493. Handler h{std::move(h_)};
  494. lock.unlock();
  495. ++s.nread;
  496. error_code ec;
  497. if(s.code == status::eof)
  498. ec = boost::asio::error::eof;
  499. else if(s.code == status::reset)
  500. ec = boost::asio::error::connection_reset;
  501. boost::asio::post(
  502. s.ioc.get_executor(),
  503. bind_handler(std::move(h), ec, 0));
  504. }
  505. }
  506. };
  507. lambda fn_;
  508. public:
  509. template<class DeducedHandler>
  510. read_op(state& s, Buffers const& b, DeducedHandler&& h)
  511. : fn_(s, b, std::forward<DeducedHandler>(h))
  512. {
  513. }
  514. void
  515. operator()() override
  516. {
  517. fn_.post();
  518. }
  519. };
  520. inline
  521. stream
  522. connect(stream& to)
  523. {
  524. stream from{to.get_executor().context()};
  525. from.connect(to);
  526. return from;
  527. }
  528. template<class Arg1, class... ArgN>
  529. stream
  530. connect(stream& to, Arg1&& arg1, ArgN&&... argn)
  531. {
  532. stream from{
  533. std::forward<Arg1>(arg1),
  534. std::forward<ArgN>(argn)...};
  535. from.connect(to);
  536. return from;
  537. }
  538. } // test
  539. } // beast
  540. } // boost
  541. #endif