// lock-free single-producer/single-consumer ringbuffer
// this algorithm is implemented in various projects (linux kernel)
//
// Copyright (C) 2009-2013 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
#define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED

#include <boost/config.hpp>
#ifdef BOOST_HAS_PRAGMA_ONCE
#    pragma once
#endif

#include <algorithm>
#include <memory>
#include <type_traits>

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>
#include <boost/core/allocator_access.hpp>
#include <boost/core/span.hpp>
#include <boost/parameter/optional.hpp>
#include <boost/parameter/parameters.hpp>
#include <boost/static_assert.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>
#include <boost/lockfree/detail/uses_optional.hpp>
#include <boost/lockfree/lockfree_forward.hpp>
namespace boost { namespace lockfree {
namespace detail {

template < typename T >
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t size_t;
    static constexpr int padding_size = cacheline_bytes - sizeof( size_t );
    atomic< size_t > write_index_;
    char padding1[ padding_size ]; /* force read_index and write_index to different cache lines */
    atomic< size_t > read_index_;

protected:
    ringbuffer_base( void ) :
        write_index_( 0 ),
        read_index_( 0 )
    {}
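    // advance an index by one slot, wrapping back to zero at max_size; the
    // conditional subtraction avoids an integer division (modulo) and does not
    // require max_size to be a power of two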
    static size_t next_index( size_t arg, size_t max_size )
    {
        size_t ret = arg + 1;
        while ( BOOST_UNLIKELY( ret >= max_size ) )
            ret -= max_size;
        return ret;
    }

    static size_t read_available( size_t write_index, size_t read_index, size_t max_size )
    {
        if ( write_index >= read_index )
            return write_index - read_index;

        const size_t ret = write_index + max_size - read_index;
        return ret;
    }
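    // one slot is always kept unoccupied, so that read_index == write_index
    // unambiguously means "empty"; the usable capacity is therefore
    // max_size - 1 (the sized buffers below allocate one extra element)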
    static size_t write_available( size_t write_index, size_t read_index, size_t max_size )
    {
        size_t ret = read_index - write_index - 1;
        if ( write_index >= read_index )
            ret += max_size;
        return ret;
    }

    size_t read_available( size_t max_size ) const
    {
        size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index = read_index_.load( memory_order_relaxed );
        return read_available( write_index, read_index, max_size );
    }

    size_t write_available( size_t max_size ) const
    {
        size_t write_index = write_index_.load( memory_order_relaxed );
        const size_t read_index = read_index_.load( memory_order_acquire );
        return write_available( write_index, read_index, max_size );
    }
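    // synchronization protocol: the producer release-stores write_index_ after
    // constructing an element and the consumer acquire-loads it before reading;
    // symmetrically, the consumer release-stores read_index_ after destroying an
    // element and the producer acquire-loads it before reusing the slot. Each
    // side may load its own index relaxed, as it is that index's only writer.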
    bool push( const T& t, T* buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
        const size_t next = next_index( write_index, max_size );

        if ( next == read_index_.load( memory_order_acquire ) )
            return false; /* ringbuffer is full */

        new ( buffer + write_index ) T( t ); // copy-construct

        write_index_.store( next, memory_order_release );

        return true;
    }

    bool push( T&& t, T* buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
        const size_t next = next_index( write_index, max_size );

        if ( next == read_index_.load( memory_order_acquire ) )
            return false; /* ringbuffer is full */

        new ( buffer + write_index ) T( std::forward< T >( t ) ); // move-construct

        write_index_.store( next, memory_order_release );

        return true;
    }

    size_t push( const T* input_buffer, size_t input_count, T* internal_buffer, size_t max_size )
    {
        return push( input_buffer, input_buffer + input_count, internal_buffer, max_size ) - input_buffer;
    }

    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end, T* internal_buffer, size_t max_size )
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
        const size_t read_index = read_index_.load( memory_order_acquire );
        const size_t avail = write_available( write_index, read_index, max_size );

        if ( avail == 0 )
            return begin;

        size_t input_count = std::distance( begin, end );
        input_count = (std::min)( input_count, avail );

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = std::next( begin, input_count );

        if ( write_index + input_count > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - write_index;
            const ConstIterator midpoint = std::next( begin, count0 );

            std::uninitialized_copy( begin, midpoint, internal_buffer + write_index );
            std::uninitialized_copy( midpoint, last, internal_buffer );
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy( begin, last, internal_buffer + write_index );

            if ( new_write_index == max_size )
                new_write_index = 0;
        }

        write_index_.store( new_write_index, memory_order_release );
        return last;
    }
    template < typename Functor >
    bool consume_one( Functor&& functor, T* buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread
        if ( empty( write_index, read_index ) )
            return false;

        T& object_to_consume = buffer[ read_index ];
        functor( std::move( object_to_consume ) );
        object_to_consume.~T();

        size_t next = next_index( read_index, max_size );
        read_index_.store( next, memory_order_release );
        return true;
    }

    template < typename Functor >
    size_t consume_all( Functor&& functor, T* internal_buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread

        const size_t avail = read_available( write_index, read_index, max_size );

        if ( avail == 0 )
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if ( read_index + output_count > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete( internal_buffer + read_index, internal_buffer + max_size, functor );
            run_functor_and_delete( internal_buffer, internal_buffer + count1, functor );

            new_read_index -= max_size;
        } else {
            run_functor_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, functor );

            if ( new_read_index == max_size )
                new_read_index = 0;
        }

        read_index_.store( new_read_index, memory_order_release );
        return output_count;
    }

    size_t pop( T* output_buffer, size_t output_count, T* internal_buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread

        const size_t avail = read_available( write_index, read_index, max_size );

        if ( avail == 0 )
            return 0;

        output_count = (std::min)( output_count, avail );

        size_t new_read_index = read_index + output_count;

        if ( read_index + output_count > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            move_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
            move_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );

            new_read_index -= max_size;
        } else {
            move_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );

            if ( new_read_index == max_size )
                new_read_index = 0;
        }

        read_index_.store( new_read_index, memory_order_release );
        return output_count;
    }

    template < typename OutputIterator >
    size_t pop_to_output_iterator( OutputIterator it, T* internal_buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread

        const size_t avail = read_available( write_index, read_index, max_size );
        if ( avail == 0 )
            return 0;

        size_t new_read_index = read_index + avail;

        if ( read_index + avail > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = move_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
            move_and_delete( internal_buffer, internal_buffer + count1, it );

            new_read_index -= max_size;
        } else {
            move_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );

            if ( new_read_index == max_size )
                new_read_index = 0;
        }

        read_index_.store( new_read_index, memory_order_release );
        return avail;
    }

    const T& front( const T* internal_buffer ) const
    {
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread
        return *( internal_buffer + read_index );
    }

    T& front( T* internal_buffer )
    {
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread
        return *( internal_buffer + read_index );
    }
#endif
public:
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset( void )
    {
        if ( !std::is_trivially_destructible< T >::value ) {
            // make sure to call all destructors!
            consume_all( []( const T& ) {} );
        } else {
            write_index_.store( 0, memory_order_relaxed );
            read_index_.store( 0, memory_order_release );
        }
    }

    /** Check if the ringbuffer is empty
     *
     * \return true, if the ringbuffer is empty, false otherwise
     * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
     * */
    bool empty( void )
    {
        return empty( write_index_.load( memory_order_relaxed ), read_index_.load( memory_order_relaxed ) );
    }

    /**
     * \return true, if implementation is lock-free.
     *
     * */
    bool is_lock_free( void ) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    bool empty( size_t write_index, size_t read_index )
    {
        return write_index == read_index;
    }
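    // move [first, last) to out and destroy the source elements; trivially
    // destructible types take the std::copy path, which can degrade to memcpy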
    template < class OutputIterator >
    OutputIterator move_and_delete( T* first, T* last, OutputIterator out )
    {
        if ( std::is_trivially_destructible< T >::value ) {
            return std::copy( first, last, out ); // will use memcpy if possible
        } else {
            for ( ; first != last; ++first, ++out ) {
                *out = std::move( *first );
                first->~T();
            }
            return out;
        }
    }

    template < class Functor >
    void run_functor_and_delete( T* first, T* last, Functor&& functor )
    {
        for ( ; first != last; ++first ) {
            functor( std::move( *first ) );
            first->~T();
        }
    }
};
template < typename T, std::size_t MaxSize >
class compile_time_sized_ringbuffer : public ringbuffer_base< T >
{
    typedef std::size_t size_type;
    static constexpr std::size_t max_size = MaxSize + 1;

    typedef typename boost::aligned_storage< max_size * sizeof( T ), boost::alignment_of< T >::value >::type storage_type;

    storage_type storage_;

    T* data()
    {
        return static_cast< T* >( storage_.address() );
    }

    const T* data() const
    {
        return static_cast< const T* >( storage_.address() );
    }

protected:
    size_type max_number_of_elements() const
    {
        return max_size;
    }

    ~compile_time_sized_ringbuffer( void )
    {
        // destroy all remaining items
        consume_all( []( const T& ) {} );
    }

public:
    bool push( const T& t )
    {
        return ringbuffer_base< T >::push( t, data(), max_size );
    }

    bool push( T&& t )
    {
        return ringbuffer_base< T >::push( std::forward< T >( t ), data(), max_size );
    }

    template < typename Functor >
    bool consume_one( Functor&& f )
    {
        return ringbuffer_base< T >::consume_one( f, data(), max_size );
    }

    template < typename Functor >
    size_type consume_all( Functor&& f )
    {
        return ringbuffer_base< T >::consume_all( f, data(), max_size );
    }

    size_type push( T const* t, size_type size )
    {
        return ringbuffer_base< T >::push( t, size, data(), max_size );
    }

    template < size_type size >
    size_type push( T const ( &t )[ size ] )
    {
        return push( t, size );
    }

    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end )
    {
        return ringbuffer_base< T >::push( begin, end, data(), max_size );
    }

    size_type pop( T* ret, size_type size )
    {
        return ringbuffer_base< T >::pop( ret, size, data(), max_size );
    }

    template < typename OutputIterator >
    size_type pop_to_output_iterator( OutputIterator it )
    {
        return ringbuffer_base< T >::pop_to_output_iterator( it, data(), max_size );
    }

    const T& front( void ) const
    {
        return ringbuffer_base< T >::front( data() );
    }

    T& front( void )
    {
        return ringbuffer_base< T >::front( data() );
    }
};
template < typename T, typename Alloc >
class runtime_sized_ringbuffer : public ringbuffer_base< T >, private Alloc
{
    typedef std::size_t size_type;
    size_type max_elements_;

    typedef std::allocator_traits< Alloc > allocator_traits;
    typedef typename allocator_traits::pointer pointer;
    pointer array_;

protected:
    size_type max_number_of_elements() const
    {
        return max_elements_;
    }

public:
    explicit runtime_sized_ringbuffer( size_type max_elements ) :
        max_elements_( max_elements + 1 )
    {
        Alloc& alloc = *this;
        array_ = allocator_traits::allocate( alloc, max_elements_ );
    }

    template < typename U >
    runtime_sized_ringbuffer( typename boost::allocator_rebind< Alloc, U >::type const& alloc, size_type max_elements ) :
        Alloc( alloc ),
        max_elements_( max_elements + 1 )
    {
        Alloc& allocator = *this;
        array_ = allocator_traits::allocate( allocator, max_elements_ );
    }

    runtime_sized_ringbuffer( Alloc const& alloc, size_type max_elements ) :
        Alloc( alloc ),
        max_elements_( max_elements + 1 )
    {
        Alloc& allocator = *this;
        array_ = allocator_traits::allocate( allocator, max_elements_ );
    }

    ~runtime_sized_ringbuffer( void )
    {
        // destroy all remaining items
        consume_all( []( const T& ) {} );

        Alloc& allocator = *this;
        allocator_traits::deallocate( allocator, array_, max_elements_ );
    }

    bool push( const T& t )
    {
        return ringbuffer_base< T >::push( t, &*array_, max_elements_ );
    }

    bool push( T&& t )
    {
        return ringbuffer_base< T >::push( std::forward< T >( t ), &*array_, max_elements_ );
    }

    template < typename Functor >
    bool consume_one( Functor&& f )
    {
        return ringbuffer_base< T >::consume_one( f, &*array_, max_elements_ );
    }

    template < typename Functor >
    size_type consume_all( Functor&& f )
    {
        return ringbuffer_base< T >::consume_all( f, &*array_, max_elements_ );
    }

    size_type push( T const* t, size_type size )
    {
        return ringbuffer_base< T >::push( t, size, &*array_, max_elements_ );
    }

    template < size_type size >
    size_type push( T const ( &t )[ size ] )
    {
        return push( t, size );
    }

    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end )
    {
        return ringbuffer_base< T >::push( begin, end, &*array_, max_elements_ );
    }

    size_type pop( T* ret, size_type size )
    {
        return ringbuffer_base< T >::pop( ret, size, &*array_, max_elements_ );
    }

    template < typename OutputIterator >
    size_type pop_to_output_iterator( OutputIterator it )
    {
        return ringbuffer_base< T >::pop_to_output_iterator( it, &*array_, max_elements_ );
    }

    const T& front( void ) const
    {
        return ringbuffer_base< T >::front( &*array_ );
    }

    T& front( void )
    {
        return ringbuffer_base< T >::front( &*array_ );
    }
};
typedef parameter::parameters< boost::parameter::optional< tag::capacity >, boost::parameter::optional< tag::allocator > >
    ringbuffer_signature;

template < typename T, typename... Options >
struct make_ringbuffer
{
    typedef typename ringbuffer_signature::bind< Options... >::type bound_args;

    typedef extract_capacity< bound_args > extract_capacity_t;

    static constexpr bool runtime_sized = !extract_capacity_t::has_capacity;
    static constexpr size_t capacity = extract_capacity_t::capacity;

    typedef extract_allocator< bound_args, T > extract_allocator_t;

    static constexpr bool has_allocator = extract_allocator_t::has_allocator;
    typedef typename extract_allocator_t::type allocator;

    static constexpr bool signature_is_valid = runtime_sized ? true : !has_allocator;
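    // an allocator option may only be combined with a run-time capacity: a
    // compile-time capacity<> stores its elements inline and never allocates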
    BOOST_STATIC_ASSERT( signature_is_valid );

    typedef std::conditional_t< runtime_sized,
                                runtime_sized_ringbuffer< T, allocator >,
                                compile_time_sized_ringbuffer< T, capacity > >
        ringbuffer_type;
};

} /* namespace detail */
/** The spsc_queue class provides a single-writer/single-reader fifo queue; pushing and popping are wait-free.
 *
 * \b Policies:
 * - \c boost::lockfree::capacity<>, optional <br>
 *   If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
 *
 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
 *   Specifies the allocator that is used to allocate the ringbuffer. This option is only valid if the ringbuffer is
 *   configured to be sized at run-time.
 *
 * \b Requirements:
 * - T must have a default constructor
 * - T must be copyable or movable
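 *
 * \b Example: a minimal usage sketch (the element type, capacity, and the
 * hypothetical process() handler are illustrative, not part of this header):
 * \code
 * // compile-time sized: a 1024-element ringbuffer held inline
 * boost::lockfree::spsc_queue< int, boost::lockfree::capacity< 1024 > > queue;
 *
 * // alternatively, run-time sized: boost::lockfree::spsc_queue< int > queue( 1024 );
 *
 * // producer thread: push returns false when the ringbuffer is full
 * queue.push( 42 );
 *
 * // consumer thread: pop returns false when the ringbuffer is empty
 * int value;
 * while ( queue.pop( value ) )
 *     process( value );
 * \endcode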
 * */
template < typename T, typename... Options >
#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS )
    requires( std::is_default_constructible_v< T > && ( std::is_move_assignable_v< T > || std::is_copy_assignable_v< T > ) )
#endif
class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_type
{
private:
#ifndef BOOST_DOXYGEN_INVOKED
    typedef typename detail::make_ringbuffer< T, Options... >::ringbuffer_type base_type;
    static constexpr bool runtime_sized = detail::make_ringbuffer< T, Options... >::runtime_sized;
    typedef typename detail::make_ringbuffer< T, Options... >::allocator allocator_arg;

    struct implementation_defined
    {
        typedef allocator_arg allocator;
        typedef std::size_t size_type;
    };
#endif
public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /** Constructs a spsc_queue
     *
     * \pre spsc_queue must be configured to be sized at compile-time
     */
    spsc_queue( void )
#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS )
        requires( !runtime_sized )
#endif
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT( !runtime_sized );
    }
    /** Constructs a spsc_queue with a custom allocator
     *
     * \pre spsc_queue must be configured to be sized at compile-time
     *
     * \note This is just for API compatibility: an allocator isn't actually needed
     */
    template < typename U, typename Enabler = typename std::enable_if< !runtime_sized >::type >
    explicit spsc_queue( typename boost::allocator_rebind< allocator, U >::type const& )
    {}

    /** Constructs a spsc_queue with a custom allocator
     *
     * \pre spsc_queue must be configured to be sized at compile-time
     *
     * \note This is just for API compatibility: an allocator isn't actually needed
     */
    template < typename Enabler = typename std::enable_if< !runtime_sized >::type >
    explicit spsc_queue( allocator const& )
    {}
    /** Constructs a spsc_queue for element_count elements
     *
     * \pre spsc_queue must be configured to be sized at run-time
     */
    template < typename Enabler = typename std::enable_if< runtime_sized >::type >
    explicit spsc_queue( size_type element_count ) :
        base_type( element_count )
    {}

    /** Constructs a spsc_queue for element_count elements with a custom allocator
     *
     * \pre spsc_queue must be configured to be sized at run-time
     */
    template < typename U, typename Enabler = typename std::enable_if< runtime_sized >::type >
    spsc_queue( size_type element_count, typename boost::allocator_rebind< allocator, U >::type const& alloc ) :
        base_type( alloc, element_count )
    {}

    /** Constructs a spsc_queue for element_count elements with a custom allocator
     *
     * \pre spsc_queue must be configured to be sized at run-time
     */
    template < typename Enabler = typename std::enable_if< runtime_sized >::type >
    spsc_queue( size_type element_count, allocator_arg const& alloc ) :
        base_type( alloc, element_count )
    {}

    spsc_queue( const spsc_queue& ) = delete;
    spsc_queue& operator=( const spsc_queue& ) = delete;
    spsc_queue( spsc_queue&& ) = delete;
    spsc_queue& operator=( spsc_queue&& ) = delete;
    /** Pushes object t to the ringbuffer.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \post object will be pushed to the spsc_queue, unless it is full.
     * \return true, if the push operation is successful.
     *
     * \note Thread-safe and wait-free
     * */
    bool push( const T& t )
    {
        return base_type::push( t );
    }

    /// \copydoc boost::lockfree::spsc_queue::push(const T& t)
    bool push( T&& t )
    {
        return base_type::push( std::forward< T >( t ) );
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, the popped object will be discarded.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    bool pop()
    {
        return consume_one( []( const T& ) {} );
    }
    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be copied to ret.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    template < typename U, typename Enabler = typename std::enable_if< std::is_convertible< T, U >::value >::type >
    bool pop( U& ret )
    {
        return consume_one( [ & ]( T&& t ) {
            ret = std::forward< T >( t );
        } );
    }
#if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED )
    /** Pops object from spsc_queue, returning a std::optional<>
     *
     * \returns `std::optional` with value if successful, `std::nullopt` if spsc_queue is empty.
     *
     * \note Thread-safe and non-blocking
     *
     * */
    std::optional< T > pop( uses_optional_t )
    {
        T to_dequeue;
        if ( pop( to_dequeue ) )
            return to_dequeue;
        else
            return std::nullopt;
    }

    /** Pops object from spsc_queue, returning a std::optional<>
     *
     * \pre type T must be convertible to U
     * \returns `std::optional` with value if successful, `std::nullopt` if spsc_queue is empty.
     *
     * \note Thread-safe and non-blocking
     *
     * */
    template < typename U >
    std::optional< U > pop( uses_optional_t )
    {
        U to_dequeue;
        if ( pop( to_dequeue ) )
            return to_dequeue;
        else
            return std::nullopt;
    }
#endif
    /** Pushes as many objects from the array t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    size_type push( T const* t, size_type size )
    {
        return base_type::push( t, size );
    }

    /** Pushes as many objects from the array t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    template < size_type size >
    size_type push( T const ( &t )[ size ] )
    {
        return push( t, size );
    }

    /** Pushes as many objects from the span t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    template < std::size_t Extent >
    size_type push( boost::span< const T, Extent > t )
    {
        return push( t.data(), t.size() );
    }

    /** Pushes as many objects from the range [begin, end) as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return iterator to the first element that has not been pushed
     *
     * \note Thread-safe and wait-free
     */
    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end )
    {
        return base_type::push( begin, end );
    }
    /** Pops a maximum of size objects from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    size_type pop( T* ret, size_type size )
    {
        return base_type::pop( ret, size );
    }

    /** Pops a maximum of size objects from spsc_queue.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template < size_type size >
    size_type pop( T ( &ret )[ size ] )
    {
        return pop( ret, size );
    }

    /** Pops objects to the output iterator it
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template < typename OutputIterator >
    typename std::enable_if< !std::is_convertible< T, OutputIterator >::value, size_type >::type pop( OutputIterator it )
    {
        return base_type::pop_to_output_iterator( it );
    }
    /** consumes one element via a functor
     *
     * pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template < typename Functor >
    bool consume_one( Functor&& f )
    {
        return base_type::consume_one( f );
    }
    /** consumes all elements via a functor
     *
     * sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
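     *
     * A minimal sketch of draining the queue from the consumer thread (the
     * vector and the int element type are illustrative):
     * \code
     * std::vector< int > drained;
     * queue.consume_all( [ & ]( int&& element ) {
     *     drained.push_back( std::move( element ) );
     * } );
     * \endcode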
     * */
    template < typename Functor >
    size_type consume_all( Functor&& f )
    {
        return base_type::consume_all( f );
    }
    /** get number of elements that are available for read
     *
     * \return number of available elements that can be popped from the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the consumer thread
     * */
    size_type read_available() const
    {
        return base_type::read_available( base_type::max_number_of_elements() );
    }

    /** get write space to write elements
     *
     * \return number of elements that can be pushed to the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the producer thread
     * */
    size_type write_available() const
    {
        return base_type::write_available( base_type::max_number_of_elements() );
    }

    /** get reference to element in the front of the queue
     *
     * Availability of the front element can be checked using read_available().
     *
     * \pre only the consuming thread is allowed to check the front element
     * \pre read_available() > 0. If the ringbuffer is empty, it is undefined behaviour to invoke this method.
     * \return reference to the first element in the queue
     *
     * \note Thread-safe and wait-free
     */
    const T& front() const
    {
        BOOST_ASSERT( read_available() > 0 );
        return base_type::front();
    }

    /// \copydoc boost::lockfree::spsc_queue::front() const
    T& front()
    {
        BOOST_ASSERT( read_available() > 0 );
        return base_type::front();
    }
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset( void )
    {
        if ( !std::is_trivially_destructible< T >::value ) {
            // make sure to call all destructors!
            consume_all( []( const T& ) {} );
        } else {
            base_type::write_index_.store( 0, memory_order_relaxed );
            base_type::read_index_.store( 0, memory_order_release );
        }
    }
};

}} // namespace boost::lockfree

#endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */