// buffered_channel.hpp
// Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>

#include <boost/config.hpp>

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {
  26. template< typename T >
  27. class buffered_channel {
  28. public:
  29. typedef typename std::remove_reference< T >::type value_type;
  30. private:
  31. typedef context::wait_queue_t wait_queue_type;
  32. typedef value_type slot_type;
  33. mutable detail::spinlock splk_{};
  34. wait_queue_type waiting_producers_{};
  35. wait_queue_type waiting_consumers_{};
  36. slot_type * slots_;
  37. std::size_t pidx_{ 0 };
  38. std::size_t cidx_{ 0 };
  39. std::size_t capacity_;
  40. bool closed_{ false };
  41. bool is_full_() const noexcept {
  42. return cidx_ == ((pidx_ + 1) % capacity_);
  43. }
  44. bool is_empty_() const noexcept {
  45. return cidx_ == pidx_;
  46. }
  47. bool is_closed_() const noexcept {
  48. return closed_;
  49. }
  50. public:
  51. explicit buffered_channel( std::size_t capacity) :
  52. capacity_{ capacity } {
  53. if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
  54. throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
  55. "boost fiber: buffer capacity is invalid" };
  56. }
  57. slots_ = new slot_type[capacity_];
  58. }
  59. ~buffered_channel() {
  60. close();
  61. delete [] slots_;
  62. }
  63. buffered_channel( buffered_channel const&) = delete;
  64. buffered_channel & operator=( buffered_channel const&) = delete;
  65. bool is_closed() const noexcept {
  66. detail::spinlock_lock lk{ splk_ };
  67. return is_closed_();
  68. }
  69. void close() noexcept {
  70. context * active_ctx = context::active();
  71. detail::spinlock_lock lk{ splk_ };
  72. closed_ = true;
  73. // notify all waiting producers
  74. while ( ! waiting_producers_.empty() ) {
  75. context * producer_ctx = & waiting_producers_.front();
  76. waiting_producers_.pop_front();
  77. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  78. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  79. // notify context
  80. active_ctx->schedule( producer_ctx);
  81. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  82. // no timed-wait op.
  83. // notify context
  84. active_ctx->schedule( producer_ctx);
  85. }
  86. }
  87. // notify all waiting consumers
  88. while ( ! waiting_consumers_.empty() ) {
  89. context * consumer_ctx = & waiting_consumers_.front();
  90. waiting_consumers_.pop_front();
  91. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  92. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  93. // notify context
  94. active_ctx->schedule( consumer_ctx);
  95. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  96. // no timed-wait op.
  97. // notify context
  98. active_ctx->schedule( consumer_ctx);
  99. }
  100. }
  101. }
  102. channel_op_status try_push( value_type const& value) {
  103. context * active_ctx = context::active();
  104. detail::spinlock_lock lk{ splk_ };
  105. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  106. return channel_op_status::closed;
  107. } else if ( is_full_() ) {
  108. return channel_op_status::full;
  109. } else {
  110. slots_[pidx_] = value;
  111. pidx_ = (pidx_ + 1) % capacity_;
  112. // notify one waiting consumer
  113. while ( ! waiting_consumers_.empty() ) {
  114. context * consumer_ctx = & waiting_consumers_.front();
  115. waiting_consumers_.pop_front();
  116. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  117. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  118. lk.unlock();
  119. // notify context
  120. active_ctx->schedule( consumer_ctx);
  121. break;
  122. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  123. lk.unlock();
  124. // no timed-wait op.
  125. // notify context
  126. active_ctx->schedule( consumer_ctx);
  127. break;
  128. }
  129. }
  130. return channel_op_status::success;
  131. }
  132. }
  133. channel_op_status try_push( value_type && value) {
  134. context * active_ctx = context::active();
  135. detail::spinlock_lock lk{ splk_ };
  136. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  137. return channel_op_status::closed;
  138. } else if ( is_full_() ) {
  139. return channel_op_status::full;
  140. } else {
  141. slots_[pidx_] = std::move( value);
  142. pidx_ = (pidx_ + 1) % capacity_;
  143. // notify one waiting consumer
  144. while ( ! waiting_consumers_.empty() ) {
  145. context * consumer_ctx = & waiting_consumers_.front();
  146. waiting_consumers_.pop_front();
  147. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  148. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  149. lk.unlock();
  150. // notify context
  151. active_ctx->schedule( consumer_ctx);
  152. break;
  153. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  154. lk.unlock();
  155. // no timed-wait op.
  156. // notify context
  157. active_ctx->schedule( consumer_ctx);
  158. break;
  159. }
  160. }
  161. return channel_op_status::success;
  162. }
  163. }
  164. channel_op_status push( value_type const& value) {
  165. context * active_ctx = context::active();
  166. for (;;) {
  167. detail::spinlock_lock lk{ splk_ };
  168. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  169. return channel_op_status::closed;
  170. } else if ( is_full_() ) {
  171. active_ctx->wait_link( waiting_producers_);
  172. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  173. // suspend this producer
  174. active_ctx->suspend( lk);
  175. } else {
  176. slots_[pidx_] = value;
  177. pidx_ = (pidx_ + 1) % capacity_;
  178. // notify one waiting consumer
  179. while ( ! waiting_consumers_.empty() ) {
  180. context * consumer_ctx = & waiting_consumers_.front();
  181. waiting_consumers_.pop_front();
  182. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  183. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  184. lk.unlock();
  185. // notify context
  186. active_ctx->schedule( consumer_ctx);
  187. break;
  188. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  189. lk.unlock();
  190. // no timed-wait op.
  191. // notify context
  192. active_ctx->schedule( consumer_ctx);
  193. break;
  194. }
  195. }
  196. return channel_op_status::success;
  197. }
  198. }
  199. }
  200. channel_op_status push( value_type && value) {
  201. context * active_ctx = context::active();
  202. for (;;) {
  203. detail::spinlock_lock lk{ splk_ };
  204. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  205. return channel_op_status::closed;
  206. } else if ( is_full_() ) {
  207. active_ctx->wait_link( waiting_producers_);
  208. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  209. // suspend this producer
  210. active_ctx->suspend( lk);
  211. } else {
  212. slots_[pidx_] = std::move( value);
  213. pidx_ = (pidx_ + 1) % capacity_;
  214. // notify one waiting consumer
  215. while ( ! waiting_consumers_.empty() ) {
  216. context * consumer_ctx = & waiting_consumers_.front();
  217. waiting_consumers_.pop_front();
  218. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  219. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  220. lk.unlock();
  221. // notify context
  222. active_ctx->schedule( consumer_ctx);
  223. break;
  224. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  225. lk.unlock();
  226. // no timed-wait op.
  227. // notify context
  228. active_ctx->schedule( consumer_ctx);
  229. break;
  230. }
  231. }
  232. return channel_op_status::success;
  233. }
  234. }
  235. }
  236. template< typename Rep, typename Period >
  237. channel_op_status push_wait_for( value_type const& value,
  238. std::chrono::duration< Rep, Period > const& timeout_duration) {
  239. return push_wait_until( value,
  240. std::chrono::steady_clock::now() + timeout_duration);
  241. }
  242. template< typename Rep, typename Period >
  243. channel_op_status push_wait_for( value_type && value,
  244. std::chrono::duration< Rep, Period > const& timeout_duration) {
  245. return push_wait_until( std::forward< value_type >( value),
  246. std::chrono::steady_clock::now() + timeout_duration);
  247. }
  248. template< typename Clock, typename Duration >
  249. channel_op_status push_wait_until( value_type const& value,
  250. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  251. context * active_ctx = context::active();
  252. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  253. for (;;) {
  254. detail::spinlock_lock lk{ splk_ };
  255. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  256. return channel_op_status::closed;
  257. } else if ( is_full_() ) {
  258. active_ctx->wait_link( waiting_producers_);
  259. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  260. // suspend this producer
  261. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  262. // relock local lk
  263. lk.lock();
  264. // remove from waiting-queue
  265. waiting_producers_.remove( * active_ctx);
  266. return channel_op_status::timeout;
  267. }
  268. } else {
  269. slots_[pidx_] = value;
  270. pidx_ = (pidx_ + 1) % capacity_;
  271. // notify one waiting consumer
  272. while ( ! waiting_consumers_.empty() ) {
  273. context * consumer_ctx = & waiting_consumers_.front();
  274. waiting_consumers_.pop_front();
  275. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  276. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  277. lk.unlock();
  278. // notify context
  279. active_ctx->schedule( consumer_ctx);
  280. break;
  281. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  282. lk.unlock();
  283. // no timed-wait op.
  284. // notify context
  285. active_ctx->schedule( consumer_ctx);
  286. break;
  287. }
  288. }
  289. return channel_op_status::success;
  290. }
  291. }
  292. }
  293. template< typename Clock, typename Duration >
  294. channel_op_status push_wait_until( value_type && value,
  295. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  296. context * active_ctx = context::active();
  297. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  298. for (;;) {
  299. detail::spinlock_lock lk{ splk_ };
  300. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  301. return channel_op_status::closed;
  302. } else if ( is_full_() ) {
  303. active_ctx->wait_link( waiting_producers_);
  304. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  305. // suspend this producer
  306. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  307. // relock local lk
  308. lk.lock();
  309. // remove from waiting-queue
  310. waiting_producers_.remove( * active_ctx);
  311. return channel_op_status::timeout;
  312. }
  313. } else {
  314. slots_[pidx_] = std::move( value);
  315. pidx_ = (pidx_ + 1) % capacity_;
  316. // notify one waiting consumer
  317. while ( ! waiting_consumers_.empty() ) {
  318. context * consumer_ctx = & waiting_consumers_.front();
  319. waiting_consumers_.pop_front();
  320. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  321. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  322. lk.unlock();
  323. // notify context
  324. active_ctx->schedule( consumer_ctx);
  325. break;
  326. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  327. lk.unlock();
  328. // no timed-wait op.
  329. // notify context
  330. active_ctx->schedule( consumer_ctx);
  331. break;
  332. }
  333. }
  334. return channel_op_status::success;
  335. }
  336. }
  337. }
  338. channel_op_status try_pop( value_type & value) {
  339. context * active_ctx = context::active();
  340. detail::spinlock_lock lk{ splk_ };
  341. if ( is_empty_() ) {
  342. return is_closed_()
  343. ? channel_op_status::closed
  344. : channel_op_status::empty;
  345. } else {
  346. value = std::move( slots_[cidx_]);
  347. cidx_ = (cidx_ + 1) % capacity_;
  348. // notify one waiting producer
  349. while ( ! waiting_producers_.empty() ) {
  350. context * producer_ctx = & waiting_producers_.front();
  351. waiting_producers_.pop_front();
  352. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  353. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  354. lk.unlock();
  355. // notify context
  356. active_ctx->schedule( producer_ctx);
  357. break;
  358. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  359. lk.unlock();
  360. // no timed-wait op.
  361. // notify context
  362. active_ctx->schedule( producer_ctx);
  363. break;
  364. }
  365. }
  366. return channel_op_status::success;
  367. }
  368. }
  369. channel_op_status pop( value_type & value) {
  370. context * active_ctx = context::active();
  371. for (;;) {
  372. detail::spinlock_lock lk{ splk_ };
  373. if ( is_empty_() ) {
  374. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  375. return channel_op_status::closed;
  376. } else {
  377. active_ctx->wait_link( waiting_consumers_);
  378. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  379. // suspend this consumer
  380. active_ctx->suspend( lk);
  381. }
  382. } else {
  383. value = std::move( slots_[cidx_]);
  384. cidx_ = (cidx_ + 1) % capacity_;
  385. // notify one waiting producer
  386. while ( ! waiting_producers_.empty() ) {
  387. context * producer_ctx = & waiting_producers_.front();
  388. waiting_producers_.pop_front();
  389. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  390. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  391. lk.unlock();
  392. // notify context
  393. active_ctx->schedule( producer_ctx);
  394. break;
  395. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  396. lk.unlock();
  397. // no timed-wait op.
  398. // notify context
  399. active_ctx->schedule( producer_ctx);
  400. break;
  401. }
  402. }
  403. return channel_op_status::success;
  404. }
  405. }
  406. }
  407. value_type value_pop() {
  408. context * active_ctx = context::active();
  409. for (;;) {
  410. detail::spinlock_lock lk{ splk_ };
  411. if ( is_empty_() ) {
  412. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  413. throw fiber_error{
  414. std::make_error_code( std::errc::operation_not_permitted),
  415. "boost fiber: channel is closed" };
  416. } else {
  417. active_ctx->wait_link( waiting_consumers_);
  418. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  419. // suspend this consumer
  420. active_ctx->suspend( lk);
  421. }
  422. } else {
  423. value_type value = std::move( slots_[cidx_]);
  424. cidx_ = (cidx_ + 1) % capacity_;
  425. // notify one waiting producer
  426. while ( ! waiting_producers_.empty() ) {
  427. context * producer_ctx = & waiting_producers_.front();
  428. waiting_producers_.pop_front();
  429. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  430. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  431. lk.unlock();
  432. // notify context
  433. active_ctx->schedule( producer_ctx);
  434. break;
  435. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  436. lk.unlock();
  437. // no timed-wait op.
  438. // notify context
  439. active_ctx->schedule( producer_ctx);
  440. break;
  441. }
  442. }
  443. return std::move( value);
  444. }
  445. }
  446. }
  447. template< typename Rep, typename Period >
  448. channel_op_status pop_wait_for( value_type & value,
  449. std::chrono::duration< Rep, Period > const& timeout_duration) {
  450. return pop_wait_until( value,
  451. std::chrono::steady_clock::now() + timeout_duration);
  452. }
  453. template< typename Clock, typename Duration >
  454. channel_op_status pop_wait_until( value_type & value,
  455. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  456. context * active_ctx = context::active();
  457. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  458. for (;;) {
  459. detail::spinlock_lock lk{ splk_ };
  460. if ( is_empty_() ) {
  461. if ( BOOST_UNLIKELY( is_closed_() ) ) {
  462. return channel_op_status::closed;
  463. } else {
  464. active_ctx->wait_link( waiting_consumers_);
  465. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  466. // suspend this consumer
  467. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  468. // relock local lk
  469. lk.lock();
  470. // remove from waiting-queue
  471. waiting_consumers_.remove( * active_ctx);
  472. return channel_op_status::timeout;
  473. }
  474. }
  475. } else {
  476. value = std::move( slots_[cidx_]);
  477. cidx_ = (cidx_ + 1) % capacity_;
  478. // notify one waiting producer
  479. while ( ! waiting_producers_.empty() ) {
  480. context * producer_ctx = & waiting_producers_.front();
  481. waiting_producers_.pop_front();
  482. std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
  483. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  484. lk.unlock();
  485. // notify context
  486. active_ctx->schedule( producer_ctx);
  487. break;
  488. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  489. lk.unlock();
  490. // no timed-wait op.
  491. // notify context
  492. active_ctx->schedule( producer_ctx);
  493. break;
  494. }
  495. }
  496. return channel_op_status::success;
  497. }
  498. }
  499. }
  500. class iterator {
  501. private:
  502. typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
  503. buffered_channel * chan_{ nullptr };
  504. storage_type storage_;
  505. void increment_() {
  506. BOOST_ASSERT( nullptr != chan_);
  507. try {
  508. ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
  509. } catch ( fiber_error const&) {
  510. chan_ = nullptr;
  511. }
  512. }
  513. public:
  514. typedef std::input_iterator_tag iterator_category;
  515. typedef std::ptrdiff_t difference_type;
  516. typedef value_type * pointer;
  517. typedef value_type & reference;
  518. typedef pointer pointer_t;
  519. typedef reference reference_t;
  520. iterator() noexcept = default;
  521. explicit iterator( buffered_channel< T > * chan) noexcept :
  522. chan_{ chan } {
  523. increment_();
  524. }
  525. iterator( iterator const& other) noexcept :
  526. chan_{ other.chan_ } {
  527. }
  528. iterator & operator=( iterator const& other) noexcept {
  529. if ( BOOST_LIKELY( this != & other) ) {
  530. chan_ = other.chan_;
  531. }
  532. return * this;
  533. }
  534. bool operator==( iterator const& other) const noexcept {
  535. return other.chan_ == chan_;
  536. }
  537. bool operator!=( iterator const& other) const noexcept {
  538. return other.chan_ != chan_;
  539. }
  540. iterator & operator++() {
  541. increment_();
  542. return * this;
  543. }
  544. iterator operator++( int) = delete;
  545. reference_t operator*() noexcept {
  546. return * reinterpret_cast< value_type * >( std::addressof( storage_) );
  547. }
  548. pointer_t operator->() noexcept {
  549. return reinterpret_cast< value_type * >( std::addressof( storage_) );
  550. }
  551. };
  552. friend class iterator;
  553. };
  554. template< typename T >
  555. typename buffered_channel< T >::iterator
  556. begin( buffered_channel< T > & chan) {
  557. return typename buffered_channel< T >::iterator( & chan);
  558. }
  559. template< typename T >
  560. typename buffered_channel< T >::iterator
  561. end( buffered_channel< T > &) {
  562. return typename buffered_channel< T >::iterator();
  563. }
}}

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_BUFFERED_CHANNEL_H