write_op.hpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_WRITE_OP_HPP
  8. #define BOOST_MQTT5_WRITE_OP_HPP
  9. #include <boost/mqtt5/detail/async_traits.hpp>
  10. #include <boost/asio/associated_allocator.hpp>
  11. #include <boost/asio/associated_executor.hpp>
  12. #include <boost/asio/error.hpp>
  13. #include <boost/asio/post.hpp>
  14. #include <boost/asio/prepend.hpp>
  15. #include <boost/asio/write.hpp>
  16. #include <boost/system/error_code.hpp>
  17. namespace boost::mqtt5::detail {
  18. template <typename Owner, typename Handler>
  19. class write_op {
  20. struct on_write {};
  21. struct on_reconnect {};
  22. Owner& _owner;
  23. using handler_type = Handler;
  24. handler_type _handler;
  25. public:
  26. write_op(Owner& owner, Handler&& handler) :
  27. _owner(owner), _handler(std::move(handler))
  28. {}
  29. write_op(write_op&&) = default;
  30. write_op(const write_op&) = delete;
  31. write_op& operator=(write_op&&) = default;
  32. write_op& operator=(const write_op&) = delete;
  33. using allocator_type = asio::associated_allocator_t<handler_type>;
  34. allocator_type get_allocator() const noexcept {
  35. return asio::get_associated_allocator(_handler);
  36. }
  37. using executor_type = asio::associated_executor_t<handler_type>;
  38. executor_type get_executor() const noexcept {
  39. return asio::get_associated_executor(_handler);
  40. }
  41. template <typename BufferType>
  42. void perform(BufferType& buffer) {
  43. auto stream_ptr = _owner._stream_ptr;
  44. if (_owner.was_connected())
  45. // note: write operation should not be time-limited
  46. detail::async_write(
  47. *stream_ptr, buffer,
  48. asio::prepend(std::move(*this), on_write {}, stream_ptr)
  49. );
  50. else
  51. _owner.async_reconnect(
  52. stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
  53. );
  54. }
  55. void operator()(
  56. on_write, typename Owner::stream_ptr stream_ptr,
  57. error_code ec, size_t bytes_written
  58. ) {
  59. if (!_owner.is_open())
  60. return complete(asio::error::operation_aborted, 0);
  61. if (!ec)
  62. return complete(ec, bytes_written);
  63. _owner.log().at_transport_error(ec);
  64. _owner.async_reconnect(
  65. stream_ptr, asio::prepend(std::move(*this), on_reconnect {})
  66. );
  67. }
  68. void operator()(on_reconnect, error_code ec) {
  69. if ((ec == asio::error::operation_aborted && _owner.is_open()) || !ec)
  70. ec = asio::error::try_again;
  71. return complete(ec, 0);
  72. }
  73. private:
  74. void complete(error_code ec, size_t bytes_written) {
  75. std::move(_handler)(ec, bytes_written);
  76. }
  77. };
  78. } // end namespace boost::mqtt5::detail
  79. #endif // !BOOST_MQTT5_WRITE_OP_HPP