paho_cs_sub.c 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2022 IBM Corp., and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Ian Craggs - change delimiter option from char to string
  16. * Guilherme Maciel Ferreira - add keep alive option
  17. * Ian Craggs - add full capability
  18. *******************************************************************************/
  19. #include "MQTTClient.h"
  20. #include "MQTTClientPersistence.h"
  21. #include "pubsub_opts.h"
  22. #include <stdio.h>
  23. #include <signal.h>
  24. #include <string.h>
  25. #include <stdlib.h>
  26. #if defined(_WIN32)
  27. #define sleep Sleep
  28. #else
  29. #include <sys/time.h>
  30. #endif
  31. volatile int toStop = 0;
  32. struct pubsub_opts opts =
  33. {
  34. 0, 0, 0, 0, "\n", 100, /* debug/app options */
  35. NULL, NULL, 1, 0, 0, /* message options */
  36. MQTTVERSION_DEFAULT, NULL, "paho-cs-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
  37. NULL, NULL, 0, 0, /* will options */
  38. 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
  39. 0, {NULL, NULL}, /* MQTT V5 options */
  40. NULL, NULL, /* HTTP and HTTPS proxies */
  41. };
  42. int myconnect(MQTTClient client)
  43. {
  44. MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  45. MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
  46. MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
  47. int rc = 0;
  48. if (opts.verbose)
  49. printf("Connecting\n");
  50. if (opts.MQTTVersion == MQTTVERSION_5)
  51. {
  52. MQTTClient_connectOptions conn_opts5 = MQTTClient_connectOptions_initializer5;
  53. conn_opts = conn_opts5;
  54. }
  55. conn_opts.keepAliveInterval = opts.keepalive;
  56. conn_opts.username = opts.username;
  57. conn_opts.password = opts.password;
  58. conn_opts.MQTTVersion = opts.MQTTVersion;
  59. conn_opts.httpProxy = opts.http_proxy;
  60. conn_opts.httpsProxy = opts.https_proxy;
  61. if (opts.will_topic) /* will options */
  62. {
  63. will_opts.message = opts.will_payload;
  64. will_opts.topicName = opts.will_topic;
  65. will_opts.qos = opts.will_qos;
  66. will_opts.retained = opts.will_retain;
  67. conn_opts.will = &will_opts;
  68. }
  69. if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
  70. strncmp(opts.connection, "wss://", 6) == 0))
  71. {
  72. if (opts.insecure)
  73. ssl_opts.verify = 0;
  74. else
  75. ssl_opts.verify = 1;
  76. ssl_opts.CApath = opts.capath;
  77. ssl_opts.keyStore = opts.cert;
  78. ssl_opts.trustStore = opts.cafile;
  79. ssl_opts.privateKey = opts.key;
  80. ssl_opts.privateKeyPassword = opts.keypass;
  81. ssl_opts.enabledCipherSuites = opts.ciphers;
  82. conn_opts.ssl = &ssl_opts;
  83. }
  84. if (opts.MQTTVersion == MQTTVERSION_5)
  85. {
  86. MQTTProperties props = MQTTProperties_initializer;
  87. MQTTProperties willProps = MQTTProperties_initializer;
  88. MQTTResponse response = MQTTResponse_initializer;
  89. conn_opts.cleanstart = 1;
  90. response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
  91. rc = response.reasonCode;
  92. MQTTResponse_free(response);
  93. }
  94. else
  95. {
  96. conn_opts.cleansession = 1;
  97. rc = MQTTClient_connect(client, &conn_opts);
  98. }
  99. if (opts.verbose && rc == MQTTCLIENT_SUCCESS)
  100. fprintf(stderr, "Connected\n");
  101. else if (rc != MQTTCLIENT_SUCCESS && !opts.quiet)
  102. fprintf(stderr, "Connect failed return code: %s\n", MQTTClient_strerror(rc));
  103. return rc;
  104. }
  105. void cfinish(int sig)
  106. {
  107. signal(SIGINT, NULL);
  108. toStop = 1;
  109. }
  110. void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
  111. {
  112. fprintf(stderr, "Trace : %d, %s\n", level, message);
  113. }
  114. int main(int argc, char** argv)
  115. {
  116. MQTTClient client;
  117. MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  118. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  119. int rc = 0;
  120. char* url;
  121. const char* version = NULL;
  122. #if !defined(_WIN32)
  123. struct sigaction sa;
  124. #endif
  125. const char* program_name = "paho_cs_sub";
  126. MQTTClient_nameValue* infos = MQTTClient_getVersionInfo();
  127. if (argc < 2)
  128. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  129. if (getopts(argc, argv, &opts) != 0)
  130. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  131. if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
  132. opts.verbose = 1;
  133. if (opts.connection)
  134. url = opts.connection;
  135. else
  136. {
  137. url = malloc(100);
  138. sprintf(url, "%s:%s", opts.host, opts.port);
  139. }
  140. if (opts.verbose)
  141. printf("URL is %s\n", url);
  142. if (opts.tracelevel > 0)
  143. {
  144. MQTTClient_setTraceCallback(trace_callback);
  145. MQTTClient_setTraceLevel(opts.tracelevel);
  146. }
  147. if (opts.MQTTVersion >= MQTTVERSION_5)
  148. createOpts.MQTTVersion = MQTTVERSION_5;
  149. rc = MQTTClient_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE,
  150. NULL, &createOpts);
  151. if (rc != MQTTCLIENT_SUCCESS)
  152. {
  153. if (!opts.quiet)
  154. fprintf(stderr, "Failed to create client, return code: %s\n", MQTTClient_strerror(rc));
  155. exit(EXIT_FAILURE);
  156. }
  157. #if defined(_WIN32)
  158. signal(SIGINT, cfinish);
  159. signal(SIGTERM, cfinish);
  160. #else
  161. memset(&sa, 0, sizeof(struct sigaction));
  162. sa.sa_handler = cfinish;
  163. sa.sa_flags = 0;
  164. sigaction(SIGINT, &sa, NULL);
  165. sigaction(SIGTERM, &sa, NULL);
  166. #endif
  167. if (myconnect(client) != MQTTCLIENT_SUCCESS)
  168. goto exit;
  169. if (opts.MQTTVersion >= MQTTVERSION_5)
  170. {
  171. MQTTResponse response = MQTTClient_subscribe5(client, opts.topic, opts.qos, NULL, NULL);
  172. rc = response.reasonCode;
  173. MQTTResponse_free(response);
  174. }
  175. else
  176. rc = MQTTClient_subscribe(client, opts.topic, opts.qos);
  177. if (rc != MQTTCLIENT_SUCCESS && rc != opts.qos)
  178. {
  179. if (!opts.quiet)
  180. fprintf(stderr, "Error %d subscribing to topic %s\n", rc, opts.topic);
  181. goto exit;
  182. }
  183. while (!toStop)
  184. {
  185. char* topicName = NULL;
  186. int topicLen;
  187. MQTTClient_message* message = NULL;
  188. rc = MQTTClient_receive(client, &topicName, &topicLen, &message, 1000);
  189. if (rc == MQTTCLIENT_DISCONNECTED)
  190. myconnect(client);
  191. else if (message)
  192. {
  193. size_t delimlen = 0;
  194. if (opts.verbose)
  195. printf("%s\t", topicName);
  196. if (opts.delimiter)
  197. delimlen = strlen(opts.delimiter);
  198. if (opts.delimiter == NULL || (message->payloadlen > delimlen &&
  199. strncmp(opts.delimiter, &((char*)message->payload)[message->payloadlen - delimlen], delimlen) == 0))
  200. printf("%.*s", message->payloadlen, (char*)message->payload);
  201. else
  202. printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
  203. if (message->struct_version == 1 && opts.verbose)
  204. logProperties(&message->properties);
  205. fflush(stdout);
  206. MQTTClient_freeMessage(&message);
  207. MQTTClient_free(topicName);
  208. }
  209. }
  210. exit:
  211. MQTTClient_disconnect(client, 0);
  212. MQTTClient_destroy(&client);
  213. return EXIT_SUCCESS;
  214. }