spsc_value.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. // lock-free single-producer/single-consumer value
  2. // implemented via a triple buffer
  3. //
  4. // Copyright (C) 2023-2024 Tim Blechmann
  5. //
  6. // Distributed under the Boost Software License, Version 1.0. (See
  7. // accompanying file LICENSE_1_0.txt or copy at
  8. // http://www.boost.org/LICENSE_1_0.txt)
  9. #ifndef BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED
  10. #define BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED
  11. #include <boost/config.hpp>
  12. #ifdef BOOST_HAS_PRAGMA_ONCE
  13. # pragma once
  14. #endif
  15. #include <boost/lockfree/detail/atomic.hpp>
  16. #include <boost/lockfree/detail/parameter.hpp>
  17. #include <boost/lockfree/detail/uses_optional.hpp>
  18. #include <boost/lockfree/lockfree_forward.hpp>
  19. #include <boost/lockfree/policies.hpp>
  20. #include <boost/parameter/optional.hpp>
  21. #include <boost/parameter/parameters.hpp>
  22. #include <array>
  23. #include <cstdint>
  24. #ifndef BOOST_DOXYGEN_INVOKED
  25. # ifdef BOOST_NO_CXX17_IF_CONSTEXPR
  26. # define ifconstexpr
  27. # else
  28. # define ifconstexpr constexpr
  29. # endif
  30. #endif
  31. namespace boost { namespace lockfree {
  32. /** The spsc_value provides a single-writer/single-reader value, implemented by a triple buffer
  33. *
  34. * \b Policies:
  35. * - \ref boost::lockfree::allow_multiple_reads, defaults to
  36. * \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<false>" \n
  37. * If multiple reads are allowed, a value written to the spsc_value can be read multiple times, but not moved out
  38. * of the instance. If multiple reads are not allowed, the class works as single-element queue that overwrites on
  39. * write
  40. *
  41. * */
  42. template < typename T, typename... Options >
  43. struct spsc_value
  44. {
  45. #ifndef BOOST_DOXYGEN_INVOKED
  46. private:
  47. using spsc_value_signature = parameter::parameters< boost::parameter::optional< tag::allow_multiple_reads > >;
  48. using bound_args = typename spsc_value_signature::bind< Options... >::type;
  49. static const bool allow_multiple_reads = detail::extract_allow_multiple_reads< bound_args >::value;
  50. public:
  51. #endif
  52. /** Construct a \ref boost::lockfree::spsc_value "spsc_value"
  53. *
  54. * If configured with \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<true>" it
  55. * is initialized to a default-constructed value
  56. *
  57. * */
  58. explicit spsc_value()
  59. {
  60. if ifconstexpr ( allow_multiple_reads ) {
  61. // populate initial reader
  62. m_write_index = tagged_index {
  63. 1,
  64. };
  65. m_available_index.store(
  66. tagged_index {
  67. 0,
  68. true,
  69. },
  70. std::memory_order_relaxed );
  71. m_buffer[ 0 ].value = {};
  72. }
  73. }
  74. /** Construct a \ref boost::lockfree::spsc_value "spsc_value", initialized to a value
  75. * */
  76. explicit spsc_value( T value ) :
  77. m_write_index {
  78. 1,
  79. },
  80. m_available_index {
  81. tagged_index {
  82. 0,
  83. true,
  84. },
  85. }
  86. {
  87. m_buffer[ 0 ].value = std::move( value );
  88. }
  89. /** Writes `value` to the \ref boost::lockfree::spsc_value "spsc_value"
  90. *
  91. * \pre only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
  92. * \post object will be written to the \ref boost::lockfree::spsc_value "spsc_value"
  93. *
  94. * \note Thread-safe and wait-free
  95. * */
  96. void write( T&& value )
  97. {
  98. m_buffer[ m_write_index.index() ].value = std::forward< T >( value );
  99. swap_write_buffer();
  100. }
  101. /// \copydoc boost::lockfree::spsc_value::write(T&& value)
  102. void write( const T& value )
  103. {
  104. m_buffer[ m_write_index.index() ].value = value;
  105. swap_write_buffer();
  106. }
  107. /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value"
  108. *
  109. * \pre only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
  110. * \post if read operation is successful, object will be copied to `ret`.
  111. * \returns `true`, if the read operation is successful, false if the \ref boost::lockfree::spsc_value "spsc_value" is
  112. * configured with \ref boost::lockfree::allow_multiple_reads
  113. * "boost::lockfree::allow_multiple_reads<false>" and no value is available for reading
  114. *
  115. * \note Thread-safe and wait-free
  116. * */
  117. bool read( T& ret )
  118. {
  119. #ifndef BOOST_NO_CXX17_IF_CONSTEXPR
  120. bool read_index_updated = swap_read_buffer();
  121. if constexpr ( allow_multiple_reads ) {
  122. ret = m_buffer[ m_read_index.index() ].value;
  123. } else {
  124. if ( !read_index_updated )
  125. return false;
  126. ret = std::move( m_buffer[ m_read_index.index() ].value );
  127. }
  128. return true;
  129. #else
  130. return read_helper( ret, std::integral_constant< bool, allow_multiple_reads > {} );
  131. #endif
  132. }
  133. #if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED )
  134. /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value", returning an optional
  135. *
  136. * \pre only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
  137. * \returns `std::optional` with value if successful, `std::nullopt` if spsc_value is configured with \ref
  138. * boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<false>" and no value is
  139. * available for reading
  140. *
  141. * \note Thread-safe and wait-free
  142. * */
  143. std::optional< T > read( uses_optional_t )
  144. {
  145. T to_dequeue;
  146. if ( read( to_dequeue ) )
  147. return to_dequeue;
  148. else
  149. return std::nullopt;
  150. }
  151. #endif
  152. /** consumes value via a functor
  153. *
  154. * reads element from the spsc_value and applies the functor on this object
  155. *
  156. * \returns `true`, if element was consumed
  157. *
  158. * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
  159. * */
  160. template < typename Functor >
  161. bool consume( Functor&& f )
  162. {
  163. #ifndef BOOST_NO_CXX17_IF_CONSTEXPR
  164. bool read_index_updated = swap_read_buffer();
  165. if constexpr ( allow_multiple_reads ) {
  166. f( m_buffer[ m_read_index.index() ].value );
  167. } else {
  168. if ( !read_index_updated )
  169. return false;
  170. f( std::move( m_buffer[ m_read_index.index() ].value ) );
  171. }
  172. return true;
  173. #else
  174. return consume_helper( f, std::integral_constant< bool, allow_multiple_reads > {} );
  175. #endif
  176. }
  177. private:
  178. #ifndef BOOST_DOXYGEN_INVOKED
  179. using allow_multiple_reads_true = std::true_type;
  180. using allow_multiple_reads_false = std::false_type;
  181. # ifdef BOOST_NO_CXX17_IF_CONSTEXPR
  182. template < typename Functor >
  183. bool consume_helper( Functor&& f, allow_multiple_reads_true = {} )
  184. {
  185. swap_read_buffer();
  186. f( m_buffer[ m_read_index.index() ].value );
  187. return true;
  188. }
  189. template < typename Functor >
  190. bool consume_helper( Functor&& f, allow_multiple_reads_false = {} )
  191. {
  192. bool read_index_updated = swap_read_buffer();
  193. if ( !read_index_updated )
  194. return false;
  195. f( std::move( m_buffer[ m_read_index.index() ].value ) );
  196. return true;
  197. }
  198. template < typename TT >
  199. bool read_helper( TT& ret, allow_multiple_reads_true = {} )
  200. {
  201. swap_read_buffer();
  202. ret = m_buffer[ m_read_index.index() ].value;
  203. return true;
  204. }
  205. template < typename TT >
  206. bool read_helper( TT& ret, allow_multiple_reads_false = {} )
  207. {
  208. bool read_index_updated = swap_read_buffer();
  209. if ( !read_index_updated )
  210. return false;
  211. ret = std::move( m_buffer[ m_read_index.index() ].value );
  212. return true;
  213. }
  214. # endif
  215. void swap_write_buffer()
  216. {
  217. tagged_index old_avail_index = m_available_index.exchange(
  218. tagged_index {
  219. m_write_index.index(),
  220. true,
  221. },
  222. std::memory_order_release );
  223. m_write_index.set_tag_and_index( old_avail_index.index(), false );
  224. }
  225. bool swap_read_buffer()
  226. {
  227. constexpr bool use_compare_exchange = false; // exchange is most likely faster
  228. if ifconstexpr ( use_compare_exchange ) {
  229. tagged_index new_avail_index = m_read_index;
  230. tagged_index current_avail_index_with_tag = tagged_index {
  231. m_available_index.load( std::memory_order_acquire ).index(),
  232. true,
  233. };
  234. if ( m_available_index.compare_exchange_strong( current_avail_index_with_tag,
  235. new_avail_index,
  236. std::memory_order_acquire ) ) {
  237. m_read_index = tagged_index( current_avail_index_with_tag.index(), false );
  238. return true;
  239. } else
  240. return false;
  241. } else {
  242. tagged_index new_avail_index = m_read_index;
  243. tagged_index current_avail_index = m_available_index.load( std::memory_order_acquire );
  244. if ( !current_avail_index.is_consumable() )
  245. return false;
  246. current_avail_index = m_available_index.exchange( new_avail_index, std::memory_order_acquire );
  247. m_read_index = tagged_index {
  248. current_avail_index.index(),
  249. false,
  250. };
  251. return true;
  252. }
  253. }
  254. struct tagged_index
  255. {
  256. tagged_index( uint8_t index, bool tag = false )
  257. {
  258. set_tag_and_index( index, tag );
  259. }
  260. uint8_t index() const
  261. {
  262. return byte & 0x07;
  263. }
  264. bool is_consumable() const
  265. {
  266. return byte & 0x08;
  267. }
  268. void set_tag_and_index( uint8_t index, bool tag )
  269. {
  270. byte = index | ( tag ? 0x08 : 0x00 );
  271. }
  272. uint8_t byte;
  273. };
  274. static constexpr size_t cacheline_bytes = detail::cacheline_bytes;
  275. struct alignas( cacheline_bytes ) cache_aligned_value
  276. {
  277. T value;
  278. };
  279. std::array< cache_aligned_value, 3 > m_buffer;
  280. alignas( cacheline_bytes ) tagged_index m_write_index { 0 };
  281. alignas( cacheline_bytes ) detail::atomic< tagged_index > m_available_index { 1 };
  282. alignas( cacheline_bytes ) tagged_index m_read_index { 2 };
  283. #endif
  284. };
  285. }} // namespace boost::lockfree
  286. #undef ifconstexpr
  287. #endif /* BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED */