read.ipp 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383
  1. //
  2. // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
  11. #include <boost/beast/websocket/teardown.hpp>
  12. #include <boost/beast/core/bind_handler.hpp>
  13. #include <boost/beast/core/buffers_prefix.hpp>
  14. #include <boost/beast/core/buffers_suffix.hpp>
  15. #include <boost/beast/core/flat_static_buffer.hpp>
  16. #include <boost/beast/core/type_traits.hpp>
  17. #include <boost/beast/core/detail/clamp.hpp>
  18. #include <boost/beast/core/detail/config.hpp>
  19. #include <boost/asio/associated_allocator.hpp>
  20. #include <boost/asio/associated_executor.hpp>
  21. #include <boost/asio/coroutine.hpp>
  22. #include <boost/asio/executor_work_guard.hpp>
  23. #include <boost/asio/handler_continuation_hook.hpp>
  24. #include <boost/asio/handler_invoke_hook.hpp>
  25. #include <boost/asio/post.hpp>
  26. #include <boost/assert.hpp>
  27. #include <boost/config.hpp>
  28. #include <boost/optional.hpp>
  29. #include <boost/throw_exception.hpp>
  30. #include <algorithm>
  31. #include <limits>
  32. #include <memory>
  33. namespace boost {
  34. namespace beast {
  35. namespace websocket {
  36. namespace detail {
  37. template<>
  38. inline
  39. void
  40. stream_base<true>::
  41. inflate(
  42. zlib::z_params& zs,
  43. zlib::Flush flush,
  44. error_code& ec)
  45. {
  46. this->pmd_->zi.write(zs, flush, ec);
  47. }
  48. template<>
  49. inline
  50. void
  51. stream_base<true>::
  52. do_context_takeover_read(role_type role)
  53. {
  54. if((role == role_type::client &&
  55. pmd_config_.server_no_context_takeover) ||
  56. (role == role_type::server &&
  57. pmd_config_.client_no_context_takeover))
  58. {
  59. pmd_->zi.reset();
  60. }
  61. }
  62. } // detail
  63. //------------------------------------------------------------------------------
  64. /* Read some message frame data.
  65. Also reads and handles control frames.
  66. */
  67. template<class NextLayer, bool deflateSupported>
  68. template<
  69. class MutableBufferSequence,
  70. class Handler>
  71. class stream<NextLayer, deflateSupported>::read_some_op
  72. : public boost::asio::coroutine
  73. {
  74. Handler h_;
  75. stream<NextLayer, deflateSupported>& ws_;
  76. boost::asio::executor_work_guard<decltype(std::declval<
  77. stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
  78. MutableBufferSequence bs_;
  79. buffers_suffix<MutableBufferSequence> cb_;
  80. std::size_t bytes_written_ = 0;
  81. error_code result_;
  82. close_code code_;
  83. bool did_read_ = false;
  84. bool cont_ = false;
  85. public:
  86. static constexpr int id = 1; // for soft_mutex
  87. read_some_op(read_some_op&&) = default;
  88. read_some_op(read_some_op const&) = delete;
  89. template<class DeducedHandler>
  90. read_some_op(
  91. DeducedHandler&& h,
  92. stream<NextLayer, deflateSupported>& ws,
  93. MutableBufferSequence const& bs)
  94. : h_(std::forward<DeducedHandler>(h))
  95. , ws_(ws)
  96. , wg_(ws_.get_executor())
  97. , bs_(bs)
  98. , cb_(bs)
  99. , code_(close_code::none)
  100. {
  101. }
  102. using allocator_type =
  103. boost::asio::associated_allocator_t<Handler>;
  104. allocator_type
  105. get_allocator() const noexcept
  106. {
  107. return (boost::asio::get_associated_allocator)(h_);
  108. }
  109. using executor_type = boost::asio::associated_executor_t<
  110. Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
  111. executor_type
  112. get_executor() const noexcept
  113. {
  114. return (boost::asio::get_associated_executor)(
  115. h_, ws_.get_executor());
  116. }
  117. Handler&
  118. handler()
  119. {
  120. return h_;
  121. }
  122. void operator()(
  123. error_code ec = {},
  124. std::size_t bytes_transferred = 0,
  125. bool cont = true);
  126. friend
  127. bool asio_handler_is_continuation(read_some_op* op)
  128. {
  129. using boost::asio::asio_handler_is_continuation;
  130. return op->cont_ || asio_handler_is_continuation(
  131. std::addressof(op->h_));
  132. }
  133. template<class Function>
  134. friend
  135. void asio_handler_invoke(Function&& f, read_some_op* op)
  136. {
  137. using boost::asio::asio_handler_invoke;
  138. asio_handler_invoke(f, std::addressof(op->h_));
  139. }
  140. };
  // Coroutine body of read_some_op. Every completion of an inner
  // asynchronous operation re-enters this function through
  // BOOST_ASIO_CORO_REENTER and continues after the matching YIELD.
  //
  // Parameters:
  //   ec                 error reported by the operation that just completed
  //   bytes_transferred  byte count reported by that operation
  //   cont               stored into cont_ on every entry; when cont_ is
  //                      false at `upcall`, the final call is first posted
  //                      to the stream's executor
  //
  // Labels:
  //   do_maybe_suspend / do_suspend
  //          try to take rd_block_; if that fails, park this op in
  //          paused_r_rd_ and finish with operation_aborted once resumed
  //   loop   parse a frame header when no payload remains, handle
  //          ping / pong / close frames, then fall through to payload
  //   close  write a close frame (unless wr_close_ is already set),
  //          then tear the stream down
  //   upcall release the blocks, resume paused operations, and invoke
  //          h_(ec, bytes_written_)
  141. template<class NextLayer, bool deflateSupported>
  142. template<class MutableBufferSequence, class Handler>
  143. void
  144. stream<NextLayer, deflateSupported>::
  145. read_some_op<MutableBufferSequence, Handler>::
  146. operator()(
  147. error_code ec,
  148. std::size_t bytes_transferred,
  149. bool cont)
  150. {
  151. using beast::detail::clamp;
  152. using boost::asio::buffer;
  153. using boost::asio::buffer_size;
  154. cont_ = cont;
  155. BOOST_ASIO_CORO_REENTER(*this)
  156. {
  157. // Maybe suspend
  158. do_maybe_suspend:
  159. if(ws_.rd_block_.try_lock(this))
  160. {
  161. // Make sure the stream is not closed
  162. if( ws_.status_ == status::closed ||
  163. ws_.status_ == status::failed)
  164. {
  165. ec = boost::asio::error::operation_aborted;
  166. goto upcall;
  167. }
  168. }
  169. else
  170. {
  171. do_suspend:
  172. // Suspend
  173. BOOST_ASIO_CORO_YIELD
  174. ws_.paused_r_rd_.emplace(std::move(*this));
  175. // Acquire the read block
  176. ws_.rd_block_.lock(this);
  177. // Resume
  178. BOOST_ASIO_CORO_YIELD
  179. boost::asio::post(
  180. ws_.get_executor(), std::move(*this));
  181. BOOST_ASSERT(ws_.rd_block_.is_locked(this));
  182. // The only way to get read blocked is if
  183. // a `close_op` wrote a close frame
  184. BOOST_ASSERT(ws_.wr_close_);
  185. BOOST_ASSERT(ws_.status_ != status::open);
  186. ec = boost::asio::error::operation_aborted;
  187. goto upcall;
  188. }
  189. // if status_ == status::closing, we want to suspend
  190. // the read operation until the close completes,
  191. // then finish the read with operation_aborted.
  192. loop:
  193. BOOST_ASSERT(ws_.rd_block_.is_locked(this));
  194. // See if we need to read a frame header. This
  195. // condition is structured to give the decompressor
  196. // a chance to emit the final empty deflate block
  197. //
  198. if(ws_.rd_remain_ == 0 &&
  199. (! ws_.rd_fh_.fin || ws_.rd_done_))
  200. {
  201. // Read frame header
  202. while(! ws_.parse_fh(
  203. ws_.rd_fh_, ws_.rd_buf_, result_))
  204. {
  205. if(result_)
  206. {
  207. // _Fail the WebSocket Connection_
  208. if(result_ == error::message_too_big)
  209. code_ = close_code::too_big;
  210. else
  211. code_ = close_code::protocol_error;
  212. goto close;
  213. }
  214. BOOST_ASSERT(ws_.rd_block_.is_locked(this));
  215. BOOST_ASIO_CORO_YIELD
  216. ws_.stream_.async_read_some(
  217. ws_.rd_buf_.prepare(read_size(
  218. ws_.rd_buf_, ws_.rd_buf_.max_size())),
  219. std::move(*this));
  220. BOOST_ASSERT(ws_.rd_block_.is_locked(this));
  221. if(! ws_.check_ok(ec))
  222. goto upcall;
  223. ws_.rd_buf_.commit(bytes_transferred);
  224. // Allow a close operation
  225. // to acquire the read block
  226. ws_.rd_block_.unlock(this);
  227. if( ws_.paused_r_close_.maybe_invoke())
  228. {
  229. // Suspend
  230. BOOST_ASSERT(ws_.rd_block_.is_locked());
  231. goto do_suspend;
  232. }
  233. // Acquire read block
  234. ws_.rd_block_.lock(this);
  235. }
  236. // Immediately apply the mask to the portion
  237. // of the buffer holding payload data.
  238. if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
  239. detail::mask_inplace(buffers_prefix(
  240. clamp(ws_.rd_fh_.len),
  241. ws_.rd_buf_.mutable_data()),
  242. ws_.rd_key_);
  243. if(detail::is_control(ws_.rd_fh_.op))
  244. {
  245. // Clear this otherwise the next
  246. // frame will be considered final.
  247. ws_.rd_fh_.fin = false;
  248. // Handle ping frame
  249. if(ws_.rd_fh_.op == detail::opcode::ping)
  250. {
  // When a control callback is installed and this entry is not a
  // continuation, hop through the executor before going further.
  251. if(ws_.ctrl_cb_)
  252. {
  253. if(! cont_)
  254. {
  255. BOOST_ASIO_CORO_YIELD
  256. boost::asio::post(
  257. ws_.get_executor(),
  258. std::move(*this));
  259. BOOST_ASSERT(cont_);
  260. }
  261. }
  262. {
  263. auto const b = buffers_prefix(
  264. clamp(ws_.rd_fh_.len),
  265. ws_.rd_buf_.data());
  266. auto const len = buffer_size(b);
  267. BOOST_ASSERT(len == ws_.rd_fh_.len);
  268. ping_data payload;
  269. detail::read_ping(payload, b);
  270. ws_.rd_buf_.consume(len);
  271. // Ignore ping when closing
  272. if(ws_.status_ == status::closing)
  273. goto loop;
  274. if(ws_.ctrl_cb_)
  275. ws_.ctrl_cb_(
  276. frame_type::ping, payload);
  // Serialize the pong reply, echoing the ping payload, into rd_fb_.
  277. ws_.rd_fb_.reset();
  278. ws_.template write_ping<
  279. flat_static_buffer_base>(ws_.rd_fb_,
  280. detail::opcode::pong, payload);
  281. }
  282. // Allow a close operation
  283. // to acquire the read block
  284. ws_.rd_block_.unlock(this);
  285. ws_.paused_r_close_.maybe_invoke();
  286. // Maybe suspend
  287. if(! ws_.wr_block_.try_lock(this))
  288. {
  289. // Suspend
  290. BOOST_ASIO_CORO_YIELD
  291. ws_.paused_rd_.emplace(std::move(*this));
  292. // Acquire the write block
  293. ws_.wr_block_.lock(this);
  294. // Resume
  295. BOOST_ASIO_CORO_YIELD
  296. boost::asio::post(
  297. ws_.get_executor(), std::move(*this));
  298. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  299. // Make sure the stream is open
  300. if(! ws_.check_open(ec))
  301. goto upcall;
  302. }
  303. // Send pong
  304. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  305. BOOST_ASIO_CORO_YIELD
  306. boost::asio::async_write(ws_.stream_,
  307. ws_.rd_fb_.data(), std::move(*this));
  308. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  309. if(! ws_.check_ok(ec))
  310. goto upcall;
  311. ws_.wr_block_.unlock(this);
  // Resume at most one paused operation: the || chain stops at the
  // first maybe_invoke() that returns true.
  312. ws_.paused_close_.maybe_invoke() ||
  313. ws_.paused_ping_.maybe_invoke() ||
  314. ws_.paused_wr_.maybe_invoke();
  // The read block was released above, so start over by re-acquiring it.
  315. goto do_maybe_suspend;
  316. }
  317. // Handle pong frame
  318. if(ws_.rd_fh_.op == detail::opcode::pong)
  319. {
  320. // Ignore pong when closing
  321. if(! ws_.wr_close_ && ws_.ctrl_cb_)
  322. {
  323. if(! cont_)
  324. {
  325. BOOST_ASIO_CORO_YIELD
  326. boost::asio::post(
  327. ws_.get_executor(),
  328. std::move(*this));
  329. BOOST_ASSERT(cont_);
  330. }
  331. }
  332. auto const cb = buffers_prefix(clamp(
  333. ws_.rd_fh_.len), ws_.rd_buf_.data());
  334. auto const len = buffer_size(cb);
  335. BOOST_ASSERT(len == ws_.rd_fh_.len);
  336. ping_data payload;
  337. detail::read_ping(payload, cb);
  338. ws_.rd_buf_.consume(len);
  339. // Ignore pong when closing
  340. if(! ws_.wr_close_ && ws_.ctrl_cb_)
  341. ws_.ctrl_cb_(frame_type::pong, payload);
  342. goto loop;
  343. }
  344. // Handle close frame
  345. BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
  346. {
  347. if(ws_.ctrl_cb_)
  348. {
  349. if(! cont_)
  350. {
  351. BOOST_ASIO_CORO_YIELD
  352. boost::asio::post(
  353. ws_.get_executor(),
  354. std::move(*this));
  355. BOOST_ASSERT(cont_);
  356. }
  357. }
  358. auto const cb = buffers_prefix(clamp(
  359. ws_.rd_fh_.len), ws_.rd_buf_.data());
  360. auto const len = buffer_size(cb);
  361. BOOST_ASSERT(len == ws_.rd_fh_.len);
  362. BOOST_ASSERT(! ws_.rd_close_);
  363. ws_.rd_close_ = true;
  364. close_reason cr;
  365. detail::read_close(cr, cb, result_);
  366. if(result_)
  367. {
  368. // _Fail the WebSocket Connection_
  369. code_ = close_code::protocol_error;
  370. goto close;
  371. }
  372. ws_.cr_ = cr;
  373. ws_.rd_buf_.consume(len);
  374. if(ws_.ctrl_cb_)
  375. ws_.ctrl_cb_(frame_type::close,
  376. ws_.cr_.reason);
  377. // See if we are already closing
  378. if(ws_.status_ == status::closing)
  379. {
  380. // _Close the WebSocket Connection_
  381. BOOST_ASSERT(ws_.wr_close_);
  382. code_ = close_code::none;
  383. result_ = error::closed;
  384. goto close;
  385. }
  386. // _Start the WebSocket Closing Handshake_
  387. code_ = cr.code == close_code::none ?
  388. close_code::normal :
  389. static_cast<close_code>(cr.code);
  390. result_ = error::closed;
  391. goto close;
  392. }
  393. }
  394. if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
  395. {
  396. // Empty non-final frame
  397. goto loop;
  398. }
  399. ws_.rd_done_ = false;
  400. }
  // Payload phase: a plain copy / direct read when the message is not
  // deflated, otherwise the inflate loop in the else branch.
  401. if(! ws_.rd_deflated())
  402. {
  403. if(ws_.rd_remain_ > 0)
  404. {
  405. if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
  406. (std::min)(clamp(ws_.rd_remain_),
  407. buffer_size(cb_)))
  408. {
  409. // Fill the read buffer first, otherwise we
  410. // get fewer bytes at the cost of one I/O.
  411. BOOST_ASIO_CORO_YIELD
  412. ws_.stream_.async_read_some(
  413. ws_.rd_buf_.prepare(read_size(
  414. ws_.rd_buf_, ws_.rd_buf_.max_size())),
  415. std::move(*this));
  416. if(! ws_.check_ok(ec))
  417. goto upcall;
  418. ws_.rd_buf_.commit(bytes_transferred);
  419. if(ws_.rd_fh_.mask)
  420. detail::mask_inplace(buffers_prefix(clamp(
  421. ws_.rd_remain_), ws_.rd_buf_.mutable_data()),
  422. ws_.rd_key_);
  423. }
  424. if(ws_.rd_buf_.size() > 0)
  425. {
  426. // Copy from the read buffer.
  427. // The mask was already applied.
  428. bytes_transferred = buffer_copy(cb_,
  429. ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
  430. auto const mb = buffers_prefix(
  431. bytes_transferred, cb_);
  432. ws_.rd_remain_ -= bytes_transferred;
  433. if(ws_.rd_op_ == detail::opcode::text)
  434. {
  435. if(! ws_.rd_utf8_.write(mb) ||
  436. (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
  437. ! ws_.rd_utf8_.finish()))
  438. {
  439. // _Fail the WebSocket Connection_
  440. code_ = close_code::bad_payload;
  441. result_ = error::bad_frame_payload;
  442. goto close;
  443. }
  444. }
  445. bytes_written_ += bytes_transferred;
  446. ws_.rd_size_ += bytes_transferred;
  447. ws_.rd_buf_.consume(bytes_transferred);
  448. }
  449. else
  450. {
  451. // Read into caller's buffer
  452. BOOST_ASSERT(ws_.rd_remain_ > 0);
  453. BOOST_ASSERT(buffer_size(cb_) > 0);
  454. BOOST_ASSERT(buffer_size(buffers_prefix(
  455. clamp(ws_.rd_remain_), cb_)) > 0);
  456. BOOST_ASIO_CORO_YIELD
  457. ws_.stream_.async_read_some(buffers_prefix(
  458. clamp(ws_.rd_remain_), cb_), std::move(*this));
  459. if(! ws_.check_ok(ec))
  460. goto upcall;
  461. BOOST_ASSERT(bytes_transferred > 0);
  462. auto const mb = buffers_prefix(
  463. bytes_transferred, cb_);
  464. ws_.rd_remain_ -= bytes_transferred;
  // These bytes went straight into the caller's buffer, so the mask
  // is applied here rather than on rd_buf_.
  465. if(ws_.rd_fh_.mask)
  466. detail::mask_inplace(mb, ws_.rd_key_);
  467. if(ws_.rd_op_ == detail::opcode::text)
  468. {
  469. if(! ws_.rd_utf8_.write(mb) ||
  470. (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
  471. ! ws_.rd_utf8_.finish()))
  472. {
  473. // _Fail the WebSocket Connection_
  474. code_ = close_code::bad_payload;
  475. result_ = error::bad_frame_payload;
  476. goto close;
  477. }
  478. }
  479. bytes_written_ += bytes_transferred;
  480. ws_.rd_size_ += bytes_transferred;
  481. }
  482. }
  483. ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
  484. }
  485. else
  486. {
  487. // Read compressed message frame payload:
  488. // inflate even if rd_fh_.len == 0, otherwise we
  489. // never emit the end-of-stream deflate block.
  // did_read_ is set after the first network read below and is not
  // cleared anywhere in this function, so this loop starts at most
  // one async_read_some per operation.
  490. while(buffer_size(cb_) > 0)
  491. {
  492. if( ws_.rd_remain_ > 0 &&
  493. ws_.rd_buf_.size() == 0 &&
  494. ! did_read_)
  495. {
  496. // read new
  497. BOOST_ASIO_CORO_YIELD
  498. ws_.stream_.async_read_some(
  499. ws_.rd_buf_.prepare(read_size(
  500. ws_.rd_buf_, ws_.rd_buf_.max_size())),
  501. std::move(*this));
  502. if(! ws_.check_ok(ec))
  503. goto upcall;
  504. BOOST_ASSERT(bytes_transferred > 0);
  505. ws_.rd_buf_.commit(bytes_transferred);
  506. if(ws_.rd_fh_.mask)
  507. detail::mask_inplace(
  508. buffers_prefix(clamp(ws_.rd_remain_),
  509. ws_.rd_buf_.mutable_data()), ws_.rd_key_);
  510. did_read_ = true;
  511. }
  512. zlib::z_params zs;
  513. {
  514. auto const out = buffers_front(cb_);
  515. zs.next_out = out.data();
  516. zs.avail_out = out.size();
  517. BOOST_ASSERT(zs.avail_out > 0);
  518. }
  519. if(ws_.rd_remain_ > 0)
  520. {
  521. if(ws_.rd_buf_.size() > 0)
  522. {
  523. // use what's there
  524. auto const in = buffers_prefix(
  525. clamp(ws_.rd_remain_), buffers_front(
  526. ws_.rd_buf_.data()));
  527. zs.avail_in = in.size();
  528. zs.next_in = in.data();
  529. }
  530. else
  531. {
  532. break;
  533. }
  534. }
  535. else if(ws_.rd_fh_.fin)
  536. {
  537. // append the empty block codes
  538. static std::uint8_t constexpr
  539. empty_block[4] = {
  540. 0x00, 0x00, 0xff, 0xff };
  541. zs.next_in = empty_block;
  542. zs.avail_in = sizeof(empty_block);
  543. ws_.inflate(zs, zlib::Flush::sync, ec);
  544. if(! ec)
  545. {
  546. // https://github.com/madler/zlib/issues/280
  547. if(zs.total_out > 0)
  548. ec = error::partial_deflate_block;
  549. }
  550. if(! ws_.check_ok(ec))
  551. goto upcall;
  552. ws_.do_context_takeover_read(ws_.role_);
  553. ws_.rd_done_ = true;
  554. break;
  555. }
  556. else
  557. {
  558. break;
  559. }
  560. ws_.inflate(zs, zlib::Flush::sync, ec);
  561. if(! ws_.check_ok(ec))
  562. goto upcall;
  // Enforce the message size limit on the inflated size; a zero
  // rd_msg_max_ skips the check.
  563. if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
  564. ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
  565. {
  566. // _Fail the WebSocket Connection_
  567. code_ = close_code::too_big;
  568. result_ = error::message_too_big;
  569. goto close;
  570. }
  571. cb_.consume(zs.total_out);
  572. ws_.rd_size_ += zs.total_out;
  573. ws_.rd_remain_ -= zs.total_in;
  574. ws_.rd_buf_.consume(zs.total_in);
  575. bytes_written_ += zs.total_out;
  576. }
  577. if(ws_.rd_op_ == detail::opcode::text)
  578. {
  579. // check utf8
  // Validation runs over bs_ (the unconsumed original sequence),
  // because cb_ has been advanced past the inflated output.
  580. if(! ws_.rd_utf8_.write(
  581. buffers_prefix(bytes_written_, bs_)) || (
  582. ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
  583. {
  584. // _Fail the WebSocket Connection_
  585. code_ = close_code::bad_payload;
  586. result_ = error::bad_frame_payload;
  587. goto close;
  588. }
  589. }
  590. }
  591. goto upcall;
  // Close path. On entry code_ holds the close code to serialize into
  // the close frame, and result_ holds the error that replaces ec
  // when teardown itself reports no error.
  592. close:
  593. // Try to acquire the write block
  594. if(! ws_.wr_block_.try_lock(this))
  595. {
  596. // Suspend
  597. BOOST_ASIO_CORO_YIELD
  598. ws_.paused_rd_.emplace(std::move(*this));
  599. // Acquire the write block
  600. ws_.wr_block_.lock(this);
  601. // Resume
  602. BOOST_ASIO_CORO_YIELD
  603. boost::asio::post(
  604. ws_.get_executor(), std::move(*this));
  605. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  606. // Make sure the stream is open
  607. if(! ws_.check_open(ec))
  608. goto upcall;
  609. }
  610. // Set the status
  611. ws_.status_ = status::closing;
  612. if(! ws_.wr_close_)
  613. {
  614. ws_.wr_close_ = true;
  615. // Serialize close frame
  616. ws_.rd_fb_.reset();
  617. ws_.template write_close<
  618. flat_static_buffer_base>(
  619. ws_.rd_fb_, code_);
  620. // Send close frame
  621. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  622. BOOST_ASIO_CORO_YIELD
  623. boost::asio::async_write(
  624. ws_.stream_, ws_.rd_fb_.data(),
  625. std::move(*this));
  626. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  627. if(! ws_.check_ok(ec))
  628. goto upcall;
  629. }
  630. // Teardown
  631. using beast::websocket::async_teardown;
  632. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  633. BOOST_ASIO_CORO_YIELD
  634. async_teardown(ws_.role_,
  635. ws_.stream_, std::move(*this));
  636. BOOST_ASSERT(ws_.wr_block_.is_locked(this));
  637. if(ec == boost::asio::error::eof)
  638. {
  639. // Rationale:
  640. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  641. ec.assign(0, ec.category());
  642. }
  643. if(! ec)
  644. ec = result_;
  // error::closed counts as a clean shutdown; any other error marks
  // the stream as failed.
  645. if(ec && ec != error::closed)
  646. ws_.status_ = status::failed;
  647. else
  648. ws_.status_ = status::closed;
  649. ws_.close();
  // Common exit. try_unlock is used instead of unlock, presumably
  // because the op can arrive here with or without each block held
  // (NOTE(review): confirm against soft_mutex).
  650. upcall:
  651. ws_.rd_block_.try_unlock(this);
  652. ws_.paused_r_close_.maybe_invoke();
  653. if(ws_.wr_block_.try_unlock(this))
  654. ws_.paused_close_.maybe_invoke() ||
  655. ws_.paused_ping_.maybe_invoke() ||
  656. ws_.paused_wr_.maybe_invoke();
  // When not entered as a continuation, go through the executor once
  // more; the posted call re-enters with the default cont == true and
  // then reaches the h_ call below.
  657. if(! cont_)
  658. {
  659. BOOST_ASIO_CORO_YIELD
  660. boost::asio::post(
  661. ws_.get_executor(),
  662. bind_handler(std::move(*this),
  663. ec, bytes_written_));
  664. }
  665. h_(ec, bytes_written_);
  666. }
  667. }
  668. //------------------------------------------------------------------------------
  669. template<class NextLayer, bool deflateSupported>
  670. template<
  671. class DynamicBuffer,
  672. class Handler>
  673. class stream<NextLayer, deflateSupported>::read_op
  674. : public boost::asio::coroutine
  675. {
  676. Handler h_;
  677. stream<NextLayer, deflateSupported>& ws_;
  678. boost::asio::executor_work_guard<decltype(std::declval<
  679. stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
  680. DynamicBuffer& b_;
  681. std::size_t limit_;
  682. std::size_t bytes_written_ = 0;
  683. bool some_;
  684. public:
  685. using allocator_type =
  686. boost::asio::associated_allocator_t<Handler>;
  687. read_op(read_op&&) = default;
  688. read_op(read_op const&) = delete;
  689. template<class DeducedHandler>
  690. read_op(
  691. DeducedHandler&& h,
  692. stream<NextLayer, deflateSupported>& ws,
  693. DynamicBuffer& b,
  694. std::size_t limit,
  695. bool some)
  696. : h_(std::forward<DeducedHandler>(h))
  697. , ws_(ws)
  698. , wg_(ws_.get_executor())
  699. , b_(b)
  700. , limit_(limit ? limit : (
  701. std::numeric_limits<std::size_t>::max)())
  702. , some_(some)
  703. {
  704. }
  705. allocator_type
  706. get_allocator() const noexcept
  707. {
  708. return (boost::asio::get_associated_allocator)(h_);
  709. }
  710. using executor_type = boost::asio::associated_executor_t<
  711. Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
  712. executor_type
  713. get_executor() const noexcept
  714. {
  715. return (boost::asio::get_associated_executor)(
  716. h_, ws_.get_executor());
  717. }
  718. void operator()(
  719. error_code ec = {},
  720. std::size_t bytes_transferred = 0);
  721. friend
  722. bool asio_handler_is_continuation(read_op* op)
  723. {
  724. using boost::asio::asio_handler_is_continuation;
  725. return asio_handler_is_continuation(
  726. std::addressof(op->h_));
  727. }
  728. template<class Function>
  729. friend
  730. void asio_handler_invoke(Function&& f, read_op* op)
  731. {
  732. using boost::asio::asio_handler_invoke;
  733. asio_handler_invoke(f, std::addressof(op->h_));
  734. }
  735. };
  // Coroutine body of read_op.
  //
  // Each loop pass prepares space in b_ (the size hint from
  // ws_.read_size_hint, clamped to limit_), runs a read_some_op into
  // that space, then commits the byte count that op reports. The loop
  // ends on error, when some_ is true, or when ws_.is_message_done()
  // returns true; h_ is then called with the accumulated byte count.
  //
  // Parameters:
  //   ec                 error from the operation that just completed
  //   bytes_transferred  byte count reported by that operation
  736. template<class NextLayer, bool deflateSupported>
  737. template<class DynamicBuffer, class Handler>
  738. void
  739. stream<NextLayer, deflateSupported>::
  740. read_op<DynamicBuffer, Handler>::
  741. operator()(
  742. error_code ec,
  743. std::size_t bytes_transferred)
  744. {
  745. using beast::detail::clamp;
  746. using buffers_type = typename
  747. DynamicBuffer::mutable_buffers_type;
  // mb is a plain local, so it is re-created on every entry; it is
  // only used between the emplace and the yield that follows it.
  748. boost::optional<buffers_type> mb;
  749. BOOST_ASIO_CORO_REENTER(*this)
  750. {
  751. do
  752. {
  753. try
  754. {
  755. mb.emplace(b_.prepare(clamp(
  756. ws_.read_size_hint(b_), limit_)));
  757. }
  758. catch(std::length_error const&)
  759. {
  // prepare() could not supply the requested space.
  760. ec = error::buffer_overflow;
  761. }
  762. if(ec)
  763. {
  // Deliver the failure through the executor: the posted call
  // re-enters with ec == buffer_overflow, leaves the loop via the
  // break, and reaches the h_ call at the bottom.
  764. BOOST_ASIO_CORO_YIELD
  765. boost::asio::post(
  766. ws_.get_executor(),
  767. bind_handler(std::move(*this),
  768. error::buffer_overflow, 0));
  769. break;
  770. }
  // This op is moved in as the handler of the nested read_some_op,
  // which is started immediately with cont == false.
  771. BOOST_ASIO_CORO_YIELD
  772. read_some_op<buffers_type, read_op>{
  773. std::move(*this), ws_, *mb}(
  774. {}, 0, false);
  775. if(ec)
  776. break;
  777. b_.commit(bytes_transferred);
  778. bytes_written_ += bytes_transferred;
  779. }
  780. while(! some_ && ! ws_.is_message_done());
  781. h_(ec, bytes_written_);
  782. }
  783. }
  784. //------------------------------------------------------------------------------
  785. template<class NextLayer, bool deflateSupported>
  786. template<class DynamicBuffer>
  787. std::size_t
  788. stream<NextLayer, deflateSupported>::
  789. read(DynamicBuffer& buffer)
  790. {
  791. static_assert(is_sync_stream<next_layer_type>::value,
  792. "SyncStream requirements not met");
  793. static_assert(
  794. boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
  795. "DynamicBuffer requirements not met");
  796. error_code ec;
  797. auto const bytes_written = read(buffer, ec);
  798. if(ec)
  799. BOOST_THROW_EXCEPTION(system_error{ec});
  800. return bytes_written;
  801. }
  802. template<class NextLayer, bool deflateSupported>
  803. template<class DynamicBuffer>
  804. std::size_t
  805. stream<NextLayer, deflateSupported>::
  806. read(DynamicBuffer& buffer, error_code& ec)
  807. {
  808. static_assert(is_sync_stream<next_layer_type>::value,
  809. "SyncStream requirements not met");
  810. static_assert(
  811. boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
  812. "DynamicBuffer requirements not met");
  813. std::size_t bytes_written = 0;
  814. do
  815. {
  816. bytes_written += read_some(buffer, 0, ec);
  817. if(ec)
  818. return bytes_written;
  819. }
  820. while(! is_message_done());
  821. return bytes_written;
  822. }
  823. template<class NextLayer, bool deflateSupported>
  824. template<class DynamicBuffer, class ReadHandler>
  825. BOOST_ASIO_INITFN_RESULT_TYPE(
  826. ReadHandler, void(error_code, std::size_t))
  827. stream<NextLayer, deflateSupported>::
  828. async_read(DynamicBuffer& buffer, ReadHandler&& handler)
  829. {
  830. static_assert(is_async_stream<next_layer_type>::value,
  831. "AsyncStream requirements not met");
  832. static_assert(
  833. boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
  834. "DynamicBuffer requirements not met");
  835. BOOST_BEAST_HANDLER_INIT(
  836. ReadHandler, void(error_code, std::size_t));
  837. read_op<
  838. DynamicBuffer,
  839. BOOST_ASIO_HANDLER_TYPE(
  840. ReadHandler, void(error_code, std::size_t))>{
  841. std::move(init.completion_handler),
  842. *this,
  843. buffer,
  844. 0,
  845. false}();
  846. return init.result.get();
  847. }
  848. //------------------------------------------------------------------------------
  849. template<class NextLayer, bool deflateSupported>
  850. template<class DynamicBuffer>
  851. std::size_t
  852. stream<NextLayer, deflateSupported>::
  853. read_some(
  854. DynamicBuffer& buffer,
  855. std::size_t limit)
  856. {
  857. static_assert(is_sync_stream<next_layer_type>::value,
  858. "SyncStream requirements not met");
  859. static_assert(
  860. boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
  861. "DynamicBuffer requirements not met");
  862. error_code ec;
  863. auto const bytes_written =
  864. read_some(buffer, limit, ec);
  865. if(ec)
  866. BOOST_THROW_EXCEPTION(system_error{ec});
  867. return bytes_written;
  868. }
  869. template<class NextLayer, bool deflateSupported>
  870. template<class DynamicBuffer>
  871. std::size_t
  872. stream<NextLayer, deflateSupported>::
  873. read_some(
  874. DynamicBuffer& buffer,
  875. std::size_t limit,
  876. error_code& ec)
  877. {
  878. static_assert(is_sync_stream<next_layer_type>::value,
  879. "SyncStream requirements not met");
  880. static_assert(
  881. boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
  882. "DynamicBuffer requirements not met");
  883. using beast::detail::clamp;
  884. if(! limit)
  885. limit = (std::numeric_limits<std::size_t>::max)();
  886. auto const size =
  887. clamp(read_size_hint(buffer), limit);
  888. BOOST_ASSERT(size > 0);
  889. boost::optional<typename
  890. DynamicBuffer::mutable_buffers_type> mb;
  891. try
  892. {
  893. mb.emplace(buffer.prepare(size));
  894. }
  895. catch(std::length_error const&)
  896. {
  897. ec = error::buffer_overflow;
  898. return 0;
  899. }
  900. auto const bytes_written = read_some(*mb, ec);
  901. buffer.commit(bytes_written);
  902. return bytes_written;
  903. }
  904. template<class NextLayer, bool deflateSupported>
  905. template<class DynamicBuffer, class ReadHandler>
  906. BOOST_ASIO_INITFN_RESULT_TYPE(
  907. ReadHandler, void(error_code, std::size_t))
  908. stream<NextLayer, deflateSupported>::
  909. async_read_some(
  910. DynamicBuffer& buffer,
  911. std::size_t limit,
  912. ReadHandler&& handler)
  913. {
  914. static_assert(is_async_stream<next_layer_type>::value,
  915. "AsyncStream requirements not met");
  916. static_assert(
  917. boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
  918. "DynamicBuffer requirements not met");
  919. BOOST_BEAST_HANDLER_INIT(
  920. ReadHandler, void(error_code, std::size_t));
  921. read_op<
  922. DynamicBuffer,
  923. BOOST_ASIO_HANDLER_TYPE(
  924. ReadHandler, void(error_code, std::size_t))>{
  925. std::move(init.completion_handler),
  926. *this,
  927. buffer,
  928. limit,
  929. true}({}, 0);
  930. return init.result.get();
  931. }
  932. //------------------------------------------------------------------------------
  933. template<class NextLayer, bool deflateSupported>
  934. template<class MutableBufferSequence>
  935. std::size_t
  936. stream<NextLayer, deflateSupported>::
  937. read_some(
  938. MutableBufferSequence const& buffers)
  939. {
  940. static_assert(is_sync_stream<next_layer_type>::value,
  941. "SyncStream requirements not met");
  942. static_assert(boost::asio::is_mutable_buffer_sequence<
  943. MutableBufferSequence>::value,
  944. "MutableBufferSequence requirements not met");
  945. error_code ec;
  946. auto const bytes_written = read_some(buffers, ec);
  947. if(ec)
  948. BOOST_THROW_EXCEPTION(system_error{ec});
  949. return bytes_written;
  950. }
  // Non-throwing read_some for mutable buffer sequences.
  //
  // Clears `ec`, returns 0 if check_open(ec) fails, and otherwise
  // processes frames from the next layer:
  //
  //  * When no payload remains in the current frame (and the message
  //    is either not final or already done), a new frame header is
  //    parsed, reading more bytes into rd_buf_ as needed.
  //  * Control frames (ping, pong, close) are handled inline; ping and
  //    pong jump back to `loop`, close ends the call via do_fail().
  //  * Message payload is either copied/read into `buffers` directly
  //    (when rd_deflated() is false) or inflated into `buffers`.
  //  * For text messages the produced bytes are run through rd_utf8_.
  //
  // @param buffers Destination for message payload bytes.
  // @param ec      Receives the outcome; cleared on entry.
  // @return        bytes_written, the number of payload bytes placed
  //                in `buffers` by this call (0 if the stream is not
  //                open).
  //
  // NOTE(review): check_open, check_ok, do_fail and parse_fh are
  // defined outside this view; comments below describe only how their
  // results are used here.
  951. template<class NextLayer, bool deflateSupported>
  952. template<class MutableBufferSequence>
  953. std::size_t
  954. stream<NextLayer, deflateSupported>::
  955. read_some(
  956. MutableBufferSequence const& buffers,
  957. error_code& ec)
  958. {
  959. static_assert(is_sync_stream<next_layer_type>::value,
  960. "SyncStream requirements not met");
  961. static_assert(boost::asio::is_mutable_buffer_sequence<
  962. MutableBufferSequence>::value,
  963. "MutableBufferSequence requirements not met");
  964. using beast::detail::clamp;
  965. using boost::asio::buffer;
  966. using boost::asio::buffer_size;
  967. close_code code{};
  // Running total of payload bytes delivered to `buffers`; this is
  // the value returned on every exit path after the open check.
  968. std::size_t bytes_written = 0;
  969. ec.assign(0, ec.category());
  970. // Make sure the stream is open
  971. if(! check_open(ec))
  972. return 0;
  // Re-entry point used after a control frame or an empty non-final
  // frame has been consumed.
  973. loop:
  974. // See if we need to read a frame header. This
  975. // condition is structured to give the decompressor
  976. // a chance to emit the final empty deflate block
  977. //
  978. if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
  979. {
  980. // Read frame header
  981. error_code result;
  // Keep reading from the next layer into rd_buf_ until parse_fh
  // returns true; a non-zero `result` aborts with a close code.
  982. while(! parse_fh(rd_fh_, rd_buf_, result))
  983. {
  984. if(result)
  985. {
  986. // _Fail the WebSocket Connection_
  987. if(result == error::message_too_big)
  988. code = close_code::too_big;
  989. else
  990. code = close_code::protocol_error;
  991. do_fail(code, result, ec);
  992. return bytes_written;
  993. }
  994. auto const bytes_transferred =
  995. stream_.read_some(
  996. rd_buf_.prepare(read_size(
  997. rd_buf_, rd_buf_.max_size())),
  998. ec);
  999. if(! check_ok(ec))
  1000. return bytes_written;
  1001. rd_buf_.commit(bytes_transferred);
  1002. }
  1003. // Immediately apply the mask to the portion
  1004. // of the buffer holding payload data.
  1005. if(rd_fh_.len > 0 && rd_fh_.mask)
  1006. detail::mask_inplace(buffers_prefix(
  1007. clamp(rd_fh_.len), rd_buf_.mutable_data()),
  1008. rd_key_);
  1009. if(detail::is_control(rd_fh_.op))
  1010. {
  1011. // Get control frame payload
  1012. auto const b = buffers_prefix(
  1013. clamp(rd_fh_.len), rd_buf_.data());
  1014. auto const len = buffer_size(b);
  // The whole control payload is expected to be in rd_buf_ already.
  1015. BOOST_ASSERT(len == rd_fh_.len);
  1016. // Clear this otherwise the next
  1017. // frame will be considered final.
  1018. rd_fh_.fin = false;
  1019. // Handle ping frame
  1020. if(rd_fh_.op == detail::opcode::ping)
  1021. {
  1022. ping_data payload;
  1023. detail::read_ping(payload, b);
  1024. rd_buf_.consume(len);
  1025. if(wr_close_)
  1026. {
  1027. // Ignore ping when closing
  1028. goto loop;
  1029. }
  1030. if(ctrl_cb_)
  1031. ctrl_cb_(frame_type::ping, payload);
  // Reply with a pong carrying the same payload, written
  // synchronously to the next layer.
  1032. detail::frame_buffer fb;
  1033. write_ping<flat_static_buffer_base>(fb,
  1034. detail::opcode::pong, payload);
  1035. boost::asio::write(stream_, fb.data(), ec);
  1036. if(! check_ok(ec))
  1037. return bytes_written;
  1038. goto loop;
  1039. }
  1040. // Handle pong frame
  1041. if(rd_fh_.op == detail::opcode::pong)
  1042. {
  1043. ping_data payload;
  1044. detail::read_ping(payload, b);
  1045. rd_buf_.consume(len);
  1046. if(ctrl_cb_)
  1047. ctrl_cb_(frame_type::pong, payload);
  1048. goto loop;
  1049. }
  1050. // Handle close frame
  1051. BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
  1052. {
  1053. BOOST_ASSERT(! rd_close_);
  1054. rd_close_ = true;
  1055. close_reason cr;
  1056. detail::read_close(cr, b, result);
  1057. if(result)
  1058. {
  1059. // _Fail the WebSocket Connection_
  1060. do_fail(close_code::protocol_error,
  1061. result, ec);
  1062. return bytes_written;
  1063. }
  // Remember the peer's close reason, then notify the control
  // callback if one is set.
  1064. cr_ = cr;
  1065. rd_buf_.consume(len);
  1066. if(ctrl_cb_)
  1067. ctrl_cb_(frame_type::close, cr_.reason);
  1068. BOOST_ASSERT(! wr_close_);
  1069. // _Start the WebSocket Closing Handshake_
  // A received code of close_code::none is passed on as
  // close_code::normal; any other code is passed on as received.
  1070. do_fail(
  1071. cr.code == close_code::none ?
  1072. close_code::normal :
  1073. static_cast<close_code>(cr.code),
  1074. error::closed, ec);
  1075. return bytes_written;
  1076. }
  1077. }
  1078. if(rd_fh_.len == 0 && ! rd_fh_.fin)
  1079. {
  1080. // Empty non-final frame
  1081. goto loop;
  1082. }
  1083. rd_done_ = false;
  1084. }
  1085. else
  1086. {
  1087. ec.assign(0, ec.category());
  1088. }
  // Uncompressed payload: copy from rd_buf_ or read straight into
  // the caller's buffers.
  1089. if(! this->rd_deflated())
  1090. {
  1091. if(rd_remain_ > 0)
  1092. {
  1093. if(rd_buf_.size() == 0 && rd_buf_.max_size() >
  1094. (std::min)(clamp(rd_remain_),
  1095. buffer_size(buffers)))
  1096. {
  1097. // Fill the read buffer first, otherwise we
  1098. // get fewer bytes at the cost of one I/O.
  1099. rd_buf_.commit(stream_.read_some(
  1100. rd_buf_.prepare(read_size(rd_buf_,
  1101. rd_buf_.max_size())), ec));
  1102. if(! check_ok(ec))
  1103. return bytes_written;
  // Unmask only the part of rd_buf_ that belongs to the
  // current frame's remaining payload.
  1104. if(rd_fh_.mask)
  1105. detail::mask_inplace(
  1106. buffers_prefix(clamp(rd_remain_),
  1107. rd_buf_.mutable_data()), rd_key_);
  1108. }
  1109. if(rd_buf_.size() > 0)
  1110. {
  1111. // Copy from the read buffer.
  1112. // The mask was already applied.
  1113. auto const bytes_transferred =
  1114. buffer_copy(buffers, rd_buf_.data(),
  1115. clamp(rd_remain_));
  1116. auto const mb = buffers_prefix(
  1117. bytes_transferred, buffers);
  1118. rd_remain_ -= bytes_transferred;
  // Text messages: validate the bytes just copied, and finish
  // the UTF-8 check when the final frame's payload is exhausted.
  1119. if(rd_op_ == detail::opcode::text)
  1120. {
  1121. if(! rd_utf8_.write(mb) ||
  1122. (rd_remain_ == 0 && rd_fh_.fin &&
  1123. ! rd_utf8_.finish()))
  1124. {
  1125. // _Fail the WebSocket Connection_
  1126. do_fail(close_code::bad_payload,
  1127. error::bad_frame_payload, ec);
  1128. return bytes_written;
  1129. }
  1130. }
  1131. bytes_written += bytes_transferred;
  1132. rd_size_ += bytes_transferred;
  1133. rd_buf_.consume(bytes_transferred);
  1134. }
  1135. else
  1136. {
  1137. // Read into caller's buffer
  1138. BOOST_ASSERT(rd_remain_ > 0);
  1139. BOOST_ASSERT(buffer_size(buffers) > 0);
  1140. BOOST_ASSERT(buffer_size(buffers_prefix(
  1141. clamp(rd_remain_), buffers)) > 0);
  1142. auto const bytes_transferred =
  1143. stream_.read_some(buffers_prefix(
  1144. clamp(rd_remain_), buffers), ec);
  1145. if(! check_ok(ec))
  1146. return bytes_written;
  1147. BOOST_ASSERT(bytes_transferred > 0);
  1148. auto const mb = buffers_prefix(
  1149. bytes_transferred, buffers);
  1150. rd_remain_ -= bytes_transferred;
  // Data went directly into the caller's buffers, so the mask
  // is removed there rather than in rd_buf_.
  1151. if(rd_fh_.mask)
  1152. detail::mask_inplace(mb, rd_key_);
  1153. if(rd_op_ == detail::opcode::text)
  1154. {
  1155. if(! rd_utf8_.write(mb) ||
  1156. (rd_remain_ == 0 && rd_fh_.fin &&
  1157. ! rd_utf8_.finish()))
  1158. {
  1159. // _Fail the WebSocket Connection_
  1160. do_fail(close_code::bad_payload,
  1161. error::bad_frame_payload, ec);
  1162. return bytes_written;
  1163. }
  1164. }
  1165. bytes_written += bytes_transferred;
  1166. rd_size_ += bytes_transferred;
  1167. }
  1168. }
  // The message is done once the final frame has no payload left.
  1169. rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
  1170. }
  1171. else
  1172. {
  1173. // Read compressed message frame payload:
  1174. // inflate even if rd_fh_.len == 0, otherwise we
  1175. // never emit the end-of-stream deflate block.
  1176. //
  // did_read allows at most one read from the next layer inside
  // the loop below; a second need for input breaks out instead.
  1177. bool did_read = false;
  1178. buffers_suffix<MutableBufferSequence> cb{buffers};
  // Each iteration inflates into the first remaining output buffer.
  1179. while(buffer_size(cb) > 0)
  1180. {
  1181. zlib::z_params zs;
  1182. {
  1183. auto const out = buffers_front(cb);
  1184. zs.next_out = out.data();
  1185. zs.avail_out = out.size();
  1186. BOOST_ASSERT(zs.avail_out > 0);
  1187. }
  1188. if(rd_remain_ > 0)
  1189. {
  1190. if(rd_buf_.size() > 0)
  1191. {
  1192. // use what's there
  1193. auto const in = buffers_prefix(
  1194. clamp(rd_remain_), buffers_front(
  1195. rd_buf_.data()));
  1196. zs.avail_in = in.size();
  1197. zs.next_in = in.data();
  1198. }
  1199. else if(! did_read)
  1200. {
  1201. // read new
  1202. auto const bytes_transferred =
  1203. stream_.read_some(
  1204. rd_buf_.prepare(read_size(
  1205. rd_buf_, rd_buf_.max_size())),
  1206. ec);
  1207. if(! check_ok(ec))
  1208. return bytes_written;
  1209. BOOST_ASSERT(bytes_transferred > 0);
  1210. rd_buf_.commit(bytes_transferred);
  1211. if(rd_fh_.mask)
  1212. detail::mask_inplace(
  1213. buffers_prefix(clamp(rd_remain_),
  1214. rd_buf_.mutable_data()), rd_key_);
  1215. auto const in = buffers_prefix(
  1216. clamp(rd_remain_), buffers_front(
  1217. rd_buf_.data()));
  1218. zs.avail_in = in.size();
  1219. zs.next_in = in.data();
  1220. did_read = true;
  1221. }
  1222. else
  1223. {
  1224. break;
  1225. }
  1226. }
  1227. else if(rd_fh_.fin)
  1228. {
  1229. // append the empty block codes
  1230. static std::uint8_t constexpr
  1231. empty_block[4] = {
  1232. 0x00, 0x00, 0xff, 0xff };
  1233. zs.next_in = empty_block;
  1234. zs.avail_in = sizeof(empty_block);
  1235. this->inflate(zs, zlib::Flush::sync, ec);
  1236. if(! ec)
  1237. {
  1238. // https://github.com/madler/zlib/issues/280
  // Output produced while feeding the trailing empty block is
  // reported as error::partial_deflate_block.
  1239. if(zs.total_out > 0)
  1240. ec = error::partial_deflate_block;
  1241. }
  1242. if(! check_ok(ec))
  1243. return bytes_written;
  1244. this->do_context_takeover_read(role_);
  1245. rd_done_ = true;
  1246. break;
  1247. }
  1248. else
  1249. {
  1250. break;
  1251. }
  1252. this->inflate(zs, zlib::Flush::sync, ec);
  1253. if(! check_ok(ec))
  1254. return bytes_written;
  // The size limit is checked only when rd_msg_max_ is non-zero.
  // NOTE(review): sum_exceeds presumably tests whether
  // rd_size_ + zs.total_out exceeds rd_msg_max_ - confirm.
  1255. if(rd_msg_max_ && beast::detail::sum_exceeds(
  1256. rd_size_, zs.total_out, rd_msg_max_))
  1257. {
  1258. do_fail(close_code::too_big,
  1259. error::message_too_big, ec);
  1260. return bytes_written;
  1261. }
  // Advance the output view past the inflated bytes and drop the
  // compressed input that was consumed.
  1262. cb.consume(zs.total_out);
  1263. rd_size_ += zs.total_out;
  1264. rd_remain_ -= zs.total_in;
  1265. rd_buf_.consume(zs.total_in);
  1266. bytes_written += zs.total_out;
  1267. }
  1268. if(rd_op_ == detail::opcode::text)
  1269. {
  1270. // check utf8
  // Validates everything inflated by this call in one pass, and
  // finishes the check once rd_done_ has been set.
  1271. if(! rd_utf8_.write(
  1272. buffers_prefix(bytes_written, buffers)) || (
  1273. rd_done_ && ! rd_utf8_.finish()))
  1274. {
  1275. // _Fail the WebSocket Connection_
  1276. do_fail(close_code::bad_payload,
  1277. error::bad_frame_payload, ec);
  1278. return bytes_written;
  1279. }
  1280. }
  1281. }
  1282. return bytes_written;
  1283. }
  1284. template<class NextLayer, bool deflateSupported>
  1285. template<class MutableBufferSequence, class ReadHandler>
  1286. BOOST_ASIO_INITFN_RESULT_TYPE(
  1287. ReadHandler, void(error_code, std::size_t))
  1288. stream<NextLayer, deflateSupported>::
  1289. async_read_some(
  1290. MutableBufferSequence const& buffers,
  1291. ReadHandler&& handler)
  1292. {
  1293. static_assert(is_async_stream<next_layer_type>::value,
  1294. "AsyncStream requirements not met");
  1295. static_assert(boost::asio::is_mutable_buffer_sequence<
  1296. MutableBufferSequence>::value,
  1297. "MutableBufferSequence requirements not met");
  1298. BOOST_BEAST_HANDLER_INIT(
  1299. ReadHandler, void(error_code, std::size_t));
  1300. read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
  1301. ReadHandler, void(error_code, std::size_t))>{
  1302. std::move(init.completion_handler), *this, buffers}(
  1303. {}, 0, false);
  1304. return init.result.get();
  1305. }
  1306. } // websocket
  1307. } // beast
  1308. } // boost
  1309. #endif