//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//

#ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
#define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP

#include <boost/beast/websocket/teardown.hpp>
#include <boost/beast/core/handler_ptr.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/core/type_traits.hpp>
#include <boost/beast/core/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/handler_continuation_hook.hpp>
#include <boost/asio/handler_invoke_hook.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/write.hpp> // boost::asio::write / async_write used below
#include <boost/throw_exception.hpp>
#include <memory>

namespace boost {
namespace beast {
namespace websocket {

/*  Close the WebSocket Connection

    This composed operation sends the close frame if it hasn't already
    been sent, then reads and discards frames until receiving a close
    frame. Finally it invokes the teardown operation to shut down the
    underlying connection.
*/
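/*  In outline, the operation proceeds as follows:

    1.  Acquire the write block, suspending if another write owns it.
    2.  Send the serialized close frame and mark the session as closing.
    3.  Acquire the read block, then read and discard incoming frames
        until the peer's close frame is received.
    4.  Tear down the underlying connection with async_teardown.
    5.  Release the blocks, resume any suspended operations, and invoke
        the completion handler.
*/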

template<class NextLayer, bool deflateSupported>
template<class Handler>
class stream<NextLayer, deflateSupported>::close_op
    : public boost::asio::coroutine
{
    struct state
    {
        stream<NextLayer, deflateSupported>& ws;
        boost::asio::executor_work_guard<decltype(std::declval<
            stream<NextLayer, deflateSupported>&>().get_executor())> wg;
        detail::frame_buffer fb;
        error_code ev;
        bool cont = false;

        state(
            Handler const&,
            stream<NextLayer, deflateSupported>& ws_,
            close_reason const& cr)
            : ws(ws_)
            , wg(ws.get_executor())
        {
            // Serialize the close frame
            ws.template write_close<
                flat_static_buffer_base>(fb, cr);
        }
    };
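    // The shared state is owned through handler_ptr, which keeps it
    // alive until just before the final handler is invoked.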
    handler_ptr<state, Handler> d_;

public:
    static constexpr int id = 4; // for soft_mutex

    close_op(close_op&&) = default;
    close_op(close_op const&) = delete;

    template<class DeducedHandler>
    close_op(
        DeducedHandler&& h,
        stream<NextLayer, deflateSupported>& ws,
        close_reason const& cr)
        : d_(std::forward<DeducedHandler>(h), ws, cr)
    {
    }

    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    allocator_type
    get_allocator() const noexcept
    {
        return (boost::asio::get_associated_allocator)(d_.handler());
    }

    using executor_type = boost::asio::associated_executor_t<
        Handler, decltype(std::declval<stream<
            NextLayer, deflateSupported>&>().get_executor())>;

    executor_type
    get_executor() const noexcept
    {
        return (boost::asio::get_associated_executor)(
            d_.handler(), d_->ws.get_executor());
    }

    void
    operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0,
        bool cont = true);

    friend
    bool asio_handler_is_continuation(close_op* op)
    {
        using boost::asio::asio_handler_is_continuation;
        return op->d_->cont || asio_handler_is_continuation(
            std::addressof(op->d_.handler()));
    }

    template<class Function>
    friend
    void asio_handler_invoke(Function&& f, close_op* op)
    {
        using boost::asio::asio_handler_invoke;
        asio_handler_invoke(f,
            std::addressof(op->d_.handler()));
    }
};

template<class NextLayer, bool deflateSupported>
template<class Handler>
void
stream<NextLayer, deflateSupported>::
close_op<Handler>::
operator()(
    error_code ec,
    std::size_t bytes_transferred,
    bool cont)
{
    using beast::detail::clamp;
    auto& d = *d_;
    d.cont = cont;
    BOOST_ASIO_CORO_REENTER(*this)
    {
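        // Suspend/resume pattern used throughout this coroutine: when a
        // block is unavailable the operation parks itself on the
        // corresponding pause list and yields; when later resumed it
        // acquires the block and re-posts itself to the executor, so the
        // remainder runs as an ordinary asynchronous continuation rather
        // than inside another operation's completion.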
        // Attempt to acquire write block
        if(! d.ws.wr_block_.try_lock(this))
        {
            // Suspend
            BOOST_ASIO_CORO_YIELD
            d.ws.paused_close_.emplace(std::move(*this));

            // Acquire the write block
            d.ws.wr_block_.lock(this);

            // Resume
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(), std::move(*this));
            BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        }

        // Make sure the stream is open
        if(! d.ws.check_open(ec))
            goto upcall;

        // Can't call close twice
        BOOST_ASSERT(! d.ws.wr_close_);

        // Change status to closing
        BOOST_ASSERT(d.ws.status_ == status::open);
        d.ws.status_ = status::closing;

        // Send close frame
        d.ws.wr_close_ = true;
        BOOST_ASIO_CORO_YIELD
        boost::asio::async_write(d.ws.stream_,
            d.fb.data(), std::move(*this));
        if(! d.ws.check_ok(ec))
            goto upcall;

        if(d.ws.rd_close_)
        {
            // This happens when the read_op gets a close frame
            // at the same time close_op is sending the close frame.
            // The read_op will be suspended on the write block.
            goto teardown;
        }
        // Maybe suspend
        if(! d.ws.rd_block_.try_lock(this))
        {
            // Suspend
            BOOST_ASIO_CORO_YIELD
            d.ws.paused_r_close_.emplace(std::move(*this));

            // Acquire the read block
            d.ws.rd_block_.lock(this);

            // Resume
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(), std::move(*this));
            BOOST_ASSERT(d.ws.rd_block_.is_locked(this));

            // While we were suspended the stream may have failed, but it
            // cannot still be open (we set closing above) and it cannot
            // already be closed.
            BOOST_ASSERT(d.ws.status_ != status::open);
            BOOST_ASSERT(d.ws.status_ != status::closed);
            if(d.ws.status_ == status::failed)
                goto upcall;

            BOOST_ASSERT(! d.ws.rd_close_);
        }
        // Drain
        if(d.ws.rd_remain_ > 0)
            goto read_payload;
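        // Read and discard frames until the peer's close frame arrives.
        // (When rd_remain_ > 0 above, part of a message payload is still
        // outstanding from an earlier read, so control jumps directly to
        // the read_payload label inside this loop to discard it.)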
        for(;;)
        {
            // Read frame header
            while(! d.ws.parse_fh(
                d.ws.rd_fh_, d.ws.rd_buf_, d.ev))
            {
                if(d.ev)
                    goto teardown;
                BOOST_ASIO_CORO_YIELD
                d.ws.stream_.async_read_some(
                    d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                        d.ws.rd_buf_.max_size())),
                    std::move(*this));
                if(! d.ws.check_ok(ec))
                    goto upcall;
                d.ws.rd_buf_.commit(bytes_transferred);
            }
            if(detail::is_control(d.ws.rd_fh_.op))
            {
                // Process control frame
                if(d.ws.rd_fh_.op == detail::opcode::close)
                {
                    BOOST_ASSERT(! d.ws.rd_close_);
                    d.ws.rd_close_ = true;
                    auto const mb = buffers_prefix(
                        clamp(d.ws.rd_fh_.len),
                        d.ws.rd_buf_.mutable_data());
                    if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask)
                        detail::mask_inplace(mb, d.ws.rd_key_);
                    detail::read_close(d.ws.cr_, mb, d.ev);
                    if(d.ev)
                        goto teardown;
                    d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
                    goto teardown;
                }
                d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
            }
            else
            {
            read_payload:
                while(d.ws.rd_buf_.size() < d.ws.rd_remain_)
                {
                    d.ws.rd_remain_ -= d.ws.rd_buf_.size();
                    d.ws.rd_buf_.consume(d.ws.rd_buf_.size());
                    BOOST_ASIO_CORO_YIELD
                    d.ws.stream_.async_read_some(
                        d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                            d.ws.rd_buf_.max_size())),
                        std::move(*this));
                    if(! d.ws.check_ok(ec))
                        goto upcall;
                    d.ws.rd_buf_.commit(bytes_transferred);
                }
                BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_);
                d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_));
                d.ws.rd_remain_ = 0;
            }
        }
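    // Reached (via goto) once the peer's close frame has been read, a
    // protocol error was detected, or read_op already observed the peer's
    // close. async_teardown shuts down the underlying connection: for
    // plain TCP sockets it shuts down and closes the socket, while for
    // SSL streams it performs the protocol shutdown.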
    teardown:
        // Teardown
        BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        using beast::websocket::async_teardown;
        BOOST_ASIO_CORO_YIELD
        async_teardown(d.ws.role_,
            d.ws.stream_, std::move(*this));
        BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        if(ec == boost::asio::error::eof)
        {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        if(! ec)
            ec = d.ev;
        if(ec)
            d.ws.status_ = status::failed;
        else
            d.ws.status_ = status::closed;
        d.ws.close();
    upcall:
        BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        d.ws.wr_block_.unlock(this);
        if(d.ws.rd_block_.try_unlock(this))
            d.ws.paused_r_rd_.maybe_invoke();
        d.ws.paused_rd_.maybe_invoke() ||
            d.ws.paused_ping_.maybe_invoke() ||
            d.ws.paused_wr_.maybe_invoke();
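        // If the operation never suspended (cont is false) we are still
        // inside the initiating function, so post the completion to the
        // executor rather than invoking the handler re-entrantly.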
        if(! d.cont)
        {
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(),
                bind_handler(std::move(*this), ec));
        }
        {
            auto wg = std::move(d.wg);
            d_.invoke(ec);
        }
    }
}

//------------------------------------------------------------------------------

template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
close(close_reason const& cr)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    error_code ec;
    close(cr, ec);
    if(ec)
        BOOST_THROW_EXCEPTION(system_error{ec});
}
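
// Example (sketch): closing a connected synchronous stream `ws`; the
// names here are illustrative and not part of this file.
//
//     ws.close(websocket::close_code::normal);      // throws on error
//
//     error_code ec;
//     ws.close(websocket::close_code::normal, ec);  // reports via ec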

template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
close(close_reason const& cr, error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    using beast::detail::clamp;
    ec.assign(0, ec.category());

    // Make sure the stream is open
    if(! check_open(ec))
        return;

    // If rd_close_ is set then we already sent a close
    BOOST_ASSERT(! rd_close_);
    BOOST_ASSERT(! wr_close_);
    wr_close_ = true;
    {
        detail::frame_buffer fb;
        write_close<flat_static_buffer_base>(fb, cr);
        boost::asio::write(stream_, fb.data(), ec);
    }
    if(! check_ok(ec))
        return;
    status_ = status::closing;
    error_code result;

    // Drain the connection
    if(rd_remain_ > 0)
        goto read_payload;
    for(;;)
    {
        // Read frame header
        while(! parse_fh(rd_fh_, rd_buf_, result))
        {
            if(result)
                return do_fail(
                    close_code::none, result, ec);
            auto const bytes_transferred =
                stream_.read_some(
                    rd_buf_.prepare(read_size(rd_buf_,
                        rd_buf_.max_size())), ec);
            if(! check_ok(ec))
                return;
            rd_buf_.commit(bytes_transferred);
        }
        if(detail::is_control(rd_fh_.op))
        {
            // Process control frame
            if(rd_fh_.op == detail::opcode::close)
            {
                BOOST_ASSERT(! rd_close_);
                rd_close_ = true;
                auto const mb = buffers_prefix(
                    clamp(rd_fh_.len),
                    rd_buf_.mutable_data());
                if(rd_fh_.len > 0 && rd_fh_.mask)
                    detail::mask_inplace(mb, rd_key_);
                detail::read_close(cr_, mb, result);
                if(result)
                {
                    // Protocol violation
                    return do_fail(
                        close_code::none, result, ec);
                }
                rd_buf_.consume(clamp(rd_fh_.len));
                break;
            }
            rd_buf_.consume(clamp(rd_fh_.len));
        }
        else
        {
        read_payload:
            while(rd_buf_.size() < rd_remain_)
            {
                rd_remain_ -= rd_buf_.size();
                rd_buf_.consume(rd_buf_.size());
                auto const bytes_transferred =
                    stream_.read_some(
                        rd_buf_.prepare(read_size(rd_buf_,
                            rd_buf_.max_size())), ec);
                if(! check_ok(ec))
                    return;
                rd_buf_.commit(bytes_transferred);
            }
            BOOST_ASSERT(rd_buf_.size() >= rd_remain_);
            rd_buf_.consume(clamp(rd_remain_));
            rd_remain_ = 0;
        }
    }
    // _Close the WebSocket Connection_
    do_fail(close_code::none, error::closed, ec);
    if(ec == error::closed)
        ec.assign(0, ec.category());
}

template<class NextLayer, bool deflateSupported>
template<class CloseHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
    CloseHandler, void(error_code))
stream<NextLayer, deflateSupported>::
async_close(close_reason const& cr, CloseHandler&& handler)
{
    static_assert(is_async_stream<next_layer_type>::value,
        "AsyncStream requirements not met");
    BOOST_BEAST_HANDLER_INIT(
        CloseHandler, void(error_code));
    close_op<BOOST_ASIO_HANDLER_TYPE(
        CloseHandler, void(error_code))>{
            std::move(init.completion_handler), *this, cr}(
                {}, 0, false);
    return init.result.get();
}
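
// Example (sketch): asynchronously closing a connected stream `ws`; the
// names here are illustrative and not part of this file.
//
//     ws.async_close(websocket::close_code::normal,
//         [](error_code ec)
//         {
//             if(ec)
//                 std::cerr << "close: " << ec.message() << "\n";
//         });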

} // websocket
} // beast
} // boost

#endif