stream.ipp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
  10. #define BOOST_BEAST_TEST_IMPL_STREAM_IPP
  11. #include <boost/beast/_experimental/test/stream.hpp>
  12. #include <boost/beast/core/bind_handler.hpp>
  13. #include <boost/beast/core/buffer_traits.hpp>
  14. #include <boost/beast/core/buffers_prefix.hpp>
  15. #include <boost/make_shared.hpp>
  16. #include <stdexcept>
  17. #include <vector>
  18. namespace boost {
  19. namespace beast {
  20. namespace test {
  21. //------------------------------------------------------------------------------
  22. stream::
  23. service::
  24. service(net::execution_context& ctx)
  25. : beast::detail::service_base<service>(ctx)
  26. , sp_(boost::make_shared<service_impl>())
  27. {
  28. }
  29. void
  30. stream::
  31. service::
  32. shutdown()
  33. {
  34. std::vector<std::unique_ptr<read_op_base>> v;
  35. std::lock_guard<std::mutex> g1(sp_->m_);
  36. v.reserve(sp_->v_.size());
  37. for(auto p : sp_->v_)
  38. {
  39. std::lock_guard<std::mutex> g2(p->m);
  40. v.emplace_back(std::move(p->op));
  41. p->code = status::eof;
  42. }
  43. }
  44. auto
  45. stream::
  46. service::
  47. make_impl(
  48. net::io_context& ctx,
  49. test::fail_count* fc) ->
  50. boost::shared_ptr<state>
  51. {
  52. auto& svc = net::use_service<service>(ctx);
  53. auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
  54. std::lock_guard<std::mutex> g(svc.sp_->m_);
  55. svc.sp_->v_.push_back(sp.get());
  56. return sp;
  57. }
  58. void
  59. stream::
  60. service_impl::
  61. remove(state& impl)
  62. {
  63. std::lock_guard<std::mutex> g(m_);
  64. *std::find(
  65. v_.begin(), v_.end(),
  66. &impl) = std::move(v_.back());
  67. v_.pop_back();
  68. }
  69. //------------------------------------------------------------------------------
  70. void stream::initiate_read(
  71. boost::shared_ptr<state> const& in_,
  72. std::unique_ptr<stream::read_op_base>&& op,
  73. std::size_t buf_size)
  74. {
  75. std::unique_lock<std::mutex> lock(in_->m);
  76. ++in_->nread;
  77. if(in_->op != nullptr)
  78. BOOST_THROW_EXCEPTION(
  79. std::logic_error{"in_->op != nullptr"});
  80. // test failure
  81. error_code ec;
  82. if(in_->fc && in_->fc->fail(ec))
  83. {
  84. lock.unlock();
  85. (*op)(ec);
  86. return;
  87. }
  88. // A request to read 0 bytes from a stream is a no-op.
  89. if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
  90. {
  91. lock.unlock();
  92. (*op)(ec);
  93. return;
  94. }
  95. // deliver error
  96. if(in_->code != status::ok)
  97. {
  98. lock.unlock();
  99. (*op)(net::error::eof);
  100. return;
  101. }
  102. // complete when bytes available or closed
  103. in_->op = std::move(op);
  104. }
  105. stream::
  106. state::
  107. state(
  108. net::io_context& ioc_,
  109. boost::weak_ptr<service_impl> wp_,
  110. fail_count* fc_)
  111. : ioc(ioc_)
  112. , wp(std::move(wp_))
  113. , fc(fc_)
  114. {
  115. }
  116. stream::
  117. state::
  118. ~state()
  119. {
  120. // cancel outstanding read
  121. if(op != nullptr)
  122. (*op)(net::error::operation_aborted);
  123. }
  124. void
  125. stream::
  126. state::
  127. remove() noexcept
  128. {
  129. auto sp = wp.lock();
  130. // If this goes off, it means the lifetime of a test::stream object
  131. // extended beyond the lifetime of the associated execution context.
  132. BOOST_ASSERT(sp);
  133. sp->remove(*this);
  134. }
  135. void
  136. stream::
  137. state::
  138. notify_read()
  139. {
  140. if(op)
  141. {
  142. auto op_ = std::move(op);
  143. op_->operator()(error_code{});
  144. }
  145. else
  146. {
  147. cv.notify_all();
  148. }
  149. }
  150. void
  151. stream::
  152. state::
  153. cancel_read()
  154. {
  155. std::unique_ptr<read_op_base> p;
  156. {
  157. std::lock_guard<std::mutex> lock(m);
  158. code = status::eof;
  159. p = std::move(op);
  160. }
  161. if(p != nullptr)
  162. (*p)(net::error::operation_aborted);
  163. }
  164. //------------------------------------------------------------------------------
  165. stream::
  166. ~stream()
  167. {
  168. close();
  169. in_->remove();
  170. }
  171. stream::
  172. stream(stream&& other)
  173. {
  174. auto in = service::make_impl(
  175. other.in_->ioc, other.in_->fc);
  176. in_ = std::move(other.in_);
  177. out_ = std::move(other.out_);
  178. other.in_ = in;
  179. }
  180. stream&
  181. stream::
  182. operator=(stream&& other)
  183. {
  184. close();
  185. auto in = service::make_impl(
  186. other.in_->ioc, other.in_->fc);
  187. in_->remove();
  188. in_ = std::move(other.in_);
  189. out_ = std::move(other.out_);
  190. other.in_ = in;
  191. return *this;
  192. }
  193. //------------------------------------------------------------------------------
  194. stream::
  195. stream(net::io_context& ioc)
  196. : in_(service::make_impl(ioc, nullptr))
  197. {
  198. }
  199. stream::
  200. stream(
  201. net::io_context& ioc,
  202. fail_count& fc)
  203. : in_(service::make_impl(ioc, &fc))
  204. {
  205. }
  206. stream::
  207. stream(
  208. net::io_context& ioc,
  209. string_view s)
  210. : in_(service::make_impl(ioc, nullptr))
  211. {
  212. in_->b.commit(net::buffer_copy(
  213. in_->b.prepare(s.size()),
  214. net::buffer(s.data(), s.size())));
  215. }
  216. stream::
  217. stream(
  218. net::io_context& ioc,
  219. fail_count& fc,
  220. string_view s)
  221. : in_(service::make_impl(ioc, &fc))
  222. {
  223. in_->b.commit(net::buffer_copy(
  224. in_->b.prepare(s.size()),
  225. net::buffer(s.data(), s.size())));
  226. }
  227. void
  228. stream::
  229. connect(stream& remote)
  230. {
  231. BOOST_ASSERT(! out_.lock());
  232. BOOST_ASSERT(! remote.out_.lock());
  233. out_ = remote.in_;
  234. remote.out_ = in_;
  235. in_->code = status::ok;
  236. remote.in_->code = status::ok;
  237. }
  238. string_view
  239. stream::
  240. str() const
  241. {
  242. auto const bs = in_->b.data();
  243. if(buffer_bytes(bs) == 0)
  244. return {};
  245. auto const b = beast::buffers_front(bs);
  246. return {static_cast<char const*>(b.data()), b.size()};
  247. }
  248. void
  249. stream::
  250. append(string_view s)
  251. {
  252. std::lock_guard<std::mutex> lock{in_->m};
  253. in_->b.commit(net::buffer_copy(
  254. in_->b.prepare(s.size()),
  255. net::buffer(s.data(), s.size())));
  256. }
  257. void
  258. stream::
  259. clear()
  260. {
  261. std::lock_guard<std::mutex> lock{in_->m};
  262. in_->b.consume(in_->b.size());
  263. }
  264. void
  265. stream::
  266. close()
  267. {
  268. in_->cancel_read();
  269. // disconnect
  270. {
  271. auto out = out_.lock();
  272. out_.reset();
  273. // notify peer
  274. if(out)
  275. {
  276. std::lock_guard<std::mutex> lock(out->m);
  277. if(out->code == status::ok)
  278. {
  279. out->code = status::eof;
  280. out->notify_read();
  281. }
  282. }
  283. }
  284. }
  285. void
  286. stream::
  287. close_remote()
  288. {
  289. std::lock_guard<std::mutex> lock{in_->m};
  290. if(in_->code == status::ok)
  291. {
  292. in_->code = status::eof;
  293. in_->notify_read();
  294. }
  295. }
  296. void
  297. teardown(
  298. role_type,
  299. stream& s,
  300. boost::system::error_code& ec)
  301. {
  302. if( s.in_->fc &&
  303. s.in_->fc->fail(ec))
  304. return;
  305. s.close();
  306. if( s.in_->fc &&
  307. s.in_->fc->fail(ec))
  308. ec = net::error::eof;
  309. else
  310. ec = {};
  311. }
  312. //------------------------------------------------------------------------------
  313. stream
  314. connect(stream& to)
  315. {
  316. stream from{to.get_executor().context()};
  317. from.connect(to);
  318. return from;
  319. }
  320. void
  321. connect(stream& s1, stream& s2)
  322. {
  323. s1.connect(s2);
  324. }
  325. } // test
  326. } // beast
  327. } // boost
  328. #endif