context_spmc_queue.hpp

//          Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/assert.hpp>
#include <boost/config.hpp>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>

// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21–28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
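//
// Chase-Lev work-stealing deque specialized for context pointers: the
// owning thread pushes and pops at the bottom of the deque, while other
// threads steal from the top. Only the owner ever writes bottom_; thieves
// synchronize among themselves and with the owner through a
// compare-and-swap on top_.
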
#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif

namespace boost {
namespace fibers {
namespace detail {

class context_spmc_queue {
private:
    class array {
    private:
        typedef std::atomic< context * >    atomic_type;
        // raw, suitably aligned storage: the atomics are constructed in
        // place via placement-new, which is why the reinterpret_casts
        // below are required
        typedef std::aligned_storage<
            sizeof( atomic_type), cache_alignment
        >::type                             storage_type;

        std::size_t     capacity_;
        storage_type *  storage_;

    public:
        array( std::size_t capacity) :
            capacity_{ capacity },
            storage_{ new storage_type[capacity_] } {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
            }
        }

        ~array() {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
            }
            delete [] storage_;
        }

        std::size_t capacity() const noexcept {
            return capacity_;
        }

        void push( std::size_t bottom, context * ctx) noexcept {
            reinterpret_cast< atomic_type * >(
                std::addressof( storage_[bottom % capacity_]) )
                    ->store( ctx, std::memory_order_relaxed);
        }

        context * pop( std::size_t top) noexcept {
            return reinterpret_cast< atomic_type * >(
                std::addressof( storage_[top % capacity_]) )
                    ->load( std::memory_order_relaxed);
        }
        array * resize( std::size_t bottom, std::size_t top) {
            std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
            for ( std::size_t i = top; i != bottom; ++i) {
                tmp->push( i, pop( i) );
            }
            return tmp.release();
        }
    };

    std::atomic< std::size_t >  top_{ 0 };
    std::atomic< std::size_t >  bottom_{ 0 };
    std::atomic< array * >      array_;
    std::vector< array * >      old_arrays_{};
    char                        padding_[cacheline_length]; // avoid false sharing

public:
    context_spmc_queue( std::size_t capacity = 4096) :
        array_{ new array{ capacity } } {
        old_arrays_.reserve( 32);
    }

    ~context_spmc_queue() {
        for ( array * a : old_arrays_) {
            delete a;
        }
        delete array_.load();
    }

    context_spmc_queue( context_spmc_queue const&) = delete;
    context_spmc_queue & operator=( context_spmc_queue const&) = delete;

    bool empty() const noexcept {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_relaxed);
        return bottom <= top;
    }

    void push( context * ctx) {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_acquire);
        array * a = array_.load( std::memory_order_relaxed);
        if ( (a->capacity() - 1) < (bottom - top) ) {
            // queue is full; grow the circular buffer
            array * tmp = a->resize( bottom, top);
            old_arrays_.push_back( a);
            std::swap( a, tmp);
            array_.store( a, std::memory_order_relaxed);
        }
        a->push( bottom, ctx);
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }

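    // owner-side dequeue (LIFO): bottom_ is decremented speculatively;
    // the seq_cst fence orders that store before the load of top_, and
    // when only a single element remains, owner and thieves race for it
    // via compare-and-swap on top_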
    context * pop() {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
        array * a = array_.load( std::memory_order_relaxed);
        bottom_.store( bottom, std::memory_order_relaxed);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t top = top_.load( std::memory_order_relaxed);
        context * ctx = nullptr;
        if ( top <= bottom) {
            // queue is not empty
            ctx = a->pop( bottom);
            BOOST_ASSERT( nullptr != ctx);
            if ( top == bottom) {
                // last element dequeued
                if ( ! top_.compare_exchange_strong( top, top + 1,
                                                     std::memory_order_seq_cst,
                                                     std::memory_order_relaxed) ) {
                    // lost the race against a thief
                    ctx = nullptr;
                }
                bottom_.store( bottom + 1, std::memory_order_relaxed);
            }
        } else {
            // queue is empty
            bottom_.store( bottom + 1, std::memory_order_relaxed);
        }
        return ctx;
    }

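    // thief-side dequeue (FIFO): read the element at top_ and claim it
    // with a compare-and-swap; pinned contexts (main-/dispatcher-context)
    // must never migrate to another thread, so they are not stolen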
    context * steal() {
        std::size_t top = top_.load( std::memory_order_acquire);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom = bottom_.load( std::memory_order_acquire);
        context * ctx = nullptr;
        if ( top < bottom) {
            // queue is not empty
            array * a = array_.load( std::memory_order_consume);
            ctx = a->pop( top);
            BOOST_ASSERT( nullptr != ctx);
            // do not steal pinned context (e.g. main-/dispatcher-context)
            if ( ctx->is_context( type::pinned_context) ) {
                return nullptr;
            }
            if ( ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                // lost the race against another thief
                return nullptr;
            }
        }
        return ctx;
    }
};
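
// A minimal usage sketch (hypothetical; inside Boost.Fiber the queue is
// driven by the work-stealing scheduler rather than used directly):
//
//   context_spmc_queue rqueue;                // hypothetical instance name
//   // owner thread:
//   rqueue.push( ctx);                        // enqueue a ready context
//   context * c = rqueue.pop();               // LIFO pop; may return nullptr
//   // any other thread:
//   context * s = rqueue.steal();             // FIFO steal; may return nullptr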

}}}

#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif

#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H