async_client.h 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file async_client.h
  3. /// Declaration of MQTT async_client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. /////////////////////////////////////////////////////////////////////////////
  7. /*******************************************************************************
  8. * Copyright (c) 2013-2022 Frank Pagliughi <fpagliughi@mindspring.com>
  9. *
  10. * All rights reserved. This program and the accompanying materials
  11. * are made available under the terms of the Eclipse Public License v2.0
  12. * and Eclipse Distribution License v1.0 which accompany this distribution.
  13. *
  14. * The Eclipse Public License is available at
  15. * http://www.eclipse.org/legal/epl-v20.html
  16. * and the Eclipse Distribution License is available at
  17. * http://www.eclipse.org/org/documents/edl-v10.php.
  18. *
  19. * Contributors:
  20. * Frank Pagliughi - initial implementation and documentation
  21. * Frank Pagliughi - MQTT v5 support
  22. *******************************************************************************/
  23. #ifndef __mqtt_async_client_h
  24. #define __mqtt_async_client_h
  25. #include "MQTTAsync.h"
  26. #include "mqtt/types.h"
  27. #include "mqtt/token.h"
  28. #include "mqtt/create_options.h"
  29. #include "mqtt/string_collection.h"
  30. #include "mqtt/delivery_token.h"
  31. #include "mqtt/iclient_persistence.h"
  32. #include "mqtt/iaction_listener.h"
  33. #include "mqtt/properties.h"
  34. #include "mqtt/exception.h"
  35. #include "mqtt/message.h"
  36. #include "mqtt/callback.h"
  37. #include "mqtt/thread_queue.h"
  38. #include "mqtt/iasync_client.h"
  39. #include <vector>
  40. #include <list>
  41. #include <memory>
  42. #include <tuple>
  43. #include <functional>
  44. #include <stdexcept>
  45. namespace mqtt {
  46. // OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
  47. // clashed with #define's from other libraries and will be removed at the
  48. // next major version upgrade.
  49. #if defined(PAHO_MQTTPP_VERSIONS)
  50. /** Numeric version of the client library (prefixed name, used when PAHO_MQTTPP_VERSIONS is defined). Presumably packed as 0xMMmmpppp to match "1.3.1" in the version string - TODO confirm. */
  51. const uint32_t PAHO_MQTTPP_VERSION = 0x01030001;
  52. /** Human-readable version string for the client library (prefixed name). */
  53. const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.1");
  54. /** Copyright notice for the client library (prefixed name). */
  55. const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi");
  56. #else
  57. /** Numeric version of the client library (legacy unprefixed name, used when PAHO_MQTTPP_VERSIONS is not defined). Same value as the prefixed constant. */
  58. const uint32_t VERSION = 0x01030001;
  59. /** Human-readable version string for the client library (legacy unprefixed name). */
  60. const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.1");
  61. /** Copyright notice for the client library (legacy unprefixed name). */
  62. const string COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi");
  63. #endif
  64. /////////////////////////////////////////////////////////////////////////////
  65. /**
  66. * Client for talking to an MQTT server using non-blocking
  67. * methods that allow an operation to run in the background.
  68. *
  69. * The location of the server is specified as a URI string with the
  70. * following schemas supported to specify the type and security used for the
  71. * connection:
  72. * @li @em "mqtt://" - A standard (insecure) connection over TCP. (Also,
  73. * "tcp://")
  74. * @li @em "mqtts://" - A secure connection using SSL/TLS sockets. (Also
  75. * "ssl://")
  76. * @li @em "ws://" - A standard (insecure) WebSocket connection.
  77. * @li @em "wss://" - A secure WebSocket connection using SSL/TLS.
  78. *
  79. * The secure connection types assume that the library was built with
  80. * OpenSSL support, otherwise requesting a secure connection will result in
  81. * an error.
  82. *
  83. * The communication methods of this class - `connect()`, `publish()`,
  84. * `subscribe()`, etc. - are all asynchronous. They create the request for
  85. * the server, but return immediately, before a response is received back
  86. * from the server.
  87. *
  88. * These methods return a `Token` to the caller which is akin to a C++
  89. * std::future. The caller can keep the Token, then use it later to block
  90. * until the asynchronous operation is complete and retrieve the result of
  91. * the operation, including any response from the server.
  92. *
  93. * Alternately, the application can choose to set callbacks to be fired when
  94. * each operation completes. This can be used to create an event-driven
  95. * architecture, but is more complex in that it forces the user to avoid any
  96. * blocking operations and manually handle thread synchronization (since
  97. * the callbacks run in a separate thread managed by the library).
  98. */
  99. class async_client : public virtual iasync_client
  100. {
  101. public:
  102. /** Smart/shared pointer for an object of this class */
  103. using ptr_t = std::shared_ptr<async_client>;
  104. /** Type for a thread-safe queue to consume messages synchronously */
  105. using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
  106. /** Handler type for registering an individual message callback */
  107. using message_handler = std::function<void(const_message_ptr)>;
  108. /** Handler type for when a connection is made or lost */
  109. using connection_handler = std::function<void(const string& cause)>;
  110. /** Handler type for when a disconnect packet is received */
  111. using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
  112. /** Handler for updating connection data before an auto-reconnect. */
  113. using update_connection_handler = std::function<bool(connect_data&)>;
  114. private:
  115. /** Lock guard type for this class */
  116. using guard = std::unique_lock<std::mutex>;
  117. /** Unique lock type for this class */
  118. using unique_lock = std::unique_lock<std::mutex>;
  119. /** Object monitor mutex */
  120. mutable std::mutex lock_;
  121. /** The underlying C-lib client. */
  122. MQTTAsync cli_;
  123. /** The server URI string. */
  124. string serverURI_;
  125. /** The client ID string that we provided to the server. */
  126. string clientId_;
  127. /** The MQTT protocol version we're connected at */
  128. int mqttVersion_;
  129. /** A user persistence wrapper (if any) */
  130. std::unique_ptr<MQTTClient_persistence> persist_;
  131. /** Callback supplied by the user (if any) */
  132. callback* userCallback_;
  133. /** Connection handler */
  134. connection_handler connHandler_;
  135. /** Connection lost handler */
  136. connection_handler connLostHandler_;
  137. /** Disconnected handler */
  138. disconnected_handler disconnectedHandler_;
  139. /** Update connect data/options */
  140. update_connection_handler updateConnectionHandler_;
  141. /** Message handler */
  142. message_handler msgHandler_;
  143. /** Cached options from the last connect */
  144. connect_options connOpts_;
  145. /** Copy of connect token (for re-connects) */
  146. token_ptr connTok_;
  147. /** A list of tokens that are in play */
  148. std::list<token_ptr> pendingTokens_;
  149. /** A list of delivery tokens that are in play */
  150. std::list<delivery_token_ptr> pendingDeliveryTokens_;
  151. /** A queue of messages for consumer API */
  152. consumer_queue_type que_;
  153. /** Callbacks from the C library */
  154. static void on_connected(void* context, char* cause);
  155. static void on_connection_lost(void *context, char *cause);
  156. static void on_disconnected(void* context, MQTTProperties* cprops,
  157. MQTTReasonCodes reasonCode);
  158. static int on_message_arrived(void* context, char* topicName, int topicLen,
  159. MQTTAsync_message* msg);
  160. static void on_delivery_complete(void* context, MQTTAsync_token tok);
  161. static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
  162. /** Manage internal list of active tokens */
  163. friend class token;
  164. virtual void add_token(token_ptr tok);
  165. virtual void add_token(delivery_token_ptr tok);
  166. virtual void remove_token(token* tok) override;
  167. virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
  168. void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
  169. /** Non-copyable */
  170. async_client() =delete;
  171. async_client(const async_client&) =delete;
  172. async_client& operator=(const async_client&) =delete;
  173. /** Checks a function return code and throws on error. */
  174. static void check_ret(int rc) {
  175. if (rc != MQTTASYNC_SUCCESS)
  176. throw exception(rc);
  177. }
  178. public:
  179. /**
  180. * Create an async_client that can be used to communicate with an MQTT
  181. * server.
  182. * This uses file-based persistence in the specified directory.
  183. * @param serverURI the address of the server to connect to, specified
  184. * as a URI.
  185. * @param clientId a client identifier that is unique on the server
  186. * being connected to
  187. * @param persistDir The directory to use for persistence data
  188. * @throw exception if an argument is invalid
  189. */
  190. async_client(const string& serverURI, const string& clientId,
  191. const string& persistDir);
  192. /**
  193. * Create an async_client that can be used to communicate with an MQTT
  194. * server.
  195. * This allows the caller to specify a user-defined persistence object,
  196. * or use no persistence.
  197. * @param serverURI the address of the server to connect to, specified
  198. * as a URI.
  199. * @param clientId a client identifier that is unique on the server
  200. * being connected to
  201. * @param persistence The user persistence structure. If this is null,
  202. * then no persistence is used.
  203. * @throw exception if an argument is invalid
  204. */
  205. async_client(const string& serverURI, const string& clientId,
  206. iclient_persistence* persistence=nullptr);
  207. /**
  208. * Create an async_client that can be used to communicate with an MQTT
  209. * server, which allows for off-line message buffering.
  210. * This uses file-based persistence in the specified directory.
  211. * @param serverURI the address of the server to connect to, specified
  212. * as a URI.
  213. * @param clientId a client identifier that is unique on the server
  214. * being connected to
  215. * @param maxBufferedMessages the maximum number of messages allowed to
  216. * be buffered while not connected
  217. * @param persistDir The directory to use for persistence data
  218. * @throw exception if an argument is invalid
  219. */
  220. async_client(const string& serverURI, const string& clientId,
  221. int maxBufferedMessages, const string& persistDir);
  222. /**
  223. * Create an async_client that can be used to communicate with an MQTT
  224. * server, which allows for off-line message buffering.
  225. * This allows the caller to specify a user-defined persistence object,
  226. * or use no persistence.
  227. * @param serverURI the address of the server to connect to, specified
  228. * as a URI.
  229. * @param clientId a client identifier that is unique on the server
  230. * being connected to
  231. * @param maxBufferedMessages the maximum number of messages allowed to
  232. * be buffered while not connected
  233. * @param persistence The user persistence structure. If this is null,
  234. * then no persistence is used.
  235. * @throw exception if an argument is invalid
  236. */
  237. async_client(const string& serverURI, const string& clientId,
  238. int maxBufferedMessages,
  239. iclient_persistence* persistence=nullptr);
  240. /**
  241. * Create an async_client that can be used to communicate with an MQTT
  242. * server, which allows for off-line message buffering.
  243. * This uses file-based persistence in the specified directory.
  244. * @param serverURI the address of the server to connect to, specified
  245. * as a URI.
  246. * @param clientId a client identifier that is unique on the server
  247. * being connected to
  248. * @param opts The create options
  249. * @param persistDir The directory to use for persistence data
  250. * @throw exception if an argument is invalid
  251. */
  252. async_client(const string& serverURI, const string& clientId,
  253. const create_options& opts, const string& persistDir);
  254. /**
  255. * Create an async_client that can be used to communicate with an MQTT
  256. * server, which allows for off-line message buffering.
  257. * This allows the caller to specify a user-defined persistence object,
  258. * or use no persistence.
  259. * @param serverURI the address of the server to connect to, specified
  260. * as a URI.
  261. * @param clientId a client identifier that is unique on the server
  262. * being connected to
  263. * @param opts The create options
  264. * @param persistence The user persistence structure. If this is null,
  265. * then no persistence is used.
  266. * @throw exception if an argument is invalid
  267. */
  268. async_client(const string& serverURI, const string& clientId,
  269. const create_options& opts,
  270. iclient_persistence* persistence=nullptr);
  271. /**
  272. * Destructor
  273. */
  274. ~async_client() override;
  275. /**
  276. * Sets a callback listener to use for events that happen
  277. * asynchronously.
  278. * @param cb callback receiver which will be invoked for certain
  279. * asynchronous events
  280. */
  281. void set_callback(callback& cb) override;
  282. /**
  283. * Stops callbacks.
  284. * This is not normally called by the application. It should be used
  285. * cautiously as it may cause the application to lose messages.
  286. */
  287. void disable_callbacks() override;
  288. /**
  289. * Callback for when a connection is made.
  290. * @param cb Callback functor for when the connection is made.
  291. */
  292. void set_connected_handler(connection_handler cb) /*override*/;
  293. /**
  294. * Callback for when a connection is lost.
  295. * @param cb Callback functor for when the connection is lost.
  296. */
  297. void set_connection_lost_handler(connection_handler cb) /*override*/;
  298. /**
  299. * Callback for when a disconnect packet is received from the server.
  300. * @param cb Callback for when the disconnect packet is received.
  301. */
  302. void set_disconnected_handler(disconnected_handler cb) /*override*/;
  303. /**
  304. * Sets the callback for when a message arrives from the broker.
  305. * Note that the application can only have one message handler which can
  306. * be installed individually using this method, or installed as a
  307. * listener object.
  308. * @param cb The callback functor to register with the library.
  309. */
  310. void set_message_callback(message_handler cb) /*override*/;
  311. /**
  312. * Sets a callback to allow the application to update the connection
  313. * data on automatic reconnects.
  314. * @param cb The callback functor to register with the library.
  315. */
  316. void set_update_connection_handler(update_connection_handler cb);
  317. /**
  318. * Connects to an MQTT server using the default options.
  319. * @return token used to track and wait for the connect to complete. The
  320. * token will be passed to any callback that has been set.
  321. * @throw exception for non security related problems
  322. * @throw security_exception for security related problems
  323. */
  324. token_ptr connect() override;
  325. /**
  326. * Connects to an MQTT server using the provided connect options.
  327. * @param options a set of connection parameters that override the
  328. * defaults.
  329. * @return token used to track and wait for the connect to complete. The
  330. * token will be passed to any callback that has been set.
  331. * @throw exception for non security related problems
  332. * @throw security_exception for security related problems
  333. */
  334. token_ptr connect(connect_options options) override;
  335. /**
  336. * Connects to an MQTT server using the specified options.
  337. * @param options a set of connection parameters that override the
  338. * defaults.
  339. * @param userContext optional object used to pass context to the
  340. * callback. Use @em nullptr if not required.
  341. * @param cb callback listener that will be notified when the connect
  342. * completes.
  343. * @return token used to track and wait for the connect to complete. The
  344. * token will be passed to any callback that has been set.
  345. * @throw exception for non security related problems
  346. * @throw security_exception for security related problems
  347. */
  348. token_ptr connect(connect_options options, void* userContext,
  349. iaction_listener& cb) override;
  350. /**
  351. * Connects to an MQTT server using the default options.
  352. * @param userContext optional object used to pass context to the
  353. * callback. Use @em nullptr if not required.
  354. * @param cb callback listener that will be notified when the connect
  355. * completes.
  356. * @return token used to track and wait for the connect to complete. The
  357. * token will be passed to any callback that has been set.
  358. * @throw exception for non security related problems
  359. * @throw security_exception for security related problems
  360. */
  361. token_ptr connect(void* userContext, iaction_listener& cb) override {
  362. return connect(connect_options{}, userContext, cb);
  363. }
  364. /**
  365. * Reconnects the client using options from the previous connect.
  366. * The client must have previously called connect() for this to work.
  367. * @return token used to track the progress of the reconnect.
  368. */
  369. token_ptr reconnect() override;
  370. /**
  371. * Disconnects from the server.
  372. * @return token used to track and wait for the disconnect to complete.
  373. * The token will be passed to any callback that has been set.
  374. * @throw exception for problems encountered while disconnecting
  375. */
  376. token_ptr disconnect() override { return disconnect(disconnect_options()); }
  377. /**
  378. * Disconnects from the server.
  379. * @param opts Options for disconnecting.
  380. * @return token used to track and wait for the disconnect to complete.
  381. * The token will be passed to any callback that has been set.
  382. * @throw exception for problems encountered while disconnecting
  383. */
  384. token_ptr disconnect(disconnect_options opts) override;
  385. /**
  386. * Disconnects from the server.
  387. * @param timeout the amount of time in milliseconds to allow for
  388. * existing work to finish before disconnecting. A value
  389. * of zero or less means the client will not quiesce.
  390. * @return Token used to track and wait for disconnect to complete. The
  391. * token will be passed to the callback methods if a callback is
  392. * set.
  393. * @throw exception for problems encountered while disconnecting
  394. */
  395. token_ptr disconnect(int timeout) override {
  396. return disconnect(disconnect_options(timeout));
  397. }
  398. /**
  399. * Disconnects from the server.
  400. * @param timeout the amount of time, as a std::chrono duration, to allow for
  401. * existing work to finish before disconnecting. A value
  402. * of zero or less means the client will not quiesce.
  403. * @return Token used to track and wait for disconnect to complete. The
  404. * token will be passed to the callback methods if a callback is
  405. * set.
  406. * @throw exception for problems encountered while disconnecting
  407. */
  408. template <class Rep, class Period>
  409. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
  410. // TODO: check range
  411. return disconnect((int) to_milliseconds_count(timeout));
  412. }
  413. /**
  414. * Disconnects from the server.
  415. * @param timeout the amount of time in milliseconds to allow for
  416. * existing work to finish before disconnecting. A value
  417. * of zero or less means the client will not quiesce.
  418. * @param userContext optional object used to pass context to the
  419. * callback. Use @em nullptr if not required.
  420. * @param cb callback listener that will be notified when the disconnect
  421. * completes.
  422. * @return token_ptr Token used to track and wait for disconnect to
  423. * complete. The token will be passed to the callback methods if
  424. * a callback is set.
  425. * @throw exception for problems encountered while disconnecting
  426. */
  427. token_ptr disconnect(int timeout, void* userContext,
  428. iaction_listener& cb) override;
  429. /**
  430. * Disconnects from the server.
  431. * @param timeout the amount of time, as a std::chrono duration, to allow for
  432. * existing work to finish before disconnecting. A value
  433. * of zero or less means the client will not quiesce.
  434. * @param userContext optional object used to pass context to the
  435. * callback. Use @em nullptr if not required.
  436. * @param cb callback listener that will be notified when the disconnect
  437. * completes.
  438. * @return token_ptr Token used to track and wait for disconnect to
  439. * complete. The token will be passed to the callback methods if
  440. * a callback is set.
  441. * @throw exception for problems encountered while disconnecting
  442. */
  443. template <class Rep, class Period>
  444. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout,
  445. void* userContext, iaction_listener& cb) {
  446. // TODO: check range
  447. return disconnect((int) to_milliseconds_count(timeout), userContext, cb);
  448. }
  449. /**
  450. * Disconnects from the server.
  451. * @param userContext optional object used to pass context to the
  452. * callback. Use @em nullptr if not required.
  453. * @param cb callback listener that will be notified when the disconnect
  454. * completes.
  455. * @return token_ptr Token used to track and wait for disconnect to
  456. * complete. The token will be passed to the callback methods if
  457. * a callback is set.
  458. * @throw exception for problems encountered while disconnecting
  459. */
  460. token_ptr disconnect(void* userContext, iaction_listener& cb) override {
  461. return disconnect(0L, userContext, cb);
  462. }
  463. /**
  464. * Returns the delivery token for the specified message ID.
  465. * @return delivery_token
  466. */
  467. delivery_token_ptr get_pending_delivery_token(int msgID) const override;
  468. /**
  469. * Returns the delivery tokens for any outstanding publish operations.
  470. * @return delivery_token[]
  471. */
  472. std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
  473. /**
  474. * Returns the client ID used by this client.
  475. * @return The client ID used by this client.
  476. */
  477. string get_client_id() const override { return clientId_; }
  478. /**
  479. * Returns the address of the server used by this client.
  480. * @return The server's address, as a URI String.
  481. */
  482. string get_server_uri() const override { return serverURI_; }
  483. /**
  484. * Gets the MQTT version used by the client.
  485. * @return The MQTT version used by the client
  486. * @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
  487. * that fails, fall back to 3.1
  488. * @li MQTTVERSION_3_1 (3) = only try version 3.1
  489. * @li MQTTVERSION_3_1_1 (4) = only try version 3.1.1
  490. * @li MQTTVERSION_5 (5) = only try version 5
  491. */
  492. int mqtt_version() const noexcept { return mqttVersion_; }
  493. /**
  494. * Determines if this client is currently connected to the server.
  495. * @return true if connected, false otherwise.
  496. */
  497. bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
  498. /**
  499. * Publishes a message to a topic on the server
  500. * @param topic The topic to deliver the message to
  501. * @param payload the bytes to use as the message payload
  502. * @param n the number of bytes in the payload
  503. * @param qos the Quality of Service to deliver the message at. Valid
  504. * values are 0, 1 or 2.
  505. * @param retained whether or not this message should be retained by the
  506. * server.
  507. * @return token used to track and wait for the publish to complete. The
  508. * token will be passed to callback methods if set.
  509. */
  510. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n,
  511. int qos, bool retained) override;
  512. /**
  513. * Publishes a message to a topic on the server
  514. * @param topic The topic to deliver the message to
  515. * @param payload the bytes to use as the message payload
  516. * @param n the number of bytes in the payload
  517. * @return token used to track and wait for the publish to complete. The
  518. * token will be passed to callback methods if set.
  519. */
  520. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
  521. return publish(std::move(topic), payload, n,
  522. message::DFLT_QOS, message::DFLT_RETAINED);
  523. }
  524. /**
  525. * Publishes a message to a topic on the server
  526. * @param topic The topic to deliver the message to
  527. * @param payload the bytes to use as the message payload
  528. * @param qos the Quality of Service to deliver the message at. Valid
  529. * values are 0, 1 or 2.
  530. * @param retained whether or not this message should be retained by the
  531. * server.
  532. * @return token used to track and wait for the publish to complete. The
  533. * token will be passed to callback methods if set.
  534. */
  535. delivery_token_ptr publish(string_ref topic, binary_ref payload,
  536. int qos, bool retained) override;
  537. /**
  538. * Publishes a message to a topic on the server
  539. * @param topic The topic to deliver the message to
  540. * @param payload the bytes to use as the message payload
  541. * @return token used to track and wait for the publish to complete. The
  542. * token will be passed to callback methods if set.
  543. */
  544. delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
  545. return publish(std::move(topic), std::move(payload),
  546. message::DFLT_QOS, message::DFLT_RETAINED);
  547. }
  548. /**
  549. * Publishes a message to a topic on the server
  550. * @param topic The topic to deliver the message to
  551. * @param payload the bytes to use as the message payload
  552. * @param n the number of bytes in the payload
  553. * @param qos the Quality of Service to deliver the message at. Valid
  554. * values are 0, 1 or 2.
  555. * @param retained whether or not this message should be retained by the
  556. * server.
  557. * @param userContext optional object used to pass context to the
  558. * callback. Use @em nullptr if not required.
  559. * @param cb callback listener that will be notified when the publish completes
  560. * @return token used to track and wait for the publish to complete. The
  561. * token will be passed to callback methods if set.
  562. */
  563. delivery_token_ptr publish(string_ref topic,
  564. const void* payload, size_t n,
  565. int qos, bool retained,
  566. void* userContext, iaction_listener& cb) override;
  567. /**
  568. * Publishes a message to a topic on the server. Takes a message
  569. * and delivers it to the server at the requested quality of
  570. * service.
  571. * @param msg the message to deliver to the server
  572. * @return token used to track and wait for the publish to complete. The
  573. * token will be passed to callback methods if set.
  574. */
  575. delivery_token_ptr publish(const_message_ptr msg) override;
  576. /**
  577. * Publishes a message to a topic on the server.
  578. * @param msg the message to deliver to the server
  579. * @param userContext optional object used to pass context to the
  580. * callback. Use @em nullptr if not required.
  581. * @param cb callback listener that will be notified when message
  582. * delivery has completed to the requested quality of
  583. * service
  584. * @return token used to track and wait for the publish to complete. The
  585. * token will be passed to callback methods if set.
  586. */
  587. delivery_token_ptr publish(const_message_ptr msg,
  588. void* userContext, iaction_listener& cb) override;
  589. /**
  590. * Subscribe to a topic, which may include wildcards.
  591. * @param topicFilter the topic to subscribe to, which can include
  592. * wildcards.
  593. * @param qos The quality of service for the subscription
  594. * @param opts The MQTT v5 subscribe options for the topic
  595. * @param props The MQTT v5 properties.
  596. * @return token used to track and wait for the subscribe to complete.
  597. * The token will be passed to callback methods if set.
  598. */
  599. token_ptr subscribe(const string& topicFilter, int qos,
  600. const subscribe_options& opts=subscribe_options(),
  601. const properties& props=properties()) override;
  602. /**
  603. * Subscribe to a topic, which may include wildcards.
  604. * @param topicFilter the topic to subscribe to, which can include
  605. * wildcards.
  606. * @param qos the maximum quality of service at which to subscribe.
  607. * Messages published at a lower quality of service will be
  608. * received at the published QoS. Messages published at a
  609. * higher quality of service will be received using the QoS
  610. * specified on the subscribe.
  611. * @param userContext optional object used to pass context to the
  612. * callback. Use @em nullptr if not required.
  613. * @param cb listener that will be notified when subscribe has completed
  614. * @param opts The MQTT v5 subscribe options for the topic
  615. * @param props The MQTT v5 properties.
  616. * @return token used to track and wait for the subscribe to complete.
  617. * The token will be passed to callback methods if set.
  618. */
  619. token_ptr subscribe(const string& topicFilter, int qos,
  620. void* userContext, iaction_listener& cb,
  621. const subscribe_options& opts=subscribe_options(),
  622. const properties& props=properties()) override;
  623. /**
  624. * Subscribe to multiple topics, each of which may include wildcards.
  625. * @param topicFilters
  626. * @param qos the maximum quality of service at which to subscribe.
  627. * Messages published at a lower quality of service will be
  628. * received at the published QoS. Messages published at a
  629. * higher quality of service will be received using the QoS
  630. * specified on the subscribe.
  631. * @param opts The MQTT v5 subscribe options (one for each topic)
  632. * @param props The MQTT v5 properties.
  633. * @return token used to track and wait for the subscribe to complete.
  634. * The token will be passed to callback methods if set.
  635. */
  636. token_ptr subscribe(const_string_collection_ptr topicFilters,
  637. const qos_collection& qos,
  638. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
  639. const properties& props=properties()) override;
  640. /**
  641. * Subscribes to multiple topics, each of which may include wildcards.
  642. * @param topicFilters
  643. * @param qos the maximum quality of service at which to subscribe.
  644. * Messages published at a lower quality of service will be
  645. * received at the published QoS. Messages published at a
  646. * higher quality of service will be received using the QoS
  647. * specified on the subscribe.
  648. * @param userContext optional object used to pass context to the
  649. * callback. Use @em nullptr if not required.
  650. * @param cb listener that will be notified when subscribe has completed
  651. * @param opts The MQTT v5 subscribe options (one for each topic)
  652. * @param props The MQTT v5 properties.
  653. * @return token used to track and wait for the subscribe to complete.
  654. * The token will be passed to callback methods if set.
  655. */
  656. token_ptr subscribe(const_string_collection_ptr topicFilters,
  657. const qos_collection& qos,
  658. void* userContext, iaction_listener& cb,
  659. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
  660. const properties& props=properties()) override;
  661. /**
  662. * Requests the server unsubscribe the client from a topic.
  663. * @param topicFilter the topic to unsubscribe from. It must match a
  664. * topicFilter specified on an earlier subscribe.
  665. * @param props The MQTT v5 properties.
  666. * @return token used to track and wait for the unsubscribe to complete.
  667. * The token will be passed to callback methods if set.
  668. */
  669. token_ptr unsubscribe(const string& topicFilter,
  670. const properties& props=properties()) override;
  671. /**
  672. * Requests the server unsubscribe the client from one or more topics.
  673. * @param topicFilters one or more topics to unsubscribe from. Each
  674. * topicFilter must match one specified on an
  675. * earlier subscribe.
  676. * @param props The MQTT v5 properties.
  677. * @return token used to track and wait for the unsubscribe to complete.
  678. * The token will be passed to callback methods if set.
  679. */
  680. token_ptr unsubscribe(const_string_collection_ptr topicFilters,
  681. const properties& props=properties()) override;
  682. /**
  683. * Requests the server unsubscribe the client from one or more topics.
  684. * @param topicFilters
  685. * @param userContext optional object used to pass context to the
  686. * callback. Use @em nullptr if not required.
  687. * @param cb listener that will be notified when unsubscribe has
  688. * completed
  689. * @param props The MQTT v5 properties.
  690. * @return token used to track and wait for the unsubscribe to complete.
  691. * The token will be passed to callback methods if set.
  692. */
  693. token_ptr unsubscribe(const_string_collection_ptr topicFilters,
  694. void* userContext, iaction_listener& cb,
  695. const properties& props=properties()) override;
  696. /**
  697. * Requests the server unsubscribe the client from a topics.
  698. * @param topicFilter the topic to unsubscribe from. It must match a
  699. * topicFilter specified on an earlier subscribe.
  700. * @param userContext optional object used to pass context to the
  701. * callback. Use @em nullptr if not required.
  702. * @param cb listener that will be notified when unsubscribe has
  703. * completed
  704. * @param props The MQTT v5 properties.
  705. * @return token used to track and wait for the unsubscribe to complete.
  706. * The token will be passed to callback methods if set.
  707. */
  708. token_ptr unsubscribe(const string& topicFilter,
  709. void* userContext, iaction_listener& cb,
  710. const properties& props=properties()) override;
  711. /**
  712. * Start consuming messages.
  713. * This initializes the client to receive messages through a queue that
  714. * can be read synchronously.
  715. */
  716. void start_consuming() override;
  717. /**
  718. * Stop consuming messages.
  719. * This shuts down the internal callback and discards any unread
  720. * messages.
  721. */
  722. void stop_consuming() override;
  723. /**
  724. * Read the next message from the queue.
  725. * This blocks until a new message arrives.
  726. * @return The message and topic.
  727. */
  728. const_message_ptr consume_message() override { return que_->get(); }
  729. /**
  730. * Try to read the next message from the queue without blocking.
  731. * @param msg Pointer to the value to receive the message
  732. * @return @em true is a message was read, @em false if no message was
  733. * available.
  734. */
  735. bool try_consume_message(const_message_ptr* msg) override {
  736. return que_->try_get(msg);
  737. }
  738. /**
  739. * Waits a limited time for a message to arrive.
  740. * @param msg Pointer to the value to receive the message
  741. * @param relTime The maximum amount of time to wait for a message.
  742. * @return @em true if a message was read, @em false if a timeout
  743. * occurred.
  744. */
  745. template <typename Rep, class Period>
  746. bool try_consume_message_for(const_message_ptr* msg,
  747. const std::chrono::duration<Rep, Period>& relTime) {
  748. return que_->try_get_for(msg, relTime);
  749. }
  750. /**
  751. * Waits a limited time for a message to arrive.
  752. * @param relTime The maximum amount of time to wait for a message.
  753. * @return A shared pointer to the message that was received. It will be
  754. * empty on timeout.
  755. */
  756. template <typename Rep, class Period>
  757. const_message_ptr try_consume_message_for(const std::chrono::duration<Rep, Period>& relTime) {
  758. const_message_ptr msg;
  759. que_->try_get_for(&msg, relTime);
  760. return msg;
  761. }
  762. /**
  763. * Waits until a specific time for a message to appear.
  764. * @param msg Pointer to the value to receive the message
  765. * @param absTime The time point to wait until, before timing out.
  766. * @return @em true if a message was read, @em false if a timeout
  767. * occurred.
  768. */
  769. template <class Clock, class Duration>
  770. bool try_consume_message_until(const_message_ptr* msg,
  771. const std::chrono::time_point<Clock,Duration>& absTime) {
  772. return que_->try_get_until(msg, absTime);
  773. }
  774. /**
  775. * Waits until a specific time for a message to appear.
  776. * @param absTime The time point to wait until, before timing out.
  777. * @return The message, if read, an empty pointer if not.
  778. */
  779. template <class Clock, class Duration>
  780. const_message_ptr try_consume_message_until(const std::chrono::time_point<Clock,Duration>& absTime) {
  781. const_message_ptr msg;
  782. que_->try_get_until(&msg, absTime);
  783. return msg;
  784. }
  785. };
  786. /** Smart/shared pointer to an asynchronous MQTT client object */
  787. using async_client_ptr = async_client::ptr_t;
  788. /////////////////////////////////////////////////////////////////////////////
  789. // end namespace mqtt
  790. }
  791. #endif // __mqtt_async_client_h