thread_pool.hpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. //
  2. // impl/thread_pool.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2025 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_IMPL_THREAD_POOL_HPP
  11. #define BOOST_ASIO_IMPL_THREAD_POOL_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/config.hpp>
  16. #include <boost/asio/detail/blocking_executor_op.hpp>
  17. #include <boost/asio/detail/executor_op.hpp>
  18. #include <boost/asio/detail/fenced_block.hpp>
  19. #include <boost/asio/detail/non_const_lvalue.hpp>
  20. #include <boost/asio/detail/type_traits.hpp>
  21. #include <boost/asio/execution_context.hpp>
  22. #include <boost/asio/detail/push_options.hpp>
  23. namespace boost {
  24. namespace asio {
  25. #if !defined(BOOST_ASIO_NO_TS_EXECUTORS)
  26. template <typename Allocator>
  27. thread_pool::thread_pool(allocator_arg_t, const Allocator& a)
  28. : execution_context(std::allocator_arg, a),
  29. scheduler_(boost::asio::make_service<detail::scheduler>(*this, false)),
  30. threads_(allocator<void>(*this)),
  31. num_threads_(default_thread_pool_size()),
  32. joinable_(true)
  33. {
  34. start();
  35. }
  36. #endif // !defined(BOOST_ASIO_NO_TS_EXECUTORS)
  37. template <typename Allocator>
  38. thread_pool::thread_pool(allocator_arg_t,
  39. const Allocator& a, std::size_t num_threads)
  40. : execution_context(std::allocator_arg, a,
  41. config_from_concurrency_hint(num_threads == 1 ? 1 : 0)),
  42. scheduler_(boost::asio::make_service<detail::scheduler>(*this, false)),
  43. threads_(allocator<void>(*this)),
  44. num_threads_(clamp_thread_pool_size(num_threads)),
  45. joinable_(true)
  46. {
  47. start();
  48. }
  49. template <typename Allocator>
  50. thread_pool::thread_pool(allocator_arg_t,
  51. const Allocator& a, std::size_t num_threads,
  52. const execution_context::service_maker& initial_services)
  53. : execution_context(std::allocator_arg, a, initial_services),
  54. scheduler_(boost::asio::make_service<detail::scheduler>(*this, false)),
  55. threads_(allocator<void>(*this)),
  56. num_threads_(clamp_thread_pool_size(num_threads)),
  57. joinable_(true)
  58. {
  59. start();
  60. }
  61. inline thread_pool::executor_type
  62. thread_pool::get_executor() noexcept
  63. {
  64. return executor_type(*this);
  65. }
  66. inline thread_pool::executor_type
  67. thread_pool::executor() noexcept
  68. {
  69. return executor_type(*this);
  70. }
  71. template <typename Allocator, unsigned int Bits>
  72. thread_pool::basic_executor_type<Allocator, Bits>&
  73. thread_pool::basic_executor_type<Allocator, Bits>::operator=(
  74. const basic_executor_type& other) noexcept
  75. {
  76. if (this != &other)
  77. {
  78. thread_pool* old_thread_pool = pool_;
  79. pool_ = other.pool_;
  80. allocator_ = other.allocator_;
  81. bits_ = other.bits_;
  82. if (Bits & outstanding_work_tracked)
  83. {
  84. if (pool_)
  85. pool_->scheduler_.work_started();
  86. if (old_thread_pool)
  87. old_thread_pool->scheduler_.work_finished();
  88. }
  89. }
  90. return *this;
  91. }
  92. template <typename Allocator, unsigned int Bits>
  93. thread_pool::basic_executor_type<Allocator, Bits>&
  94. thread_pool::basic_executor_type<Allocator, Bits>::operator=(
  95. basic_executor_type&& other) noexcept
  96. {
  97. if (this != &other)
  98. {
  99. thread_pool* old_thread_pool = pool_;
  100. pool_ = other.pool_;
  101. allocator_ = std::move(other.allocator_);
  102. bits_ = other.bits_;
  103. if (Bits & outstanding_work_tracked)
  104. {
  105. other.pool_ = 0;
  106. if (old_thread_pool)
  107. old_thread_pool->scheduler_.work_finished();
  108. }
  109. }
  110. return *this;
  111. }
  112. template <typename Allocator, unsigned int Bits>
  113. inline bool thread_pool::basic_executor_type<Allocator,
  114. Bits>::running_in_this_thread() const noexcept
  115. {
  116. return pool_->scheduler_.can_dispatch();
  117. }
  118. template <typename Allocator, unsigned int Bits>
  119. template <typename Function>
  120. void thread_pool::basic_executor_type<Allocator,
  121. Bits>::do_execute(Function&& f, false_type) const
  122. {
  123. typedef decay_t<Function> function_type;
  124. // Invoke immediately if the blocking.possibly property is enabled and we are
  125. // already inside the thread pool.
  126. if ((bits_ & blocking_never) == 0 && pool_->scheduler_.can_dispatch())
  127. {
  128. // Make a local, non-const copy of the function.
  129. function_type tmp(static_cast<Function&&>(f));
  130. #if !defined(BOOST_ASIO_NO_EXCEPTIONS)
  131. try
  132. {
  133. #endif // !defined(BOOST_ASIO_NO_EXCEPTIONS)
  134. detail::fenced_block b(detail::fenced_block::full);
  135. static_cast<function_type&&>(tmp)();
  136. return;
  137. #if !defined(BOOST_ASIO_NO_EXCEPTIONS)
  138. }
  139. catch (...)
  140. {
  141. std::terminate();
  142. return;
  143. }
  144. #endif // !defined(BOOST_ASIO_NO_EXCEPTIONS)
  145. }
  146. // Allocate and construct an operation to wrap the function.
  147. typedef detail::executor_op<function_type, Allocator> op;
  148. typename op::ptr p = { detail::addressof(allocator_),
  149. op::ptr::allocate(allocator_), 0 };
  150. p.p = new (p.v) op(static_cast<Function&&>(f), allocator_);
  151. if ((bits_ & relationship_continuation) != 0)
  152. {
  153. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  154. "thread_pool", pool_, 0, "execute(blk=never,rel=cont)"));
  155. }
  156. else
  157. {
  158. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  159. "thread_pool", pool_, 0, "execute(blk=never,rel=fork)"));
  160. }
  161. pool_->scheduler_.post_immediate_completion(p.p,
  162. (bits_ & relationship_continuation) != 0);
  163. p.v = p.p = 0;
  164. }
  165. template <typename Allocator, unsigned int Bits>
  166. template <typename Function>
  167. void thread_pool::basic_executor_type<Allocator,
  168. Bits>::do_execute(Function&& f, true_type) const
  169. {
  170. // Obtain a non-const instance of the function.
  171. detail::non_const_lvalue<Function> f2(f);
  172. // Invoke immediately if we are already inside the thread pool.
  173. if (pool_->scheduler_.can_dispatch())
  174. {
  175. #if !defined(BOOST_ASIO_NO_EXCEPTIONS)
  176. try
  177. {
  178. #endif // !defined(BOOST_ASIO_NO_EXCEPTIONS)
  179. detail::fenced_block b(detail::fenced_block::full);
  180. static_cast<decay_t<Function>&&>(f2.value)();
  181. return;
  182. #if !defined(BOOST_ASIO_NO_EXCEPTIONS)
  183. }
  184. catch (...)
  185. {
  186. std::terminate();
  187. }
  188. #endif // !defined(BOOST_ASIO_NO_EXCEPTIONS)
  189. }
  190. // Construct an operation to wrap the function.
  191. typedef decay_t<Function> function_type;
  192. detail::blocking_executor_op<function_type> op(f2.value);
  193. BOOST_ASIO_HANDLER_CREATION((*pool_, op,
  194. "thread_pool", pool_, 0, "execute(blk=always)"));
  195. pool_->scheduler_.post_immediate_completion(&op, false);
  196. op.wait();
  197. }
  198. #if !defined(BOOST_ASIO_NO_TS_EXECUTORS)
  199. template <typename Allocator, unsigned int Bits>
  200. inline thread_pool& thread_pool::basic_executor_type<
  201. Allocator, Bits>::context() const noexcept
  202. {
  203. return *pool_;
  204. }
  205. template <typename Allocator, unsigned int Bits>
  206. inline void thread_pool::basic_executor_type<Allocator,
  207. Bits>::on_work_started() const noexcept
  208. {
  209. pool_->scheduler_.work_started();
  210. }
  211. template <typename Allocator, unsigned int Bits>
  212. inline void thread_pool::basic_executor_type<Allocator,
  213. Bits>::on_work_finished() const noexcept
  214. {
  215. pool_->scheduler_.work_finished();
  216. }
  217. template <typename Allocator, unsigned int Bits>
  218. template <typename Function, typename OtherAllocator>
  219. void thread_pool::basic_executor_type<Allocator, Bits>::dispatch(
  220. Function&& f, const OtherAllocator& a) const
  221. {
  222. typedef decay_t<Function> function_type;
  223. // Invoke immediately if we are already inside the thread pool.
  224. if (pool_->scheduler_.can_dispatch())
  225. {
  226. // Make a local, non-const copy of the function.
  227. function_type tmp(static_cast<Function&&>(f));
  228. detail::fenced_block b(detail::fenced_block::full);
  229. static_cast<function_type&&>(tmp)();
  230. return;
  231. }
  232. // Allocate and construct an operation to wrap the function.
  233. typedef detail::executor_op<function_type, OtherAllocator> op;
  234. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  235. p.p = new (p.v) op(static_cast<Function&&>(f), a);
  236. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  237. "thread_pool", pool_, 0, "dispatch"));
  238. pool_->scheduler_.post_immediate_completion(p.p, false);
  239. p.v = p.p = 0;
  240. }
  241. template <typename Allocator, unsigned int Bits>
  242. template <typename Function, typename OtherAllocator>
  243. void thread_pool::basic_executor_type<Allocator, Bits>::post(
  244. Function&& f, const OtherAllocator& a) const
  245. {
  246. typedef decay_t<Function> function_type;
  247. // Allocate and construct an operation to wrap the function.
  248. typedef detail::executor_op<function_type, OtherAllocator> op;
  249. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  250. p.p = new (p.v) op(static_cast<Function&&>(f), a);
  251. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  252. "thread_pool", pool_, 0, "post"));
  253. pool_->scheduler_.post_immediate_completion(p.p, false);
  254. p.v = p.p = 0;
  255. }
  256. template <typename Allocator, unsigned int Bits>
  257. template <typename Function, typename OtherAllocator>
  258. void thread_pool::basic_executor_type<Allocator, Bits>::defer(
  259. Function&& f, const OtherAllocator& a) const
  260. {
  261. typedef decay_t<Function> function_type;
  262. // Allocate and construct an operation to wrap the function.
  263. typedef detail::executor_op<function_type, OtherAllocator> op;
  264. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  265. p.p = new (p.v) op(static_cast<Function&&>(f), a);
  266. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  267. "thread_pool", pool_, 0, "defer"));
  268. pool_->scheduler_.post_immediate_completion(p.p, true);
  269. p.v = p.p = 0;
  270. }
  271. #endif // !defined(BOOST_ASIO_NO_TS_EXECUTORS)
  272. } // namespace asio
  273. } // namespace boost
  274. #include <boost/asio/detail/pop_options.hpp>
  275. #endif // BOOST_ASIO_IMPL_THREAD_POOL_HPP