MQTTAsync.c 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2024 IBM Corp., Ian Craggs and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial implementation and documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL support
  16. * Ian Craggs - multiple server connection support
  17. * Ian Craggs - fix for bug 413429 - connectionLost not called
  18. * Ian Craggs - fix for bug 415042 - using already freed structure
  19. * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
  20. * Ian Craggs - fix for bug 420851
  21. * Ian Craggs - fix for bug 432903 - queue persistence
  22. * Ian Craggs - MQTT 3.1.1 support
  23. * Rong Xiang, Ian Craggs - C++ compatibility
  24. * Ian Craggs - fix for bug 442400: reconnecting after network cable unplugged
  25. * Ian Craggs - fix for bug 444934 - incorrect free in freeCommand1
  26. * Ian Craggs - fix for bug 445891 - assigning msgid is not thread safe
  27. * Ian Craggs - fix for bug 465369 - longer latency than expected
  28. * Ian Craggs - fix for bug 444103 - success/failure callbacks not invoked
  29. * Ian Craggs - fix for bug 484363 - segfault in getReadySocket
  30. * Ian Craggs - automatic reconnect and offline buffering (send while disconnected)
  31. * Ian Craggs - fix for bug 472250
  32. * Ian Craggs - fix for bug 486548
  33. * Ian Craggs - SNI support
  34. * Ian Craggs - auto reconnect timing fix #218
  35. * Ian Craggs - fix for issue #190
  36. * Ian Craggs - check for NULL SSL options #334
  37. * Ian Craggs - allocate username/password buffers #431
  38. * Ian Craggs - MQTT 5.0 support
  39. * Ian Craggs - refactor to reduce module size
  40. *******************************************************************************/
  41. #include <stdlib.h>
  42. #include <string.h>
  43. #if !defined(_WIN32) && !defined(_WIN64)
  44. #include <sys/time.h>
  45. #else
  46. #if defined(_MSC_VER) && _MSC_VER < 1900
  47. #define snprintf _snprintf
  48. #endif
  49. #endif
  50. #if !defined(NO_PERSISTENCE)
  51. #include "MQTTPersistence.h"
  52. #endif
  53. #include "MQTTAsync.h"
  54. #include "MQTTAsyncUtils.h"
  55. #include "utf-8.h"
  56. #include "MQTTProtocol.h"
  57. #include "MQTTProtocolOut.h"
  58. #include "Thread.h"
  59. #include "SocketBuffer.h"
  60. #include "StackTrace.h"
  61. #include "Heap.h"
  62. #include "OsWrapper.h"
  63. #include "WebSocket.h"
  64. static void MQTTAsync_freeServerURIs(MQTTAsyncs* m);
  65. #include "VersionInfo.h"
  66. const char *client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
  67. const char *client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
  68. volatile int global_initialized = 0;
  69. List* MQTTAsync_handles = NULL;
  70. List* MQTTAsync_commands = NULL;
  71. int MQTTAsync_tostop = 0;
  72. static ClientStates ClientState =
  73. {
  74. CLIENT_VERSION, /* version */
  75. NULL /* client list */
  76. };
  77. MQTTProtocol state;
  78. ClientStates* bstate = &ClientState;
  79. enum MQTTAsync_threadStates sendThread_state = STOPPED;
  80. enum MQTTAsync_threadStates receiveThread_state = STOPPED;
  81. thread_id_type sendThread_id = 0,
  82. receiveThread_id = 0;
  83. // global objects init declaration
  84. int MQTTAsync_init(void);
  85. void MQTTAsync_global_init(MQTTAsync_init_options* inits)
  86. {
  87. MQTTAsync_init();
  88. #if defined(OPENSSL)
  89. SSLSocket_handleOpensslInit(inits->do_openssl_init);
  90. #endif
  91. }
  92. #if !defined(min)
  93. #define min(a, b) (((a) < (b)) ? (a) : (b))
  94. #endif
  95. #if defined(WIN32) || defined(WIN64)
  96. void MQTTAsync_init_rand(void)
  97. {
  98. START_TIME_TYPE now = MQTTTime_start_clock();
  99. srand((unsigned int)now);
  100. }
  101. #elif defined(AIX)
  102. void MQTTAsync_init_rand(void)
  103. {
  104. START_TIME_TYPE now = MQTTTime_start_clock();
  105. srand(now.tv_nsec);
  106. }
  107. #else
  108. void MQTTAsync_init_rand(void)
  109. {
  110. START_TIME_TYPE now = MQTTTime_start_clock();
  111. srand(now.tv_usec);
  112. }
  113. #endif
  114. #if defined(_WIN32) || defined(_WIN64)
  115. mutex_type mqttasync_mutex = NULL;
  116. mutex_type socket_mutex = NULL;
  117. mutex_type mqttcommand_mutex = NULL;
  118. sem_type send_sem = NULL;
  119. #if !defined(NO_HEAP_TRACKING)
  120. extern mutex_type stack_mutex;
  121. extern mutex_type heap_mutex;
  122. #endif
  123. extern mutex_type log_mutex;
  124. int MQTTAsync_init(void)
  125. {
  126. DWORD rc = 0;
  127. if (mqttasync_mutex == NULL)
  128. {
  129. if ((mqttasync_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  130. {
  131. rc = GetLastError();
  132. printf("mqttasync_mutex error %d\n", rc);
  133. goto exit;
  134. }
  135. if ((mqttcommand_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  136. {
  137. rc = GetLastError();
  138. printf("mqttcommand_mutex error %d\n", rc);
  139. goto exit;
  140. }
  141. if ((send_sem = CreateEvent(
  142. NULL, /* default security attributes */
  143. FALSE, /* manual-reset event? */
  144. FALSE, /* initial state is nonsignaled */
  145. NULL /* object name */
  146. )) == NULL)
  147. {
  148. rc = GetLastError();
  149. printf("send_sem error %d\n", rc);
  150. goto exit;
  151. }
  152. #if !defined(NO_HEAP_TRACKING)
  153. if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  154. {
  155. rc = GetLastError();
  156. printf("stack_mutex error %d\n", rc);
  157. goto exit;
  158. }
  159. if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  160. {
  161. rc = GetLastError();
  162. printf("heap_mutex error %d\n", rc);
  163. goto exit;
  164. }
  165. #endif
  166. if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  167. {
  168. rc = GetLastError();
  169. printf("log_mutex error %d\n", rc);
  170. goto exit;
  171. }
  172. if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
  173. {
  174. rc = GetLastError();
  175. printf("socket_mutex error %d\n", rc);
  176. goto exit;
  177. }
  178. }
  179. else
  180. {
  181. Log(TRACE_MAX, -1, "Library already initialized");
  182. }
  183. exit:
  184. return rc;
  185. }
  186. void MQTTAsync_cleanup(void)
  187. {
  188. if (send_sem)
  189. CloseHandle(send_sem);
  190. #if !defined(NO_HEAP_TRACKING)
  191. if (stack_mutex)
  192. CloseHandle(stack_mutex);
  193. if (heap_mutex)
  194. CloseHandle(heap_mutex);
  195. #endif
  196. if (log_mutex)
  197. CloseHandle(log_mutex);
  198. if (socket_mutex)
  199. CloseHandle(socket_mutex);
  200. if (mqttasync_mutex)
  201. CloseHandle(mqttasync_mutex);
  202. }
  203. #if defined(PAHO_MQTT_STATIC)
  204. static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Global for one time initialization */
  205. /* This runs at most once */
  206. BOOL CALLBACK InitMutexesOnce (
  207. PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
  208. PVOID Parameter, /* Optional parameter */
  209. PVOID *lpContext) /* Return data, if any */
  210. {
  211. int rc = MQTTAsync_init();
  212. return rc == 0;
  213. }
  214. #else
  215. BOOL APIENTRY DllMain(HANDLE hModule,
  216. DWORD ul_reason_for_call,
  217. LPVOID lpReserved)
  218. {
  219. switch (ul_reason_for_call)
  220. {
  221. case DLL_PROCESS_ATTACH:
  222. MQTTAsync_init();
  223. break;
  224. case DLL_THREAD_ATTACH:
  225. break;
  226. case DLL_THREAD_DETACH:
  227. break;
  228. case DLL_PROCESS_DETACH:
  229. if (lpReserved)
  230. MQTTAsync_cleanup();
  231. break;
  232. }
  233. return TRUE;
  234. }
  235. #endif
  236. #else
  237. static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  238. mutex_type mqttasync_mutex = &mqttasync_mutex_store;
  239. static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  240. mutex_type socket_mutex = &socket_mutex_store;
  241. static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
  242. mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
  243. static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
  244. cond_type send_cond = &send_cond_store;
  245. int MQTTAsync_init(void)
  246. {
  247. pthread_mutexattr_t attr;
  248. int rc;
  249. pthread_mutexattr_init(&attr);
  250. #if !defined(_WRS_KERNEL)
  251. pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  252. #else
  253. /* #warning "no pthread_mutexattr_settype" */
  254. #endif
  255. if ((rc = pthread_mutex_init(mqttasync_mutex, &attr)) != 0)
  256. printf("MQTTAsync: error %d initializing async_mutex\n", rc);
  257. else if ((rc = pthread_mutex_init(mqttcommand_mutex, &attr)) != 0)
  258. printf("MQTTAsync: error %d initializing command_mutex\n", rc);
  259. else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
  260. printf("MQTTClient: error %d initializing socket_mutex\n", rc);
  261. else if ((rc = pthread_cond_init(&send_cond->cond, NULL)) != 0)
  262. printf("MQTTAsync: error %d initializing send_cond cond\n", rc);
  263. else if ((rc = pthread_mutex_init(&send_cond->mutex, &attr)) != 0)
  264. printf("MQTTAsync: error %d initializing send_cond mutex\n", rc);
  265. return rc;
  266. }
  267. #endif
  268. int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const char* clientId,
  269. int persistence_type, void* persistence_context, MQTTAsync_createOptions* options)
  270. {
  271. int rc = 0;
  272. MQTTAsyncs *m = NULL;
  273. #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
  274. /* intializes mutexes once. Must come before FUNC_ENTRY */
  275. BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitMutexesOnce, NULL, NULL);
  276. #endif
  277. FUNC_ENTRY;
  278. MQTTAsync_lock_mutex(mqttasync_mutex);
  279. if (serverURI == NULL || clientId == NULL)
  280. {
  281. rc = MQTTASYNC_NULL_PARAMETER;
  282. goto exit;
  283. }
  284. if (!UTF8_validateString(clientId))
  285. {
  286. rc = MQTTASYNC_BAD_UTF8_STRING;
  287. goto exit;
  288. }
  289. if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
  290. {
  291. rc = MQTTASYNC_PERSISTENCE_ERROR;
  292. goto exit;
  293. }
  294. if (strstr(serverURI, "://") != NULL)
  295. {
  296. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
  297. && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
  298. #if defined(UNIXSOCK)
  299. && strncmp(URI_UNIX, serverURI, strlen(URI_UNIX)) != 0
  300. #endif
  301. && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
  302. #if defined(OPENSSL)
  303. && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
  304. && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
  305. && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
  306. #endif
  307. )
  308. {
  309. rc = MQTTASYNC_BAD_PROTOCOL;
  310. goto exit;
  311. }
  312. }
  313. if (options && options->maxBufferedMessages <= 0)
  314. {
  315. rc = MQTTASYNC_MAX_BUFFERED;
  316. goto exit;
  317. }
  318. if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 ||
  319. options->struct_version < 0 || options->struct_version > 3))
  320. {
  321. rc = MQTTASYNC_BAD_STRUCTURE;
  322. goto exit;
  323. }
  324. if (!global_initialized)
  325. {
  326. #if !defined(NO_HEAP_TRACKING)
  327. Heap_initialize();
  328. #endif
  329. Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
  330. bstate->clients = ListInitialize();
  331. Socket_outInitialize();
  332. Socket_setWriteContinueCallback(MQTTAsync_writeContinue);
  333. Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
  334. Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
  335. MQTTAsync_handles = ListInitialize();
  336. MQTTAsync_commands = ListInitialize();
  337. #if defined(OPENSSL)
  338. SSLSocket_initialize();
  339. #endif
  340. global_initialized = 1;
  341. }
  342. if ((m = malloc(sizeof(MQTTAsyncs))) == NULL)
  343. {
  344. rc = PAHO_MEMORY_ERROR;
  345. goto exit;
  346. }
  347. *handle = m;
  348. memset(m, '\0', sizeof(MQTTAsyncs));
  349. if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
  350. serverURI += strlen(URI_TCP);
  351. else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
  352. serverURI += strlen(URI_MQTT);
  353. #if defined(UNIXSOCK)
  354. else if (strncmp(URI_UNIX, serverURI, strlen(URI_UNIX)) == 0)
  355. {
  356. serverURI += strlen(URI_UNIX);
  357. m->unixsock = 1;
  358. }
  359. #endif
  360. else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
  361. {
  362. serverURI += strlen(URI_WS);
  363. m->websocket = 1;
  364. }
  365. #if defined(OPENSSL)
  366. else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
  367. {
  368. serverURI += strlen(URI_SSL);
  369. m->ssl = 1;
  370. }
  371. else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
  372. {
  373. serverURI += strlen(URI_MQTTS);
  374. m->ssl = 1;
  375. }
  376. else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
  377. {
  378. serverURI += strlen(URI_WSS);
  379. m->ssl = 1;
  380. m->websocket = 1;
  381. }
  382. #endif
  383. if ((m->serverURI = MQTTStrdup(serverURI)) == NULL)
  384. {
  385. rc = PAHO_MEMORY_ERROR;
  386. goto exit;
  387. }
  388. m->responses = ListInitialize();
  389. ListAppend(MQTTAsync_handles, m, sizeof(MQTTAsyncs));
  390. if ((m->c = malloc(sizeof(Clients))) == NULL)
  391. {
  392. rc = PAHO_MEMORY_ERROR;
  393. goto exit;
  394. }
  395. memset(m->c, '\0', sizeof(Clients));
  396. m->c->context = m;
  397. m->c->outboundMsgs = ListInitialize();
  398. m->c->inboundMsgs = ListInitialize();
  399. m->c->messageQueue = ListInitialize();
  400. m->c->outboundQueue = ListInitialize();
  401. m->c->clientID = MQTTStrdup(clientId);
  402. if (m->c->context == NULL || m->c->outboundMsgs == NULL || m->c->inboundMsgs == NULL ||
  403. m->c->messageQueue == NULL || m->c->outboundQueue == NULL || m->c->clientID == NULL)
  404. {
  405. rc = PAHO_MEMORY_ERROR;
  406. goto exit;
  407. }
  408. m->c->MQTTVersion = MQTTVERSION_DEFAULT;
  409. m->shouldBeConnected = 0;
  410. if (options)
  411. {
  412. if ((m->createOptions = malloc(sizeof(MQTTAsync_createOptions))) == NULL)
  413. {
  414. rc = PAHO_MEMORY_ERROR;
  415. goto exit;
  416. }
  417. memcpy(m->createOptions, options, sizeof(MQTTAsync_createOptions));
  418. if (options->struct_version > 0)
  419. m->c->MQTTVersion = options->MQTTVersion;
  420. }
  421. #if !defined(NO_PERSISTENCE)
  422. rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
  423. if (rc == 0)
  424. {
  425. rc = MQTTPersistence_initialize(m->c, m->serverURI); /* inflight messages restored here */
  426. if (rc == 0)
  427. {
  428. if (m->createOptions && m->createOptions->struct_version >= 2 && m->createOptions->restoreMessages == 0)
  429. MQTTAsync_unpersistCommandsAndMessages(m->c);
  430. else
  431. {
  432. MQTTAsync_restoreCommands(m);
  433. MQTTPersistence_restoreMessageQueue(m->c);
  434. }
  435. }
  436. }
  437. #endif
  438. ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
  439. exit:
  440. MQTTAsync_unlock_mutex(mqttasync_mutex);
  441. FUNC_EXIT_RC(rc);
  442. return rc;
  443. }
  444. int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
  445. int persistence_type, void* persistence_context)
  446. {
  447. MQTTAsync_init_rand();
  448. return MQTTAsync_createWithOptions(handle, serverURI, clientId, persistence_type,
  449. persistence_context, NULL);
  450. }
  451. void MQTTAsync_destroy(MQTTAsync* handle)
  452. {
  453. MQTTAsyncs* m = *handle;
  454. FUNC_ENTRY;
  455. MQTTAsync_lock_mutex(mqttasync_mutex);
  456. if (m == NULL)
  457. goto exit;
  458. MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
  459. MQTTAsync_NULLPublishResponses(m);
  460. MQTTAsync_freeResponses(m);
  461. MQTTAsync_freeCommands(m);
  462. ListFree(m->responses);
  463. if (m->c)
  464. {
  465. SOCKET saved_socket = m->c->net.socket;
  466. char* saved_clientid = MQTTStrdup(m->c->clientID);
  467. #if !defined(NO_PERSISTENCE)
  468. MQTTPersistence_close(m->c);
  469. #endif
  470. MQTTAsync_emptyMessageQueue(m->c);
  471. MQTTProtocol_freeClient(m->c);
  472. if (!ListRemove(bstate->clients, m->c))
  473. Log(LOG_ERROR, 0, NULL);
  474. else
  475. Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
  476. free(saved_clientid);
  477. }
  478. if (m->serverURI)
  479. free(m->serverURI);
  480. if (m->createOptions)
  481. free(m->createOptions);
  482. MQTTAsync_freeServerURIs(m);
  483. if (m->connectProps)
  484. {
  485. MQTTProperties_free(m->connectProps);
  486. free(m->connectProps);
  487. m->connectProps = NULL;
  488. }
  489. if (m->willProps)
  490. {
  491. MQTTProperties_free(m->willProps);
  492. free(m->willProps);
  493. m->willProps = NULL;
  494. }
  495. if (!ListRemove(MQTTAsync_handles, m))
  496. Log(LOG_ERROR, -1, "free error");
  497. *handle = NULL;
  498. if (bstate->clients->count == 0)
  499. MQTTAsync_terminate();
  500. exit:
  501. MQTTAsync_unlock_mutex(mqttasync_mutex);
  502. FUNC_EXIT;
  503. }
  504. int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
  505. {
  506. MQTTAsyncs* m = handle;
  507. int rc = MQTTASYNC_SUCCESS;
  508. MQTTAsync_queuedCommand* conn;
  509. thread_id_type thread_id = 0;
  510. int locked = 0;
  511. FUNC_ENTRY;
  512. if (options == NULL)
  513. {
  514. rc = MQTTASYNC_NULL_PARAMETER;
  515. goto exit;
  516. }
  517. if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
  518. {
  519. rc = MQTTASYNC_BAD_STRUCTURE;
  520. goto exit;
  521. }
  522. #if defined(OPENSSL)
  523. if (m->ssl && options->ssl == NULL)
  524. {
  525. rc = MQTTASYNC_NULL_PARAMETER;
  526. goto exit;
  527. }
  528. #endif
  529. if (options->will) /* check validity of will options structure */
  530. {
  531. if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
  532. {
  533. rc = MQTTASYNC_BAD_STRUCTURE;
  534. goto exit;
  535. }
  536. if (options->will->qos < 0 || options->will->qos > 2)
  537. {
  538. rc = MQTTASYNC_BAD_QOS;
  539. goto exit;
  540. }
  541. if (options->will->topicName == NULL)
  542. {
  543. rc = MQTTASYNC_NULL_PARAMETER;
  544. goto exit;
  545. } else if (strlen(options->will->topicName) == 0)
  546. {
  547. rc = MQTTASYNC_0_LEN_WILL_TOPIC;
  548. goto exit;
  549. }
  550. }
  551. if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
  552. {
  553. if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
  554. {
  555. rc = MQTTASYNC_BAD_STRUCTURE;
  556. goto exit;
  557. }
  558. }
  559. if (options->MQTTVersion >= MQTTVERSION_5 && m->c->MQTTVersion < MQTTVERSION_5)
  560. {
  561. rc = MQTTASYNC_WRONG_MQTT_VERSION;
  562. goto exit;
  563. }
  564. if ((options->username && !UTF8_validateString(options->username)) ||
  565. (options->password && !UTF8_validateString(options->password)))
  566. {
  567. rc = MQTTASYNC_BAD_UTF8_STRING;
  568. goto exit;
  569. }
  570. if (options->MQTTVersion >= MQTTVERSION_5 && options->struct_version < 6)
  571. {
  572. rc = MQTTASYNC_BAD_STRUCTURE;
  573. goto exit;
  574. }
  575. if (options->MQTTVersion >= MQTTVERSION_5 && options->cleansession != 0)
  576. {
  577. rc = MQTTASYNC_BAD_MQTT_OPTION;
  578. goto exit;
  579. }
  580. if (options->MQTTVersion < MQTTVERSION_5 && options->struct_version >= 6)
  581. {
  582. if (options->cleanstart != 0 || options->onFailure5 || options->onSuccess5 ||
  583. options->connectProperties || options->willProperties)
  584. {
  585. rc = MQTTASYNC_BAD_MQTT_OPTION;
  586. goto exit;
  587. }
  588. }
  589. m->connect.onSuccess = options->onSuccess;
  590. m->connect.onFailure = options->onFailure;
  591. if (options->struct_version >= 6)
  592. {
  593. m->connect.onSuccess5 = options->onSuccess5;
  594. m->connect.onFailure5 = options->onFailure5;
  595. }
  596. m->connect.context = options->context;
  597. m->connectTimeout = options->connectTimeout;
  598. /* don't lock async mutex if we are being called from a callback */
  599. thread_id = Paho_thread_getid();
  600. if (thread_id != sendThread_id && thread_id != receiveThread_id)
  601. {
  602. MQTTAsync_lock_mutex(mqttasync_mutex);
  603. locked = 1;
  604. }
  605. MQTTAsync_tostop = 0;
  606. if (sendThread_state != STARTING && sendThread_state != RUNNING)
  607. {
  608. sendThread_state = STARTING;
  609. Paho_thread_start(MQTTAsync_sendThread, NULL);
  610. }
  611. if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
  612. {
  613. receiveThread_state = STARTING;
  614. Paho_thread_start(MQTTAsync_receiveThread, handle);
  615. }
  616. if (locked)
  617. MQTTAsync_unlock_mutex(mqttasync_mutex);
  618. m->c->keepAliveInterval = m->c->savedKeepAliveInterval = options->keepAliveInterval;
  619. setRetryLoopInterval(options->keepAliveInterval);
  620. m->c->cleansession = options->cleansession;
  621. m->c->maxInflightMessages = options->maxInflight;
  622. if (options->struct_version >= 3)
  623. m->c->MQTTVersion = options->MQTTVersion;
  624. else
  625. m->c->MQTTVersion = MQTTVERSION_DEFAULT;
  626. if (options->struct_version >= 4)
  627. {
  628. m->automaticReconnect = options->automaticReconnect;
  629. m->minRetryInterval = options->minRetryInterval;
  630. m->maxRetryInterval = options->maxRetryInterval;
  631. }
  632. if (options->struct_version >= 7)
  633. {
  634. m->c->net.httpHeaders = (const MQTTClient_nameValue *) options->httpHeaders;
  635. }
  636. if (options->struct_version >= 8)
  637. {
  638. if (options->httpProxy)
  639. m->c->httpProxy = MQTTStrdup(options->httpProxy);
  640. if (options->httpsProxy)
  641. m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
  642. }
  643. if (m->c->will)
  644. {
  645. free(m->c->will->payload);
  646. free(m->c->will->topic);
  647. free(m->c->will);
  648. m->c->will = NULL;
  649. }
  650. if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
  651. {
  652. const void* source = NULL;
  653. if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
  654. {
  655. rc = PAHO_MEMORY_ERROR;
  656. goto exit;
  657. }
  658. if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
  659. {
  660. if (options->will->struct_version == 1 && options->will->payload.data)
  661. {
  662. m->c->will->payloadlen = options->will->payload.len;
  663. source = options->will->payload.data;
  664. }
  665. else
  666. {
  667. m->c->will->payloadlen = (int)strlen(options->will->message);
  668. source = (void*)options->will->message;
  669. }
  670. if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
  671. {
  672. rc = PAHO_MEMORY_ERROR;
  673. goto exit;
  674. }
  675. memcpy(m->c->will->payload, source, m->c->will->payloadlen);
  676. }
  677. else
  678. {
  679. m->c->will->payload = NULL;
  680. m->c->will->payloadlen = 0;
  681. }
  682. m->c->will->qos = options->will->qos;
  683. m->c->will->retained = options->will->retained;
  684. m->c->will->topic = MQTTStrdup(options->will->topicName);
  685. }
  686. #if defined(OPENSSL)
  687. if (m->c->sslopts)
  688. {
  689. if (m->c->sslopts->trustStore)
  690. free((void*)m->c->sslopts->trustStore);
  691. if (m->c->sslopts->keyStore)
  692. free((void*)m->c->sslopts->keyStore);
  693. if (m->c->sslopts->privateKey)
  694. free((void*)m->c->sslopts->privateKey);
  695. if (m->c->sslopts->privateKeyPassword)
  696. free((void*)m->c->sslopts->privateKeyPassword);
  697. if (m->c->sslopts->enabledCipherSuites)
  698. free((void*)m->c->sslopts->enabledCipherSuites);
  699. if (m->c->sslopts->struct_version >= 2)
  700. {
  701. if (m->c->sslopts->CApath)
  702. free((void*)m->c->sslopts->CApath);
  703. }
  704. free((void*)m->c->sslopts);
  705. m->c->sslopts = NULL;
  706. }
  707. if (options->struct_version != 0 && options->ssl)
  708. {
  709. if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
  710. {
  711. rc = PAHO_MEMORY_ERROR;
  712. goto exit;
  713. }
  714. memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
  715. m->c->sslopts->struct_version = options->ssl->struct_version;
  716. if (options->ssl->trustStore)
  717. m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
  718. if (options->ssl->keyStore)
  719. m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
  720. if (options->ssl->privateKey)
  721. m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
  722. if (options->ssl->privateKeyPassword)
  723. m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
  724. if (options->ssl->enabledCipherSuites)
  725. m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
  726. m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
  727. if (m->c->sslopts->struct_version >= 1)
  728. m->c->sslopts->sslVersion = options->ssl->sslVersion;
  729. if (m->c->sslopts->struct_version >= 2)
  730. {
  731. m->c->sslopts->verify = options->ssl->verify;
  732. if (options->ssl->CApath)
  733. m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
  734. }
  735. if (m->c->sslopts->struct_version >= 3)
  736. {
  737. m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
  738. m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
  739. }
  740. if (m->c->sslopts->struct_version >= 4)
  741. {
  742. m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
  743. m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
  744. m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
  745. }
  746. if (m->c->sslopts->struct_version >= 5)
  747. {
  748. if (options->ssl->protos)
  749. m->c->sslopts->protos = (const unsigned char*)MQTTStrdup((const char*)options->ssl->protos);
  750. m->c->sslopts->protos_len = options->ssl->protos_len;
  751. }
  752. }
  753. #else
  754. if (options->struct_version != 0 && options->ssl)
  755. {
  756. rc = MQTTASYNC_SSL_NOT_SUPPORTED;
  757. goto exit;
  758. }
  759. #endif
  760. if (m->c->username)
  761. {
  762. free((void*)m->c->username);
  763. m->c->username = NULL;
  764. }
  765. if (options->username)
  766. m->c->username = MQTTStrdup(options->username);
  767. if (m->c->password)
  768. {
  769. free((void*)m->c->password);
  770. m->c->password = NULL;
  771. }
  772. if (options->password)
  773. {
  774. m->c->password = MQTTStrdup(options->password);
  775. m->c->passwordlen = (int)strlen(options->password);
  776. }
  777. else if (options->struct_version >= 5 && options->binarypwd.data)
  778. {
  779. m->c->passwordlen = options->binarypwd.len;
  780. if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
  781. {
  782. rc = PAHO_MEMORY_ERROR;
  783. goto exit;
  784. }
  785. memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
  786. }
  787. m->c->retryInterval = options->retryInterval;
  788. m->shouldBeConnected = 1;
  789. m->connectTimeout = options->connectTimeout;
  790. MQTTAsync_freeServerURIs(m);
  791. if (options->struct_version >= 2 && options->serverURIcount > 0)
  792. {
  793. int i;
  794. m->serverURIcount = options->serverURIcount;
  795. if ((m->serverURIs = malloc(options->serverURIcount * sizeof(char*))) == NULL)
  796. {
  797. rc = PAHO_MEMORY_ERROR;
  798. goto exit;
  799. }
  800. for (i = 0; i < options->serverURIcount; ++i)
  801. m->serverURIs[i] = MQTTStrdup(options->serverURIs[i]);
  802. }
  803. if (m->connectProps)
  804. {
  805. MQTTProperties_free(m->connectProps);
  806. free(m->connectProps);
  807. m->connectProps = NULL;
  808. }
  809. if (m->willProps)
  810. {
  811. MQTTProperties_free(m->willProps);
  812. free(m->willProps);
  813. m->willProps = NULL;
  814. }
  815. if (options->struct_version >=6)
  816. {
  817. if (options->connectProperties)
  818. {
  819. MQTTProperties initialized = MQTTProperties_initializer;
  820. if ((m->connectProps = malloc(sizeof(MQTTProperties))) == NULL)
  821. {
  822. rc = PAHO_MEMORY_ERROR;
  823. goto exit;
  824. }
  825. *m->connectProps = initialized;
  826. *m->connectProps = MQTTProperties_copy(options->connectProperties);
  827. if (MQTTProperties_hasProperty(options->connectProperties, MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL))
  828. m->c->sessionExpiry = (int)MQTTProperties_getNumericValue(options->connectProperties,
  829. MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL);
  830. }
  831. if (options->willProperties)
  832. {
  833. MQTTProperties initialized = MQTTProperties_initializer;
  834. if ((m->willProps = malloc(sizeof(MQTTProperties))) == NULL)
  835. {
  836. rc = PAHO_MEMORY_ERROR;
  837. goto exit;
  838. }
  839. *m->willProps = initialized;
  840. *m->willProps = MQTTProperties_copy(options->willProperties);
  841. }
  842. m->c->cleanstart = options->cleanstart;
  843. }
  844. /* Add connect request to operation queue */
  845. if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  846. {
  847. rc = PAHO_MEMORY_ERROR;
  848. goto exit;
  849. }
  850. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  851. conn->client = m;
  852. if (options)
  853. {
  854. conn->command.onSuccess = options->onSuccess;
  855. conn->command.onFailure = options->onFailure;
  856. conn->command.onSuccess5 = options->onSuccess5;
  857. conn->command.onFailure5 = options->onFailure5;
  858. conn->command.context = options->context;
  859. }
  860. conn->command.type = CONNECT;
  861. conn->command.details.conn.currentURI = 0;
  862. rc = MQTTAsync_addCommand(conn, sizeof(conn));
  863. exit:
  864. FUNC_EXIT_RC(rc);
  865. return rc;
  866. }
  867. int MQTTAsync_reconnect(MQTTAsync handle)
  868. {
  869. int rc = MQTTASYNC_FAILURE;
  870. MQTTAsyncs* m = handle;
  871. FUNC_ENTRY;
  872. MQTTAsync_lock_mutex(mqttasync_mutex);
  873. if (m->automaticReconnect)
  874. {
  875. if (m->shouldBeConnected)
  876. {
  877. m->reconnectNow = 1;
  878. m->currentIntervalBase = m->minRetryInterval;
  879. m->currentInterval = m->minRetryInterval;
  880. m->retrying = 1;
  881. rc = MQTTASYNC_SUCCESS;
  882. }
  883. }
  884. else
  885. {
  886. /* to reconnect, put the connect command to the head of the command queue */
  887. MQTTAsync_queuedCommand* conn = malloc(sizeof(MQTTAsync_queuedCommand));
  888. if (!conn)
  889. {
  890. rc = PAHO_MEMORY_ERROR;
  891. goto exit;
  892. }
  893. memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
  894. conn->client = m;
  895. conn->command = m->connect;
  896. /* make sure that the version attempts are restarted */
  897. if (m->c->MQTTVersion == MQTTVERSION_DEFAULT)
  898. conn->command.details.conn.MQTTVersion = 0;
  899. rc = MQTTAsync_addCommand(conn, sizeof(m->connect));
  900. }
  901. exit:
  902. MQTTAsync_unlock_mutex(mqttasync_mutex);
  903. FUNC_EXIT_RC(rc);
  904. return rc;
  905. }
  906. int MQTTAsync_inCallback()
  907. {
  908. thread_id_type thread_id = Paho_thread_getid();
  909. return thread_id == sendThread_id || thread_id == receiveThread_id;
  910. }
  911. int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, const int* qos, MQTTAsync_responseOptions* response)
  912. {
  913. MQTTAsyncs* m = handle;
  914. int i = 0;
  915. int rc = MQTTASYNC_SUCCESS;
  916. MQTTAsync_queuedCommand* sub;
  917. int msgid = 0;
  918. FUNC_ENTRY;
  919. if (!MQTTAsync_inCallback())
  920. MQTTAsync_lock_mutex(mqttasync_mutex);
  921. if (m == NULL || m->c == NULL)
  922. rc = MQTTASYNC_FAILURE;
  923. else if (m->c->connected == 0)
  924. rc = MQTTASYNC_DISCONNECTED;
  925. else for (i = 0; i < count; i++)
  926. {
  927. if (!UTF8_validateString(topic[i]))
  928. {
  929. rc = MQTTASYNC_BAD_UTF8_STRING;
  930. break;
  931. }
  932. if (qos[i] < 0 || qos[i] > 2)
  933. {
  934. rc = MQTTASYNC_BAD_QOS;
  935. break;
  936. }
  937. }
  938. if (rc != MQTTASYNC_SUCCESS)
  939. ; /* don't overwrite a previous error code */
  940. else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
  941. rc = MQTTASYNC_NO_MORE_MSGIDS;
  942. else if (m->c->MQTTVersion >= MQTTVERSION_5 && count > 1 && (count != response->subscribeOptionsCount
  943. && response->subscribeOptionsCount != 0))
  944. rc = MQTTASYNC_BAD_MQTT_OPTION;
  945. else if (response)
  946. {
  947. if (m->c->MQTTVersion >= MQTTVERSION_5)
  948. {
  949. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  950. rc = MQTTASYNC_BAD_MQTT_OPTION;
  951. }
  952. else if (m->c->MQTTVersion < MQTTVERSION_5)
  953. {
  954. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  955. rc = MQTTASYNC_BAD_MQTT_OPTION;
  956. }
  957. }
  958. if (rc != MQTTASYNC_SUCCESS)
  959. goto exit;
  960. /* Add subscribe request to operation queue */
  961. if ((sub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  962. {
  963. rc = PAHO_MEMORY_ERROR;
  964. goto exit;
  965. }
  966. memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
  967. sub->client = m;
  968. sub->command.token = msgid;
  969. if (response)
  970. {
  971. sub->command.onSuccess = response->onSuccess;
  972. sub->command.onFailure = response->onFailure;
  973. sub->command.onSuccess5 = response->onSuccess5;
  974. sub->command.onFailure5 = response->onFailure5;
  975. sub->command.context = response->context;
  976. response->token = sub->command.token;
  977. if (m->c->MQTTVersion >= MQTTVERSION_5)
  978. {
  979. sub->command.properties = MQTTProperties_copy(&response->properties);
  980. sub->command.details.sub.opts = response->subscribeOptions;
  981. if (count > 1)
  982. {
  983. if ((sub->command.details.sub.optlist = malloc(sizeof(MQTTSubscribe_options) * count)) == NULL)
  984. {
  985. rc = PAHO_MEMORY_ERROR;
  986. goto exit;
  987. }
  988. if (response->subscribeOptionsCount == 0)
  989. {
  990. MQTTSubscribe_options initialized = MQTTSubscribe_options_initializer;
  991. for (i = 0; i < count; ++i)
  992. sub->command.details.sub.optlist[i] = initialized;
  993. }
  994. else
  995. {
  996. for (i = 0; i < count; ++i)
  997. sub->command.details.sub.optlist[i] = response->subscribeOptionsList[i];
  998. }
  999. }
  1000. }
  1001. }
  1002. sub->command.type = SUBSCRIBE;
  1003. sub->command.details.sub.count = count;
  1004. sub->command.details.sub.topics = malloc(sizeof(char*) * count);
  1005. sub->command.details.sub.qoss = malloc(sizeof(int) * count);
  1006. if (sub->command.details.sub.topics && sub->command.details.sub.qoss)
  1007. {
  1008. for (i = 0; i < count; ++i)
  1009. {
  1010. if ((sub->command.details.sub.topics[i] = MQTTStrdup(topic[i])) == NULL)
  1011. {
  1012. rc = PAHO_MEMORY_ERROR;
  1013. goto exit;
  1014. }
  1015. sub->command.details.sub.qoss[i] = qos[i];
  1016. }
  1017. rc = MQTTAsync_addCommand(sub, sizeof(sub));
  1018. }
  1019. else
  1020. rc = PAHO_MEMORY_ERROR;
  1021. exit:
  1022. if (!MQTTAsync_inCallback())
  1023. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1024. FUNC_EXIT_RC(rc);
  1025. return rc;
  1026. }
  1027. int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response)
  1028. {
  1029. int rc = 0;
  1030. FUNC_ENTRY;
  1031. rc = MQTTAsync_subscribeMany(handle, 1, (char * const *)(&topic), &qos, response);
  1032. FUNC_EXIT_RC(rc);
  1033. return rc;
  1034. }
  1035. int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, MQTTAsync_responseOptions* response)
  1036. {
  1037. MQTTAsyncs* m = handle;
  1038. int i = 0;
  1039. int rc = MQTTASYNC_SUCCESS;
  1040. MQTTAsync_queuedCommand* unsub;
  1041. int msgid = 0;
  1042. FUNC_ENTRY;
  1043. if (!MQTTAsync_inCallback())
  1044. MQTTAsync_lock_mutex(mqttasync_mutex);
  1045. if (m == NULL || m->c == NULL)
  1046. rc = MQTTASYNC_FAILURE;
  1047. else if (m->c->connected == 0)
  1048. rc = MQTTASYNC_DISCONNECTED;
  1049. else for (i = 0; i < count; i++)
  1050. {
  1051. if (!UTF8_validateString(topic[i]))
  1052. {
  1053. rc = MQTTASYNC_BAD_UTF8_STRING;
  1054. break;
  1055. }
  1056. }
  1057. if (rc != MQTTASYNC_SUCCESS)
  1058. ; /* don't overwrite a previous error code */
  1059. else if ((msgid = MQTTAsync_assignMsgId(m)) == 0)
  1060. rc = MQTTASYNC_NO_MORE_MSGIDS;
  1061. else if (response)
  1062. {
  1063. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1064. {
  1065. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  1066. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1067. }
  1068. else if (m->c->MQTTVersion < MQTTVERSION_5)
  1069. {
  1070. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  1071. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1072. }
  1073. }
  1074. if (rc != MQTTASYNC_SUCCESS)
  1075. goto exit;
  1076. /* Add unsubscribe request to operation queue */
  1077. if ((unsub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  1078. {
  1079. rc = PAHO_MEMORY_ERROR;
  1080. goto exit;
  1081. }
  1082. memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
  1083. unsub->client = m;
  1084. unsub->command.type = UNSUBSCRIBE;
  1085. unsub->command.token = msgid;
  1086. if (response)
  1087. {
  1088. unsub->command.onSuccess = response->onSuccess;
  1089. unsub->command.onFailure = response->onFailure;
  1090. unsub->command.onSuccess5 = response->onSuccess5;
  1091. unsub->command.onFailure5 = response->onFailure5;
  1092. unsub->command.context = response->context;
  1093. response->token = unsub->command.token;
  1094. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1095. unsub->command.properties = MQTTProperties_copy(&response->properties);
  1096. }
  1097. unsub->command.details.unsub.count = count;
  1098. if ((unsub->command.details.unsub.topics = malloc(sizeof(char*) * count)) == NULL)
  1099. {
  1100. rc = PAHO_MEMORY_ERROR;
  1101. goto exit;
  1102. }
  1103. for (i = 0; i < count; ++i)
  1104. unsub->command.details.unsub.topics[i] = MQTTStrdup(topic[i]);
  1105. rc = MQTTAsync_addCommand(unsub, sizeof(unsub));
  1106. exit:
  1107. if (!MQTTAsync_inCallback())
  1108. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1109. FUNC_EXIT_RC(rc);
  1110. return rc;
  1111. }
  1112. int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response)
  1113. {
  1114. int rc = 0;
  1115. FUNC_ENTRY;
  1116. rc = MQTTAsync_unsubscribeMany(handle, 1, (char * const *)(&topic), response);
  1117. FUNC_EXIT_RC(rc);
  1118. return rc;
  1119. }
  1120. int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen, const void* payload,
  1121. int qos, int retained, MQTTAsync_responseOptions* response)
  1122. {
  1123. int rc = MQTTASYNC_SUCCESS;
  1124. MQTTAsyncs* m = handle;
  1125. MQTTAsync_queuedCommand* pub;
  1126. int msgid = 0;
  1127. FUNC_ENTRY;
  1128. if (!MQTTAsync_inCallback())
  1129. MQTTAsync_lock_mutex(mqttasync_mutex);
  1130. if (m == NULL || m->c == NULL)
  1131. rc = MQTTASYNC_FAILURE;
  1132. else if (m->c->connected == 0)
  1133. {
  1134. if (m->createOptions == NULL)
  1135. rc = MQTTASYNC_DISCONNECTED;
  1136. else if (m->createOptions->sendWhileDisconnected == 0)
  1137. rc = MQTTASYNC_DISCONNECTED;
  1138. else if (m->shouldBeConnected == 0 && (m->createOptions->struct_version < 2 || m->createOptions->allowDisconnectedSendAtAnyTime == 0))
  1139. rc = MQTTASYNC_DISCONNECTED;
  1140. }
  1141. if (rc != MQTTASYNC_SUCCESS)
  1142. goto exit;
  1143. if (!UTF8_validateString(destinationName))
  1144. rc = MQTTASYNC_BAD_UTF8_STRING;
  1145. else if (qos < 0 || qos > 2)
  1146. rc = MQTTASYNC_BAD_QOS;
  1147. else if (qos > 0 && (msgid = MQTTAsync_assignMsgId(m)) == 0)
  1148. rc = MQTTASYNC_NO_MORE_MSGIDS;
  1149. else if (m->createOptions &&
  1150. (m->createOptions->struct_version < 2 || m->createOptions->deleteOldestMessages == 0) &&
  1151. (MQTTAsync_getNoBufferedMessages(m) >= m->createOptions->maxBufferedMessages))
  1152. rc = MQTTASYNC_MAX_BUFFERED_MESSAGES;
  1153. else if (response)
  1154. {
  1155. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1156. {
  1157. if (response->struct_version == 0 || response->onFailure || response->onSuccess)
  1158. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1159. }
  1160. else if (m->c->MQTTVersion < MQTTVERSION_5)
  1161. {
  1162. if (response->struct_version >= 1 && (response->onFailure5 || response->onSuccess5))
  1163. rc = MQTTASYNC_BAD_MQTT_OPTION;
  1164. }
  1165. }
  1166. if (rc != MQTTASYNC_SUCCESS)
  1167. goto exit;
  1168. /* Add publish request to operation queue */
  1169. if ((pub = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
  1170. {
  1171. rc = PAHO_MEMORY_ERROR;
  1172. goto exit;
  1173. }
  1174. memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
  1175. pub->client = m;
  1176. pub->command.type = PUBLISH;
  1177. pub->command.token = msgid;
  1178. if (response)
  1179. {
  1180. pub->command.onSuccess = response->onSuccess;
  1181. pub->command.onFailure = response->onFailure;
  1182. pub->command.onSuccess5 = response->onSuccess5;
  1183. pub->command.onFailure5 = response->onFailure5;
  1184. pub->command.context = response->context;
  1185. response->token = pub->command.token;
  1186. if (m->c->MQTTVersion >= MQTTVERSION_5)
  1187. pub->command.properties = MQTTProperties_copy(&response->properties);
  1188. }
  1189. if ((pub->command.details.pub.destinationName = MQTTStrdup(destinationName)) == NULL)
  1190. {
  1191. free(pub);
  1192. rc = PAHO_MEMORY_ERROR;
  1193. goto exit;
  1194. }
  1195. pub->command.details.pub.payloadlen = payloadlen;
  1196. if ((pub->command.details.pub.payload = malloc(payloadlen)) == NULL)
  1197. {
  1198. free(pub->command.details.pub.destinationName);
  1199. free(pub);
  1200. rc = PAHO_MEMORY_ERROR;
  1201. goto exit;
  1202. }
  1203. memcpy(pub->command.details.pub.payload, payload, payloadlen);
  1204. pub->command.details.pub.qos = qos;
  1205. pub->command.details.pub.retained = retained;
  1206. rc = MQTTAsync_addCommand(pub, sizeof(pub));
  1207. exit:
  1208. if (!MQTTAsync_inCallback())
  1209. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1210. FUNC_EXIT_RC(rc);
  1211. return rc;
  1212. }
  1213. int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* message,
  1214. MQTTAsync_responseOptions* response)
  1215. {
  1216. int rc = MQTTASYNC_SUCCESS;
  1217. MQTTAsyncs* m = handle;
  1218. FUNC_ENTRY;
  1219. if (message == NULL)
  1220. {
  1221. rc = MQTTASYNC_NULL_PARAMETER;
  1222. goto exit;
  1223. }
  1224. if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
  1225. (message->struct_version != 0 && message->struct_version != 1))
  1226. {
  1227. rc = MQTTASYNC_BAD_STRUCTURE;
  1228. goto exit;
  1229. }
  1230. if (m->c->MQTTVersion >= MQTTVERSION_5 && response)
  1231. response->properties = message->properties;
  1232. rc = MQTTAsync_send(handle, destinationName, message->payloadlen, message->payload,
  1233. message->qos, message->retained, response);
  1234. exit:
  1235. FUNC_EXIT_RC(rc);
  1236. return rc;
  1237. }
  1238. int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options)
  1239. {
  1240. int rc = 0;
  1241. FUNC_ENTRY;
  1242. if (!MQTTAsync_inCallback())
  1243. MQTTAsync_lock_mutex(mqttasync_mutex);
  1244. if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
  1245. rc = MQTTASYNC_BAD_STRUCTURE;
  1246. else
  1247. rc = MQTTAsync_disconnect1(handle, options, 0);
  1248. if (!MQTTAsync_inCallback())
  1249. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1250. FUNC_EXIT_RC(rc);
  1251. return rc;
  1252. }
  1253. int MQTTAsync_isConnected(MQTTAsync handle)
  1254. {
  1255. MQTTAsyncs* m = handle;
  1256. int rc = 0;
  1257. FUNC_ENTRY;
  1258. MQTTAsync_lock_mutex(mqttasync_mutex);
  1259. if (m && m->c)
  1260. rc = m->c->connected;
  1261. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1262. FUNC_EXIT_RC(rc);
  1263. return rc;
  1264. }
  1265. int MQTTAsync_isComplete(MQTTAsync handle, MQTTAsync_token dt)
  1266. {
  1267. int rc = MQTTASYNC_SUCCESS;
  1268. MQTTAsyncs* m = handle;
  1269. ListElement* current = NULL;
  1270. FUNC_ENTRY;
  1271. MQTTAsync_lock_mutex(mqttasync_mutex);
  1272. if (m == NULL)
  1273. {
  1274. rc = MQTTASYNC_FAILURE;
  1275. goto exit;
  1276. }
  1277. /* First check unprocessed commands */
  1278. current = NULL;
  1279. while (ListNextElement(MQTTAsync_commands, &current))
  1280. {
  1281. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  1282. if (cmd->client == m && cmd->command.token == dt)
  1283. goto exit;
  1284. }
  1285. /* Now check the inflight messages */
  1286. if (m->c && m->c->outboundMsgs->count > 0)
  1287. {
  1288. current = NULL;
  1289. while (ListNextElement(m->c->outboundMsgs, &current))
  1290. {
  1291. Messages* m2 = (Messages*)(current->content);
  1292. if (m2->msgid == dt)
  1293. goto exit;
  1294. }
  1295. }
  1296. rc = MQTTASYNC_TRUE; /* Can't find it, so it must be complete */
  1297. exit:
  1298. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1299. FUNC_EXIT_RC(rc);
  1300. return rc;
  1301. }
  1302. int MQTTAsync_waitForCompletion(MQTTAsync handle, MQTTAsync_token dt, unsigned long timeout)
  1303. {
  1304. int rc = MQTTASYNC_FAILURE;
  1305. START_TIME_TYPE start = MQTTTime_start_clock();
  1306. ELAPSED_TIME_TYPE elapsed = 0L;
  1307. MQTTAsyncs* m = handle;
  1308. FUNC_ENTRY;
  1309. MQTTAsync_lock_mutex(mqttasync_mutex);
  1310. if (m == NULL || m->c == NULL)
  1311. {
  1312. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1313. rc = MQTTASYNC_FAILURE;
  1314. goto exit;
  1315. }
  1316. if (m->c->connected == 0)
  1317. {
  1318. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1319. rc = MQTTASYNC_DISCONNECTED;
  1320. goto exit;
  1321. }
  1322. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1323. if (MQTTAsync_isComplete(handle, dt) == 1)
  1324. {
  1325. rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
  1326. goto exit;
  1327. }
  1328. elapsed = MQTTTime_elapsed(start);
  1329. while (elapsed < timeout && rc == MQTTASYNC_FAILURE)
  1330. {
  1331. MQTTTime_sleep(100);
  1332. if (MQTTAsync_isComplete(handle, dt) == 1)
  1333. rc = MQTTASYNC_SUCCESS; /* well we couldn't find it */
  1334. MQTTAsync_lock_mutex(mqttasync_mutex);
  1335. if (m->c->connected == 0)
  1336. rc = MQTTASYNC_DISCONNECTED;
  1337. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1338. elapsed = MQTTTime_elapsed(start);
  1339. }
  1340. exit:
  1341. FUNC_EXIT_RC(rc);
  1342. return rc;
  1343. }
  1344. int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
  1345. {
  1346. int rc = MQTTASYNC_SUCCESS;
  1347. MQTTAsyncs* m = handle;
  1348. ListElement* current = NULL;
  1349. int count = 0;
  1350. FUNC_ENTRY;
  1351. MQTTAsync_lock_mutex(mqttasync_mutex);
  1352. MQTTAsync_lock_mutex(mqttcommand_mutex);
  1353. *tokens = NULL;
  1354. if (m == NULL)
  1355. {
  1356. rc = MQTTASYNC_FAILURE;
  1357. goto exit;
  1358. }
  1359. /* calculate the number of pending tokens - commands plus inflight */
  1360. while (ListNextElement(MQTTAsync_commands, &current))
  1361. {
  1362. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  1363. if (cmd->client == m && cmd->command.type == PUBLISH)
  1364. count++;
  1365. }
  1366. if (m->c)
  1367. count += m->c->outboundMsgs->count;
  1368. if (count == 0)
  1369. goto exit; /* no tokens to return */
  1370. *tokens = malloc(sizeof(MQTTAsync_token) * (count + 1)); /* add space for sentinel at end of list */
  1371. if (!*tokens)
  1372. {
  1373. rc = PAHO_MEMORY_ERROR;
  1374. goto exit;
  1375. }
  1376. /* First add the unprocessed commands to the pending tokens */
  1377. current = NULL;
  1378. count = 0;
  1379. while (ListNextElement(MQTTAsync_commands, &current))
  1380. {
  1381. MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
  1382. if (cmd->client == m && cmd->command.type == PUBLISH)
  1383. (*tokens)[count++] = cmd->command.token;
  1384. }
  1385. /* Now add the inflight messages */
  1386. if (m->c && m->c->outboundMsgs->count > 0)
  1387. {
  1388. current = NULL;
  1389. while (ListNextElement(m->c->outboundMsgs, &current))
  1390. {
  1391. Messages* m2 = (Messages*)(current->content);
  1392. (*tokens)[count++] = m2->msgid;
  1393. }
  1394. }
  1395. (*tokens)[count] = -1; /* indicate end of list */
  1396. exit:
  1397. MQTTAsync_unlock_mutex(mqttcommand_mutex);
  1398. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1399. FUNC_EXIT_RC(rc);
  1400. return rc;
  1401. }
  1402. int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
  1403. MQTTAsync_connectionLost* cl,
  1404. MQTTAsync_messageArrived* ma,
  1405. MQTTAsync_deliveryComplete* dc)
  1406. {
  1407. int rc = MQTTASYNC_SUCCESS;
  1408. MQTTAsyncs* m = handle;
  1409. FUNC_ENTRY;
  1410. MQTTAsync_lock_mutex(mqttasync_mutex);
  1411. if (m == NULL || ma == NULL || m->c == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  1412. rc = MQTTASYNC_FAILURE;
  1413. else
  1414. {
  1415. m->clContext = m->maContext = m->dcContext = context;
  1416. m->cl = cl;
  1417. m->ma = ma;
  1418. m->dc = dc;
  1419. }
  1420. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1421. FUNC_EXIT_RC(rc);
  1422. return rc;
  1423. }
  1424. int MQTTAsync_setConnectionLostCallback(MQTTAsync handle, void* context,
  1425. MQTTAsync_connectionLost* cl)
  1426. {
  1427. int rc = MQTTASYNC_SUCCESS;
  1428. MQTTAsyncs* m = handle;
  1429. FUNC_ENTRY;
  1430. MQTTAsync_lock_mutex(mqttasync_mutex);
  1431. if (m == NULL || m->c->connect_state != 0)
  1432. rc = MQTTASYNC_FAILURE;
  1433. else
  1434. {
  1435. m->clContext = context;
  1436. m->cl = cl;
  1437. }
  1438. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1439. FUNC_EXIT_RC(rc);
  1440. return rc;
  1441. }
  1442. int MQTTAsync_setMessageArrivedCallback(MQTTAsync handle, void* context,
  1443. MQTTAsync_messageArrived* ma)
  1444. {
  1445. int rc = MQTTASYNC_SUCCESS;
  1446. MQTTAsyncs* m = handle;
  1447. FUNC_ENTRY;
  1448. MQTTAsync_lock_mutex(mqttasync_mutex);
  1449. if (m == NULL || ma == NULL || m->c->connect_state != 0)
  1450. rc = MQTTASYNC_FAILURE;
  1451. else
  1452. {
  1453. m->maContext = context;
  1454. m->ma = ma;
  1455. }
  1456. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1457. FUNC_EXIT_RC(rc);
  1458. return rc;
  1459. }
  1460. int MQTTAsync_setDeliveryCompleteCallback(MQTTAsync handle, void* context,
  1461. MQTTAsync_deliveryComplete* dc)
  1462. {
  1463. int rc = MQTTASYNC_SUCCESS;
  1464. MQTTAsyncs* m = handle;
  1465. FUNC_ENTRY;
  1466. MQTTAsync_lock_mutex(mqttasync_mutex);
  1467. if (m == NULL || m->c->connect_state != 0)
  1468. rc = MQTTASYNC_FAILURE;
  1469. else
  1470. {
  1471. m->dcContext = context;
  1472. m->dc = dc;
  1473. }
  1474. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1475. FUNC_EXIT_RC(rc);
  1476. return rc;
  1477. }
  1478. int MQTTAsync_setDisconnected(MQTTAsync handle, void* context, MQTTAsync_disconnected* disconnected)
  1479. {
  1480. int rc = MQTTASYNC_SUCCESS;
  1481. MQTTAsyncs* m = handle;
  1482. FUNC_ENTRY;
  1483. MQTTAsync_lock_mutex(mqttasync_mutex);
  1484. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  1485. rc = MQTTASYNC_FAILURE;
  1486. else
  1487. {
  1488. m->disconnected_context = context;
  1489. m->disconnected = disconnected;
  1490. }
  1491. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1492. FUNC_EXIT_RC(rc);
  1493. return rc;
  1494. }
  1495. int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* connected)
  1496. {
  1497. int rc = MQTTASYNC_SUCCESS;
  1498. MQTTAsyncs* m = handle;
  1499. FUNC_ENTRY;
  1500. MQTTAsync_lock_mutex(mqttasync_mutex);
  1501. if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
  1502. rc = MQTTASYNC_FAILURE;
  1503. else
  1504. {
  1505. m->connected_context = context;
  1506. m->connected = connected;
  1507. }
  1508. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1509. FUNC_EXIT_RC(rc);
  1510. return rc;
  1511. }
  1512. int MQTTAsync_setUpdateConnectOptions(MQTTAsync handle, void* context, MQTTAsync_updateConnectOptions* updateOptions)
  1513. {
  1514. int rc = MQTTASYNC_SUCCESS;
  1515. MQTTAsyncs* m = handle;
  1516. FUNC_ENTRY;
  1517. MQTTAsync_lock_mutex(mqttasync_mutex);
  1518. if (m == NULL)
  1519. rc = MQTTASYNC_FAILURE;
  1520. else
  1521. {
  1522. m->updateConnectOptions_context = context;
  1523. m->updateConnectOptions = updateOptions;
  1524. }
  1525. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1526. FUNC_EXIT_RC(rc);
  1527. return rc;
  1528. }
  1529. #if !defined(NO_PERSISTENCE)
  1530. int MQTTAsync_setBeforePersistenceWrite(MQTTAsync handle, void* context, MQTTPersistence_beforeWrite* co)
  1531. {
  1532. int rc = MQTTASYNC_SUCCESS;
  1533. MQTTAsyncs* m = handle;
  1534. FUNC_ENTRY;
  1535. MQTTAsync_lock_mutex(mqttasync_mutex);
  1536. if (m == NULL)
  1537. rc = MQTTASYNC_FAILURE;
  1538. else
  1539. {
  1540. m->c->beforeWrite = co;
  1541. m->c->beforeWrite_context = context;
  1542. }
  1543. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1544. FUNC_EXIT_RC(rc);
  1545. return rc;
  1546. }
  1547. int MQTTAsync_setAfterPersistenceRead(MQTTAsync handle, void* context, MQTTPersistence_afterRead* co)
  1548. {
  1549. int rc = MQTTASYNC_SUCCESS;
  1550. MQTTAsyncs* m = handle;
  1551. FUNC_ENTRY;
  1552. MQTTAsync_lock_mutex(mqttasync_mutex);
  1553. if (m == NULL)
  1554. rc = MQTTASYNC_FAILURE;
  1555. else
  1556. {
  1557. m->c->afterRead = co;
  1558. m->c->afterRead_context = context;
  1559. }
  1560. MQTTAsync_unlock_mutex(mqttasync_mutex);
  1561. FUNC_EXIT_RC(rc);
  1562. return rc;
  1563. }
  1564. #endif
  1565. void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
  1566. {
  1567. Log_setTraceLevel((enum LOG_LEVELS)level);
  1568. }
  1569. void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback* callback)
  1570. {
  1571. Log_setTraceCallback((Log_traceCallback*)callback);
  1572. }
  1573. MQTTAsync_nameValue* MQTTAsync_getVersionInfo(void)
  1574. {
  1575. #define MAX_INFO_STRINGS 8
  1576. static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
  1577. int i = 0;
  1578. libinfo[i].name = "Product name";
  1579. libinfo[i++].value = "Eclipse Paho Asynchronous MQTT C Client Library";
  1580. libinfo[i].name = "Version";
  1581. libinfo[i++].value = CLIENT_VERSION;
  1582. libinfo[i].name = "Build level";
  1583. libinfo[i++].value = BUILD_TIMESTAMP;
  1584. #if defined(OPENSSL)
  1585. libinfo[i].name = "OpenSSL version";
  1586. libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
  1587. libinfo[i].name = "OpenSSL flags";
  1588. libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
  1589. libinfo[i].name = "OpenSSL build timestamp";
  1590. libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
  1591. libinfo[i].name = "OpenSSL platform";
  1592. libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
  1593. libinfo[i].name = "OpenSSL directory";
  1594. libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
  1595. #endif
  1596. libinfo[i].name = NULL;
  1597. libinfo[i].value = NULL;
  1598. return libinfo;
  1599. }
  1600. const char* MQTTAsync_strerror(int code)
  1601. {
  1602. static char buf[30];
  1603. int chars = 0;
  1604. switch (code) {
  1605. case MQTTASYNC_SUCCESS:
  1606. return "Success";
  1607. case MQTTASYNC_FAILURE:
  1608. return "Failure";
  1609. case MQTTASYNC_PERSISTENCE_ERROR:
  1610. return "Persistence error";
  1611. case MQTTASYNC_DISCONNECTED:
  1612. return "Disconnected";
  1613. case MQTTASYNC_MAX_MESSAGES_INFLIGHT:
  1614. return "Maximum in-flight messages amount reached";
  1615. case MQTTASYNC_BAD_UTF8_STRING:
  1616. return "Invalid UTF8 string";
  1617. case MQTTASYNC_NULL_PARAMETER:
  1618. return "Invalid (NULL) parameter";
  1619. case MQTTASYNC_TOPICNAME_TRUNCATED:
  1620. return "Topic containing NULL characters has been truncated";
  1621. case MQTTASYNC_BAD_STRUCTURE:
  1622. return "Bad structure";
  1623. case MQTTASYNC_BAD_QOS:
  1624. return "Invalid QoS value";
  1625. case MQTTASYNC_NO_MORE_MSGIDS:
  1626. return "Too many pending commands";
  1627. case MQTTASYNC_OPERATION_INCOMPLETE:
  1628. return "Operation discarded before completion";
  1629. case MQTTASYNC_MAX_BUFFERED_MESSAGES:
  1630. return "No more messages can be buffered";
  1631. case MQTTASYNC_SSL_NOT_SUPPORTED:
  1632. return "SSL is not supported";
  1633. case MQTTASYNC_BAD_PROTOCOL:
  1634. return "Invalid protocol scheme";
  1635. case MQTTASYNC_BAD_MQTT_OPTION:
  1636. return "Options for wrong MQTT version";
  1637. case MQTTASYNC_WRONG_MQTT_VERSION:
  1638. return "Client created for another version of MQTT";
  1639. case MQTTASYNC_0_LEN_WILL_TOPIC:
  1640. return "Zero length will topic on connect";
  1641. case MQTTASYNC_COMMAND_IGNORED:
  1642. return "Connect or disconnect command ignored";
  1643. case MQTTASYNC_MAX_BUFFERED:
  1644. return "maxBufferedMessages in the connect options must be >= 0";
  1645. }
  1646. chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
  1647. if (chars >= sizeof(buf))
  1648. {
  1649. buf[sizeof(buf)-1] = '\0';
  1650. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  1651. }
  1652. return buf;
  1653. }
  1654. void MQTTAsync_freeMessage(MQTTAsync_message** message)
  1655. {
  1656. FUNC_ENTRY;
  1657. MQTTProperties_free(&(*message)->properties);
  1658. free((*message)->payload);
  1659. free(*message);
  1660. *message = NULL;
  1661. FUNC_EXIT;
  1662. }
  1663. void MQTTAsync_free(void* memory)
  1664. {
  1665. FUNC_ENTRY;
  1666. free(memory);
  1667. FUNC_EXIT;
  1668. }
  1669. void* MQTTAsync_malloc(size_t size)
  1670. {
  1671. void* val;
  1672. int rc = 0;
  1673. FUNC_ENTRY;
  1674. val = malloc(size);
  1675. rc = (val != NULL);
  1676. FUNC_EXIT_RC(rc);
  1677. return val;
  1678. }
  1679. static void MQTTAsync_freeServerURIs(MQTTAsyncs* m)
  1680. {
  1681. int i;
  1682. for (i = 0; i < m->serverURIcount; ++i)
  1683. free(m->serverURIs[i]);
  1684. m->serverURIcount = 0;
  1685. if (m->serverURIs)
  1686. free(m->serverURIs);
  1687. m->serverURIs = NULL;
  1688. }