// (extraction artifacts removed: file-size banner and a concatenated
// line-number gutter that preceded the original file header)
  1. //
  2. // detail/impl/scheduler.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2025 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
  11. #define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #include <boost/asio/config.hpp>
  17. #include <boost/asio/detail/event.hpp>
  18. #include <boost/asio/detail/limits.hpp>
  19. #include <boost/asio/detail/scheduler.hpp>
  20. #include <boost/asio/detail/scheduler_thread_info.hpp>
  21. #include <boost/asio/detail/signal_blocker.hpp>
  22. #if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  23. # include <boost/asio/detail/io_uring_service.hpp>
  24. #else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  25. # include <boost/asio/detail/reactor.hpp>
  26. #endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  27. #include <boost/asio/detail/push_options.hpp>
  28. namespace boost {
  29. namespace asio {
  30. namespace detail {
  31. class scheduler::thread_function
  32. {
  33. public:
  34. explicit thread_function(scheduler* s)
  35. : this_(s)
  36. {
  37. }
  38. void operator()()
  39. {
  40. boost::system::error_code ec;
  41. this_->run(ec);
  42. }
  43. private:
  44. scheduler* this_;
  45. };
// Scope guard used around a call to task_->run(). On destruction it
// publishes any privately accumulated work count to the scheduler's shared
// outstanding_work_ counter, re-acquires the scheduler lock, flushes the
// thread-private operation queue onto the shared queue, and reinserts the
// task sentinel so the task can be run again later.
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;     // Scheduler whose state is restored (non-owning).
  mutex::scoped_lock* lock_; // Lock to re-acquire before touching op_queue_.
  thread_info* this_thread_; // Calling thread's private state.
};
// Scope guard used around the execution of a single handler. On destruction
// it settles the work count: one unit is consumed by the handler that just
// ran, so a private count above 1 is published to the shared counter, while
// a count below 1 means net work was finished. Any operations posted to the
// thread-private queue during the handler are flushed to the shared queue.
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      // Publish everything beyond the one unit consumed by this handler.
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      // The handler consumed more work than was privately added.
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;     // Scheduler whose counters are updated (non-owning).
  mutex::scoped_lock* lock_; // Lock to re-acquire before touching op_queue_.
  thread_info* this_thread_; // Calling thread's private state.
};
// Construct the scheduler, reading tuning parameters from the execution
// context's configuration: "concurrency_hint" (one_thread_ optimisation when
// exactly 1), "locking"/"locking_spin_count" (mutex behaviour), and
// "task_usec"/"wait_usec" (task-run and idle-wait timeouts, -1 meaning
// block indefinitely). When own_thread is true, a background thread is
// spawned (with signals blocked) that immediately runs the event loop; the
// extra outstanding_work_ increment keeps that loop alive until shutdown.
scheduler::scheduler(boost::asio::execution_context& ctx,
    bool own_thread, get_task_func_type get_task)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(config(ctx).get("scheduler", "concurrency_hint", 0) == 1),
    mutex_(config(ctx).get("scheduler", "locking", true),
        config(ctx).get("scheduler", "locking_spin_count", 0)),
    task_(0),
    get_task_(get_task),
    task_interrupted_(true),
    stopped_(false),
    shutdown_(false),
    outstanding_work_(0),
    task_usec_(config(ctx).get("scheduler", "task_usec", -1L)),
    wait_usec_(config(ctx).get("scheduler", "wait_usec", -1L)),
    thread_()
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;

  if (own_thread)
  {
    ++outstanding_work_;
    // Block signals in this thread so the spawned thread inherits an empty
    // signal mask.
    signal_blocker sb;
    thread_ = thread(thread_function(this));
  }
}
// Alternate constructor selected by the internal tag type: fixed defaults
// (locking enabled with no spin count, no task/wait timeouts, default task
// factory) and no internally owned thread.
scheduler::scheduler(scheduler::internal, boost::asio::execution_context& ctx)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(false),
    mutex_(true, 0),
    task_(0),
    get_task_(&scheduler::get_default_task),
    task_interrupted_(true),
    stopped_(false),
    shutdown_(false),
    outstanding_work_(0),
    task_usec_(-1L),
    wait_usec_(-1L)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;
}
// Destroy the scheduler. If it owns a background thread (started by the
// own_thread constructor), mark the scheduler as shut down, wake all
// threads so the loop exits, then join the thread before members are
// destroyed.
scheduler::~scheduler()
{
  if (thread_.joinable())
  {
    mutex::scoped_lock lock(mutex_);
    shutdown_ = true;
    stop_all_threads(lock);
    lock.unlock();
    thread_.join();
  }
}
// Service shutdown: stop all threads, join the internally owned thread (if
// any), and destroy all still-queued handlers without invoking them.
void scheduler::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  if (thread_.joinable())
    stop_all_threads(lock);
  lock.unlock();

  // Join thread to ensure task operation is returned to queue.
  thread_.join();

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    // The task sentinel is a plain member, not a heap-allocated handler.
    if (o != &task_operation_)
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}
  165. void scheduler::init_task()
  166. {
  167. mutex::scoped_lock lock(mutex_);
  168. if (!shutdown_ && !task_)
  169. {
  170. task_ = get_task_(this->context());
  171. op_queue_.push(&task_operation_);
  172. wake_one_thread_and_unlock(lock);
  173. }
  174. }
  175. std::size_t scheduler::run(boost::system::error_code& ec)
  176. {
  177. ec = boost::system::error_code();
  178. if (outstanding_work_ == 0)
  179. {
  180. stop();
  181. return 0;
  182. }
  183. thread_info this_thread;
  184. this_thread.private_outstanding_work = 0;
  185. thread_call_stack::context ctx(this, this_thread);
  186. mutex::scoped_lock lock(mutex_);
  187. std::size_t n = 0;
  188. for (; do_run_one(lock, this_thread, ec); lock.lock())
  189. if (n != (std::numeric_limits<std::size_t>::max)())
  190. ++n;
  191. return n;
  192. }
  193. std::size_t scheduler::run_one(boost::system::error_code& ec)
  194. {
  195. ec = boost::system::error_code();
  196. if (outstanding_work_ == 0)
  197. {
  198. stop();
  199. return 0;
  200. }
  201. thread_info this_thread;
  202. this_thread.private_outstanding_work = 0;
  203. thread_call_stack::context ctx(this, this_thread);
  204. mutex::scoped_lock lock(mutex_);
  205. return do_run_one(lock, this_thread, ec);
  206. }
  207. std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
  208. {
  209. ec = boost::system::error_code();
  210. if (outstanding_work_ == 0)
  211. {
  212. stop();
  213. return 0;
  214. }
  215. thread_info this_thread;
  216. this_thread.private_outstanding_work = 0;
  217. thread_call_stack::context ctx(this, this_thread);
  218. mutex::scoped_lock lock(mutex_);
  219. return do_wait_one(lock, this_thread, usec, ec);
  220. }
  221. std::size_t scheduler::poll(boost::system::error_code& ec)
  222. {
  223. ec = boost::system::error_code();
  224. if (outstanding_work_ == 0)
  225. {
  226. stop();
  227. return 0;
  228. }
  229. thread_info this_thread;
  230. this_thread.private_outstanding_work = 0;
  231. thread_call_stack::context ctx(this, this_thread);
  232. mutex::scoped_lock lock(mutex_);
  233. #if defined(BOOST_ASIO_HAS_THREADS)
  234. // We want to support nested calls to poll() and poll_one(), so any handlers
  235. // that are already on a thread-private queue need to be put on to the main
  236. // queue now.
  237. if (one_thread_)
  238. if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
  239. op_queue_.push(outer_info->private_op_queue);
  240. #endif // defined(BOOST_ASIO_HAS_THREADS)
  241. std::size_t n = 0;
  242. for (; do_poll_one(lock, this_thread, ec); lock.lock())
  243. if (n != (std::numeric_limits<std::size_t>::max)())
  244. ++n;
  245. return n;
  246. }
  247. std::size_t scheduler::poll_one(boost::system::error_code& ec)
  248. {
  249. ec = boost::system::error_code();
  250. if (outstanding_work_ == 0)
  251. {
  252. stop();
  253. return 0;
  254. }
  255. thread_info this_thread;
  256. this_thread.private_outstanding_work = 0;
  257. thread_call_stack::context ctx(this, this_thread);
  258. mutex::scoped_lock lock(mutex_);
  259. #if defined(BOOST_ASIO_HAS_THREADS)
  260. // We want to support nested calls to poll() and poll_one(), so any handlers
  261. // that are already on a thread-private queue need to be put on to the main
  262. // queue now.
  263. if (one_thread_)
  264. if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
  265. op_queue_.push(outer_info->private_op_queue);
  266. #endif // defined(BOOST_ASIO_HAS_THREADS)
  267. return do_poll_one(lock, this_thread, ec);
  268. }
// Stop the scheduler: all threads currently running the event loop will
// return as soon as possible.
void scheduler::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}
// Returns whether the scheduler has been stopped (and not yet restarted).
bool scheduler::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}
// Clear the stopped flag so the event loop may be run again.
void scheduler::restart()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}
  284. void scheduler::compensating_work_started()
  285. {
  286. thread_info_base* this_thread = thread_call_stack::contains(this);
  287. BOOST_ASIO_ASSUME(this_thread != 0); // Only called from inside scheduler.
  288. ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
  289. }
  290. bool scheduler::can_dispatch()
  291. {
  292. return thread_call_stack::contains(this) != 0;
  293. }
  294. void scheduler::capture_current_exception()
  295. {
  296. if (thread_info_base* this_thread = thread_call_stack::contains(this))
  297. this_thread->capture_current_exception();
  298. }
  299. void scheduler::post_immediate_completion(
  300. scheduler::operation* op, bool is_continuation)
  301. {
  302. #if defined(BOOST_ASIO_HAS_THREADS)
  303. if (one_thread_ || is_continuation)
  304. {
  305. if (thread_info_base* this_thread = thread_call_stack::contains(this))
  306. {
  307. ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
  308. static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
  309. return;
  310. }
  311. }
  312. #else // defined(BOOST_ASIO_HAS_THREADS)
  313. (void)is_continuation;
  314. #endif // defined(BOOST_ASIO_HAS_THREADS)
  315. work_started();
  316. mutex::scoped_lock lock(mutex_);
  317. op_queue_.push(op);
  318. wake_one_thread_and_unlock(lock);
  319. }
// Post a batch of operations, adding n units of outstanding work. If called
// from a scheduler thread and either single-threaded or a continuation,
// both the work count and the operations stay on the thread-private queue
// to avoid locking; otherwise the shared counter and queue are used and a
// thread is woken.
void scheduler::post_immediate_completions(std::size_t n,
    op_queue<scheduler::operation>& ops, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_outstanding_work
        += static_cast<long>(n);
      static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  increment(outstanding_work_, static_cast<long>(n));
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(ops);
  wake_one_thread_and_unlock(lock);
}
  342. void scheduler::post_deferred_completion(scheduler::operation* op)
  343. {
  344. #if defined(BOOST_ASIO_HAS_THREADS)
  345. if (one_thread_)
  346. {
  347. if (thread_info_base* this_thread = thread_call_stack::contains(this))
  348. {
  349. static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
  350. return;
  351. }
  352. }
  353. #endif // defined(BOOST_ASIO_HAS_THREADS)
  354. mutex::scoped_lock lock(mutex_);
  355. op_queue_.push(op);
  356. wake_one_thread_and_unlock(lock);
  357. }
// Post a batch of operations whose work was already counted. A no-op for an
// empty batch. In single-threaded mode a scheduler thread keeps them on its
// private queue to avoid locking; otherwise they go on the shared queue and
// a thread is woken.
void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}
// Post an operation from outside a scheduler thread: count one unit of
// work, enqueue on the shared queue, and wake a thread to run it.
void scheduler::do_dispatch(
    scheduler::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
  386. void scheduler::abandon_operations(
  387. op_queue<scheduler::operation>& ops)
  388. {
  389. op_queue<scheduler::operation> ops2;
  390. ops2.push(ops);
  391. }
// Core of run()/run_one(): with the lock held on entry, loop until either a
// handler is executed (returns 1, lock released) or the scheduler is
// stopped (returns 0, lock held). The special task sentinel operation
// causes the task (reactor / io_uring) to be run instead of a handler.
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        // Polling mode (task_usec_ == 0) or pending handlers mean the task
        // must not block.
        task_interrupted_ = more_handlers || task_usec_ == 0;

        // If there are remaining handlers, let another thread pick them up
        // while this one runs the task.
        if (more_handlers && !one_thread_ && wait_usec_ != 0)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        // On scope exit, re-acquires the lock and requeues the task
        // sentinel along with any privately queued operations.
        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : task_usec_,
            this_thread.private_op_queue);
      }
      else
      {
        // Read task_result_ before complete() deletes the operation.
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);
        this_thread.rethrow_pending_exception();

        return 1;
      }
    }
    else
    {
      // Queue empty: either busy-spin (wait_usec_ == 0) or wait on the
      // wakeup event, with an optional timeout.
      if (wait_usec_ == 0)
      {
        lock.unlock();
        lock.lock();
      }
      else
      {
        wakeup_event_.clear(lock);
        if (wait_usec_ > 0)
          wakeup_event_.wait_for_usec(lock, wait_usec_);
        else
          wakeup_event_.wait(lock);
      }
    }
  }

  return 0;
}
// Core of wait_one(): with the lock held on entry, wait up to usec
// microseconds for an operation, running the task at most once if its
// sentinel is at the head of the queue. Returns 1 after executing a handler
// (lock released) or 0 on timeout/stop.
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    // Queue empty: wait once, with the timeout clamped by wait_usec_.
    wakeup_event_.clear(lock);
    usec = (wait_usec_ >= 0 && wait_usec_ < usec) ? wait_usec_ : usec;
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    // Clamp the task's run time by task_usec_ and forbid blocking when
    // other handlers are pending or no time remains.
    usec = (task_usec_ >= 0 && task_usec_ < usec) ? task_usec_ : usec;
    task_interrupted_ = more_handlers || usec == 0;

    if (more_handlers && !one_thread_ && wait_usec_ != 0)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      // On scope exit, re-acquires the lock and requeues the task sentinel
      // along with any privately queued operations.
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // The task produced no handlers; hand the sentinel to another thread.
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  // Read task_result_ before complete() deletes the operation.
  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}
// Core of poll()/poll_one(): with the lock held on entry, execute at most
// one ready handler without blocking, running the task (with a zero
// timeout) at most once if its sentinel is at the head of the queue.
// Returns 1 after executing a handler (lock released), otherwise 0.
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      // On scope exit, re-acquires the lock and requeues the task sentinel
      // along with any privately queued operations.
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      // The task produced no handlers; hand the sentinel to another thread.
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  // Read task_result_ before complete() deletes the operation.
  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);
  this_thread.rethrow_pending_exception();

  return 1;
}
// Mark the scheduler stopped and wake every thread: signal all waiters on
// the wakeup event, and interrupt the task in case it is blocked in its own
// wait. Called with the scheduler mutex held (via the passed lock).
void scheduler::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
// Wake one idle thread and release the lock. When wait_usec_ is 0 threads
// spin rather than wait on the event, so signalling is skipped; if no
// thread was waiting on the event, interrupt the task instead so the thread
// running it returns and can process the new work.
void scheduler::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (wait_usec_ == 0 || !wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}
// Default task factory: returns the context's io_uring service when that
// backend is the configured default, otherwise the reactor service.
scheduler_task* scheduler::get_default_task(boost::asio::execution_context& ctx)
{
#if defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<io_uring_service>(ctx);
#else // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  return &use_service<reactor>(ctx);
#endif // defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}
  587. } // namespace detail
  588. } // namespace asio
  589. } // namespace boost
  590. #include <boost/asio/detail/pop_options.hpp>
  591. #endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP