client.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file client.h
  3. /// Declaration of MQTT client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. /////////////////////////////////////////////////////////////////////////////
  7. /*******************************************************************************
  8. * Copyright (c) 2013-2023 Frank Pagliughi <fpagliughi@mindspring.com>
  9. *
  10. * All rights reserved. This program and the accompanying materials
  11. * are made available under the terms of the Eclipse Public License v2.0
  12. * and Eclipse Distribution License v1.0 which accompany this distribution.
  13. *
  14. * The Eclipse Public License is available at
  15. * http://www.eclipse.org/legal/epl-v20.html
  16. * and the Eclipse Distribution License is available at
  17. * http://www.eclipse.org/org/documents/edl-v10.php.
  18. *
  19. * Contributors:
  20. * Frank Pagliughi - initial implementation and documentation
  21. *******************************************************************************/
  22. #ifndef __mqtt_client_h
  23. #define __mqtt_client_h
  24. #include <future>
  25. #include "mqtt/async_client.h"
  26. namespace mqtt {
  27. /////////////////////////////////////////////////////////////////////////////
  28. /**
  29. * Lightweight client for talking to an MQTT server using methods that block
  30. * until an operation completes.
  31. */
  32. class client : private callback
  33. {
  34. /** An arbitrary, but relatively long timeout */
  35. PAHO_MQTTPP_EXPORT static const std::chrono::seconds DFLT_TIMEOUT;
  36. /** The default quality of service */
  37. PAHO_MQTTPP_EXPORT static const int DFLT_QOS; // =1;
  38. /** The actual client */
  39. async_client cli_;
  40. /** The longest time to wait for an operation to complete. */
  41. std::chrono::milliseconds timeout_;
  42. /** Callback supplied by the user (if any) */
  43. callback* userCallback_;
  44. /**
  45. * Creates a shared pointer to an existing non-heap object.
  46. * The shared pointer is given a no-op deleter, so it will not try to
  47. * destroy the object when it goes out of scope. It is up to the caller
  48. * to ensure that the object remains in memory for as long as there may
  49. * be pointers to it.
  50. * @param val A value which may live anywhere in memory (stack,
  51. * file-scope, etc).
  52. * @return A shared pointer to the object.
  53. */
  54. template <typename T>
  55. std::shared_ptr<T> ptr(const T& val) {
  56. return std::shared_ptr<T>(const_cast<T*>(&val), [](T*) {});
  57. }
  58. // User callbacks
  59. // Most are launched in a separate thread, for convenience, except
  60. // message_arrived, for performance.
  61. void connected(const string& cause) override {
  62. std::async(std::launch::async, &callback::connected, userCallback_, cause).wait();
  63. }
  64. void connection_lost(const string& cause) override {
  65. std::async(std::launch::async, &callback::connection_lost, userCallback_, cause)
  66. .wait();
  67. }
  68. void message_arrived(const_message_ptr msg) override {
  69. userCallback_->message_arrived(msg);
  70. }
  71. void delivery_complete(delivery_token_ptr tok) override {
  72. std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok)
  73. .wait();
  74. }
  75. /** Non-copyable */
  76. client() = delete;
  77. client(const async_client&) = delete;
  78. client& operator=(const async_client&) = delete;
  79. public:
  80. /** Smart pointer type for this object */
  81. using ptr_t = std::shared_ptr<client>;
  82. /** Type for a collection of QOS values */
  83. using qos_collection = async_client::qos_collection;
  84. /** Handler for updating connection data before an auto-reconnect. */
  85. using update_connection_handler = async_client::update_connection_handler;
  86. /**
  87. * Create a client that can be used to communicate with an MQTT server.
  88. * @param serverURI the address of the server to connect to, specified
  89. * as a URI.
  90. * @param clientId a client identifier that is unique on the server
  91. * being connected to.
  92. * @param persistence The desired persistence type.
  93. */
  94. client(
  95. const string& serverURI, const string& clientId = string{},
  96. const persistence_type& persistence = NO_PERSISTENCE
  97. );
  98. /**
  99. * Create a client that can be used to communicate with an MQTT server,
  100. * which allows for off-line message buffering.
  101. * This allows the caller to specify a user-defined persistence object,
  102. * or use no persistence.
  103. * @param serverURI the address of the server to connect to, specified
  104. * as a URI.
  105. * @param clientId a client identifier that is unique on the server
  106. * being connected to
  107. * @param maxBufferedMessages the maximum number of messages allowed to
  108. * be buffered while not connected
  109. * @param persistence The desired persistence type.
  110. */
  111. client(
  112. const string& serverURI, const string& clientId, int maxBufferedMessages,
  113. const persistence_type& persistence = NO_PERSISTENCE
  114. );
  115. /**
  116. * Create a client that can be used to communicate with an MQTT server,
  117. * which allows for off-line message buffering. This allows the caller
  118. * to specify a user-defined persistence object, or use no persistence.
  119. * @param serverURI the address of the server to connect to, specified
  120. * as a URI.
  121. * @param clientId a client identifier that is unique on the server
  122. * being connected to
  123. * @param opts The create options
  124. * @param persistence The desired persistence type.
  125. */
  126. client(
  127. const string& serverURI, const string& clientId, const create_options& opts,
  128. const persistence_type& persistence = NO_PERSISTENCE
  129. );
  130. /**
  131. * Create a client that can be used to communicate with an MQTT server.
  132. * @param opts The create options
  133. */
  134. client(const create_options& opts);
  135. /**
  136. * Virtual destructor
  137. */
  138. virtual ~client() {}
  139. /**
  140. * Connects to an MQTT server using the default options.
  141. */
  142. virtual connect_response connect();
  143. /**
  144. * Connects to an MQTT server using the specified options.
  145. * @param opts The connect options
  146. */
  147. virtual connect_response connect(connect_options opts);
  148. /**
  149. * Reconnects the client using options from the previous connect.
  150. * The client must have previously called connect() for this to work.
  151. */
  152. virtual connect_response reconnect();
  153. /**
  154. * Disconnects from the server.
  155. */
  156. virtual void disconnect();
  157. /**
  158. * Disconnects from the server.
  159. * @param timeoutMS the amount of time in milliseconds to allow for
  160. * existing work to finish before disconnecting. A value
  161. * of zero or less means the client will not quiesce.
  162. */
  163. virtual void disconnect(int timeoutMS);
  164. /**
  165. * Disconnects from the server.
  166. * @param to the amount of time in milliseconds to allow for
  167. * existing work to finish before disconnecting. A value
  168. * of zero or less means the client will not quiesce.
  169. */
  170. template <class Rep, class Period>
  171. void disconnect(const std::chrono::duration<Rep, Period>& to) {
  172. disconnect((int)to_milliseconds_count(to));
  173. }
  174. /**
  175. * Gets the client ID used by this client.
  176. * @return The client ID used by this client.
  177. */
  178. virtual string get_client_id() const { return cli_.get_client_id(); }
  179. /**
  180. * Gets the address of the server used by this client.
  181. * @return The address of the server used by this client, as a URI.
  182. */
  183. virtual string get_server_uri() const { return cli_.get_server_uri(); }
  184. /**
  185. * Return the maximum time to wait for an action to complete.
  186. * @return int
  187. */
  188. virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
  189. /**
  190. * Gets a copy of the connect options that were last used in a request
  191. * to connect to the broker.
  192. * @returns The last connect options that were used.
  193. */
  194. connect_options get_connect_options() const { return cli_.get_connect_options(); }
  195. /**
  196. * Get a topic object which can be used to publish messages on this
  197. * client.
  198. * @param top The topic name
  199. * @param qos The Quality of Service for the topic
  200. * @param retained Whether the published messages set the retain flag.
  201. * @return A topic attached to this client.
  202. */
  203. virtual topic get_topic(
  204. const string& top, int qos = message::DFLT_QOS, bool retained = message::DFLT_RETAINED
  205. ) {
  206. return topic(cli_, top, qos, retained);
  207. }
  208. /**
  209. * Determines if this client is currently connected to the server.
  210. * @return @em true if the client is currently connected, @em false if
  211. * not.
  212. */
  213. virtual bool is_connected() const { return cli_.is_connected(); }
  214. /**
  215. * Sets a callback to allow the application to update the connection
  216. * data on automatic reconnects.
  217. * @param cb The callback functor to register with the library.
  218. */
  219. void set_update_connection_handler(update_connection_handler cb) {
  220. cli_.set_update_connection_handler(cb);
  221. }
  222. /**
  223. * Publishes a message to a topic on the server and return once it is
  224. * delivered.
  225. * @param top The topic to publish
  226. * @param payload The data to publish
  227. * @param n The size in bytes of the data
  228. * @param qos The QoS for message delivery
  229. * @param retained Whether the broker should retain the message
  230. */
  231. virtual void publish(
  232. string_ref top, const void* payload, size_t n, int qos, bool retained
  233. ) {
  234. if (!cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_))
  235. throw timeout_error();
  236. }
  237. /**
  238. * Publishes a message to a topic on the server and return once it is
  239. * delivered.
  240. * @param top The topic to publish
  241. * @param payload The data to publish
  242. * @param n The size in bytes of the data
  243. */
  244. virtual void publish(string_ref top, const void* payload, size_t n) {
  245. if (!cli_.publish(std::move(top), payload, n)->wait_for(timeout_))
  246. throw timeout_error();
  247. }
  248. /**
  249. * Publishes a message to a topic on the server.
  250. * @param msg The message
  251. */
  252. virtual void publish(const_message_ptr msg) {
  253. if (!cli_.publish(msg)->wait_for(timeout_))
  254. throw timeout_error();
  255. }
  256. /**
  257. * Publishes a message to a topic on the server.
  258. * This version will not timeout since that could leave the library with
  259. * a reference to memory that could disappear while the library is still
  260. * using it.
  261. * @param msg The message
  262. */
  263. virtual void publish(const message& msg) { cli_.publish(ptr(msg))->wait(); }
  264. /**
  265. * Sets the callback listener to use for events that happen
  266. * asynchronously.
  267. * @param cb The callback functions
  268. */
  269. virtual void set_callback(callback& cb);
  270. /**
  271. * Set the maximum time to wait for an action to complete.
  272. * @param timeoutMS The timeout in milliseconds
  273. */
  274. virtual void set_timeout(int timeoutMS) {
  275. timeout_ = std::chrono::milliseconds(timeoutMS);
  276. }
  277. /**
  278. * Set the maximum time to wait for an action to complete.
  279. * @param to The timeout as a std::chrono duration.
  280. */
  281. template <class Rep, class Period>
  282. void set_timeout(const std::chrono::duration<Rep, Period>& to) {
  283. timeout_ = to_milliseconds(to);
  284. }
  285. /**
  286. * Subscribe to a topic, which may include wildcards using a QoS of 1.
  287. * @param topicFilter A single topic to subscribe
  288. maked * @param props The MQTT v5 properties.
  289. * @param opts The MQTT v5 subscribe options for the topic
  290. * @return The "subscribe" response from the server.
  291. */
  292. virtual subscribe_response subscribe(
  293. const string& topicFilter, const subscribe_options& opts = subscribe_options(),
  294. const properties& props = properties()
  295. );
  296. /**
  297. * Subscribe to a topic, which may include wildcards.
  298. * @param topicFilter A single topic to subscribe
  299. * @param qos The QoS of the subscription
  300. * @param opts The MQTT v5 subscribe options for the topic
  301. * @param props The MQTT v5 properties.
  302. * @return The "subscribe" response from the server.
  303. */
  304. virtual subscribe_response subscribe(
  305. const string& topicFilter, int qos,
  306. const subscribe_options& opts = subscribe_options(),
  307. const properties& props = properties()
  308. );
  309. /**
  310. * Subscribes to a one or more topics, which may include wildcards using
  311. * a QoS of 1.
  312. * @param topicFilters A set of topics to subscribe
  313. * @param opts The MQTT v5 subscribe options (one for each topic)
  314. * @param props The MQTT v5 properties.
  315. * @return The "subscribe" response from the server.
  316. */
  317. virtual subscribe_response subscribe(
  318. const string_collection& topicFilters,
  319. const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
  320. const properties& props = properties()
  321. );
  322. /**
  323. * Subscribes to multiple topics, each of which may include wildcards.
  324. * @param topicFilters A collection of topics to subscribe
  325. * @param qos A collection of QoS for each topic
  326. * @param opts The MQTT v5 subscribe options (one for each topic)
  327. * @param props The MQTT v5 properties.
  328. * @return The "subscribe" response from the server.
  329. */
  330. virtual subscribe_response subscribe(
  331. const string_collection& topicFilters, const qos_collection& qos,
  332. const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
  333. const properties& props = properties()
  334. );
  335. /**
  336. * Requests the server unsubscribe the client from a topic.
  337. * @param topicFilter A single topic to unsubscribe.
  338. * @param props The MQTT v5 properties.
  339. * @return The "unsubscribe" response from the server.
  340. */
  341. virtual unsubscribe_response unsubscribe(
  342. const string& topicFilter, const properties& props = properties()
  343. );
  344. /**
  345. * Requests the server unsubscribe the client from one or more topics.
  346. * @param topicFilters A collection of topics to unsubscribe.
  347. * @param props The MQTT v5 properties.
  348. * @return The "unsubscribe" response from the server.
  349. */
  350. virtual unsubscribe_response unsubscribe(
  351. const string_collection& topicFilters, const properties& props = properties()
  352. );
  353. /**
  354. * Start consuming messages.
  355. * This initializes the client to receive messages through a queue that
  356. * can be read synchronously.
  357. */
  358. virtual void start_consuming() { cli_.start_consuming(); }
  359. /**
  360. * Stop consuming messages.
  361. * This shuts down the internal callback and discards any unread
  362. * messages.
  363. */
  364. virtual void stop_consuming() { cli_.stop_consuming(); }
  365. /**
  366. * Read the next message from the queue.
  367. * This blocks until a new message arrives.
  368. * @return The message and topic.
  369. */
  370. virtual const_message_ptr consume_message() { return cli_.consume_message(); }
  371. /**
  372. * Try to read the next message from the queue without blocking.
  373. * @param msg Pointer to the value to receive the message
  374. * @return @em true is a message was read, @em false if no message was
  375. * available.
  376. */
  377. virtual bool try_consume_message(const_message_ptr* msg) {
  378. return cli_.try_consume_message(msg);
  379. }
  380. /**
  381. * Waits a limited time for a message to arrive.
  382. * @param msg Pointer to the value to receive the message
  383. * @param relTime The maximum amount of time to wait for a message.
  384. * @return @em true if a message was read, @em false if a timeout
  385. * occurred.
  386. */
  387. template <typename Rep, class Period>
  388. bool try_consume_message_for(
  389. const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
  390. ) {
  391. return cli_.try_consume_message_for(msg, relTime);
  392. }
  393. /**
  394. * Waits until a specific time for a message to occur.
  395. * @param msg Pointer to the value to receive the message
  396. * @param absTime The time point to wait until, before timing out.
  397. * @return @em true if a message was read, @em false if a timeout
  398. * occurred.
  399. */
  400. template <class Clock, class Duration>
  401. bool try_consume_message_until(
  402. const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
  403. ) {
  404. return cli_.try_consume_message_until(msg, absTime);
  405. }
  406. };
  407. /** Smart/shared pointer to an MQTT synchronous client object */
  408. using client_ptr = client::ptr_t;
  409. /////////////////////////////////////////////////////////////////////////////
  410. } // namespace mqtt
  411. #endif // __mqtt_client_h