MQTTProtocolOut.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2025 IBM Corp., Ian Craggs and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs, Allan Stockdill-Mander - SSL updates
  16. * Ian Craggs - fix for buffer overflow in addressPort bug #433290
  17. * Ian Craggs - MQTT 3.1.1 support
  18. * Rong Xiang, Ian Craggs - C++ compatibility
  19. * Ian Craggs - fix for bug 479376
  20. * Ian Craggs - SNI support
  21. * Ian Craggs - fix for issue #164
  22. * Ian Craggs - fix for issue #179
  23. * Ian Craggs - MQTT 5.0 support
  24. * Sven Gambel - add generic proxy support
  25. *******************************************************************************/
  26. /**
  27. * @file
  28. * \brief Functions dealing with the MQTT protocol exchanges
  29. *
  30. * Some other related functions are in the MQTTProtocolClient module
  31. */
  32. #include <stdlib.h>
  33. #include <string.h>
  34. #include <ctype.h>
  35. #include "MQTTProtocolOut.h"
  36. #include "StackTrace.h"
  37. #include "Heap.h"
  38. #include "WebSocket.h"
  39. #include "Proxy.h"
  40. #include "Base64.h"
  41. extern ClientStates* bstate;
  42. /**
  43. * Separates an address:port into two separate values
  44. * @param[in] uri the input string - hostname:port
  45. * @param[out] port the returned port integer
  46. * @param[out] topic optional topic portion of the address starting with '/'
  47. * @return the address string
  48. */
  49. size_t MQTTProtocol_addressPort(const char* uri, int* port, const char **topic, int default_port)
  50. {
  51. char* buf = (char*)uri;
  52. char* colon_pos;
  53. size_t len;
  54. char* topic_pos;
  55. FUNC_ENTRY;
  56. colon_pos = strrchr(uri, ':'); /* reverse find to allow for ':' in IPv6 addresses */
  57. if (uri[0] == '[')
  58. { /* ip v6 */
  59. if (colon_pos < strrchr(uri, ']'))
  60. colon_pos = NULL; /* means it was an IPv6 separator, not for host:port */
  61. }
  62. if (colon_pos) /* have to strip off the port */
  63. {
  64. len = colon_pos - uri;
  65. *port = atoi(colon_pos + 1);
  66. }
  67. else
  68. {
  69. len = strlen(buf);
  70. *port = default_port;
  71. }
  72. /* find any topic portion */
  73. topic_pos = (char*)uri;
  74. if (colon_pos)
  75. topic_pos = colon_pos;
  76. topic_pos = strchr(topic_pos, '/');
  77. if (topic_pos)
  78. {
  79. if (topic)
  80. *topic = topic_pos;
  81. if (!colon_pos)
  82. len = topic_pos - uri;
  83. }
  84. if (buf[len - 1] == ']')
  85. {
  86. /* we are stripping off the final ], so length is 1 shorter */
  87. --len;
  88. }
  89. FUNC_EXIT;
  90. return len;
  91. }
  92. /**
  93. * MQTT outgoing connect processing for a client
  94. * @param address The address of the server. For TCP this is in the form
  95. * 'address:port; for a UNIX socket it's the path to the
  96. * socket file, etc.
  97. * @param aClient a structure with all MQTT data needed
  98. * @param unixsock Whether the address if for a UNIX-domain socket
  99. * @param ssl Whether we're connecting with SSL/TLS
  100. * @param websocket Whether we should use a websocket for the connection
  101. * @param MQTTVersion the MQTT version to connect with (3, 4, or 5)
  102. * @param connectProperties The connection properties
  103. * @param willProperties Properties for the LWT
  104. * @param timeout how long to wait for a new socket to be created
  105. * @return return code
  106. */
  107. #if defined(OPENSSL)
  108. #if defined(__GNUC__) && defined(__linux__)
  109. int MQTTProtocol_connect(const char* address, Clients* aClient, int unixsock, int ssl, int websocket, int MQTTVersion,
  110. MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
  111. #else
  112. int MQTTProtocol_connect(const char* address, Clients* aClient, int unixsock, int ssl, int websocket, int MQTTVersion,
  113. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  114. #endif
  115. #else
  116. #if defined(__GNUC__) && defined(__linux__)
  117. int MQTTProtocol_connect(const char* address, Clients* aClient, int unixsock, int websocket, int MQTTVersion,
  118. MQTTProperties* connectProperties, MQTTProperties* willProperties, long timeout)
  119. #else
  120. int MQTTProtocol_connect(const char* address, Clients* aClient, int unixsock, int websocket, int MQTTVersion,
  121. MQTTProperties* connectProperties, MQTTProperties* willProperties)
  122. #endif
  123. #endif
  124. {
  125. int rc = 0,
  126. port;
  127. size_t addr_len;
  128. char* p0 = NULL;
  129. FUNC_ENTRY;
  130. aClient->good = 1;
  131. if (!unixsock)
  132. {
  133. if (aClient->httpProxy)
  134. p0 = aClient->httpProxy;
  135. else /* if the proxy isn't set in the API, then we can look in the environment */
  136. {
  137. /* Don't use the environment HTTP proxy settings by default - for backwards compatibility */
  138. char* use_proxy = getenv("PAHO_C_CLIENT_USE_HTTP_PROXY");
  139. if (use_proxy)
  140. {
  141. if (strncmp(use_proxy, "TRUE", strlen("TRUE")) == 0)
  142. {
  143. char* http_proxy = getenv("http_proxy");
  144. if (http_proxy)
  145. {
  146. char* no_proxy = getenv("no_proxy");
  147. if (no_proxy)
  148. {
  149. if (Proxy_noProxy(address, no_proxy))
  150. p0 = http_proxy;
  151. }
  152. else
  153. p0 = http_proxy; /* no no_proxy set */
  154. }
  155. }
  156. }
  157. }
  158. if (p0)
  159. {
  160. if ((rc = Proxy_setHTTPProxy(aClient, p0, &aClient->net.http_proxy, &aClient->net.http_proxy_auth, "http://")) != 0)
  161. goto exit;
  162. Log(TRACE_PROTOCOL, -1, "Setting http proxy to %s", aClient->net.http_proxy);
  163. if (aClient->net.http_proxy_auth)
  164. Log(TRACE_PROTOCOL, -1, "Setting http proxy auth to %s", aClient->net.http_proxy_auth);
  165. }
  166. }
  167. #if defined(OPENSSL)
  168. if (!unixsock)
  169. {
  170. if (aClient->httpsProxy)
  171. p0 = aClient->httpsProxy;
  172. else /* if the proxy isn't set in the API then we can look in the environment */
  173. {
  174. /* Don't use the environment HTTP proxy settings by default - for backwards compatibility */
  175. char* use_proxy = getenv("PAHO_C_CLIENT_USE_HTTP_PROXY");
  176. if (use_proxy)
  177. {
  178. if (strncmp(use_proxy, "TRUE", strlen("TRUE")) == 0)
  179. {
  180. char* https_proxy = getenv("https_proxy");
  181. if (https_proxy)
  182. {
  183. char* no_proxy = getenv("no_proxy");
  184. if (no_proxy)
  185. {
  186. if (Proxy_noProxy(address, no_proxy))
  187. p0 = https_proxy;
  188. }
  189. else
  190. p0 = https_proxy; /* no no_proxy set */
  191. }
  192. }
  193. }
  194. }
  195. if (p0)
  196. {
  197. char* prefix = NULL;
  198. if (memcmp(p0, "http://", 7) == 0)
  199. prefix = "http://";
  200. else if (memcmp(p0, "https://", 8) == 0)
  201. prefix = "https://";
  202. else
  203. {
  204. rc = -1;
  205. goto exit;
  206. }
  207. if ((rc = Proxy_setHTTPProxy(aClient, p0, &aClient->net.https_proxy, &aClient->net.https_proxy_auth, prefix)) != 0)
  208. goto exit;
  209. Log(TRACE_PROTOCOL, -1, "Setting https proxy to %s", aClient->net.https_proxy);
  210. if (aClient->net.https_proxy_auth)
  211. Log(TRACE_PROTOCOL, -1, "Setting https proxy auth to %s", aClient->net.https_proxy_auth);
  212. }
  213. }
  214. if (!ssl && aClient->net.http_proxy) {
  215. #else
  216. if (aClient->net.http_proxy) {
  217. #endif
  218. addr_len = MQTTProtocol_addressPort(aClient->net.http_proxy, &port, NULL, PROXY_DEFAULT_PORT);
  219. #if defined(__GNUC__) && defined(__linux__)
  220. if (timeout < 0)
  221. rc = -1;
  222. else
  223. rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket), timeout);
  224. #else
  225. rc = Socket_new(aClient->net.http_proxy, addr_len, port, &(aClient->net.socket));
  226. #endif
  227. }
  228. #if defined(OPENSSL)
  229. else if (ssl && aClient->net.https_proxy) {
  230. addr_len = MQTTProtocol_addressPort(aClient->net.https_proxy, &port, NULL, PROXY_DEFAULT_PORT);
  231. #if defined(__GNUC__) && defined(__linux__)
  232. if (timeout < 0)
  233. rc = -1;
  234. else
  235. rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket), timeout);
  236. #else
  237. rc = Socket_new(aClient->net.https_proxy, addr_len, port, &(aClient->net.socket));
  238. #endif
  239. }
  240. #endif
  241. #if defined(UNIXSOCK)
  242. else if (unixsock) {
  243. addr_len = strlen(address);
  244. rc = Socket_unix_new(address, addr_len, &(aClient->net.socket));
  245. }
  246. #endif
  247. else {
  248. #if defined(OPENSSL)
  249. addr_len = MQTTProtocol_addressPort(address, &port, NULL, ssl ?
  250. (websocket ? WSS_DEFAULT_PORT : SECURE_MQTT_DEFAULT_PORT) :
  251. (websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT) );
  252. #else
  253. addr_len = MQTTProtocol_addressPort(address, &port, NULL, websocket ? WS_DEFAULT_PORT : MQTT_DEFAULT_PORT);
  254. #endif
  255. #if defined(__GNUC__) && defined(__linux__)
  256. if (timeout < 0)
  257. rc = -1;
  258. else
  259. rc = Socket_new(address, addr_len, port, &(aClient->net.socket), timeout);
  260. #else
  261. rc = Socket_new(address, addr_len, port, &(aClient->net.socket));
  262. #endif
  263. }
  264. if (rc == EINPROGRESS || rc == EWOULDBLOCK)
  265. aClient->connect_state = TCP_IN_PROGRESS; /* TCP connect called - wait for connect completion */
  266. else if (rc == 0)
  267. { /* TCP connect completed. If SSL, send SSL connect */
  268. #if defined(OPENSSL)
  269. if (ssl)
  270. {
  271. if (aClient->net.https_proxy) {
  272. aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
  273. rc = Proxy_connect( &aClient->net, 1, address);
  274. }
  275. if (rc == 0 && SSLSocket_setSocketForSSL(&aClient->net, aClient->sslopts, address, addr_len) == 1)
  276. {
  277. rc = aClient->sslopts->struct_version >= 3 ?
  278. SSLSocket_connect(aClient->net.ssl, aClient->net.socket, address,
  279. aClient->sslopts->verify, aClient->sslopts->ssl_error_cb, aClient->sslopts->ssl_error_context) :
  280. SSLSocket_connect(aClient->net.ssl, aClient->net.socket, address,
  281. aClient->sslopts->verify, NULL, NULL);
  282. if (rc == TCPSOCKET_INTERRUPTED)
  283. aClient->connect_state = SSL_IN_PROGRESS; /* SSL connect called - wait for completion */
  284. }
  285. else
  286. rc = SOCKET_ERROR;
  287. }
  288. else if (aClient->net.http_proxy) {
  289. #else
  290. if (aClient->net.http_proxy) {
  291. #endif
  292. aClient->connect_state = PROXY_CONNECT_IN_PROGRESS;
  293. rc = Proxy_connect( &aClient->net, 0, address);
  294. }
  295. if ( websocket )
  296. {
  297. #if defined(OPENSSL)
  298. rc = WebSocket_connect(&aClient->net, ssl, address);
  299. #else
  300. rc = WebSocket_connect(&aClient->net, 0, address);
  301. #endif
  302. if ( rc == TCPSOCKET_INTERRUPTED )
  303. aClient->connect_state = WEBSOCKET_IN_PROGRESS; /* Websocket connect called - wait for completion */
  304. }
  305. if (rc == 0)
  306. {
  307. /* Now send the MQTT connect packet */
  308. if ((rc = MQTTPacket_send_connect(aClient, MQTTVersion, connectProperties, willProperties)) == 0)
  309. aClient->connect_state = WAIT_FOR_CONNACK; /* MQTT Connect sent - wait for CONNACK */
  310. else
  311. aClient->connect_state = NOT_IN_PROGRESS;
  312. }
  313. }
  314. exit:
  315. FUNC_EXIT_RC(rc);
  316. return rc;
  317. }
  318. /**
  319. * Process an incoming pingresp packet for a socket
  320. * @param pack pointer to the publish packet
  321. * @param sock the socket on which the packet was received
  322. * @return completion code
  323. */
  324. int MQTTProtocol_handlePingresps(void* pack, SOCKET sock)
  325. {
  326. Clients* client = NULL;
  327. ListElement* result = NULL;
  328. int rc = TCPSOCKET_COMPLETE;
  329. FUNC_ENTRY;
  330. result = ListFindItem(bstate->clients, &sock, clientSocketCompare);
  331. if (result)
  332. {
  333. client = (Clients*)(result->content);
  334. Log(LOG_PROTOCOL, 21, NULL, sock, client->clientID);
  335. }
  336. client->ping_outstanding = 0;
  337. FUNC_EXIT_RC(rc);
  338. return rc;
  339. }
  340. /**
  341. * MQTT outgoing subscribe processing for a client
  342. * @param client the client structure
  343. * @param topics list of topics
  344. * @param qoss corresponding list of QoSs
  345. * @param opts MQTT 5.0 subscribe options
  346. * @param props MQTT 5.0 subscribe properties
  347. * @return completion code
  348. */
  349. int MQTTProtocol_subscribe(Clients* client, List* topics, List* qoss, int msgID,
  350. MQTTSubscribe_options* opts, MQTTProperties* props)
  351. {
  352. int rc = 0;
  353. FUNC_ENTRY;
  354. rc = MQTTPacket_send_subscribe(topics, qoss, opts, props, msgID, 0, client);
  355. FUNC_EXIT_RC(rc);
  356. return rc;
  357. }
  358. /**
  359. * Process an incoming suback packet for a socket
  360. * @param pack pointer to the publish packet
  361. * @param sock the socket on which the packet was received
  362. * @return completion code
  363. */
  364. int MQTTProtocol_handleSubacks(void* pack, SOCKET sock)
  365. {
  366. Suback* suback = (Suback*)pack;
  367. Clients* client = NULL;
  368. ListElement* result = NULL;
  369. int rc = TCPSOCKET_COMPLETE;
  370. FUNC_ENTRY;
  371. result = ListFindItem(bstate->clients, &sock, clientSocketCompare);
  372. if (result)
  373. {
  374. client = (Clients*)(result->content);
  375. Log(LOG_PROTOCOL, 23, NULL, sock, client->clientID, suback->msgId);
  376. }
  377. MQTTPacket_freeSuback(suback);
  378. FUNC_EXIT_RC(rc);
  379. return rc;
  380. }
  381. /**
  382. * MQTT outgoing unsubscribe processing for a client
  383. * @param client the client structure
  384. * @param topics list of topics
  385. * @return completion code
  386. */
  387. int MQTTProtocol_unsubscribe(Clients* client, List* topics, int msgID, MQTTProperties* props)
  388. {
  389. int rc = 0;
  390. FUNC_ENTRY;
  391. rc = MQTTPacket_send_unsubscribe(topics, props, msgID, 0, client);
  392. FUNC_EXIT_RC(rc);
  393. return rc;
  394. }
  395. /**
  396. * Process an incoming unsuback packet for a socket
  397. * @param pack pointer to the publish packet
  398. * @param sock the socket on which the packet was received
  399. * @return completion code
  400. */
  401. int MQTTProtocol_handleUnsubacks(void* pack, SOCKET sock)
  402. {
  403. Unsuback* unsuback = (Unsuback*)pack;
  404. Clients* client = NULL;
  405. ListElement* result = NULL;
  406. int rc = TCPSOCKET_COMPLETE;
  407. FUNC_ENTRY;
  408. result = ListFindItem(bstate->clients, &sock, clientSocketCompare);
  409. if (result)
  410. {
  411. client = (Clients*)(result->content);
  412. Log(LOG_PROTOCOL, 24, NULL, sock, client->clientID, unsuback->msgId);
  413. }
  414. MQTTPacket_freeUnsuback(unsuback);
  415. FUNC_EXIT_RC(rc);
  416. return rc;
  417. }
  418. /**
  419. * Process an incoming disconnect packet for a socket
  420. * @param pack pointer to the disconnect packet
  421. * @param sock the socket on which the packet was received
  422. * @return completion code
  423. */
  424. int MQTTProtocol_handleDisconnects(void* pack, SOCKET sock)
  425. {
  426. Ack* disconnect = (Ack*)pack;
  427. Clients* client = NULL;
  428. ListElement* result = NULL;
  429. int rc = TCPSOCKET_COMPLETE;
  430. FUNC_ENTRY;
  431. result = ListFindItem(bstate->clients, &sock, clientSocketCompare);
  432. if (result)
  433. {
  434. client = (Clients*)(result->content);
  435. Log(LOG_PROTOCOL, 30, NULL, sock, client->clientID, disconnect->rc);
  436. }
  437. MQTTPacket_freeAck(disconnect);
  438. FUNC_EXIT_RC(rc);
  439. return rc;
  440. }