SocketBuffer.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - fix for issue #244, issue #20
  17. *******************************************************************************/
  18. /**
  19. * @file
  20. * \brief Socket buffering related functions
  21. *
  22. * Some other related functions are in the Socket module
  23. */
  24. #include "SocketBuffer.h"
  25. #include "LinkedList.h"
  26. #include "Log.h"
  27. #include "Messages.h"
  28. #include "StackTrace.h"
  29. #include <stdlib.h>
  30. #include <stdio.h>
  31. #include <string.h>
  32. #include "Heap.h"
  33. #if defined(_WIN32) || defined(_WIN64)
  34. #define iov_len len
  35. #define iov_base buf
  36. #endif
  37. /**
  38. * Default input queue buffer
  39. */
  40. static socket_queue* def_queue;
  41. /**
  42. * List of queued input buffers
  43. */
  44. static List* queues;
  45. /**
  46. * List of queued write buffers
  47. */
  48. static List writes;
  49. int socketcompare(void* a, void* b);
  50. int SocketBuffer_newDefQ(void);
  51. void SocketBuffer_freeDefQ(void);
  52. int pending_socketcompare(void* a, void* b);
  53. /**
  54. * List callback function for comparing socket_queues by socket
  55. * @param a first integer value
  56. * @param b second integer value
  57. * @return boolean indicating whether a and b are equal
  58. */
  59. int socketcompare(void* a, void* b)
  60. {
  61. return ((socket_queue*)a)->socket == *(int*)b;
  62. }
  63. /**
  64. * Create a new default queue when one has just been used.
  65. */
  66. int SocketBuffer_newDefQ(void)
  67. {
  68. int rc = PAHO_MEMORY_ERROR;
  69. def_queue = malloc(sizeof(socket_queue));
  70. if (def_queue)
  71. {
  72. def_queue->buflen = 1000;
  73. def_queue->buf = malloc(def_queue->buflen);
  74. if (def_queue->buf)
  75. {
  76. def_queue->socket = def_queue->index = 0;
  77. def_queue->buflen = def_queue->datalen = def_queue->headerlen = 0;
  78. rc = 0;
  79. }
  80. }
  81. return rc;
  82. }
  83. /**
  84. * Initialize the socketBuffer module
  85. */
  86. int SocketBuffer_initialize(void)
  87. {
  88. int rc = 0;
  89. FUNC_ENTRY;
  90. rc = SocketBuffer_newDefQ();
  91. if (rc == 0)
  92. {
  93. if ((queues = ListInitialize()) == NULL)
  94. rc = PAHO_MEMORY_ERROR;
  95. }
  96. ListZero(&writes);
  97. FUNC_EXIT_RC(rc);
  98. return rc;
  99. }
  100. /**
  101. * Free the default queue memory
  102. */
  103. void SocketBuffer_freeDefQ(void)
  104. {
  105. free(def_queue->buf);
  106. free(def_queue);
  107. def_queue = NULL;
  108. }
  109. /**
  110. * Terminate the socketBuffer module
  111. */
  112. void SocketBuffer_terminate(void)
  113. {
  114. ListElement* cur = NULL;
  115. ListEmpty(&writes);
  116. FUNC_ENTRY;
  117. while (ListNextElement(queues, &cur))
  118. free(((socket_queue*)(cur->content))->buf);
  119. ListFree(queues);
  120. SocketBuffer_freeDefQ();
  121. FUNC_EXIT;
  122. }
  123. /**
  124. * Cleanup any buffers for a specific socket
  125. * @param socket the socket to clean up
  126. */
  127. void SocketBuffer_cleanup(SOCKET socket)
  128. {
  129. FUNC_ENTRY;
  130. SocketBuffer_writeComplete(socket); /* clean up write buffers */
  131. if (ListFindItem(queues, &socket, socketcompare))
  132. {
  133. free(((socket_queue*)(queues->current->content))->buf);
  134. ListRemove(queues, queues->current->content);
  135. }
  136. if (def_queue->socket == socket)
  137. {
  138. def_queue->socket = def_queue->index = 0;
  139. def_queue->headerlen = def_queue->datalen = 0;
  140. }
  141. FUNC_EXIT;
  142. }
  143. /**
  144. * Get any queued data for a specific socket
  145. * @param socket the socket to get queued data for
  146. * @param bytes the number of bytes of data to retrieve
  147. * @param actual_len the actual length returned
  148. * @return the actual data
  149. */
  150. char* SocketBuffer_getQueuedData(SOCKET socket, size_t bytes, size_t* actual_len)
  151. {
  152. socket_queue* queue = NULL;
  153. FUNC_ENTRY;
  154. if (ListFindItem(queues, &socket, socketcompare))
  155. { /* if there is queued data for this socket, add any data read to it */
  156. queue = (socket_queue*)(queues->current->content);
  157. *actual_len = queue->datalen;
  158. }
  159. else
  160. {
  161. *actual_len = 0;
  162. queue = def_queue;
  163. }
  164. if (bytes > queue->buflen)
  165. {
  166. if (queue->datalen > 0)
  167. {
  168. void* newmem = malloc(bytes);
  169. if (newmem)
  170. {
  171. memcpy(newmem, queue->buf, queue->datalen);
  172. free(queue->buf);
  173. queue->buf = newmem;
  174. }
  175. else
  176. {
  177. free(queue->buf);
  178. queue->buf = NULL;
  179. goto exit;
  180. }
  181. }
  182. else
  183. {
  184. void* newmem = realloc(queue->buf, bytes);
  185. if (newmem)
  186. {
  187. queue->buf = newmem;
  188. }
  189. else
  190. {
  191. free(queue->buf);
  192. queue->buf = NULL;
  193. goto exit;
  194. }
  195. }
  196. queue->buflen = bytes;
  197. }
  198. exit:
  199. FUNC_EXIT;
  200. return queue->buf;
  201. }
  202. /**
  203. * Get any queued character for a specific socket
  204. * @param socket the socket to get queued data for
  205. * @param c the character returned if any
  206. * @return completion code
  207. */
  208. int SocketBuffer_getQueuedChar(SOCKET socket, char* c)
  209. {
  210. int rc = SOCKETBUFFER_INTERRUPTED;
  211. FUNC_ENTRY;
  212. if (ListFindItem(queues, &socket, socketcompare))
  213. { /* if there is queued data for this socket, read that first */
  214. socket_queue* queue = (socket_queue*)(queues->current->content);
  215. if (queue->index < queue->headerlen)
  216. {
  217. *c = queue->fixed_header[(queue->index)++];
  218. Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, (int)queue->headerlen);
  219. rc = SOCKETBUFFER_COMPLETE;
  220. goto exit;
  221. }
  222. else if (queue->index > 4)
  223. {
  224. Log(LOG_FATAL, -1, "header is already at full length");
  225. rc = SOCKET_ERROR;
  226. goto exit;
  227. }
  228. }
  229. exit:
  230. FUNC_EXIT_RC(rc);
  231. return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
  232. }
  233. /**
  234. * A socket read was interrupted so we need to queue data
  235. * @param socket the socket to get queued data for
  236. * @param actual_len the actual length of data that was read
  237. */
  238. void SocketBuffer_interrupted(SOCKET socket, size_t actual_len)
  239. {
  240. socket_queue* queue = NULL;
  241. FUNC_ENTRY;
  242. if (ListFindItem(queues, &socket, socketcompare))
  243. queue = (socket_queue*)(queues->current->content);
  244. else /* new saved queue */
  245. {
  246. queue = def_queue;
  247. /* if SocketBuffer_queueChar() has not yet been called, then the socket number
  248. in def_queue will not have been set. Issue #244.
  249. If actual_len == 0 then we may not need to do anything - I'll leave that
  250. optimization for another time. */
  251. queue->socket = socket;
  252. ListAppend(queues, def_queue, sizeof(socket_queue)+def_queue->buflen);
  253. SocketBuffer_newDefQ();
  254. }
  255. queue->index = 0;
  256. queue->datalen = actual_len;
  257. FUNC_EXIT;
  258. }
  259. /**
  260. * A socket read has now completed so we can get rid of the queue
  261. * @param socket the socket for which the operation is now complete
  262. * @return pointer to the default queue data
  263. */
  264. char* SocketBuffer_complete(SOCKET socket)
  265. {
  266. FUNC_ENTRY;
  267. if (ListFindItem(queues, &socket, socketcompare))
  268. {
  269. socket_queue* queue = (socket_queue*)(queues->current->content);
  270. SocketBuffer_freeDefQ();
  271. def_queue = queue;
  272. ListDetach(queues, queue);
  273. }
  274. def_queue->socket = def_queue->index = 0;
  275. def_queue->headerlen = def_queue->datalen = 0;
  276. FUNC_EXIT;
  277. return def_queue->buf;
  278. }
  279. /**
  280. * Queued a Charactor to a specific socket
  281. * @param socket the socket for which to queue char for
  282. * @param c the character to queue
  283. */
  284. void SocketBuffer_queueChar(SOCKET socket, char c)
  285. {
  286. int error = 0;
  287. socket_queue* curq = def_queue;
  288. FUNC_ENTRY;
  289. if (ListFindItem(queues, &socket, socketcompare))
  290. curq = (socket_queue*)(queues->current->content);
  291. else if (def_queue->socket == 0)
  292. {
  293. def_queue->socket = socket;
  294. def_queue->index = 0;
  295. def_queue->datalen = 0;
  296. }
  297. else if (def_queue->socket != socket)
  298. {
  299. Log(LOG_FATAL, -1, "attempt to reuse socket queue");
  300. error = 1;
  301. }
  302. if (curq->index > 4)
  303. {
  304. Log(LOG_FATAL, -1, "socket queue fixed_header field full");
  305. error = 1;
  306. }
  307. if (!error)
  308. {
  309. curq->fixed_header[(curq->index)++] = c;
  310. curq->headerlen = curq->index;
  311. }
  312. Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, (int)curq->headerlen);
  313. FUNC_EXIT;
  314. }
  315. /**
  316. * A socket write was interrupted so store the remaining data
  317. * @param socket the socket for which the write was interrupted
  318. * @param count the number of iovec buffers
  319. * @param iovecs buffer array
  320. * @param frees a set of flags indicating which of the iovecs array should be freed
  321. * @param total total data length to be written
  322. * @param bytes actual data length that was written
  323. */
  324. #if defined(OPENSSL)
  325. int SocketBuffer_pendingWrite(SOCKET socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
  326. #else
  327. int SocketBuffer_pendingWrite(SOCKET socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
  328. #endif
  329. {
  330. int i = 0;
  331. pending_writes* pw = NULL;
  332. int rc = 0;
  333. FUNC_ENTRY;
  334. /* store the buffers until the whole packet is written */
  335. if ((pw = malloc(sizeof(pending_writes))) == NULL)
  336. {
  337. rc = PAHO_MEMORY_ERROR;
  338. goto exit;
  339. }
  340. pw->socket = socket;
  341. #if defined(OPENSSL)
  342. pw->ssl = ssl;
  343. #endif
  344. pw->bytes = bytes;
  345. pw->total = total;
  346. pw->count = count;
  347. for (i = 0; i < count; i++)
  348. {
  349. pw->iovecs[i] = iovecs[i];
  350. pw->frees[i] = frees[i];
  351. }
  352. ListAppend(&writes, pw, sizeof(pw) + total);
  353. exit:
  354. FUNC_EXIT_RC(rc);
  355. return rc;
  356. }
  357. /**
  358. * List callback function for comparing pending_writes by socket
  359. * @param a first integer value
  360. * @param b second integer value
  361. * @return boolean indicating whether a and b are equal
  362. */
  363. int pending_socketcompare(void* a, void* b)
  364. {
  365. return ((pending_writes*)a)->socket == *(int*)b;
  366. }
  367. /**
  368. * Get any queued write data for a specific socket
  369. * @param socket the socket to get queued data for
  370. * @return pointer to the queued data or NULL
  371. */
  372. pending_writes* SocketBuffer_getWrite(SOCKET socket)
  373. {
  374. ListElement* le = ListFindItem(&writes, &socket, pending_socketcompare);
  375. return (le) ? (pending_writes*)(le->content) : NULL;
  376. }
  377. /**
  378. * A socket write has now completed so we can get rid of the queue
  379. * @param socket the socket for which the operation is now complete
  380. * @return completion code, boolean - was the queue removed?
  381. */
  382. int SocketBuffer_writeComplete(SOCKET socket)
  383. {
  384. return ListRemoveItem(&writes, &socket, pending_socketcompare);
  385. }
  386. /**
  387. * Update the queued write data for a socket in the case of QoS 0 messages.
  388. * @param socket the socket for which the operation is now complete
  389. * @param topic the topic of the QoS 0 write
  390. * @param payload the payload of the QoS 0 write
  391. * @return pointer to the updated queued data structure, or NULL
  392. */
  393. pending_writes* SocketBuffer_updateWrite(SOCKET socket, char* topic, char* payload)
  394. {
  395. pending_writes* pw = NULL;
  396. ListElement* le = NULL;
  397. FUNC_ENTRY;
  398. if ((le = ListFindItem(&writes, &socket, pending_socketcompare)) != NULL)
  399. {
  400. pw = (pending_writes*)(le->content);
  401. if (pw->count == 4)
  402. {
  403. pw->iovecs[2].iov_base = topic;
  404. pw->iovecs[3].iov_base = payload;
  405. }
  406. }
  407. FUNC_EXIT;
  408. return pw;
  409. }