| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037 |
- /////////////////////////////////////////////////////////////////////////////
- /// @file async_client.h
- /// Declaration of MQTT async_client class
- /// @date May 1, 2013
- /// @author Frank Pagliughi
- ///
- /// @mainpage The Eclipse Paho MQTT Library for C++
- ///
- /// @section Introduction
- ///
- /// This is the Eclipse Paho MQTT Library for C++. It contains an MQTT
- /// client for memory-managed operating systems like Windows, macOS, Linux,
- /// and other *nix-style systems.
- ///
- /////////////////////////////////////////////////////////////////////////////
- /*******************************************************************************
- * Copyright (c) 2013-2025 Frank Pagliughi <fpagliughi@mindspring.com>
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v2.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v20.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Frank Pagliughi - initial implementation and documentation
- * Frank Pagliughi - MQTT v5 support
- *******************************************************************************/
- #ifndef __mqtt_async_client_h
- #define __mqtt_async_client_h
- #include <functional>
- #include <list>
- #include <memory>
- #include <stdexcept>
- #include <tuple>
- #include <vector>
- #include "MQTTAsync.h"
- #include "mqtt/callback.h"
- #include "mqtt/create_options.h"
- #include "mqtt/delivery_token.h"
- #include "mqtt/event.h"
- #include "mqtt/exception.h"
- #include "mqtt/iaction_listener.h"
- #include "mqtt/iasync_client.h"
- #include "mqtt/iclient_persistence.h"
- #include "mqtt/message.h"
- #include "mqtt/properties.h"
- #include "mqtt/string_collection.h"
- #include "mqtt/thread_queue.h"
- #include "mqtt/token.h"
- #include "mqtt/types.h"
- namespace mqtt {
- // OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
- // clashed with #define's from other libraries and will be removed at the
- // next major version upgrade.
- #if defined(PAHO_MQTTPP_VERSIONS)
- /** The version number for the client library. */
- const uint32_t PAHO_MQTTPP_VERSION = 0x01050004;
- /** The version string for the client library */
- const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.4");
- /** Copyright notice for the client library */
- const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi");
- #else
- /** The version number for the client library. */
- const uint32_t VERSION = 0x01050004;
- /** The version string for the client library */
- const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.4");
- /** Copyright notice for the client library */
- const string COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi");
- #endif
- /////////////////////////////////////////////////////////////////////////////
- /**
- * Client for talking to an MQTT server using non-blocking
- * methods that allow an operation to run in the background.
- *
- * The location of the server is specified as a URI string with the
- * following schemas supported to specify the type and security used for the
- * connection:
- * @li @em "mqtt://" - A standard (insecure) connection over TCP. (Also,
- * "tcp://")
- * @li @em "mqtts://" - A secure connection using SSL/TLS sockets. (Also
- * "ssl://")
- * @li @em "ws://" - A standard (insecure) WebSocket connection.
- * @li @em "wss:// - A secure websocket connection using SSL/TLS.
- * @li @em "unix://" - A UNIX-domain connection on the local machine. (*nix
- * systems, only)
- *
- * The secure connection types assume that the library was built with
- * SSL/TLS support, otherwise requesting a secure connection will result in
- * an error.
- *
- * The communication methods of this class - `connect()`, `publish()`,
- * `subscribe()`, etc. - are all asynchronous. They create the request for
- * the server, but return immediately, before a response is received back
- * from the server.
- *
- * These methods return a `token` to the caller which is akin to a C++
- * std::future. The caller can keep the Token, then use it later to block
- * until the asynchronous operation is complete and retrieve the result of
- * the operation, including any response from the server.
- *
- * Alternately, the application can choose to set callbacks to be fired when
- * each operation completes. This can be used to create an event-driven
- * architecture, but is more complex in that it forces the user to avoid any
- * blocking operations and manually handle thread synchronization (since
- * the callbacks run in a separate thread managed by the library).
- *
- * Note that the numerous constructors pre-date the current, expansive,
- * `create_options` structure. For a full set of create options, a
- * builder can be used to specify the full set of options, then construct
- * the client with those options, like this:
- *
- * @code
- * auto createOpts = mqtt::create_options_builder()
- * .server_uri(serverURI)
- * .send_while_disconnected()
- * .max_buffered_messages(25)
- * .delete_oldest_messages()
- * .finalize();
- *
- * mqtt::async_client cli(createOpts);
- * @endcode
- */
- class async_client : public virtual iasync_client
- {
- public:
- /** Smart/shared pointer for an object of this class */
- using ptr_t = std::shared_ptr<async_client>;
- /** Type for a thread-safe queue to consume events synchronously */
- using consumer_queue_type = std::unique_ptr<thread_queue<event>>;
- /** Handler type for registering an individual message callback */
- using message_handler = std::function<void(const_message_ptr)>;
- /** Handler type for when a connection is made or lost */
- using connection_handler = std::function<void(const string& cause)>;
- /** Handler type for when a disconnect packet is received */
- using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
- /** Handler for updating connection data before an auto-reconnect. */
- using update_connection_handler = std::function<bool(connect_data&)>;
- private:
- /** Lock guard type for this class */
- using guard = std::unique_lock<std::mutex>;
- /** Unique lock type for this class */
- using unique_lock = std::unique_lock<std::mutex>;
- /** Object monitor mutex */
- mutable std::mutex lock_;
- /** The underlying C-lib client. */
- MQTTAsync cli_;
- /** The options used to create the client */
- const create_options createOpts_;
- /** The MQTT protocol version of the connection */
- int mqttVersion_;
- /** A user persistence wrapper (if any) */
- std::unique_ptr<MQTTClient_persistence> persist_{};
- /** Callback supplied by the user (if any) */
- callback* userCallback_{};
- /** Connection handler */
- connection_handler connHandler_;
- /** Connection lost handler */
- connection_handler connLostHandler_;
- /** Disconnected handler */
- disconnected_handler disconnectedHandler_;
- /** Update connect data/options */
- update_connection_handler updateConnectionHandler_;
- /** Message handler */
- message_handler msgHandler_;
- /** Cached options from the last connect */
- connect_options connOpts_;
- /** Copy of connect token (for re-connects) */
- token_ptr connTok_;
- /** A list of tokens that are in play */
- std::list<token_ptr> pendingTokens_;
- /** A list of delivery tokens that are in play */
- std::list<delivery_token_ptr> pendingDeliveryTokens_;
- /** A queue of messages for consumer API */
- consumer_queue_type que_;
- /** Callbacks from the C library */
- static void on_connected(void* context, char* cause);
- static void on_connection_lost(void* context, char* cause);
- static void on_disconnected(
- void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
- );
- static int on_message_arrived(
- void* context, char* topicName, int topicLen, MQTTAsync_message* msg
- );
- static void on_delivery_complete(void* context, MQTTAsync_token tok);
- static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
- /** Manage internal list of active tokens */
- friend class token;
- virtual void add_token(token_ptr tok);
- virtual void add_token(delivery_token_ptr tok);
- virtual void remove_token(token* tok) override;
- virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
- void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
- /** Non-copyable */
- async_client() = delete;
- async_client(const async_client&) = delete;
- async_client& operator=(const async_client&) = delete;
- /** Checks a function return code and throws on error. */
- static void check_ret(int rc) {
- if (rc != MQTTASYNC_SUCCESS)
- throw exception(rc);
- }
- /**
- * Create an async_client that can be used to communicate with an MQTT
- * server, which allows for off-line message buffering.
- * This allows the caller to specify a user-defined persistence object,
- * or use no persistence.
- * @throw exception if an argument is invalid
- */
- void create();
- public:
- /**
- * Create an async_client that can be used to communicate with an MQTT
- * server.
- * This uses file-based persistence in the specified directory.
- * @param serverURI the address of the server to connect to, specified
- * as a URI.
- * @param clientId a client identifier that is unique on the server
- * being connected to
- * @param persistence The persistence that the client should use.
- * @throw exception if an argument is invalid
- */
- explicit async_client(
- const string& serverURI, const string& clientId = string{},
- const persistence_type& persistence = NO_PERSISTENCE
- )
- : createOpts_{serverURI, clientId, persistence} {
- create();
- }
- /**
- * Create an async_client that can be used to communicate with an MQTT
- * server, which allows for off-line message buffering.
- * This uses file-based persistence in the specified directory.
- * @param serverURI the address of the server to connect to, specified
- * as a URI.
- * @param clientId a client identifier that is unique on the server
- * being connected to
- * @param maxBufferedMessages the maximum number of messages allowed to
- * be buffered while not connected
- * @param persistence The persistence that the client should use.
- * @throw exception if an argument is invalid
- */
- async_client(
- const string& serverURI, const string& clientId, int maxBufferedMessages,
- const persistence_type& persistence = NO_PERSISTENCE
- )
- : createOpts_{serverURI, clientId, maxBufferedMessages, persistence} {
- create();
- }
- /**
- * Create an async_client that can be used to communicate with an MQTT
- * server, which allows for off-line message buffering.
- * This uses file-based persistence in the specified directory.
- * @param serverURI the address of the server to connect to, specified
- * as a URI.
- * @param clientId a client identifier that is unique on the server
- * being connected to
- * @param opts The create options
- * @param persistence The persistence that the client should use.
- * @throw exception if an argument is invalid
- */
- async_client(
- const string& serverURI, const string& clientId, const create_options& opts,
- const persistence_type& persistence = NO_PERSISTENCE
- )
- : createOpts_{serverURI, clientId, opts, persistence} {
- create();
- }
- /**
- * Create an async_client that can be used to communicate with an MQTT
- * server, which allows for off-line message buffering.
- * This allows the caller to specify a user-defined persistence object,
- * or use no persistence.
- * @param opts The create options
- * @throw exception if an argument is invalid
- */
- async_client(const create_options& opts) : createOpts_{opts} { create(); }
- /**
- * Destructor
- */
- ~async_client() override;
- /**
- * Sets a callback listener to use for events that happen
- * asynchronously.
- * @param cb callback receiver which will be invoked for certain
- * asynchronous events
- */
- void set_callback(callback& cb) override;
- /**
- * Stops callbacks.
- * This is not normally called by the application. It should be used
- * cautiously as it may cause the application to lose messages.
- */
- void disable_callbacks() override;
- /**
- * Callback for when a connection is made.
- * @param cb Callback functor for when the connection is made.
- */
- void set_connected_handler(connection_handler cb) /*override*/;
- /**
- * Callback for when a connection is lost.
- * @param cb Callback functor for when the connection is lost.
- */
- void set_connection_lost_handler(connection_handler cb) /*override*/;
- /**
- * Callback for when a disconnect packet is received from the server.
- * @param cb Callback for when the disconnect packet is received.
- */
- void set_disconnected_handler(disconnected_handler cb) /*override*/;
- /**
- * Sets the callback for when a message arrives from the broker.
- * Note that the application can only have one message handler which can
- * be installed individually using this method, or installled as a
- * listener object.
- * @param cb The callback functor to register with the library.
- */
- void set_message_callback(message_handler cb) /*override*/;
- /**
- * Sets a callback to allow the application to update the connection
- * data on automatic reconnects.
- * @param cb The callback functor to register with the library.
- */
- void set_update_connection_handler(update_connection_handler cb);
- /**
- * Connects to an MQTT server using the default options.
- * @return token used to track and wait for the connect to complete. The
- * token will be passed to any callback that has been set.
- * @throw exception for non security related problems
- * @throw security_exception for security related problems
- */
- token_ptr connect() override;
- /**
- * Connects to an MQTT server using the provided connect options.
- * @param options a set of connection parameters that override the
- * defaults.
- * @return token used to track and wait for the connect to complete. The
- * token will be passed to any callback that has been set.
- * @throw exception for non security related problems
- * @throw security_exception for security related problems
- */
- token_ptr connect(connect_options options) override;
- /**
- * Connects to an MQTT server using the specified options.
- * @param options a set of connection parameters that override the
- * defaults.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb callback listener that will be notified when the connect
- * completes.
- * @return token used to track and wait for the connect to complete. The
- * token will be passed to any callback that has been set.
- * @throw exception for non security related problems
- * @throw security_exception for security related problems
- */
- token_ptr connect(
- connect_options options, void* userContext, iaction_listener& cb
- ) override;
- /**
- *
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb callback listener that will be notified when the connect
- * completes.
- * @return token used to track and wait for the connect to complete. The
- * token will be passed to any callback that has been set.
- * @throw exception for non security related problems
- * @throw security_exception for security related problems
- */
- token_ptr connect(void* userContext, iaction_listener& cb) override {
- return connect(connect_options{}, userContext, cb);
- }
- /**
- * Reconnects the client using options from the previous connect.
- * The client must have previously called connect() for this to work.
- * @return token used to track the progress of the reconnect.
- */
- token_ptr reconnect() override;
- /**
- * Disconnects from the server.
- * @return token used to track and wait for the disconnect to complete.
- * The token will be passed to any callback that has been set.
- * @throw exception for problems encountered while disconnecting
- */
- token_ptr disconnect() override { return disconnect(disconnect_options()); }
- /**
- * Disconnects from the server.
- * @param opts Options for disconnecting.
- * @return token used to track and wait for the disconnect to complete.
- * The token will be passed to any callback that has been set.
- * @throw exception for problems encountered while disconnecting
- */
- token_ptr disconnect(disconnect_options opts) override;
- /**
- * Disconnects from the server.
- * @param timeout the amount of time in milliseconds to allow for
- * existing work to finish before disconnecting. A value
- * of zero or less means the client will not quiesce.
- * @return Token used to track and wait for disconnect to complete. The
- * token will be passed to the callback methods if a callback is
- * set.
- * @throw exception for problems encountered while disconnecting
- */
- token_ptr disconnect(int timeout) override {
- return disconnect(disconnect_options(timeout));
- }
- /**
- * Disconnects from the server.
- * @param timeout the amount of time in milliseconds to allow for
- * existing work to finish before disconnecting. A value
- * of zero or less means the client will not quiesce.
- * @return Token used to track and wait for disconnect to complete. The
- * token will be passed to the callback methods if a callback is
- * set.
- * @throw exception for problems encountered while disconnecting
- */
- template <class Rep, class Period>
- token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
- // TODO: check range
- return disconnect((int)to_milliseconds_count(timeout));
- }
- /**
- * Disconnects from the server.
- * @param timeout the amount of time in milliseconds to allow for
- * existing work to finish before disconnecting. A value
- * of zero or less means the client will not quiesce.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb callback listener that will be notified when the disconnect
- * completes.
- * @return token_ptr Token used to track and wait for disconnect to
- * complete. The token will be passed to the callback methods if
- * a callback is set.
- * @throw exception for problems encountered while disconnecting
- */
- token_ptr disconnect(int timeout, void* userContext, iaction_listener& cb) override;
- /**
- * Disconnects from the server.
- * @param timeout the amount of time in milliseconds to allow for
- * existing work to finish before disconnecting. A value
- * of zero or less means the client will not quiesce.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb callback listener that will be notified when the disconnect
- * completes.
- * @return token_ptr Token used to track and wait for disconnect to
- * complete. The token will be passed to the callback methods if
- * a callback is set.
- * @throw exception for problems encountered while disconnecting
- */
- template <class Rep, class Period>
- token_ptr disconnect(
- const std::chrono::duration<Rep, Period>& timeout, void* userContext,
- iaction_listener& cb
- ) {
- // TODO: check range
- return disconnect((int)to_milliseconds_count(timeout), userContext, cb);
- }
- /**
- * Disconnects from the server.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb callback listener that will be notified when the disconnect
- * completes.
- * @return token_ptr Token used to track and wait for disconnect to
- * complete. The token will be passed to the callback methods if
- * a callback is set.
- * @throw exception for problems encountered while disconnecting
- */
- token_ptr disconnect(void* userContext, iaction_listener& cb) override {
- return disconnect(0L, userContext, cb);
- }
- /**
- * Returns the delivery token for the specified message ID.
- * @return delivery_token
- */
- delivery_token_ptr get_pending_delivery_token(int msgID) const override;
- /**
- * Returns the delivery tokens for any outstanding publish operations.
- * @return delivery_token[]
- */
- std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
- /**
- * Returns the client ID used by this client.
- * @return The client ID used by this client.
- */
- string get_client_id() const override { return createOpts_.get_client_id(); }
- /**
- * Returns the address of the server used by this client.
- * @return The server's address, as a URI String.
- */
- string get_server_uri() const override { return createOpts_.get_server_uri(); }
- /**
- * Gets the MQTT version used by the client.
- * @return The MQTT version used by the client
- * @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
- * that fails, fall back to 3.1
- * @li MQTTVERSION_3_1 (3) = only try version 3.1
- * @li MQTTVERSION_3_1_1 (4) = only try version 3.1.1
- * @li MQTTVERSION_5 (5) = only try version 5
- */
- int mqtt_version() const noexcept { return mqttVersion_; }
- /**
- * Gets a copy of the connect options that were last used in a request
- * to connect to the broker.
- * @returns The last connect options that were used.
- */
- connect_options get_connect_options() const {
- guard g(lock_);
- return connOpts_;
- }
- /**
- * Determines if this client is currently connected to the server.
- * @return true if connected, false otherwise.
- */
- bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
- /**
- * Publishes a message to a topic on the server
- * @param topic The topic to deliver the message to
- * @param payload the bytes to use as the message payload
- * @param n the number of bytes in the payload
- * @param qos the Quality of Service to deliver the message at. Valid
- * values are 0, 1 or 2.
- * @param retained whether or not this message should be retained by the
- * server.
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(
- string_ref topic, const void* payload, size_t n, int qos, bool retained,
- const properties& props = properties()
- ) override;
- /**
- * Publishes a message to a topic on the server
- * @param topic The topic to deliver the message to
- * @param payload the bytes to use as the message payload
- * @param n the number of bytes in the payload
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
- return publish(
- std::move(topic), payload, n, message::DFLT_QOS, message::DFLT_RETAINED
- );
- }
- /**
- * Publishes a message to a topic on the server
- * @param topic The topic to deliver the message to
- * @param payload the bytes to use as the message payload
- * @param qos the Quality of Service to deliver the message at. Valid
- * values are 0, 1 or 2.
- * @param retained whether or not this message should be retained by the
- * server.
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(
- string_ref topic, binary_ref payload, int qos, bool retained,
- const properties& props = properties()
- ) override;
- /**
- * Publishes a message to a topic on the server
- * @param topic The topic to deliver the message to
- * @param payload the bytes to use as the message payload
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
- return publish(
- std::move(topic), std::move(payload), message::DFLT_QOS, message::DFLT_RETAINED
- );
- }
- /**
- * Publishes a message to a topic on the server
- * @param topic The topic to deliver the message to
- * @param payload the bytes to use as the message payload
- * @param n the number of bytes in the payload
- * @param qos the Quality of Service to deliver the message at. Valid
- * values are 0, 1 or 2.
- * @param retained whether or not this message should be retained by the
- * server.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb Listener callback object
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(
- string_ref topic, const void* payload, size_t n, int qos, bool retained,
- void* userContext, iaction_listener& cb
- ) override;
- /**
- * Publishes a message to a topic on the server Takes an Message
- * message and delivers it to the server at the requested quality of
- * service.
- * @param msg the message to deliver to the server
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(const_message_ptr msg) override;
- /**
- * Publishes a message to a topic on the server.
- * @param msg the message to deliver to the server
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb callback optional listener that will be notified when message
- * delivery has completed to the requested quality of
- * service
- * @return token used to track and wait for the publish to complete. The
- * token will be passed to callback methods if set.
- */
- delivery_token_ptr publish(
- const_message_ptr msg, void* userContext, iaction_listener& cb
- ) override;
- /**
- * Subscribe to a topic, which may include wildcards.
- * @param topicFilter the topic to subscribe to, which can include
- * wildcards.
- * @param qos The quality of service for the subscription
- * @param opts The MQTT v5 subscribe options for the topic
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the subscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr subscribe(
- const string& topicFilter, int qos,
- const subscribe_options& opts = subscribe_options(),
- const properties& props = properties()
- ) override;
- /**
- * Subscribe to a topic, which may include wildcards.
- * @param topicFilter the topic to subscribe to, which can include
- * wildcards.
- * @param qos The maximum quality of service at which to subscribe.
- * Messages published at a lower quality of service will be
- * received at the published QoS. Messages published at a
- * higher quality of service will be received using the QoS
- * specified on the subscribe.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb listener that will be notified when subscribe has completed
- * @param opts The MQTT v5 subscribe options for the topic
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the subscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr subscribe(
- const string& topicFilter, int qos, void* userContext, iaction_listener& cb,
- const subscribe_options& opts = subscribe_options(),
- const properties& props = properties()
- ) override;
- /**
- * Subscribe to multiple topics, each of which may include wildcards.
- * @param topicFilters The collection of topic filters to subscribe to,
- * any of which can include wildcards
- * @param qos The maximum quality of service at which to subscribe.
- * Messages published at a lower quality of service will be
- * received at the published QoS. Messages published at a
- * higher quality of service will be received using the QoS
- * specified on the subscribe.
- * @param opts The MQTT v5 subscribe options (one for each topic)
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the subscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr subscribe(
- const_string_collection_ptr topicFilters, const qos_collection& qos,
- const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
- const properties& props = properties()
- ) override;
- /**
- * Subscribes to multiple topics, each of which may include wildcards.
- * @param topicFilters The collection of topic filters to subscribe to,
- * any of which can include wildcards
- * @param qos The maximum quality of service at which to subscribe.
- * Messages published at a lower quality of service will be
- * received at the published QoS. Messages published at a
- * higher quality of service will be received using the QoS
- * specified on the subscribe.
- * @param userContext Optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb listener that will be notified when subscribe has completed
- * @param opts The MQTT v5 subscribe options (one for each topic)
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the subscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr subscribe(
- const_string_collection_ptr topicFilters, const qos_collection& qos,
- void* userContext, iaction_listener& cb,
- const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
- const properties& props = properties()
- ) override;
- /**
- * Requests the server unsubscribe the client from a topic.
- * @param topicFilter The topic to unsubscribe from. It must match a
- * topicFilter specified on an earlier subscribe.
- * @param props The MQTT v5 properties.
- * @return token Used to track and wait for the unsubscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr unsubscribe(
- const string& topicFilter, const properties& props = properties()
- ) override;
- /**
- * Requests the server unsubscribe the client from one or more topics.
- * @param topicFilters One or more topics to unsubscribe from. Each
- * topicFilter must match one specified on an
- * earlier subscribe.
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the unsubscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr unsubscribe(
- const_string_collection_ptr topicFilters, const properties& props = properties()
- ) override;
- /**
- * Requests the server unsubscribe the client from one or more topics.
- * @param topicFilters One or more topics to unsubscribe from. Each
- * topicFilter must match one specified on an
- * earlier subscribe.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb listener that will be notified when unsubscribe has
- * completed
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the unsubscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr unsubscribe(
- const_string_collection_ptr topicFilters, void* userContext, iaction_listener& cb,
- const properties& props = properties()
- ) override;
- /**
- * Requests the server unsubscribe the client from a topics.
- * @param topicFilter the topic to unsubscribe from. It must match a
- * topicFilter specified on an earlier subscribe.
- * @param userContext optional object used to pass context to the
- * callback. Use @em nullptr if not required.
- * @param cb listener that will be notified when unsubscribe has
- * completed
- * @param props The MQTT v5 properties.
- * @return token used to track and wait for the unsubscribe to complete.
- * The token will be passed to callback methods if set.
- */
- token_ptr unsubscribe(
- const string& topicFilter, void* userContext, iaction_listener& cb,
- const properties& props = properties()
- ) override;
- /**
- * Start consuming messages.
- *
- * This initializes the client to receive messages through a queue that
- * can be read synchronously.
- *
- * Normally this should be called _before_ connecting the client to the
- * broker, in order to have the consumer queue in place in the event
- * that the immediately starts sending messages (such as any retained
- * messages) while the client is still in the context of the connect
- * call.
- *
- * This _must_ also be called before calling any 'consume_message' or
- * "'consume_event' methods.
- *
- * Internally, this just creates a thread-safe queue for `mqtt::event`
- * objects, then hooks into the message and state-change callback to
- * push events into the queue in the order received.
- */
- void start_consuming() override;
- /**
- * Stop consuming messages.
- *
- * This shuts down the internal callback and closes the internal
- * consumer queue. Any remaining messages and events can be read until
- * the queue is emptied, but nothing further will be added to it.
- * This will also wake up any thread waiting on the queue.
- */
- void stop_consuming() override;
- /**
- * This clears the consumer queue, discarding any pending event.
- */
- void clear_consumer() override {
- if (que_)
- que_->clear();
- }
- /**
- * Determines if the consumer queue has been closed.
- * Once closed, any events in the queue can still be read, but no new
- * events can be added to it.
- * @return @true if the consumer queue has been closed, @false
- * otherwise.
- */
- bool consumer_closed() noexcept override { return !que_ || que_->closed(); }
- /**
- * Determines if the consumer queue is "done" (closed and empty).
- * Once the queue is done, no more events can be added or removed from
- * the queue.
- * @return @true if the consumer queue is closed and empty, @false
- * otherwise.
- */
- bool consumer_done() noexcept override { return !que_ || que_->done(); }
- /**
- * Gets the number of events available for immediate consumption.
- * Note that this retrieves the number of "raw" events, not messages,
- * e.g. may include a connected_event which is not returned by try_consume_message().
- * When polling the queue from multiple threads, prefer using try_consume_event(),
- * as the event count may change between checking the size and actual retrieval.
- * @return the number of events in the queue.
- */
- std::size_t consumer_queue_size() const override { return (que_) ? que_->size() : 0; }
- /**
- * Read the next client event from the queue.
- * This blocks until a new message arrives.
- * If the consumer queue is closed, this returns a shutdown event.
- * @return The client event.
- */
- event consume_event() override;
- /**
- * Try to read the next client event without blocking.
- * @param evt Pointer to the value to receive the event
- * @return @em true if an event was read, @em false if no
- * event was available.
- */
- bool try_consume_event(event* evt) override;
- /**
- * Waits a limited time for a client event to appear.
- * @param evt Pointer to the value to receive the event.
- * @param relTime The maximum amount of time to wait for an event.
- * @return @em true if an event was read, @em false if a timeout
- * occurred.
- */
- template <typename Rep, class Period>
- bool try_consume_event_for(
- event* evt, const std::chrono::duration<Rep, Period>& relTime
- ) {
- if (!que_)
- throw mqtt::exception(-1, "Consumer not started");
- try {
- return que_->try_get_for(evt, relTime);
- }
- catch (queue_closed&) {
- *evt = event{shutdown_event{}};
- return true;
- }
- }
- /**
- * Waits a limited time for a client event to arrive.
- * @param relTime The maximum amount of time to wait for an event.
- * @return The event that was received. It will contain empty message on
- * timeout.
- */
- template <typename Rep, class Period>
- event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
- event evt;
- try {
- que_->try_get_for(&evt, relTime);
- }
- catch (queue_closed&) {
- evt = event{shutdown_event{}};
- }
- return evt;
- }
- /**
- * Waits until a specific time for a client event to appear.
- * @param evt Pointer to the value to receive the event.
- * @param absTime The time point to wait until, before timing out.
- * @return @em true if an event was recceived, @em false if a timeout
- * occurred.
- */
- template <class Clock, class Duration>
- bool try_consume_event_until(
- event* evt, const std::chrono::time_point<Clock, Duration>& absTime
- ) {
- if (!que_)
- throw mqtt::exception(-1, "Consumer not started");
- try {
- return que_->try_get_until(evt, absTime);
- }
- catch (queue_closed&) {
- *evt = event{shutdown_event{}};
- return true;
- }
- }
- /**
- * Waits until a specific time for a client event to appear.
- * @param absTime The time point to wait until, before timing out.
- * @return The event that was received. It will contain empty message on
- * timeout.
- */
- template <class Clock, class Duration>
- event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime) {
- event evt;
- try {
- que_->try_get_until(&evt, absTime);
- }
- catch (queue_closed&) {
- evt = event{shutdown_event{}};
- }
- return evt;
- }
- /**
- * Read the next message from the queue.
- * This blocks until a new message arrives or until a disconnect or
- * shutdown occurs.
- * @return The message and topic.
- */
- const_message_ptr consume_message() override;
- /**
- * Try to read the next message from the queue without blocking.
- * @param msg Pointer to the value to receive the message
- * @return @em true is a message was read, @em false if no message was
- * available.
- */
- bool try_consume_message(const_message_ptr* msg) override;
- /**
- * Waits a limited time for a message to arrive.
- * @param msg Pointer to the value to receive the message
- * @param relTime The maximum amount of time to wait for a message.
- * @return @em true if a message was read, @em false if a timeout
- * occurred.
- */
- template <typename Rep, class Period>
- bool try_consume_message_for(
- const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
- ) {
- if (!que_)
- throw mqtt::exception(-1, "Consumer not started");
- event evt;
- while (true) {
- if (!try_consume_event_for(&evt, relTime))
- return false;
- if (const auto* pval = evt.get_message_if()) {
- *msg = std::move(*pval);
- break;
- }
- if (evt.is_any_disconnect()) {
- *msg = const_message_ptr{};
- break;
- }
- }
- return true;
- }
- /**
- * Waits a limited time for a message to arrive.
- * @param relTime The maximum amount of time to wait for a message.
- * @return A shared pointer to the message that was received. It will be
- * empty on timeout.
- */
- template <typename Rep, class Period>
- const_message_ptr try_consume_message_for(
- const std::chrono::duration<Rep, Period>& relTime
- ) {
- const_message_ptr msg;
- this->try_consume_message_for(&msg, relTime);
- return msg;
- }
- /**
- * Waits until a specific time for a message to appear.
- * @param msg Pointer to the value to receive the message
- * @param absTime The time point to wait until, before timing out.
- * @return @em true if a message was read, @em false if a timeout
- * occurred.
- */
- template <class Clock, class Duration>
- bool try_consume_message_until(
- const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
- ) {
- if (!que_)
- throw mqtt::exception(-1, "Consumer not started");
- event evt;
- while (true) {
- if (!try_consume_event_until(&evt, absTime))
- return false;
- if (const auto* pval = evt.get_message_if()) {
- *msg = std::move(*pval);
- break;
- }
- if (!evt.is_any_disconnect()) {
- *msg = const_message_ptr{};
- break;
- }
- }
- return true;
- }
- /**
- * Waits until a specific time for a message to appear.
- * @param absTime The time point to wait until, before timing out.
- * @return The message, if read, an empty pointer if not.
- */
- template <class Clock, class Duration>
- const_message_ptr try_consume_message_until(
- const std::chrono::time_point<Clock, Duration>& absTime
- ) {
- const_message_ptr msg;
- this->try_consume_message_until(&msg, absTime);
- return msg;
- }
- };
- /** Smart/shared pointer to an asynchronous MQTT client object */
- using async_client_ptr = async_client::ptr_t;
- /////////////////////////////////////////////////////////////////////////////
- } // namespace mqtt
- #endif // __mqtt_async_client_h
|