loop_executor.hpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Copyright (C) 2013,2014 Vicente J. Botet Escriba
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. // 2013/11 Vicente J. Botet Escriba
  7. // first implementation of a simple user scheduler.
  8. // 2013/11 Vicente J. Botet Escriba
  9. // rename loop_executor.
  10. #ifndef BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP
  11. #define BOOST_THREAD_EXECUTORS_LOOP_EXECUTOR_HPP
  12. #include <boost/thread/detail/config.hpp>
  13. #include <boost/thread/detail/delete.hpp>
  14. #include <boost/thread/detail/move.hpp>
  15. #include <boost/thread/concurrent_queues/sync_queue.hpp>
  16. #include <boost/thread/executors/work.hpp>
  17. #include <boost/assert.hpp>
  18. #include <boost/config/abi_prefix.hpp>
  19. namespace boost
  20. {
  21. namespace executors
  22. {
  23. class loop_executor
  24. {
  25. public:
  26. /// type-erasure to store the works to do
  27. typedef executors::work work;
  28. private:
  29. /// the thread safe work queue
  30. concurrent::sync_queue<work > work_queue;
  31. public:
  32. /**
  33. * Effects: try to execute one task.
  34. * Returns: whether a task has been executed.
  35. * Throws: whatever the current task constructor throws or the task() throws.
  36. */
  37. bool try_executing_one()
  38. {
  39. return execute_one(/*wait:*/false);
  40. }
  41. private:
  42. /**
  43. * Effects: Execute one task.
  44. * Remark: If wait is true, waits until a task is available or the executor
  45. * is closed. If wait is false, returns false immediately if no
  46. * task is available.
  47. * Returns: whether a task has been executed (if wait is true, only returns false if closed).
  48. * Throws: whatever the current task constructor throws or the task() throws.
  49. */
  50. bool execute_one(bool wait)
  51. {
  52. work task;
  53. try
  54. {
  55. queue_op_status status = wait ?
  56. work_queue.wait_pull(task) :
  57. work_queue.try_pull(task);
  58. if (status == queue_op_status::success)
  59. {
  60. task();
  61. return true;
  62. }
  63. BOOST_ASSERT(!wait || status == queue_op_status::closed);
  64. return false;
  65. }
  66. catch (...)
  67. {
  68. std::terminate();
  69. //return false;
  70. }
  71. }
  72. public:
  73. /// loop_executor is not copyable.
  74. BOOST_THREAD_NO_COPYABLE(loop_executor)
  75. /**
  76. * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
  77. *
  78. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  79. */
  80. loop_executor()
  81. {
  82. }
  83. /**
  84. * \b Effects: Destroys the thread pool.
  85. *
  86. * \b Synchronization: The completion of all the closures happen before the completion of the \c loop_executor destructor.
  87. */
  88. ~loop_executor()
  89. {
  90. // signal to all the worker thread that there will be no more submissions.
  91. close();
  92. }
  93. /**
  94. * The main loop of the worker thread
  95. */
  96. void loop()
  97. {
  98. while (execute_one(/*wait:*/true))
  99. {
  100. }
  101. BOOST_ASSERT(closed());
  102. while (try_executing_one())
  103. {
  104. }
  105. }
  106. /**
  107. * \b Effects: close the \c loop_executor for submissions.
  108. * The loop will work until there is no more closures to run.
  109. */
  110. void close()
  111. {
  112. work_queue.close();
  113. }
  114. /**
  115. * \b Returns: whether the pool is closed for submissions.
  116. */
  117. bool closed()
  118. {
  119. return work_queue.closed();
  120. }
  121. /**
  122. * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
  123. *
  124. * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
  125. * If invoked closure throws an exception the \c loop_executor will call \c std::terminate, as is the case with threads.
  126. *
  127. * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
  128. *
  129. * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
  130. * Whatever exception that can be throw while storing the closure.
  131. */
  132. void submit(BOOST_THREAD_RV_REF(work) closure) {
  133. work_queue.push(boost::move(closure));
  134. }
  135. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  136. template <typename Closure>
  137. void submit(Closure & closure)
  138. {
  139. submit(work(closure));
  140. }
  141. #endif
  142. void submit(void (*closure)())
  143. {
  144. submit(work(closure));
  145. }
  146. template <typename Closure>
  147. void submit(BOOST_THREAD_FWD_REF(Closure) closure)
  148. {
  149. //work_queue.push(work(boost::forward<Closure>(closure)));
  150. work w((boost::forward<Closure>(closure)));
  151. submit(boost::move(w));
  152. }
  153. /**
  154. * \b Requires: This must be called from an scheduled task.
  155. *
  156. * \b Effects: reschedule functions until pred()
  157. */
  158. template <typename Pred>
  159. bool reschedule_until(Pred const& pred)
  160. {
  161. do {
  162. if ( ! try_executing_one())
  163. {
  164. return false;
  165. }
  166. } while (! pred());
  167. return true;
  168. }
  169. /**
  170. * run queued closures
  171. */
  172. void run_queued_closures()
  173. {
  174. sync_queue<work>::underlying_queue_type q = work_queue.underlying_queue();
  175. while (! q.empty())
  176. {
  177. work& task = q.front();
  178. task();
  179. q.pop_front();
  180. }
  181. }
  182. };
  183. }
  184. using executors::loop_executor;
  185. }
  186. #include <boost/config/abi_suffix.hpp>
  187. #endif