client_service.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. //
  2. // Copyright (c) 2023-2025 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
  3. //
  4. // Distributed under the Boost Software License, Version 1.0.
  5. // (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_MQTT5_CLIENT_SERVICE_HPP
  8. #define BOOST_MQTT5_CLIENT_SERVICE_HPP
  9. #include <boost/mqtt5/detail/channel_traits.hpp>
  10. #include <boost/mqtt5/detail/internal_types.hpp>
  11. #include <boost/mqtt5/detail/log_invoke.hpp>
  12. #include <boost/mqtt5/impl/assemble_op.hpp>
  13. #include <boost/mqtt5/impl/async_sender.hpp>
  14. #include <boost/mqtt5/impl/autoconnect_stream.hpp>
  15. #include <boost/mqtt5/impl/replies.hpp>
  16. #include <boost/asio/async_result.hpp>
  17. #include <boost/asio/experimental/basic_channel.hpp>
  18. #include <boost/asio/post.hpp>
  19. #include <boost/asio/prepend.hpp>
  20. #include <cstdint>
  21. #include <memory>
  22. #include <string>
  23. #include <type_traits>
  24. #include <variant> // std::monostate
  25. namespace boost::mqtt5::detail {
  26. namespace asio = boost::asio;
  27. template <
  28. typename StreamType, typename TlsContext,
  29. typename Enable = void
  30. >
  31. class stream_context;
  32. template <
  33. typename StreamType, typename TlsContext
  34. >
  35. class stream_context<
  36. StreamType, TlsContext,
  37. std::enable_if_t<has_tls_layer<StreamType>>
  38. > {
  39. using tls_context_type = TlsContext;
  40. mqtt_ctx _mqtt_context;
  41. std::shared_ptr<tls_context_type> _tls_context_ptr;
  42. public:
  43. explicit stream_context(TlsContext tls_context) :
  44. _tls_context_ptr(std::make_shared<tls_context_type>(std::move(tls_context)))
  45. {}
  46. stream_context(const stream_context& other) :
  47. _mqtt_context(other._mqtt_context), _tls_context_ptr(other._tls_context_ptr)
  48. {}
  49. auto& mqtt_context() {
  50. return _mqtt_context;
  51. }
  52. const auto& mqtt_context() const {
  53. return _mqtt_context;
  54. }
  55. auto& tls_context() {
  56. return *_tls_context_ptr;
  57. }
  58. auto& session_state() {
  59. return _mqtt_context.state;
  60. }
  61. const auto& session_state() const {
  62. return _mqtt_context.state;
  63. }
  64. void will(will will) {
  65. _mqtt_context.will_msg = std::move(will);
  66. }
  67. template <prop::property_type p>
  68. const auto& connack_property(
  69. std::integral_constant<prop::property_type, p> prop
  70. ) const {
  71. return _mqtt_context.ca_props[prop];
  72. }
  73. const auto& connack_properties() const {
  74. return _mqtt_context.ca_props;
  75. }
  76. template <prop::property_type p>
  77. const auto& connect_property(
  78. std::integral_constant<prop::property_type, p> prop
  79. ) const {
  80. return _mqtt_context.co_props[prop];
  81. }
  82. template <prop::property_type p>
  83. auto& connect_property(
  84. std::integral_constant<prop::property_type, p> prop
  85. ) {
  86. return _mqtt_context.co_props[prop];
  87. }
  88. void connect_properties(connect_props props) {
  89. _mqtt_context.co_props = std::move(props);
  90. }
  91. void credentials(
  92. std::string client_id,
  93. std::string username = "", std::string password = ""
  94. ) {
  95. _mqtt_context.creds = {
  96. std::move(client_id),
  97. std::move(username), std::move(password)
  98. };
  99. }
  100. template <typename Authenticator>
  101. void authenticator(Authenticator&& authenticator) {
  102. _mqtt_context.authenticator = any_authenticator(
  103. std::forward<Authenticator>(authenticator)
  104. );
  105. }
  106. };
  107. template <typename StreamType>
  108. class stream_context<
  109. StreamType, std::monostate,
  110. std::enable_if_t<!has_tls_layer<StreamType>>
  111. > {
  112. mqtt_ctx _mqtt_context;
  113. public:
  114. explicit stream_context(std::monostate) {}
  115. stream_context(const stream_context& other) :
  116. _mqtt_context(other._mqtt_context)
  117. {}
  118. auto& mqtt_context() {
  119. return _mqtt_context;
  120. }
  121. const auto& mqtt_context() const {
  122. return _mqtt_context;
  123. }
  124. auto& session_state() {
  125. return _mqtt_context.state;
  126. }
  127. const auto& session_state() const {
  128. return _mqtt_context.state;
  129. }
  130. void will(will will) {
  131. _mqtt_context.will_msg = std::move(will);
  132. }
  133. template <prop::property_type p>
  134. const auto& connack_property(
  135. std::integral_constant<prop::property_type, p> prop
  136. ) const {
  137. return _mqtt_context.ca_props[prop];
  138. }
  139. const auto& connack_properties() const {
  140. return _mqtt_context.ca_props;
  141. }
  142. template <prop::property_type p>
  143. const auto& connect_property(
  144. std::integral_constant<prop::property_type, p> prop
  145. ) const {
  146. return _mqtt_context.co_props[prop];
  147. }
  148. template <prop::property_type p>
  149. auto& connect_property(
  150. std::integral_constant<prop::property_type, p> prop
  151. ) {
  152. return _mqtt_context.co_props[prop];
  153. }
  154. void connect_properties(connect_props props) {
  155. _mqtt_context.co_props = std::move(props);
  156. }
  157. void credentials(
  158. std::string client_id,
  159. std::string username = "", std::string password = ""
  160. ) {
  161. _mqtt_context.creds = {
  162. std::move(client_id),
  163. std::move(username), std::move(password)
  164. };
  165. }
  166. template <typename Authenticator>
  167. void authenticator(Authenticator&& authenticator) {
  168. _mqtt_context.authenticator = any_authenticator(
  169. std::forward<Authenticator>(authenticator)
  170. );
  171. }
  172. };
  173. template <
  174. typename StreamType,
  175. typename TlsContext = std::monostate,
  176. typename LoggerType = noop_logger
  177. >
  178. class client_service {
  179. using self_type = client_service<StreamType, TlsContext, LoggerType>;
  180. using stream_context_type = stream_context<StreamType, TlsContext>;
  181. using stream_type = autoconnect_stream<
  182. StreamType, stream_context_type, LoggerType
  183. >;
  184. public:
  185. using executor_type = typename stream_type::executor_type;
  186. private:
  187. using tls_context_type = TlsContext;
  188. using logger_type = LoggerType;
  189. using receive_channel = asio::experimental::basic_channel<
  190. executor_type,
  191. channel_traits<>,
  192. void (error_code, std::string, std::string, publish_props)
  193. >;
  194. template <typename ClientService, typename Handler>
  195. friend class run_op;
  196. template <typename ClientService>
  197. friend class async_sender;
  198. template <typename ClientService, typename Handler>
  199. friend class assemble_op;
  200. template <typename ClientService, typename Handler>
  201. friend class ping_op;
  202. template <typename ClientService, typename Handler>
  203. friend class sentry_op;
  204. template <typename ClientService>
  205. friend class re_auth_op;
  206. executor_type _executor;
  207. log_invoke<logger_type> _log;
  208. stream_context_type _stream_context;
  209. stream_type _stream;
  210. packet_id_allocator _pid_allocator;
  211. replies _replies;
  212. async_sender<client_service> _async_sender;
  213. std::string _read_buff;
  214. data_span _active_span;
  215. receive_channel _rec_channel;
  216. asio::steady_timer _ping_timer;
  217. asio::steady_timer _sentry_timer;
  218. client_service(const client_service& other) :
  219. _executor(other._executor),
  220. _log(other._log),
  221. _stream_context(other._stream_context),
  222. _stream(_executor, _stream_context, _log),
  223. _replies(_executor),
  224. _async_sender(*this),
  225. _active_span(_read_buff.cbegin(), _read_buff.cbegin()),
  226. _rec_channel(_executor, (std::numeric_limits<size_t>::max)()),
  227. _ping_timer(_executor),
  228. _sentry_timer(_executor)
  229. {
  230. _stream.clone_endpoints(other._stream);
  231. }
  232. public:
  233. explicit client_service(
  234. const executor_type& ex,
  235. tls_context_type tls_context = {}, logger_type logger = {}
  236. ) :
  237. _executor(ex),
  238. _log(std::move(logger)),
  239. _stream_context(std::move(tls_context)),
  240. _stream(ex, _stream_context, _log),
  241. _replies(ex),
  242. _async_sender(*this),
  243. _active_span(_read_buff.cbegin(), _read_buff.cbegin()),
  244. _rec_channel(ex, (std::numeric_limits<size_t>::max)()),
  245. _ping_timer(ex),
  246. _sentry_timer(ex)
  247. {}
  248. executor_type get_executor() const noexcept {
  249. return _executor;
  250. }
  251. auto dup() const {
  252. return std::shared_ptr<client_service>(new client_service(*this));
  253. }
  254. template <
  255. typename Ctx = TlsContext,
  256. std::enable_if_t<!std::is_same_v<Ctx, std::monostate>, bool> = true
  257. >
  258. decltype(auto) tls_context() {
  259. return _stream_context.tls_context();
  260. }
  261. void will(will will) {
  262. if (!is_open())
  263. _stream_context.will(std::move(will));
  264. }
  265. void credentials(
  266. std::string client_id,
  267. std::string username = "", std::string password = ""
  268. ) {
  269. if (!is_open())
  270. _stream_context.credentials(
  271. std::move(client_id),
  272. std::move(username), std::move(password)
  273. );
  274. }
  275. void brokers(std::string hosts, uint16_t default_port) {
  276. if (!is_open())
  277. _stream.brokers(std::move(hosts), default_port);
  278. }
  279. template <
  280. typename Authenticator,
  281. std::enable_if_t<is_authenticator<Authenticator>, bool> = true
  282. >
  283. void authenticator(Authenticator&& authenticator) {
  284. if (!is_open())
  285. _stream_context.authenticator(
  286. std::forward<Authenticator>(authenticator)
  287. );
  288. }
  289. uint16_t negotiated_keep_alive() const {
  290. return connack_property(prop::server_keep_alive)
  291. .value_or(_stream_context.mqtt_context().keep_alive);
  292. }
  293. void keep_alive(uint16_t seconds) {
  294. if (!is_open())
  295. _stream_context.mqtt_context().keep_alive = seconds;
  296. }
  297. template <prop::property_type p>
  298. const auto& connect_property(
  299. std::integral_constant<prop::property_type, p> prop
  300. ) const {
  301. return _stream_context.connect_property(prop);
  302. }
  303. template <prop::property_type p>
  304. void connect_property(
  305. std::integral_constant<prop::property_type, p> prop,
  306. prop::value_type_t<p> value
  307. ){
  308. if (!is_open())
  309. _stream_context.connect_property(prop) = value;
  310. }
  311. void connect_properties(connect_props props) {
  312. if (!is_open())
  313. _stream_context.connect_properties(std::move(props));
  314. }
  315. template <prop::property_type p>
  316. const auto& connack_property(
  317. std::integral_constant<prop::property_type, p> prop
  318. ) const {
  319. return _stream_context.connack_property(prop);
  320. }
  321. const auto& connack_properties() const {
  322. return _stream_context.connack_properties();
  323. }
  324. void open_stream() {
  325. _stream.open();
  326. }
  327. bool is_open() const {
  328. return _stream.is_open();
  329. }
  330. template <typename CompletionToken>
  331. decltype(auto) async_shutdown(CompletionToken&& token) {
  332. return _stream.async_shutdown(std::forward<CompletionToken>(token));
  333. }
  334. void cancel() {
  335. if (!_stream.is_open()) return;
  336. _ping_timer.cancel();
  337. _sentry_timer.cancel();
  338. _rec_channel.close();
  339. _replies.cancel_unanswered();
  340. _async_sender.cancel();
  341. _stream.cancel();
  342. _stream.close();
  343. }
  344. log_invoke<LoggerType>& log() {
  345. return _log;
  346. }
  347. uint16_t allocate_pid() {
  348. return _pid_allocator.allocate();
  349. }
  350. void free_pid(uint16_t pid, bool was_throttled = false) {
  351. _pid_allocator.free(pid);
  352. if (was_throttled)
  353. _async_sender.throttled_op_done();
  354. }
  355. serial_num_t next_serial_num() {
  356. return _async_sender.next_serial_num();
  357. }
  358. bool subscriptions_present() const {
  359. return _stream_context.session_state().subscriptions_present();
  360. }
  361. void subscriptions_present(bool present) {
  362. _stream_context.session_state().subscriptions_present(present);
  363. }
  364. void update_session_state() {
  365. auto& session_state = _stream_context.session_state();
  366. if (!session_state.session_present()) {
  367. _replies.clear_pending_pubrels();
  368. session_state.session_present(true);
  369. if (session_state.subscriptions_present()) {
  370. channel_store_error(client::error::session_expired);
  371. session_state.subscriptions_present(false);
  372. }
  373. }
  374. _ping_timer.cancel();
  375. }
  376. bool channel_store(decoders::publish_message message) {
  377. auto& [topic, packet_id, flags, props, payload] = message;
  378. return _rec_channel.try_send(
  379. error_code {}, std::move(topic),
  380. std::move(payload), std::move(props)
  381. );
  382. }
  383. bool channel_store_error(error_code ec) {
  384. return _rec_channel.try_send(
  385. ec, std::string {}, std::string {}, publish_props {}
  386. );
  387. }
  388. template <typename BufferType, typename CompletionToken>
  389. decltype(auto) async_send(
  390. const BufferType& buffer,
  391. serial_num_t serial_num, unsigned flags,
  392. CompletionToken&& token
  393. ) {
  394. return _async_sender.async_send(
  395. buffer, serial_num, flags, std::forward<CompletionToken>(token)
  396. );
  397. }
  398. template <typename CompletionToken>
  399. decltype(auto) async_assemble(CompletionToken&& token) {
  400. using Signature = void (error_code, uint8_t, byte_citer, byte_citer);
  401. auto initiation = [] (
  402. auto handler, self_type& self,
  403. std::string& read_buff, data_span& active_span
  404. ) {
  405. assemble_op {
  406. self, std::move(handler), read_buff, active_span
  407. }.perform(asio::transfer_at_least(0));
  408. };
  409. return asio::async_initiate<CompletionToken, Signature> (
  410. initiation, token, std::ref(*this),
  411. std::ref(_read_buff), std::ref(_active_span)
  412. );
  413. }
  414. template <typename CompletionToken>
  415. decltype(auto) async_wait_reply(
  416. control_code_e code, uint16_t packet_id, CompletionToken&& token
  417. ) {
  418. return _replies.async_wait_reply(
  419. code, packet_id, std::forward<CompletionToken>(token)
  420. );
  421. }
  422. template <typename CompletionToken>
  423. decltype(auto) async_channel_receive(CompletionToken&& token) {
  424. return _rec_channel.async_receive(std::forward<CompletionToken>(token));
  425. }
  426. };
  427. } // namespace boost::mqtt5::detail
  428. #endif // !BOOST_MQTT5_CLIENT_SERVICE_HPP