async_client.h 43 KB


  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file async_client.h
  3. /// Declaration of MQTT async_client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. ///
  7. /// @mainpage The Eclipse Paho MQTT Library for C++
  8. ///
  9. /// @section Introduction
  10. ///
  11. /// This is the Eclipse Paho MQTT Library for C++. It contains an MQTT
  12. /// client for memory-managed operating systems like Windows, macOS, Linux,
  13. /// and other *nix-style systems.
  14. ///
  15. /////////////////////////////////////////////////////////////////////////////
  16. /*******************************************************************************
  17. * Copyright (c) 2013-2025 Frank Pagliughi <fpagliughi@mindspring.com>
  18. *
  19. * All rights reserved. This program and the accompanying materials
  20. * are made available under the terms of the Eclipse Public License v2.0
  21. * and Eclipse Distribution License v1.0 which accompany this distribution.
  22. *
  23. * The Eclipse Public License is available at
  24. * http://www.eclipse.org/legal/epl-v20.html
  25. * and the Eclipse Distribution License is available at
  26. * http://www.eclipse.org/org/documents/edl-v10.php.
  27. *
  28. * Contributors:
  29. * Frank Pagliughi - initial implementation and documentation
  30. * Frank Pagliughi - MQTT v5 support
  31. *******************************************************************************/
  32. #ifndef __mqtt_async_client_h
  33. #define __mqtt_async_client_h
  34. #include <functional>
  35. #include <list>
  36. #include <memory>
  37. #include <stdexcept>
  38. #include <tuple>
  39. #include <vector>
  40. #include "MQTTAsync.h"
  41. #include "mqtt/callback.h"
  42. #include "mqtt/create_options.h"
  43. #include "mqtt/delivery_token.h"
  44. #include "mqtt/event.h"
  45. #include "mqtt/exception.h"
  46. #include "mqtt/iaction_listener.h"
  47. #include "mqtt/iasync_client.h"
  48. #include "mqtt/iclient_persistence.h"
  49. #include "mqtt/message.h"
  50. #include "mqtt/properties.h"
  51. #include "mqtt/string_collection.h"
  52. #include "mqtt/thread_queue.h"
  53. #include "mqtt/token.h"
  54. #include "mqtt/types.h"
  55. namespace mqtt {
  56. // OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
  57. // clashed with #define's from other libraries and will be removed at the
  58. // next major version upgrade.
  59. #if defined(PAHO_MQTTPP_VERSIONS)
  60. /** The version number for the client library. */
  61. const uint32_t PAHO_MQTTPP_VERSION = 0x01050004;
  62. /** The version string for the client library */
  63. const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.4");
  64. /** Copyright notice for the client library */
  65. const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi");
  66. #else
  67. /** The version number for the client library. */
  68. const uint32_t VERSION = 0x01050004;
  69. /** The version string for the client library */
  70. const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.4");
  71. /** Copyright notice for the client library */
  72. const string COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi");
  73. #endif
  74. /////////////////////////////////////////////////////////////////////////////
  75. /**
  76. * Client for talking to an MQTT server using non-blocking
  77. * methods that allow an operation to run in the background.
  78. *
  79. * The location of the server is specified as a URI string with the
  80. * following schemas supported to specify the type and security used for the
  81. * connection:
  82. * @li @em "mqtt://" - A standard (insecure) connection over TCP. (Also,
  83. * "tcp://")
  84. * @li @em "mqtts://" - A secure connection using SSL/TLS sockets. (Also
  85. * "ssl://")
  86. * @li @em "ws://" - A standard (insecure) WebSocket connection.
  87. * @li @em "wss:// - A secure websocket connection using SSL/TLS.
  88. * @li @em "unix://" - A UNIX-domain connection on the local machine. (*nix
  89. * systems, only)
  90. *
  91. * The secure connection types assume that the library was built with
  92. * SSL/TLS support, otherwise requesting a secure connection will result in
  93. * an error.
  94. *
  95. * The communication methods of this class - `connect()`, `publish()`,
  96. * `subscribe()`, etc. - are all asynchronous. They create the request for
  97. * the server, but return immediately, before a response is received back
  98. * from the server.
  99. *
  100. * These methods return a `token` to the caller which is akin to a C++
  101. * std::future. The caller can keep the Token, then use it later to block
  102. * until the asynchronous operation is complete and retrieve the result of
  103. * the operation, including any response from the server.
  104. *
  105. * Alternately, the application can choose to set callbacks to be fired when
  106. * each operation completes. This can be used to create an event-driven
  107. * architecture, but is more complex in that it forces the user to avoid any
  108. * blocking operations and manually handle thread synchronization (since
  109. * the callbacks run in a separate thread managed by the library).
  110. *
  111. * Note that the numerous constructors pre-date the current, expansive,
  112. * `create_options` structure. For a full set of create options, a
  113. * builder can be used to specify the full set of options, then construct
  114. * the client with those options, like this:
  115. *
  116. * @code
  117. * auto createOpts = mqtt::create_options_builder()
  118. * .server_uri(serverURI)
  119. * .send_while_disconnected()
  120. * .max_buffered_messages(25)
  121. * .delete_oldest_messages()
  122. * .finalize();
  123. *
  124. * mqtt::async_client cli(createOpts);
  125. * @endcode
  126. */
  127. class async_client : public virtual iasync_client
  128. {
  129. public:
  130. /** Smart/shared pointer for an object of this class */
  131. using ptr_t = std::shared_ptr<async_client>;
  132. /** Type for a thread-safe queue to consume events synchronously */
  133. using consumer_queue_type = std::unique_ptr<thread_queue<event>>;
  134. /** Handler type for registering an individual message callback */
  135. using message_handler = std::function<void(const_message_ptr)>;
  136. /** Handler type for when a connection is made or lost */
  137. using connection_handler = std::function<void(const string& cause)>;
  138. /** Handler type for when a disconnect packet is received */
  139. using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
  140. /** Handler for updating connection data before an auto-reconnect. */
  141. using update_connection_handler = std::function<bool(connect_data&)>;
  142. private:
  143. /** Lock guard type for this class */
  144. using guard = std::unique_lock<std::mutex>;
  145. /** Unique lock type for this class */
  146. using unique_lock = std::unique_lock<std::mutex>;
  147. /** Object monitor mutex */
  148. mutable std::mutex lock_;
  149. /** The underlying C-lib client. */
  150. MQTTAsync cli_;
  151. /** The options used to create the client */
  152. const create_options createOpts_;
  153. /** The MQTT protocol version of the connection */
  154. int mqttVersion_;
  155. /** A user persistence wrapper (if any) */
  156. std::unique_ptr<MQTTClient_persistence> persist_{};
  157. /** Callback supplied by the user (if any) */
  158. callback* userCallback_{};
  159. /** Connection handler */
  160. connection_handler connHandler_;
  161. /** Connection lost handler */
  162. connection_handler connLostHandler_;
  163. /** Disconnected handler */
  164. disconnected_handler disconnectedHandler_;
  165. /** Update connect data/options */
  166. update_connection_handler updateConnectionHandler_;
  167. /** Message handler */
  168. message_handler msgHandler_;
  169. /** Cached options from the last connect */
  170. connect_options connOpts_;
  171. /** Copy of connect token (for re-connects) */
  172. token_ptr connTok_;
  173. /** A list of tokens that are in play */
  174. std::list<token_ptr> pendingTokens_;
  175. /** A list of delivery tokens that are in play */
  176. std::list<delivery_token_ptr> pendingDeliveryTokens_;
  177. /** A queue of messages for consumer API */
  178. consumer_queue_type que_;
  179. /** Callbacks from the C library */
  180. static void on_connected(void* context, char* cause);
  181. static void on_connection_lost(void* context, char* cause);
  182. static void on_disconnected(
  183. void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
  184. );
  185. static int on_message_arrived(
  186. void* context, char* topicName, int topicLen, MQTTAsync_message* msg
  187. );
  188. static void on_delivery_complete(void* context, MQTTAsync_token tok);
  189. static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
  190. /** Manage internal list of active tokens */
  191. friend class token;
  192. virtual void add_token(token_ptr tok);
  193. virtual void add_token(delivery_token_ptr tok);
  194. virtual void remove_token(token* tok) override;
  195. virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
  196. void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
  197. /** Non-copyable */
  198. async_client() = delete;
  199. async_client(const async_client&) = delete;
  200. async_client& operator=(const async_client&) = delete;
  201. /** Checks a function return code and throws on error. */
  202. static void check_ret(int rc) {
  203. if (rc != MQTTASYNC_SUCCESS)
  204. throw exception(rc);
  205. }
  206. /**
  207. * Create an async_client that can be used to communicate with an MQTT
  208. * server, which allows for off-line message buffering.
  209. * This allows the caller to specify a user-defined persistence object,
  210. * or use no persistence.
  211. * @throw exception if an argument is invalid
  212. */
  213. void create();
  214. public:
  215. /**
  216. * Create an async_client that can be used to communicate with an MQTT
  217. * server.
  218. * This uses file-based persistence in the specified directory.
  219. * @param serverURI the address of the server to connect to, specified
  220. * as a URI.
  221. * @param clientId a client identifier that is unique on the server
  222. * being connected to
  223. * @param persistence The persistence that the client should use.
  224. * @throw exception if an argument is invalid
  225. */
  226. explicit async_client(
  227. const string& serverURI, const string& clientId = string{},
  228. const persistence_type& persistence = NO_PERSISTENCE
  229. )
  230. : createOpts_{serverURI, clientId, persistence} {
  231. create();
  232. }
  233. /**
  234. * Create an async_client that can be used to communicate with an MQTT
  235. * server, which allows for off-line message buffering.
  236. * This uses file-based persistence in the specified directory.
  237. * @param serverURI the address of the server to connect to, specified
  238. * as a URI.
  239. * @param clientId a client identifier that is unique on the server
  240. * being connected to
  241. * @param maxBufferedMessages the maximum number of messages allowed to
  242. * be buffered while not connected
  243. * @param persistence The persistence that the client should use.
  244. * @throw exception if an argument is invalid
  245. */
  246. async_client(
  247. const string& serverURI, const string& clientId, int maxBufferedMessages,
  248. const persistence_type& persistence = NO_PERSISTENCE
  249. )
  250. : createOpts_{serverURI, clientId, maxBufferedMessages, persistence} {
  251. create();
  252. }
  253. /**
  254. * Create an async_client that can be used to communicate with an MQTT
  255. * server, which allows for off-line message buffering.
  256. * This uses file-based persistence in the specified directory.
  257. * @param serverURI the address of the server to connect to, specified
  258. * as a URI.
  259. * @param clientId a client identifier that is unique on the server
  260. * being connected to
  261. * @param opts The create options
  262. * @param persistence The persistence that the client should use.
  263. * @throw exception if an argument is invalid
  264. */
  265. async_client(
  266. const string& serverURI, const string& clientId, const create_options& opts,
  267. const persistence_type& persistence = NO_PERSISTENCE
  268. )
  269. : createOpts_{serverURI, clientId, opts, persistence} {
  270. create();
  271. }
  272. /**
  273. * Create an async_client that can be used to communicate with an MQTT
  274. * server, which allows for off-line message buffering.
  275. * This allows the caller to specify a user-defined persistence object,
  276. * or use no persistence.
  277. * @param opts The create options
  278. * @throw exception if an argument is invalid
  279. */
  280. async_client(const create_options& opts) : createOpts_{opts} { create(); }
  281. /**
  282. * Destructor
  283. */
  284. ~async_client() override;
  285. /**
  286. * Sets a callback listener to use for events that happen
  287. * asynchronously.
  288. * @param cb callback receiver which will be invoked for certain
  289. * asynchronous events
  290. */
  291. void set_callback(callback& cb) override;
  292. /**
  293. * Stops callbacks.
  294. * This is not normally called by the application. It should be used
  295. * cautiously as it may cause the application to lose messages.
  296. */
  297. void disable_callbacks() override;
  298. /**
  299. * Callback for when a connection is made.
  300. * @param cb Callback functor for when the connection is made.
  301. */
  302. void set_connected_handler(connection_handler cb) /*override*/;
  303. /**
  304. * Callback for when a connection is lost.
  305. * @param cb Callback functor for when the connection is lost.
  306. */
  307. void set_connection_lost_handler(connection_handler cb) /*override*/;
  308. /**
  309. * Callback for when a disconnect packet is received from the server.
  310. * @param cb Callback for when the disconnect packet is received.
  311. */
  312. void set_disconnected_handler(disconnected_handler cb) /*override*/;
  313. /**
  314. * Sets the callback for when a message arrives from the broker.
  315. * Note that the application can only have one message handler which can
  316. * be installed individually using this method, or installled as a
  317. * listener object.
  318. * @param cb The callback functor to register with the library.
  319. */
  320. void set_message_callback(message_handler cb) /*override*/;
  321. /**
  322. * Sets a callback to allow the application to update the connection
  323. * data on automatic reconnects.
  324. * @param cb The callback functor to register with the library.
  325. */
  326. void set_update_connection_handler(update_connection_handler cb);
  327. /**
  328. * Connects to an MQTT server using the default options.
  329. * @return token used to track and wait for the connect to complete. The
  330. * token will be passed to any callback that has been set.
  331. * @throw exception for non security related problems
  332. * @throw security_exception for security related problems
  333. */
  334. token_ptr connect() override;
  335. /**
  336. * Connects to an MQTT server using the provided connect options.
  337. * @param options a set of connection parameters that override the
  338. * defaults.
  339. * @return token used to track and wait for the connect to complete. The
  340. * token will be passed to any callback that has been set.
  341. * @throw exception for non security related problems
  342. * @throw security_exception for security related problems
  343. */
  344. token_ptr connect(connect_options options) override;
  345. /**
  346. * Connects to an MQTT server using the specified options.
  347. * @param options a set of connection parameters that override the
  348. * defaults.
  349. * @param userContext optional object used to pass context to the
  350. * callback. Use @em nullptr if not required.
  351. * @param cb callback listener that will be notified when the connect
  352. * completes.
  353. * @return token used to track and wait for the connect to complete. The
  354. * token will be passed to any callback that has been set.
  355. * @throw exception for non security related problems
  356. * @throw security_exception for security related problems
  357. */
  358. token_ptr connect(
  359. connect_options options, void* userContext, iaction_listener& cb
  360. ) override;
  361. /**
  362. *
  363. * @param userContext optional object used to pass context to the
  364. * callback. Use @em nullptr if not required.
  365. * @param cb callback listener that will be notified when the connect
  366. * completes.
  367. * @return token used to track and wait for the connect to complete. The
  368. * token will be passed to any callback that has been set.
  369. * @throw exception for non security related problems
  370. * @throw security_exception for security related problems
  371. */
  372. token_ptr connect(void* userContext, iaction_listener& cb) override {
  373. return connect(connect_options{}, userContext, cb);
  374. }
  375. /**
  376. * Reconnects the client using options from the previous connect.
  377. * The client must have previously called connect() for this to work.
  378. * @return token used to track the progress of the reconnect.
  379. */
  380. token_ptr reconnect() override;
  381. /**
  382. * Disconnects from the server.
  383. * @return token used to track and wait for the disconnect to complete.
  384. * The token will be passed to any callback that has been set.
  385. * @throw exception for problems encountered while disconnecting
  386. */
  387. token_ptr disconnect() override { return disconnect(disconnect_options()); }
  388. /**
  389. * Disconnects from the server.
  390. * @param opts Options for disconnecting.
  391. * @return token used to track and wait for the disconnect to complete.
  392. * The token will be passed to any callback that has been set.
  393. * @throw exception for problems encountered while disconnecting
  394. */
  395. token_ptr disconnect(disconnect_options opts) override;
  396. /**
  397. * Disconnects from the server.
  398. * @param timeout the amount of time in milliseconds to allow for
  399. * existing work to finish before disconnecting. A value
  400. * of zero or less means the client will not quiesce.
  401. * @return Token used to track and wait for disconnect to complete. The
  402. * token will be passed to the callback methods if a callback is
  403. * set.
  404. * @throw exception for problems encountered while disconnecting
  405. */
  406. token_ptr disconnect(int timeout) override {
  407. return disconnect(disconnect_options(timeout));
  408. }
  409. /**
  410. * Disconnects from the server.
  411. * @param timeout the amount of time in milliseconds to allow for
  412. * existing work to finish before disconnecting. A value
  413. * of zero or less means the client will not quiesce.
  414. * @return Token used to track and wait for disconnect to complete. The
  415. * token will be passed to the callback methods if a callback is
  416. * set.
  417. * @throw exception for problems encountered while disconnecting
  418. */
  419. template <class Rep, class Period>
  420. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
  421. // TODO: check range
  422. return disconnect((int)to_milliseconds_count(timeout));
  423. }
  424. /**
  425. * Disconnects from the server.
  426. * @param timeout the amount of time in milliseconds to allow for
  427. * existing work to finish before disconnecting. A value
  428. * of zero or less means the client will not quiesce.
  429. * @param userContext optional object used to pass context to the
  430. * callback. Use @em nullptr if not required.
  431. * @param cb callback listener that will be notified when the disconnect
  432. * completes.
  433. * @return token_ptr Token used to track and wait for disconnect to
  434. * complete. The token will be passed to the callback methods if
  435. * a callback is set.
  436. * @throw exception for problems encountered while disconnecting
  437. */
  438. token_ptr disconnect(int timeout, void* userContext, iaction_listener& cb) override;
  439. /**
  440. * Disconnects from the server.
  441. * @param timeout the amount of time in milliseconds to allow for
  442. * existing work to finish before disconnecting. A value
  443. * of zero or less means the client will not quiesce.
  444. * @param userContext optional object used to pass context to the
  445. * callback. Use @em nullptr if not required.
  446. * @param cb callback listener that will be notified when the disconnect
  447. * completes.
  448. * @return token_ptr Token used to track and wait for disconnect to
  449. * complete. The token will be passed to the callback methods if
  450. * a callback is set.
  451. * @throw exception for problems encountered while disconnecting
  452. */
  453. template <class Rep, class Period>
  454. token_ptr disconnect(
  455. const std::chrono::duration<Rep, Period>& timeout, void* userContext,
  456. iaction_listener& cb
  457. ) {
  458. // TODO: check range
  459. return disconnect((int)to_milliseconds_count(timeout), userContext, cb);
  460. }
  461. /**
  462. * Disconnects from the server.
  463. * @param userContext optional object used to pass context to the
  464. * callback. Use @em nullptr if not required.
  465. * @param cb callback listener that will be notified when the disconnect
  466. * completes.
  467. * @return token_ptr Token used to track and wait for disconnect to
  468. * complete. The token will be passed to the callback methods if
  469. * a callback is set.
  470. * @throw exception for problems encountered while disconnecting
  471. */
  472. token_ptr disconnect(void* userContext, iaction_listener& cb) override {
  473. return disconnect(0L, userContext, cb);
  474. }
  475. /**
  476. * Returns the delivery token for the specified message ID.
  477. * @return delivery_token
  478. */
  479. delivery_token_ptr get_pending_delivery_token(int msgID) const override;
  480. /**
  481. * Returns the delivery tokens for any outstanding publish operations.
  482. * @return delivery_token[]
  483. */
  484. std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
  485. /**
  486. * Returns the client ID used by this client.
  487. * @return The client ID used by this client.
  488. */
  489. string get_client_id() const override { return createOpts_.get_client_id(); }
  490. /**
  491. * Returns the address of the server used by this client.
  492. * @return The server's address, as a URI String.
  493. */
  494. string get_server_uri() const override { return createOpts_.get_server_uri(); }
  495. /**
  496. * Gets the MQTT version used by the client.
  497. * @return The MQTT version used by the client
  498. * @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
  499. * that fails, fall back to 3.1
  500. * @li MQTTVERSION_3_1 (3) = only try version 3.1
  501. * @li MQTTVERSION_3_1_1 (4) = only try version 3.1.1
  502. * @li MQTTVERSION_5 (5) = only try version 5
  503. */
  504. int mqtt_version() const noexcept { return mqttVersion_; }
  505. /**
  506. * Gets a copy of the connect options that were last used in a request
  507. * to connect to the broker.
  508. * @returns The last connect options that were used.
  509. */
  510. connect_options get_connect_options() const {
  511. guard g(lock_);
  512. return connOpts_;
  513. }
  514. /**
  515. * Determines if this client is currently connected to the server.
  516. * @return true if connected, false otherwise.
  517. */
  518. bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
  519. /**
  520. * Publishes a message to a topic on the server
  521. * @param topic The topic to deliver the message to
  522. * @param payload the bytes to use as the message payload
  523. * @param n the number of bytes in the payload
  524. * @param qos the Quality of Service to deliver the message at. Valid
  525. * values are 0, 1 or 2.
  526. * @param retained whether or not this message should be retained by the
  527. * server.
  528. * @return token used to track and wait for the publish to complete. The
  529. * token will be passed to callback methods if set.
  530. */
  531. delivery_token_ptr publish(
  532. string_ref topic, const void* payload, size_t n, int qos, bool retained,
  533. const properties& props = properties()
  534. ) override;
  535. /**
  536. * Publishes a message to a topic on the server
  537. * @param topic The topic to deliver the message to
  538. * @param payload the bytes to use as the message payload
  539. * @param n the number of bytes in the payload
  540. * @return token used to track and wait for the publish to complete. The
  541. * token will be passed to callback methods if set.
  542. */
  543. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
  544. return publish(
  545. std::move(topic), payload, n, message::DFLT_QOS, message::DFLT_RETAINED
  546. );
  547. }
  548. /**
  549. * Publishes a message to a topic on the server
  550. * @param topic The topic to deliver the message to
  551. * @param payload the bytes to use as the message payload
  552. * @param qos the Quality of Service to deliver the message at. Valid
  553. * values are 0, 1 or 2.
  554. * @param retained whether or not this message should be retained by the
  555. * server.
  556. * @return token used to track and wait for the publish to complete. The
  557. * token will be passed to callback methods if set.
  558. */
  559. delivery_token_ptr publish(
  560. string_ref topic, binary_ref payload, int qos, bool retained,
  561. const properties& props = properties()
  562. ) override;
  563. /**
  564. * Publishes a message to a topic on the server
  565. * @param topic The topic to deliver the message to
  566. * @param payload the bytes to use as the message payload
  567. * @return token used to track and wait for the publish to complete. The
  568. * token will be passed to callback methods if set.
  569. */
  570. delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
  571. return publish(
  572. std::move(topic), std::move(payload), message::DFLT_QOS, message::DFLT_RETAINED
  573. );
  574. }
  575. /**
  576. * Publishes a message to a topic on the server
  577. * @param topic The topic to deliver the message to
  578. * @param payload the bytes to use as the message payload
  579. * @param n the number of bytes in the payload
  580. * @param qos the Quality of Service to deliver the message at. Valid
  581. * values are 0, 1 or 2.
  582. * @param retained whether or not this message should be retained by the
  583. * server.
  584. * @param userContext optional object used to pass context to the
  585. * callback. Use @em nullptr if not required.
  586. * @param cb Listener callback object
  587. * @return token used to track and wait for the publish to complete. The
  588. * token will be passed to callback methods if set.
  589. */
  590. delivery_token_ptr publish(
  591. string_ref topic, const void* payload, size_t n, int qos, bool retained,
  592. void* userContext, iaction_listener& cb
  593. ) override;
  594. /**
  595. * Publishes a message to a topic on the server Takes an Message
  596. * message and delivers it to the server at the requested quality of
  597. * service.
  598. * @param msg the message to deliver to the server
  599. * @return token used to track and wait for the publish to complete. The
  600. * token will be passed to callback methods if set.
  601. */
  602. delivery_token_ptr publish(const_message_ptr msg) override;
  603. /**
  604. * Publishes a message to a topic on the server.
  605. * @param msg the message to deliver to the server
  606. * @param userContext optional object used to pass context to the
  607. * callback. Use @em nullptr if not required.
  608. * @param cb callback optional listener that will be notified when message
  609. * delivery has completed to the requested quality of
  610. * service
  611. * @return token used to track and wait for the publish to complete. The
  612. * token will be passed to callback methods if set.
  613. */
  614. delivery_token_ptr publish(
  615. const_message_ptr msg, void* userContext, iaction_listener& cb
  616. ) override;
  617. /**
  618. * Subscribe to a topic, which may include wildcards.
  619. * @param topicFilter the topic to subscribe to, which can include
  620. * wildcards.
  621. * @param qos The quality of service for the subscription
  622. * @param opts The MQTT v5 subscribe options for the topic
  623. * @param props The MQTT v5 properties.
  624. * @return token used to track and wait for the subscribe to complete.
  625. * The token will be passed to callback methods if set.
  626. */
  627. token_ptr subscribe(
  628. const string& topicFilter, int qos,
  629. const subscribe_options& opts = subscribe_options(),
  630. const properties& props = properties()
  631. ) override;
  632. /**
  633. * Subscribe to a topic, which may include wildcards.
  634. * @param topicFilter the topic to subscribe to, which can include
  635. * wildcards.
  636. * @param qos The maximum quality of service at which to subscribe.
  637. * Messages published at a lower quality of service will be
  638. * received at the published QoS. Messages published at a
  639. * higher quality of service will be received using the QoS
  640. * specified on the subscribe.
  641. * @param userContext optional object used to pass context to the
  642. * callback. Use @em nullptr if not required.
  643. * @param cb listener that will be notified when subscribe has completed
  644. * @param opts The MQTT v5 subscribe options for the topic
  645. * @param props The MQTT v5 properties.
  646. * @return token used to track and wait for the subscribe to complete.
  647. * The token will be passed to callback methods if set.
  648. */
  649. token_ptr subscribe(
  650. const string& topicFilter, int qos, void* userContext, iaction_listener& cb,
  651. const subscribe_options& opts = subscribe_options(),
  652. const properties& props = properties()
  653. ) override;
  654. /**
  655. * Subscribe to multiple topics, each of which may include wildcards.
  656. * @param topicFilters The collection of topic filters to subscribe to,
  657. * any of which can include wildcards
  658. * @param qos The maximum quality of service at which to subscribe.
  659. * Messages published at a lower quality of service will be
  660. * received at the published QoS. Messages published at a
  661. * higher quality of service will be received using the QoS
  662. * specified on the subscribe.
  663. * @param opts The MQTT v5 subscribe options (one for each topic)
  664. * @param props The MQTT v5 properties.
  665. * @return token used to track and wait for the subscribe to complete.
  666. * The token will be passed to callback methods if set.
  667. */
  668. token_ptr subscribe(
  669. const_string_collection_ptr topicFilters, const qos_collection& qos,
  670. const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
  671. const properties& props = properties()
  672. ) override;
  673. /**
  674. * Subscribes to multiple topics, each of which may include wildcards.
  675. * @param topicFilters The collection of topic filters to subscribe to,
  676. * any of which can include wildcards
  677. * @param qos The maximum quality of service at which to subscribe.
  678. * Messages published at a lower quality of service will be
  679. * received at the published QoS. Messages published at a
  680. * higher quality of service will be received using the QoS
  681. * specified on the subscribe.
  682. * @param userContext Optional object used to pass context to the
  683. * callback. Use @em nullptr if not required.
  684. * @param cb listener that will be notified when subscribe has completed
  685. * @param opts The MQTT v5 subscribe options (one for each topic)
  686. * @param props The MQTT v5 properties.
  687. * @return token used to track and wait for the subscribe to complete.
  688. * The token will be passed to callback methods if set.
  689. */
  690. token_ptr subscribe(
  691. const_string_collection_ptr topicFilters, const qos_collection& qos,
  692. void* userContext, iaction_listener& cb,
  693. const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
  694. const properties& props = properties()
  695. ) override;
  696. /**
  697. * Requests the server unsubscribe the client from a topic.
  698. * @param topicFilter The topic to unsubscribe from. It must match a
  699. * topicFilter specified on an earlier subscribe.
  700. * @param props The MQTT v5 properties.
  701. * @return token Used to track and wait for the unsubscribe to complete.
  702. * The token will be passed to callback methods if set.
  703. */
  704. token_ptr unsubscribe(
  705. const string& topicFilter, const properties& props = properties()
  706. ) override;
  707. /**
  708. * Requests the server unsubscribe the client from one or more topics.
  709. * @param topicFilters One or more topics to unsubscribe from. Each
  710. * topicFilter must match one specified on an
  711. * earlier subscribe.
  712. * @param props The MQTT v5 properties.
  713. * @return token used to track and wait for the unsubscribe to complete.
  714. * The token will be passed to callback methods if set.
  715. */
  716. token_ptr unsubscribe(
  717. const_string_collection_ptr topicFilters, const properties& props = properties()
  718. ) override;
  719. /**
  720. * Requests the server unsubscribe the client from one or more topics.
  721. * @param topicFilters One or more topics to unsubscribe from. Each
  722. * topicFilter must match one specified on an
  723. * earlier subscribe.
  724. * @param userContext optional object used to pass context to the
  725. * callback. Use @em nullptr if not required.
  726. * @param cb listener that will be notified when unsubscribe has
  727. * completed
  728. * @param props The MQTT v5 properties.
  729. * @return token used to track and wait for the unsubscribe to complete.
  730. * The token will be passed to callback methods if set.
  731. */
  732. token_ptr unsubscribe(
  733. const_string_collection_ptr topicFilters, void* userContext, iaction_listener& cb,
  734. const properties& props = properties()
  735. ) override;
  736. /**
  737. * Requests the server unsubscribe the client from a topics.
  738. * @param topicFilter the topic to unsubscribe from. It must match a
  739. * topicFilter specified on an earlier subscribe.
  740. * @param userContext optional object used to pass context to the
  741. * callback. Use @em nullptr if not required.
  742. * @param cb listener that will be notified when unsubscribe has
  743. * completed
  744. * @param props The MQTT v5 properties.
  745. * @return token used to track and wait for the unsubscribe to complete.
  746. * The token will be passed to callback methods if set.
  747. */
  748. token_ptr unsubscribe(
  749. const string& topicFilter, void* userContext, iaction_listener& cb,
  750. const properties& props = properties()
  751. ) override;
  752. /**
  753. * Start consuming messages.
  754. *
  755. * This initializes the client to receive messages through a queue that
  756. * can be read synchronously.
  757. *
  758. * Normally this should be called _before_ connecting the client to the
  759. * broker, in order to have the consumer queue in place in the event
  760. * that the immediately starts sending messages (such as any retained
  761. * messages) while the client is still in the context of the connect
  762. * call.
  763. *
  764. * This _must_ also be called before calling any 'consume_message' or
  765. * "'consume_event' methods.
  766. *
  767. * Internally, this just creates a thread-safe queue for `mqtt::event`
  768. * objects, then hooks into the message and state-change callback to
  769. * push events into the queue in the order received.
  770. */
  771. void start_consuming() override;
  772. /**
  773. * Stop consuming messages.
  774. *
  775. * This shuts down the internal callback and closes the internal
  776. * consumer queue. Any remaining messages and events can be read until
  777. * the queue is emptied, but nothing further will be added to it.
  778. * This will also wake up any thread waiting on the queue.
  779. */
  780. void stop_consuming() override;
  781. /**
  782. * This clears the consumer queue, discarding any pending event.
  783. */
  784. void clear_consumer() override {
  785. if (que_)
  786. que_->clear();
  787. }
  788. /**
  789. * Determines if the consumer queue has been closed.
  790. * Once closed, any events in the queue can still be read, but no new
  791. * events can be added to it.
  792. * @return @true if the consumer queue has been closed, @false
  793. * otherwise.
  794. */
  795. bool consumer_closed() noexcept override { return !que_ || que_->closed(); }
  796. /**
  797. * Determines if the consumer queue is "done" (closed and empty).
  798. * Once the queue is done, no more events can be added or removed from
  799. * the queue.
  800. * @return @true if the consumer queue is closed and empty, @false
  801. * otherwise.
  802. */
  803. bool consumer_done() noexcept override { return !que_ || que_->done(); }
  804. /**
  805. * Gets the number of events available for immediate consumption.
  806. * Note that this retrieves the number of "raw" events, not messages,
  807. * e.g. may include a connected_event which is not returned by try_consume_message().
  808. * When polling the queue from multiple threads, prefer using try_consume_event(),
  809. * as the event count may change between checking the size and actual retrieval.
  810. * @return the number of events in the queue.
  811. */
  812. std::size_t consumer_queue_size() const override { return (que_) ? que_->size() : 0; }
  813. /**
  814. * Read the next client event from the queue.
  815. * This blocks until a new message arrives.
  816. * If the consumer queue is closed, this returns a shutdown event.
  817. * @return The client event.
  818. */
  819. event consume_event() override;
  820. /**
  821. * Try to read the next client event without blocking.
  822. * @param evt Pointer to the value to receive the event
  823. * @return @em true if an event was read, @em false if no
  824. * event was available.
  825. */
  826. bool try_consume_event(event* evt) override;
  827. /**
  828. * Waits a limited time for a client event to appear.
  829. * @param evt Pointer to the value to receive the event.
  830. * @param relTime The maximum amount of time to wait for an event.
  831. * @return @em true if an event was read, @em false if a timeout
  832. * occurred.
  833. */
  834. template <typename Rep, class Period>
  835. bool try_consume_event_for(
  836. event* evt, const std::chrono::duration<Rep, Period>& relTime
  837. ) {
  838. if (!que_)
  839. throw mqtt::exception(-1, "Consumer not started");
  840. try {
  841. return que_->try_get_for(evt, relTime);
  842. }
  843. catch (queue_closed&) {
  844. *evt = event{shutdown_event{}};
  845. return true;
  846. }
  847. }
  848. /**
  849. * Waits a limited time for a client event to arrive.
  850. * @param relTime The maximum amount of time to wait for an event.
  851. * @return The event that was received. It will contain empty message on
  852. * timeout.
  853. */
  854. template <typename Rep, class Period>
  855. event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
  856. event evt;
  857. try {
  858. que_->try_get_for(&evt, relTime);
  859. }
  860. catch (queue_closed&) {
  861. evt = event{shutdown_event{}};
  862. }
  863. return evt;
  864. }
  865. /**
  866. * Waits until a specific time for a client event to appear.
  867. * @param evt Pointer to the value to receive the event.
  868. * @param absTime The time point to wait until, before timing out.
  869. * @return @em true if an event was recceived, @em false if a timeout
  870. * occurred.
  871. */
  872. template <class Clock, class Duration>
  873. bool try_consume_event_until(
  874. event* evt, const std::chrono::time_point<Clock, Duration>& absTime
  875. ) {
  876. if (!que_)
  877. throw mqtt::exception(-1, "Consumer not started");
  878. try {
  879. return que_->try_get_until(evt, absTime);
  880. }
  881. catch (queue_closed&) {
  882. *evt = event{shutdown_event{}};
  883. return true;
  884. }
  885. }
  886. /**
  887. * Waits until a specific time for a client event to appear.
  888. * @param absTime The time point to wait until, before timing out.
  889. * @return The event that was received. It will contain empty message on
  890. * timeout.
  891. */
  892. template <class Clock, class Duration>
  893. event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime) {
  894. event evt;
  895. try {
  896. que_->try_get_until(&evt, absTime);
  897. }
  898. catch (queue_closed&) {
  899. evt = event{shutdown_event{}};
  900. }
  901. return evt;
  902. }
  903. /**
  904. * Read the next message from the queue.
  905. * This blocks until a new message arrives or until a disconnect or
  906. * shutdown occurs.
  907. * @return The message and topic.
  908. */
  909. const_message_ptr consume_message() override;
  910. /**
  911. * Try to read the next message from the queue without blocking.
  912. * @param msg Pointer to the value to receive the message
  913. * @return @em true is a message was read, @em false if no message was
  914. * available.
  915. */
  916. bool try_consume_message(const_message_ptr* msg) override;
  917. /**
  918. * Waits a limited time for a message to arrive.
  919. * @param msg Pointer to the value to receive the message
  920. * @param relTime The maximum amount of time to wait for a message.
  921. * @return @em true if a message was read, @em false if a timeout
  922. * occurred.
  923. */
  924. template <typename Rep, class Period>
  925. bool try_consume_message_for(
  926. const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
  927. ) {
  928. if (!que_)
  929. throw mqtt::exception(-1, "Consumer not started");
  930. event evt;
  931. while (true) {
  932. if (!try_consume_event_for(&evt, relTime))
  933. return false;
  934. if (const auto* pval = evt.get_message_if()) {
  935. *msg = std::move(*pval);
  936. break;
  937. }
  938. if (evt.is_any_disconnect()) {
  939. *msg = const_message_ptr{};
  940. break;
  941. }
  942. }
  943. return true;
  944. }
  945. /**
  946. * Waits a limited time for a message to arrive.
  947. * @param relTime The maximum amount of time to wait for a message.
  948. * @return A shared pointer to the message that was received. It will be
  949. * empty on timeout.
  950. */
  951. template <typename Rep, class Period>
  952. const_message_ptr try_consume_message_for(
  953. const std::chrono::duration<Rep, Period>& relTime
  954. ) {
  955. const_message_ptr msg;
  956. this->try_consume_message_for(&msg, relTime);
  957. return msg;
  958. }
  959. /**
  960. * Waits until a specific time for a message to appear.
  961. * @param msg Pointer to the value to receive the message
  962. * @param absTime The time point to wait until, before timing out.
  963. * @return @em true if a message was read, @em false if a timeout
  964. * occurred.
  965. */
  966. template <class Clock, class Duration>
  967. bool try_consume_message_until(
  968. const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
  969. ) {
  970. if (!que_)
  971. throw mqtt::exception(-1, "Consumer not started");
  972. event evt;
  973. while (true) {
  974. if (!try_consume_event_until(&evt, absTime))
  975. return false;
  976. if (const auto* pval = evt.get_message_if()) {
  977. *msg = std::move(*pval);
  978. break;
  979. }
  980. if (!evt.is_any_disconnect()) {
  981. *msg = const_message_ptr{};
  982. break;
  983. }
  984. }
  985. return true;
  986. }
  987. /**
  988. * Waits until a specific time for a message to appear.
  989. * @param absTime The time point to wait until, before timing out.
  990. * @return The message, if read, an empty pointer if not.
  991. */
  992. template <class Clock, class Duration>
  993. const_message_ptr try_consume_message_until(
  994. const std::chrono::time_point<Clock, Duration>& absTime
  995. ) {
  996. const_message_ptr msg;
  997. this->try_consume_message_until(&msg, absTime);
  998. return msg;
  999. }
  1000. };
  1001. /** Smart/shared pointer to an asynchronous MQTT client object */
  1002. using async_client_ptr = async_client::ptr_t;
  1003. /////////////////////////////////////////////////////////////////////////////
  1004. } // namespace mqtt
  1005. #endif // __mqtt_async_client_h