sync_timed_queue.hpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // Copyright (C) 2014 Ian Forbed
  2. // Copyright (C) 2014-2017 Vicente J. Botet Escriba
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
  8. #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
  9. #include <boost/thread/detail/config.hpp>
  10. #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
  11. #include <boost/chrono/duration.hpp>
  12. #include <boost/chrono/time_point.hpp>
  13. #include <boost/chrono/system_clocks.hpp>
  14. #include <boost/chrono/chrono_io.hpp>
  15. #include <boost/config/abi_prefix.hpp>
  16. namespace boost
  17. {
  18. namespace concurrent
  19. {
  20. namespace detail
  21. {
  22. // fixme: shouldn't the timepoint be configurable
  23. template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
  24. struct scheduled_type
  25. {
  26. typedef T value_type;
  27. typedef Clock clock;
  28. typedef TimePoint time_point;
  29. T data;
  30. time_point time;
  31. BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
  32. scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
  33. scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
  34. scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
  35. scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
  36. data = other.data;
  37. time = other.time;
  38. return *this;
  39. }
  40. scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
  41. scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
  42. data = boost::move(other.data);
  43. time = other.time;
  44. return *this;
  45. }
  46. bool operator <(const scheduled_type & other) const
  47. {
  48. return this->time > other.time;
  49. }
  50. }; //end struct
  51. } //end detail namespace
  52. template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
  53. class sync_timed_queue
  54. : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
  55. {
  56. typedef detail::scheduled_type<T, Clock, TimePoint> stype;
  57. typedef sync_priority_queue<stype> super;
  58. public:
  59. typedef T value_type;
  60. typedef Clock clock;
  61. typedef typename clock::duration duration;
  62. typedef typename clock::time_point time_point;
  63. typedef typename super::underlying_queue_type underlying_queue_type;
  64. typedef typename super::size_type size_type;
  65. typedef typename super::op_status op_status;
  66. sync_timed_queue() : super() {};
  67. ~sync_timed_queue() {}
  68. using super::size;
  69. using super::empty;
  70. using super::full;
  71. using super::close;
  72. using super::closed;
  73. T pull();
  74. void pull(T& elem);
  75. template <class WClock, class Duration>
  76. queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
  77. template <class Rep, class Period>
  78. queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
  79. queue_op_status try_pull(T& elem);
  80. queue_op_status wait_pull(T& elem);
  81. queue_op_status nonblocking_pull(T& elem);
  82. template <class Duration>
  83. void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
  84. template <class Rep, class Period>
  85. void push(const T& elem, chrono::duration<Rep,Period> const& dura);
  86. template <class Duration>
  87. void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
  88. template <class Rep, class Period>
  89. void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
  90. template <class Duration>
  91. queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
  92. template <class Rep, class Period>
  93. queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
  94. template <class Duration>
  95. queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
  96. template <class Rep, class Period>
  97. queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
  98. private:
  99. inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
  100. inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
  101. bool wait_to_pull(unique_lock<mutex>&);
  102. template <class WClock, class Duration>
  103. queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
  104. T pull(unique_lock<mutex>&);
  105. T pull(lock_guard<mutex>&);
  106. void pull(unique_lock<mutex>&, T& elem);
  107. void pull(lock_guard<mutex>&, T& elem);
  108. queue_op_status try_pull(unique_lock<mutex>&, T& elem);
  109. queue_op_status try_pull(lock_guard<mutex>&, T& elem);
  110. queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
  111. sync_timed_queue(const sync_timed_queue&);
  112. sync_timed_queue& operator=(const sync_timed_queue&);
  113. sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
  114. sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
  115. }; //end class
  116. template <class T, class Clock, class TimePoint>
  117. template <class Duration>
  118. void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
  119. {
  120. super::push(stype(elem,tp));
  121. }
  122. template <class T, class Clock, class TimePoint>
  123. template <class Rep, class Period>
  124. void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
  125. {
  126. push(elem, clock::now() + dura);
  127. }
  128. template <class T, class Clock, class TimePoint>
  129. template <class Duration>
  130. void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
  131. {
  132. super::push(stype(boost::move(elem),tp));
  133. }
  134. template <class T, class Clock, class TimePoint>
  135. template <class Rep, class Period>
  136. void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
  137. {
  138. push(boost::move(elem), clock::now() + dura);
  139. }
  140. template <class T, class Clock, class TimePoint>
  141. template <class Duration>
  142. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
  143. {
  144. return super::try_push(stype(elem,tp));
  145. }
  146. template <class T, class Clock, class TimePoint>
  147. template <class Rep, class Period>
  148. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
  149. {
  150. return try_push(elem,clock::now() + dura);
  151. }
  152. template <class T, class Clock, class TimePoint>
  153. template <class Duration>
  154. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
  155. {
  156. return super::try_push(stype(boost::move(elem), tp));
  157. }
  158. template <class T, class Clock, class TimePoint>
  159. template <class Rep, class Period>
  160. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
  161. {
  162. return try_push(boost::move(elem), clock::now() + dura);
  163. }
  164. ///////////////////////////
  165. template <class T, class Clock, class TimePoint>
  166. bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
  167. {
  168. return ! super::empty(lk) && clock::now() >= super::data_.top().time;
  169. }
  170. template <class T, class Clock, class TimePoint>
  171. bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
  172. {
  173. return ! super::empty(lk) && clock::now() >= super::data_.top().time;
  174. }
  175. ///////////////////////////
  176. template <class T, class Clock, class TimePoint>
  177. bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
  178. {
  179. for (;;)
  180. {
  181. if (not_empty_and_time_reached(lk)) return false; // success
  182. if (super::closed(lk)) return true; // closed
  183. super::wait_until_not_empty_or_closed(lk);
  184. if (not_empty_and_time_reached(lk)) return false; // success
  185. if (super::closed(lk)) return true; // closed
  186. const time_point tp(super::data_.top().time);
  187. super::wait_until_closed_until(lk, tp);
  188. }
  189. }
  190. template <class T, class Clock, class TimePoint>
  191. template <class WClock, class Duration>
  192. queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
  193. {
  194. for (;;)
  195. {
  196. if (not_empty_and_time_reached(lk)) return queue_op_status::success;
  197. if (super::closed(lk)) return queue_op_status::closed;
  198. if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
  199. super::wait_until_not_empty_or_closed_until(lk, tp);
  200. if (not_empty_and_time_reached(lk)) return queue_op_status::success;
  201. if (super::closed(lk)) return queue_op_status::closed;
  202. if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
  203. const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
  204. super::wait_until_closed_until(lk, tpmin);
  205. }
  206. }
  207. ///////////////////////////
  208. template <class T, class Clock, class TimePoint>
  209. T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
  210. {
  211. #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
  212. return boost::move(super::data_.pull().data);
  213. #else
  214. return super::data_.pull().data;
  215. #endif
  216. }
  217. template <class T, class Clock, class TimePoint>
  218. T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
  219. {
  220. #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
  221. return boost::move(super::data_.pull().data);
  222. #else
  223. return super::data_.pull().data;
  224. #endif
  225. }
  226. template <class T, class Clock, class TimePoint>
  227. T sync_timed_queue<T, Clock, TimePoint>::pull()
  228. {
  229. unique_lock<mutex> lk(super::mtx_);
  230. const bool has_been_closed = wait_to_pull(lk);
  231. if (has_been_closed) super::throw_if_closed(lk);
  232. return pull(lk);
  233. }
  234. ///////////////////////////
  235. template <class T, class Clock, class TimePoint>
  236. void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
  237. {
  238. #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
  239. elem = boost::move(super::data_.pull().data);
  240. #else
  241. elem = super::data_.pull().data;
  242. #endif
  243. }
  244. template <class T, class Clock, class TimePoint>
  245. void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
  246. {
  247. #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
  248. elem = boost::move(super::data_.pull().data);
  249. #else
  250. elem = super::data_.pull().data;
  251. #endif
  252. }
  253. template <class T, class Clock, class TimePoint>
  254. void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
  255. {
  256. unique_lock<mutex> lk(super::mtx_);
  257. const bool has_been_closed = wait_to_pull(lk);
  258. if (has_been_closed) super::throw_if_closed(lk);
  259. pull(lk, elem);
  260. }
  261. //////////////////////
  262. template <class T, class Clock, class TimePoint>
  263. template <class WClock, class Duration>
  264. queue_op_status
  265. sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
  266. {
  267. unique_lock<mutex> lk(super::mtx_);
  268. const queue_op_status rc = wait_to_pull_until(lk, tp);
  269. if (rc == queue_op_status::success) pull(lk, elem);
  270. return rc;
  271. }
  272. //////////////////////
  273. template <class T, class Clock, class TimePoint>
  274. template <class Rep, class Period>
  275. queue_op_status
  276. sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
  277. {
  278. return pull_until(chrono::steady_clock::now() + dura, elem);
  279. }
  280. ///////////////////////////
  281. template <class T, class Clock, class TimePoint>
  282. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
  283. {
  284. if (not_empty_and_time_reached(lk))
  285. {
  286. pull(lk, elem);
  287. return queue_op_status::success;
  288. }
  289. if (super::closed(lk)) return queue_op_status::closed;
  290. if (super::empty(lk)) return queue_op_status::empty;
  291. return queue_op_status::not_ready;
  292. }
  293. template <class T, class Clock, class TimePoint>
  294. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
  295. {
  296. if (not_empty_and_time_reached(lk))
  297. {
  298. pull(lk, elem);
  299. return queue_op_status::success;
  300. }
  301. if (super::closed(lk)) return queue_op_status::closed;
  302. if (super::empty(lk)) return queue_op_status::empty;
  303. return queue_op_status::not_ready;
  304. }
  305. template <class T, class Clock, class TimePoint>
  306. queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
  307. {
  308. lock_guard<mutex> lk(super::mtx_);
  309. return try_pull(lk, elem);
  310. }
  311. ///////////////////////////
  312. template <class T, class Clock, class TimePoint>
  313. queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
  314. {
  315. const bool has_been_closed = wait_to_pull(lk);
  316. if (has_been_closed) return queue_op_status::closed;
  317. pull(lk, elem);
  318. return queue_op_status::success;
  319. }
  320. template <class T, class Clock, class TimePoint>
  321. queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
  322. {
  323. unique_lock<mutex> lk(super::mtx_);
  324. return wait_pull(lk, elem);
  325. }
  326. ///////////////////////////
  327. template <class T, class Clock, class TimePoint>
  328. queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
  329. {
  330. unique_lock<mutex> lk(super::mtx_, try_to_lock);
  331. if (! lk.owns_lock()) return queue_op_status::busy;
  332. return try_pull(lk, elem);
  333. }
  334. } //end concurrent namespace
  335. using concurrent::sync_timed_queue;
  336. } //end boost namespace
  337. #include <boost/config/abi_suffix.hpp>
  338. #endif