channel.hpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_CHANNEL_HPP
  8. #define BOOST_COBALT_CHANNEL_HPP
  9. #include <boost/cobalt/this_thread.hpp>
  10. #include <boost/cobalt/unique_handle.hpp>
  11. #include <boost/cobalt/detail/util.hpp>
  12. #include <boost/asio/cancellation_signal.hpp>
  13. #include <boost/asio/cancellation_type.hpp>
  14. #include <boost/circular_buffer.hpp>
  15. #include <boost/config.hpp>
  16. #include <boost/intrusive/list.hpp>
  17. #include <boost/variant2/variant.hpp>
  18. #include <optional>
  19. namespace boost::cobalt
  20. {
  21. template<typename T>
  22. struct channel_reader;
  23. // tag::outline[]
  24. template<typename T>
  25. struct channel
  26. {
  27. // end::outline[]
  28. #if defined(BOOST_COBALT_NO_PMR)
  29. explicit
  30. channel(std::size_t limit = 0u,
  31. executor executor = this_thread::get_executor());
  32. #else
  33. // tag::outline[]
  34. // create a channel with a buffer limit, executor & resource.
  35. explicit
  36. channel(std::size_t limit = 0u,
  37. executor executor = this_thread::get_executor(),
  38. pmr::memory_resource * resource = this_thread::get_default_resource());
  39. // end::outline[]
  40. #endif
  41. // tag::outline[]
  42. // not movable.
  43. channel(channel && rhs) noexcept = delete;
  44. channel & operator=(channel && lhs) noexcept = delete;
  45. using executor_type = executor;
  46. const executor_type & get_executor();
  47. // Closes the channel
  48. ~channel();
  49. bool is_open() const;
  50. // close the operation, will cancel all pending ops, too
  51. void close();
  52. // end::outline[]
  53. private:
  54. #if !defined(BOOST_COBALT_NO_PMR)
  55. boost::circular_buffer<T, pmr::polymorphic_allocator<T>> buffer_;
  56. #else
  57. boost::circular_buffer<T> buffer_;
  58. #endif
  59. executor_type executor_;
  60. bool is_closed_{false};
  61. struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  62. {
  63. channel * chn;
  64. boost::source_location loc;
  65. bool cancelled = false;
  66. std::optional<T> direct{};
  67. asio::cancellation_slot cancel_slot{};
  68. unique_handle<void> awaited_from{nullptr};
  69. void (*begin_transaction)(void*) = nullptr;
  70. void transactional_unlink()
  71. {
  72. if (begin_transaction)
  73. begin_transaction(awaited_from.get());
  74. this->unlink();
  75. }
  76. void interrupt_await()
  77. {
  78. if (!direct)
  79. {
  80. this->cancelled = true;
  81. if (this->awaited_from)
  82. this->awaited_from.release().resume();
  83. }
  84. }
  85. struct cancel_impl;
  86. bool await_ready() const noexcept{ return !chn->buffer_.empty() || chn->is_closed_; }
  87. template<typename Promise>
  88. BOOST_COBALT_MSVC_NOINLINE
  89. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  90. T await_resume();
  91. std::tuple<system::error_code, T> await_resume(const struct as_tuple_tag & );
  92. system::result<T> await_resume(const struct as_result_tag &);
  93. explicit operator bool() const {return chn && chn->is_open();}
  94. };
  95. struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  96. {
  97. channel * chn;
  98. using ref_t = std::conditional_t<
  99. std::is_copy_constructible_v<T>,
  100. variant2::variant<T*, const T*>,
  101. T*>;
  102. ref_t ref;
  103. boost::source_location loc;
  104. bool cancelled = false, direct = false, closed = !chn->is_open();
  105. asio::cancellation_slot cancel_slot{};
  106. unique_handle<void> awaited_from{nullptr};
  107. void (*begin_transaction)(void*) = nullptr;
  108. void transactional_unlink()
  109. {
  110. if (begin_transaction)
  111. begin_transaction(awaited_from.get());
  112. this->unlink();
  113. }
  114. void interrupt_await()
  115. {
  116. if (!direct)
  117. {
  118. this->cancelled = true;
  119. if (this->awaited_from)
  120. this->awaited_from.release().resume();
  121. }
  122. }
  123. struct cancel_impl;
  124. bool await_ready() const noexcept { return !chn->buffer_.full() || chn->is_closed_; }
  125. template<typename Promise>
  126. BOOST_COBALT_MSVC_NOINLINE
  127. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  128. void await_resume();
  129. std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  130. system::result<void> await_resume(const struct as_result_tag &);
  131. explicit operator bool() const {return chn && chn->is_open();}
  132. };
  133. boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
  134. boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
  135. public:
  136. BOOST_COBALT_MSVC_NOINLINE
  137. read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
  138. BOOST_COBALT_MSVC_NOINLINE
  139. write_op write(const T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  140. requires std::is_copy_constructible_v<T>
  141. {
  142. return write_op{{}, this, &value, loc};
  143. }
  144. BOOST_COBALT_MSVC_NOINLINE
  145. write_op write(const T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  146. requires std::is_copy_constructible_v<T>
  147. {
  148. return write_op{{}, this, &value, loc};
  149. }
  150. BOOST_COBALT_MSVC_NOINLINE
  151. write_op write( T && value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  152. {
  153. return write_op{{}, this, &value, loc};
  154. }
  155. BOOST_COBALT_MSVC_NOINLINE
  156. write_op write( T & value, const boost::source_location & loc = BOOST_CURRENT_LOCATION)
  157. {
  158. return write_op{{}, this, &static_cast<const T&>(value), loc};
  159. }
  160. /*
  161. // tag::outline[]
  162. // an awaitable that yields T
  163. using __read_op__ = __unspecified__;
  164. // an awaitable that yields void
  165. using __write_op__ = __unspecified__;
  166. // read a value to a channel
  167. __read_op__ read();
  168. // write a value to the channel
  169. __write_op__ write(const T && value);
  170. __write_op__ write(const T & value);
  171. __write_op__ write( T && value);
  172. __write_op__ write( T & value);
  173. // write a value to the channel if T is void
  174. __write_op__ write(); // end::outline[]
  175. */
  176. // tag::outline[]
  177. };
  178. // end::outline[]
  179. template<>
  180. struct channel<void>
  181. {
  182. explicit
  183. channel(std::size_t limit = 0u,
  184. executor executor = this_thread::get_executor())
  185. : limit_(limit), executor_(executor) {}
  186. channel(channel &&) noexcept = delete;
  187. channel & operator=(channel && lhs) noexcept = delete;
  188. using executor_type = executor;
  189. const executor_type & get_executor() {return executor_;}
  190. BOOST_COBALT_DECL ~channel();
  191. bool is_open() const {return !is_closed_;}
  192. BOOST_COBALT_DECL void close();
  193. private:
  194. std::size_t limit_;
  195. std::size_t n_{0u};
  196. executor_type executor_;
  197. bool is_closed_{false};
  198. struct read_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  199. {
  200. channel * chn;
  201. boost::source_location loc;
  202. bool cancelled = false, direct = false;
  203. asio::cancellation_slot cancel_slot{};
  204. unique_handle<void> awaited_from{nullptr};
  205. void (*begin_transaction)(void*) = nullptr;
  206. void transactional_unlink()
  207. {
  208. if (begin_transaction)
  209. begin_transaction(awaited_from.get());
  210. this->unlink();
  211. }
  212. void interrupt_await()
  213. {
  214. if (!direct)
  215. {
  216. this->cancelled = true;
  217. if (this->awaited_from)
  218. this->awaited_from.release().resume();
  219. }
  220. }
  221. struct cancel_impl;
  222. bool await_ready() const noexcept
  223. {
  224. return (chn->n_ > 0) || chn->is_closed_;
  225. }
  226. template<typename Promise>
  227. BOOST_COBALT_MSVC_NOINLINE
  228. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  229. BOOST_COBALT_DECL void await_resume();
  230. BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  231. BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
  232. explicit operator bool() const {return chn && chn->is_open();}
  233. };
  234. struct write_op : intrusive::list_base_hook<intrusive::link_mode<intrusive::auto_unlink> >
  235. {
  236. channel * chn;
  237. boost::source_location loc;
  238. bool cancelled = false, direct = false, closed = !chn->is_open();
  239. asio::cancellation_slot cancel_slot{};
  240. unique_handle<void> awaited_from{nullptr};
  241. void (*begin_transaction)(void*) = nullptr;
  242. void transactional_unlink()
  243. {
  244. if (begin_transaction)
  245. begin_transaction(awaited_from.get());
  246. this->unlink();
  247. }
  248. void interrupt_await()
  249. {
  250. if (!direct)
  251. {
  252. cancelled = true;
  253. if (this->awaited_from)
  254. this->awaited_from.release().resume();
  255. }
  256. }
  257. struct cancel_impl;
  258. bool await_ready() const noexcept
  259. {
  260. return chn->n_ < chn->limit_ || chn->is_closed_;
  261. }
  262. template<typename Promise>
  263. BOOST_COBALT_MSVC_NOINLINE
  264. std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
  265. BOOST_COBALT_DECL void await_resume();
  266. BOOST_COBALT_DECL std::tuple<system::error_code> await_resume(const struct as_tuple_tag & );
  267. BOOST_COBALT_DECL system::result<void> await_resume(const struct as_result_tag &);
  268. explicit operator bool() const {return chn && chn->is_open();}
  269. };
  270. boost::intrusive::list<read_op, intrusive::constant_time_size<false> > read_queue_;
  271. boost::intrusive::list<write_op, intrusive::constant_time_size<false> > write_queue_;
  272. public:
  273. read_op read(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return read_op{{}, this, loc}; }
  274. write_op write(const boost::source_location & loc = BOOST_CURRENT_LOCATION) {return write_op{{}, this, loc}; }
  275. };
  276. template<typename T>
  277. struct channel_reader
  278. {
  279. channel_reader(channel<T> & chan,
  280. const boost::source_location & loc = BOOST_CURRENT_LOCATION) : chan_(&chan), loc_(loc) {}
  281. auto operator co_await ()
  282. {
  283. return chan_->read(loc_);
  284. }
  285. explicit operator bool () const {return chan_ && chan_->is_open();}
  286. private:
  287. channel<T> * chan_;
  288. boost::source_location loc_;
  289. };
  290. }
  291. #include <boost/cobalt/impl/channel.hpp>
  292. #endif //BOOST_COBALT_CHANNEL_HPP