MQTTPacket.c 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2024 IBM Corp. and Ian Craggs
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - MQTT 3.1.1 support
  17. * Ian Craggs - fix for issue 453
  18. * Ian Craggs - MQTT 5.0 support
  19. *******************************************************************************/
  20. /**
  21. * @file
  22. * \brief functions to deal with reading and writing of MQTT packets from and to sockets
  23. *
  24. * Some other related functions are in the MQTTPacketOut module
  25. */
  26. #include "MQTTPacket.h"
  27. #include "Log.h"
  28. #if !defined(NO_PERSISTENCE)
  29. #include "MQTTPersistence.h"
  30. #endif
  31. #include <ctype.h>
  32. #include "Messages.h"
  33. #include "StackTrace.h"
  34. #include "WebSocket.h"
  35. #include "MQTTTime.h"
  36. #include <stdlib.h>
  37. #include <string.h>
  38. #include "Heap.h"
  39. #if !defined(min)
  40. #define min(A,B) ( (A) < (B) ? (A):(B))
  41. #endif
  42. /**
  43. * List of the predefined MQTT v3/v5 packet names.
  44. */
  45. static const char *packet_names[] =
  46. {
  47. "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
  48. "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
  49. "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
  50. };
  51. const char** MQTTClient_packet_names = packet_names;
  52. /**
  53. * Converts an MQTT packet code into its name
  54. * @param ptype packet code
  55. * @return the corresponding string, or "UNKNOWN"
  56. */
  57. const char* MQTTPacket_name(int ptype)
  58. {
  59. return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
  60. }
  61. /**
  62. * Array of functions to build packets, indexed according to packet code
  63. */
  64. pf new_packets[] =
  65. {
  66. NULL, /**< reserved */
  67. NULL, /**< MQTTPacket_connect*/
  68. MQTTPacket_connack, /**< CONNACK */
  69. MQTTPacket_publish, /**< PUBLISH */
  70. MQTTPacket_ack, /**< PUBACK */
  71. MQTTPacket_ack, /**< PUBREC */
  72. MQTTPacket_ack, /**< PUBREL */
  73. MQTTPacket_ack, /**< PUBCOMP */
  74. NULL, /**< MQTTPacket_subscribe*/
  75. MQTTPacket_suback, /**< SUBACK */
  76. NULL, /**< MQTTPacket_unsubscribe*/
  77. MQTTPacket_unsuback, /**< UNSUBACK */
  78. MQTTPacket_header_only, /**< PINGREQ */
  79. MQTTPacket_header_only, /**< PINGRESP */
  80. MQTTPacket_ack, /**< DISCONNECT */
  81. MQTTPacket_ack /**< AUTH */
  82. };
  83. static char* readUTFlen(char** pptr, char* enddata, int* len);
  84. static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net);
  85. /**
  86. * Reads one MQTT packet from a socket.
  87. * @param socket a socket from which to read an MQTT packet
  88. * @param error pointer to the error code which is completed if no packet is returned
  89. * @return the packet structure or NULL if there was an error
  90. */
  91. void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
  92. {
  93. char* data = NULL;
  94. static Header header;
  95. size_t remaining_length;
  96. int ptype;
  97. void* pack = NULL;
  98. size_t actual_len = 0;
  99. FUNC_ENTRY;
  100. *error = SOCKET_ERROR; /* indicate whether an error occurred, or not */
  101. const size_t headerWsFramePos = WebSocket_framePos();
  102. /* read the packet data from the socket */
  103. *error = WebSocket_getch(net, &header.byte);
  104. if (*error != TCPSOCKET_COMPLETE) /* first byte is the header byte */
  105. goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
  106. /* now read the remaining length, so we know how much more to read */
  107. if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
  108. goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
  109. /* now read the rest, the variable header and payload */
  110. data = WebSocket_getdata(net, remaining_length, &actual_len);
  111. if (remaining_length && data == NULL)
  112. {
  113. *error = SOCKET_ERROR;
  114. goto exit; /* socket error */
  115. }
  116. if (actual_len < remaining_length)
  117. {
  118. *error = TCPSOCKET_INTERRUPTED;
  119. net->lastReceived = MQTTTime_now();
  120. }
  121. else
  122. {
  123. ptype = header.bits.type;
  124. if (ptype < CONNECT || (MQTTVersion < MQTTVERSION_5 && ptype >= DISCONNECT) ||
  125. (MQTTVersion >= MQTTVERSION_5 && ptype > AUTH) ||
  126. new_packets[ptype] == NULL)
  127. Log(TRACE_MIN, 2, NULL, ptype);
  128. else
  129. {
  130. if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
  131. {
  132. *error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
  133. Log(LOG_ERROR, -1, "Bad MQTT packet, type %d", ptype);
  134. }
  135. #if !defined(NO_PERSISTENCE)
  136. else if (header.bits.type == PUBLISH && header.bits.qos == 2)
  137. {
  138. int buf0len;
  139. char *buf = malloc(10);
  140. if (buf == NULL)
  141. {
  142. *error = SOCKET_ERROR;
  143. goto exit;
  144. }
  145. buf[0] = header.byte;
  146. buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
  147. *error = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1,
  148. &data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1, MQTTVersion);
  149. free(buf);
  150. }
  151. #endif
  152. }
  153. }
  154. if (pack)
  155. net->lastReceived = MQTTTime_now();
  156. exit:
  157. if (*error == TCPSOCKET_INTERRUPTED)
  158. WebSocket_framePosSeekTo(headerWsFramePos);
  159. FUNC_EXIT_RC(*error);
  160. return pack;
  161. }
  162. /**
  163. * Sends an MQTT packet in one system call write
  164. * @param socket the socket to which to write the data
  165. * @param header the one-byte MQTT header
  166. * @param buffer the rest of the buffer to write (not including remaining length)
  167. * @param buflen the length of the data in buffer to be written
  168. * @param MQTTVersion the version of MQTT being used
  169. * @return the completion code (TCPSOCKET_COMPLETE etc)
  170. */
  171. int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData,
  172. int MQTTVersion)
  173. {
  174. int rc = SOCKET_ERROR;
  175. size_t buf0len;
  176. char *buf;
  177. PacketBuffers packetbufs;
  178. FUNC_ENTRY;
  179. buf0len = 1 + MQTTPacket_encode(NULL, buflen);
  180. buf = malloc(buf0len);
  181. if (buf == NULL)
  182. {
  183. rc = SOCKET_ERROR;
  184. goto exit;
  185. }
  186. buf[0] = header.byte;
  187. MQTTPacket_encode(&buf[1], buflen);
  188. #if !defined(NO_PERSISTENCE)
  189. if (header.bits.type == PUBREL)
  190. {
  191. char* ptraux = buffer;
  192. int msgId = readInt(&ptraux);
  193. rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1, &buffer, &buflen,
  194. header.bits.type, msgId, 0, MQTTVersion);
  195. }
  196. #endif
  197. packetbufs.count = 1;
  198. packetbufs.buffers = &buffer;
  199. packetbufs.buflens = &buflen;
  200. packetbufs.frees = &freeData;
  201. memset(packetbufs.mask, '\0', sizeof(packetbufs.mask));
  202. rc = WebSocket_putdatas(net, &buf, &buf0len, &packetbufs);
  203. if (rc == TCPSOCKET_COMPLETE)
  204. net->lastSent = MQTTTime_now();
  205. if (rc != TCPSOCKET_INTERRUPTED)
  206. free(buf);
  207. exit:
  208. FUNC_EXIT_RC(rc);
  209. return rc;
  210. }
  211. /**
  212. * Sends an MQTT packet from multiple buffers in one system call write
  213. * @param socket the socket to which to write the data
  214. * @param header the one-byte MQTT header
  215. * @param count the number of buffers
  216. * @param buffers the rest of the buffers to write (not including remaining length)
  217. * @param buflens the lengths of the data in the array of buffers to be written
  218. * @param the MQTT version being used
  219. * @return the completion code (TCPSOCKET_COMPLETE etc)
  220. */
  221. int MQTTPacket_sends(networkHandles* net, Header header, PacketBuffers* bufs, int MQTTVersion)
  222. {
  223. int i, rc = SOCKET_ERROR;
  224. size_t buf0len, total = 0;
  225. char *buf;
  226. FUNC_ENTRY;
  227. for (i = 0; i < bufs->count; i++)
  228. total += bufs->buflens[i];
  229. buf0len = 1 + MQTTPacket_encode(NULL, total);
  230. buf = malloc(buf0len);
  231. if (buf == NULL)
  232. {
  233. rc = SOCKET_ERROR;
  234. goto exit;
  235. }
  236. buf[0] = header.byte;
  237. MQTTPacket_encode(&buf[1], total);
  238. #if !defined(NO_PERSISTENCE)
  239. if (header.bits.type == PUBLISH && header.bits.qos != 0)
  240. { /* persist PUBLISH QoS1 and Qo2 */
  241. char *ptraux = bufs->buffers[2];
  242. int msgId = readInt(&ptraux);
  243. rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, bufs->count, bufs->buffers, bufs->buflens,
  244. header.bits.type, msgId, 0, MQTTVersion);
  245. }
  246. #endif
  247. rc = WebSocket_putdatas(net, &buf, &buf0len, bufs);
  248. if (rc == TCPSOCKET_COMPLETE)
  249. net->lastSent = MQTTTime_now();
  250. if (rc != TCPSOCKET_INTERRUPTED)
  251. free(buf);
  252. exit:
  253. FUNC_EXIT_RC(rc);
  254. return rc;
  255. }
  256. /**
  257. * Encodes the message length according to the MQTT algorithm
  258. * @param buf the buffer into which the encoded data is written
  259. * @param length the length to be encoded
  260. * @return the number of bytes written to buffer
  261. */
  262. int MQTTPacket_encode(char* buf, size_t length)
  263. {
  264. int rc = 0;
  265. FUNC_ENTRY;
  266. do
  267. {
  268. char d = length % 128;
  269. length /= 128;
  270. /* if there are more digits to encode, set the top bit of this digit */
  271. if (length > 0)
  272. d |= 0x80;
  273. if (buf)
  274. buf[rc++] = d;
  275. else
  276. rc++;
  277. } while (length > 0);
  278. FUNC_EXIT_RC(rc);
  279. return rc;
  280. }
  281. /**
  282. * Decodes the message length according to the MQTT algorithm
  283. * @param socket the socket from which to read the bytes
  284. * @param value the decoded length returned
  285. * @return the number of bytes read from the socket
  286. */
  287. int MQTTPacket_decode(networkHandles* net, size_t* value)
  288. {
  289. int rc = SOCKET_ERROR;
  290. char c;
  291. int multiplier = 1;
  292. int len = 0;
  293. #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
  294. FUNC_ENTRY;
  295. *value = 0;
  296. do
  297. {
  298. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  299. {
  300. rc = SOCKET_ERROR; /* bad data */
  301. goto exit;
  302. }
  303. rc = WebSocket_getch(net, &c);
  304. if (rc != TCPSOCKET_COMPLETE)
  305. goto exit;
  306. *value += (c & 127) * multiplier;
  307. multiplier *= 128;
  308. } while ((c & 128) != 0);
  309. exit:
  310. FUNC_EXIT_RC(rc);
  311. return rc;
  312. }
  313. /**
  314. * Calculates an integer from two bytes read from the input buffer
  315. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  316. * @return the integer value calculated
  317. */
  318. int readInt(char** pptr)
  319. {
  320. char* ptr = *pptr;
  321. int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
  322. *pptr += 2;
  323. return len;
  324. }
  325. /**
  326. * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
  327. * a length delimited string. So it reads the two byte length then the data according to
  328. * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
  329. * by an incorrect length.
  330. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  331. * @param enddata pointer to the end of the buffer not to be read beyond
  332. * @param len returns the calculcated value of the length bytes read
  333. * @return an allocated C string holding the characters read, or NULL if the length read would
  334. * have caused an overrun.
  335. *
  336. */
  337. static char* readUTFlen(char** pptr, char* enddata, int* len)
  338. {
  339. char* string = NULL;
  340. FUNC_ENTRY;
  341. if (enddata - (*pptr) > 1) /* enough length to read the integer? */
  342. {
  343. *len = readInt(pptr);
  344. if (&(*pptr)[*len] <= enddata)
  345. {
  346. if ((string = malloc(*len+1)) == NULL)
  347. goto exit;
  348. memcpy(string, *pptr, *len);
  349. string[*len] = '\0';
  350. *pptr += *len;
  351. }
  352. }
  353. exit:
  354. FUNC_EXIT;
  355. return string;
  356. }
  357. /**
  358. * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
  359. * a length delimited string. So it reads the two byte length then the data according to
  360. * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
  361. * by an incorrect length.
  362. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  363. * @param enddata pointer to the end of the buffer not to be read beyond
  364. * @return an allocated C string holding the characters read, or NULL if the length read would
  365. * have caused an overrun.
  366. */
  367. char* readUTF(char** pptr, char* enddata)
  368. {
  369. int len;
  370. return readUTFlen(pptr, enddata, &len);
  371. }
  372. /**
  373. * Reads one character from the input buffer.
  374. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  375. * @return the character read
  376. */
  377. unsigned char readChar(char** pptr)
  378. {
  379. unsigned char c = **pptr;
  380. (*pptr)++;
  381. return c;
  382. }
  383. /**
  384. * Writes one character to an output buffer.
  385. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  386. * @param c the character to write
  387. */
  388. void writeChar(char** pptr, char c)
  389. {
  390. **pptr = c;
  391. (*pptr)++;
  392. }
  393. /**
  394. * Writes an integer as 2 bytes to an output buffer.
  395. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  396. * @param anInt the integer to write
  397. */
  398. void writeInt(char** pptr, int anInt)
  399. {
  400. **pptr = (char)(anInt / 256);
  401. (*pptr)++;
  402. **pptr = (char)(anInt % 256);
  403. (*pptr)++;
  404. }
  405. /**
  406. * Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
  407. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  408. * @param string the C string to write
  409. */
  410. void writeUTF(char** pptr, const char* string)
  411. {
  412. size_t len = strlen(string);
  413. writeInt(pptr, (int)len);
  414. memcpy(*pptr, string, len);
  415. *pptr += len;
  416. }
  417. /**
  418. * Writes length delimited data to an output buffer
  419. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  420. * @param data the data to write
  421. * @param datalen the length of the data to write
  422. */
  423. void writeData(char** pptr, const void* data, int datalen)
  424. {
  425. writeInt(pptr, datalen);
  426. memcpy(*pptr, data, datalen);
  427. *pptr += datalen;
  428. }
  429. /**
  430. * Function used in the new packets table to create packets which have only a header.
  431. * @param MQTTVersion the version of MQTT
  432. * @param aHeader the MQTT header byte
  433. * @param data the rest of the packet
  434. * @param datalen the length of the rest of the packet
  435. * @return pointer to the packet structure
  436. */
  437. void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  438. {
  439. static unsigned char header = 0;
  440. header = aHeader;
  441. return &header;
  442. }
  443. /**
  444. * Send an MQTT disconnect packet down a socket.
  445. * @param socket the open socket to send the data to
  446. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  447. */
  448. int MQTTPacket_send_disconnect(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
  449. {
  450. Header header;
  451. int rc = 0;
  452. FUNC_ENTRY;
  453. header.byte = 0;
  454. header.bits.type = DISCONNECT;
  455. if (client->MQTTVersion >= 5 && (props || reason != MQTTREASONCODE_SUCCESS))
  456. {
  457. size_t buflen = 1 + ((props == NULL) ? 0 : MQTTProperties_len(props));
  458. char *buf = NULL;
  459. char *ptr = NULL;
  460. if ((buf = malloc(buflen)) == NULL)
  461. {
  462. rc = SOCKET_ERROR;
  463. goto exit;
  464. }
  465. ptr = buf;
  466. writeChar(&ptr, reason);
  467. if (props)
  468. MQTTProperties_write(&ptr, props);
  469. if ((rc = MQTTPacket_send(&client->net, header, buf, buflen, 1,
  470. client->MQTTVersion)) != TCPSOCKET_INTERRUPTED)
  471. free(buf);
  472. }
  473. else
  474. rc = MQTTPacket_send(&client->net, header, NULL, 0, 0, client->MQTTVersion);
  475. exit:
  476. Log(LOG_PROTOCOL, 28, NULL, client->net.socket, client->clientID, rc);
  477. FUNC_EXIT_RC(rc);
  478. return rc;
  479. }
  480. /**
  481. * Function used in the new packets table to create publish packets.
  482. * @param MQTTVersion
  483. * @param aHeader the MQTT header byte
  484. * @param data the rest of the packet
  485. * @param datalen the length of the rest of the packet
  486. * @return pointer to the packet structure
  487. */
  488. void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  489. {
  490. Publish* pack = NULL;
  491. char* curdata = data;
  492. char* enddata = &data[datalen];
  493. FUNC_ENTRY;
  494. if ((pack = malloc(sizeof(Publish))) == NULL)
  495. goto exit;
  496. memset(pack, '\0', sizeof(Publish));
  497. pack->MQTTVersion = MQTTVersion;
  498. pack->header.byte = aHeader;
  499. if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
  500. {
  501. free(pack);
  502. pack = NULL;
  503. goto exit;
  504. }
  505. if (pack->header.bits.qos > 0) /* Msgid only exists for QoS 1 or 2 */
  506. {
  507. if (enddata - curdata < 2) /* Is there enough data for the msgid? */
  508. {
  509. free(pack);
  510. pack = NULL;
  511. goto exit;
  512. }
  513. pack->msgId = readInt(&curdata);
  514. }
  515. else
  516. pack->msgId = 0;
  517. if (MQTTVersion >= MQTTVERSION_5)
  518. {
  519. MQTTProperties props = MQTTProperties_initializer;
  520. pack->properties = props;
  521. if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
  522. {
  523. if (pack->properties.array)
  524. free(pack->properties.array);
  525. if (pack)
  526. free(pack);
  527. pack = NULL; /* signal protocol error */
  528. goto exit;
  529. }
  530. }
  531. pack->payload = curdata;
  532. pack->payloadlen = (int)(datalen-(curdata-data));
  533. exit:
  534. FUNC_EXIT;
  535. return pack;
  536. }
  537. /**
  538. * Free allocated storage for a publish packet.
  539. * @param pack pointer to the publish packet structure
  540. */
  541. void MQTTPacket_freePublish(Publish* pack)
  542. {
  543. FUNC_ENTRY;
  544. if (pack->topic != NULL)
  545. free(pack->topic);
  546. if (pack->MQTTVersion >= MQTTVERSION_5)
  547. MQTTProperties_free(&pack->properties);
  548. free(pack);
  549. FUNC_EXIT;
  550. }
  551. /**
  552. * Free allocated storage for an ack packet.
  553. * @param pack pointer to the publish packet structure
  554. */
  555. void MQTTPacket_freeAck(Ack* pack)
  556. {
  557. FUNC_ENTRY;
  558. if (pack->MQTTVersion >= MQTTVERSION_5)
  559. MQTTProperties_free(&pack->properties);
  560. free(pack);
  561. FUNC_EXIT;
  562. }
  563. /**
  564. * Send an MQTT acknowledgement packet down a socket.
  565. * @param MQTTVersion the version of MQTT being used
  566. * @param type the MQTT packet type e.g. SUBACK
  567. * @param msgid the MQTT message id to use
  568. * @param dup boolean - whether to set the MQTT DUP flag
  569. * @param net the network handle to send the data to
  570. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  571. */
  572. static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net)
  573. {
  574. Header header;
  575. int rc = SOCKET_ERROR;
  576. char *buf = NULL;
  577. char *ptr = NULL;
  578. FUNC_ENTRY;
  579. if ((ptr = buf = malloc(2)) == NULL)
  580. goto exit;
  581. header.byte = 0;
  582. header.bits.type = type;
  583. header.bits.dup = dup;
  584. if (type == PUBREL)
  585. header.bits.qos = 1;
  586. writeInt(&ptr, msgid);
  587. if ((rc = MQTTPacket_send(net, header, buf, 2, 1, MQTTVersion)) != TCPSOCKET_INTERRUPTED)
  588. free(buf);
  589. exit:
  590. FUNC_EXIT_RC(rc);
  591. return rc;
  592. }
  593. /**
  594. * Send an MQTT PUBACK packet down a socket.
  595. * @param MQTTVersion the version of MQTT being used
  596. * @param msgid the MQTT message id to use
  597. * @param socket the open socket to send the data to
  598. * @param clientID the string client identifier, only used for tracing
  599. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  600. */
  601. int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
  602. {
  603. int rc = 0;
  604. FUNC_ENTRY;
  605. rc = MQTTPacket_send_ack(MQTTVersion, PUBACK, msgid, 0, net);
  606. Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
  607. FUNC_EXIT_RC(rc);
  608. return rc;
  609. }
  610. /**
  611. * Free allocated storage for a suback packet.
  612. * @param pack pointer to the suback packet structure
  613. */
  614. void MQTTPacket_freeSuback(Suback* pack)
  615. {
  616. FUNC_ENTRY;
  617. if (pack->MQTTVersion >= MQTTVERSION_5)
  618. MQTTProperties_free(&pack->properties);
  619. if (pack->qoss != NULL)
  620. ListFree(pack->qoss);
  621. free(pack);
  622. FUNC_EXIT;
  623. }
  624. /**
  625. * Free allocated storage for a suback packet.
  626. * @param pack pointer to the suback packet structure
  627. */
  628. void MQTTPacket_freeUnsuback(Unsuback* pack)
  629. {
  630. FUNC_ENTRY;
  631. if (pack->MQTTVersion >= MQTTVERSION_5)
  632. {
  633. MQTTProperties_free(&pack->properties);
  634. if (pack->reasonCodes != NULL)
  635. ListFree(pack->reasonCodes);
  636. }
  637. free(pack);
  638. FUNC_EXIT;
  639. }
  640. /**
  641. * Send an MQTT PUBREC packet down a socket.
  642. * @param MQTTVersion the version of MQTT being used
  643. * @param msgid the MQTT message id to use
  644. * @param socket the open socket to send the data to
  645. * @param clientID the string client identifier, only used for tracing
  646. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  647. */
  648. int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
  649. {
  650. int rc = 0;
  651. FUNC_ENTRY;
  652. rc = MQTTPacket_send_ack(MQTTVersion, PUBREC, msgid, 0, net);
  653. Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
  654. FUNC_EXIT_RC(rc);
  655. return rc;
  656. }
  657. /**
  658. * Send an MQTT PUBREL packet down a socket.
  659. * @param MQTTVersion the version of MQTT being used
  660. * @param msgid the MQTT message id to use
  661. * @param dup boolean - whether to set the MQTT DUP flag
  662. * @param socket the open socket to send the data to
  663. * @param clientID the string client identifier, only used for tracing
  664. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  665. */
  666. int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles* net, const char* clientID)
  667. {
  668. int rc = 0;
  669. FUNC_ENTRY;
  670. rc = MQTTPacket_send_ack(MQTTVersion, PUBREL, msgid, dup, net);
  671. Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
  672. FUNC_EXIT_RC(rc);
  673. return rc;
  674. }
  675. /**
  676. * Send an MQTT PUBCOMP packet down a socket.
  677. * @param MQTTVersion the version of MQTT being used
  678. * @param msgid the MQTT message id to use
  679. * @param socket the open socket to send the data to
  680. * @param clientID the string client identifier, only used for tracing
  681. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  682. */
  683. int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
  684. {
  685. int rc = 0;
  686. FUNC_ENTRY;
  687. rc = MQTTPacket_send_ack(MQTTVersion, PUBCOMP, msgid, 0, net);
  688. Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
  689. FUNC_EXIT_RC(rc);
  690. return rc;
  691. }
  692. /**
  693. * Function used in the new packets table to create acknowledgement packets.
  694. * @param MQTTVersion the version of MQTT being used
  695. * @param aHeader the MQTT header byte
  696. * @param data the rest of the packet
  697. * @param datalen the length of the rest of the packet
  698. * @return pointer to the packet structure
  699. */
  700. void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  701. {
  702. Ack* pack = NULL;
  703. char* curdata = data;
  704. char* enddata = &data[datalen];
  705. FUNC_ENTRY;
  706. if ((pack = malloc(sizeof(Ack))) == NULL)
  707. goto exit;
  708. pack->MQTTVersion = MQTTVersion;
  709. pack->header.byte = aHeader;
  710. if (pack->header.bits.type != DISCONNECT)
  711. {
  712. if (enddata - curdata < 2) /* Is there enough data for the msgid? */
  713. {
  714. free(pack);
  715. pack = NULL;
  716. goto exit;
  717. }
  718. pack->msgId = readInt(&curdata);
  719. }
  720. if (MQTTVersion >= MQTTVERSION_5)
  721. {
  722. MQTTProperties props = MQTTProperties_initializer;
  723. pack->rc = MQTTREASONCODE_SUCCESS;
  724. pack->properties = props;
  725. /* disconnect has no msgid */
  726. if (datalen > 2 || (pack->header.bits.type == DISCONNECT && datalen > 0))
  727. pack->rc = readChar(&curdata); /* reason code */
  728. if (datalen > 3 || (pack->header.bits.type == DISCONNECT && datalen > 1))
  729. {
  730. if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
  731. {
  732. if (pack->properties.array)
  733. free(pack->properties.array);
  734. if (pack)
  735. free(pack);
  736. pack = NULL; /* signal protocol error */
  737. goto exit;
  738. }
  739. }
  740. }
  741. exit:
  742. FUNC_EXIT;
  743. return pack;
  744. }
  745. /**
  746. * Format the payload for printing in the trace.
  747. * Any unprintable characters output as hex.
  748. * @param buflen the length of the supplied print buffer
  749. * @param buf the supplied print buffer
  750. * @param payloadlen the length of the payload to be printed
  751. * @param payload the payload data to be printed
  752. * @return the length of the data output to the print buffer
  753. */
  754. int MQTTPacket_formatPayload(int buflen, char* buf, int payloadlen, char* payload)
  755. {
  756. int pos = 0;
  757. int i = 0;
  758. for (i = 0; i < payloadlen; i++)
  759. {
  760. if (isprint(payload[i]))
  761. {
  762. if (pos >= buflen)
  763. break;
  764. buf[pos++] = payload[i];
  765. }
  766. else
  767. {
  768. static char *hexdigit = "0123456789ABCDEF";
  769. if (pos >= buflen - 3)
  770. break;
  771. buf[pos++] = '\\';
  772. buf[pos++] = 'x';
  773. buf[pos++] = hexdigit[payload[i] & 0xF0];
  774. buf[pos++] = hexdigit[payload[i] & 0x0F];
  775. }
  776. }
  777. return pos;
  778. }
  779. /**
  780. * Send an MQTT PUBLISH packet down a socket.
  781. * @param pack a structure from which to get some values to use, e.g topic, payload
  782. * @param dup boolean - whether to set the MQTT DUP flag
  783. * @param qos the value to use for the MQTT QoS setting
  784. * @param retained boolean - whether to set the MQTT retained flag
  785. * @param socket the open socket to send the data to
  786. * @param clientID the string client identifier, only used for tracing
  787. * @return the completion code (e.g. TCPSOCKET_COMPLETE)
  788. */
  789. int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
  790. {
  791. Header header;
  792. char *topiclen;
  793. int rc = SOCKET_ERROR;
  794. FUNC_ENTRY;
  795. topiclen = malloc(2);
  796. if (topiclen == NULL)
  797. goto exit;
  798. header.bits.type = PUBLISH;
  799. header.bits.dup = dup;
  800. header.bits.qos = qos;
  801. header.bits.retain = retained;
  802. if (qos > 0 || pack->MQTTVersion >= 5)
  803. {
  804. int buflen = ((qos > 0) ? 2 : 0) + ((pack->MQTTVersion >= 5) ? MQTTProperties_len(&pack->properties) : 0);
  805. char *ptr = NULL;
  806. char* bufs[4] = {topiclen, pack->topic, NULL, pack->payload};
  807. size_t lens[4] = {2, strlen(pack->topic), buflen, pack->payloadlen};
  808. int frees[4] = {1, 0, 1, 0};
  809. PacketBuffers packetbufs = {4, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
  810. bufs[2] = ptr = malloc(buflen);
  811. if (ptr == NULL)
  812. goto exit_free;
  813. if (qos > 0)
  814. writeInt(&ptr, pack->msgId);
  815. if (pack->MQTTVersion >= 5)
  816. MQTTProperties_write(&ptr, &pack->properties);
  817. ptr = topiclen;
  818. writeInt(&ptr, (int)lens[1]);
  819. rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
  820. if (rc != TCPSOCKET_INTERRUPTED)
  821. free(bufs[2]);
  822. memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
  823. }
  824. else
  825. {
  826. char* ptr = topiclen;
  827. char* bufs[3] = {topiclen, pack->topic, pack->payload};
  828. size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
  829. int frees[3] = {1, 0, 0};
  830. PacketBuffers packetbufs = {3, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
  831. writeInt(&ptr, (int)lens[1]);
  832. rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
  833. memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
  834. }
  835. {
  836. #if defined(_WIN32) || defined(_WIN64)
  837. #define buflen 30
  838. #else
  839. const int buflen = 30;
  840. #endif
  841. char buf[buflen];
  842. int len = 0;
  843. len = MQTTPacket_formatPayload(buflen, buf, pack->payloadlen, pack->payload);
  844. if (qos == 0)
  845. Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc, pack->payloadlen, len, buf);
  846. else
  847. Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc, pack->payloadlen,
  848. len, buf);
  849. }
  850. exit_free:
  851. if (rc != TCPSOCKET_INTERRUPTED)
  852. free(topiclen);
  853. exit:
  854. FUNC_EXIT_RC(rc);
  855. return rc;
  856. }
  857. /**
  858. * Free allocated storage for a various packet tyoes
  859. * @param pack pointer to the suback packet structure
  860. */
  861. void MQTTPacket_free_packet(MQTTPacket* pack)
  862. {
  863. FUNC_ENTRY;
  864. if (pack->header.bits.type == PUBLISH)
  865. MQTTPacket_freePublish((Publish*)pack);
  866. /*else if (pack->header.type == SUBSCRIBE)
  867. MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
  868. else if (pack->header.type == UNSUBSCRIBE)
  869. MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
  870. else
  871. free(pack);
  872. FUNC_EXIT;
  873. }
  874. /**
  875. * Writes an integer as 4 bytes to an output buffer.
  876. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  877. * @param anInt the integer to write
  878. */
  879. void writeInt4(char** pptr, unsigned int anInt)
  880. {
  881. **pptr = (char)(anInt / 16777216);
  882. (*pptr)++;
  883. anInt %= 16777216;
  884. **pptr = (char)(anInt / 65536);
  885. (*pptr)++;
  886. anInt %= 65536;
  887. **pptr = (char)(anInt / 256);
  888. (*pptr)++;
  889. **pptr = (char)(anInt % 256);
  890. (*pptr)++;
  891. }
  892. /**
  893. * Calculates an integer from two bytes read from the input buffer
  894. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  895. * @return the integer value calculated
  896. */
  897. unsigned int readInt4(char** pptr)
  898. {
  899. unsigned char* ptr = (unsigned char*)*pptr;
  900. unsigned int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
  901. *pptr += 4;
  902. return value;
  903. }
  904. void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
  905. {
  906. writeInt(pptr, lenstring.len);
  907. memcpy(*pptr, lenstring.data, lenstring.len);
  908. *pptr += lenstring.len;
  909. }
  910. int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
  911. {
  912. int len = -1;
  913. /* the first two bytes are the length of the string */
  914. if (enddata - (*pptr) > 1) /* enough length to read the integer? */
  915. {
  916. lenstring->len = readInt(pptr); /* increments pptr to point past length */
  917. if (&(*pptr)[lenstring->len] <= enddata)
  918. {
  919. lenstring->data = (char*)*pptr;
  920. *pptr += lenstring->len;
  921. len = 2 + lenstring->len;
  922. }
  923. }
  924. return len;
  925. }
  926. /*
  927. if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
  928. len = 1;
  929. else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
  930. len = 2;
  931. else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
  932. len = 3;
  933. else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
  934. len = 4;
  935. */
  936. int MQTTPacket_VBIlen(int rem_len)
  937. {
  938. int rc = 0;
  939. if (rem_len < 128)
  940. rc = 1;
  941. else if (rem_len < 16384)
  942. rc = 2;
  943. else if (rem_len < 2097152)
  944. rc = 3;
  945. else
  946. rc = 4;
  947. return rc;
  948. }
  949. /**
  950. * Decodes the message length according to the MQTT algorithm
  951. * @param getcharfn pointer to function to read the next character from the data source
  952. * @param value the decoded length returned
  953. * @return the number of bytes read from the socket
  954. */
  955. int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), unsigned int* value)
  956. {
  957. char c;
  958. int multiplier = 1;
  959. int len = 0;
  960. #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
  961. *value = 0;
  962. do
  963. {
  964. int rc = MQTTPACKET_READ_ERROR;
  965. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  966. {
  967. rc = MQTTPACKET_READ_ERROR; /* bad data */
  968. goto exit;
  969. }
  970. rc = (*getcharfn)(&c, 1);
  971. if (rc != 1)
  972. goto exit;
  973. *value += (c & 127) * multiplier;
  974. multiplier *= 128;
  975. } while ((c & 128) != 0);
  976. exit:
  977. return len;
  978. }
  979. static char* bufptr;
  980. int bufchar(char* c, int count)
  981. {
  982. int i;
  983. for (i = 0; i < count; ++i)
  984. *c = *bufptr++;
  985. return count;
  986. }
  987. int MQTTPacket_decodeBuf(char* buf, unsigned int* value)
  988. {
  989. bufptr = buf;
  990. return MQTTPacket_VBIdecode(bufchar, value);
  991. }