async_frontend.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2015.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file async_frontend.hpp
  9. * \author Andrey Semashev
  10. * \date 14.07.2009
  11. *
  12. * The header contains implementation of asynchronous sink frontend.
  13. */
  14. #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
  15. #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
  16. #include <thread>
  17. #include <mutex>
  18. #include <condition_variable>
  19. #include <exception> // std::terminate
  20. #include <boost/log/detail/config.hpp>
  21. #ifdef BOOST_HAS_PRAGMA_ONCE
  22. #pragma once
  23. #endif
  24. #if defined(BOOST_LOG_NO_THREADS)
  25. #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
  26. #endif
  27. #include <boost/memory_order.hpp>
  28. #include <boost/atomic/atomic.hpp>
  29. #include <boost/smart_ptr/shared_ptr.hpp>
  30. #include <boost/smart_ptr/make_shared_object.hpp>
  31. #include <boost/preprocessor/control/if.hpp>
  32. #include <boost/preprocessor/comparison/equal.hpp>
  33. #include <boost/log/exceptions.hpp>
  34. #include <boost/log/detail/locking_ptr.hpp>
  35. #include <boost/log/detail/parameter_tools.hpp>
  36. #include <boost/log/core/record_view.hpp>
  37. #include <boost/log/sinks/basic_sink_frontend.hpp>
  38. #include <boost/log/sinks/frontend_requirements.hpp>
  39. #include <boost/log/sinks/unbounded_fifo_queue.hpp>
  40. #include <boost/log/keywords/start_thread.hpp>
  41. #include <boost/log/detail/header.hpp>
  42. namespace boost {
  43. BOOST_LOG_OPEN_NAMESPACE
  44. namespace sinks {
  45. #ifndef BOOST_LOG_DOXYGEN_PASS
  46. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
  47. template< typename T0 >\
  48. explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
  49. base_type(true),\
  50. queue_base_type(arg0),\
  51. m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
  52. m_ActiveOperation(idle),\
  53. m_StopRequested(false),\
  54. m_FlushRequested(false)\
  55. {\
  56. if (arg0[keywords::start_thread | true])\
  57. start_feeding_thread();\
  58. }\
  59. template< typename T0 >\
  60. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
  61. base_type(true),\
  62. queue_base_type(arg0),\
  63. m_pBackend(backend),\
  64. m_ActiveOperation(idle),\
  65. m_StopRequested(false),\
  66. m_FlushRequested(false)\
  67. {\
  68. if (arg0[keywords::start_thread | true])\
  69. start_feeding_thread();\
  70. }
  71. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
  72. template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
  73. explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
  74. base_type(true),\
  75. queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  76. m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  77. m_ActiveOperation(idle),\
  78. m_StopRequested(false),\
  79. m_FlushRequested(false)\
  80. {\
  81. if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
  82. start_feeding_thread();\
  83. }\
  84. template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
  85. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
  86. base_type(true),\
  87. queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  88. m_pBackend(backend),\
  89. m_ActiveOperation(idle),\
  90. m_StopRequested(false),\
  91. m_FlushRequested(false)\
  92. {\
  93. if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
  94. start_feeding_thread();\
  95. }
  96. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
  97. BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
  98. #endif // BOOST_LOG_DOXYGEN_PASS
  99. /*!
  100. * \brief Asynchronous logging sink frontend
  101. *
  102. * The frontend starts a separate thread on construction. All logging records are passed
  103. * to the backend in this dedicated thread.
  104. *
  105. * The user can prevent spawning the internal thread by specifying \c start_thread parameter
  106. * with the value of \c false on construction. In this case log records will be buffered
  107. * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
  108. * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
  109. * parameter.
  110. */
  111. template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
  112. class asynchronous_sink :
  113. public aux::make_sink_frontend_base< SinkBackendT >::type,
  114. public QueueingStrategyT
  115. {
  116. typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
  117. typedef QueueingStrategyT queue_base_type;
  118. private:
  119. //! Backend synchronization mutex type
  120. typedef std::recursive_mutex backend_mutex_type;
  121. //! Frontend synchronization mutex type
  122. typedef typename base_type::mutex_type frontend_mutex_type;
  123. //! Operation bit mask
  124. enum operation
  125. {
  126. idle = 0u,
  127. feeding_records = 1u,
  128. flushing = 3u
  129. };
  130. //! Function object to run the log record feeding thread
  131. class run_func
  132. {
  133. public:
  134. typedef void result_type;
  135. private:
  136. asynchronous_sink* m_self;
  137. public:
  138. explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
  139. {
  140. }
  141. result_type operator()() const
  142. {
  143. m_self->run();
  144. }
  145. };
  146. //! A scope guard that implements active operation management
  147. class scoped_feeding_operation
  148. {
  149. private:
  150. asynchronous_sink& m_self;
  151. public:
  152. //! Initializing constructor
  153. explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
  154. {
  155. }
  156. //! Destructor
  157. ~scoped_feeding_operation()
  158. {
  159. m_self.complete_feeding_operation();
  160. }
  161. BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&))
  162. BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&))
  163. };
  164. //! A scope guard that resets a flag on destructor
  165. class scoped_flag
  166. {
  167. private:
  168. frontend_mutex_type& m_Mutex;
  169. std::condition_variable_any& m_Cond;
  170. boost::atomic< bool >& m_Flag;
  171. public:
  172. explicit scoped_flag(frontend_mutex_type& mut, std::condition_variable_any& cond, boost::atomic< bool >& f) :
  173. m_Mutex(mut), m_Cond(cond), m_Flag(f)
  174. {
  175. }
  176. ~scoped_flag()
  177. {
  178. try
  179. {
  180. std::lock_guard< frontend_mutex_type > lock(m_Mutex);
  181. m_Flag.store(false, boost::memory_order_relaxed);
  182. m_Cond.notify_all();
  183. }
  184. catch (...)
  185. {
  186. }
  187. }
  188. BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
  189. BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
  190. };
  191. public:
  192. //! Sink implementation type
  193. typedef SinkBackendT sink_backend_type;
  194. //! \cond
  195. static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
  196. //! \endcond
  197. #ifndef BOOST_LOG_DOXYGEN_PASS
  198. //! A pointer type that locks the backend until it's destroyed
  199. typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
  200. #else // BOOST_LOG_DOXYGEN_PASS
  201. //! A pointer type that locks the backend until it's destroyed
  202. typedef implementation_defined locked_backend_ptr;
  203. #endif // BOOST_LOG_DOXYGEN_PASS
  204. private:
  205. //! Synchronization mutex
  206. backend_mutex_type m_BackendMutex;
  207. //! Pointer to the backend
  208. const shared_ptr< sink_backend_type > m_pBackend;
  209. //! Dedicated record feeding thread
  210. std::thread m_DedicatedFeedingThread;
  211. //! Condition variable to implement blocking operations
  212. std::condition_variable_any m_BlockCond;
  213. //! Currently active operation
  214. operation m_ActiveOperation;
  215. //! The flag indicates that the feeding loop has to be stopped
  216. boost::atomic< bool > m_StopRequested;
  217. //! The flag indicates that queue flush has been requested
  218. boost::atomic< bool > m_FlushRequested;
  219. public:
  220. /*!
  221. * Default constructor. Constructs the sink backend instance.
  222. * Requires the backend to be default-constructible.
  223. *
  224. * \param start_thread If \c true, the frontend creates a thread to feed
  225. * log records to the backend. Otherwise no thread is
  226. * started and it is assumed that the user will call
  227. * \c run, \c feed_records or \c flush himself.
  228. */
  229. explicit asynchronous_sink(bool start_thread = true) :
  230. base_type(true),
  231. m_pBackend(boost::make_shared< sink_backend_type >()),
  232. m_ActiveOperation(idle),
  233. m_StopRequested(false),
  234. m_FlushRequested(false)
  235. {
  236. if (start_thread)
  237. start_feeding_thread();
  238. }
  239. /*!
  240. * Constructor attaches user-constructed backend instance
  241. *
  242. * \param backend Pointer to the backend instance.
  243. * \param start_thread If \c true, the frontend creates a thread to feed
  244. * log records to the backend. Otherwise no thread is
  245. * started and it is assumed that the user will call
  246. * \c run, \c feed_records or \c flush himself.
  247. *
  248. * \pre \a backend is not \c NULL.
  249. */
  250. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
  251. base_type(true),
  252. m_pBackend(backend),
  253. m_ActiveOperation(idle),
  254. m_StopRequested(false),
  255. m_FlushRequested(false)
  256. {
  257. if (start_thread)
  258. start_feeding_thread();
  259. }
  260. /*!
  261. * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor.
  262. * Refer to the backend documentation for the list of supported parameters.
  263. *
  264. * The frontend uses the following named parameters:
  265. *
  266. * \li start_thread - If \c true, the frontend creates a thread to feed
  267. * log records to the backend. Otherwise no thread is
  268. * started and it is assumed that the user will call
  269. * \c run, \c feed_records or \c flush himself.
  270. */
  271. #ifndef BOOST_LOG_DOXYGEN_PASS
  272. BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
  273. #else
  274. template< typename... Args >
  275. explicit asynchronous_sink(Args&&... args);
  276. #endif
  277. /*!
  278. * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
  279. */
  280. ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
  281. {
  282. stop();
  283. }
  284. /*!
  285. * Locking accessor to the attached backend
  286. */
  287. locked_backend_ptr locked_backend()
  288. {
  289. return locked_backend_ptr(m_pBackend, m_BackendMutex);
  290. }
  291. /*!
  292. * Enqueues the log record to the backend
  293. */
  294. void consume(record_view const& rec) BOOST_OVERRIDE
  295. {
  296. if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
  297. {
  298. std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  299. // Wait until flush is done
  300. while (m_FlushRequested.load(boost::memory_order_acquire))
  301. m_BlockCond.wait(lock);
  302. }
  303. queue_base_type::enqueue(rec);
  304. }
  305. /*!
  306. * The method attempts to pass logging record to the backend
  307. */
  308. bool try_consume(record_view const& rec) BOOST_OVERRIDE
  309. {
  310. if (!m_FlushRequested.load(boost::memory_order_acquire))
  311. return queue_base_type::try_enqueue(rec);
  312. else
  313. return false;
  314. }
  315. /*!
  316. * The method starts record feeding loop and effectively blocks until either of this happens:
  317. *
  318. * \li the thread is interrupted due to a call to \c stop
  319. * \li an exception is thrown while processing a log record in the backend, and the exception is
  320. * not terminated by the exception handler, if one is installed
  321. *
  322. * \pre The sink frontend must be constructed without spawning a dedicated thread
  323. */
  324. void run()
  325. {
  326. // First check that no other thread is running
  327. {
  328. std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  329. if (start_feeding_operation(lock, feeding_records))
  330. return;
  331. }
  332. scoped_feeding_operation guard(*this);
  333. // Now start the feeding loop
  334. while (true)
  335. {
  336. do_feed_records();
  337. if (!m_StopRequested.load(boost::memory_order_acquire))
  338. {
  339. // Block until new record is available
  340. record_view rec;
  341. if (queue_base_type::dequeue_ready(rec))
  342. base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
  343. }
  344. else
  345. break;
  346. }
  347. }
  348. /*!
  349. * The method softly interrupts record feeding loop. This method must be called when \c run,
  350. * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
  351. * interruption, calling \c stop will not interrupt the record processing in the middle.
  352. * Instead, the sink frontend will attempt to finish its business with the record in progress
  353. * and return afterwards. This method can be called either if the sink was created with
  354. * an internal dedicated thread, or if the feeding loop was initiated by user.
  355. *
  356. * If no record feeding operation is in progress, calling \c stop marks the sink frontend
  357. * so that the next feeding operation stops immediately.
  358. *
  359. * \note Returning from this method does not guarantee that there are no records left buffered
  360. * in the sink frontend. It is possible that log records keep coming during and after this
  361. * method is called. At some point of execution of this method log records stop being processed,
  362. * and all records that come after this point are put into the queue. These records will be
  363. * processed upon further calls to \c run or \c feed_records.
  364. *
  365. * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
  366. * as \c false on frontend construction), this method does not guarantee that upon return the thread
  367. * has returned from the record feeding loop or that it won't enter it in the future. The method
  368. * only ensures that the record feeding thread will eventually return from the feeding loop. It is
  369. * user's responsibility to synchronize with the user's record feeding thread.
  370. */
  371. void stop()
  372. {
  373. std::thread feeding_thread;
  374. {
  375. std::lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
  376. m_StopRequested.store(true, boost::memory_order_release);
  377. queue_base_type::interrupt_dequeue();
  378. m_DedicatedFeedingThread.swap(feeding_thread);
  379. }
  380. if (feeding_thread.joinable())
  381. feeding_thread.join();
  382. }
  383. /*!
  384. * The method feeds log records that may have been buffered to the backend and returns
  385. *
  386. * \pre The sink frontend must be constructed without spawning a dedicated thread
  387. */
  388. void feed_records()
  389. {
  390. // First check that no other thread is running
  391. {
  392. std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  393. if (start_feeding_operation(lock, feeding_records))
  394. return;
  395. }
  396. scoped_feeding_operation guard(*this);
  397. // Now start the feeding loop
  398. do_feed_records();
  399. }
  400. /*!
  401. * The method feeds all log records that may have been buffered to the backend and returns.
  402. * Unlike \c feed_records, in case of ordering queueing the method also feeds records
  403. * that were enqueued during the ordering window, attempting to drain the queue completely.
  404. */
  405. void flush() BOOST_OVERRIDE
  406. {
  407. {
  408. std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  409. if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
  410. {
  411. // There is already a thread feeding records, let it do the job
  412. m_FlushRequested.store(true, boost::memory_order_release);
  413. queue_base_type::interrupt_dequeue();
  414. while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
  415. m_BlockCond.wait(lock);
  416. // The condition may have been signalled when the feeding operation was finishing.
  417. // In that case records may not have been flushed, and we do the flush ourselves.
  418. if (m_ActiveOperation != idle)
  419. return;
  420. }
  421. m_ActiveOperation = flushing;
  422. m_FlushRequested.store(true, boost::memory_order_relaxed);
  423. }
  424. scoped_feeding_operation guard(*this);
  425. do_feed_records();
  426. }
  427. private:
  428. #ifndef BOOST_LOG_DOXYGEN_PASS
  429. //! The method spawns record feeding thread
  430. void start_feeding_thread()
  431. {
  432. std::thread(run_func(this)).swap(m_DedicatedFeedingThread);
  433. }
  434. //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
  435. bool start_feeding_operation(std::unique_lock< frontend_mutex_type >& lock, operation op)
  436. {
  437. while (m_ActiveOperation != idle)
  438. {
  439. if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
  440. BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
  441. if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
  442. {
  443. m_StopRequested.store(false, boost::memory_order_relaxed);
  444. return true;
  445. }
  446. m_BlockCond.wait(lock);
  447. }
  448. m_ActiveOperation = op;
  449. return false;
  450. }
  451. //! Completes record feeding operation
  452. void complete_feeding_operation() BOOST_NOEXCEPT
  453. {
  454. try
  455. {
  456. std::lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
  457. m_ActiveOperation = idle;
  458. m_StopRequested.store(false, boost::memory_order_relaxed);
  459. m_BlockCond.notify_all();
  460. }
  461. catch (...)
  462. {
  463. }
  464. }
  465. //! The record feeding loop
  466. void do_feed_records()
  467. {
  468. while (!m_StopRequested.load(boost::memory_order_acquire))
  469. {
  470. record_view rec;
  471. bool dequeued = false;
  472. if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
  473. dequeued = queue_base_type::try_dequeue_ready(rec);
  474. else
  475. dequeued = queue_base_type::try_dequeue(rec);
  476. if (dequeued)
  477. base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
  478. else
  479. break;
  480. }
  481. if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
  482. {
  483. scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
  484. base_type::flush_backend(m_BackendMutex, *m_pBackend);
  485. }
  486. }
  487. #endif // BOOST_LOG_DOXYGEN_PASS
  488. };
  489. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
  490. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
  491. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
  492. } // namespace sinks
  493. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  494. } // namespace boost
  495. #include <boost/log/detail/footer.hpp>
  496. #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_