| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- /////////////////////////////////////////////////////////////////////////////
- /// @file thread_queue.h
- /// Implementation of the template class 'thread_queue', a thread-safe,
- /// blocking queue for passing data between threads, safe for use with smart
- /// pointers.
- /// @date 09-Jan-2017
- /////////////////////////////////////////////////////////////////////////////
- /*******************************************************************************
- * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v2.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v20.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Frank Pagliughi - initial implementation and documentation
- *******************************************************************************/
- #ifndef __mqtt_thread_queue_h
- #define __mqtt_thread_queue_h
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
namespace mqtt {

/**
 * Exception that is thrown when operations are performed on a closed
 * queue.
 */
class queue_closed : public std::runtime_error
{
public:
    /** Creates the exception with the fixed message "queue is closed". */
    queue_closed() : std::runtime_error("queue is closed") {}
};

/////////////////////////////////////////////////////////////////////////////

/**
 * A thread-safe queue for inter-thread communication.
 *
 * This is a locking queue with blocking operations. The get() operations
 * can always block on an empty queue, but have variations for non-blocking
 * (try_get) and bounded-time blocking (try_get_for, try_get_until).
 * @par
 * The default queue has a capacity that is unbounded in the practical
 * sense, limited by available memory. In this mode the object will not
 * block when placing values into the queue. A capacity can be set with the
 * constructor or, at any time later, by calling the @ref capacity(size_type)
 * method. Using this latter method, the capacity can be set to an amount
 * smaller than the current size of the queue. In that case all put()'s to
 * the queue will block until enough items are removed from the queue to
 * bring the size below the new capacity.
 * @par
 * The queue can be closed. After that, no new items can be placed into it;
 * a `put()` call will fail. Receivers can still continue to get any items
 * out of the queue that were added before it was closed. Once there are no
 * more items left in the queue after it is closed, it is considered "done".
 * At that point nothing further useful can be done with the queue.
 * @par
 * Note that the queue uses move semantics to place items into the queue and
 * remove items from the queue. This means that the type, T, of the data
 * held by the queue only needs to follow move semantics; not copy
 * semantics. In addition, this means that copies of the value will @em not
 * be left in the queue. This is especially useful when creating queues of
 * shared pointers, as the "dead" part of the queue will not hold onto a
 * reference count after the item has been removed from the queue.
 *
 * @tparam T The type of the items to be held in the queue.
 * @tparam Container The type of the underlying container to use. It is
 *  				 wrapped in a std::queue, and must support front(),
 *  				 emplace_back(), pop_front(), size() and empty().
 */
template <typename T, class Container = std::deque<T>>
class thread_queue
{
public:
    /** The underlying container type to use for the queue. */
    using container_type = Container;
    /** The type of items to be held in the queue. */
    using value_type = T;
    /** The type used to specify number of items in the container. */
    using size_type = typename Container::size_type;

    /** The maximum capacity of the queue. */
    static constexpr size_type MAX_CAPACITY = (std::numeric_limits<size_type>::max)();

private:
    /** Object lock */
    mutable std::mutex lock_;
    /** Condition gets signaled when an item is added to the queue */
    std::condition_variable notEmptyCond_;
    /** Condition gets signaled when an item is removed from the queue */
    std::condition_variable notFullCond_;
    /** The capacity of the queue */
    size_type cap_{MAX_CAPACITY};
    /** Whether the queue is closed */
    bool closed_{false};
    /** The actual STL container to hold data */
    std::queue<T, Container> que_;

    /** Simple, scope-based lock guard */
    using guard = std::lock_guard<std::mutex>;
    /** General purpose guard */
    using unique_guard = std::unique_lock<std::mutex>;

    // ----- Unlocked helpers: the caller must already hold lock_ -----

    /** Checks if the queue is done (unsafe) */
    bool is_done() const { return closed_ && que_.empty(); }

    /** Wake-up condition for producers: there is room, or the queue closed (unsafe) */
    bool can_stop_put_wait() const { return que_.size() < cap_ || closed_; }

    /** Wake-up condition for consumers: there is an item, or the queue closed (unsafe) */
    bool can_stop_get_wait() const { return !que_.empty() || closed_; }

    /** Moves the value onto the back of the queue and signals one consumer (unsafe) */
    void push_item(value_type&& val) {
        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
    }

    /**
     * Moves the front item into `*val`, removes it from the queue, and
     * signals one producer (unsafe). The queue must not be empty and `val`
     * must not be null.
     */
    void pop_item(value_type* val) {
        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
    }

public:
    /**
     * Constructs a queue with the maximum capacity.
     * This is effectively an unbounded queue.
     */
    thread_queue() {}
    /**
     * Constructs a queue with the specified capacity.
     * This is a bounded queue.
     * @param cap The maximum number of items that can be placed in the
     *  		  queue. The minimum capacity is 1; a value of zero is
     *  		  raised to 1.
     */
    explicit thread_queue(size_type cap) : cap_(std::max<size_type>(cap, 1)) {}
    /**
     * Determine if the queue is empty.
     * @return @em true if there are no elements in the queue, @em false if
     *  	   there are any items in the queue.
     */
    bool empty() const {
        guard g{lock_};
        return que_.empty();
    }
    /**
     * Gets the capacity of the queue.
     * @return The maximum number of elements before the queue is full.
     */
    size_type capacity() const {
        guard g{lock_};
        return cap_;
    }
    /**
     * Sets the capacity of the queue.
     * Note that the capacity can be set to a value smaller than the current
     * size of the queue. In that event, all calls to put() will block until
     * a sufficient number of items are removed from the queue.
     * @param cap The new capacity. The minimum capacity is 1; a value of
     *  		  zero is raised to 1.
     */
    void capacity(size_type cap) {
        guard g{lock_};
        cap_ = std::max<size_type>(cap, 1);
        // The new capacity may have opened up room for blocked producers.
        if (cap_ > que_.size())
            notFullCond_.notify_all();
    }
    /**
     * Gets the number of items in the queue.
     * @return The number of items in the queue.
     */
    size_type size() const {
        guard g{lock_};
        return que_.size();
    }
    /**
     * Close the queue.
     * Once closed, the queue will not accept any new items, but receivers
     * will still be able to get any remaining items out of the queue until
     * it is empty. All threads blocked on the queue are woken.
     */
    void close() {
        guard g{lock_};
        closed_ = true;
        // Wake everyone: blocked producers must fail, and blocked
        // consumers must stop waiting for items that will never arrive.
        notFullCond_.notify_all();
        notEmptyCond_.notify_all();
    }
    /**
     * Determines if the queue is closed.
     * Once closed, the queue will not accept any new items, but receivers
     * will still be able to get any remaining items out of the queue until
     * it is empty.
     * @return @em true if the queue is closed, @em false otherwise.
     */
    bool closed() const {
        guard g{lock_};
        return closed_;
    }
    /**
     * Determines if all possible operations are done on the queue.
     * If the queue is closed and empty, then no further useful operations
     * can be done on it.
     * @return @em true if the queue is closed and empty, @em false
     *  	   otherwise.
     */
    bool done() const {
        guard g{lock_};
        return is_done();
    }
    /**
     * Clear the contents of the queue.
     * This discards all items in the queue and wakes any producers that
     * were blocked waiting for room.
     */
    void clear() {
        guard g{lock_};
        while (!que_.empty()) que_.pop();
        notFullCond_.notify_all();
    }
    /**
     * Put an item into the queue.
     * If the queue is full, this will block the caller until items are
     * removed bringing the size less than the capacity.
     * @param val The value to add to the queue.
     * @throw queue_closed if the queue is closed, whether it was already
     *  	  closed on entry or was closed while waiting for room.
     */
    void put(value_type val) {
        unique_guard g{lock_};
        notFullCond_.wait(g, [this] { return can_stop_put_wait(); });
        if (closed_)
            throw queue_closed{};
        push_item(std::move(val));
    }
    /**
     * Non-blocking attempt to place an item into the queue.
     * @param val The value to add to the queue.
     * @return @em true if the item was added to the queue, @em false if the
     *  	   item was not added because the queue is currently full or
     *  	   is closed.
     */
    bool try_put(value_type val) {
        guard g{lock_};
        if (que_.size() >= cap_ || closed_)
            return false;
        push_item(std::move(val));
        return true;
    }
    /**
     * Attempt to place an item in the queue with a bounded wait.
     * This will attempt to place the value in the queue, but if it is full,
     * it will wait up to the specified time duration before timing out.
     * @param val The value to add to the queue.
     * @param relTime The amount of time to wait until timing out.
     * @return @em true if the value was added to the queue, @em false if a
     *  	   timeout occurred or the queue is closed.
     */
    template <typename Rep, class Period>
    bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
        unique_guard g{lock_};
        // wait_for() reports the final value of the predicate, so a
        // 'false' result means the wait timed out with the queue still full.
        const bool ready = notFullCond_.wait_for(g, relTime, [this] {
            return can_stop_put_wait();
        });
        if (!ready || closed_)
            return false;
        push_item(std::move(val));
        return true;
    }
    /**
     * Attempt to place an item in the queue with a bounded wait to an
     * absolute time point.
     * This will attempt to place the value in the queue, but if it is full,
     * it will wait up until the specified time before timing out.
     * @param val The value to add to the queue.
     * @param absTime The absolute time to wait to before timing out.
     * @return @em true if the value was added to the queue, @em false if a
     *  	   timeout occurred or the queue is closed.
     */
    template <class Clock, class Duration>
    bool try_put_until(
        value_type val, const std::chrono::time_point<Clock, Duration>& absTime
    ) {
        unique_guard g{lock_};
        // wait_until() reports the final value of the predicate, so a
        // 'false' result means the wait timed out with the queue still full.
        const bool ready = notFullCond_.wait_until(g, absTime, [this] {
            return can_stop_put_wait();
        });
        if (!ready || closed_)
            return false;
        push_item(std::move(val));
        return true;
    }
    /**
     * Retrieve a value from the queue.
     * If the queue is empty, this will block until a value is added to the
     * queue by another thread, or until the queue is closed.
     * @param val Pointer to a variable to receive the value.
     * @return @em true if a value was removed from the queue and placed in
     *  	   `*val`, @em false if @p val is null or if the queue is
     *  	   closed and empty.
     */
    bool get(value_type* val) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait(g, [this] { return can_stop_get_wait(); });
        if (que_.empty())  // We must be done
            return false;
        pop_item(val);
        return true;
    }
    /**
     * Retrieve a value from the queue.
     * If the queue is empty, this will block until a value is added to the
     * queue by another thread, or until the queue is closed.
     * @return The value removed from the queue
     * @throw queue_closed if the queue is closed and empty.
     */
    value_type get() {
        unique_guard g{lock_};
        notEmptyCond_.wait(g, [this] { return can_stop_get_wait(); });
        if (que_.empty())  // We must be done
            throw queue_closed{};

        // Move-construct the result directly so that T does not need to
        // be default constructible.
        value_type val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return val;
    }
    /**
     * Attempts to remove a value from the queue without blocking.
     * If the queue is currently empty, this will return immediately with a
     * failure, otherwise it will get the next value and return it.
     * @param val Pointer to a variable to receive the value.
     * @return @em true if a value was removed from the queue, @em false if
     *  	   the queue is empty or @p val is null.
     */
    bool try_get(value_type* val) {
        if (!val)
            return false;

        guard g{lock_};
        if (que_.empty())
            return false;
        pop_item(val);
        return true;
    }
    /**
     * Attempt to remove an item from the queue for a bounded amount of time.
     * This will retrieve the next item from the queue. If the queue is
     * empty, it will wait the specified amount of time for an item to arrive
     * before timing out.
     * @param val Pointer to a variable to receive the value.
     * @param relTime The amount of time to wait until timing out.
     * @return @em true if the value was removed from the queue, @em false
     *  	   if a timeout occurred, if the queue is closed and empty, or
     *  	   if @p val is null.
     */
    template <typename Rep, class Period>
    bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
        if (!val)
            return false;

        unique_guard g{lock_};
        // The wait result is not needed: whether we timed out or the queue
        // was closed, an empty queue means there is nothing to return.
        notEmptyCond_.wait_for(g, relTime, [this] { return can_stop_get_wait(); });
        if (que_.empty())
            return false;
        pop_item(val);
        return true;
    }
    /**
     * Attempt to remove an item from the queue for a bounded amount of time.
     * This will retrieve the next item from the queue. If the queue is
     * empty, it will wait until the specified time for an item to arrive
     * before timing out.
     * @param val Pointer to a variable to receive the value.
     * @param absTime The absolute time to wait to before timing out.
     * @return @em true if the value was removed from the queue, @em false
     *  	   if a timeout occurred, if the queue is closed and empty, or
     *  	   if @p val is null.
     */
    template <class Clock, class Duration>
    bool try_get_until(
        value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
    ) {
        if (!val)
            return false;

        unique_guard g{lock_};
        // The wait result is not needed: whether we timed out or the queue
        // was closed, an empty queue means there is nothing to return.
        notEmptyCond_.wait_until(g, absTime, [this] { return can_stop_get_wait(); });
        if (que_.empty())
            return false;
        pop_item(val);
        return true;
    }
};

/////////////////////////////////////////////////////////////////////////////
}  // namespace mqtt
- #endif // __mqtt_thread_queue_h
|