basic_thread_pool.hpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. // Copyright (C) 2013-2014 Vicente J. Botet Escriba
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. //
  6. // 2013/09 Vicente J. Botet Escriba
  7. // Adapt to boost from CCIA C++11 implementation
  8. // first implementation of a simple pool thread using a vector of threads and a sync_queue.
  9. #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
  10. #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
  11. #include <boost/thread/detail/config.hpp>
  12. #include <boost/thread/detail/delete.hpp>
  13. #include <boost/thread/detail/move.hpp>
  14. #include <boost/thread/thread.hpp>
  15. #include <boost/thread/concurrent_queues/sync_queue.hpp>
  16. #include <boost/thread/executors/work.hpp>
  17. #include <boost/thread/csbl/vector.hpp>
  18. #include <boost/config/abi_prefix.hpp>
  19. namespace boost
  20. {
  21. namespace executors
  22. {
  23. class basic_thread_pool
  24. {
  25. public:
  26. /// type-erasure to store the works to do
  27. typedef executors::work work;
  28. private:
  29. typedef thread thread_t;
  30. /// A move aware vector type
  31. typedef csbl::vector<thread_t> thread_vector;
  32. /// A move aware vector
  33. thread_vector threads;
  34. /// the thread safe work queue
  35. concurrent::sync_queue<work > work_queue;
  36. public:
  37. /**
  38. * Effects: try to execute one task.
  39. * Returns: whether a task has been executed.
  40. * Throws: whatever the current task constructor throws or the task() throws.
  41. */
  42. bool try_executing_one()
  43. {
  44. try
  45. {
  46. work task;
  47. if (work_queue.try_pull(task) == queue_op_status::success)
  48. {
  49. task();
  50. return true;
  51. }
  52. return false;
  53. }
  54. catch (...)
  55. {
  56. std::terminate();
  57. //return false;
  58. }
  59. }
  60. /**
  61. * Effects: schedule one task or yields
  62. * Throws: whatever the current task constructor throws or the task() throws.
  63. */
  64. void schedule_one_or_yield()
  65. {
  66. if ( ! try_executing_one())
  67. {
  68. this_thread::yield();
  69. }
  70. }
  71. private:
  72. /**
  73. * The main loop of the worker threads
  74. */
  75. void worker_thread()
  76. {
  77. try
  78. {
  79. for(;;)
  80. {
  81. work task;
  82. try
  83. {
  84. queue_op_status st = work_queue.wait_pull(task);
  85. if (st == queue_op_status::closed) {
  86. return;
  87. }
  88. task();
  89. }
  90. catch (boost::thread_interrupted&)
  91. {
  92. return;
  93. }
  94. }
  95. }
  96. catch (...)
  97. {
  98. std::terminate();
  99. return;
  100. }
  101. }
  102. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  103. template <class AtThreadEntry>
  104. void worker_thread1(AtThreadEntry& at_thread_entry)
  105. {
  106. at_thread_entry(*this);
  107. worker_thread();
  108. }
  109. #endif
  110. void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
  111. {
  112. at_thread_entry(*this);
  113. worker_thread();
  114. }
  115. template <class AtThreadEntry>
  116. void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
  117. {
  118. at_thread_entry(*this);
  119. worker_thread();
  120. }
  121. static void do_nothing_at_thread_entry(basic_thread_pool&) {}
  122. public:
  123. /// basic_thread_pool is not copyable.
  124. BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
  125. /**
  126. * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
  127. *
  128. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  129. */
  130. basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
  131. {
  132. try
  133. {
  134. threads.reserve(thread_count);
  135. for (unsigned i = 0; i < thread_count; ++i)
  136. {
  137. #if 1
  138. thread th (&basic_thread_pool::worker_thread, this);
  139. threads.push_back(thread_t(boost::move(th)));
  140. #else
  141. threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  142. #endif
  143. }
  144. }
  145. catch (...)
  146. {
  147. close();
  148. throw;
  149. }
  150. }
  151. /**
  152. * \b Effects: creates a thread pool that runs closures on \c thread_count threads
  153. * and executes the at_thread_entry function at the entry of each created thread. .
  154. *
  155. * \b Throws: Whatever exception is thrown while initializing the needed resources.
  156. */
  157. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  158. template <class AtThreadEntry>
  159. basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
  160. {
  161. try
  162. {
  163. threads.reserve(thread_count);
  164. for (unsigned i = 0; i < thread_count; ++i)
  165. {
  166. thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
  167. threads.push_back(thread_t(boost::move(th)));
  168. //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  169. }
  170. }
  171. catch (...)
  172. {
  173. close();
  174. throw;
  175. }
  176. }
  177. #endif
  178. basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
  179. {
  180. try
  181. {
  182. threads.reserve(thread_count);
  183. for (unsigned i = 0; i < thread_count; ++i)
  184. {
  185. thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
  186. threads.push_back(thread_t(boost::move(th)));
  187. //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  188. }
  189. }
  190. catch (...)
  191. {
  192. close();
  193. throw;
  194. }
  195. }
  196. template <class AtThreadEntry>
  197. basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
  198. {
  199. try
  200. {
  201. threads.reserve(thread_count);
  202. for (unsigned i = 0; i < thread_count; ++i)
  203. {
  204. thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
  205. threads.push_back(thread_t(boost::move(th)));
  206. //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
  207. }
  208. }
  209. catch (...)
  210. {
  211. close();
  212. throw;
  213. }
  214. }
  215. /**
  216. * \b Effects: Destroys the thread pool.
  217. *
  218. * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
  219. */
  220. ~basic_thread_pool()
  221. {
  222. // signal to all the worker threads that there will be no more submissions.
  223. close();
  224. // joins all the threads before destroying the thread pool resources (e.g. the queue).
  225. join();
  226. }
  227. /**
  228. * \b Effects: join all the threads.
  229. */
  230. void join()
  231. {
  232. for (unsigned i = 0; i < threads.size(); ++i)
  233. {
  234. threads[i].interrupt();
  235. threads[i].join();
  236. }
  237. }
  238. /**
  239. * \b Effects: close the \c basic_thread_pool for submissions.
  240. * The worker threads will work until there is no more closures to run.
  241. */
  242. void close()
  243. {
  244. work_queue.close();
  245. }
  246. /**
  247. * \b Returns: whether the pool is closed for submissions.
  248. */
  249. bool closed()
  250. {
  251. return work_queue.closed();
  252. }
  253. /**
  254. * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
  255. *
  256. * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
  257. * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
  258. *
  259. * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
  260. *
  261. * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
  262. * Whatever exception that can be throw while storing the closure.
  263. */
  264. void submit(BOOST_THREAD_RV_REF(work) closure) {
  265. work_queue.push(boost::move(closure));
  266. }
  267. #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
  268. template <typename Closure>
  269. void submit(Closure & closure)
  270. {
  271. submit(work(closure));
  272. }
  273. #endif
  274. void submit(void (*closure)())
  275. {
  276. submit(work(closure));
  277. }
  278. template <typename Closure>
  279. void submit(BOOST_THREAD_FWD_REF(Closure) closure)
  280. {
  281. //submit(work(boost::forward<Closure>(closure)));
  282. work w((boost::forward<Closure>(closure)));
  283. submit(boost::move(w));
  284. }
  285. /**
  286. * \b Requires: This must be called from an scheduled task.
  287. *
  288. * \b Effects: reschedule functions until pred()
  289. */
  290. template <typename Pred>
  291. bool reschedule_until(Pred const& pred)
  292. {
  293. do {
  294. if ( ! try_executing_one())
  295. {
  296. return false;
  297. }
  298. } while (! pred());
  299. return true;
  300. }
  301. };
  302. }
  303. using executors::basic_thread_pool;
  304. }
  305. #include <boost/config/abi_suffix.hpp>
  306. #endif