read.hpp 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  11. #include <boost/beast/core/buffer_traits.hpp>
  12. #include <boost/beast/websocket/teardown.hpp>
  13. #include <boost/beast/websocket/detail/mask.hpp>
  14. #include <boost/beast/websocket/impl/stream_impl.hpp>
  15. #include <boost/beast/core/async_base.hpp>
  16. #include <boost/beast/core/buffers_prefix.hpp>
  17. #include <boost/beast/core/buffers_suffix.hpp>
  18. #include <boost/beast/core/flat_static_buffer.hpp>
  19. #include <boost/beast/core/read_size.hpp>
  20. #include <boost/beast/core/stream_traits.hpp>
  21. #include <boost/beast/core/detail/bind_continuation.hpp>
  22. #include <boost/beast/core/detail/buffer.hpp>
  23. #include <boost/beast/core/detail/clamp.hpp>
  24. #include <boost/beast/core/detail/config.hpp>
  25. #include <boost/asio/coroutine.hpp>
  26. #include <boost/assert.hpp>
  27. #include <boost/config.hpp>
  28. #include <boost/optional.hpp>
  29. #include <boost/throw_exception.hpp>
  30. #include <algorithm>
  31. #include <limits>
  32. #include <memory>
  33. namespace boost {
  34. namespace beast {
  35. namespace websocket {
  36. /* Read some message data into a buffer sequence.
  37. Also reads and handles control frames.
  38. */
  39. template<class NextLayer, bool deflateSupported>
  40. template<class Handler, class MutableBufferSequence>
  41. class stream<NextLayer, deflateSupported>::read_some_op
  42. : public beast::async_base<
  43. Handler, beast::executor_type<stream>>
  44. , public asio::coroutine
  45. {
  46. boost::weak_ptr<impl_type> wp_;
  47. MutableBufferSequence bs_;
  48. buffers_suffix<MutableBufferSequence> cb_;
  49. std::size_t bytes_written_ = 0;
  50. error_code result_;
  51. close_code code_;
  52. bool did_read_ = false;
  53. public:
  54. static constexpr int id = 1; // for soft_mutex
  55. template<class Handler_>
  56. read_some_op(
  57. Handler_&& h,
  58. boost::shared_ptr<impl_type> const& sp,
  59. MutableBufferSequence const& bs)
  60. : async_base<
  61. Handler, beast::executor_type<stream>>(
  62. std::forward<Handler_>(h),
  63. sp->stream().get_executor())
  64. , wp_(sp)
  65. , bs_(bs)
  66. , cb_(bs)
  67. , code_(close_code::none)
  68. {
  69. (*this)({}, 0, false);
  70. }
  71. void operator()(
  72. error_code ec = {},
  73. std::size_t bytes_transferred = 0,
  74. bool cont = true)
  75. {
  76. using beast::detail::clamp;
  77. auto sp = wp_.lock();
  78. if(! sp)
  79. {
  80. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  81. bytes_written_ = 0;
  82. return this->complete(cont, ec, bytes_written_);
  83. }
  84. auto& impl = *sp;
  85. BOOST_ASIO_CORO_REENTER(*this)
  86. {
  87. impl.update_timer(this->get_executor());
  88. acquire_read_lock:
  89. // Acquire the read lock
  90. if(! impl.rd_block.try_lock(this))
  91. {
  92. do_suspend:
  93. BOOST_ASIO_CORO_YIELD
  94. {
  95. BOOST_ASIO_HANDLER_LOCATION((
  96. __FILE__, __LINE__,
  97. "websocket::async_read_some"));
  98. this->set_allowed_cancellation(net::cancellation_type::all);
  99. impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
  100. }
  101. if (ec)
  102. return this->complete(cont, ec, bytes_written_);
  103. this->set_allowed_cancellation(net::cancellation_type::terminal);
  104. impl.rd_block.lock(this);
  105. BOOST_ASIO_CORO_YIELD
  106. {
  107. BOOST_ASIO_HANDLER_LOCATION((
  108. __FILE__, __LINE__,
  109. "websocket::async_read_some"));
  110. const auto ex = this->get_immediate_executor();
  111. net::dispatch(ex, std::move(*this));
  112. }
  113. BOOST_ASSERT(impl.rd_block.is_locked(this));
  114. BOOST_ASSERT(!ec);
  115. if(impl.check_stop_now(ec))
  116. {
  117. // Issue 2264 - There is no guarantee that the next
  118. // error will be operation_aborted.
  119. // The error could be a result of the peer resetting the
  120. // connection
  121. // BOOST_ASSERT(ec == net::error::operation_aborted);
  122. goto upcall;
  123. }
  124. // VFALCO Should never get here
  125. // The only way to get read blocked is if
  126. // a `close_op` wrote a close frame
  127. BOOST_ASSERT(impl.wr_close);
  128. BOOST_ASSERT(impl.status_ != status::open);
  129. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  130. goto upcall;
  131. }
  132. else
  133. {
  134. // Make sure the stream is not closed
  135. if( impl.status_ == status::closed ||
  136. impl.status_ == status::failed)
  137. {
  138. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  139. goto upcall;
  140. }
  141. }
  142. // if status_ == status::closing, we want to suspend
  143. // the read operation until the close completes,
  144. // then finish the read with operation_aborted.
  145. loop:
  146. BOOST_ASSERT(impl.rd_block.is_locked(this));
  147. // See if we need to read a frame header. This
  148. // condition is structured to give the decompressor
  149. // a chance to emit the final empty deflate block
  150. //
  151. if(impl.rd_remain == 0 &&
  152. (! impl.rd_fh.fin || impl.rd_done))
  153. {
  154. // Read frame header
  155. while(! impl.parse_fh(
  156. impl.rd_fh, impl.rd_buf, result_))
  157. {
  158. if(result_)
  159. {
  160. // _Fail the WebSocket Connection_
  161. if(result_ == error::message_too_big)
  162. code_ = close_code::too_big;
  163. else
  164. code_ = close_code::protocol_error;
  165. goto close;
  166. }
  167. BOOST_ASSERT(impl.rd_block.is_locked(this));
  168. BOOST_ASIO_CORO_YIELD
  169. {
  170. BOOST_ASIO_HANDLER_LOCATION((
  171. __FILE__, __LINE__,
  172. "websocket::async_read_some"));
  173. impl.stream().async_read_some(
  174. impl.rd_buf.prepare(read_size(
  175. impl.rd_buf, impl.rd_buf.max_size())),
  176. std::move(*this));
  177. }
  178. BOOST_ASSERT(impl.rd_block.is_locked(this));
  179. impl.rd_buf.commit(bytes_transferred);
  180. if(impl.check_stop_now(ec))
  181. goto upcall;
  182. impl.reset_idle();
  183. // Allow a close operation
  184. // to acquire the read block
  185. impl.rd_block.unlock(this);
  186. if( impl.op_r_close.maybe_invoke())
  187. {
  188. // Suspend
  189. BOOST_ASSERT(impl.rd_block.is_locked());
  190. goto do_suspend;
  191. }
  192. // Acquire read block
  193. impl.rd_block.lock(this);
  194. }
  195. // Immediately apply the mask to the portion
  196. // of the buffer holding payload data.
  197. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  198. detail::mask_inplace(buffers_prefix(
  199. clamp(impl.rd_fh.len),
  200. impl.rd_buf.data()),
  201. impl.rd_key);
  202. if(detail::is_control(impl.rd_fh.op))
  203. {
  204. // Clear this otherwise the next
  205. // frame will be considered final.
  206. impl.rd_fh.fin = false;
  207. // Handle ping frame
  208. if(impl.rd_fh.op == detail::opcode::ping)
  209. {
  210. impl.update_timer(this->get_executor());
  211. if(impl.ctrl_cb)
  212. {
  213. if(! cont)
  214. {
  215. BOOST_ASIO_CORO_YIELD
  216. {
  217. BOOST_ASIO_HANDLER_LOCATION((
  218. __FILE__, __LINE__,
  219. "websocket::async_read_some"));
  220. const auto ex = this->get_immediate_executor();
  221. net::dispatch(ex, std::move(*this));
  222. }
  223. BOOST_ASSERT(cont);
  224. // VFALCO call check_stop_now() here?
  225. }
  226. }
  227. {
  228. auto const b = buffers_prefix(
  229. clamp(impl.rd_fh.len),
  230. impl.rd_buf.data());
  231. auto const len = buffer_bytes(b);
  232. BOOST_ASSERT(len == impl.rd_fh.len);
  233. ping_data payload;
  234. detail::read_ping(payload, b);
  235. impl.rd_buf.consume(len);
  236. // Ignore ping when closing
  237. if(impl.status_ == status::closing)
  238. goto loop;
  239. if(impl.ctrl_cb)
  240. impl.ctrl_cb(
  241. frame_type::ping, to_string_view(payload));
  242. impl.rd_fb.clear();
  243. impl.template write_ping<
  244. flat_static_buffer_base>(impl.rd_fb,
  245. detail::opcode::pong, payload);
  246. }
  247. // Allow a close operation
  248. // to acquire the read block
  249. impl.rd_block.unlock(this);
  250. impl.op_r_close.maybe_invoke();
  251. // Acquire the write lock
  252. if(! impl.wr_block.try_lock(this))
  253. {
  254. BOOST_ASIO_CORO_YIELD
  255. {
  256. BOOST_ASIO_HANDLER_LOCATION((
  257. __FILE__, __LINE__,
  258. "websocket::async_read_some"));
  259. impl.op_rd.emplace(std::move(*this));
  260. }
  261. if (ec)
  262. return this->complete(cont, ec, bytes_written_);
  263. impl.wr_block.lock(this);
  264. BOOST_ASIO_CORO_YIELD
  265. {
  266. BOOST_ASIO_HANDLER_LOCATION((
  267. __FILE__, __LINE__,
  268. "websocket::async_read_some"));
  269. const auto ex = this->get_immediate_executor();
  270. net::dispatch(ex, std::move(*this));
  271. }
  272. BOOST_ASSERT(impl.wr_block.is_locked(this));
  273. if(impl.check_stop_now(ec))
  274. goto upcall;
  275. }
  276. // Send pong
  277. BOOST_ASSERT(impl.wr_block.is_locked(this));
  278. BOOST_ASIO_CORO_YIELD
  279. {
  280. BOOST_ASIO_HANDLER_LOCATION((
  281. __FILE__, __LINE__,
  282. "websocket::async_read_some"));
  283. net::async_write(
  284. impl.stream(), net::const_buffer(impl.rd_fb.data()),
  285. beast::detail::bind_continuation(std::move(*this)));
  286. }
  287. BOOST_ASSERT(impl.wr_block.is_locked(this));
  288. if(impl.check_stop_now(ec))
  289. goto upcall;
  290. impl.wr_block.unlock(this);
  291. impl.op_close.maybe_invoke()
  292. || impl.op_idle_ping.maybe_invoke()
  293. || impl.op_ping.maybe_invoke()
  294. || impl.op_wr.maybe_invoke();
  295. goto acquire_read_lock;
  296. }
  297. // Handle pong frame
  298. if(impl.rd_fh.op == detail::opcode::pong)
  299. {
  300. // Ignore pong when closing
  301. if(! impl.wr_close && impl.ctrl_cb)
  302. {
  303. if(! cont)
  304. {
  305. BOOST_ASIO_CORO_YIELD
  306. {
  307. BOOST_ASIO_HANDLER_LOCATION((
  308. __FILE__, __LINE__,
  309. "websocket::async_read_some"));
  310. const auto ex = this->get_immediate_executor();
  311. net::dispatch(ex, std::move(*this));
  312. }
  313. BOOST_ASSERT(cont);
  314. }
  315. }
  316. auto const cb = buffers_prefix(clamp(
  317. impl.rd_fh.len), impl.rd_buf.data());
  318. auto const len = buffer_bytes(cb);
  319. BOOST_ASSERT(len == impl.rd_fh.len);
  320. ping_data payload;
  321. detail::read_ping(payload, cb);
  322. impl.rd_buf.consume(len);
  323. // Ignore pong when closing
  324. if(! impl.wr_close && impl.ctrl_cb)
  325. impl.ctrl_cb(frame_type::pong, to_string_view(payload));
  326. goto loop;
  327. }
  328. // Handle close frame
  329. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  330. {
  331. if(impl.ctrl_cb)
  332. {
  333. if(! cont)
  334. {
  335. BOOST_ASIO_CORO_YIELD
  336. {
  337. BOOST_ASIO_HANDLER_LOCATION((
  338. __FILE__, __LINE__,
  339. "websocket::async_read_some"));
  340. const auto ex = this->get_immediate_executor();
  341. net::dispatch(ex, std::move(*this));
  342. }
  343. BOOST_ASSERT(cont);
  344. }
  345. }
  346. auto const cb = buffers_prefix(clamp(
  347. impl.rd_fh.len), impl.rd_buf.data());
  348. auto const len = buffer_bytes(cb);
  349. BOOST_ASSERT(len == impl.rd_fh.len);
  350. BOOST_ASSERT(! impl.rd_close);
  351. impl.rd_close = true;
  352. close_reason cr;
  353. detail::read_close(cr, cb, result_);
  354. if(result_)
  355. {
  356. // _Fail the WebSocket Connection_
  357. code_ = close_code::protocol_error;
  358. goto close;
  359. }
  360. impl.cr = cr;
  361. impl.rd_buf.consume(len);
  362. if(impl.ctrl_cb)
  363. impl.ctrl_cb(frame_type::close,
  364. to_string_view(impl.cr.reason));
  365. // See if we are already closing
  366. if(impl.status_ == status::closing)
  367. {
  368. // _Close the WebSocket Connection_
  369. BOOST_ASSERT(impl.wr_close);
  370. code_ = close_code::none;
  371. result_ = error::closed;
  372. goto close;
  373. }
  374. // _Start the WebSocket Closing Handshake_
  375. code_ = cr.code == close_code::none ?
  376. close_code::normal :
  377. static_cast<close_code>(cr.code);
  378. result_ = error::closed;
  379. goto close;
  380. }
  381. }
  382. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  383. {
  384. // Empty non-final frame
  385. goto loop;
  386. }
  387. impl.rd_done = false;
  388. }
  389. if(! impl.rd_deflated())
  390. {
  391. if(impl.rd_remain > 0)
  392. {
  393. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  394. (std::min)(clamp(impl.rd_remain),
  395. buffer_bytes(cb_)))
  396. {
  397. // Fill the read buffer first, otherwise we
  398. // get fewer bytes at the cost of one I/O.
  399. BOOST_ASIO_CORO_YIELD
  400. {
  401. BOOST_ASIO_HANDLER_LOCATION((
  402. __FILE__, __LINE__,
  403. "websocket::async_read_some"));
  404. impl.stream().async_read_some(
  405. impl.rd_buf.prepare(read_size(
  406. impl.rd_buf, impl.rd_buf.max_size())),
  407. std::move(*this));
  408. }
  409. impl.rd_buf.commit(bytes_transferred);
  410. if(impl.check_stop_now(ec))
  411. goto upcall;
  412. impl.reset_idle();
  413. if(impl.rd_fh.mask)
  414. detail::mask_inplace(buffers_prefix(clamp(
  415. impl.rd_remain), impl.rd_buf.data()),
  416. impl.rd_key);
  417. }
  418. if(impl.rd_buf.size() > 0)
  419. {
  420. // Copy from the read buffer.
  421. // The mask was already applied.
  422. bytes_transferred = net::buffer_copy(cb_,
  423. impl.rd_buf.data(), clamp(impl.rd_remain));
  424. auto const mb = buffers_prefix(
  425. bytes_transferred, cb_);
  426. impl.rd_remain -= bytes_transferred;
  427. if(impl.rd_op == detail::opcode::text)
  428. {
  429. if(! impl.rd_utf8.write(mb) ||
  430. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  431. ! impl.rd_utf8.finish()))
  432. {
  433. // _Fail the WebSocket Connection_
  434. code_ = close_code::bad_payload;
  435. result_ = error::bad_frame_payload;
  436. goto close;
  437. }
  438. }
  439. bytes_written_ += bytes_transferred;
  440. impl.rd_size += bytes_transferred;
  441. impl.rd_buf.consume(bytes_transferred);
  442. }
  443. else
  444. {
  445. // Read into caller's buffer
  446. BOOST_ASSERT(impl.rd_remain > 0);
  447. BOOST_ASSERT(buffer_bytes(cb_) > 0);
  448. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  449. clamp(impl.rd_remain), cb_)) > 0);
  450. BOOST_ASIO_CORO_YIELD
  451. {
  452. BOOST_ASIO_HANDLER_LOCATION((
  453. __FILE__, __LINE__,
  454. "websocket::async_read_some"));
  455. impl.stream().async_read_some(buffers_prefix(
  456. clamp(impl.rd_remain), cb_), std::move(*this));
  457. }
  458. if(impl.check_stop_now(ec))
  459. goto upcall;
  460. impl.reset_idle();
  461. BOOST_ASSERT(bytes_transferred > 0);
  462. auto const mb = buffers_prefix(
  463. bytes_transferred, cb_);
  464. impl.rd_remain -= bytes_transferred;
  465. if(impl.rd_fh.mask)
  466. detail::mask_inplace(mb, impl.rd_key);
  467. if(impl.rd_op == detail::opcode::text)
  468. {
  469. if(! impl.rd_utf8.write(mb) ||
  470. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  471. ! impl.rd_utf8.finish()))
  472. {
  473. // _Fail the WebSocket Connection_
  474. code_ = close_code::bad_payload;
  475. result_ = error::bad_frame_payload;
  476. goto close;
  477. }
  478. }
  479. bytes_written_ += bytes_transferred;
  480. impl.rd_size += bytes_transferred;
  481. }
  482. }
  483. BOOST_ASSERT( ! impl.rd_done );
  484. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  485. impl.rd_done = true;
  486. }
  487. else
  488. {
  489. // Read compressed message frame payload:
  490. // inflate even if rd_fh_.len == 0, otherwise we
  491. // never emit the end-of-stream deflate block.
  492. while(buffer_bytes(cb_) > 0)
  493. {
  494. if( impl.rd_remain > 0 &&
  495. impl.rd_buf.size() == 0 &&
  496. ! did_read_)
  497. {
  498. // read new
  499. BOOST_ASIO_CORO_YIELD
  500. {
  501. BOOST_ASIO_HANDLER_LOCATION((
  502. __FILE__, __LINE__,
  503. "websocket::async_read_some"));
  504. impl.stream().async_read_some(
  505. impl.rd_buf.prepare(read_size(
  506. impl.rd_buf, impl.rd_buf.max_size())),
  507. std::move(*this));
  508. }
  509. if(impl.check_stop_now(ec))
  510. goto upcall;
  511. impl.reset_idle();
  512. BOOST_ASSERT(bytes_transferred > 0);
  513. impl.rd_buf.commit(bytes_transferred);
  514. if(impl.rd_fh.mask)
  515. detail::mask_inplace(
  516. buffers_prefix(clamp(impl.rd_remain),
  517. impl.rd_buf.data()), impl.rd_key);
  518. did_read_ = true;
  519. }
  520. zlib::z_params zs;
  521. {
  522. auto const out = buffers_front(cb_);
  523. zs.next_out = out.data();
  524. zs.avail_out = out.size();
  525. BOOST_ASSERT(zs.avail_out > 0);
  526. }
  527. if(impl.rd_remain > 0)
  528. {
  529. if(impl.rd_buf.size() > 0)
  530. {
  531. // use what's there
  532. auto const in = buffers_prefix(
  533. clamp(impl.rd_remain), buffers_front(
  534. impl.rd_buf.data()));
  535. zs.avail_in = in.size();
  536. zs.next_in = in.data();
  537. }
  538. else
  539. {
  540. break;
  541. }
  542. impl.inflate(zs, ec);
  543. if(impl.check_stop_now(ec))
  544. goto upcall;
  545. impl.rd_remain -= zs.total_in;
  546. impl.rd_buf.consume(zs.total_in);
  547. }
  548. else if(impl.rd_fh.fin)
  549. {
  550. impl.inflate_with_eb(zs, ec);
  551. if(impl.check_stop_now(ec))
  552. goto upcall;
  553. if(zs.total_out == 0)
  554. {
  555. impl.do_context_takeover_read(impl.role);
  556. impl.rd_done = true;
  557. break;
  558. }
  559. }
  560. else
  561. {
  562. break;
  563. }
  564. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  565. impl.rd_size, zs.total_out, impl.rd_msg_max))
  566. {
  567. // _Fail the WebSocket Connection_
  568. code_ = close_code::too_big;
  569. result_ = error::message_too_big;
  570. goto close;
  571. }
  572. cb_.consume(zs.total_out);
  573. impl.rd_size += zs.total_out;
  574. bytes_written_ += zs.total_out;
  575. }
  576. if(impl.rd_op == detail::opcode::text)
  577. {
  578. // check utf8
  579. if(! impl.rd_utf8.write(
  580. buffers_prefix(bytes_written_, bs_)) || (
  581. impl.rd_done && ! impl.rd_utf8.finish()))
  582. {
  583. // _Fail the WebSocket Connection_
  584. code_ = close_code::bad_payload;
  585. result_ = error::bad_frame_payload;
  586. goto close;
  587. }
  588. }
  589. }
  590. goto upcall;
  591. close:
  592. // Acquire the write lock
  593. if(! impl.wr_block.try_lock(this))
  594. {
  595. BOOST_ASIO_CORO_YIELD
  596. {
  597. BOOST_ASIO_HANDLER_LOCATION((
  598. __FILE__, __LINE__,
  599. "websocket::async_read_some"));
  600. impl.op_rd.emplace(std::move(*this));
  601. }
  602. if (ec)
  603. return this->complete(cont, ec, bytes_written_);
  604. impl.wr_block.lock(this);
  605. BOOST_ASIO_CORO_YIELD
  606. {
  607. BOOST_ASIO_HANDLER_LOCATION((
  608. __FILE__, __LINE__,
  609. "websocket::async_read_some"));
  610. const auto ex = this->get_immediate_executor();
  611. net::dispatch(ex, std::move(*this));
  612. }
  613. BOOST_ASSERT(impl.wr_block.is_locked(this));
  614. if(impl.check_stop_now(ec))
  615. goto upcall;
  616. }
  617. impl.change_status(status::closing);
  618. impl.update_timer(this->get_executor());
  619. if(! impl.wr_close)
  620. {
  621. impl.wr_close = true;
  622. // Serialize close frame
  623. impl.rd_fb.clear();
  624. impl.template write_close<
  625. flat_static_buffer_base>(
  626. impl.rd_fb, code_);
  627. // Send close frame
  628. BOOST_ASSERT(impl.wr_block.is_locked(this));
  629. BOOST_ASIO_CORO_YIELD
  630. {
  631. BOOST_ASIO_HANDLER_LOCATION((
  632. __FILE__, __LINE__,
  633. "websocket::async_read_some"));
  634. net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
  635. beast::detail::bind_continuation(std::move(*this)));
  636. }
  637. BOOST_ASSERT(impl.wr_block.is_locked(this));
  638. if(impl.check_stop_now(ec))
  639. goto upcall;
  640. }
  641. // Teardown
  642. using beast::websocket::async_teardown;
  643. BOOST_ASSERT(impl.wr_block.is_locked(this));
  644. BOOST_ASIO_CORO_YIELD
  645. {
  646. BOOST_ASIO_HANDLER_LOCATION((
  647. __FILE__, __LINE__,
  648. "websocket::async_read_some"));
  649. async_teardown(impl.role, impl.stream(),
  650. beast::detail::bind_continuation(std::move(*this)));
  651. }
  652. BOOST_ASSERT(impl.wr_block.is_locked(this));
  653. if(ec == net::error::eof)
  654. {
  655. // Rationale:
  656. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  657. ec = {};
  658. }
  659. if(! ec)
  660. {
  661. BOOST_BEAST_ASSIGN_EC(ec, result_);
  662. }
  663. if(ec && ec != error::closed)
  664. impl.change_status(status::failed);
  665. else
  666. impl.change_status(status::closed);
  667. impl.close();
  668. upcall:
  669. impl.rd_block.try_unlock(this);
  670. impl.op_r_close.maybe_invoke();
  671. if(impl.wr_block.try_unlock(this))
  672. impl.op_close.maybe_invoke()
  673. || impl.op_idle_ping.maybe_invoke()
  674. || impl.op_ping.maybe_invoke()
  675. || impl.op_wr.maybe_invoke();
  676. this->complete(cont, ec, bytes_written_);
  677. }
  678. }
  679. };
  680. //------------------------------------------------------------------------------
  681. template<class NextLayer, bool deflateSupported>
  682. template<class Handler, class DynamicBuffer>
  683. class stream<NextLayer, deflateSupported>::read_op
  684. : public beast::async_base<
  685. Handler, beast::executor_type<stream>>
  686. , public asio::coroutine
  687. {
  688. boost::weak_ptr<impl_type> wp_;
  689. DynamicBuffer& b_;
  690. std::size_t limit_;
  691. std::size_t bytes_written_ = 0;
  692. bool some_;
  693. public:
  694. template<class Handler_>
  695. read_op(
  696. Handler_&& h,
  697. boost::shared_ptr<impl_type> const& sp,
  698. DynamicBuffer& b,
  699. std::size_t limit,
  700. bool some)
  701. : async_base<Handler,
  702. beast::executor_type<stream>>(
  703. std::forward<Handler_>(h),
  704. sp->stream().get_executor())
  705. , wp_(sp)
  706. , b_(b)
  707. , limit_(limit ? limit : (
  708. std::numeric_limits<std::size_t>::max)())
  709. , some_(some)
  710. {
  711. (*this)({}, 0, false);
  712. }
  713. void operator()(
  714. error_code ec = {},
  715. std::size_t bytes_transferred = 0,
  716. bool cont = true)
  717. {
  718. using beast::detail::clamp;
  719. auto sp = wp_.lock();
  720. if(! sp)
  721. {
  722. BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
  723. bytes_written_ = 0;
  724. return this->complete(cont, ec, bytes_written_);
  725. }
  726. auto& impl = *sp;
  727. using mutable_buffers_type = typename
  728. DynamicBuffer::mutable_buffers_type;
  729. BOOST_ASIO_CORO_REENTER(*this)
  730. {
  731. do
  732. {
  733. // VFALCO TODO use boost::beast::bind_continuation
  734. BOOST_ASIO_CORO_YIELD
  735. {
  736. auto mb = beast::detail::dynamic_buffer_prepare(b_,
  737. clamp(impl.read_size_hint_db(b_), limit_),
  738. ec, error::buffer_overflow);
  739. if(impl.check_stop_now(ec))
  740. goto upcall;
  741. BOOST_ASIO_HANDLER_LOCATION((
  742. __FILE__, __LINE__,
  743. "websocket::async_read"));
  744. read_some_op<read_op, mutable_buffers_type>(
  745. std::move(*this), sp, *mb);
  746. }
  747. b_.commit(bytes_transferred);
  748. bytes_written_ += bytes_transferred;
  749. if(ec)
  750. goto upcall;
  751. }
  752. while(! some_ && ! impl.rd_done);
  753. upcall:
  754. this->complete(cont, ec, bytes_written_);
  755. }
  756. }
  757. };
  758. template<class NextLayer, bool deflateSupported>
  759. struct stream<NextLayer, deflateSupported>::
  760. run_read_some_op
  761. {
  762. boost::shared_ptr<impl_type> const& self;
  763. using executor_type = typename stream::executor_type;
  764. executor_type
  765. get_executor() const noexcept
  766. {
  767. return self->stream().get_executor();
  768. }
  769. template<
  770. class ReadHandler,
  771. class MutableBufferSequence>
  772. void
  773. operator()(
  774. ReadHandler&& h,
  775. MutableBufferSequence const& b)
  776. {
  777. // If you get an error on the following line it means
  778. // that your handler does not meet the documented type
  779. // requirements for the handler.
  780. static_assert(
  781. beast::detail::is_invocable<ReadHandler,
  782. void(error_code, std::size_t)>::value,
  783. "ReadHandler type requirements not met");
  784. read_some_op<
  785. typename std::decay<ReadHandler>::type,
  786. MutableBufferSequence>(
  787. std::forward<ReadHandler>(h),
  788. self,
  789. b);
  790. }
  791. };
  792. template<class NextLayer, bool deflateSupported>
  793. struct stream<NextLayer, deflateSupported>::
  794. run_read_op
  795. {
  796. boost::shared_ptr<impl_type> const& self;
  797. using executor_type = typename stream::executor_type;
  798. executor_type
  799. get_executor() const noexcept
  800. {
  801. return self->stream().get_executor();
  802. }
  803. template<
  804. class ReadHandler,
  805. class DynamicBuffer>
  806. void
  807. operator()(
  808. ReadHandler&& h,
  809. DynamicBuffer* b,
  810. std::size_t limit,
  811. bool some)
  812. {
  813. // If you get an error on the following line it means
  814. // that your handler does not meet the documented type
  815. // requirements for the handler.
  816. static_assert(
  817. beast::detail::is_invocable<ReadHandler,
  818. void(error_code, std::size_t)>::value,
  819. "ReadHandler type requirements not met");
  820. read_op<
  821. typename std::decay<ReadHandler>::type,
  822. DynamicBuffer>(
  823. std::forward<ReadHandler>(h),
  824. self,
  825. *b,
  826. limit,
  827. some);
  828. }
  829. };
  830. //------------------------------------------------------------------------------
  831. template<class NextLayer, bool deflateSupported>
  832. template<class DynamicBuffer>
  833. std::size_t
  834. stream<NextLayer, deflateSupported>::
  835. read(DynamicBuffer& buffer)
  836. {
  837. static_assert(is_sync_stream<next_layer_type>::value,
  838. "SyncStream type requirements not met");
  839. static_assert(
  840. net::is_dynamic_buffer<DynamicBuffer>::value,
  841. "DynamicBuffer type requirements not met");
  842. error_code ec;
  843. auto const bytes_written = read(buffer, ec);
  844. if(ec)
  845. BOOST_THROW_EXCEPTION(system_error{ec});
  846. return bytes_written;
  847. }
  848. template<class NextLayer, bool deflateSupported>
  849. template<class DynamicBuffer>
  850. std::size_t
  851. stream<NextLayer, deflateSupported>::
  852. read(DynamicBuffer& buffer, error_code& ec)
  853. {
  854. static_assert(is_sync_stream<next_layer_type>::value,
  855. "SyncStream type requirements not met");
  856. static_assert(
  857. net::is_dynamic_buffer<DynamicBuffer>::value,
  858. "DynamicBuffer type requirements not met");
  859. std::size_t bytes_written = 0;
  860. do
  861. {
  862. bytes_written += read_some(buffer, 0, ec);
  863. if(ec)
  864. return bytes_written;
  865. }
  866. while(! is_message_done());
  867. return bytes_written;
  868. }
  869. template<class NextLayer, bool deflateSupported>
  870. template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  871. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  872. stream<NextLayer, deflateSupported>::
  873. async_read(DynamicBuffer& buffer, ReadHandler&& handler)
  874. {
  875. static_assert(is_async_stream<next_layer_type>::value,
  876. "AsyncStream type requirements not met");
  877. static_assert(
  878. net::is_dynamic_buffer<DynamicBuffer>::value,
  879. "DynamicBuffer type requirements not met");
  880. return net::async_initiate<
  881. ReadHandler,
  882. void(error_code, std::size_t)>(
  883. run_read_op{impl_},
  884. handler,
  885. &buffer,
  886. 0,
  887. false);
  888. }
  889. //------------------------------------------------------------------------------
  890. template<class NextLayer, bool deflateSupported>
  891. template<class DynamicBuffer>
  892. std::size_t
  893. stream<NextLayer, deflateSupported>::
  894. read_some(
  895. DynamicBuffer& buffer,
  896. std::size_t limit)
  897. {
  898. static_assert(is_sync_stream<next_layer_type>::value,
  899. "SyncStream type requirements not met");
  900. static_assert(
  901. net::is_dynamic_buffer<DynamicBuffer>::value,
  902. "DynamicBuffer type requirements not met");
  903. error_code ec;
  904. auto const bytes_written =
  905. read_some(buffer, limit, ec);
  906. if(ec)
  907. BOOST_THROW_EXCEPTION(system_error{ec});
  908. return bytes_written;
  909. }
  910. template<class NextLayer, bool deflateSupported>
  911. template<class DynamicBuffer>
  912. std::size_t
  913. stream<NextLayer, deflateSupported>::
  914. read_some(
  915. DynamicBuffer& buffer,
  916. std::size_t limit,
  917. error_code& ec)
  918. {
  919. static_assert(is_sync_stream<next_layer_type>::value,
  920. "SyncStream type requirements not met");
  921. static_assert(
  922. net::is_dynamic_buffer<DynamicBuffer>::value,
  923. "DynamicBuffer type requirements not met");
  924. using beast::detail::clamp;
  925. if(! limit)
  926. limit = (std::numeric_limits<std::size_t>::max)();
  927. auto const size =
  928. clamp(impl_->read_size_hint_db(buffer), limit);
  929. BOOST_ASSERT(size > 0);
  930. auto mb = beast::detail::dynamic_buffer_prepare(
  931. buffer, size, ec, error::buffer_overflow);
  932. if(impl_->check_stop_now(ec))
  933. return 0;
  934. auto const bytes_written = read_some(*mb, ec);
  935. buffer.commit(bytes_written);
  936. return bytes_written;
  937. }
  938. template<class NextLayer, bool deflateSupported>
  939. template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  940. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  941. stream<NextLayer, deflateSupported>::
  942. async_read_some(
  943. DynamicBuffer& buffer,
  944. std::size_t limit,
  945. ReadHandler&& handler)
  946. {
  947. static_assert(is_async_stream<next_layer_type>::value,
  948. "AsyncStream type requirements not met");
  949. static_assert(
  950. net::is_dynamic_buffer<DynamicBuffer>::value,
  951. "DynamicBuffer type requirements not met");
  952. return net::async_initiate<
  953. ReadHandler,
  954. void(error_code, std::size_t)>(
  955. run_read_op{impl_},
  956. handler,
  957. &buffer,
  958. limit,
  959. true);
  960. }
  961. //------------------------------------------------------------------------------
  962. template<class NextLayer, bool deflateSupported>
  963. template<class MutableBufferSequence>
  964. std::size_t
  965. stream<NextLayer, deflateSupported>::
  966. read_some(
  967. MutableBufferSequence const& buffers)
  968. {
  969. static_assert(is_sync_stream<next_layer_type>::value,
  970. "SyncStream type requirements not met");
  971. static_assert(net::is_mutable_buffer_sequence<
  972. MutableBufferSequence>::value,
  973. "MutableBufferSequence type requirements not met");
  974. error_code ec;
  975. auto const bytes_written = read_some(buffers, ec);
  976. if(ec)
  977. BOOST_THROW_EXCEPTION(system_error{ec});
  978. return bytes_written;
  979. }
  980. template<class NextLayer, bool deflateSupported>
  981. template<class MutableBufferSequence>
  982. std::size_t
  983. stream<NextLayer, deflateSupported>::
  984. read_some(
  985. MutableBufferSequence const& buffers,
  986. error_code& ec)
  987. {
  988. static_assert(is_sync_stream<next_layer_type>::value,
  989. "SyncStream type requirements not met");
  990. static_assert(net::is_mutable_buffer_sequence<
  991. MutableBufferSequence>::value,
  992. "MutableBufferSequence type requirements not met");
  993. using beast::detail::clamp;
  994. auto& impl = *impl_;
  995. close_code code{};
  996. std::size_t bytes_written = 0;
  997. ec = {};
  998. // Make sure the stream is open
  999. if(impl.check_stop_now(ec))
  1000. return bytes_written;
  1001. loop:
  1002. // See if we need to read a frame header. This
  1003. // condition is structured to give the decompressor
  1004. // a chance to emit the final empty deflate block
  1005. //
  1006. if(impl.rd_remain == 0 && (
  1007. ! impl.rd_fh.fin || impl.rd_done))
  1008. {
  1009. // Read frame header
  1010. error_code result;
  1011. while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
  1012. {
  1013. if(result)
  1014. {
  1015. // _Fail the WebSocket Connection_
  1016. if(result == error::message_too_big)
  1017. code = close_code::too_big;
  1018. else
  1019. code = close_code::protocol_error;
  1020. do_fail(code, result, ec);
  1021. return bytes_written;
  1022. }
  1023. auto const bytes_transferred =
  1024. impl.stream().read_some(
  1025. impl.rd_buf.prepare(read_size(
  1026. impl.rd_buf, impl.rd_buf.max_size())),
  1027. ec);
  1028. impl.rd_buf.commit(bytes_transferred);
  1029. if(impl.check_stop_now(ec))
  1030. return bytes_written;
  1031. }
  1032. // Immediately apply the mask to the portion
  1033. // of the buffer holding payload data.
  1034. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  1035. detail::mask_inplace(buffers_prefix(
  1036. clamp(impl.rd_fh.len), impl.rd_buf.data()),
  1037. impl.rd_key);
  1038. if(detail::is_control(impl.rd_fh.op))
  1039. {
  1040. // Get control frame payload
  1041. auto const b = buffers_prefix(
  1042. clamp(impl.rd_fh.len), impl.rd_buf.data());
  1043. auto const len = buffer_bytes(b);
  1044. BOOST_ASSERT(len == impl.rd_fh.len);
  1045. // Clear this otherwise the next
  1046. // frame will be considered final.
  1047. impl.rd_fh.fin = false;
  1048. // Handle ping frame
  1049. if(impl.rd_fh.op == detail::opcode::ping)
  1050. {
  1051. ping_data payload;
  1052. detail::read_ping(payload, b);
  1053. impl.rd_buf.consume(len);
  1054. if(impl.wr_close)
  1055. {
  1056. // Ignore ping when closing
  1057. goto loop;
  1058. }
  1059. if(impl.ctrl_cb)
  1060. impl.ctrl_cb(frame_type::ping, to_string_view(payload));
  1061. detail::frame_buffer fb;
  1062. impl.template write_ping<flat_static_buffer_base>(fb,
  1063. detail::opcode::pong, payload);
  1064. net::write(impl.stream(), fb.data(), ec);
  1065. if(impl.check_stop_now(ec))
  1066. return bytes_written;
  1067. goto loop;
  1068. }
  1069. // Handle pong frame
  1070. if(impl.rd_fh.op == detail::opcode::pong)
  1071. {
  1072. ping_data payload;
  1073. detail::read_ping(payload, b);
  1074. impl.rd_buf.consume(len);
  1075. if(impl.ctrl_cb)
  1076. impl.ctrl_cb(frame_type::pong, to_string_view(payload));
  1077. goto loop;
  1078. }
  1079. // Handle close frame
  1080. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  1081. {
  1082. BOOST_ASSERT(! impl.rd_close);
  1083. impl.rd_close = true;
  1084. close_reason cr;
  1085. detail::read_close(cr, b, result);
  1086. if(result)
  1087. {
  1088. // _Fail the WebSocket Connection_
  1089. do_fail(close_code::protocol_error,
  1090. result, ec);
  1091. return bytes_written;
  1092. }
  1093. impl.cr = cr;
  1094. impl.rd_buf.consume(len);
  1095. if(impl.ctrl_cb)
  1096. impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
  1097. BOOST_ASSERT(! impl.wr_close);
  1098. // _Start the WebSocket Closing Handshake_
  1099. do_fail(
  1100. cr.code == close_code::none ?
  1101. close_code::normal :
  1102. static_cast<close_code>(cr.code),
  1103. error::closed, ec);
  1104. return bytes_written;
  1105. }
  1106. }
  1107. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  1108. {
  1109. // Empty non-final frame
  1110. goto loop;
  1111. }
  1112. impl.rd_done = false;
  1113. }
  1114. else
  1115. {
  1116. ec = {};
  1117. }
  1118. if(! impl.rd_deflated())
  1119. {
  1120. if(impl.rd_remain > 0)
  1121. {
  1122. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  1123. (std::min)(clamp(impl.rd_remain),
  1124. buffer_bytes(buffers)))
  1125. {
  1126. // Fill the read buffer first, otherwise we
  1127. // get fewer bytes at the cost of one I/O.
  1128. impl.rd_buf.commit(impl.stream().read_some(
  1129. impl.rd_buf.prepare(read_size(impl.rd_buf,
  1130. impl.rd_buf.max_size())), ec));
  1131. if(impl.check_stop_now(ec))
  1132. return bytes_written;
  1133. if(impl.rd_fh.mask)
  1134. detail::mask_inplace(
  1135. buffers_prefix(clamp(impl.rd_remain),
  1136. impl.rd_buf.data()), impl.rd_key);
  1137. }
  1138. if(impl.rd_buf.size() > 0)
  1139. {
  1140. // Copy from the read buffer.
  1141. // The mask was already applied.
  1142. auto const bytes_transferred = net::buffer_copy(
  1143. buffers, impl.rd_buf.data(),
  1144. clamp(impl.rd_remain));
  1145. auto const mb = buffers_prefix(
  1146. bytes_transferred, buffers);
  1147. impl.rd_remain -= bytes_transferred;
  1148. if(impl.rd_op == detail::opcode::text)
  1149. {
  1150. if(! impl.rd_utf8.write(mb) ||
  1151. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  1152. ! impl.rd_utf8.finish()))
  1153. {
  1154. // _Fail the WebSocket Connection_
  1155. do_fail(close_code::bad_payload,
  1156. error::bad_frame_payload, ec);
  1157. return bytes_written;
  1158. }
  1159. }
  1160. bytes_written += bytes_transferred;
  1161. impl.rd_size += bytes_transferred;
  1162. impl.rd_buf.consume(bytes_transferred);
  1163. }
  1164. else
  1165. {
  1166. // Read into caller's buffer
  1167. BOOST_ASSERT(impl.rd_remain > 0);
  1168. BOOST_ASSERT(buffer_bytes(buffers) > 0);
  1169. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  1170. clamp(impl.rd_remain), buffers)) > 0);
  1171. auto const bytes_transferred =
  1172. impl.stream().read_some(buffers_prefix(
  1173. clamp(impl.rd_remain), buffers), ec);
  1174. // VFALCO What if some bytes were written?
  1175. if(impl.check_stop_now(ec))
  1176. return bytes_written;
  1177. BOOST_ASSERT(bytes_transferred > 0);
  1178. auto const mb = buffers_prefix(
  1179. bytes_transferred, buffers);
  1180. impl.rd_remain -= bytes_transferred;
  1181. if(impl.rd_fh.mask)
  1182. detail::mask_inplace(mb, impl.rd_key);
  1183. if(impl.rd_op == detail::opcode::text)
  1184. {
  1185. if(! impl.rd_utf8.write(mb) ||
  1186. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  1187. ! impl.rd_utf8.finish()))
  1188. {
  1189. // _Fail the WebSocket Connection_
  1190. do_fail(close_code::bad_payload,
  1191. error::bad_frame_payload, ec);
  1192. return bytes_written;
  1193. }
  1194. }
  1195. bytes_written += bytes_transferred;
  1196. impl.rd_size += bytes_transferred;
  1197. }
  1198. }
  1199. BOOST_ASSERT( ! impl.rd_done );
  1200. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  1201. impl.rd_done = true;
  1202. }
  1203. else
  1204. {
  1205. // Read compressed message frame payload:
  1206. // inflate even if rd_fh_.len == 0, otherwise we
  1207. // never emit the end-of-stream deflate block.
  1208. //
  1209. bool did_read = false;
  1210. buffers_suffix<MutableBufferSequence> cb(buffers);
  1211. while(buffer_bytes(cb) > 0)
  1212. {
  1213. zlib::z_params zs;
  1214. {
  1215. auto const out = beast::buffers_front(cb);
  1216. zs.next_out = out.data();
  1217. zs.avail_out = out.size();
  1218. BOOST_ASSERT(zs.avail_out > 0);
  1219. }
  1220. if(impl.rd_remain > 0)
  1221. {
  1222. if(impl.rd_buf.size() > 0)
  1223. {
  1224. // use what's there
  1225. auto const in = buffers_prefix(
  1226. clamp(impl.rd_remain), beast::buffers_front(
  1227. impl.rd_buf.data()));
  1228. zs.avail_in = in.size();
  1229. zs.next_in = in.data();
  1230. }
  1231. else if(! did_read)
  1232. {
  1233. // read new
  1234. auto const bytes_transferred =
  1235. impl.stream().read_some(
  1236. impl.rd_buf.prepare(read_size(
  1237. impl.rd_buf, impl.rd_buf.max_size())),
  1238. ec);
  1239. if(impl.check_stop_now(ec))
  1240. return bytes_written;
  1241. BOOST_ASSERT(bytes_transferred > 0);
  1242. impl.rd_buf.commit(bytes_transferred);
  1243. if(impl.rd_fh.mask)
  1244. detail::mask_inplace(
  1245. buffers_prefix(clamp(impl.rd_remain),
  1246. impl.rd_buf.data()), impl.rd_key);
  1247. auto const in = buffers_prefix(
  1248. clamp(impl.rd_remain), buffers_front(
  1249. impl.rd_buf.data()));
  1250. zs.avail_in = in.size();
  1251. zs.next_in = in.data();
  1252. did_read = true;
  1253. }
  1254. else
  1255. {
  1256. break;
  1257. }
  1258. impl.inflate(zs, ec);
  1259. if(impl.check_stop_now(ec))
  1260. return bytes_written;
  1261. impl.rd_remain -= zs.total_in;
  1262. impl.rd_buf.consume(zs.total_in);
  1263. }
  1264. else if(impl.rd_fh.fin)
  1265. {
  1266. impl.inflate_with_eb(zs, ec);
  1267. if(impl.check_stop_now(ec))
  1268. return bytes_written;
  1269. if(zs.total_out == 0)
  1270. {
  1271. impl.do_context_takeover_read(impl.role);
  1272. impl.rd_done = true;
  1273. break;
  1274. }
  1275. }
  1276. else
  1277. {
  1278. break;
  1279. }
  1280. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  1281. impl.rd_size, zs.total_out, impl.rd_msg_max))
  1282. {
  1283. do_fail(close_code::too_big,
  1284. error::message_too_big, ec);
  1285. return bytes_written;
  1286. }
  1287. cb.consume(zs.total_out);
  1288. impl.rd_size += zs.total_out;
  1289. bytes_written += zs.total_out;
  1290. }
  1291. if(impl.rd_op == detail::opcode::text)
  1292. {
  1293. // check utf8
  1294. if(! impl.rd_utf8.write(beast::buffers_prefix(
  1295. bytes_written, buffers)) || (
  1296. impl.rd_done && ! impl.rd_utf8.finish()))
  1297. {
  1298. // _Fail the WebSocket Connection_
  1299. do_fail(close_code::bad_payload,
  1300. error::bad_frame_payload, ec);
  1301. return bytes_written;
  1302. }
  1303. }
  1304. }
  1305. return bytes_written;
  1306. }
  1307. template<class NextLayer, bool deflateSupported>
  1308. template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
  1309. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  1310. stream<NextLayer, deflateSupported>::
  1311. async_read_some(
  1312. MutableBufferSequence const& buffers,
  1313. ReadHandler&& handler)
  1314. {
  1315. static_assert(is_async_stream<next_layer_type>::value,
  1316. "AsyncStream type requirements not met");
  1317. static_assert(net::is_mutable_buffer_sequence<
  1318. MutableBufferSequence>::value,
  1319. "MutableBufferSequence type requirements not met");
  1320. return net::async_initiate<
  1321. ReadHandler,
  1322. void(error_code, std::size_t)>(
  1323. run_read_some_op{impl_},
  1324. handler,
  1325. buffers);
  1326. }
  1327. } // websocket
  1328. } // beast
  1329. } // boost
  1330. #endif