thread_queue.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file thread_queue.h
  3. /// Implementation of the template class 'thread_queue', a thread-safe,
  4. /// blocking queue for passing data between threads, safe for use with smart
  5. /// pointers.
  6. /// @date 09-Jan-2017
  7. /////////////////////////////////////////////////////////////////////////////
  8. /*******************************************************************************
  9. * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
  10. *
  11. * All rights reserved. This program and the accompanying materials
  12. * are made available under the terms of the Eclipse Public License v2.0
  13. * and Eclipse Distribution License v1.0 which accompany this distribution.
  14. *
  15. * The Eclipse Public License is available at
  16. * http://www.eclipse.org/legal/epl-v20.html
  17. * and the Eclipse Distribution License is available at
  18. * http://www.eclipse.org/org/documents/edl-v10.php.
  19. *
  20. * Contributors:
  21. * Frank Pagliughi - initial implementation and documentation
  22. *******************************************************************************/
  23. #ifndef __mqtt_thread_queue_h
  24. #define __mqtt_thread_queue_h
  #include <algorithm>
  #include <chrono>
  #include <condition_variable>
  #include <cstddef>
  #include <deque>
  #include <limits>
  #include <mutex>
  #include <queue>
  #include <stdexcept>
  #include <thread>
  #include <utility>
  namespace mqtt {

  /**
   * Exception that is thrown when operations are performed on a closed
   * queue.
   */
  class queue_closed : public std::runtime_error
  {
  public:
      /** Creates the exception with the fixed message "queue is closed". */
      queue_closed() : std::runtime_error("queue is closed") {}
  };

  /////////////////////////////////////////////////////////////////////////////

  /**
   * A thread-safe queue for inter-thread communication.
   *
   * This is a locking queue with blocking operations. The get() operations
   * can always block on an empty queue, but have variations for non-blocking
   * (try_get) and bounded-time blocking (try_get_for, try_get_until).
   * @par
   * The default queue has a capacity that is unbounded in the practical
   * sense, limited by available memory. In this mode the object will not
   * block when placing values into the queue. A capacity can be set with the
   * constructor or, at any time later, by calling the @ref capacity(size_type)
   * method. Using this latter method, the capacity can be set to an amount
   * smaller than the current size of the queue. In that case all puts to the
   * queue will block until enough items are removed from the queue to bring
   * the size below the new capacity.
   * @par
   * The queue can be closed. After that, no new items can be placed into it;
   * any `put()` call will fail. Receivers can still continue to get any items
   * out of the queue that were added before it was closed. Once there are no
   * more items left in the queue after it is closed, it is considered "done".
   * Nothing useful can be done with the queue.
   * @par
   * Note that the queue uses move semantics to place items into the queue and
   * remove items from the queue. This means that the type, T, of the data
   * held by the queue only needs to follow move semantics; not copy
   * semantics. In addition, this means that copies of the value will @em not
   * be left in the queue. This is especially useful when creating queues of
   * shared pointers, as the "dead" part of the queue will not hold onto a
   * reference count after the item has been removed from the queue.
   *
   * @tparam T The type of the items to be held in the queue.
   * @tparam Container The type of the underlying container to use. It is
   *  		wrapped in a std::queue, so it must support front(), back(),
   *  		push_back(), emplace_back(), pop_front(), size() and empty().
   */
  template <typename T, class Container = std::deque<T>>
  class thread_queue
  {
  public:
      /** The underlying container type to use for the queue. */
      using container_type = Container;
      /** The type of items to be held in the queue. */
      using value_type = T;
      /** The type used to specify number of items in the container. */
      using size_type = typename Container::size_type;

      /** The maximum capacity of the queue. */
      static constexpr size_type MAX_CAPACITY = (std::numeric_limits<size_type>::max)();

  private:
      /** Object lock */
      mutable std::mutex lock_;
      /** Condition gets signaled when an item is added to the queue */
      std::condition_variable notEmptyCond_;
      /** Condition gets signaled when an item is removed from the queue */
      std::condition_variable notFullCond_;
      /** The capacity of the queue */
      size_type cap_{MAX_CAPACITY};
      /** Whether the queue is closed */
      bool closed_{false};
      /** The actual STL container to hold data */
      std::queue<T, Container> que_;

      /** Simple, scope-based lock guard */
      using guard = std::lock_guard<std::mutex>;
      /** General purpose guard */
      using unique_guard = std::unique_lock<std::mutex>;

      /** Checks if the queue is done (unsafe: caller must hold the lock) */
      bool is_done() const { return closed_ && que_.empty(); }

      /**
       * Wake-up condition for senders (unsafe: caller must hold the lock).
       * A sender stops waiting when there is room, or when the queue was
       * closed and it must give up.
       */
      bool put_ready() const { return que_.size() < cap_ || closed_; }

      /**
       * Wake-up condition for receivers (unsafe: caller must hold the lock).
       * A receiver stops waiting when an item is available, or when the
       * queue was closed and no more items will arrive.
       */
      bool get_ready() const { return !que_.empty() || closed_; }

      /**
       * Moves an item onto the back of the queue and signals one waiting
       * receiver (unsafe: caller must hold the lock and must already have
       * checked the capacity and closed state).
       */
      void push_item(value_type&& val) {
          que_.emplace(std::move(val));
          notEmptyCond_.notify_one();
      }

      /**
       * Moves the front item out of the queue into `*val` and signals one
       * waiting sender (unsafe: caller must hold the lock and pass a
       * non-null pointer).
       * @return @em true if an item was removed, @em false if the queue was
       *  	   empty.
       */
      bool pop_item(value_type* val) {
          if (que_.empty())
              return false;
          *val = std::move(que_.front());
          que_.pop();
          notFullCond_.notify_one();
          return true;
      }

  public:
      /**
       * Constructs a queue with the maximum capacity.
       * This is effectively an unbounded queue.
       */
      thread_queue() {}
      /**
       * Constructs a queue with the specified capacity.
       * This is a bounded queue.
       * @param cap The maximum number of items that can be placed in the
       *  		  queue. The minimum capacity is 1; a value of zero is
       *  		  raised to 1.
       */
      explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
      /**
       * Determine if the queue is empty.
       * @return @em true if there are no elements in the queue, @em false if
       *  	   there are any items in the queue.
       */
      bool empty() const {
          guard g{lock_};
          return que_.empty();
      }
      /**
       * Gets the capacity of the queue.
       * @return The maximum number of elements before the queue is full.
       */
      size_type capacity() const {
          guard g{lock_};
          return cap_;
      }
      /**
       * Sets the capacity of the queue.
       * Note that the capacity can be set to a value smaller than the current
       * size of the queue. In that event, all calls to put() will block until
       * a sufficient number of items are removed from the queue.
       * @param cap The new capacity. The minimum capacity is 1; a value of
       *  		  zero is raised to 1.
       */
      void capacity(size_type cap) {
          guard g{lock_};
          cap_ = std::max<size_type>(cap, 1);
          // Room may have just opened up for senders blocked on a full queue.
          if (cap_ > que_.size())
              notFullCond_.notify_all();
      }
      /**
       * Gets the number of items in the queue.
       * @return The number of items in the queue.
       */
      size_type size() const {
          guard g{lock_};
          return que_.size();
      }
      /**
       * Close the queue.
       * Once closed, the queue will not accept any new items, but receivers
       * will still be able to get any remaining items out of the queue until
       * it is empty. All blocked senders and receivers are woken so that
       * they can observe the closed state.
       */
      void close() {
          guard g{lock_};
          closed_ = true;
          notFullCond_.notify_all();
          notEmptyCond_.notify_all();
      }
      /**
       * Determines if the queue is closed.
       * Once closed, the queue will not accept any new items, but receivers
       * will still be able to get any remaining items out of the queue until
       * it is empty.
       * @return @em true if the queue is closed, @em false otherwise.
       */
      bool closed() const {
          guard g{lock_};
          return closed_;
      }
      /**
       * Determines if all possible operations are done on the queue.
       * If the queue is closed and empty, then no further useful operations
       * can be done on it.
       * @return @em true if the queue is closed and empty, @em false
       *  	   otherwise.
       */
      bool done() const {
          guard g{lock_};
          return is_done();
      }
      /**
       * Clear the contents of the queue.
       * This discards all items in the queue and wakes any senders that were
       * blocked waiting for room.
       */
      void clear() {
          guard g{lock_};
          while (!que_.empty()) que_.pop();
          notFullCond_.notify_all();
      }
      /**
       * Put an item into the queue.
       * If the queue is full, this will block the caller until items are
       * removed bringing the size less than the capacity.
       * @param val The value to add to the queue.
       * @throw queue_closed if the queue is closed, whether it was already
       *  	  closed on entry or got closed while the caller was blocked.
       */
      void put(value_type val) {
          unique_guard g{lock_};
          notFullCond_.wait(g, [this] { return put_ready(); });
          if (closed_)
              throw queue_closed{};
          push_item(std::move(val));
      }
      /**
       * Non-blocking attempt to place an item into the queue.
       * @param val The value to add to the queue.
       * @return @em true if the item was added to the queue, @em false if the
       *  	   item was not added because the queue is currently full or
       *  	   is closed.
       */
      bool try_put(value_type val) {
          guard g{lock_};
          if (closed_ || que_.size() >= cap_)
              return false;
          push_item(std::move(val));
          return true;
      }
      /**
       * Attempt to place an item in the queue with a bounded wait.
       * This will attempt to place the value in the queue, but if it is full,
       * it will wait up to the specified time duration before timing out.
       * @param val The value to add to the queue.
       * @param relTime The amount of time to wait until timing out.
       * @return @em true if the value was added to the queue, @em false if a
       *  	   timeout occurred or the queue is closed.
       */
      template <typename Rep, class Period>
      bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
          unique_guard g{lock_};
          // wait_for() returns the final predicate value: false means timeout.
          const bool ready = notFullCond_.wait_for(g, relTime, [this] { return put_ready(); });
          if (!ready || closed_)
              return false;
          push_item(std::move(val));
          return true;
      }
      /**
       * Attempt to place an item in the queue with a bounded wait to an
       * absolute time point.
       * This will attempt to place the value in the queue, but if it is full,
       * it will wait up until the specified time before timing out.
       * @param val The value to add to the queue.
       * @param absTime The absolute time to wait to before timing out.
       * @return @em true if the value was added to the queue, @em false if a
       *  	   timeout occurred or the queue is closed.
       */
      template <class Clock, class Duration>
      bool try_put_until(
          value_type val, const std::chrono::time_point<Clock, Duration>& absTime
      ) {
          unique_guard g{lock_};
          // wait_until() returns the final predicate value: false means timeout.
          const bool ready = notFullCond_.wait_until(g, absTime, [this] { return put_ready(); });
          if (!ready || closed_)
              return false;
          push_item(std::move(val));
          return true;
      }
      /**
       * Retrieve a value from the queue.
       * If the queue is empty, this will block indefinitely until a value is
       * added to the queue by another thread, or the queue is closed.
       * @param val Pointer to a variable to receive the value.
       * @return @em true if a value was removed from the queue, @em false if
       *  	   @p val is null or the queue is closed and empty.
       */
      bool get(value_type* val) {
          if (!val)
              return false;
          unique_guard g{lock_};
          notEmptyCond_.wait(g, [this] { return get_ready(); });
          // An empty queue after the wait means it was closed: we are done.
          return pop_item(val);
      }
      /**
       * Retrieve a value from the queue.
       * If the queue is empty, this will block indefinitely until a value is
       * added to the queue by another thread, or the queue is closed.
       * @return The value removed from the queue
       * @throw queue_closed if the queue is closed and empty.
       */
      value_type get() {
          unique_guard g{lock_};
          notEmptyCond_.wait(g, [this] { return get_ready(); });
          if (que_.empty())  // We must be done
              throw queue_closed{};
          // Move-construct the result so T need not be default-constructible.
          value_type val = std::move(que_.front());
          que_.pop();
          notFullCond_.notify_one();
          return val;
      }
      /**
       * Attempts to remove a value from the queue without blocking.
       * If the queue is currently empty, this will return immediately with a
       * failure, otherwise it will get the next value and return it.
       * @param val Pointer to a variable to receive the value.
       * @return @em true if a value was removed from the queue, @em false if
       *  	   the queue is empty or @p val is null.
       */
      bool try_get(value_type* val) {
          if (!val)
              return false;
          guard g{lock_};
          return pop_item(val);
      }
      /**
       * Attempt to remove an item from the queue for a bounded amount of time.
       * This will retrieve the next item from the queue. If the queue is
       * empty, it will wait the specified amount of time for an item to arrive
       * before timing out.
       * @param val Pointer to a variable to receive the value.
       * @param relTime The amount of time to wait until timing out.
       * @return @em true if the value was removed from the queue, @em false
       *  	   if a timeout occurred, the queue is closed and empty, or
       *  	   @p val is null.
       */
      template <typename Rep, class Period>
      bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
          if (!val)
              return false;
          unique_guard g{lock_};
          // The wait result is not needed: a timeout or a close both leave
          // the queue empty, which pop_item() reports as failure.
          notEmptyCond_.wait_for(g, relTime, [this] { return get_ready(); });
          return pop_item(val);
      }
      /**
       * Attempt to remove an item from the queue for a bounded amount of time.
       * This will retrieve the next item from the queue. If the queue is
       * empty, it will wait until the specified time for an item to arrive
       * before timing out.
       * @param val Pointer to a variable to receive the value.
       * @param absTime The absolute time to wait to before timing out.
       * @return @em true if the value was removed from the queue, @em false
       *  	   if a timeout occurred, the queue is closed and empty, or
       *  	   @p val is null.
       */
      template <class Clock, class Duration>
      bool try_get_until(
          value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
      ) {
          if (!val)
              return false;
          unique_guard g{lock_};
          // The wait result is not needed: a timeout or a close both leave
          // the queue empty, which pop_item() reports as failure.
          notEmptyCond_.wait_until(g, absTime, [this] { return get_ready(); });
          return pop_item(val);
      }
  };

  /////////////////////////////////////////////////////////////////////////////
  }  // namespace mqtt
  377. #endif // __mqtt_thread_queue_h