bounded_fifo_queue.hpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2015.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file bounded_fifo_queue.hpp
  9. * \author Andrey Semashev
  10. * \date 04.01.2012
  11. *
  12. * The header contains implementation of bounded FIFO queueing strategy for
  13. * the asynchronous sink frontend.
  14. */
  15. #ifndef BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_
  16. #define BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_
  17. #include <boost/log/detail/config.hpp>
  18. #ifdef BOOST_HAS_PRAGMA_ONCE
  19. #pragma once
  20. #endif
  21. #if defined(BOOST_LOG_NO_THREADS)
  22. #error Boost.Log: This header content is only supported in multithreaded environment
  23. #endif
  24. #include <cstddef>
  25. #include <queue>
  26. #include <mutex>
  27. #include <condition_variable>
  28. #include <boost/log/core/record_view.hpp>
  29. #include <boost/log/detail/header.hpp>
  30. namespace boost {
  31. BOOST_LOG_OPEN_NAMESPACE
  32. namespace sinks {
  33. /*!
  34. * \brief Bounded FIFO log record queueing strategy
  35. *
  36. * The \c bounded_fifo_queue class is intended to be used with
  37. * the \c asynchronous_sink frontend as a log record queueing strategy.
  38. *
  39. * This strategy describes log record queueing logic.
  40. * The queue has a limited capacity, upon reaching which the enqueue operation will
  41. * invoke the overflow handling strategy specified in the \c OverflowStrategyT
  42. * template parameter to handle the situation. The library provides overflow handling
  43. * strategies for most common cases: \c drop_on_overflow will silently discard the log record,
  44. * and \c block_on_overflow will put the enqueueing thread to wait until there is space
  45. * in the queue.
  46. *
  47. * The log record queue imposes no ordering over the queued
  48. * elements aside from the order in which they are enqueued.
  49. */
  50. template< std::size_t MaxQueueSizeV, typename OverflowStrategyT >
  51. class bounded_fifo_queue :
  52. private OverflowStrategyT
  53. {
  54. private:
  55. typedef OverflowStrategyT overflow_strategy;
  56. typedef std::queue< record_view > queue_type;
  57. typedef std::mutex mutex_type;
  58. private:
  59. //! Synchronization primitive
  60. mutex_type m_mutex;
  61. //! Condition to block the consuming thread on
  62. std::condition_variable m_cond;
  63. //! Log record queue
  64. queue_type m_queue;
  65. //! Interruption flag
  66. bool m_interruption_requested;
  67. protected:
  68. //! Default constructor
  69. bounded_fifo_queue() : m_interruption_requested(false)
  70. {
  71. }
  72. //! Initializing constructor
  73. template< typename ArgsT >
  74. explicit bounded_fifo_queue(ArgsT const&) : m_interruption_requested(false)
  75. {
  76. }
  77. //! Enqueues log record to the queue
  78. void enqueue(record_view const& rec)
  79. {
  80. std::unique_lock< mutex_type > lock(m_mutex);
  81. std::size_t size = m_queue.size();
  82. for (; size >= MaxQueueSizeV; size = m_queue.size())
  83. {
  84. if (!overflow_strategy::on_overflow(rec, lock))
  85. return;
  86. }
  87. m_queue.push(rec);
  88. if (size == 0)
  89. m_cond.notify_one();
  90. }
  91. //! Attempts to enqueue log record to the queue
  92. bool try_enqueue(record_view const& rec)
  93. {
  94. std::unique_lock< mutex_type > lock(m_mutex, std::try_to_lock);
  95. if (lock.owns_lock())
  96. {
  97. const std::size_t size = m_queue.size();
  98. // Do not invoke the bounding strategy in case of overflow as it may block
  99. if (size < MaxQueueSizeV)
  100. {
  101. m_queue.push(rec);
  102. if (size == 0)
  103. m_cond.notify_one();
  104. return true;
  105. }
  106. }
  107. return false;
  108. }
  109. //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
  110. bool try_dequeue_ready(record_view& rec)
  111. {
  112. return try_dequeue(rec);
  113. }
  114. //! Attempts to dequeue log record from the queue, does not block if the queue is empty
  115. bool try_dequeue(record_view& rec)
  116. {
  117. std::lock_guard< mutex_type > lock(m_mutex);
  118. const std::size_t size = m_queue.size();
  119. if (size > 0)
  120. {
  121. rec.swap(m_queue.front());
  122. m_queue.pop();
  123. overflow_strategy::on_queue_space_available();
  124. return true;
  125. }
  126. return false;
  127. }
  128. //! Dequeues log record from the queue, blocks if the queue is empty
  129. bool dequeue_ready(record_view& rec)
  130. {
  131. std::unique_lock< mutex_type > lock(m_mutex);
  132. while (!m_interruption_requested)
  133. {
  134. const std::size_t size = m_queue.size();
  135. if (size > 0)
  136. {
  137. rec.swap(m_queue.front());
  138. m_queue.pop();
  139. overflow_strategy::on_queue_space_available();
  140. return true;
  141. }
  142. else
  143. {
  144. m_cond.wait(lock);
  145. }
  146. }
  147. m_interruption_requested = false;
  148. return false;
  149. }
  150. //! Wakes a thread possibly blocked in the \c dequeue method
  151. void interrupt_dequeue()
  152. {
  153. std::lock_guard< mutex_type > lock(m_mutex);
  154. m_interruption_requested = true;
  155. overflow_strategy::interrupt();
  156. m_cond.notify_one();
  157. }
  158. };
  159. } // namespace sinks
  160. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  161. } // namespace boost
  162. #include <boost/log/detail/footer.hpp>
  163. #endif // BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_