paho_cs_pub.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2022 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Ian Craggs - add full capability
  16. *******************************************************************************/
  17. #include "MQTTClient.h"
  18. #include "MQTTClientPersistence.h"
  19. #include "pubsub_opts.h"
  20. #include <stdio.h>
  21. #include <signal.h>
  22. #include <string.h>
  23. #include <stdlib.h>
  24. #if defined(_WIN32)
  25. #define sleep Sleep
  26. #else
  27. #include <sys/time.h>
  28. #endif
  29. volatile int toStop = 0;
  30. void cfinish(int sig)
  31. {
  32. signal(SIGINT, NULL);
  33. toStop = 1;
  34. }
  35. struct pubsub_opts opts =
  36. {
  37. 1, 0, 0, 0, "\n", 100, /* debug/app options */
  38. NULL, NULL, 1, 0, 0, /* message options */
  39. MQTTVERSION_DEFAULT, NULL, "paho-cs-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
  40. NULL, NULL, 0, 0, /* will options */
  41. 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
  42. 0, {NULL, NULL}, /* MQTT V5 options */
  43. NULL, NULL, /* HTTP and HTTPS proxies */
  44. };
  45. int myconnect(MQTTClient client)
  46. {
  47. MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  48. MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer;
  49. MQTTClient_willOptions will_opts = MQTTClient_willOptions_initializer;
  50. int rc = 0;
  51. if (opts.verbose)
  52. printf("Connecting\n");
  53. if (opts.MQTTVersion == MQTTVERSION_5)
  54. {
  55. MQTTClient_connectOptions conn_opts5 = MQTTClient_connectOptions_initializer5;
  56. conn_opts = conn_opts5;
  57. }
  58. conn_opts.keepAliveInterval = opts.keepalive;
  59. conn_opts.username = opts.username;
  60. conn_opts.password = opts.password;
  61. conn_opts.MQTTVersion = opts.MQTTVersion;
  62. conn_opts.httpProxy = opts.http_proxy;
  63. conn_opts.httpsProxy = opts.https_proxy;
  64. if (opts.will_topic) /* will options */
  65. {
  66. will_opts.message = opts.will_payload;
  67. will_opts.topicName = opts.will_topic;
  68. will_opts.qos = opts.will_qos;
  69. will_opts.retained = opts.will_retain;
  70. conn_opts.will = &will_opts;
  71. }
  72. if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
  73. strncmp(opts.connection, "wss://", 6) == 0))
  74. {
  75. if (opts.insecure)
  76. ssl_opts.verify = 0;
  77. else
  78. ssl_opts.verify = 1;
  79. ssl_opts.CApath = opts.capath;
  80. ssl_opts.keyStore = opts.cert;
  81. ssl_opts.trustStore = opts.cafile;
  82. ssl_opts.privateKey = opts.key;
  83. ssl_opts.privateKeyPassword = opts.keypass;
  84. ssl_opts.enabledCipherSuites = opts.ciphers;
  85. conn_opts.ssl = &ssl_opts;
  86. }
  87. if (opts.MQTTVersion == MQTTVERSION_5)
  88. {
  89. MQTTProperties props = MQTTProperties_initializer;
  90. MQTTProperties willProps = MQTTProperties_initializer;
  91. MQTTResponse response = MQTTResponse_initializer;
  92. conn_opts.cleanstart = 1;
  93. response = MQTTClient_connect5(client, &conn_opts, &props, &willProps);
  94. rc = response.reasonCode;
  95. MQTTResponse_free(response);
  96. }
  97. else
  98. {
  99. conn_opts.cleansession = 1;
  100. rc = MQTTClient_connect(client, &conn_opts);
  101. }
  102. if (opts.verbose && rc == MQTTCLIENT_SUCCESS)
  103. printf("Connected\n");
  104. else if (rc != MQTTCLIENT_SUCCESS && !opts.quiet)
  105. fprintf(stderr, "Connect failed return code: %s\n", MQTTClient_strerror(rc));
  106. return rc;
  107. }
  108. int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
  109. {
  110. /* not expecting any messages */
  111. return 1;
  112. }
  113. void connectionLost(void* context, char* reason)
  114. {
  115. MQTTClient client = (MQTTClient)context;
  116. if (opts.verbose)
  117. printf("ConnectionLost, reconnecting\n");
  118. myconnect(client);
  119. }
  120. void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
  121. {
  122. fprintf(stderr, "Trace : %d, %s\n", level, message);
  123. }
  124. int main(int argc, char** argv)
  125. {
  126. MQTTClient client;
  127. MQTTProperties pub_props = MQTTProperties_initializer;
  128. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  129. MQTTClient_deliveryToken last_token;
  130. char* buffer = NULL;
  131. int rc = 0;
  132. char* url;
  133. const char* version = NULL;
  134. #if !defined(_WIN32)
  135. struct sigaction sa;
  136. #endif
  137. const char* program_name = "paho_cs_pub";
  138. MQTTClient_nameValue* infos = MQTTClient_getVersionInfo();
  139. if (argc < 2)
  140. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  141. if (getopts(argc, argv, &opts) != 0)
  142. usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
  143. if (opts.connection)
  144. url = opts.connection;
  145. else
  146. {
  147. url = malloc(100);
  148. sprintf(url, "%s:%s", opts.host, opts.port);
  149. }
  150. if (opts.verbose)
  151. printf("URL is %s\n", url);
  152. if (opts.tracelevel > 0)
  153. {
  154. MQTTClient_setTraceCallback(trace_callback);
  155. MQTTClient_setTraceLevel(opts.tracelevel);
  156. }
  157. if (opts.MQTTVersion >= MQTTVERSION_5)
  158. createOpts.MQTTVersion = MQTTVERSION_5;
  159. rc = MQTTClient_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE,
  160. NULL, &createOpts);
  161. if (rc != MQTTCLIENT_SUCCESS)
  162. {
  163. if (!opts.quiet)
  164. fprintf(stderr, "Failed to create client, return code: %s\n", MQTTClient_strerror(rc));
  165. exit(EXIT_FAILURE);
  166. }
  167. #if defined(_WIN32)
  168. signal(SIGINT, cfinish);
  169. signal(SIGTERM, cfinish);
  170. #else
  171. memset(&sa, 0, sizeof(struct sigaction));
  172. sa.sa_handler = cfinish;
  173. sa.sa_flags = 0;
  174. sigaction(SIGINT, &sa, NULL);
  175. sigaction(SIGTERM, &sa, NULL);
  176. #endif
  177. rc = MQTTClient_setCallbacks(client, client, connectionLost, messageArrived, NULL);
  178. if (rc != MQTTCLIENT_SUCCESS)
  179. {
  180. if (!opts.quiet)
  181. fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTClient_strerror(rc));
  182. exit(EXIT_FAILURE);
  183. }
  184. if (myconnect(client) != MQTTCLIENT_SUCCESS)
  185. goto exit;
  186. if (opts.MQTTVersion >= MQTTVERSION_5)
  187. {
  188. MQTTProperty property;
  189. if (opts.message_expiry > 0)
  190. {
  191. property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
  192. property.value.integer4 = opts.message_expiry;
  193. MQTTProperties_add(&pub_props, &property);
  194. }
  195. if (opts.user_property.name)
  196. {
  197. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  198. property.value.data.data = opts.user_property.name;
  199. property.value.data.len = (int)strlen(opts.user_property.name);
  200. property.value.value.data = opts.user_property.value;
  201. property.value.value.len = (int)strlen(opts.user_property.value);
  202. MQTTProperties_add(&pub_props, &property);
  203. }
  204. }
  205. while (!toStop)
  206. {
  207. int data_len = 0;
  208. int delim_len = 0;
  209. if (opts.stdin_lines)
  210. {
  211. buffer = malloc(opts.maxdatalen);
  212. delim_len = (int)strlen(opts.delimiter);
  213. do
  214. {
  215. int c = getchar();
  216. if (c < 0)
  217. goto exit;
  218. buffer[data_len++] = c;
  219. if (data_len > delim_len)
  220. {
  221. if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
  222. break;
  223. }
  224. } while (data_len < opts.maxdatalen);
  225. }
  226. else if (opts.message)
  227. {
  228. buffer = opts.message;
  229. data_len = (int)strlen(opts.message);
  230. }
  231. else if (opts.filename)
  232. {
  233. buffer = readfile(&data_len, &opts);
  234. if (buffer == NULL)
  235. goto exit;
  236. }
  237. if (opts.verbose)
  238. fprintf(stderr, "Publishing data of length %d\n", data_len);
  239. if (opts.MQTTVersion == MQTTVERSION_5)
  240. {
  241. MQTTResponse response = MQTTResponse_initializer;
  242. response = MQTTClient_publish5(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_props, &last_token);
  243. rc = response.reasonCode;
  244. }
  245. else
  246. rc = MQTTClient_publish(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &last_token);
  247. if (opts.stdin_lines == 0)
  248. break;
  249. if (rc != 0)
  250. {
  251. myconnect(client);
  252. if (opts.MQTTVersion == MQTTVERSION_5)
  253. {
  254. MQTTResponse response = MQTTResponse_initializer;
  255. response = MQTTClient_publish5(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &pub_props, &last_token);
  256. rc = response.reasonCode;
  257. }
  258. else
  259. rc = MQTTClient_publish(client, opts.topic, data_len, buffer, opts.qos, opts.retained, &last_token);
  260. }
  261. if (opts.qos > 0)
  262. MQTTClient_yield();
  263. }
  264. rc = MQTTClient_waitForCompletion(client, last_token, 5000);
  265. exit:
  266. if (opts.filename || opts.stdin_lines)
  267. free(buffer);
  268. if (opts.MQTTVersion == MQTTVERSION_5)
  269. rc = MQTTClient_disconnect5(client, 0, MQTTREASONCODE_SUCCESS, NULL);
  270. else
  271. rc = MQTTClient_disconnect(client, 0);
  272. MQTTClient_destroy(&client);
  273. return EXIT_SUCCESS;
  274. }