channel.hpp 12 KB


  1. //
  2. // Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_COBALT_IMPL_CHANNEL_HPP
  8. #define BOOST_COBALT_IMPL_CHANNEL_HPP
  9. #include <boost/cobalt/channel.hpp>
  10. #include <boost/cobalt/result.hpp>
  11. #include <boost/asio/post.hpp>
  12. namespace boost::cobalt
  13. {
  14. #if !defined(BOOST_COBALT_NO_PMR)
  15. template<typename T>
  16. inline channel<T>::channel(
  17. std::size_t limit,
  18. executor executor,
  19. pmr::memory_resource * resource)
  20. : buffer_(limit, pmr::polymorphic_allocator<T>(resource)), executor_(executor) {}
  21. #else
  22. template<typename T>
  23. inline channel<T>::channel(
  24. std::size_t limit,
  25. executor executor)
  26. : buffer_(limit), executor_(executor) {}
  27. #endif
  28. template<typename T>
  29. auto channel<T>::get_executor() -> const executor_type & {return executor_;}
  30. template<typename T>
  31. bool channel<T>::is_open() const {return !is_closed_;}
  32. template<typename T>
  33. channel<T>::~channel()
  34. {
  35. while (!read_queue_.empty())
  36. read_queue_.front().awaited_from.reset();
  37. while (!write_queue_.empty())
  38. write_queue_.front().awaited_from.reset();
  39. }
  40. template<typename T>
  41. void channel<T>::close()
  42. {
  43. is_closed_ = true;
  44. while (!read_queue_.empty())
  45. {
  46. auto & op = read_queue_.front();
  47. op.unlink();
  48. op.cancelled = true;
  49. op.cancel_slot.clear();
  50. if (op.awaited_from)
  51. asio::post(executor_, std::move(op.awaited_from));
  52. }
  53. while (!write_queue_.empty())
  54. {
  55. auto & op = write_queue_.front();
  56. op.unlink();
  57. op.cancelled = true;
  58. op.closed = true;
  59. op.cancel_slot.clear();
  60. if (op.awaited_from)
  61. asio::post(executor_, std::move(op.awaited_from));
  62. }
  63. }
  64. template<typename T>
  65. struct channel<T>::read_op::cancel_impl
  66. {
  67. read_op * op;
  68. cancel_impl(read_op * op) : op(op) {}
  69. void operator()(asio::cancellation_type)
  70. {
  71. op->cancelled = true;
  72. op->unlink();
  73. if (op->awaited_from)
  74. asio::post(
  75. op->chn->executor_,
  76. std::move(op->awaited_from));
  77. op->cancel_slot.clear();
  78. }
  79. };
  80. template<typename T>
  81. template<typename Promise>
  82. std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
  83. {
  84. if (cancelled)
  85. return h; // already interrupted.
  86. if constexpr (requires {h.promise().get_cancellation_slot();})
  87. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  88. cancel_slot.emplace<cancel_impl>(this);
  89. if (awaited_from)
  90. boost::throw_exception(std::runtime_error("already-awaited"), loc);
  91. awaited_from.reset(h.address());
  92. // currently nothing to read
  93. if constexpr (requires {h.promise().begin_transaction();})
  94. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  95. if (chn->write_queue_.empty())
  96. {
  97. chn->read_queue_.push_back(*this);
  98. return std::noop_coroutine();
  99. }
  100. else
  101. {
  102. cancel_slot.clear();
  103. auto & op = chn->write_queue_.front();
  104. // transactional_unlink can interrupt or cancel `op` through `race`, so we need to check.
  105. op.direct = true;
  106. if constexpr (std::is_copy_constructible_v<T>)
  107. {
  108. if (op.ref.index() == 0)
  109. direct = std::move(*variant2::get<0>(op.ref));
  110. else
  111. direct = *variant2::get<1>(op.ref);
  112. }
  113. else
  114. direct = std::move(*op.ref);
  115. op.transactional_unlink();
  116. BOOST_ASSERT(op.awaited_from);
  117. BOOST_ASSERT(awaited_from);
  118. asio::post(chn->executor_, std::move(awaited_from));
  119. return op.awaited_from.release();
  120. }
  121. }
  122. template<typename T>
  123. T channel<T>::read_op::await_resume()
  124. {
  125. return await_resume(as_result_tag{}).value(loc);
  126. }
  127. template<typename T>
  128. std::tuple<system::error_code, T> channel<T>::read_op::await_resume(const struct as_tuple_tag &)
  129. {
  130. auto res = await_resume(as_result_tag{});
  131. if (res.has_error())
  132. return {res.error(), T{}};
  133. else
  134. return {system::error_code{}, std::move(*res)};
  135. }
  136. template<typename T>
  137. system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &)
  138. {
  139. if (cancel_slot.is_connected())
  140. cancel_slot.clear();
  141. if (chn->is_closed_ && chn->buffer_.empty() && !direct)
  142. {
  143. constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
  144. return {system::in_place_error, asio::error::broken_pipe, &loc};
  145. }
  146. if (cancelled)
  147. {
  148. constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
  149. return {system::in_place_error, asio::error::operation_aborted, &loc};
  150. }
  151. T value = chn->buffer_.empty() ? std::move(*direct) : std::move(chn->buffer_.front());
  152. if (!chn->buffer_.empty())
  153. {
  154. chn->buffer_.pop_front();
  155. if (direct)
  156. chn->buffer_.push_back(std::move(*direct));
  157. }
  158. if (!chn->write_queue_.empty())
  159. {
  160. auto &op = chn->write_queue_.front();
  161. BOOST_ASSERT(chn->read_queue_.empty());
  162. if (op.await_ready())
  163. {
  164. op.unlink();
  165. if (!op.cancelled && !op.closed)
  166. {
  167. op.direct = true;
  168. if constexpr (std::is_copy_constructible_v<T>)
  169. {
  170. if (op.ref.index() == 0)
  171. chn->buffer_.push_back(std::move(*variant2::get<0>(op.ref)));
  172. else
  173. chn->buffer_.push_back(*variant2::get<1>(op.ref));
  174. }
  175. else
  176. chn->buffer_.push_back(std::move(*op.ref));
  177. }
  178. BOOST_ASSERT(op.awaited_from);
  179. asio::post(chn->executor_, std::move(op.awaited_from));
  180. }
  181. }
  182. return {system::in_place_value, std::move(value)};
  183. }
  184. template<typename T>
  185. struct channel<T>::write_op::cancel_impl
  186. {
  187. write_op * op;
  188. cancel_impl(write_op * op) : op(op) {}
  189. void operator()(asio::cancellation_type)
  190. {
  191. op->cancelled = true;
  192. op->unlink();
  193. if (op->awaited_from)
  194. asio::post(
  195. op->chn->executor_, std::move(op->awaited_from));
  196. op->cancel_slot.clear();
  197. }
  198. };
  199. template<typename T>
  200. template<typename Promise>
  201. std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
  202. {
  203. if (cancelled)
  204. return h; // already interrupted.
  205. if constexpr (requires {h.promise().get_cancellation_slot();})
  206. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  207. cancel_slot.emplace<cancel_impl>(this);
  208. awaited_from.reset(h.address());
  209. if constexpr (requires {h.promise().begin_transaction();})
  210. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  211. BOOST_ASSERT(this->chn->buffer_.full());
  212. if (chn->read_queue_.empty())
  213. {
  214. chn->write_queue_.push_back(*this);
  215. return std::noop_coroutine();
  216. }
  217. else
  218. {
  219. cancel_slot.clear();
  220. auto & op = chn->read_queue_.front();
  221. if constexpr (std::is_copy_constructible_v<T>)
  222. {
  223. if (ref.index() == 0)
  224. op.direct.emplace(std::move(*variant2::get<0>(ref)));
  225. else
  226. op.direct.emplace(*variant2::get<1>(ref));
  227. }
  228. else
  229. op.direct.emplace(std::move(*ref));
  230. direct = true;
  231. op.transactional_unlink();
  232. BOOST_ASSERT(op.awaited_from);
  233. BOOST_ASSERT(awaited_from);
  234. asio::post(chn->executor_, std::move(awaited_from));
  235. return op.awaited_from.release();
  236. }
  237. }
  238. template<typename T>
  239. std::tuple<system::error_code> channel<T>::write_op::await_resume(const struct as_tuple_tag &)
  240. {
  241. return await_resume(as_result_tag{}).error();
  242. }
  243. template<typename T>
  244. void channel<T>::write_op::await_resume()
  245. {
  246. await_resume(as_result_tag{}).value(loc);
  247. }
  248. template<typename T>
  249. system::result<void> channel<T>::write_op::await_resume(const struct as_result_tag &)
  250. {
  251. if (cancel_slot.is_connected())
  252. cancel_slot.clear();
  253. if (closed)
  254. {
  255. constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
  256. return {system::in_place_error, asio::error::broken_pipe, &loc};
  257. }
  258. if (cancelled)
  259. {
  260. constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
  261. return {system::in_place_error, asio::error::operation_aborted, &loc};
  262. }
  263. if (!direct)
  264. {
  265. BOOST_ASSERT(!chn->buffer_.full());
  266. if constexpr (std::is_copy_constructible_v<T>)
  267. {
  268. if (ref.index() == 0)
  269. chn->buffer_.push_back(std::move(*variant2::get<0>(ref)));
  270. else
  271. chn->buffer_.push_back(*variant2::get<1>(ref));
  272. }
  273. else
  274. chn->buffer_.push_back(std::move(*ref));
  275. }
  276. if (!chn->read_queue_.empty())
  277. {
  278. auto & op = chn->read_queue_.front();
  279. BOOST_ASSERT(chn->write_queue_.empty());
  280. if (op.await_ready())
  281. {
  282. // unlink?
  283. op.unlink();
  284. BOOST_ASSERT(op.awaited_from);
  285. asio::post(chn->executor_, std::move(op.awaited_from));
  286. }
  287. }
  288. return system::in_place_value;
  289. }
  290. struct channel<void>::read_op::cancel_impl
  291. {
  292. read_op * op;
  293. cancel_impl(read_op * op) : op(op) {}
  294. void operator()(asio::cancellation_type)
  295. {
  296. op->cancelled = true;
  297. op->unlink();
  298. if (op->awaited_from)
  299. asio::post(op->chn->executor_, std::move(op->awaited_from));
  300. op->cancel_slot.clear();
  301. }
  302. };
  303. struct channel<void>::write_op::cancel_impl
  304. {
  305. write_op * op;
  306. cancel_impl(write_op * op) : op(op) {}
  307. void operator()(asio::cancellation_type)
  308. {
  309. op->cancelled = true;
  310. op->unlink();
  311. if (op->awaited_from)
  312. asio::post(op->chn->executor_, std::move(op->awaited_from));
  313. op->cancel_slot.clear();
  314. }
  315. };
  316. template<typename Promise>
  317. std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
  318. {
  319. if (cancelled)
  320. return h; // already interrupted.
  321. if constexpr (requires {h.promise().get_cancellation_slot();})
  322. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  323. cancel_slot.emplace<cancel_impl>(this);
  324. if (awaited_from)
  325. boost::throw_exception(std::runtime_error("already-awaited"), loc);
  326. awaited_from.reset(h.address());
  327. if constexpr (requires {h.promise().begin_transaction();})
  328. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  329. // nothing to read currently, enqueue
  330. if (chn->write_queue_.empty())
  331. {
  332. chn->read_queue_.push_back(*this);
  333. return std::noop_coroutine();
  334. }
  335. else // we're good, we can read, so we'll do that, but we need to post, so we need to initialize a transactin.
  336. {
  337. cancel_slot.clear();
  338. auto & op = chn->write_queue_.front();
  339. op.direct = true;
  340. direct = true;
  341. op.transactional_unlink();
  342. BOOST_ASSERT(op.awaited_from);
  343. BOOST_ASSERT(awaited_from);
  344. asio::post(chn->executor_, std::move(awaited_from));
  345. return op.awaited_from.release();
  346. }
  347. }
  348. template<typename Promise>
  349. std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
  350. {
  351. if (cancelled)
  352. return h; // already interrupted.
  353. if constexpr (requires {h.promise().get_cancellation_slot();})
  354. if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
  355. cancel_slot.emplace<cancel_impl>(this);
  356. awaited_from.reset(h.address());
  357. // currently nothing to read
  358. if constexpr (requires {h.promise().begin_transaction();})
  359. begin_transaction = +[](void * p){std::coroutine_handle<Promise>::from_address(p).promise().begin_transaction();};
  360. if (chn->read_queue_.empty())
  361. {
  362. chn->write_queue_.push_back(*this);
  363. return std::noop_coroutine();
  364. }
  365. else
  366. {
  367. cancel_slot.clear();
  368. auto & op = chn->read_queue_.front();
  369. op.direct = true; // let interrupt_await know that we'll be resuming it!
  370. direct = true;
  371. op.transactional_unlink();
  372. BOOST_ASSERT(op.awaited_from);
  373. BOOST_ASSERT(awaited_from);
  374. asio::post(chn->executor_, std::move(awaited_from));
  375. return op.awaited_from.release();
  376. }
  377. }
  378. }
  379. #endif //BOOST_COBALT_IMPL_CHANNEL_HPP