fork.hpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. // Copyright (c) 2023 Klemens D. Morgenstern
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_COBALT_DETAIL_FORK_HPP
  6. #define BOOST_COBALT_DETAIL_FORK_HPP
  7. #include <boost/cobalt/config.hpp>
  8. #include <boost/cobalt/detail/await_result_helper.hpp>
  9. #include <boost/cobalt/detail/util.hpp>
  10. #include <boost/cobalt/this_thread.hpp>
  11. #include <boost/cobalt/unique_handle.hpp>
  12. #if defined(BOOST_COBALT_NO_PMR)
  13. #include <boost/cobalt/detail/monotonic_resource.hpp>
  14. #endif
  15. #include <boost/asio/cancellation_signal.hpp>
  16. #include <boost/intrusive_ptr.hpp>
  17. #include <coroutine>
  18. #include <optional>
  19. namespace boost::cobalt::detail
  20. {
  21. struct fork
  22. {
  23. fork() = default;
  24. struct shared_state
  25. {
  26. #if !defined(BOOST_COBALT_NO_PMR)
  27. pmr::monotonic_buffer_resource resource{};
  28. template<typename ... Args>
  29. shared_state(Args && ... args)
  30. : resource(std::forward<Args>(args)...,
  31. this_thread::get_default_resource())
  32. {
  33. }
  34. #else
  35. detail::monotonic_resource resource;
  36. template<typename ... Args>
  37. shared_state(Args && ... args)
  38. : resource(std::forward<Args>(args)...)
  39. {
  40. }
  41. #endif
  42. // the coro awaiting the fork statement, e.g. awaiting race
  43. unique_handle<void> coro{};
  44. std::size_t use_count = 0u;
  45. friend void intrusive_ptr_add_ref(shared_state * st) {st->use_count++;}
  46. friend void intrusive_ptr_release(shared_state * st)
  47. {
  48. if (st->use_count-- == 1u)
  49. st->coro.reset();
  50. }
  51. bool outstanding_work() {return use_count != 0u;}
  52. std::optional<executor> exec{};
  53. bool wired_up() {return exec.has_value();}
  54. using executor_type = executor;
  55. const executor_type & get_executor() const
  56. {
  57. BOOST_ASSERT(exec.has_value());
  58. return *exec;
  59. }
  60. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  61. boost::source_location loc;
  62. #endif
  63. };
  64. template<typename std::size_t BufferSize>
  65. struct static_shared_state : private std::array<char, BufferSize>, shared_state
  66. {
  67. static_shared_state() : shared_state{std::array<char, BufferSize>::data(),
  68. std::array<char, BufferSize>::size()}
  69. {}
  70. };
  71. struct wired_up_t {};
  72. constexpr static wired_up_t wired_up{};
  73. struct set_transaction_function
  74. {
  75. void * begin_transaction_this = nullptr;
  76. void (*begin_transaction_func)(void*) = nullptr;
  77. template<typename BeginTransaction>
  78. set_transaction_function(BeginTransaction & transaction)
  79. : begin_transaction_this(&transaction)
  80. , begin_transaction_func(
  81. +[](void * ptr)
  82. {
  83. (*static_cast<BeginTransaction*>(ptr))();
  84. })
  85. {
  86. }
  87. };
  88. struct promise_type
  89. {
  90. template<typename State, typename ... Rest>
  91. void * operator new(const std::size_t size, State & st, Rest &&...)
  92. {
  93. return st.resource.allocate(size);
  94. }
  95. void operator delete(void *) noexcept {}
  96. template<typename ... Rest>
  97. promise_type(shared_state & st, Rest & ...)
  98. : state(&st)
  99. {
  100. }
  101. intrusive_ptr<shared_state> state;
  102. asio::cancellation_slot cancel;
  103. using executor_type = executor;
  104. const executor_type & get_executor() const { return state->get_executor(); }
  105. #if defined(BOOST_COBALT_NO_PMR)
  106. using allocator_type = detail::monotonic_allocator<void>;
  107. const allocator_type get_allocator() const { return &state->resource; }
  108. #else
  109. using allocator_type = pmr::polymorphic_allocator<void>;
  110. const allocator_type get_allocator() const { return &state->resource; }
  111. #endif
  112. using cancellation_slot_type = asio::cancellation_slot;
  113. cancellation_slot_type get_cancellation_slot() const { return cancel; }
  114. constexpr static std::suspend_never initial_suspend() noexcept {return {};}
  115. struct final_awaitable
  116. {
  117. promise_type * self;
  118. bool await_ready() const noexcept
  119. {
  120. return self->state->use_count != 1u;
  121. }
  122. std::coroutine_handle<void> await_suspend(std::coroutine_handle<promise_type> h) noexcept
  123. {
  124. auto pp = h.promise().state.detach();
  125. #if defined(BOOST_COBALT_NO_SELF_DELETE)
  126. h.promise().~promise_type();
  127. #else
  128. // mem is in a monotonic_resource, this is fine on msvc- gcc doesn't like it though
  129. h.destroy();
  130. #endif
  131. pp->use_count--;
  132. BOOST_ASSERT(pp->use_count == 0u);
  133. if (pp->coro)
  134. return pp->coro.release();
  135. else
  136. return std::noop_coroutine();
  137. }
  138. constexpr static void await_resume() noexcept {}
  139. };
  140. final_awaitable final_suspend() noexcept
  141. {
  142. if (cancel.is_connected())
  143. cancel.clear();
  144. return final_awaitable{this};
  145. }
  146. void return_void()
  147. {
  148. }
  149. template<awaitable<promise_type> Aw>
  150. struct wrapped_awaitable
  151. {
  152. Aw & aw;
  153. constexpr static bool await_ready() noexcept
  154. {
  155. return false;
  156. }
  157. auto await_suspend(std::coroutine_handle<promise_type> h)
  158. {
  159. BOOST_ASSERT(h.promise().state->wired_up());
  160. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  161. if constexpr (requires {aw.await_suspend(h, boost::source_location ());})
  162. return aw.await_suspend(h, h.promise().state->loc);
  163. #endif
  164. return aw.await_suspend(h);
  165. }
  166. auto await_resume()
  167. {
  168. return aw.await_resume();
  169. }
  170. };
  171. template<awaitable<promise_type> Aw>
  172. auto await_transform(Aw & aw)
  173. {
  174. return wrapped_awaitable<Aw>{aw};
  175. }
  176. struct wired_up_awaitable
  177. {
  178. promise_type * promise;
  179. bool await_ready() const noexcept
  180. {
  181. return promise->state->wired_up();
  182. }
  183. void await_suspend(std::coroutine_handle<promise_type>)
  184. {
  185. }
  186. constexpr static void await_resume() noexcept {}
  187. };
  188. auto await_transform(wired_up_t)
  189. {
  190. return wired_up_awaitable{this};
  191. }
  192. auto await_transform(set_transaction_function sf)
  193. {
  194. begin_transaction_this = sf.begin_transaction_this;
  195. begin_transaction_func = sf.begin_transaction_func;
  196. return std::suspend_never();
  197. }
  198. auto await_transform(asio::cancellation_slot slot)
  199. {
  200. this->cancel = slot;
  201. return std::suspend_never();
  202. }
  203. [[noreturn]] void unhandled_exception() noexcept {std::terminate();}
  204. void * begin_transaction_this = nullptr;
  205. void (*begin_transaction_func)(void*) = nullptr;
  206. void begin_transaction()
  207. {
  208. if (begin_transaction_this)
  209. begin_transaction_func(begin_transaction_this);
  210. }
  211. fork get_return_object()
  212. {
  213. return this;
  214. }
  215. };
  216. [[nodiscard]] bool done() const
  217. {
  218. return ! handle_ || handle_.done();
  219. }
  220. auto release() -> std::coroutine_handle<promise_type>
  221. {
  222. return handle_.release();
  223. }
  224. private:
  225. fork(promise_type * pt) : handle_(pt) {}
  226. unique_handle<promise_type> handle_;
  227. };
  228. }
  229. #endif //BOOST_COBALT_DETAIL_FORK_HPP