MQTTAsync_subscribe.c 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. *******************************************************************************/
  16. #include <stdio.h>
  17. #include <stdlib.h>
  18. #include <string.h>
  19. #include "MQTTAsync.h"
  20. #if !defined(_WIN32)
  21. #include <unistd.h>
  22. #else
  23. #include <windows.h>
  24. #endif
  25. #if defined(_WRS_KERNEL)
  26. #include <OsWrapper.h>
  27. #endif
  28. #define ADDRESS "tcp://mqtt.eclipseprojects.io:1883"
  29. #define CLIENTID "ExampleClientSub"
  30. #define TOPIC "MQTT Examples"
  31. #define PAYLOAD "Hello World!"
  32. #define QOS 1
  33. #define TIMEOUT 10000L
  34. int disc_finished = 0;
  35. int subscribed = 0;
  36. int finished = 0;
  37. void onConnect(void* context, MQTTAsync_successData* response);
  38. void onConnectFailure(void* context, MQTTAsync_failureData* response);
  39. void connlost(void *context, char *cause)
  40. {
  41. MQTTAsync client = (MQTTAsync)context;
  42. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  43. int rc;
  44. printf("\nConnection lost\n");
  45. if (cause)
  46. printf(" cause: %s\n", cause);
  47. printf("Reconnecting\n");
  48. conn_opts.keepAliveInterval = 20;
  49. conn_opts.cleansession = 1;
  50. conn_opts.onSuccess = onConnect;
  51. conn_opts.onFailure = onConnectFailure;
  52. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  53. {
  54. printf("Failed to start connect, return code %d\n", rc);
  55. finished = 1;
  56. }
  57. }
  58. int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
  59. {
  60. printf("Message arrived\n");
  61. printf(" topic: %s\n", topicName);
  62. printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
  63. MQTTAsync_freeMessage(&message);
  64. MQTTAsync_free(topicName);
  65. return 1;
  66. }
  67. void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  68. {
  69. printf("Disconnect failed, rc %d\n", response->code);
  70. disc_finished = 1;
  71. }
  72. void onDisconnect(void* context, MQTTAsync_successData* response)
  73. {
  74. printf("Successful disconnection\n");
  75. disc_finished = 1;
  76. }
  77. void onSubscribe(void* context, MQTTAsync_successData* response)
  78. {
  79. printf("Subscribe succeeded\n");
  80. subscribed = 1;
  81. }
  82. void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  83. {
  84. printf("Subscribe failed, rc %d\n", response->code);
  85. finished = 1;
  86. }
  87. void onConnectFailure(void* context, MQTTAsync_failureData* response)
  88. {
  89. printf("Connect failed, rc %d\n", response->code);
  90. finished = 1;
  91. }
  92. void onConnect(void* context, MQTTAsync_successData* response)
  93. {
  94. MQTTAsync client = (MQTTAsync)context;
  95. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  96. int rc;
  97. printf("Successful connection\n");
  98. printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  99. "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
  100. opts.onSuccess = onSubscribe;
  101. opts.onFailure = onSubscribeFailure;
  102. opts.context = client;
  103. if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
  104. {
  105. printf("Failed to start subscribe, return code %d\n", rc);
  106. finished = 1;
  107. }
  108. }
  109. int main(int argc, char* argv[])
  110. {
  111. MQTTAsync client;
  112. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  113. MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  114. int rc;
  115. int ch;
  116. const char* uri = (argc > 1) ? argv[1] : ADDRESS;
  117. printf("Using server at %s\n", uri);
  118. if ((rc = MQTTAsync_create(&client, uri, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
  119. != MQTTASYNC_SUCCESS)
  120. {
  121. printf("Failed to create client, return code %d\n", rc);
  122. rc = EXIT_FAILURE;
  123. goto exit;
  124. }
  125. if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS)
  126. {
  127. printf("Failed to set callbacks, return code %d\n", rc);
  128. rc = EXIT_FAILURE;
  129. goto destroy_exit;
  130. }
  131. conn_opts.keepAliveInterval = 20;
  132. conn_opts.cleansession = 1;
  133. conn_opts.onSuccess = onConnect;
  134. conn_opts.onFailure = onConnectFailure;
  135. conn_opts.context = client;
  136. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  137. {
  138. printf("Failed to start connect, return code %d\n", rc);
  139. rc = EXIT_FAILURE;
  140. goto destroy_exit;
  141. }
  142. while (!subscribed && !finished)
  143. #if defined(_WIN32)
  144. Sleep(100);
  145. #else
  146. usleep(10000L);
  147. #endif
  148. if (finished)
  149. goto exit;
  150. do
  151. {
  152. ch = getchar();
  153. } while (ch!='Q' && ch != 'q');
  154. disc_opts.onSuccess = onDisconnect;
  155. disc_opts.onFailure = onDisconnectFailure;
  156. if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
  157. {
  158. printf("Failed to start disconnect, return code %d\n", rc);
  159. rc = EXIT_FAILURE;
  160. goto destroy_exit;
  161. }
  162. while (!disc_finished)
  163. {
  164. #if defined(_WIN32)
  165. Sleep(100);
  166. #else
  167. usleep(10000L);
  168. #endif
  169. }
  170. destroy_exit:
  171. MQTTAsync_destroy(&client);
  172. exit:
  173. return rc;
  174. }