MQTTAsync_publish_time.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2023 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Frank Pagliughi - loop to repeatedly read and send time values.
  16. *******************************************************************************/
  17. // This is a somewhat contrived example to show an application that publishes
  18. // continuously, like a data acquisition app might do. In this case, though,
  19. // we don't have a sensor to read, so we use the system time as the number
  20. // of milliseconds since the epoch to simulate a data input.
  21. #include <stdio.h>
  22. #include <stdlib.h>
  23. #include <string.h>
  24. #include <stdint.h>
  25. #include <time.h>
  26. #include "MQTTAsync.h"
  27. #if !defined(_WIN32)
  28. #include <unistd.h>
  29. #else
  30. #include <windows.h>
  31. #include <Minwinbase.h>
  32. #endif
  33. #if defined(_WRS_KERNEL)
  34. #include <OsWrapper.h>
  35. #endif
  36. #if defined(_WIN32) || defined(_WIN64)
  37. #define snprintf _snprintf
  38. #endif
  // Better not to flood a public broker. Test against localhost.
  // Broker URI handed to MQTTAsync_create() in main().
  #define ADDRESS "mqtt://localhost:1883"
  // Client identifier handed to MQTTAsync_create() in main().
  #define CLIENTID "ExampleClientTimePub"
  // Topic every time sample is published to.
  #define TOPIC "data/time"
  // QoS level assigned to each published message.
  #define QOS 1
  // NOTE(review): not referenced by the code visible in this file; the unit
  // is presumably milliseconds - confirm before relying on it.
  #define TIMEOUT 10000L
  // Delay between two consecutive publishes in main()'s loop.
  #define SAMPLE_PERIOD 10L // in ms
  // Set to 1 by the failure/disconnect callbacks; main() polls it to leave
  // its publish loop.
  volatile int finished = 0;
  // Set to 1 by onConnect(); main() polls it before it starts publishing.
  volatile int connected = 0;
  48. void connlost(void *context, char *cause)
  49. {
  50. MQTTAsync client = (MQTTAsync)context;
  51. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  52. int rc;
  53. printf("\nConnection lost\n");
  54. if (cause)
  55. printf(" cause: %s\n", cause);
  56. printf("Reconnecting\n");
  57. conn_opts.keepAliveInterval = 20;
  58. conn_opts.cleansession = 1;
  59. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  60. {
  61. printf("Failed to start connect, return code %d\n", rc);
  62. finished = 1;
  63. }
  64. }
  65. void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  66. {
  67. printf("Disconnect failed\n");
  68. finished = 1;
  69. }
  70. void onDisconnect(void* context, MQTTAsync_successData* response)
  71. {
  72. printf("Successful disconnection\n");
  73. finished = 1;
  74. }
  75. void onSendFailure(void* context, MQTTAsync_failureData* response)
  76. {
  77. MQTTAsync client = (MQTTAsync)context;
  78. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  79. int rc;
  80. printf("Message send failed token %d error code %d\n", response->token, response->code);
  81. opts.onSuccess = onDisconnect;
  82. opts.onFailure = onDisconnectFailure;
  83. opts.context = client;
  84. if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
  85. {
  86. printf("Failed to start disconnect, return code %d\n", rc);
  87. exit(EXIT_FAILURE);
  88. }
  89. }
  90. void onSend(void* context, MQTTAsync_successData* response)
  91. {
  92. // This gets called when a message is acknowledged successfully.
  93. }
  94. void onConnectFailure(void* context, MQTTAsync_failureData* response)
  95. {
  96. printf("Connect failed, rc %d\n", response ? response->code : 0);
  97. finished = 1;
  98. }
  99. void onConnect(void* context, MQTTAsync_successData* response)
  100. {
  101. printf("Successful connection\n");
  102. connected = 1;
  103. }
  104. int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
  105. {
  106. /* not expecting any messages */
  107. return 1;
  108. }
  /**
   * Return the current wall-clock time in milliseconds since the Unix epoch
   * (1970-01-01 00:00:00 UTC).
   *
   * Windows: the time comes from GetSystemTimeAsFileTime(). A FILETIME is a
   * 64-bit count of 100-nanosecond intervals since 1601-01-01 split into two
   * 32-bit halves, so the halves are recombined with a 32-bit shift and the
   * count is rebased to the Unix epoch before being scaled to milliseconds.
   *
   * Elsewhere: the realtime clock is read with clock_gettime(); when the
   * POSIX clock identifiers are not exposed (e.g. a strict ISO C build) the
   * C11 timespec_get() is used instead.
   *
   * @return milliseconds since the Unix epoch, or 0 if the clock could not
   *         be read
   */
  int64_t getTime(void)
  {
  #if defined(_WIN32)
      /* Number of 100 ns intervals between 1601-01-01 and 1970-01-01. */
      const int64_t unix_epoch_offset = 116444736000000000LL;
      FILETIME ft;
      int64_t ticks;

      GetSystemTimeAsFileTime(&ft);
      /* dwHighDateTime is the upper 32 bits of the 64-bit tick count. */
      ticks = (int64_t) ((((uint64_t) ft.dwHighDateTime) << 32) | (uint64_t) ft.dwLowDateTime);
      return (ticks - unix_epoch_offset) / 10000;
  #else
      struct timespec ts;

  #if defined(CLOCK_REALTIME)
      if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
          return 0;
  #else
      /* timespec_get() returns 0 on failure. */
      if (timespec_get(&ts, TIME_UTC) == 0)
          return 0;
  #endif
      return ((int64_t) ts.tv_sec * 1000) + ((int64_t) ts.tv_nsec / 1000000);
  #endif
  }
  121. int main(int argc, char* argv[])
  122. {
  123. MQTTAsync client;
  124. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  125. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  126. MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
  127. int rc;
  128. if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
  129. {
  130. printf("Failed to create client object, return code %d\n", rc);
  131. exit(EXIT_FAILURE);
  132. }
  133. if ((rc = MQTTAsync_setCallbacks(client, client, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
  134. {
  135. printf("Failed to set callback, return code %d\n", rc);
  136. exit(EXIT_FAILURE);
  137. }
  138. conn_opts.keepAliveInterval = 20;
  139. conn_opts.cleansession = 1;
  140. conn_opts.onSuccess = onConnect;
  141. conn_opts.onFailure = onConnectFailure;
  142. conn_opts.context = client;
  143. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  144. {
  145. printf("Failed to start connect, return code %d\n", rc);
  146. exit(EXIT_FAILURE);
  147. }
  148. while (!connected) {
  149. #if defined(_WIN32)
  150. Sleep(100);
  151. #else
  152. usleep(100000L);
  153. #endif
  154. }
  155. while (!finished) {
  156. int64_t t = getTime();
  157. char buf[256];
  158. int n = snprintf(buf, sizeof(buf), "%lld", (long long) t);
  159. printf("%s\n", buf);
  160. pub_opts.onSuccess = onSend;
  161. pub_opts.onFailure = onSendFailure;
  162. pub_opts.context = client;
  163. pubmsg.payload = buf;
  164. pubmsg.payloadlen = n;
  165. pubmsg.qos = QOS;
  166. pubmsg.retained = 0;
  167. if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &pub_opts)) != MQTTASYNC_SUCCESS)
  168. {
  169. printf("Failed to start sendMessage, return code %d\n", rc);
  170. exit(EXIT_FAILURE);
  171. }
  172. #if defined(_WIN32)
  173. Sleep(SAMPLE_PERIOD);
  174. #else
  175. usleep(SAMPLE_PERIOD * 1000);
  176. #endif
  177. }
  178. MQTTAsync_destroy(&client);
  179. return rc;
  180. }