| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383 |
- //
- // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- // Official repository: https://github.com/boostorg/beast
- //
- #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
- #define BOOST_BEAST_WEBSOCKET_IMPL_READ_IPP
- #include <boost/beast/websocket/teardown.hpp>
- #include <boost/beast/core/bind_handler.hpp>
- #include <boost/beast/core/buffers_prefix.hpp>
- #include <boost/beast/core/buffers_suffix.hpp>
- #include <boost/beast/core/flat_static_buffer.hpp>
- #include <boost/beast/core/type_traits.hpp>
- #include <boost/beast/core/detail/clamp.hpp>
- #include <boost/beast/core/detail/config.hpp>
- #include <boost/asio/associated_allocator.hpp>
- #include <boost/asio/associated_executor.hpp>
- #include <boost/asio/coroutine.hpp>
- #include <boost/asio/executor_work_guard.hpp>
- #include <boost/asio/handler_continuation_hook.hpp>
- #include <boost/asio/handler_invoke_hook.hpp>
- #include <boost/asio/post.hpp>
- #include <boost/assert.hpp>
- #include <boost/config.hpp>
- #include <boost/optional.hpp>
- #include <boost/throw_exception.hpp>
- #include <algorithm>
- #include <limits>
- #include <memory>
- namespace boost {
- namespace beast {
- namespace websocket {
- namespace detail {
- template<>
- inline
- void
- stream_base<true>::
- inflate(
- zlib::z_params& zs,
- zlib::Flush flush,
- error_code& ec)
- {
- this->pmd_->zi.write(zs, flush, ec);
- }
- template<>
- inline
- void
- stream_base<true>::
- do_context_takeover_read(role_type role)
- {
- if((role == role_type::client &&
- pmd_config_.server_no_context_takeover) ||
- (role == role_type::server &&
- pmd_config_.client_no_context_takeover))
- {
- pmd_->zi.reset();
- }
- }
- } // detail
- //------------------------------------------------------------------------------
- /* Read some message frame data.
- Also reads and handles control frames.
- */
- template<class NextLayer, bool deflateSupported>
- template<
- class MutableBufferSequence,
- class Handler>
- class stream<NextLayer, deflateSupported>::read_some_op
- : public boost::asio::coroutine
- {
- Handler h_;
- stream<NextLayer, deflateSupported>& ws_;
- boost::asio::executor_work_guard<decltype(std::declval<
- stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
- MutableBufferSequence bs_;
- buffers_suffix<MutableBufferSequence> cb_;
- std::size_t bytes_written_ = 0;
- error_code result_;
- close_code code_;
- bool did_read_ = false;
- bool cont_ = false;
- public:
- static constexpr int id = 1; // for soft_mutex
- read_some_op(read_some_op&&) = default;
- read_some_op(read_some_op const&) = delete;
- template<class DeducedHandler>
- read_some_op(
- DeducedHandler&& h,
- stream<NextLayer, deflateSupported>& ws,
- MutableBufferSequence const& bs)
- : h_(std::forward<DeducedHandler>(h))
- , ws_(ws)
- , wg_(ws_.get_executor())
- , bs_(bs)
- , cb_(bs)
- , code_(close_code::none)
- {
- }
- using allocator_type =
- boost::asio::associated_allocator_t<Handler>;
- allocator_type
- get_allocator() const noexcept
- {
- return (boost::asio::get_associated_allocator)(h_);
- }
- using executor_type = boost::asio::associated_executor_t<
- Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
- executor_type
- get_executor() const noexcept
- {
- return (boost::asio::get_associated_executor)(
- h_, ws_.get_executor());
- }
- Handler&
- handler()
- {
- return h_;
- }
- void operator()(
- error_code ec = {},
- std::size_t bytes_transferred = 0,
- bool cont = true);
- friend
- bool asio_handler_is_continuation(read_some_op* op)
- {
- using boost::asio::asio_handler_is_continuation;
- return op->cont_ || asio_handler_is_continuation(
- std::addressof(op->h_));
- }
- template<class Function>
- friend
- void asio_handler_invoke(Function&& f, read_some_op* op)
- {
- using boost::asio::asio_handler_invoke;
- asio_handler_invoke(f, std::addressof(op->h_));
- }
- };
- template<class NextLayer, bool deflateSupported>
- template<class MutableBufferSequence, class Handler>
- void
- stream<NextLayer, deflateSupported>::
- read_some_op<MutableBufferSequence, Handler>::
- operator()(
- error_code ec,
- std::size_t bytes_transferred,
- bool cont)
- {
- using beast::detail::clamp;
- using boost::asio::buffer;
- using boost::asio::buffer_size;
- cont_ = cont;
- BOOST_ASIO_CORO_REENTER(*this)
- {
- // Maybe suspend
- do_maybe_suspend:
- if(ws_.rd_block_.try_lock(this))
- {
- // Make sure the stream is not closed
- if( ws_.status_ == status::closed ||
- ws_.status_ == status::failed)
- {
- ec = boost::asio::error::operation_aborted;
- goto upcall;
- }
- }
- else
- {
- do_suspend:
- // Suspend
- BOOST_ASIO_CORO_YIELD
- ws_.paused_r_rd_.emplace(std::move(*this));
- // Acquire the read block
- ws_.rd_block_.lock(this);
- // Resume
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.rd_block_.is_locked(this));
- // The only way to get read blocked is if
- // a `close_op` wrote a close frame
- BOOST_ASSERT(ws_.wr_close_);
- BOOST_ASSERT(ws_.status_ != status::open);
- ec = boost::asio::error::operation_aborted;
- goto upcall;
- }
- // if status_ == status::closing, we want to suspend
- // the read operation until the close completes,
- // then finish the read with operation_aborted.
- loop:
- BOOST_ASSERT(ws_.rd_block_.is_locked(this));
- // See if we need to read a frame header. This
- // condition is structured to give the decompressor
- // a chance to emit the final empty deflate block
- //
- if(ws_.rd_remain_ == 0 &&
- (! ws_.rd_fh_.fin || ws_.rd_done_))
- {
- // Read frame header
- while(! ws_.parse_fh(
- ws_.rd_fh_, ws_.rd_buf_, result_))
- {
- if(result_)
- {
- // _Fail the WebSocket Connection_
- if(result_ == error::message_too_big)
- code_ = close_code::too_big;
- else
- code_ = close_code::protocol_error;
- goto close;
- }
- BOOST_ASSERT(ws_.rd_block_.is_locked(this));
- BOOST_ASIO_CORO_YIELD
- ws_.stream_.async_read_some(
- ws_.rd_buf_.prepare(read_size(
- ws_.rd_buf_, ws_.rd_buf_.max_size())),
- std::move(*this));
- BOOST_ASSERT(ws_.rd_block_.is_locked(this));
- if(! ws_.check_ok(ec))
- goto upcall;
- ws_.rd_buf_.commit(bytes_transferred);
- // Allow a close operation
- // to acquire the read block
- ws_.rd_block_.unlock(this);
- if( ws_.paused_r_close_.maybe_invoke())
- {
- // Suspend
- BOOST_ASSERT(ws_.rd_block_.is_locked());
- goto do_suspend;
- }
- // Acquire read block
- ws_.rd_block_.lock(this);
- }
- // Immediately apply the mask to the portion
- // of the buffer holding payload data.
- if(ws_.rd_fh_.len > 0 && ws_.rd_fh_.mask)
- detail::mask_inplace(buffers_prefix(
- clamp(ws_.rd_fh_.len),
- ws_.rd_buf_.mutable_data()),
- ws_.rd_key_);
- if(detail::is_control(ws_.rd_fh_.op))
- {
- // Clear this otherwise the next
- // frame will be considered final.
- ws_.rd_fh_.fin = false;
- // Handle ping frame
- if(ws_.rd_fh_.op == detail::opcode::ping)
- {
- if(ws_.ctrl_cb_)
- {
- if(! cont_)
- {
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(),
- std::move(*this));
- BOOST_ASSERT(cont_);
- }
- }
- {
- auto const b = buffers_prefix(
- clamp(ws_.rd_fh_.len),
- ws_.rd_buf_.data());
- auto const len = buffer_size(b);
- BOOST_ASSERT(len == ws_.rd_fh_.len);
- ping_data payload;
- detail::read_ping(payload, b);
- ws_.rd_buf_.consume(len);
- // Ignore ping when closing
- if(ws_.status_ == status::closing)
- goto loop;
- if(ws_.ctrl_cb_)
- ws_.ctrl_cb_(
- frame_type::ping, payload);
- ws_.rd_fb_.reset();
- ws_.template write_ping<
- flat_static_buffer_base>(ws_.rd_fb_,
- detail::opcode::pong, payload);
- }
- // Allow a close operation
- // to acquire the read block
- ws_.rd_block_.unlock(this);
- ws_.paused_r_close_.maybe_invoke();
- // Maybe suspend
- if(! ws_.wr_block_.try_lock(this))
- {
- // Suspend
- BOOST_ASIO_CORO_YIELD
- ws_.paused_rd_.emplace(std::move(*this));
- // Acquire the write block
- ws_.wr_block_.lock(this);
- // Resume
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- // Make sure the stream is open
- if(! ws_.check_open(ec))
- goto upcall;
- }
- // Send pong
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- BOOST_ASIO_CORO_YIELD
- boost::asio::async_write(ws_.stream_,
- ws_.rd_fb_.data(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- if(! ws_.check_ok(ec))
- goto upcall;
- ws_.wr_block_.unlock(this);
- ws_.paused_close_.maybe_invoke() ||
- ws_.paused_ping_.maybe_invoke() ||
- ws_.paused_wr_.maybe_invoke();
- goto do_maybe_suspend;
- }
- // Handle pong frame
- if(ws_.rd_fh_.op == detail::opcode::pong)
- {
- // Ignore pong when closing
- if(! ws_.wr_close_ && ws_.ctrl_cb_)
- {
- if(! cont_)
- {
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(),
- std::move(*this));
- BOOST_ASSERT(cont_);
- }
- }
- auto const cb = buffers_prefix(clamp(
- ws_.rd_fh_.len), ws_.rd_buf_.data());
- auto const len = buffer_size(cb);
- BOOST_ASSERT(len == ws_.rd_fh_.len);
- ping_data payload;
- detail::read_ping(payload, cb);
- ws_.rd_buf_.consume(len);
- // Ignore pong when closing
- if(! ws_.wr_close_ && ws_.ctrl_cb_)
- ws_.ctrl_cb_(frame_type::pong, payload);
- goto loop;
- }
- // Handle close frame
- BOOST_ASSERT(ws_.rd_fh_.op == detail::opcode::close);
- {
- if(ws_.ctrl_cb_)
- {
- if(! cont_)
- {
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(),
- std::move(*this));
- BOOST_ASSERT(cont_);
- }
- }
- auto const cb = buffers_prefix(clamp(
- ws_.rd_fh_.len), ws_.rd_buf_.data());
- auto const len = buffer_size(cb);
- BOOST_ASSERT(len == ws_.rd_fh_.len);
- BOOST_ASSERT(! ws_.rd_close_);
- ws_.rd_close_ = true;
- close_reason cr;
- detail::read_close(cr, cb, result_);
- if(result_)
- {
- // _Fail the WebSocket Connection_
- code_ = close_code::protocol_error;
- goto close;
- }
- ws_.cr_ = cr;
- ws_.rd_buf_.consume(len);
- if(ws_.ctrl_cb_)
- ws_.ctrl_cb_(frame_type::close,
- ws_.cr_.reason);
- // See if we are already closing
- if(ws_.status_ == status::closing)
- {
- // _Close the WebSocket Connection_
- BOOST_ASSERT(ws_.wr_close_);
- code_ = close_code::none;
- result_ = error::closed;
- goto close;
- }
- // _Start the WebSocket Closing Handshake_
- code_ = cr.code == close_code::none ?
- close_code::normal :
- static_cast<close_code>(cr.code);
- result_ = error::closed;
- goto close;
- }
- }
- if(ws_.rd_fh_.len == 0 && ! ws_.rd_fh_.fin)
- {
- // Empty non-final frame
- goto loop;
- }
- ws_.rd_done_ = false;
- }
- if(! ws_.rd_deflated())
- {
- if(ws_.rd_remain_ > 0)
- {
- if(ws_.rd_buf_.size() == 0 && ws_.rd_buf_.max_size() >
- (std::min)(clamp(ws_.rd_remain_),
- buffer_size(cb_)))
- {
- // Fill the read buffer first, otherwise we
- // get fewer bytes at the cost of one I/O.
- BOOST_ASIO_CORO_YIELD
- ws_.stream_.async_read_some(
- ws_.rd_buf_.prepare(read_size(
- ws_.rd_buf_, ws_.rd_buf_.max_size())),
- std::move(*this));
- if(! ws_.check_ok(ec))
- goto upcall;
- ws_.rd_buf_.commit(bytes_transferred);
- if(ws_.rd_fh_.mask)
- detail::mask_inplace(buffers_prefix(clamp(
- ws_.rd_remain_), ws_.rd_buf_.mutable_data()),
- ws_.rd_key_);
- }
- if(ws_.rd_buf_.size() > 0)
- {
- // Copy from the read buffer.
- // The mask was already applied.
- bytes_transferred = buffer_copy(cb_,
- ws_.rd_buf_.data(), clamp(ws_.rd_remain_));
- auto const mb = buffers_prefix(
- bytes_transferred, cb_);
- ws_.rd_remain_ -= bytes_transferred;
- if(ws_.rd_op_ == detail::opcode::text)
- {
- if(! ws_.rd_utf8_.write(mb) ||
- (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
- ! ws_.rd_utf8_.finish()))
- {
- // _Fail the WebSocket Connection_
- code_ = close_code::bad_payload;
- result_ = error::bad_frame_payload;
- goto close;
- }
- }
- bytes_written_ += bytes_transferred;
- ws_.rd_size_ += bytes_transferred;
- ws_.rd_buf_.consume(bytes_transferred);
- }
- else
- {
- // Read into caller's buffer
- BOOST_ASSERT(ws_.rd_remain_ > 0);
- BOOST_ASSERT(buffer_size(cb_) > 0);
- BOOST_ASSERT(buffer_size(buffers_prefix(
- clamp(ws_.rd_remain_), cb_)) > 0);
- BOOST_ASIO_CORO_YIELD
- ws_.stream_.async_read_some(buffers_prefix(
- clamp(ws_.rd_remain_), cb_), std::move(*this));
- if(! ws_.check_ok(ec))
- goto upcall;
- BOOST_ASSERT(bytes_transferred > 0);
- auto const mb = buffers_prefix(
- bytes_transferred, cb_);
- ws_.rd_remain_ -= bytes_transferred;
- if(ws_.rd_fh_.mask)
- detail::mask_inplace(mb, ws_.rd_key_);
- if(ws_.rd_op_ == detail::opcode::text)
- {
- if(! ws_.rd_utf8_.write(mb) ||
- (ws_.rd_remain_ == 0 && ws_.rd_fh_.fin &&
- ! ws_.rd_utf8_.finish()))
- {
- // _Fail the WebSocket Connection_
- code_ = close_code::bad_payload;
- result_ = error::bad_frame_payload;
- goto close;
- }
- }
- bytes_written_ += bytes_transferred;
- ws_.rd_size_ += bytes_transferred;
- }
- }
- ws_.rd_done_ = ws_.rd_remain_ == 0 && ws_.rd_fh_.fin;
- }
- else
- {
- // Read compressed message frame payload:
- // inflate even if rd_fh_.len == 0, otherwise we
- // never emit the end-of-stream deflate block.
- while(buffer_size(cb_) > 0)
- {
- if( ws_.rd_remain_ > 0 &&
- ws_.rd_buf_.size() == 0 &&
- ! did_read_)
- {
- // read new
- BOOST_ASIO_CORO_YIELD
- ws_.stream_.async_read_some(
- ws_.rd_buf_.prepare(read_size(
- ws_.rd_buf_, ws_.rd_buf_.max_size())),
- std::move(*this));
- if(! ws_.check_ok(ec))
- goto upcall;
- BOOST_ASSERT(bytes_transferred > 0);
- ws_.rd_buf_.commit(bytes_transferred);
- if(ws_.rd_fh_.mask)
- detail::mask_inplace(
- buffers_prefix(clamp(ws_.rd_remain_),
- ws_.rd_buf_.mutable_data()), ws_.rd_key_);
- did_read_ = true;
- }
- zlib::z_params zs;
- {
- auto const out = buffers_front(cb_);
- zs.next_out = out.data();
- zs.avail_out = out.size();
- BOOST_ASSERT(zs.avail_out > 0);
- }
- if(ws_.rd_remain_ > 0)
- {
- if(ws_.rd_buf_.size() > 0)
- {
- // use what's there
- auto const in = buffers_prefix(
- clamp(ws_.rd_remain_), buffers_front(
- ws_.rd_buf_.data()));
- zs.avail_in = in.size();
- zs.next_in = in.data();
- }
- else
- {
- break;
- }
- }
- else if(ws_.rd_fh_.fin)
- {
- // append the empty block codes
- static std::uint8_t constexpr
- empty_block[4] = {
- 0x00, 0x00, 0xff, 0xff };
- zs.next_in = empty_block;
- zs.avail_in = sizeof(empty_block);
- ws_.inflate(zs, zlib::Flush::sync, ec);
- if(! ec)
- {
- // https://github.com/madler/zlib/issues/280
- if(zs.total_out > 0)
- ec = error::partial_deflate_block;
- }
- if(! ws_.check_ok(ec))
- goto upcall;
- ws_.do_context_takeover_read(ws_.role_);
- ws_.rd_done_ = true;
- break;
- }
- else
- {
- break;
- }
- ws_.inflate(zs, zlib::Flush::sync, ec);
- if(! ws_.check_ok(ec))
- goto upcall;
- if(ws_.rd_msg_max_ && beast::detail::sum_exceeds(
- ws_.rd_size_, zs.total_out, ws_.rd_msg_max_))
- {
- // _Fail the WebSocket Connection_
- code_ = close_code::too_big;
- result_ = error::message_too_big;
- goto close;
- }
- cb_.consume(zs.total_out);
- ws_.rd_size_ += zs.total_out;
- ws_.rd_remain_ -= zs.total_in;
- ws_.rd_buf_.consume(zs.total_in);
- bytes_written_ += zs.total_out;
- }
- if(ws_.rd_op_ == detail::opcode::text)
- {
- // check utf8
- if(! ws_.rd_utf8_.write(
- buffers_prefix(bytes_written_, bs_)) || (
- ws_.rd_done_ && ! ws_.rd_utf8_.finish()))
- {
- // _Fail the WebSocket Connection_
- code_ = close_code::bad_payload;
- result_ = error::bad_frame_payload;
- goto close;
- }
- }
- }
- goto upcall;
- close:
- // Try to acquire the write block
- if(! ws_.wr_block_.try_lock(this))
- {
- // Suspend
- BOOST_ASIO_CORO_YIELD
- ws_.paused_rd_.emplace(std::move(*this));
- // Acquire the write block
- ws_.wr_block_.lock(this);
- // Resume
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(), std::move(*this));
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- // Make sure the stream is open
- if(! ws_.check_open(ec))
- goto upcall;
- }
- // Set the status
- ws_.status_ = status::closing;
- if(! ws_.wr_close_)
- {
- ws_.wr_close_ = true;
- // Serialize close frame
- ws_.rd_fb_.reset();
- ws_.template write_close<
- flat_static_buffer_base>(
- ws_.rd_fb_, code_);
- // Send close frame
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- BOOST_ASIO_CORO_YIELD
- boost::asio::async_write(
- ws_.stream_, ws_.rd_fb_.data(),
- std::move(*this));
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- if(! ws_.check_ok(ec))
- goto upcall;
- }
- // Teardown
- using beast::websocket::async_teardown;
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- BOOST_ASIO_CORO_YIELD
- async_teardown(ws_.role_,
- ws_.stream_, std::move(*this));
- BOOST_ASSERT(ws_.wr_block_.is_locked(this));
- if(ec == boost::asio::error::eof)
- {
- // Rationale:
- // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
- ec.assign(0, ec.category());
- }
- if(! ec)
- ec = result_;
- if(ec && ec != error::closed)
- ws_.status_ = status::failed;
- else
- ws_.status_ = status::closed;
- ws_.close();
- upcall:
- ws_.rd_block_.try_unlock(this);
- ws_.paused_r_close_.maybe_invoke();
- if(ws_.wr_block_.try_unlock(this))
- ws_.paused_close_.maybe_invoke() ||
- ws_.paused_ping_.maybe_invoke() ||
- ws_.paused_wr_.maybe_invoke();
- if(! cont_)
- {
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(),
- bind_handler(std::move(*this),
- ec, bytes_written_));
- }
- h_(ec, bytes_written_);
- }
- }
- //------------------------------------------------------------------------------
- template<class NextLayer, bool deflateSupported>
- template<
- class DynamicBuffer,
- class Handler>
- class stream<NextLayer, deflateSupported>::read_op
- : public boost::asio::coroutine
- {
- Handler h_;
- stream<NextLayer, deflateSupported>& ws_;
- boost::asio::executor_work_guard<decltype(std::declval<
- stream<NextLayer, deflateSupported>&>().get_executor())> wg_;
- DynamicBuffer& b_;
- std::size_t limit_;
- std::size_t bytes_written_ = 0;
- bool some_;
- public:
- using allocator_type =
- boost::asio::associated_allocator_t<Handler>;
- read_op(read_op&&) = default;
- read_op(read_op const&) = delete;
- template<class DeducedHandler>
- read_op(
- DeducedHandler&& h,
- stream<NextLayer, deflateSupported>& ws,
- DynamicBuffer& b,
- std::size_t limit,
- bool some)
- : h_(std::forward<DeducedHandler>(h))
- , ws_(ws)
- , wg_(ws_.get_executor())
- , b_(b)
- , limit_(limit ? limit : (
- std::numeric_limits<std::size_t>::max)())
- , some_(some)
- {
- }
- allocator_type
- get_allocator() const noexcept
- {
- return (boost::asio::get_associated_allocator)(h_);
- }
- using executor_type = boost::asio::associated_executor_t<
- Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;
- executor_type
- get_executor() const noexcept
- {
- return (boost::asio::get_associated_executor)(
- h_, ws_.get_executor());
- }
- void operator()(
- error_code ec = {},
- std::size_t bytes_transferred = 0);
- friend
- bool asio_handler_is_continuation(read_op* op)
- {
- using boost::asio::asio_handler_is_continuation;
- return asio_handler_is_continuation(
- std::addressof(op->h_));
- }
- template<class Function>
- friend
- void asio_handler_invoke(Function&& f, read_op* op)
- {
- using boost::asio::asio_handler_invoke;
- asio_handler_invoke(f, std::addressof(op->h_));
- }
- };
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer, class Handler>
- void
- stream<NextLayer, deflateSupported>::
- read_op<DynamicBuffer, Handler>::
- operator()(
- error_code ec,
- std::size_t bytes_transferred)
- {
- using beast::detail::clamp;
- using buffers_type = typename
- DynamicBuffer::mutable_buffers_type;
- boost::optional<buffers_type> mb;
- BOOST_ASIO_CORO_REENTER(*this)
- {
- do
- {
- try
- {
- mb.emplace(b_.prepare(clamp(
- ws_.read_size_hint(b_), limit_)));
- }
- catch(std::length_error const&)
- {
- ec = error::buffer_overflow;
- }
- if(ec)
- {
- BOOST_ASIO_CORO_YIELD
- boost::asio::post(
- ws_.get_executor(),
- bind_handler(std::move(*this),
- error::buffer_overflow, 0));
- break;
- }
- BOOST_ASIO_CORO_YIELD
- read_some_op<buffers_type, read_op>{
- std::move(*this), ws_, *mb}(
- {}, 0, false);
- if(ec)
- break;
- b_.commit(bytes_transferred);
- bytes_written_ += bytes_transferred;
- }
- while(! some_ && ! ws_.is_message_done());
- h_(ec, bytes_written_);
- }
- }
- //------------------------------------------------------------------------------
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer>
- std::size_t
- stream<NextLayer, deflateSupported>::
- read(DynamicBuffer& buffer)
- {
- static_assert(is_sync_stream<next_layer_type>::value,
- "SyncStream requirements not met");
- static_assert(
- boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
- "DynamicBuffer requirements not met");
- error_code ec;
- auto const bytes_written = read(buffer, ec);
- if(ec)
- BOOST_THROW_EXCEPTION(system_error{ec});
- return bytes_written;
- }
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer>
- std::size_t
- stream<NextLayer, deflateSupported>::
- read(DynamicBuffer& buffer, error_code& ec)
- {
- static_assert(is_sync_stream<next_layer_type>::value,
- "SyncStream requirements not met");
- static_assert(
- boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
- "DynamicBuffer requirements not met");
- std::size_t bytes_written = 0;
- do
- {
- bytes_written += read_some(buffer, 0, ec);
- if(ec)
- return bytes_written;
- }
- while(! is_message_done());
- return bytes_written;
- }
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer, class ReadHandler>
- BOOST_ASIO_INITFN_RESULT_TYPE(
- ReadHandler, void(error_code, std::size_t))
- stream<NextLayer, deflateSupported>::
- async_read(DynamicBuffer& buffer, ReadHandler&& handler)
- {
- static_assert(is_async_stream<next_layer_type>::value,
- "AsyncStream requirements not met");
- static_assert(
- boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
- "DynamicBuffer requirements not met");
- BOOST_BEAST_HANDLER_INIT(
- ReadHandler, void(error_code, std::size_t));
- read_op<
- DynamicBuffer,
- BOOST_ASIO_HANDLER_TYPE(
- ReadHandler, void(error_code, std::size_t))>{
- std::move(init.completion_handler),
- *this,
- buffer,
- 0,
- false}();
- return init.result.get();
- }
- //------------------------------------------------------------------------------
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer>
- std::size_t
- stream<NextLayer, deflateSupported>::
- read_some(
- DynamicBuffer& buffer,
- std::size_t limit)
- {
- static_assert(is_sync_stream<next_layer_type>::value,
- "SyncStream requirements not met");
- static_assert(
- boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
- "DynamicBuffer requirements not met");
- error_code ec;
- auto const bytes_written =
- read_some(buffer, limit, ec);
- if(ec)
- BOOST_THROW_EXCEPTION(system_error{ec});
- return bytes_written;
- }
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer>
- std::size_t
- stream<NextLayer, deflateSupported>::
- read_some(
- DynamicBuffer& buffer,
- std::size_t limit,
- error_code& ec)
- {
- static_assert(is_sync_stream<next_layer_type>::value,
- "SyncStream requirements not met");
- static_assert(
- boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
- "DynamicBuffer requirements not met");
- using beast::detail::clamp;
- if(! limit)
- limit = (std::numeric_limits<std::size_t>::max)();
- auto const size =
- clamp(read_size_hint(buffer), limit);
- BOOST_ASSERT(size > 0);
- boost::optional<typename
- DynamicBuffer::mutable_buffers_type> mb;
- try
- {
- mb.emplace(buffer.prepare(size));
- }
- catch(std::length_error const&)
- {
- ec = error::buffer_overflow;
- return 0;
- }
- auto const bytes_written = read_some(*mb, ec);
- buffer.commit(bytes_written);
- return bytes_written;
- }
- template<class NextLayer, bool deflateSupported>
- template<class DynamicBuffer, class ReadHandler>
- BOOST_ASIO_INITFN_RESULT_TYPE(
- ReadHandler, void(error_code, std::size_t))
- stream<NextLayer, deflateSupported>::
- async_read_some(
- DynamicBuffer& buffer,
- std::size_t limit,
- ReadHandler&& handler)
- {
- static_assert(is_async_stream<next_layer_type>::value,
- "AsyncStream requirements not met");
- static_assert(
- boost::asio::is_dynamic_buffer<DynamicBuffer>::value,
- "DynamicBuffer requirements not met");
- BOOST_BEAST_HANDLER_INIT(
- ReadHandler, void(error_code, std::size_t));
- read_op<
- DynamicBuffer,
- BOOST_ASIO_HANDLER_TYPE(
- ReadHandler, void(error_code, std::size_t))>{
- std::move(init.completion_handler),
- *this,
- buffer,
- limit,
- true}({}, 0);
- return init.result.get();
- }
- //------------------------------------------------------------------------------
- template<class NextLayer, bool deflateSupported>
- template<class MutableBufferSequence>
- std::size_t
- stream<NextLayer, deflateSupported>::
- read_some(
- MutableBufferSequence const& buffers)
- {
- static_assert(is_sync_stream<next_layer_type>::value,
- "SyncStream requirements not met");
- static_assert(boost::asio::is_mutable_buffer_sequence<
- MutableBufferSequence>::value,
- "MutableBufferSequence requirements not met");
- error_code ec;
- auto const bytes_written = read_some(buffers, ec);
- if(ec)
- BOOST_THROW_EXCEPTION(system_error{ec});
- return bytes_written;
- }
- template<class NextLayer, bool deflateSupported>
- template<class MutableBufferSequence>
- std::size_t
- stream<NextLayer, deflateSupported>::
- read_some(
- MutableBufferSequence const& buffers,
- error_code& ec)
- {
- static_assert(is_sync_stream<next_layer_type>::value,
- "SyncStream requirements not met");
- static_assert(boost::asio::is_mutable_buffer_sequence<
- MutableBufferSequence>::value,
- "MutableBufferSequence requirements not met");
- using beast::detail::clamp;
- using boost::asio::buffer;
- using boost::asio::buffer_size;
- close_code code{};
- std::size_t bytes_written = 0;
- ec.assign(0, ec.category());
- // Make sure the stream is open
- if(! check_open(ec))
- return 0;
- loop:
- // See if we need to read a frame header. This
- // condition is structured to give the decompressor
- // a chance to emit the final empty deflate block
- //
- if(rd_remain_ == 0 && (! rd_fh_.fin || rd_done_))
- {
- // Read frame header
- error_code result;
- while(! parse_fh(rd_fh_, rd_buf_, result))
- {
- if(result)
- {
- // _Fail the WebSocket Connection_
- if(result == error::message_too_big)
- code = close_code::too_big;
- else
- code = close_code::protocol_error;
- do_fail(code, result, ec);
- return bytes_written;
- }
- auto const bytes_transferred =
- stream_.read_some(
- rd_buf_.prepare(read_size(
- rd_buf_, rd_buf_.max_size())),
- ec);
- if(! check_ok(ec))
- return bytes_written;
- rd_buf_.commit(bytes_transferred);
- }
- // Immediately apply the mask to the portion
- // of the buffer holding payload data.
- if(rd_fh_.len > 0 && rd_fh_.mask)
- detail::mask_inplace(buffers_prefix(
- clamp(rd_fh_.len), rd_buf_.mutable_data()),
- rd_key_);
- if(detail::is_control(rd_fh_.op))
- {
- // Get control frame payload
- auto const b = buffers_prefix(
- clamp(rd_fh_.len), rd_buf_.data());
- auto const len = buffer_size(b);
- BOOST_ASSERT(len == rd_fh_.len);
- // Clear this otherwise the next
- // frame will be considered final.
- rd_fh_.fin = false;
- // Handle ping frame
- if(rd_fh_.op == detail::opcode::ping)
- {
- ping_data payload;
- detail::read_ping(payload, b);
- rd_buf_.consume(len);
- if(wr_close_)
- {
- // Ignore ping when closing
- goto loop;
- }
- if(ctrl_cb_)
- ctrl_cb_(frame_type::ping, payload);
- detail::frame_buffer fb;
- write_ping<flat_static_buffer_base>(fb,
- detail::opcode::pong, payload);
- boost::asio::write(stream_, fb.data(), ec);
- if(! check_ok(ec))
- return bytes_written;
- goto loop;
- }
- // Handle pong frame
- if(rd_fh_.op == detail::opcode::pong)
- {
- ping_data payload;
- detail::read_ping(payload, b);
- rd_buf_.consume(len);
- if(ctrl_cb_)
- ctrl_cb_(frame_type::pong, payload);
- goto loop;
- }
- // Handle close frame
- BOOST_ASSERT(rd_fh_.op == detail::opcode::close);
- {
- BOOST_ASSERT(! rd_close_);
- rd_close_ = true;
- close_reason cr;
- detail::read_close(cr, b, result);
- if(result)
- {
- // _Fail the WebSocket Connection_
- do_fail(close_code::protocol_error,
- result, ec);
- return bytes_written;
- }
- cr_ = cr;
- rd_buf_.consume(len);
- if(ctrl_cb_)
- ctrl_cb_(frame_type::close, cr_.reason);
- BOOST_ASSERT(! wr_close_);
- // _Start the WebSocket Closing Handshake_
- do_fail(
- cr.code == close_code::none ?
- close_code::normal :
- static_cast<close_code>(cr.code),
- error::closed, ec);
- return bytes_written;
- }
- }
- if(rd_fh_.len == 0 && ! rd_fh_.fin)
- {
- // Empty non-final frame
- goto loop;
- }
- rd_done_ = false;
- }
- else
- {
- ec.assign(0, ec.category());
- }
- if(! this->rd_deflated())
- {
- if(rd_remain_ > 0)
- {
- if(rd_buf_.size() == 0 && rd_buf_.max_size() >
- (std::min)(clamp(rd_remain_),
- buffer_size(buffers)))
- {
- // Fill the read buffer first, otherwise we
- // get fewer bytes at the cost of one I/O.
- rd_buf_.commit(stream_.read_some(
- rd_buf_.prepare(read_size(rd_buf_,
- rd_buf_.max_size())), ec));
- if(! check_ok(ec))
- return bytes_written;
- if(rd_fh_.mask)
- detail::mask_inplace(
- buffers_prefix(clamp(rd_remain_),
- rd_buf_.mutable_data()), rd_key_);
- }
- if(rd_buf_.size() > 0)
- {
- // Copy from the read buffer.
- // The mask was already applied.
- auto const bytes_transferred =
- buffer_copy(buffers, rd_buf_.data(),
- clamp(rd_remain_));
- auto const mb = buffers_prefix(
- bytes_transferred, buffers);
- rd_remain_ -= bytes_transferred;
- if(rd_op_ == detail::opcode::text)
- {
- if(! rd_utf8_.write(mb) ||
- (rd_remain_ == 0 && rd_fh_.fin &&
- ! rd_utf8_.finish()))
- {
- // _Fail the WebSocket Connection_
- do_fail(close_code::bad_payload,
- error::bad_frame_payload, ec);
- return bytes_written;
- }
- }
- bytes_written += bytes_transferred;
- rd_size_ += bytes_transferred;
- rd_buf_.consume(bytes_transferred);
- }
- else
- {
- // Read into caller's buffer
- BOOST_ASSERT(rd_remain_ > 0);
- BOOST_ASSERT(buffer_size(buffers) > 0);
- BOOST_ASSERT(buffer_size(buffers_prefix(
- clamp(rd_remain_), buffers)) > 0);
- auto const bytes_transferred =
- stream_.read_some(buffers_prefix(
- clamp(rd_remain_), buffers), ec);
- if(! check_ok(ec))
- return bytes_written;
- BOOST_ASSERT(bytes_transferred > 0);
- auto const mb = buffers_prefix(
- bytes_transferred, buffers);
- rd_remain_ -= bytes_transferred;
- if(rd_fh_.mask)
- detail::mask_inplace(mb, rd_key_);
- if(rd_op_ == detail::opcode::text)
- {
- if(! rd_utf8_.write(mb) ||
- (rd_remain_ == 0 && rd_fh_.fin &&
- ! rd_utf8_.finish()))
- {
- // _Fail the WebSocket Connection_
- do_fail(close_code::bad_payload,
- error::bad_frame_payload, ec);
- return bytes_written;
- }
- }
- bytes_written += bytes_transferred;
- rd_size_ += bytes_transferred;
- }
- }
- rd_done_ = rd_remain_ == 0 && rd_fh_.fin;
- }
- else
- {
- // Read compressed message frame payload:
- // inflate even if rd_fh_.len == 0, otherwise we
- // never emit the end-of-stream deflate block.
- //
- bool did_read = false;
- buffers_suffix<MutableBufferSequence> cb{buffers};
- while(buffer_size(cb) > 0)
- {
- zlib::z_params zs;
- {
- auto const out = buffers_front(cb);
- zs.next_out = out.data();
- zs.avail_out = out.size();
- BOOST_ASSERT(zs.avail_out > 0);
- }
- if(rd_remain_ > 0)
- {
- if(rd_buf_.size() > 0)
- {
- // use what's there
- auto const in = buffers_prefix(
- clamp(rd_remain_), buffers_front(
- rd_buf_.data()));
- zs.avail_in = in.size();
- zs.next_in = in.data();
- }
- else if(! did_read)
- {
- // read new
- auto const bytes_transferred =
- stream_.read_some(
- rd_buf_.prepare(read_size(
- rd_buf_, rd_buf_.max_size())),
- ec);
- if(! check_ok(ec))
- return bytes_written;
- BOOST_ASSERT(bytes_transferred > 0);
- rd_buf_.commit(bytes_transferred);
- if(rd_fh_.mask)
- detail::mask_inplace(
- buffers_prefix(clamp(rd_remain_),
- rd_buf_.mutable_data()), rd_key_);
- auto const in = buffers_prefix(
- clamp(rd_remain_), buffers_front(
- rd_buf_.data()));
- zs.avail_in = in.size();
- zs.next_in = in.data();
- did_read = true;
- }
- else
- {
- break;
- }
- }
- else if(rd_fh_.fin)
- {
- // append the empty block codes
- static std::uint8_t constexpr
- empty_block[4] = {
- 0x00, 0x00, 0xff, 0xff };
- zs.next_in = empty_block;
- zs.avail_in = sizeof(empty_block);
- this->inflate(zs, zlib::Flush::sync, ec);
- if(! ec)
- {
- // https://github.com/madler/zlib/issues/280
- if(zs.total_out > 0)
- ec = error::partial_deflate_block;
- }
- if(! check_ok(ec))
- return bytes_written;
- this->do_context_takeover_read(role_);
- rd_done_ = true;
- break;
- }
- else
- {
- break;
- }
- this->inflate(zs, zlib::Flush::sync, ec);
- if(! check_ok(ec))
- return bytes_written;
- if(rd_msg_max_ && beast::detail::sum_exceeds(
- rd_size_, zs.total_out, rd_msg_max_))
- {
- do_fail(close_code::too_big,
- error::message_too_big, ec);
- return bytes_written;
- }
- cb.consume(zs.total_out);
- rd_size_ += zs.total_out;
- rd_remain_ -= zs.total_in;
- rd_buf_.consume(zs.total_in);
- bytes_written += zs.total_out;
- }
- if(rd_op_ == detail::opcode::text)
- {
- // check utf8
- if(! rd_utf8_.write(
- buffers_prefix(bytes_written, buffers)) || (
- rd_done_ && ! rd_utf8_.finish()))
- {
- // _Fail the WebSocket Connection_
- do_fail(close_code::bad_payload,
- error::bad_frame_payload, ec);
- return bytes_written;
- }
- }
- }
- return bytes_written;
- }
- template<class NextLayer, bool deflateSupported>
- template<class MutableBufferSequence, class ReadHandler>
- BOOST_ASIO_INITFN_RESULT_TYPE(
- ReadHandler, void(error_code, std::size_t))
- stream<NextLayer, deflateSupported>::
- async_read_some(
- MutableBufferSequence const& buffers,
- ReadHandler&& handler)
- {
- static_assert(is_async_stream<next_layer_type>::value,
- "AsyncStream requirements not met");
- static_assert(boost::asio::is_mutable_buffer_sequence<
- MutableBufferSequence>::value,
- "MutableBufferSequence requirements not met");
- BOOST_BEAST_HANDLER_INIT(
- ReadHandler, void(error_code, std::size_t));
- read_some_op<MutableBufferSequence, BOOST_ASIO_HANDLER_TYPE(
- ReadHandler, void(error_code, std::size_t))>{
- std::move(init.completion_handler), *this, buffers}(
- {}, 0, false);
- return init.result.get();
- }
- } // websocket
- } // beast
- } // boost
- #endif
|