MQTTPersistence.c 25 KB


  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2023 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - async client updates
  16. * Ian Craggs - fix for bug 432903 - queue persistence
  17. * Ian Craggs - MQTT V5 updates
  18. *******************************************************************************/
  19. /**
  20. * @file
  21. * \brief Functions that apply to persistence operations.
  22. *
  23. */
  24. #include <stdio.h>
  25. #include <string.h>
  26. #include "MQTTPersistence.h"
  27. #include "MQTTPersistenceDefault.h"
  28. #include "MQTTProtocolClient.h"
  29. #include "Heap.h"
  30. #if defined(_WIN32) || defined(_WIN64)
  31. #define snprintf _snprintf
  32. #endif
  33. static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
  34. static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
  35. /**
  36. * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
  37. * @param persistence the ::MQTTClient_persistence structure.
  38. * @param type the type of the persistence implementation. See ::MQTTClient_create.
  39. * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
  40. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  41. */
  42. #include "StackTrace.h"
  43. int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
  44. {
  45. int rc = 0;
  46. MQTTClient_persistence* per = NULL;
  47. FUNC_ENTRY;
  48. #if !defined(NO_PERSISTENCE)
  49. switch (type)
  50. {
  51. case MQTTCLIENT_PERSISTENCE_NONE :
  52. per = NULL;
  53. break;
  54. case MQTTCLIENT_PERSISTENCE_DEFAULT :
  55. per = malloc(sizeof(MQTTClient_persistence));
  56. if ( per != NULL )
  57. {
  58. if ( pcontext == NULL )
  59. pcontext = "."; /* working directory */
  60. if ((per->context = malloc(strlen(pcontext) + 1)) == NULL)
  61. {
  62. free(per);
  63. rc = PAHO_MEMORY_ERROR;
  64. goto exit;
  65. }
  66. strcpy(per->context, pcontext);
  67. /* file system functions */
  68. per->popen = pstopen;
  69. per->pclose = pstclose;
  70. per->pput = pstput;
  71. per->pget = pstget;
  72. per->premove = pstremove;
  73. per->pkeys = pstkeys;
  74. per->pclear = pstclear;
  75. per->pcontainskey = pstcontainskey;
  76. }
  77. else
  78. rc = PAHO_MEMORY_ERROR;
  79. break;
  80. case MQTTCLIENT_PERSISTENCE_USER :
  81. per = (MQTTClient_persistence *)pcontext;
  82. if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL ||
  83. per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL ||
  84. per->popen == NULL || per->pput == NULL || per->premove == NULL)) )
  85. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  86. break;
  87. default:
  88. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  89. break;
  90. }
  91. #endif
  92. *persistence = per;
  93. exit:
  94. FUNC_EXIT_RC(rc);
  95. return rc;
  96. }
  97. /**
  98. * Open persistent store and restore any persisted messages.
  99. * @param client the client as ::Clients.
  100. * @param serverURI the URI of the remote end.
  101. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  102. */
  103. int MQTTPersistence_initialize(Clients *c, const char *serverURI)
  104. {
  105. int rc = 0;
  106. FUNC_ENTRY;
  107. if ( c->persistence != NULL )
  108. {
  109. rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
  110. if ( rc == 0 )
  111. rc = MQTTPersistence_restorePackets(c);
  112. }
  113. FUNC_EXIT_RC(rc);
  114. return rc;
  115. }
  116. /**
  117. * Close persistent store.
  118. * @param client the client as ::Clients.
  119. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  120. */
  121. int MQTTPersistence_close(Clients *c)
  122. {
  123. int rc = 0;
  124. FUNC_ENTRY;
  125. #if !defined(NO_PERSISTENCE)
  126. if (c->persistence != NULL)
  127. {
  128. rc = c->persistence->pclose(c->phandle);
  129. if (c->persistence->popen == pstopen) {
  130. if (c->persistence->context)
  131. free(c->persistence->context);
  132. free(c->persistence);
  133. }
  134. c->phandle = NULL;
  135. c->persistence = NULL;
  136. }
  137. #endif
  138. FUNC_EXIT_RC(rc);
  139. return rc;
  140. }
  141. /**
  142. * Clears the persistent store.
  143. * @param client the client as ::Clients.
  144. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  145. */
  146. int MQTTPersistence_clear(Clients *c)
  147. {
  148. int rc = 0;
  149. FUNC_ENTRY;
  150. if (c->persistence != NULL)
  151. rc = c->persistence->pclear(c->phandle);
  152. FUNC_EXIT_RC(rc);
  153. return rc;
  154. }
  155. /**
  156. * Restores the persisted records to the outbound and inbound message queues of the
  157. * client.
  158. * @param client the client as ::Clients.
  159. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  160. */
  161. int MQTTPersistence_restorePackets(Clients *c)
  162. {
  163. int rc = 0;
  164. char **msgkeys = NULL,
  165. *buffer = NULL;
  166. int nkeys = 0, buflen;
  167. int i = 0;
  168. int msgs_sent = 0;
  169. int msgs_rcvd = 0;
  170. FUNC_ENTRY;
  171. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  172. {
  173. while (rc == 0 && i < nkeys)
  174. {
  175. if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
  176. strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
  177. {
  178. ;
  179. }
  180. else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
  181. strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
  182. {
  183. ;
  184. }
  185. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
  186. (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
  187. {
  188. int data_MQTTVersion = MQTTVERSION_3_1_1;
  189. char* cur_key = msgkeys[i];
  190. MQTTPacket* pack = NULL;
  191. if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
  192. strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
  193. {
  194. data_MQTTVersion = MQTTVERSION_5;
  195. cur_key = PERSISTENCE_PUBLISH_RECEIVED;
  196. }
  197. else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
  198. strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
  199. {
  200. data_MQTTVersion = MQTTVERSION_5;
  201. cur_key = PERSISTENCE_PUBLISH_SENT;
  202. }
  203. else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
  204. strlen(PERSISTENCE_V5_PUBREL)) == 0)
  205. {
  206. data_MQTTVersion = MQTTVERSION_5;
  207. cur_key = PERSISTENCE_PUBREL;
  208. }
  209. if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
  210. {
  211. rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
  212. goto exit;
  213. }
  214. pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
  215. if ( pack != NULL )
  216. {
  217. if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
  218. strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
  219. {
  220. Publish* publish = (Publish*)pack;
  221. Messages* msg = NULL;
  222. publish->MQTTVersion = c->MQTTVersion;
  223. msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
  224. msg->nextMessageType = PUBREL;
  225. /* order does not matter for persisted received messages */
  226. ListAppend(c->inboundMsgs, msg, msg->len);
  227. if (c->MQTTVersion >= MQTTVERSION_5)
  228. {
  229. free(msg->publish->payload);
  230. free(msg->publish->topic);
  231. msg->publish->payload = msg->publish->topic = NULL;
  232. }
  233. publish->topic = NULL;
  234. MQTTPacket_freePublish(publish);
  235. msgs_rcvd++;
  236. }
  237. else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
  238. strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
  239. {
  240. Publish* publish = (Publish*)pack;
  241. Messages* msg = NULL;
  242. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  243. char *key = malloc(keysize);
  244. int chars = 0;
  245. if (!key)
  246. {
  247. rc = PAHO_MEMORY_ERROR;
  248. goto exit;
  249. }
  250. publish->MQTTVersion = c->MQTTVersion;
  251. if (publish->MQTTVersion >= MQTTVERSION_5)
  252. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
  253. else
  254. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
  255. if (chars >= keysize)
  256. {
  257. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  258. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  259. }
  260. else
  261. {
  262. msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
  263. if (c->persistence->pcontainskey(c->phandle, key) == 0)
  264. /* PUBLISH Qo2 and PUBREL sent */
  265. msg->nextMessageType = PUBCOMP;
  266. /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
  267. /* retry at the first opportunity */
  268. memset(&msg->lastTouch, '\0', sizeof(msg->lastTouch));
  269. MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
  270. publish->topic = NULL;
  271. MQTTPacket_freePublish(publish);
  272. msgs_sent++;
  273. }
  274. free(key);
  275. }
  276. else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
  277. {
  278. /* orphaned PUBRELs ? */
  279. Pubrel* pubrel = (Pubrel*)pack;
  280. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  281. char *key = malloc(keysize);
  282. int chars = 0;
  283. if (!key)
  284. {
  285. rc = PAHO_MEMORY_ERROR;
  286. goto exit;
  287. }
  288. pubrel->MQTTVersion = c->MQTTVersion;
  289. if (pubrel->MQTTVersion >= MQTTVERSION_5)
  290. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
  291. else
  292. chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
  293. if (chars >= keysize)
  294. {
  295. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  296. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  297. }
  298. else if (c->persistence->pcontainskey(c->phandle, key) != 0)
  299. rc = c->persistence->premove(c->phandle, msgkeys[i]);
  300. free(pubrel);
  301. free(key);
  302. }
  303. }
  304. else /* pack == NULL -> bad persisted record */
  305. rc = c->persistence->premove(c->phandle, msgkeys[i]);
  306. }
  307. if (buffer)
  308. {
  309. free(buffer);
  310. buffer = NULL;
  311. }
  312. if (msgkeys[i])
  313. {
  314. free(msgkeys[i]);
  315. msgkeys[i] = NULL;
  316. }
  317. i++;
  318. }
  319. }
  320. Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
  321. msgs_sent, msgs_rcvd, c->clientID);
  322. MQTTPersistence_wrapMsgID(c);
  323. exit:
  324. if (msgkeys)
  325. {
  326. for (i = 0; i < nkeys; ++i)
  327. {
  328. if (msgkeys[i])
  329. free(msgkeys[i]);
  330. }
  331. free(msgkeys);
  332. }
  333. if (buffer)
  334. free(buffer);
  335. FUNC_EXIT_RC(rc);
  336. return rc;
  337. }
  338. /**
  339. * Returns a MQTT packet restored from persisted data.
  340. * @param buffer the persisted data.
  341. * @param buflen the number of bytes of the data buffer.
  342. */
  343. void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
  344. {
  345. void* pack = NULL;
  346. Header header;
  347. int fixed_header_length = 1, ptype, remaining_length = 0;
  348. char c;
  349. int multiplier = 1;
  350. extern pf new_packets[];
  351. FUNC_ENTRY;
  352. header.byte = buffer[0];
  353. /* decode the message length according to the MQTT algorithm */
  354. do
  355. {
  356. c = *(++buffer);
  357. remaining_length += (c & 127) * multiplier;
  358. multiplier *= 128;
  359. fixed_header_length++;
  360. } while ((c & 128) != 0);
  361. if ( (fixed_header_length + remaining_length) == buflen )
  362. {
  363. ptype = header.bits.type;
  364. if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
  365. pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
  366. }
  367. FUNC_EXIT;
  368. return pack;
  369. }
  370. /**
  371. * Inserts the specified message into the list, maintaining message ID order.
  372. * @param list the list to insert the message into.
  373. * @param content the message to add.
  374. * @param size size of the message.
  375. */
  376. void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
  377. {
  378. ListElement* index = NULL;
  379. ListElement* current = NULL;
  380. FUNC_ENTRY;
  381. while(ListNextElement(list, &current) != NULL && index == NULL)
  382. {
  383. if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
  384. index = current;
  385. }
  386. ListInsert(list, content, size, index);
  387. FUNC_EXIT;
  388. }
  389. /**
  390. * Adds a record to the persistent store. This function must not be called for QoS0
  391. * messages.
  392. * @param socket the socket of the client.
  393. * @param buf0 fixed header.
  394. * @param buf0len length of the fixed header.
  395. * @param count number of buffers representing the variable header and/or the payload.
  396. * @param buffers the buffers representing the variable header and/or the payload.
  397. * @param buflens length of the buffers representing the variable header and/or the payload.
  398. * @param htype MQTT packet type - PUBLISH or PUBREL
  399. * @param msgId the message ID.
  400. * @param scr 0 indicates message in the sending direction; 1 indicates message in the
  401. * receiving direction.
  402. * @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
  403. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  404. */
  405. int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
  406. char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
  407. {
  408. int rc = 0;
  409. extern ClientStates* bstate;
  410. int nbufs, i;
  411. int* lens = NULL;
  412. char** bufs = NULL;
  413. char *key;
  414. Clients* client = NULL;
  415. FUNC_ENTRY;
  416. client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
  417. if (client->persistence != NULL)
  418. {
  419. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  420. if ((key = malloc(keysize)) == NULL)
  421. {
  422. rc = PAHO_MEMORY_ERROR;
  423. goto exit;
  424. }
  425. nbufs = 1 + count;
  426. if ((lens = (int *)malloc(nbufs * sizeof(int))) == NULL)
  427. {
  428. free(key);
  429. rc = PAHO_MEMORY_ERROR;
  430. goto exit;
  431. }
  432. if ((bufs = (char **)malloc(nbufs * sizeof(char *))) == NULL)
  433. {
  434. free(key);
  435. free(lens);
  436. rc = PAHO_MEMORY_ERROR;
  437. goto exit;
  438. }
  439. lens[0] = (int)buf0len;
  440. bufs[0] = buf0;
  441. for (i = 0; i < count; i++)
  442. {
  443. lens[i+1] = (int)buflens[i];
  444. bufs[i+1] = buffers[i];
  445. }
  446. /* key */
  447. if (scr == 0)
  448. { /* sending */
  449. char* key_id = PERSISTENCE_PUBLISH_SENT;
  450. if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
  451. {
  452. if (MQTTVersion >= MQTTVERSION_5)
  453. key_id = PERSISTENCE_V5_PUBLISH_SENT;
  454. }
  455. else if (htype == PUBREL) /* PUBREL */
  456. {
  457. if (MQTTVersion >= MQTTVERSION_5)
  458. key_id = PERSISTENCE_V5_PUBREL;
  459. else
  460. key_id = PERSISTENCE_PUBREL;
  461. }
  462. if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
  463. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  464. }
  465. else if (scr == 1) /* receiving PUBLISH QoS2 */
  466. {
  467. char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
  468. if (MQTTVersion >= MQTTVERSION_5)
  469. key_id = PERSISTENCE_V5_PUBLISH_RECEIVED;
  470. if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
  471. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  472. }
  473. if (rc == 0 && client->beforeWrite)
  474. rc = client->beforeWrite(client->beforeWrite_context, nbufs, bufs, lens);
  475. if (rc == 0)
  476. rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
  477. free(key);
  478. free(lens);
  479. free(bufs);
  480. }
  481. exit:
  482. FUNC_EXIT_RC(rc);
  483. return rc;
  484. }
  485. /**
  486. * Deletes a record from the persistent store.
  487. * @param client the client as ::Clients.
  488. * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
  489. * or #PERSISTENCE_PUBLISH_RECEIVED.
  490. * @param qos the qos field of the message.
  491. * @param msgId the message ID.
  492. * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
  493. */
  494. int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
  495. {
  496. int rc = 0;
  497. FUNC_ENTRY;
  498. if (c->persistence != NULL)
  499. {
  500. const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
  501. char *key = malloc(keysize);
  502. int chars = 0;
  503. if (!key)
  504. {
  505. rc = PAHO_MEMORY_ERROR;
  506. goto exit;
  507. }
  508. if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
  509. strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
  510. {
  511. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId)) >= keysize)
  512. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  513. else
  514. {
  515. rc = c->persistence->premove(c->phandle, key);
  516. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, msgId)) >= keysize)
  517. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  518. else
  519. {
  520. rc += c->persistence->premove(c->phandle, key);
  521. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId)) >= keysize)
  522. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  523. else
  524. {
  525. rc += c->persistence->premove(c->phandle, key);
  526. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, msgId)) >= keysize)
  527. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  528. else
  529. rc += c->persistence->premove(c->phandle, key);
  530. }
  531. }
  532. }
  533. }
  534. else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
  535. { /* or PERSISTENCE_PUBLISH_RECEIVED */
  536. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId)) >= keysize)
  537. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  538. else
  539. {
  540. rc = c->persistence->premove(c->phandle, key);
  541. if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId)) >= keysize)
  542. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  543. else
  544. rc += c->persistence->premove(c->phandle, key);
  545. }
  546. }
  547. if (rc == MQTTCLIENT_PERSISTENCE_ERROR)
  548. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  549. free(key);
  550. }
  551. exit:
  552. FUNC_EXIT_RC(rc);
  553. return rc;
  554. }
  555. /**
  556. * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
  557. * message IDs in the outboundMsgs queue.
  558. * @param client the client as ::Clients.
  559. */
  560. void MQTTPersistence_wrapMsgID(Clients *client)
  561. {
  562. ListElement* wrapel = NULL;
  563. ListElement* current = NULL;
  564. FUNC_ENTRY;
  565. if ( client->outboundMsgs->count > 0 )
  566. {
  567. int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
  568. int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
  569. int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
  570. current = ListNextElement(client->outboundMsgs, &current);
  571. while(ListNextElement(client->outboundMsgs, &current) != NULL)
  572. {
  573. int curMsgID = ((Messages*)current->content)->msgid;
  574. int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
  575. int curgap = curMsgID - curPrevMsgID;
  576. if ( curgap > gap )
  577. {
  578. gap = curgap;
  579. wrapel = current;
  580. }
  581. }
  582. }
  583. if ( wrapel != NULL )
  584. {
  585. /* put wrapel at the beginning of the queue */
  586. client->outboundMsgs->first->prev = client->outboundMsgs->last;
  587. client->outboundMsgs->last->next = client->outboundMsgs->first;
  588. client->outboundMsgs->first = wrapel;
  589. client->outboundMsgs->last = wrapel->prev;
  590. client->outboundMsgs->first->prev = NULL;
  591. client->outboundMsgs->last->next = NULL;
  592. }
  593. FUNC_EXIT;
  594. }
  595. #if !defined(NO_PERSISTENCE)
  596. int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
  597. {
  598. int rc = 0;
  599. #if defined(_WIN32) || defined(_WIN64)
  600. #define KEYSIZE PERSISTENCE_MAX_KEY_LENGTH + 1
  601. #else
  602. const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
  603. #endif
  604. char key[KEYSIZE];
  605. int chars = 0;
  606. FUNC_ENTRY;
  607. if (client->MQTTVersion >= MQTTVERSION_5)
  608. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
  609. else
  610. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);
  611. if (chars >= KEYSIZE)
  612. {
  613. Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
  614. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  615. }
  616. else if ((rc = client->persistence->premove(client->phandle, key)) != 0)
  617. Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
  618. FUNC_EXIT_RC(rc);
  619. return rc;
  620. }
  621. #define MAX_NO_OF_BUFFERS 9
  622. int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
  623. {
  624. int rc = 0;
  625. int bufindex = 0;
  626. #if !defined(_WIN32) && !defined(_WIN64)
  627. const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
  628. #endif
  629. char key[KEYSIZE];
  630. int chars = 0;
  631. int lens[MAX_NO_OF_BUFFERS];
  632. void* bufs[MAX_NO_OF_BUFFERS];
  633. int props_allocated = 0;
  634. FUNC_ENTRY;
  635. bufs[bufindex] = &qe->msg->payloadlen;
  636. lens[bufindex++] = sizeof(qe->msg->payloadlen);
  637. bufs[bufindex] = qe->msg->payload;
  638. lens[bufindex++] = qe->msg->payloadlen;
  639. bufs[bufindex] = &qe->msg->qos;
  640. lens[bufindex++] = sizeof(qe->msg->qos);
  641. bufs[bufindex] = &qe->msg->retained;
  642. lens[bufindex++] = sizeof(qe->msg->retained);
  643. bufs[bufindex] = &qe->msg->dup;
  644. lens[bufindex++] = sizeof(qe->msg->dup);
  645. bufs[bufindex] = &qe->msg->msgid;
  646. lens[bufindex++] = sizeof(qe->msg->msgid);
  647. bufs[bufindex] = qe->topicName;
  648. lens[bufindex++] = (int)strlen(qe->topicName) + 1;
  649. bufs[bufindex] = &qe->topicLen;
  650. lens[bufindex++] = sizeof(qe->topicLen);
  651. if (++aclient->qentry_seqno == PERSISTENCE_SEQNO_LIMIT)
  652. aclient->qentry_seqno = 0;
  653. if (aclient->MQTTVersion >= MQTTVERSION_5) /* persist properties */
  654. {
  655. MQTTProperties no_props = MQTTProperties_initializer;
  656. MQTTProperties* props = &no_props;
  657. int temp_len = 0;
  658. char* ptr = NULL;
  659. if (qe->msg->struct_version >= 1)
  660. props = &qe->msg->properties;
  661. temp_len = MQTTProperties_len(props);
  662. ptr = bufs[bufindex] = malloc(temp_len);
  663. if (!ptr)
  664. {
  665. rc = PAHO_MEMORY_ERROR;
  666. goto exit;
  667. }
  668. props_allocated = bufindex;
  669. rc = MQTTProperties_write(&ptr, props);
  670. lens[bufindex++] = temp_len;
  671. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, aclient->qentry_seqno);
  672. }
  673. else
  674. chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, aclient->qentry_seqno);
  675. if (chars >= KEYSIZE)
  676. rc = MQTTCLIENT_PERSISTENCE_ERROR;
  677. else
  678. {
  679. qe->seqno = aclient->qentry_seqno;
  680. if (aclient->beforeWrite)
  681. rc = aclient->beforeWrite(aclient->beforeWrite_context, bufindex, (char**)bufs, lens);
  682. if (rc == 0 && (rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
  683. Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
  684. }
  685. if (props_allocated != 0)
  686. free(bufs[props_allocated]);
  687. exit:
  688. FUNC_EXIT_RC(rc);
  689. return rc;
  690. }
  691. static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
  692. {
  693. MQTTPersistence_qEntry* qe = NULL;
  694. char* ptr = buffer;
  695. int data_size;
  696. FUNC_ENTRY;
  697. if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
  698. goto exit;
  699. memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
  700. if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
  701. {
  702. free(qe);
  703. qe = NULL;
  704. goto exit;
  705. }
  706. memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
  707. qe->msg->struct_version = 1;
  708. qe->msg->payloadlen = *(int*)ptr;
  709. ptr += sizeof(int);
  710. data_size = qe->msg->payloadlen;
  711. if ((qe->msg->payload = malloc(data_size)) == NULL)
  712. {
  713. free(qe->msg);
  714. free(qe);
  715. qe = NULL;
  716. goto exit;
  717. }
  718. memcpy(qe->msg->payload, ptr, data_size);
  719. ptr += data_size;
  720. qe->msg->qos = *(int*)ptr;
  721. ptr += sizeof(int);
  722. qe->msg->retained = *(int*)ptr;
  723. ptr += sizeof(int);
  724. qe->msg->dup = *(int*)ptr;
  725. ptr += sizeof(int);
  726. qe->msg->msgid = *(int*)ptr;
  727. ptr += sizeof(int);
  728. data_size = (int)strlen(ptr) + 1;
  729. if ((qe->topicName = malloc(data_size)) == NULL)
  730. {
  731. free(qe->msg->payload);
  732. free(qe->msg);
  733. free(qe);
  734. qe = NULL;
  735. goto exit;
  736. }
  737. strcpy(qe->topicName, ptr);
  738. ptr += data_size;
  739. qe->topicLen = *(int*)ptr;
  740. ptr += sizeof(int);
  741. if (MQTTVersion >= MQTTVERSION_5 &&
  742. MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
  743. Log(LOG_ERROR, -1, "Error restoring properties from persistence");
  744. exit:
  745. FUNC_EXIT;
  746. return qe;
  747. }
  748. static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
  749. {
  750. ListElement* index = NULL;
  751. ListElement* current = NULL;
  752. FUNC_ENTRY;
  753. while (ListNextElement(list, &current) != NULL && index == NULL)
  754. {
  755. if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
  756. index = current;
  757. }
  758. ListInsert(list, qEntry, size, index);
  759. FUNC_EXIT;
  760. }
  761. /**
  762. * Restores a queue of messages from persistence to memory
  763. * @param c the client as ::Clients - the client object to restore the messages to
  764. * @return return code, 0 if successful
  765. */
  766. int MQTTPersistence_restoreMessageQueue(Clients* c)
  767. {
  768. int rc = 0;
  769. char **msgkeys;
  770. int nkeys;
  771. int i = 0;
  772. int entries_restored = 0;
  773. FUNC_ENTRY;
  774. if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
  775. {
  776. while (rc == 0 && i < nkeys)
  777. {
  778. char *buffer = NULL;
  779. int buflen;
  780. if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
  781. strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
  782. {
  783. ; /* ignore if not a queue entry key */
  784. }
  785. else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
  786. (c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
  787. {
  788. int MQTTVersion =
  789. (strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
  790. ? MQTTVERSION_5 : MQTTVERSION_3_1_1;
  791. MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
  792. if (qe)
  793. {
  794. qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
  795. MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
  796. c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
  797. entries_restored++;
  798. }
  799. if (buffer)
  800. free(buffer);
  801. }
  802. if (msgkeys[i])
  803. {
  804. free(msgkeys[i]);
  805. }
  806. i++;
  807. }
  808. if (msgkeys != NULL)
  809. free(msgkeys);
  810. }
  811. Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
  812. FUNC_EXIT_RC(rc);
  813. return rc;
  814. }
  815. #endif