io_uring_service.ipp 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921
  1. //
  2. // detail/impl/io_uring_service.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2025 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
  11. #define BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #if defined(BOOST_ASIO_HAS_IO_URING)
#include <cerrno>
#include <cstddef>
#include <sys/eventfd.h>
#include <boost/asio/detail/io_uring_service.hpp>
#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>
  24. #include <boost/asio/detail/push_options.hpp>
  25. namespace boost {
  26. namespace asio {
  27. namespace detail {
// Construct the service: read locking/preallocation settings from the
// execution context's configuration, create the io_uring instance, and hook
// completion notification into the reactor.
io_uring_service::io_uring_service(boost::asio::execution_context& ctx)
  : execution_context_service_base<io_uring_service>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    // Registration mutex behaviour is configurable (may be disabled or spin).
    mutex_(config(ctx).get("reactor", "registration_locking", true),
        config(ctx).get("reactor", "registration_locking_spin_count", 0)),
    outstanding_work_(0),
    submit_sqes_op_(this),
    pending_sqes_(0),
    pending_submit_sqes_op_(false),
    shutdown_(false),
    // Per-I/O-object locking settings, applied to each io_object's mutex.
    io_locking_(config(ctx).get("reactor", "io_locking", true)),
    io_locking_spin_count_(
        config(ctx).get("reactor", "io_locking_spin_count", 0)),
    timeout_(),
    registration_mutex_(mutex_.enabled()),
    // Pool of io_object records; may be pre-populated for performance.
    registered_io_objects_(execution_context::allocator<void>(ctx),
        config(ctx).get("reactor", "preallocated_io_objects", 0U),
        io_locking_, io_locking_spin_count_),
    reactor_(use_service<reactor>(ctx)),
    reactor_data_(),
    event_fd_(-1) // -1 marks "no eventfd created yet" for the destructor
{
  reactor_.init_task();
  init_ring();
  register_with_reactor();
}
  54. io_uring_service::~io_uring_service()
  55. {
  56. if (ring_.ring_fd != -1)
  57. ::io_uring_queue_exit(&ring_);
  58. if (event_fd_ != -1)
  59. ::close(event_fd_);
  60. }
// Shut down the service: cancel every outstanding operation, drain all
// pending completions from the ring, and hand the abandoned operations to
// the scheduler for destruction.
void io_uring_service::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();
  op_queue<operation> ops;
  // Cancel all outstanding operations. Each io_object is drained of its
  // queued operations and a cancel SQE is issued for any in-flight one.
  while (io_object* io_obj = registered_io_objects_.first())
  {
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty())
      {
        ops.push(io_obj->queues_[i].op_queue_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    io_obj->shutdown_ = true;
    registered_io_objects_.free(io_obj);
  }
  // Cancel the timeout operation. IOSQE_IO_DRAIN ensures it runs after the
  // cancellations queued above.
  if (::io_uring_sqe* sqe = get_sqe())
    ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
  submit_sqes();
  // Wait for all completions to come back. Results are discarded; we only
  // need the work count to reach zero. Bail out if the wait itself fails.
  for (; outstanding_work_ > 0; --outstanding_work_)
  {
    ::io_uring_cqe* cqe = 0;
    if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
      break;
  }
  timer_queues_.get_all_timers(ops);
  scheduler_.abandon_operations(ops);
}
// Handle fork notifications. Before a fork, all in-flight operations are
// cancelled and their completions drained (they will be restarted
// afterwards). The parent restarts its timeout/eventfd machinery; the child
// must create an entirely new io_uring instance, since the ring is not
// usable across a fork.
void io_uring_service::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  switch (fork_ev)
  {
  case boost::asio::execution_context::fork_prepare:
    {
      // Cancel all outstanding operations. They will be restarted
      // after the fork completes.
      mutex::scoped_lock registration_lock(registration_mutex_);
      for (io_object* io_obj = registered_io_objects_.first();
          io_obj != 0; io_obj = io_obj->next_)
      {
        mutex::scoped_lock io_object_lock(io_obj->mutex_);
        for (int i = 0; i < max_ops; ++i)
        {
          // Only cancel queues with an in-flight SQE that has not already
          // had a cancellation requested for it.
          if (!io_obj->queues_[i].op_queue_.empty()
              && !io_obj->queues_[i].cancel_requested_)
          {
            mutex::scoped_lock lock(mutex_);
            if (::io_uring_sqe* sqe = get_sqe())
              ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
          }
        }
      }
      // Cancel the timeout operation.
      {
        mutex::scoped_lock lock(mutex_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
        submit_sqes();
      }
      // Wait for all completions to come back, and post all completed I/O
      // queues to the scheduler. Note that some operations may have already
      // completed, or were explicitly cancelled. All others will be
      // automatically restarted.
      op_queue<operation> ops;
      for (; outstanding_work_ > 0; --outstanding_work_)
      {
        ::io_uring_cqe* cqe = 0;
        if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
          break;
        if (void* ptr = ::io_uring_cqe_get_data(cqe))
        {
          // Skip internal sentinels (interrupt, timer, timeout completions);
          // everything else is a per-object io_queue.
          if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
          {
            io_queue* io_q = static_cast<io_queue*>(ptr);
            io_q->set_result(cqe->res);
            ops.push(io_q);
          }
        }
      }
      scheduler_.post_deferred_completions(ops);
      // Restart the eventfd operation.
      register_with_reactor();
    }
    break;
  case boost::asio::execution_context::fork_parent:
    // Restart the timeout and eventfd operations.
    update_timeout();
    register_with_reactor();
    break;
  case boost::asio::execution_context::fork_child:
    {
      // The child process gets a new io_uring instance.
      ::io_uring_queue_exit(&ring_);
      init_ring();
      register_with_reactor();
    }
    break;
  default:
    break;
  }
}
// Forward task initialisation to the scheduler.
void io_uring_service::init_task()
{
  scheduler_.init_task();
}
  174. void io_uring_service::register_io_object(
  175. io_uring_service::per_io_object_data& io_obj)
  176. {
  177. io_obj = allocate_io_object();
  178. mutex::scoped_lock io_object_lock(io_obj->mutex_);
  179. io_obj->service_ = this;
  180. io_obj->shutdown_ = false;
  181. for (int i = 0; i < max_ops; ++i)
  182. {
  183. io_obj->queues_[i].io_object_ = io_obj;
  184. io_obj->queues_[i].cancel_requested_ = false;
  185. }
  186. }
// Allocate an I/O object and immediately start a persistent internal
// operation (e.g. a standing poll) on one of its queues. Throws with
// ENOBUFS if no submission queue entry can be obtained.
void io_uring_service::register_internal_io_object(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, io_uring_operation* op)
{
  io_obj = allocate_io_object();
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }
  io_obj->queues_[op_type].op_queue_.push(op);
  // Release the object lock before taking the service lock to preserve the
  // established lock ordering.
  io_object_lock.unlock();
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    op->prepare(sqe);
    // Tag the SQE with the queue so the completion can be routed back.
    ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
    post_submit_sqes_op(lock);
  }
  else
  {
    // No SQE available even after an implicit submit: report exhaustion.
    boost::system::error_code ec(ENOBUFS,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "io_uring_get_sqe");
  }
}
  216. void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
  217. {
  218. int result = ::io_uring_register_buffers(&ring_, v, n);
  219. if (result < 0)
  220. {
  221. boost::system::error_code ec(-result,
  222. boost::asio::error::get_system_category());
  223. boost::asio::detail::throw_error(ec, "io_uring_register_buffers");
  224. }
  225. }
// Remove any registered fixed buffers. Failure is deliberately ignored:
// this is best-effort cleanup.
void io_uring_service::unregister_buffers()
{
  (void)::io_uring_unregister_buffers(&ring_);
}
// Start an asynchronous operation of the given type on an I/O object. If the
// queue for that operation type is idle, the operation is either completed
// speculatively (op->perform) or submitted to the ring; otherwise it is
// simply queued behind the in-flight operation.
void io_uring_service::start_op(int op_type,
    io_uring_service::per_io_object_data& io_obj,
    io_uring_operation* op, bool is_continuation)
{
  if (!io_obj)
  {
    // No registered object: fail the operation immediately.
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  if (io_obj->shutdown_)
  {
    // Object is being torn down: complete without touching the ring.
    io_object_lock.unlock();
    post_immediate_completion(op, is_continuation);
    return;
  }
  if (io_obj->queues_[op_type].op_queue_.empty())
  {
    if (op->perform(false))
    {
      // The operation completed synchronously (speculative success).
      io_object_lock.unlock();
      scheduler_.post_immediate_completion(op, is_continuation);
    }
    else
    {
      // Queue the operation, then submit an SQE for it. The object lock is
      // dropped before taking the service lock (lock ordering).
      io_obj->queues_[op_type].op_queue_.push(op);
      io_object_lock.unlock();
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        op->prepare(sqe);
        ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
        scheduler_.work_started();
        post_submit_sqes_op(lock);
      }
      else
      {
        // Ring exhausted: fail the whole queue with ENOBUFS via the
        // io_queue completion path.
        lock.unlock();
        io_obj->queues_[op_type].set_result(-ENOBUFS);
        post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
      }
    }
  }
  else
  {
    // An operation of this type is already in flight; queue behind it.
    io_obj->queues_[op_type].op_queue_.push(op);
    scheduler_.work_started();
  }
}
// Cancel all outstanding operations on the given I/O object. Queued (not yet
// submitted) operations are completed with operation_aborted; in-flight ones
// have cancel SQEs issued by do_cancel_ops.
void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  op_queue<operation> ops;
  do_cancel_ops(io_obj, ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}
// Cancel only those operations on a queue whose cancellation key matches.
// The front operation is special: it may already be in flight, so it cannot
// be aborted directly — instead it is kept queued and an io_uring cancel is
// requested for it. Later matching operations are completed with
// operation_aborted; non-matching ones are retained in order.
void io_uring_service::cancel_ops_by_key(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, void* cancellation_key)
{
  if (!io_obj)
    return;
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  bool first = true;
  op_queue<operation> ops;
  op_queue<io_uring_operation> other_ops;
  while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
  {
    io_obj->queues_[op_type].op_queue_.pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      if (first)
      {
        // Front op may be submitted to the kernel: keep it and ask the
        // ring to cancel it (once only per queue).
        other_ops.push(op);
        if (!io_obj->queues_[op_type].cancel_requested_)
        {
          io_obj->queues_[op_type].cancel_requested_ = true;
          mutex::scoped_lock lock(mutex_);
          if (::io_uring_sqe* sqe = get_sqe())
          {
            ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
            submit_sqes();
          }
        }
      }
      else
      {
        // Not yet submitted: abort immediately.
        op->ec_ = boost::asio::error::operation_aborted;
        ops.push(op);
      }
    }
    else
      other_ops.push(op);
    first = false;
  }
  // Restore the surviving operations in their original order.
  io_obj->queues_[op_type].op_queue_.push(other_ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}
// Deregister an I/O object, cancelling its outstanding operations. On
// return, io_obj is set to 0 when ownership of the object has been handed
// to someone else (the last completing operation, or the destructor during
// shutdown); otherwise it is left set so cleanup_io_object can free it.
void io_uring_service::deregister_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;
  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  if (!io_obj->shutdown_)
  {
    op_queue<operation> ops;
    bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
    io_obj->shutdown_ = true;
    io_object_lock.unlock();
    scheduler_.post_deferred_completions(ops);
    if (pending_cancelled_ops)
    {
      // There are still pending operations. Prevent cleanup_io_object from
      // freeing the I/O object and let the last operation to complete free it.
      io_obj = 0;
    }
    else
    {
      // Leave io_obj set so that it will be freed by the subsequent call to
      // cleanup_io_object.
    }
  }
  else
  {
    // We are shutting down, so prevent cleanup_io_object from freeing
    // the I/O object and let the destructor free it instead.
    io_obj = 0;
  }
}
  365. void io_uring_service::cleanup_io_object(
  366. io_uring_service::per_io_object_data& io_obj)
  367. {
  368. if (io_obj)
  369. {
  370. free_io_object(io_obj);
  371. io_obj = 0;
  372. }
  373. }
// Run the io_uring event loop once: optionally wait up to usec microseconds
// for a completion, then drain and dispatch completions in batches.
// usec == 0 means poll without blocking; usec > 0 installs a local timeout
// SQE (keyed on &ts) so the wait is bounded.
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0; // completions owed for the local timeout SQEs below
  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      // &ts tags the completion as belonging to this invocation of run().
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }
  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);
  if (local_ops > 0)
  {
    // Woken by something other than our local timeout: remove the timeout
    // so it does not fire after ts goes out of scope. The removal itself
    // produces another completion keyed on &ts.
    if (result != 0 || ::io_uring_cqe_get_data(cqe) != &ts)
    {
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        ::io_uring_sqe_set_data(sqe, &ts);
        submit_sqes();
      }
    }
  }
  bool check_timers = false;
  int count = 0;
  // Keep draining until no completions remain (or the batch limit is hit)
  // and all local timeout completions have been accounted for.
  while (result == 0 || local_ops > 0)
  {
    if (result == 0)
    {
      if (void* ptr = ::io_uring_cqe_get_data(cqe))
      {
        if (ptr == this)
        {
          // The io_uring service was interrupted.
        }
        else if (ptr == &timer_queues_)
        {
          // A timeout-remove completed; re-evaluate the timers.
          check_timers = true;
        }
        else if (ptr == &timeout_)
        {
          // The standing timer timeout fired; zero it so it is re-armed.
          check_timers = true;
          timeout_.tv_sec = 0;
          timeout_.tv_nsec = 0;
        }
        else if (ptr == &ts)
        {
          // One of this call's local timeout completions.
          --local_ops;
        }
        else
        {
          // A user I/O completion: record the result and queue the io_queue
          // for dispatch.
          io_queue* io_q = static_cast<io_queue*>(ptr);
          io_q->set_result(cqe->res);
          ops.push(io_q);
        }
      }
      ::io_uring_cqe_seen(&ring_, cqe);
      ++count;
    }
    result = (count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }
  decrement(outstanding_work_, count);
  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    // Re-arm the standing timeout if it was consumed above.
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}
  465. void io_uring_service::interrupt()
  466. {
  467. mutex::scoped_lock lock(mutex_);
  468. if (::io_uring_sqe* sqe = get_sqe())
  469. {
  470. ::io_uring_prep_nop(sqe);
  471. ::io_uring_sqe_set_data(sqe, this);
  472. }
  473. submit_sqes();
  474. }
  475. void io_uring_service::init_ring()
  476. {
  477. int result = ::io_uring_queue_init(ring_size, &ring_, 0);
  478. if (result < 0)
  479. {
  480. ring_.ring_fd = -1;
  481. boost::system::error_code ec(-result,
  482. boost::asio::error::get_system_category());
  483. boost::asio::detail::throw_error(ec, "io_uring_queue_init");
  484. }
  485. #if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  486. event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  487. if (event_fd_ < 0)
  488. {
  489. boost::system::error_code ec(-result,
  490. boost::asio::error::get_system_category());
  491. ::io_uring_queue_exit(&ring_);
  492. boost::asio::detail::throw_error(ec, "eventfd");
  493. }
  494. result = ::io_uring_register_eventfd(&ring_, event_fd_);
  495. if (result < 0)
  496. {
  497. ::close(event_fd_);
  498. ::io_uring_queue_exit(&ring_);
  499. boost::system::error_code ec(-result,
  500. boost::asio::error::get_system_category());
  501. boost::asio::detail::throw_error(ec, "io_uring_queue_init");
  502. }
  503. #endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  504. }
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
// Persistent reactor operation that drains the ring-registered eventfd and
// then processes any available completions. It is re-armed by returning
// not_done, and destroyed only when the reactor finally completes it.
class io_uring_service::event_fd_read_op :
  public reactor_op
{
public:
  event_fd_read_op(io_uring_service* s)
    : reactor_op(boost::system::error_code(),
        &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
      service_(s)
  {
  }

  // Called by the reactor when the eventfd becomes readable.
  static status do_perform(reactor_op* base)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    for (;;)
    {
      // Only perform one read. The kernel maintains an atomic counter.
      uint64_t counter(0);
      errno = 0;
      int bytes_read = ::read(o->service_->event_fd_,
          &counter, sizeof(uint64_t));
      if (bytes_read < 0 && errno == EINTR)
        continue; // retry only on signal interruption
      break;
    }
    // Non-blocking poll of the ring; dispatch whatever has completed.
    op_queue<operation> ops;
    o->service_->run(0, ops);
    o->service_->scheduler_.post_deferred_completions(ops);
    return not_done; // keep the operation registered
  }

  // Invoked when the reactor finally releases the operation.
  static void do_complete(void* /*owner*/, operation* base,
      const boost::system::error_code& /*ec*/,
      std::size_t /*bytes_transferred*/)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    delete o;
  }

private:
  io_uring_service* service_;
};
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
// Register the ring's eventfd with the reactor so that completions wake the
// scheduler. A no-op when io_uring is the default backend (no eventfd).
void io_uring_service::register_with_reactor()
{
#if !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
  reactor_.register_internal_descriptor(reactor::read_op,
      event_fd_, reactor_data_, new event_fd_read_op(this));
#endif // !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
}
// Allocate an io_object from the registered pool, under the registration
// lock. The locking parameters configure the new object's own mutex.
io_uring_service::io_object* io_uring_service::allocate_io_object()
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  return registered_io_objects_.alloc(io_locking_, io_locking_spin_count_);
}
// Return an io_object to the registered pool, under the registration lock.
void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  registered_io_objects_.free(io_obj);
}
// Cancel the operations queued on an I/O object. For each non-empty queue,
// the front operation (potentially in flight with the kernel) is kept and a
// cancel SQE is issued for it; the rest are completed with
// operation_aborted via ops. Returns true if any cancellation was needed.
// Caller must hold io_obj->mutex_.
bool io_uring_service::do_cancel_ops(
    per_io_object_data& io_obj, op_queue<operation>& ops)
{
  bool cancel_op = false;
  for (int i = 0; i < max_ops; ++i)
  {
    if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
    {
      cancel_op = true;
      io_obj->queues_[i].op_queue_.pop();
      // Abort everything behind the front operation.
      while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        io_obj->queues_[i].op_queue_.pop();
        ops.push(op);
      }
      // Re-queue the in-flight front operation; it completes via the ring.
      io_obj->queues_[i].op_queue_.push(first_op);
    }
  }
  if (cancel_op)
  {
    mutex::scoped_lock lock(mutex_);
    for (int i = 0; i < max_ops; ++i)
    {
      // Issue at most one cancel per queue.
      if (!io_obj->queues_[i].op_queue_.empty()
          && !io_obj->queues_[i].cancel_requested_)
      {
        io_obj->queues_[i].cancel_requested_ = true;
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    submit_sqes();
  }
  return cancel_op;
}
// Add a timer queue to the set serviced by this object.
void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}
// Remove a timer queue from the set serviced by this object.
void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}
// Remove the standing timer timeout from the ring. The removal completion
// is tagged with &timer_queues_, which run() interprets as "re-check the
// timers (and re-arm the timeout)".
void io_uring_service::update_timeout()
{
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&timeout_), 0);
    ::io_uring_sqe_set_data(sqe, &timer_queues_);
  }
}
  617. __kernel_timespec io_uring_service::get_timeout() const
  618. {
  619. __kernel_timespec ts;
  620. long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  621. ts.tv_sec = usec / 1000000;
  622. ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  623. return ts;
  624. }
// Obtain a submission queue entry, flushing pending SQEs once if the ring
// is full. Returns 0 if no entry is available even after a submit. On
// success the entry's user data is cleared and the pending count bumped.
// Caller must hold mutex_.
::io_uring_sqe* io_uring_service::get_sqe()
{
  ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
  if (!sqe)
  {
    submit_sqes();
    sqe = ::io_uring_get_sqe(&ring_);
  }
  if (sqe)
  {
    ::io_uring_sqe_set_data(sqe, 0);
    ++pending_sqes_;
  }
  return sqe;
}
// Submit all pending SQEs to the kernel and account for the expected
// completions in outstanding_work_. Caller must hold mutex_.
void io_uring_service::submit_sqes()
{
  if (pending_sqes_ != 0)
  {
    int result = ::io_uring_submit(&ring_);
    if (result > 0)
    {
      // result is the number of SQEs actually consumed by the kernel.
      pending_sqes_ -= result;
      increment(outstanding_work_, result);
    }
  }
}
// Either flush pending SQEs immediately (when a full batch has accumulated)
// or schedule the deferred submit operation to flush them later. The lock
// (on mutex_) is released before posting to the scheduler.
void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
  if (pending_sqes_ >= submit_batch_size)
  {
    submit_sqes();
  }
  else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    lock.unlock();
    scheduler_.post_immediate_completion(&submit_sqes_op_, false);
  }
}
// Queue the deferred submit operation onto an existing op queue (instead of
// posting through the scheduler), compensating the scheduler's work count.
// Caller must hold mutex_.
void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
  if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    ops.push(&submit_sqes_op_);
    scheduler_.compensating_work_started();
  }
}
// Deferred-submit operation: runs on the scheduler to flush pending SQEs.
io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
  : operation(&io_uring_service::submit_sqes_op::do_complete),
    service_(s)
{
}
// Scheduler callback for the deferred submit operation. Flushes pending
// SQEs; if more remain it re-posts itself, otherwise it clears the
// pending-op flag. owner is null when the scheduler is only destroying ops.
void io_uring_service::submit_sqes_op::do_complete(void* owner, operation* base,
    const boost::system::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
    mutex::scoped_lock lock(o->service_->mutex_);
    o->service_->submit_sqes();
    if (o->service_->pending_sqes_ != 0)
      o->service_->scheduler_.post_immediate_completion(o, true);
    else
      o->service_->pending_submit_sqes_op_ = false;
  }
}
// Per-operation-type queue; completes via io_queue::do_complete.
io_uring_service::io_queue::io_queue()
  : operation(&io_uring_service::io_queue::do_complete)
{
}
// RAII helper for io_queue::perform_io: on scope exit it frees a shut-down
// io_object (if this was its last completion), posts any additional
// completed operations, and balances the scheduler's work count when no
// user operation completed.
struct io_uring_service::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
    : service_(s), io_object_to_free_(0), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (io_object_to_free_)
    {
      mutex::scoped_lock lock(service_->mutex_);
      service_->free_io_object(io_object_to_free_);
    }
    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        service_->scheduler_.post_deferred_completions(ops_);
      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      service_->scheduler_.compensating_work_started();
    }
  }

  io_uring_service* service_;
  io_object* io_object_to_free_;   // set when the last op on a shut-down object completes
  op_queue<operation> ops_;        // operations completed during perform_io
  operation* first_op_;            // first completed op, returned for direct invocation
};
// Process a completion for this queue. result is the raw CQE result (>= 0
// bytes transferred, or a negated errno). Applies the result to the front
// operation, runs as many queued operations as complete synchronously,
// re-submits the next one otherwise, and returns the first completed
// operation (or 0). Remaining completions are posted by the cleanup helper.
operation* io_uring_service::io_queue::perform_io(int result)
{
  perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
  mutex::scoped_lock io_object_lock(io_object_->mutex_);
  // A spurious -ECANCELED (no cancel was requested by us) is ignored: the
  // operation will simply be restarted below.
  if (result != -ECANCELED || cancel_requested_)
  {
    // Store the completion result into the front operation.
    if (io_uring_operation* op = op_queue_.front())
    {
      if (result < 0)
      {
        op->ec_.assign(-result, boost::asio::error::get_system_category());
        op->bytes_transferred_ = 0;
      }
      else
      {
        op->ec_.assign(0, op->ec_.category());
        op->bytes_transferred_ = static_cast<std::size_t>(result);
      }
    }
    // Run queued operations while they complete synchronously. Only the
    // first is allowed further speculative work (perform(false) thereafter).
    while (io_uring_operation* op = op_queue_.front())
    {
      if (op->perform(io_cleanup.ops_.empty()))
      {
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
      else
        break;
    }
  }
  cancel_requested_ = false;
  // If an operation remains, submit a new SQE for it.
  if (!op_queue_.empty())
  {
    io_uring_service* service = io_object_->service_;
    mutex::scoped_lock lock(service->mutex_);
    if (::io_uring_sqe* sqe = service->get_sqe())
    {
      op_queue_.front()->prepare(sqe);
      ::io_uring_sqe_set_data(sqe, this);
      service->post_submit_sqes_op(lock);
    }
    else
    {
      // Ring exhausted: fail every remaining operation.
      lock.unlock();
      while (io_uring_operation* op = op_queue_.front())
      {
        op->ec_ = boost::asio::error::no_buffer_space;
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
    }
  }
  // The last operation to complete on a shut down object must free it.
  if (io_object_->shutdown_)
  {
    io_cleanup.io_object_to_free_ = io_object_;
    for (int i = 0; i < max_ops; ++i)
      if (!io_object_->queues_[i].op_queue_.empty())
        io_cleanup.io_object_to_free_ = 0;
  }
  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}
// Scheduler callback for an io_queue completion. bytes_transferred smuggles
// the raw CQE result (set via set_result); the queue processes it and, if a
// user operation completed, invokes that operation directly. owner is null
// when operations are only being destroyed.
void io_uring_service::io_queue::do_complete(void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    io_queue* io_q = static_cast<io_queue*>(base);
    int result = static_cast<int>(bytes_transferred);
    if (operation* op = io_q->perform_io(result))
    {
      op->complete(owner, ec, 0);
    }
  }
}
// Per-descriptor state; mutex behaviour follows the service's io_locking
// configuration.
io_uring_service::io_object::io_object(bool locking, int spin_count)
  : mutex_(locking, spin_count)
{
}
  815. } // namespace detail
  816. } // namespace asio
  817. } // namespace boost
  818. #include <boost/asio/detail/pop_options.hpp>
  819. #endif // defined(BOOST_ASIO_HAS_IO_URING)
  820. #endif // BOOST_ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP