paho_c_sub.c 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2020 IBM Corp., and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Ian Craggs - fix for bug 413429 - connectionLost not called
  16. * Guilherme Maciel Ferreira - add keep alive option
  17. * Ian Craggs - add full capability
  18. *******************************************************************************/
  19. #include "MQTTAsync.h"
  20. #include "MQTTClientPersistence.h"
  21. #include "pubsub_opts.h"
  22. #include <stdio.h>
  23. #include <signal.h>
  24. #include <string.h>
  25. #include <stdlib.h>
  26. #if defined(_WIN32)
  27. #include <windows.h>
  28. #define sleep Sleep
  29. #else
  30. #include <sys/time.h>
  31. #include <unistd.h>
  32. #endif
  33. #if defined(_WRS_KERNEL)
  34. #include <OsWrapper.h>
  35. #endif
  /*
   * Progress flags shared between main() and the code that runs asynchronously
   * to it (the signal handler and the MQTTAsync callbacks in this file).
   *
   *  finished     - set by cfinish() and by every failure callback; main()
   *                 polls it to know when to shut down.
   *  subscribed   - set by onSubscribe()/onSubscribe5(); main() polls it while
   *                 waiting for the subscription to complete.
   *  disconnected - set by onDisconnect(); main() polls it while waiting for
   *                 the disconnect to complete.
   *
   * All three are polled in loops whose bodies never write them, so all three
   * are volatile to force a fresh read on every iteration.
   */
  volatile int finished = 0;
  volatile int subscribed = 0;
  volatile int disconnected = 0;
  /**
   * Pause the caller for approximately @p ms milliseconds.
   *
   * Uses Sleep() on Windows and usleep() everywhere else.
   *
   * @param ms  time to sleep, in milliseconds
   */
  void mysleep(int ms)
  {
  #if !defined(_WIN32)
      /* usleep() takes microseconds */
      usleep(ms * 1000);
  #else
      /* Sleep() takes milliseconds */
      Sleep(ms);
  #endif
  }
  47. void cfinish(int sig)
  48. {
  49. signal(SIGINT, NULL);
  50. finished = 1;
  51. }
  52. struct pubsub_opts opts =
  53. {
  54. 0, 0, 0, 0, "\n", 100, /* debug/app options */
  55. NULL, NULL, 1, 0, 0, /* message options */
  56. MQTTVERSION_DEFAULT, NULL, "paho-c-sub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
  57. NULL, NULL, 0, 0, /* will options */
  58. 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
  59. 0, {NULL, NULL}, /* MQTT V5 options */
  60. NULL, NULL, /* HTTP and HTTPS proxies */
  61. };
  62. int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
  63. {
  64. size_t delimlen = 0;
  65. if (opts.verbose)
  66. printf("%d %s\t", message->payloadlen, topicName);
  67. if (opts.delimiter)
  68. delimlen = strlen(opts.delimiter);
  69. if (opts.delimiter == NULL || (message->payloadlen > delimlen &&
  70. strncmp(opts.delimiter, &((char*)message->payload)[message->payloadlen - delimlen], delimlen) == 0))
  71. printf("%.*s", message->payloadlen, (char*)message->payload);
  72. else
  73. printf("%.*s%s", message->payloadlen, (char*)message->payload, opts.delimiter);
  74. if (message->struct_version == 1 && opts.verbose)
  75. logProperties(&message->properties);
  76. fflush(stdout);
  77. MQTTAsync_freeMessage(&message);
  78. MQTTAsync_free(topicName);
  79. return 1;
  80. }
  81. void onDisconnect(void* context, MQTTAsync_successData* response)
  82. {
  83. disconnected = 1;
  84. }
  85. void onSubscribe5(void* context, MQTTAsync_successData5* response)
  86. {
  87. subscribed = 1;
  88. }
  89. void onSubscribe(void* context, MQTTAsync_successData* response)
  90. {
  91. subscribed = 1;
  92. }
  93. void onSubscribeFailure5(void* context, MQTTAsync_failureData5* response)
  94. {
  95. if (!opts.quiet)
  96. fprintf(stderr, "Subscribe failed, rc %s reason code %s\n",
  97. MQTTAsync_strerror(response->code),
  98. MQTTReasonCode_toString(response->reasonCode));
  99. finished = 1;
  100. }
  101. void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  102. {
  103. if (!opts.quiet)
  104. fprintf(stderr, "Subscribe failed, rc %s\n",
  105. MQTTAsync_strerror(response->code));
  106. finished = 1;
  107. }
  108. void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
  109. {
  110. if (!opts.quiet)
  111. fprintf(stderr, "Connect failed, rc %s reason code %s\n",
  112. MQTTAsync_strerror(response->code),
  113. MQTTReasonCode_toString(response->reasonCode));
  114. finished = 1;
  115. }
  116. void onConnectFailure(void* context, MQTTAsync_failureData* response)
  117. {
  118. if (!opts.quiet)
  119. fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none");
  120. finished = 1;
  121. }
  122. void onConnect5(void* context, MQTTAsync_successData5* response)
  123. {
  124. MQTTAsync client = (MQTTAsync)context;
  125. MQTTAsync_callOptions copts = MQTTAsync_callOptions_initializer;
  126. int rc;
  127. if (opts.verbose)
  128. printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
  129. copts.onSuccess5 = onSubscribe5;
  130. copts.onFailure5 = onSubscribeFailure5;
  131. copts.context = client;
  132. if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &copts)) != MQTTASYNC_SUCCESS)
  133. {
  134. if (!opts.quiet)
  135. fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
  136. finished = 1;
  137. }
  138. }
  139. void onConnect(void* context, MQTTAsync_successData* response)
  140. {
  141. MQTTAsync client = (MQTTAsync)context;
  142. MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
  143. int rc;
  144. if (opts.verbose)
  145. printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);
  146. ropts.onSuccess = onSubscribe;
  147. ropts.onFailure = onSubscribeFailure;
  148. ropts.context = client;
  149. if ((rc = MQTTAsync_subscribe(client, opts.topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS)
  150. {
  151. if (!opts.quiet)
  152. fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(rc));
  153. finished = 1;
  154. }
  155. }
  156. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  157. void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  158. {
  159. fprintf(stderr, "Trace : %d, %s\n", level, message);
  160. }
  161. int main(int argc, char** argv)
  162. {
  163. MQTTAsync client;
  164. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  165. MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  166. MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
  167. MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
  168. int rc = 0;
  169. char* url = NULL;
  170. const char* version = NULL;
  171. const char* program_name = "paho_c_sub";
  172. MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo();
  173. #if !defined(_WIN32)
  174. struct sigaction sa;
  175. #endif
  176. if (argc < 2)
  177. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  178. if (getopts(argc, argv, &opts) != 0)
  179. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  180. if (strchr(opts.topic, '#') || strchr(opts.topic, '+'))
  181. opts.verbose = 1;
  182. if (opts.connection)
  183. url = opts.connection;
  184. else
  185. {
  186. url = malloc(100);
  187. sprintf(url, "%s:%s", opts.host, opts.port);
  188. }
  189. if (opts.verbose)
  190. printf("URL is %s\n", url);
  191. if (opts.tracelevel > 0)
  192. {
  193. MQTTAsync_setTraceCallback(trace_callback);
  194. MQTTAsync_setTraceLevel(opts.tracelevel);
  195. }
  196. if (opts.MQTTVersion >= MQTTVERSION_5)
  197. create_opts.MQTTVersion = MQTTVERSION_5;
  198. rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE,
  199. NULL, &create_opts);
  200. if (rc != MQTTASYNC_SUCCESS)
  201. {
  202. if (!opts.quiet)
  203. fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
  204. exit(EXIT_FAILURE);
  205. }
  206. rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
  207. if (rc != MQTTASYNC_SUCCESS)
  208. {
  209. if (!opts.quiet)
  210. fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
  211. exit(EXIT_FAILURE);
  212. }
  213. #if defined(_WIN32)
  214. signal(SIGINT, cfinish);
  215. signal(SIGTERM, cfinish);
  216. #else
  217. memset(&sa, 0, sizeof(struct sigaction));
  218. sa.sa_handler = cfinish;
  219. sa.sa_flags = 0;
  220. sigaction(SIGINT, &sa, NULL);
  221. sigaction(SIGTERM, &sa, NULL);
  222. #endif
  223. if (opts.MQTTVersion == MQTTVERSION_5)
  224. {
  225. MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
  226. conn_opts = conn_opts5;
  227. conn_opts.onSuccess5 = onConnect5;
  228. conn_opts.onFailure5 = onConnectFailure5;
  229. conn_opts.cleanstart = 1;
  230. }
  231. else
  232. {
  233. conn_opts.onSuccess = onConnect;
  234. conn_opts.onFailure = onConnectFailure;
  235. conn_opts.cleansession = 1;
  236. }
  237. conn_opts.keepAliveInterval = opts.keepalive;
  238. conn_opts.username = opts.username;
  239. conn_opts.password = opts.password;
  240. conn_opts.MQTTVersion = opts.MQTTVersion;
  241. conn_opts.context = client;
  242. conn_opts.automaticReconnect = 1;
  243. conn_opts.httpProxy = opts.http_proxy;
  244. conn_opts.httpsProxy = opts.https_proxy;
  245. if (opts.will_topic) /* will options */
  246. {
  247. will_opts.message = opts.will_payload;
  248. will_opts.topicName = opts.will_topic;
  249. will_opts.qos = opts.will_qos;
  250. will_opts.retained = opts.will_retain;
  251. conn_opts.will = &will_opts;
  252. }
  253. if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
  254. strncmp(opts.connection, "wss://", 6) == 0))
  255. {
  256. ssl_opts.verify = (opts.insecure) ? 0 : 1;
  257. ssl_opts.CApath = opts.capath;
  258. ssl_opts.keyStore = opts.cert;
  259. ssl_opts.trustStore = opts.cafile;
  260. ssl_opts.privateKey = opts.key;
  261. ssl_opts.privateKeyPassword = opts.keypass;
  262. ssl_opts.enabledCipherSuites = opts.ciphers;
  263. conn_opts.ssl = &ssl_opts;
  264. }
  265. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  266. {
  267. if (!opts.quiet)
  268. fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
  269. exit(EXIT_FAILURE);
  270. }
  271. while (!subscribed)
  272. mysleep(100);
  273. if (finished)
  274. goto exit;
  275. while (!finished)
  276. mysleep(100);
  277. disc_opts.onSuccess = onDisconnect;
  278. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
  279. {
  280. if (!opts.quiet)
  281. fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
  282. exit(EXIT_FAILURE);
  283. }
  284. while (!disconnected)
  285. mysleep(100);
  286. exit:
  287. MQTTAsync_destroy(&client);
  288. return EXIT_SUCCESS;
  289. }