close.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  11. #include <boost/beast/websocket/teardown.hpp>
  12. #include <boost/beast/websocket/detail/mask.hpp>
  13. #include <boost/beast/websocket/impl/stream_impl.hpp>
  14. #include <boost/beast/core/async_base.hpp>
  15. #include <boost/beast/core/flat_static_buffer.hpp>
  16. #include <boost/beast/core/stream_traits.hpp>
  17. #include <boost/beast/core/detail/bind_continuation.hpp>
  18. #include <boost/asio/coroutine.hpp>
  19. #include <boost/asio/dispatch.hpp>
  20. #include <boost/throw_exception.hpp>
  21. #include <memory>
  22. namespace boost {
  23. namespace beast {
  24. namespace websocket {
  25. /* Close the WebSocket Connection
  26. This composed operation sends the close frame if it hasn't already
  27. been sent, then reads and discards frames until receiving a close
  28. frame. Finally it invokes the teardown operation to shut down the
  29. underlying connection.
  30. */
  // Composed operation that implements async_close. It is both the
  // completion handler for every intermediate step and an asio stackless
  // coroutine: each resumption re-enters operator() and continues after
  // the most recent yield.
  31. template<class NextLayer, bool deflateSupported>
  32. template<class Handler>
  33. class stream<NextLayer, deflateSupported>::close_op
  34. : public beast::stable_async_base<
  35. Handler, beast::executor_type<stream>>
  36. , public asio::coroutine
  37. {
  // Non-owning reference to the stream implementation. It is locked at
  // the top of every operator() call so an expired impl can be detected.
  38. boost::weak_ptr<impl_type> wp_;
  // Error produced while parsing incoming frames (parse_fh / read_close).
  // It is reported to the caller only if teardown itself succeeds.
  39. error_code ev_;
  // Holds the serialized close frame. Obtained from allocate_stable, so
  // the reference stays usable while *this is moved between steps.
  40. detail::frame_buffer& fb_;
  41. public:
  42. static constexpr int id = 5; // for soft_mutex
  // Construct and immediately start the operation.
  //   h  - completion handler, forwarded to the async base
  //   sp - owning pointer to the implementation; supplies the executor
  //        and is stored only as a weak pointer
  //   cr - close reason serialized into the outgoing close frame
  43. template<class Handler_>
  44. close_op(
  45. Handler_&& h,
  46. boost::shared_ptr<impl_type> const& sp,
  47. close_reason const& cr)
  48. : stable_async_base<Handler,
  49. beast::executor_type<stream>>(
  50. std::forward<Handler_>(h),
  51. sp->stream().get_executor())
  52. , wp_(sp)
  53. , fb_(beast::allocate_stable<
  54. detail::frame_buffer>(*this))
  55. {
  56. // Serialize the close frame
  57. sp->template write_close<
  58. flat_static_buffer_base>(fb_, cr);
  // First entry into the coroutine. cont is false here and is later
  // forwarded to complete(); every other entry uses the default true.
  59. (*this)({}, 0, false);
  60. }
  // Coroutine body and completion handler for each intermediate step.
  //   ec                - result of the step that resumed this call
  //   bytes_transferred - only consumed after async_read_some, where it
  //                       is committed to impl.rd_buf
  //   cont              - forwarded unchanged to complete()
  61. void
  62. operator()(
  63. error_code ec = {},
  64. std::size_t bytes_transferred = 0,
  65. bool cont = true)
  66. {
  67. using beast::detail::clamp;
  // If the implementation no longer exists there is nothing to close:
  // finish with operation_aborted.
  68. auto sp = wp_.lock();
  69. if(! sp)
  70. {
  71. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  72. return this->complete(cont, ec);
  73. }
  74. auto& impl = *sp;
  75. BOOST_ASIO_CORO_REENTER(*this)
  76. {
  77. // Acquire the write lock
  78. if(! impl.wr_block.try_lock(this))
  79. {
  // Lock is held elsewhere: park this op in impl.op_close. While parked
  // every cancellation type is accepted.
  80. BOOST_ASIO_CORO_YIELD
  81. {
  82. BOOST_ASIO_HANDLER_LOCATION((
  83. __FILE__, __LINE__,
  84. "websocket::async_close"));
  85. this->set_allowed_cancellation(net::cancellation_type::all);
  86. impl.op_close.emplace(std::move(*this),
  87. net::cancellation_type::all);
  88. }
  89. // cancel fired before we could do anything.
  // The write lock was never taken on this path, so there is nothing
  // to release before completing.
  90. if (ec == net::error::operation_aborted)
  91. return this->complete(cont, ec);
  // From here on only terminal cancellation is accepted.
  92. this->set_allowed_cancellation(net::cancellation_type::terminal);
  93. impl.wr_block.lock(this);
  // Re-dispatch through the immediate executor before continuing.
  94. BOOST_ASIO_CORO_YIELD
  95. {
  96. BOOST_ASIO_HANDLER_LOCATION((
  97. __FILE__, __LINE__,
  98. "websocket::async_close"));
  99. const auto ex = this->get_immediate_executor();
  100. net::dispatch(ex, std::move(*this));
  101. }
  102. BOOST_ASSERT(impl.wr_block.is_locked(this));
  103. }
  // NOTE(review): check_stop_now is defined elsewhere; presumably it
  // reports whether the operation must stop and may rewrite ec - confirm.
  104. if(impl.check_stop_now(ec))
  105. goto upcall;
  106. // Can't call close twice
  107. // TODO return a custom error code
  108. BOOST_ASSERT(! impl.wr_close);
  109. // Send close frame
  110. impl.wr_close = true;
  111. impl.change_status(status::closing);
  112. impl.update_timer(this->get_executor());
  // Write the close frame that the constructor serialized into fb_.
  113. BOOST_ASIO_CORO_YIELD
  114. {
  115. BOOST_ASIO_HANDLER_LOCATION((
  116. __FILE__, __LINE__,
  117. "websocket::async_close"));
  118. net::async_write(impl.stream(), fb_.data(),
  119. beast::detail::bind_continuation(std::move(*this)));
  120. }
  121. if(impl.check_stop_now(ec))
  122. goto upcall;
  123. if(impl.rd_close)
  124. {
  125. // This happens when the read_op gets a close frame
  126. // at the same time close_op is sending the close frame.
  127. // The read_op will be suspended on the write block.
  128. goto teardown;
  129. }
  130. // Acquire the read lock
  131. if(! impl.rd_block.try_lock(this))
  132. {
  // Read lock is held elsewhere: park this op in impl.op_r_close.
  133. BOOST_ASIO_CORO_YIELD
  134. {
  135. BOOST_ASIO_HANDLER_LOCATION((
  136. __FILE__, __LINE__,
  137. "websocket::async_close"));
  138. // terminal only, that's the default
  139. impl.op_r_close.emplace(std::move(*this));
  140. }
  141. if (ec == net::error::operation_aborted)
  142. {
  143. // if a cancellation fires here, we do a dirty shutdown
  // NOTE(review): this return bypasses the upcall label, which is the
  // only place wr_block is unlocked - confirm that is intended.
  144. impl.change_status(status::closed);
  145. close_socket(get_lowest_layer(impl.stream()));
  146. return this->complete(cont, ec);
  147. }
  148. impl.rd_block.lock(this);
  // Re-dispatch through the immediate executor before continuing.
  149. BOOST_ASIO_CORO_YIELD
  150. {
  151. BOOST_ASIO_HANDLER_LOCATION((
  152. __FILE__, __LINE__,
  153. "websocket::async_close"));
  154. const auto ex = this->get_immediate_executor();
  155. net::dispatch(ex, std::move(*this));
  156. }
  157. BOOST_ASSERT(impl.rd_block.is_locked(this));
  158. if(impl.check_stop_now(ec))
  159. goto upcall;
  160. BOOST_ASSERT(! impl.rd_close);
  161. }
  162. // Read until receiving a close frame
  // A non-zero rd_remain means part of a message frame payload is still
  // outstanding, so resume by discarding that payload first.
  163. if(impl.rd_remain > 0)
  164. goto read_payload;
  165. for(;;)
  166. {
  167. // Read frame header
  // Keep reading into rd_buf until parse_fh succeeds. If parse_fh sets
  // ev_ the loop is abandoned and the connection is torn down.
  168. while(! impl.parse_fh(
  169. impl.rd_fh, impl.rd_buf, ev_))
  170. {
  171. if(ev_)
  172. goto teardown;
  173. BOOST_ASIO_CORO_YIELD
  174. {
  175. BOOST_ASIO_HANDLER_LOCATION((
  176. __FILE__, __LINE__,
  177. "websocket::async_close"));
  178. impl.stream().async_read_some(
  179. impl.rd_buf.prepare(read_size(
  180. impl.rd_buf, impl.rd_buf.max_size())),
  181. beast::detail::bind_continuation(std::move(*this)));
  182. }
  183. impl.rd_buf.commit(bytes_transferred);
  184. if(impl.check_stop_now(ec)) //< this catches cancellation
  185. goto upcall;
  186. }
  187. if(detail::is_control(impl.rd_fh.op))
  188. {
  189. // Discard ping or pong frame
  190. if(impl.rd_fh.op != detail::opcode::close)
  191. {
  192. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  193. continue;
  194. }
  195. // Process close frame
  196. // TODO Should we invoke the control callback?
  197. BOOST_ASSERT(! impl.rd_close);
  198. impl.rd_close = true;
  // View of the close frame payload inside rd_buf. It is unmasked in
  // place when the header says the frame is masked, then decoded into
  // impl.cr.
  199. auto const mb = buffers_prefix(
  200. clamp(impl.rd_fh.len),
  201. impl.rd_buf.data());
  202. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  203. detail::mask_inplace(mb, impl.rd_key);
  204. detail::read_close(impl.cr, mb, ev_);
  // On a decode error the payload is left in rd_buf and ev_ carries the
  // error into teardown.
  205. if(ev_)
  206. goto teardown;
  207. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  208. goto teardown;
  209. }
  210. read_payload:
  211. // Discard message frame
  // Drop whatever is buffered and read more until the remainder of the
  // payload fits entirely within rd_buf.
  212. while(impl.rd_buf.size() < impl.rd_remain)
  213. {
  214. impl.rd_remain -= impl.rd_buf.size();
  215. impl.rd_buf.consume(impl.rd_buf.size());
  216. BOOST_ASIO_CORO_YIELD
  217. {
  218. BOOST_ASIO_HANDLER_LOCATION((
  219. __FILE__, __LINE__,
  220. "websocket::async_close"));
  221. impl.stream().async_read_some(
  222. impl.rd_buf.prepare(read_size(
  223. impl.rd_buf, impl.rd_buf.max_size())),
  224. beast::detail::bind_continuation(std::move(*this)));
  225. }
  226. impl.rd_buf.commit(bytes_transferred);
  227. if(impl.check_stop_now(ec))
  228. goto upcall;
  229. }
  230. BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
  231. impl.rd_buf.consume(clamp(impl.rd_remain));
  232. impl.rd_remain = 0;
  233. }
  234. teardown:
  235. // Teardown
  236. BOOST_ASSERT(impl.wr_block.is_locked(this));
  237. using beast::websocket::async_teardown;
  238. BOOST_ASIO_CORO_YIELD
  239. {
  240. BOOST_ASIO_HANDLER_LOCATION((
  241. __FILE__, __LINE__,
  242. "websocket::async_close"));
  243. async_teardown(impl.role, impl.stream(),
  244. beast::detail::bind_continuation(std::move(*this)));
  245. }
  246. BOOST_ASSERT(impl.wr_block.is_locked(this));
  // An eof result from teardown is treated as success.
  247. if(ec == net::error::eof)
  248. {
  249. // Rationale:
  250. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  251. ec = {};
  252. }
  // When teardown succeeded, surface any frame parsing error saved in
  // ev_ as the final result instead.
  253. if(! ec)
  254. {
  255. BOOST_BEAST_ASSIGN_EC(ec, ev_);
  256. }
  257. if(ec)
  258. impl.change_status(status::failed);
  259. else
  260. impl.change_status(status::closed);
  261. impl.close();
  262. upcall:
  // Release the write lock. The read lock is released only if this op
  // holds it, and only in that case is op_r_rd given a chance to resume.
  263. impl.wr_block.unlock(this);
  264. impl.rd_block.try_unlock(this)
  265. && impl.op_r_rd.maybe_invoke();
  // The || chain stops at the first maybe_invoke() that reports success,
  // so at most one of these parked operations is resumed here.
  266. impl.op_rd.maybe_invoke()
  267. || impl.op_idle_ping.maybe_invoke()
  268. || impl.op_ping.maybe_invoke()
  269. || impl.op_wr.maybe_invoke();
  270. this->complete(cont, ec);
  271. }
  272. }
  273. };
  274. template<class NextLayer, bool deflateSupported>
  275. struct stream<NextLayer, deflateSupported>::
  276. run_close_op
  277. {
  278. boost::shared_ptr<impl_type> const& self;
  279. using executor_type = typename stream::executor_type;
  280. executor_type
  281. get_executor() const noexcept
  282. {
  283. return self->stream().get_executor();
  284. }
  285. template<class CloseHandler>
  286. void
  287. operator()(
  288. CloseHandler&& h,
  289. close_reason const& cr)
  290. {
  291. // If you get an error on the following line it means
  292. // that your handler does not meet the documented type
  293. // requirements for the handler.
  294. static_assert(
  295. beast::detail::is_invocable<CloseHandler,
  296. void(error_code)>::value,
  297. "CloseHandler type requirements not met");
  298. close_op<
  299. typename std::decay<CloseHandler>::type>(
  300. std::forward<CloseHandler>(h),
  301. self,
  302. cr);
  303. }
  304. };
  305. //------------------------------------------------------------------------------
  306. template<class NextLayer, bool deflateSupported>
  307. void
  308. stream<NextLayer, deflateSupported>::
  309. close(close_reason const& cr)
  310. {
  311. static_assert(is_sync_stream<next_layer_type>::value,
  312. "SyncStream type requirements not met");
  313. error_code ec;
  314. close(cr, ec);
  315. if(ec)
  316. BOOST_THROW_EXCEPTION(system_error{ec});
  317. }
  // Synchronous close, non-throwing overload. Sends a close frame built
  // from cr, then reads and discards incoming frames until a close frame
  // arrives, and finally calls do_fail to finish the connection.
  //   cr - close reason serialized into the outgoing close frame
  //   ec - cleared on entry; holds the failure, if any, on return
  318. template<class NextLayer, bool deflateSupported>
  319. void
  320. stream<NextLayer, deflateSupported>::
  321. close(close_reason const& cr, error_code& ec)
  322. {
  323. static_assert(is_sync_stream<next_layer_type>::value,
  324. "SyncStream type requirements not met");
  325. using beast::detail::clamp;
  326. auto& impl = *impl_;
  327. ec = {};
  // NOTE(review): check_stop_now is defined elsewhere; presumably it
  // reports whether the stream can no longer proceed and sets ec - confirm.
  328. if(impl.check_stop_now(ec))
  329. return;
  330. BOOST_ASSERT(! impl.rd_close);
  331. // Can't call close twice
  332. // TODO return a custom error code
  333. BOOST_ASSERT(! impl.wr_close);
  334. // Send close frame
  335. {
  336. impl.wr_close = true;
  337. impl.change_status(status::closing);
  // The frame buffer only needs to live for the duration of the write.
  338. detail::frame_buffer fb;
  339. impl.template write_close<flat_static_buffer_base>(fb, cr);
  340. net::write(impl.stream(), fb.data(), ec);
  341. if(impl.check_stop_now(ec))
  342. return;
  343. }
  344. // Read until receiving a close frame
  // ev collects frame parsing errors separately from the I/O result in ec.
  345. error_code ev;
  // A non-zero rd_remain means part of a message frame payload is still
  // outstanding, so start by discarding that payload.
  346. if(impl.rd_remain > 0)
  347. goto read_payload;
  348. for(;;)
  349. {
  350. // Read frame header
  351. while(! impl.parse_fh(
  352. impl.rd_fh, impl.rd_buf, ev))
  353. {
  354. if(ev)
  355. {
  356. // Protocol violation
  357. return do_fail(close_code::none, ev, ec);
  358. }
  // Header incomplete: read more bytes from the stream into rd_buf.
  359. impl.rd_buf.commit(impl.stream().read_some(
  360. impl.rd_buf.prepare(read_size(
  361. impl.rd_buf, impl.rd_buf.max_size())), ec));
  362. if(impl.check_stop_now(ec))
  363. return;
  364. }
  365. if(detail::is_control(impl.rd_fh.op))
  366. {
  367. // Discard ping/pong frame
  368. if(impl.rd_fh.op != detail::opcode::close)
  369. {
  370. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  371. continue;
  372. }
  373. // Handle close frame
  374. // TODO Should we invoke the control callback?
  375. BOOST_ASSERT(! impl.rd_close);
  376. impl.rd_close = true;
  // View of the close frame payload inside rd_buf. It is unmasked in
  // place when the header says the frame is masked, then decoded into
  // impl.cr.
  377. auto const mb = buffers_prefix(
  378. clamp(impl.rd_fh.len),
  379. impl.rd_buf.data());
  380. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  381. detail::mask_inplace(mb, impl.rd_key);
  382. detail::read_close(impl.cr, mb, ev);
  383. if(ev)
  384. {
  385. // Protocol violation
  386. return do_fail(close_code::none, ev, ec);
  387. }
  388. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  // Close frame received: leave the read loop.
  389. break;
  390. }
  391. read_payload:
  392. // Discard message frame
  // Drop whatever is buffered and read more until the remainder of the
  // payload fits entirely within rd_buf.
  393. while(impl.rd_buf.size() < impl.rd_remain)
  394. {
  395. impl.rd_remain -= impl.rd_buf.size();
  396. impl.rd_buf.consume(impl.rd_buf.size());
  397. impl.rd_buf.commit(
  398. impl.stream().read_some(
  399. impl.rd_buf.prepare(
  400. read_size(
  401. impl.rd_buf,
  402. impl.rd_buf.max_size())),
  403. ec));
  404. if(impl.check_stop_now(ec))
  405. return;
  406. }
  407. BOOST_ASSERT(
  408. impl.rd_buf.size() >= impl.rd_remain);
  409. impl.rd_buf.consume(clamp(impl.rd_remain));
  410. impl.rd_remain = 0;
  411. }
  412. // _Close the WebSocket Connection_
  // NOTE(review): do_fail is defined elsewhere; presumably it shuts the
  // connection down and reports the given error through ec - confirm.
  // An outcome of error::closed is the expected result of a clean close,
  // so it is converted to success for the caller.
  413. do_fail(close_code::none, error::closed, ec);
  414. if(ec == error::closed)
  415. ec = {};
  416. }
  417. template<class NextLayer, bool deflateSupported>
  418. template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
  419. BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
  420. stream<NextLayer, deflateSupported>::
  421. async_close(close_reason const& cr, CloseHandler&& handler)
  422. {
  423. static_assert(is_async_stream<next_layer_type>::value,
  424. "AsyncStream type requirements not met");
  425. return net::async_initiate<
  426. CloseHandler,
  427. void(error_code)>(
  428. run_close_op{impl_},
  429. handler,
  430. cr);
  431. }
  432. } // websocket
  433. } // beast
  434. } // boost
  435. #endif