c++-gtk-utils
async_queue.h
Go to the documentation of this file.
1 /* Copyright (C) 2006 to 2014 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 /**
40  * @file async_queue.h
41  * @brief This file provides thread-safe asynchronous queue classes.
42  *
43  * AsyncQueue is a class which provides some of the functionality of a
44  * std::queue object (but note that the AsyncQueue::pop(value_type&
45  * obj) and AsyncQueue::move_pop(value_type& obj) methods provide the
46  * popped element by reference - see the comments on that method for
47  * the reason), except that it has mutex locking of the data container
48  * so as to permit pushing and popping from different threads. It is
49  * therefore useful for passing data between threads, perhaps in
50  * response to a signal being emitted from a Notifier object.
51  *
52  * AsyncQueueDispatch is a class which has blocking pop() and
53  * move_pop() methods, which allows it to be waited on by a dedicated
54  * event/message dispatching thread for incoming work (represented by
55  * the data pushed onto the queue). In the same way, it can be used
56  * to implement thread pools, by having threads in the pool waiting on
57  * the queue.
58  *
59  * By default the queues use a std::list object as their container
60  * because when adding an item to the queue all allocation can take
61  * place outside the queue object's mutex. However, for types which
62  * have low overhead copy or move constructors, this can be changed
63  * to, say, a std::deque object by specifying it as the second
64  * template parameter.
65  *
66  * If data pushed and popped from a queue are held by a reference
67  * counted smart pointer, the reference count must be thread-safe,
68  * such as by using SharedLockPtr or IntrusiveLockCounter, or
69  * std::shared_ptr (which is required to have a thread-safe reference
70  * count).
71  */
72 
73 #ifndef CGU_ASYNC_QUEUE_H
74 #define CGU_ASYNC_QUEUE_H
75 
76 #include <queue>
77 #include <list>
78 #include <exception>
79 #include <utility> // for std::move and std::forward
80 #include <algorithm> // for std::swap
81 #include <time.h>
82 
83 #include <pthread.h>
84 
85 #include <c++-gtk-utils/mutex.h>
86 #include <c++-gtk-utils/thread.h>
88 
89 #ifdef CGU_USE_SCHED_YIELD
90 #include <sched.h>
91 #else
92 #include <unistd.h>
93 #endif
94 
95 namespace Cgu {
96 
97 /**
98  * @class AsyncQueuePopError async_queue.h c++-gtk-utils/async_queue.h
99  * @brief An exception thrown if calling pop() on a AsyncQueue or
100  * AsyncQueueDispatch object fails because the queue is empty.
101  * @sa AsyncQueue AsyncQueueDispatch
102  */
103 
104 struct AsyncQueuePopError: public std::exception {
105  virtual const char* what() const throw() {return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
106 };
107 
108 
109 /**
110  * @class AsyncQueue async_queue.h c++-gtk-utils/async_queue.h
111  * @brief A thread-safe asynchronous queue.
112  * @sa AsyncQueueDispatch AsyncChannel AsyncResult
113  *
114  * AsyncQueue is a class which provides some of the functionality of a
115  * std::queue object (but note that the AsyncQueue::pop(value_type&
116  * obj) and AsyncQueue::move_pop(value_type& obj) methods provide the
117  * popped element by reference - see the comments on that method for
118  * the reason), except that it has mutex locking of the data container
119  * so as to permit pushing and popping from different threads. It is
120  * therefore useful for passing data between threads, perhaps in
121  * response to a signal being emitted from a Notifier object.
122  *
123  * By default the queue uses a std::list object as its container
124  * because when adding an item to the queue all allocation can take
125  * place outside the queue object's mutex. However, for types which
126  * have low overhead copy or move constructors, this can be changed
127  * to, say, a std::deque object by specifying it as the second
128  * template parameter.
129  *
130  * If data pushed and popped from the queue are held by a reference
131  * counted smart pointer, the reference count must be thread-safe,
132  * such as by using SharedLockPtr or IntrusiveLockCounter, or
133  * std::shared_ptr (which is required to have a thread-safe reference
134  * count).
135  *
136  * If the library is installed using the
137  * \--with-glib-memory-slices-compat or
138  * \--with-glib-memory-slices-no-compat configuration options, any
139  * AsyncQueue objects constructed on free store will be constructed in
140  * glib memory slices. This does not affect the queue container
141  * itself: to change the allocator of the C++ container, a custom
142  * allocator type can be provided when the AsyncQueue object is
143  * instantiated offering the standard allocator interface. If glib
144  * memory slices are not used or no AsyncQueue objects are constructed
145  * on free store, it is not necessary to call g_thread_init() before
146  * manipulating or using an AsyncQueue object in multiple threads, but
147  * prior to glib version 2.32 glib itself (and thus glib memory
148  * slices) are not thread safe unless that function has been called.
149  */
150 
151 template <class T, class Container = std::list<T> > class AsyncQueue {
152 public:
153  typedef typename Container::value_type value_type;
154  typedef typename Container::size_type size_type;
155  typedef Container container_type;
156 private:
157  mutable Thread::Mutex mutex;
158  std::queue<T, Container> q;
159 
160 // TODO: at the next ABI break make this method explicitly static
161 // this method won't throw: it is for the user to ensure the arguments
162 // do not refer to the same mutex object
163  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
164  m1.lock();
165  for(;;) {
166  if (!m2.trylock()) {
167  return;
168  }
169  m1.unlock();
170  // spin nicely
171 #ifdef CGU_USE_SCHED_YIELD
172  sched_yield();
173 #else
174  usleep(10);
175 #endif
176  m1.lock();
177  }
178  }
179 public:
180 /**
181  * Pushes an item onto the queue. This method has strong exception
182  * safety if the container is a std::list or std::deque container (the
183  * default is std::list), except that if std::deque is used as the
184  * container and the copy constructor, move constructor, copy
185  * assignment operator or move assignment operator of the queue item
186  * throws, it only gives the basic exception guarantee (and the basic
187  * guarantee is not given by std::deque if the queue item's move
188  * constructor throws and it uses a non-default allocator which does
189  * not provide for it to be CopyInsertable). It is thread safe.
190  * @param obj The item to be pushed onto the queue.
191  * @exception std::bad_alloc The method might throw std::bad_alloc if
192  * memory is exhausted and the system throws in that case. It might
193  * also throw if the copy constructor, move constructor, assignment
194  * operator or move assignment operator of the queue item might throw.
195  */
196  void push(const value_type& obj) {
197  Thread::Mutex::Lock lock{mutex};
198  q.push(obj);
199  }
200 
201 /**
202  * Pushes an item onto the queue. This method has strong exception
203  * safety if the container is a std::list or std::deque container (the
204  * default is std::list), except that if std::deque is used as the
205  * container and the copy constructor, move constructor, copy
206  * assignment operator or move assignment operator of the queue item
207  * throws, it only gives the basic exception guarantee (and the basic
208  * guarantee is not given by std::deque if the queue item's move
209  * constructor throws and it uses a non-default allocator which does
210  * not provide for it to be CopyInsertable). It is thread safe.
211  * @param obj The item to be pushed onto the queue.
212  * @exception std::bad_alloc The method might throw std::bad_alloc if
213  * memory is exhausted and the system throws in that case. It might
214  * also throw if the copy constructor, move constructor, assignment
215  * operator or move assignment operator of the queue item might throw.
216  *
217  * Since 2.0.0-rc5
218  */
219  void push(value_type&& obj) {
220  Thread::Mutex::Lock lock{mutex};
221  q.push(std::move(obj));
222  }
223 
224 /**
225  * Pushes an item onto the queue by constructing it in place: that is,
226  * by passing to this method the item's constructor's arguments,
227  * rather than the item itself. This method has strong exception
228  * safety if the container is a std::list or std::deque container (the
229  * default is std::list). (Technically, for a std::deque container,
230  * emplace() only offers the same exception guarantees as does push(),
231  * namely only the basic guarantee where a copy or move of the queue
232  * item throws during the call, but the purpose of emplace is to
233  * construct in place and any reasonable implementation will not copy
234  * or move the queue item.) It is thread safe.
235  * @param args The constructor arguments for the item to be pushed
236  * onto the queue.
237  * @exception std::bad_alloc The method might throw std::bad_alloc if
238  * memory is exhausted and the system throws in that case. It might
239  * also throw if the item's constructor (including any of its
240  * constructor arguments) might throw when constructing the item.
241  * @note The constructor of the item pushed onto the queue must not
242  * access any of the methods of the same queue object, or a deadlock
243  * might occur.
244  *
245  * Since 2.0.0-rc5
246  */
247  template<class... Args>
248  void emplace(Args&&... args) {
249  Thread::Mutex::Lock lock{mutex};
250  q.emplace(std::forward<Args>(args)...);
251  }
252 
253 /**
254  * Pops an item from the queue using the contained type's copy
255  * assignment operator. This method has strong exception safety if
256  * the container is a std::deque or std::list container (the default
257  * is std::list), provided the destructor of a contained item does not
258  * throw. It is thread safe.
259  * @param obj A value type reference to which the item at the front of
260  * the queue will be assigned.
261  * @exception AsyncQueuePopError If the queue is empty when a pop is
262  * attempted, this method will throw AsyncQueuePopError. It might
263  * also throw if the copy assignment operator of the queue item might
264  * throw. In order to complete pop operations atomically under a
265  * single lock and to retain strong exception safety, the object into
266  * which the popped data is to be placed is passed as an argument by
267  * reference (this avoids a copy from a temporary object after the
268  * data has been extracted from the queue, which would occur if the
269  * item extracted were returned by value). It might also throw if the
270  * destructor of the queue item might throw (but that should never
271  * happen), or if the empty() method of the container type throws
272  * (which would not happen on any sane implementation).
273  */
274  void pop(value_type& obj) {
275  Thread::Mutex::Lock lock{mutex};
276  if (q.empty()) throw AsyncQueuePopError();
277  obj = q.front();
278  q.pop();
279  }
280 
281 /**
282  * Pops an item from the queue using the contained type's move
283  * assignment operator, if it has one. This method is identical to
284  * the pop() method if that type has no move assignment operator.
285  * This method has strong exception safety if the container is a
286  * std::deque or std::list container (the default is std::list),
287  * provided the destructor of a contained item does not throw and the
288  * move assignment operator of a contained item has strong exception
289  * safety. It is thread safe. Use this method in preference to the
290  * pop() method if it is known that the contained items' move
291  * assignment operator does not throw or is strongly exception safe,
292  * or if the use case does not require strong exception safety. This
293  * method (or the move_pop_basic() method) must be used in place of
294  * the pop() method if the contained type has a move assignment
295  * operator but no copy assignment operator (such as a std::unique_ptr
296  * object).
297  * @param obj A value type reference to which the item at the front of
298  * the queue will be move assigned.
299  * @exception AsyncQueuePopError If the queue is empty when a pop is
300  * attempted, this method will throw AsyncQueuePopError. It might
301  * also throw if the move assignment operator of the queue item might
302  * throw, or if it has no move assignment operator and its copy
303  * assignment operator throws. In order to complete pop operations
304  * atomically under a single lock and to retain strong exception
305  * safety, the object into which the popped data is to be placed is
306  * passed as an argument by reference (this avoids a move from a
307  * temporary object after the data has been extracted from the queue,
308  * which would occur if the item extracted were returned by value).
309  * It might also throw if the destructor of the queue item might throw
310  * (but that should never happen), or if the empty() method of the
311  * container type throws (which would not happen on any sane
312  * implementation).
313  *
314  * Since 2.0.11
315  */
316  void move_pop(value_type& obj) {
317  Thread::Mutex::Lock lock{mutex};
318  if (q.empty()) throw AsyncQueuePopError();
319  obj = std::move(q.front());
320  q.pop();
321  }
322 
323 /**
324  * Pops an item from the queue using the contained type's move
325  * assignment operator, if it has one, or if not using its copy
326  * assignment operator. This method is only available where the
327  * queue's container is a std::list object (which is the default), and
328  * does the same as the move_pop() method except that when popping the
329  * only thing which is done to the queue's container's internal state
330  * within the queue object's mutex is to swap some pointers: in
331  * particular, deallocation of the list node at the front of the queue
332  * occurs outside that mutex, as does assignment to this method's
333  * argument. Given that, if the queue's container is a list, any new
334  * node is also constructed outside the mutex when pushing or
335  * emplacing an item onto the queue, this minimizes contention between
336  * threads: it gets as close to lock-free performance as it is
337  * possible to get with the standard containers.
338  *
339  * However this minimizing of contention comes at a cost, which is
340  * that if the contained item's move assignment operator (or if it has
341  * none, its copy assignment operator) throws, then only the basic
342  * exception guarantee is offered (hence the name of this method).
343  * This means that although the AsyncQueue object would be left in a
344  * valid state in the event of such throwing, the item at the front of
345  * the queue would be lost. As in the case of the pop() and
346  * move_pop() methods, in addition only the basic exception guarantee
347  * is offered if the destructor of the contained item throws. Only
348  * use this method if the queue's container is a std::list object, and
349  * if either it is known that the contained item's move assignment
350  * operator (or if it has none, its copy assignment operator) does not
351  * throw, or the use case does not require strong exception safety.
352  * This method is thread safe.
353  *
354  * If this method is called for an AsyncQueue object whose container
355  * is not a std::list object, it will hand off to the move_pop()
356  * method, which is separately documented.
357  * @param obj A value type reference to which the item at the front of
358  * the queue will be move assigned.
359  * @exception AsyncQueuePopError If the queue is empty when a pop is
360  * attempted, this method will throw AsyncQueuePopError. It might
361  * also throw if the move assignment operator of the queue item might
362  * throw or it has no move assignment operator and its copy assignment
363  * operator throws (in which case only the basic exception guarantee
364  * is offered). It might also throw if the destructor of the queue
365  * item might throw (but that should never happen), if the empty()
366  * method of the container type throws (which would not happen on any
367  * sane implementation) or if the constructor of the implementation's
368  * list allocator throws (which would be highly unusual). In the
369  * event of any of the last two throwing, the strong exception
370  * guarantee is offered.
371  *
372  * Since 2.0.26 and 2.2.9
373  */
374 // See the specialisation for lists for the implementation of this
375 // method. For queues which do not use a list as their container,
376 // this hands off to move_pop().
378  move_pop(obj);
379  }
380 
381 /**
382  * Discards the item at the front of the queue. This method has
383  * strong exception safety if the container is a std::deque or
384  * std::list container (the default is std::list), provided the
385  * destructor of a contained item does not throw. It is thread safe.
386  * @exception AsyncQueuePopError If the queue is empty when a pop is
387  * attempted, this method will throw AsyncQueuePopError. It might
388  * also throw if the destructor of the queue item might throw (but
389  * that should never happen), or if the empty() method of the
390  * container type throws (which would not happen on any sane
391  * implementation).
392  */
393  void pop() {
394  Thread::Mutex::Lock lock{mutex};
395  if (q.empty()) throw AsyncQueuePopError();
396  q.pop();
397  }
398 
399 /**
400  * @return Whether the queue is empty. It will not throw assuming
401  * that the empty() method of the container type does not throw, as it
402  * will not on any sane implementation.
403  * @note This method is thread safe, but the return value may not be
404  * valid if another thread has pushed to or popped from the queue
405  * before the value returned by the method is acted on. It is
406  * provided as a utility, but may not be meaningful, depending on the
407  * intended usage.
408  */
409  bool empty() const {
410  Thread::Mutex::Lock lock{mutex};
411  return q.empty();
412  }
413 
414 /**
415  * @return The number of items currently in the queue. It will not
416  * throw assuming that the size() method of the container type does
417  * not throw, as it will not on any sane implementation.
418  * @note This method is thread safe, but the return value may not be
419  * valid if another thread has pushed to or popped from the queue
420  * before the value returned by the method is acted on. It is
421  * provided as a utility, but may not be meaningful, depending on the
422  * intended usage.
423  *
424  * Since 2.0.8
425  */
426  size_type size() const {
427  Thread::Mutex::Lock lock{mutex};
428  return q.size();
429  }
430 
431 /**
432  * Swaps the contents of 'this' and 'other'. It will not throw
433  * assuming that the swap method of the container type does not throw
434  * (which the C++11/14 standard requires not to happen with the
435  * standard sequence containers). It is thread safe and the swap is
436  * thread-wise atomic. A non-class function
437  * Cgu::swap(Cgu::AsyncQueue&, Cgu::AsyncQueue&) method is also
438  * provided which will call this method.
439  * @param other The object to be swapped with this one.
440  *
441  * Since 2.0.8
442  */
443  void swap(AsyncQueue& other) {
444  if (this != &other) {
445  lock2(mutex, other.mutex); // doesn't throw
447  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
448  q.swap(other.q);
449  }
450  }
451 
452 /**
453  * The copy assignment operator is strongly exception safe with the
454  * standard sequence containers (it uses copy and swap). It is also
455  * thread safe, as it safely locks both the assignor's and assignee's
456  * mutex to provide a thread-wise atomic assignment.
457  * @param rhs The assignor.
458  * @return The AsyncQueue object after assignment.
459  * @exception std::bad_alloc The copy constructor of the queue's
460  * container type, and so this assignment operator, might throw
461  * std::bad_alloc if memory is exhausted and the system throws in that
462  * case. This assignment operator will also throw if the copy
463  * constructor of the queue's container type throws any other
464  * exceptions, including if any copy or move constructor or copy or
465  * move assignment operator of a contained item throws.
466  * @exception Thread::MutexError This assignment operator might throw
467  * Thread::MutexError if initialization of a transitional object's
468  * contained mutex fails. (It is often not worth checking for this,
469  * as it means either memory is exhausted or pthread has run out of
470  * other resources to create new mutexes.)
471  *
472  * Since 2.0.8
473  */
475  if (this != &rhs) {
476  lock2(mutex, rhs.mutex); // doesn't throw
478  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
479  std::queue<T, Container> temp{rhs.q};
480  q.swap(temp);
481  }
482  return *this;
483  }
484 
485 /**
486  * This move assignment operator is thread safe as regards the
487  * assignee (the object moved to), but no synchronization is carried
488  * out with respect to the rvalue assignor/movant. This is because
489  * temporaries are only visible and accessible in the thread carrying
490  * out the move operation and synchronization for them would represent
491  * pointless overhead. In a case where the user uses std::move to
492  * force a move from a named object, and that named object's lifetime
493  * is managed by (or the object is otherwise accessed by) a different
494  * thread than the one making the move, the user must carry out her
495  * own synchronization with respect to that different thread, both to
496  * ensure that a consistent view of the the named object is obtained
497  * and because that object will be mutated by the move. This method
498  * invokes std::queue's move assignment operator, and therefore has
499  * the same exception safety as the standard library's implementation
500  * of that operator. It will not normally throw unless a custom
501  * allocator is used which throws on move assignment, or the
502  * destructor of a contained item throws.
503  * @param rhs The assignor/movant.
504  * @return The AsyncQueue object after move assignment.
505  *
506  * Since 2.0.8
507  */
509  Thread::Mutex::Lock lock{mutex};
510  q = std::move(rhs.q);
511  return *this;
512  }
513 
514 /**
515  * @exception std::bad_alloc The default constructor might throw
516  * std::bad_alloc if memory is exhausted and the system throws in that
517  * case.
518  * @exception Thread::MutexError The default constructor might throw
519  * Thread::MutexError if initialization of the contained mutex fails.
520  * (It is often not worth checking for this, as it means either memory
521  * is exhausted or pthread has run out of other resources to create
522  * new mutexes.)
523  */
524  AsyncQueue() = default;
525 
526 /**
527  * As regards thread safety, the move constructor does not synchronize
528  * with respect to the initializing rvalue. This is because
529  * temporaries are only visible and accessible in the thread carrying
530  * out the move operation and synchronization for them would represent
531  * pointless overhead. In a case where a user uses std::move to force
532  * a move from a named object, and that named object's lifetime is
533  * managed by (or the object is otherwise accessed by) a different
534  * thread than the one making the move, the user must carry out her
535  * own synchronization with respect to that different thread, both to
536  * ensure that a consistent view of the the named object is obtained
537  * and because that object will be mutated by the move.
538  * @param rhs The AsyncQueue object to be moved.
539  * @exception Thread::MutexError The move constructor might throw
540  * Thread::MutexError if initialization of the contained mutex fails.
541  * If it does so this move constructor is strongly exception safe (if
542  * it is thrown, the object passed as an argument will be unchanged).
543  * (It is often not worth checking for this, as it means either memory
544  * is exhausted or pthread has run out of other resources to create
545  * new mutexes.) It might also throw if the queue's container type's
546  * move constructor might throw, but it should not do that unless a
547  * custom allocator is in use.
548  *
549  * Since 2.0.8
550  */
551  AsyncQueue(AsyncQueue&& rhs): q(std::move(rhs.q)) {}
552 
553 /**
554  * The copy constructor is thread safe, as it locks the initializing
555  * object's mutex to obtain a consistent view of it.
556  * @param rhs The AsyncQueue object to be copied.
557  * @exception std::bad_alloc The copy constructor of the queue's
558  * container type, and so this constructor, might throw std::bad_alloc
559  * if memory is exhausted and the system throws in that case. It will
560  * also throw if the copy constructor of the queue's container type
561  * throws any other exceptions, including if any copy or move
562  * constructor or copy or move assignment operator of a contained item
563  * throws.
564  * @exception Thread::MutexError The copy constructor might throw
565  * Thread::MutexError if initialization of the contained mutex fails.
566  * (It is often not worth checking for this, as it means either memory
567  * is exhausted or pthread has run out of other resources to create
568  * new mutexes.)
569  *
570  * Since 2.0.8
571  */
572  // we use the comma operator here to lock the mutex and call the
573  // copy constructor: the lock will be retained until the end of the
574  // full expression in which it is lexically situated, namely until
575  // the end of q's constructor - see C++11 1.9/10 and 12.2/3
576  AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
577 
578 /**
579  * The destructor does not throw unless the destructor of a contained
580  * item throws. It is thread safe (any thread may delete the
581  * AsyncQueue object).
582  */
584  // lock and unlock the mutex in the destructor so that we have an
585  // acquire operation to ensure that when the std::queue object is
586  // destroyed memory is synchronised, so any thread may destroy the
587  // AsyncQueue object
588  Thread::Mutex::Lock lock{mutex};
589  }
590 
591 /* Only has effect if --with-glib-memory-slices-compat or
592  * --with-glib-memory-slices-no-compat option picked */
594 };
595 
596 /**
597  * @class AsyncQueueDispatch async_queue.h c++-gtk-utils/async_queue.h
598  * @brief A thread-safe asynchronous queue with a blocking pop()
599  * method.
600  * @sa AsyncQueue AsyncChannel AsyncResult
601  *
602  * AsyncQueueDispatch is a class which has blocking pop_dispatch()
603  * and move_pop_dispatch() methods, which allows it to be waited on by a dedicated
604  * event/message dispatching thread for incoming work (represented by
605  * the data pushed onto the queue). In the same way, it can be used
606  * to implement thread pools, by having threads in the pool waiting on
607  * the queue. The AsyncResult class can be useful for passing results
608  * between threads in conjunction with AsyncQueueDispatch (the
609  * documentation on AsyncResult gives an example).
610  *
611  * By default the queue uses a std::list object as its container
612  * because when adding an item to the queue all allocation can take
613  * place outside the queue object's mutex. However, for types which
614  * have low overhead copy or move constructors, this can be changed
615  * to, say, a std::deque object by specifying it as the second
616  * template parameter.
617  *
618  * If data pushed and popped from the queue are held by a reference
619  * counted smart pointer, the reference count must be thread-safe,
620  * such as by using SharedLockPtr or IntrusiveLockCounter, or
621  * std::shared_ptr (which is required to have a thread-safe reference
622  * count).
623  *
624  * If the library is installed using the
625  * \--with-glib-memory-slices-compat or
626  * \--with-glib-memory-slices-no-compat configuration options, any
627  * AsyncQueueDispatch objects constructed on free store will be
628  * constructed in glib memory slices. This does not affect the queue
629  * container itself: to change the allocator of the C++ container, a
630  * custom allocator type can be provided when the AsyncQueueDispatch
631  * object is instantiated offering the standard allocator interface.
632  * If glib memory slices are not used or no AsyncQueueDispatch objects
633  * are constructed on free store, it is not necessary to call
634  * g_thread_init() before manipulating or using an AsyncQueueDispatch
635  * object in multiple threads, but prior to glib version 2.32 glib
636  * itself (and thus glib memory slices) are not thread safe unless
637  * that function has been called.
638  */
639 
640 template <class T, class Container = std::list<T> > class AsyncQueueDispatch {
641 public:
642  typedef typename Container::value_type value_type;
643  typedef typename Container::size_type size_type;
644  typedef Container container_type;
645 private:
646  mutable Thread::Mutex mutex;
647  Thread::Cond cond;
648  std::queue<T, Container> q;
649 
650 // TODO: at the next ABI break make this method explicitly static
651 // this method won't throw: it is for the user to ensure the arguments
652 // do not refer to the same mutex object
653  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
654  m1.lock();
655  for(;;) {
656  if (!m2.trylock()) {
657  return;
658  }
659  m1.unlock();
660  // spin nicely
661 #ifdef CGU_USE_SCHED_YIELD
662  sched_yield();
663 #else
664  usleep(10);
665 #endif
666  m1.lock();
667  }
668  }
669 public:
670 /**
671  * Pushes an item onto the queue. This method has strong exception
672  * safety if the container is a std::list or std::deque container (the
673  * default is std::list), except that if std::deque is used as the
674  * container and the copy constructor, move constructor, copy
675  * assignment operator or move assignment operator of the queue item
676  * throws, it only gives the basic exception guarantee (and the basic
677  * guarantee is not given by std::deque if the queue item's move
678  * constructor throws and it uses a non-default allocator which does
679  * not provide for it to be CopyInsertable). It is thread safe.
680  * @param obj The item to be pushed onto the queue.
681  * @exception std::bad_alloc The method might throw std::bad_alloc if
682  * memory is exhausted and the system throws in that case. It might
683  * also throw if the copy constructor, move constructor, assignment
684  * operator or move assignment operator of the queue item might throw.
685  */
686  void push(const value_type& obj) {
687  Thread::Mutex::Lock lock{mutex};
688  q.push(obj);
689  cond.signal();
690  }
691 
692 /**
693  * Pushes an item onto the queue. This method has strong exception
694  * safety if the container is a std::list or std::deque container (the
695  * default is std::list), except that if std::deque is used as the
696  * container and the copy constructor, move constructor, copy
697  * assignment operator or move assignment operator of the queue item
698  * throws, it only gives the basic exception guarantee (and the basic
699  * guarantee is not given by std::deque if the queue item's move
700  * constructor throws and it uses a non-default allocator which does
701  * not provide for it to be CopyInsertable). It is thread safe.
702  * @param obj The item to be pushed onto the queue.
703  * @exception std::bad_alloc The method might throw std::bad_alloc if
704  * memory is exhausted and the system throws in that case. It might
705  * also throw if the copy constructor, move constructor, assignment
706  * operator or move assignment operator of the queue item might throw.
707  *
708  * Since 2.0.0-rc5
709  */
710  void push(value_type&& obj) {
711  Thread::Mutex::Lock lock{mutex};
712  q.push(std::move(obj));
713  cond.signal();
714  }
715 
716 /**
717  * Pushes an item onto the queue by constructing it in place: that is,
718  * by passing to this method the item's constructor's arguments,
719  * rather than the item itself. This method has strong exception
720  * safety if the container is a std::list or std::deque container (the
721  * default is std::list). (Technically, for a std::deque container,
722  * emplace() only offers the same exception guarantees as does push(),
723  * namely only the basic guarantee where a copy or move of the queue
724  * item throws during the call, but the purpose of emplace is to
725  * construct in place and any reasonable implementation will not copy
726  * or move the queue item.) It is thread safe.
727  * @param args The constructor arguments for the item to be pushed
728  * onto the queue.
729  * @exception std::bad_alloc The method might throw std::bad_alloc if
730  * memory is exhausted and the system throws in that case. It might
731  * also throw if the item's constructor (including any of its
732  * constructor arguments) might throw when constructing the item.
733  * @note The constructor of the item pushed onto the queue must not
734  * access any of the methods of the same queue object, or a deadlock
735  * might occur.
736  *
737  * Since 2.0.0-rc5
738  */
739  template<class... Args>
740  void emplace(Args&&... args) {
741  Thread::Mutex::Lock lock{mutex};
742  q.emplace(std::forward<Args>(args)...);
743  cond.signal();
744  }
745 
746 /**
747  * Pops an item from the queue using the contained type's copy
748  * assignment operator. This method has strong exception safety if
749  * the container is a std::deque or std::list container (the default
750  * is std::list), provided the destructor of a contained item does not
751  * throw. It is thread safe.
752  * @param obj A value type reference to which the item at the front of
753  * the queue will be assigned.
754  * @exception AsyncQueuePopError If the queue is empty when a pop is
755  * attempted, this method will throw AsyncQueuePopError. It might
756  * also throw if the copy assignment operator of the queue item might
757  * throw. In order to complete pop operations atomically under a
758  * single lock and to retain strong exception safety, the object into
759  * which the popped data is to be placed is passed as an argument by
760  * reference (this avoids a copy from a temporary object after the
761  * data has been extracted from the queue, which would occur if the
762  * item extracted were returned by value). It might also throw if the
763  * destructor of the queue item might throw (but that should never
764  * happen), or if the empty() method of the container type throws
765  * (which would not happen on any sane implementation).
766  */
767  void pop(value_type& obj) {
768  Thread::Mutex::Lock lock{mutex};
769  if (q.empty()) throw AsyncQueuePopError();
770  obj = q.front();
771  q.pop();
772  }
773 
774 /**
775  * Pops an item from the queue using the contained type's move
776  * assignment operator, if it has one. This method is identical to
777  * the pop() method if that type has no move assignment operator.
778  * This method has strong exception safety if the container is a
779  * std::deque or std::list container (the default is std::list),
780  * provided the destructor of a contained item does not throw and the
781  * move assignment operator of a contained item has strong exception
782  * safety. It is thread safe. Use this method in preference to the
783  * pop() method if it is known that the contained items' move
784  * assignment operator does not throw or is strongly exception safe,
785  * or if the use case does not require strong exception safety. This
786  * method (or the move_pop_basic() method) must be used in place of
787  * the pop() method if the contained type has a move assignment
788  * operator but no copy assignment operator (such as a std::unique_ptr
789  * object).
790  * @param obj A value type reference to which the item at the front of
791  * the queue will be move assigned.
792  * @exception AsyncQueuePopError If the queue is empty when a pop is
793  * attempted, this method will throw AsyncQueuePopError. It might
794  * also throw if the move assignment operator of the queue item might
795  * throw, or if it has no move assignment operator and its copy
796  * assignment operator throws. In order to complete pop operations
797  * atomically under a single lock and to retain strong exception
798  * safety, the object into which the popped data is to be placed is
799  * passed as an argument by reference (this avoids a move from a
800  * temporary object after the data has been extracted from the queue,
801  * which would occur if the item extracted were returned by value).
802  * It might also throw if the destructor of the queue item might throw
803  * (but that should never happen), or if the empty() method of the
804  * container type throws (which would not happen on any sane
805  * implementation).
806  *
807  * Since 2.0.11
808  */
809  void move_pop(value_type& obj) {
810  Thread::Mutex::Lock lock{mutex};
811  if (q.empty()) throw AsyncQueuePopError();
812  obj = std::move(q.front());
813  q.pop();
814  }
815 
816 /**
817  * Pops an item from the queue using the contained type's move
818  * assignment operator, if it has one, or if not using its copy
819  * assignment operator. This method is only available where the
820  * queue's container is a std::list object (which is the default), and
821  * does the same as the move_pop() method except that when popping the
822  * only thing which is done to the queue's container's internal state
823  * within the queue object's mutex is to swap some pointers: in
824  * particular, deallocation of the list node at the front of the queue
825  * occurs outside that mutex, as does assignment to this method's
826  * argument. Given that, if the queue's container is a list, any new
827  * node is also constructed outside the mutex when pushing or
828  * emplacing an item onto the queue, this minimizes contention between
829  * threads: it gets as close to lock-free performance as it is
830  * possible to get with the standard containers.
831  *
832  * However this minimizing of contention comes at a cost, which is
833  * that if the contained item's move assignment operator (or if it has
834  * none, its copy assignment operator) throws, then only the basic
835  * exception guarantee is offered (hence the name of this method).
836  * This means that although the AsyncQueueDispatch object would be
837  * left in a valid state in the event of such throwing, the item at
838  * the front of the queue would be lost. As in the case of the pop()
839  * and move_pop() methods, in addition only the basic exception
840  * guarantee is offered if the destructor of the contained item
841  * throws. Only use this method if the queue's container is a
842  * std::list object, and if either it is known that the contained
843  * item's move assignment operator (or if it has none, its copy
844  * assignment operator) does not throw, or the use case does not
845  * require strong exception safety. This method is thread safe.
846  *
847  * If this method is called for an AsyncQueueDispatch object whose
848  * container is not a std::list object, it will hand off to the
849  * move_pop() method, which is separately documented.
850  * @param obj A value type reference to which the item at the front of
851  * the queue will be move assigned.
852  * @exception AsyncQueuePopError If the queue is empty when a pop is
853  * attempted, this method will throw AsyncQueuePopError. It might
854  * also throw if the move assignment operator of the queue item might
855  * throw or it has no move assignment operator and its copy assignment
856  * operator throws (in which case only the basic exception guarantee
857  * is offered). It might also throw if the destructor of the queue
858  * item might throw (but that should never happen), if the empty()
859  * method of the container type throws (which would not happen on any
860  * sane implementation) or if the constructor of the implementation's
861  * list allocator throws (which would be highly unusual). In the
862  * event of any of the last two throwing, the strong exception
863  * guarantee is offered.
864  *
865  * Since 2.0.26 and 2.2.9
866  */
867 // See the specialisation for lists for the implementation of this
868 // method. For queues which do not use a list as their container,
869 // this hands off to move_pop().
871  move_pop(obj);
872  }
873 
874 /**
875  * Pops an item from the queue using the contained type's copy
876  * assignment operator. If the queue is empty, it will block until an
877  * item becomes available. If it blocks, the wait comprises a
878  * cancellation point. This method is cancellation safe if the stack
879  * unwinds on cancellation, as cancellation is blocked while the queue
880  * is being operated on after coming out of a wait. This method has
881  * strong exception safety if the container is a std::deque or
882  * std::list container (the default is std::list), provided the
883  * destructor of a contained item does not throw. It is thread safe.
884  * @param obj A value type reference to which the item at the front of
885  * the queue will be assigned. This method might throw if the copy
886  * assignment operator of the queue item might throw. In order to
887  * complete pop operations atomically under a single lock and to
888  * retain strong exception safety, the object into which the popped
889  * data is to be placed is passed as an argument by reference (this
890  * avoids a copy from a temporary object after the data has been
891  * extracted from the queue, which would occur if the item extracted
892  * were returned by value). It might also throw if the destructor of
893  * the queue item might throw (but that should never happen), or if
894  * the empty() method of the container type throws (which would not
895  * happen on any sane implementation).
896  * @note This method calls Thread::Cond::wait(). Between versions
897  * 2.2.3 and 2.2.13 inclusive, Thread::Cond::wait() was marked
898  * 'noexcept'. This was a mistake because it prevented a thread being
899  * cancelled while in a wait, including in this method (the
900  * cancellation pseudo-exception conflicted with the noexcept
901  * specifier). This was fixed in version 2.2.14.
902  */
904  Thread::Mutex::Lock lock{mutex};
905  while (q.empty()) cond.wait(mutex);
907  obj = q.front();
908  q.pop();
909  }
910 
911 /**
912  * Pops an item from the queue using the contained type's move
913  * assignment operator, if it has one (this method is identical to the
914  * pop_dispatch() method if that type has no move assignment
915  * operator). If the queue is empty, it will block until an item
916  * becomes available. If it blocks, the wait comprises a cancellation
917  * point. This method is cancellation safe if the stack unwinds on
918  * cancellation, as cancellation is blocked while the queue is being
919  * operated on after coming out of a wait. This method has strong
920  * exception safety if the container is a std::deque or std::list
921  * container (the default is std::list), provided the destructor of a
922  * contained item does not throw and the move assignment operator of a
923  * contained item has strong exception safety. It is thread safe.
924  * Use this method in preference to the pop_dispatch() method if it is
925  * known that the contained items' move assignment operator does not
926  * throw or is strongly exception safe, or if the use case does not
927  * require strong exception safety. This method (or the
928  * move_pop_dispatch_basic() method) must be used in place of the
929  * pop_dispatch() method if the contained type has a move assignment
930  * operator but no copy assignment operator (such as a std::unique_ptr
931  * object).
932  * @param obj A value type reference to which the item at the front of
933  * the queue will be move assigned. This method might throw if the
934  * move assignment operator of the queue item might throw, or if it
935  * has no move assignment operator and its copy assignment operator
936  * throws. In order to complete pop operations atomically under a
937  * single lock and to retain strong exception safety, the object into
938  * which the popped data is to be placed is passed as an argument by
939  * reference (this avoids a move from a temporary object after the
940  * data has been extracted from the queue, which would occur if the
941  * item extracted were returned by value). It might also throw if the
942  * destructor of the queue item might throw (but that should never
943  * happen), or if the empty() method of the container type throws
944  * (which would not happen on any sane implementation).
945  * @note This method calls Thread::Cond::wait(). Between versions
946  * 2.2.3 and 2.2.13 inclusive, Thread::Cond::wait() was marked
947  * 'noexcept'. This was a mistake because it prevented a thread being
948  * cancelled while in a wait, including in this method (the
949  * cancellation pseudo-exception conflicted with the noexcept
950  * specifier). This was fixed in version 2.2.14.
951  *
952  * Since 2.0.11
953  */
955  Thread::Mutex::Lock lock{mutex};
956  while (q.empty()) cond.wait(mutex);
958  obj = std::move(q.front());
959  q.pop();
960  }
961 
962 /**
963  * Pops an item from the queue using the contained type's move
964  * assignment operator, if it has one, or if not using its copy
965  * assignment operator. This method is only available where the
966  * queue's container is a std::list object (which is the default), and
967  * does the same as the move_pop_dispatch() method except that when
968  * popping the only thing which is done to the queue's container's
969  * internal state within the queue object's mutex is to swap some
970  * pointers: in particular, deallocation of the list node at the front
971  * of the queue occurs outside that mutex, as does assignment to this
972  * method's argument. Given that, if the queue's container is a list,
973  * any new node is also constructed outside the mutex when pushing or
974  * emplacing an item onto the queue, this minimizes contention between
975  * threads: it gets as close to lock-free performance as it is
976  * possible to get with the standard containers.
977  *
978  * However this minimizing of contention comes at a cost, which is
979  * that if the contained item's move assignment operator (or if it has
980  * none, its copy assignment operator) throws, then only the basic
981  * exception guarantee is offered (hence the name of this method).
982  * This means that although the AsyncQueueDispatch object would be
983  * left in a valid state in the event of such throwing, the item at
984  * the front of the queue would be lost. As in the case of the
985  * pop_dispatch() and move_pop_dispatch() methods, in addition only
986  * the basic exception guarantee is offered if the destructor of the
987  * contained item throws. Only use this method if the queue's
988  * container is a std::list object, and if either it is known that the
989  * contained item's move assignment operator (or if it has none, its
990  * copy assignment operator) does not throw, or the use case does not
991  * require strong exception safety. This method is thread safe.
992  *
993  * If the queue is empty, it will block until an item
994  * becomes available. If it blocks, the wait comprises a cancellation
995  * point. This method is cancellation safe if the stack unwinds on
996  * cancellation, as cancellation is blocked while the queue is being
997  * operated on after coming out of a wait.
998  *
999  * If this method is called for an AsyncQueueDispatch object whose
1000  * container is not a std::list object, it will hand off to the
1001  * move_pop_dispatch() method, which is separately documented.
1002  * @param obj A value type reference to which the item at the front of
1003  * the queue will be move assigned. This method might throw if the
1004  * move assignment operator of the queue item might throw or it has no
1005  * move assignment operator and its copy assignment operator throws
1006  * (in which case only the basic exception guarantee is offered). It
1007  * might also throw if the destructor of the queue item might throw
1008  * (but that should never happen), if the empty() method of the
1009  * container type throws (which would not happen on any sane
1010  * implementation) or if the constructor of the implementation's list
1011  * allocator throws (which would be highly unusual). In the event of
1012  * any of the last two throwing, the strong exception guarantee is
1013  * offered.
1014  *
1015  * @note This method calls Thread::Cond::wait(). Between versions
1016  * 2.2.3 and 2.2.13 inclusive, Thread::Cond::wait() was marked
1017  * 'noexcept'. This was a mistake because it prevented a thread being
1018  * cancelled while in a wait, including in this method (the
1019  * cancellation pseudo-exception conflicted with the noexcept
1020  * specifier). This was fixed in version 2.2.14.
1021  *
1022  * Since 2.0.26 and 2.2.9
1023  */
1024 // See the specialisation for lists for the implementation of this
1025 // method. For queues which do not use a list as their container,
1026 // this hands off to move_pop_dispatch().
1028  move_pop_dispatch(obj);
1029  }
1030 
1031 /**
1032  * Pops an item from the queue using the contained type's copy
1033  * assignment operator. If the queue is empty, it will block until an
1034  * item becomes available or until the timeout expires. If it blocks,
1035  * the wait comprises a cancellation point. This method is
1036  * cancellation safe if the stack unwinds on cancellation, as
1037  * cancellation is blocked while the queue is being operated on after
1038  * coming out of a wait. This method has strong exception safety if
1039  * the container is a std::deque or std::list container (the default
1040  * is std::list), provided the destructor of a contained item does not
1041  * throw. It is thread safe.
1042  * @param obj A value type reference to which the item at the front of
1043  * the queue will be assigned. This method might throw if the copy
1044  * assignment operator of the queue item might throw. In order to
1045  * complete pop operations atomically under a single lock and to
1046  * retain strong exception safety, the object into which the popped
1047  * data is to be placed is passed as an argument by reference (this
1048  * avoids a copy from a temporary object after the data has been
1049  * extracted from the queue, which would occur if the item extracted
1050  * were returned by value). It might also throw if the destructor of
1051  * the queue item might throw (but that should never happen), or if
1052  * the empty() method of the container type throws (which would not
1053  * happen on any sane implementation).
1054  * @param millisec The timeout interval, in milliseconds.
1055  * @return If the timeout expires without an item becoming available,
1056  * the method will return true. If an item from the queue is
1057  * extracted, it returns false.
1058  * @note This method calls Thread::Cond::timed_wait(). Between
1059  * versions 2.2.3 and 2.2.13 inclusive, Thread::Cond::timed_wait() was
1060  * marked 'noexcept'. This was a mistake because it prevented a
1061  * thread being cancelled while in a wait, including in this method
1062  * (the cancellation pseudo-exception conflicted with the noexcept
1063  * specifier). This was fixed in version 2.2.14.
1064  */
1065  bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
1066  timespec ts;
1067  Thread::Cond::get_abs_time(ts, millisec);
1068  Thread::Mutex::Lock lock{mutex};
1069  while (q.empty()) {
1070  if (cond.timed_wait(mutex, ts)) return true;
1071  }
1073  obj = q.front();
1074  q.pop();
1075  return false;
1076  }
1077 
1078 /**
1079  * Pops an item from the queue using the contained type's move
1080  * assignment operator, if it has one (this method is identical to the
1081  * pop_timed_dispatch() method if that type has no move assignment
1082  * operator). If the queue is empty, it will block until an item
1083  * becomes available or until the timeout expires. If it blocks, the
1084  * wait comprises a cancellation point. This method is cancellation
1085  * safe if the stack unwinds on cancellation, as cancellation is
1086  * blocked while the queue is being operated on after coming out of a
1087  * wait. This method has strong exception safety if the container is
1088  * a std::deque or std::list container (the default is std::list),
1089  * provided the destructor of a contained item does not throw and the
1090  * move assignment operator of a contained item has strong exception
1091  * safety. It is thread safe. Use this method in preference to the
1092  * pop_timed_dispatch() method if it is known that the contained
1093  * items' move assignment operator does not throw or is strongly
1094  * exception safe, or if the use case does not require strong
1095  * exception safety. This method (or the
1096  * move_pop_timed_dispatch_basic() method) must be used in place of
1097  * the pop_timed_dispatch() method if the contained type has a move
1098  * assignment operator but no copy assignment operator (such as a
1099  * std::unique_ptr object).
1100  * @param obj A value type reference to which the item at the front of
1101  * the queue will be move assigned. This method might throw if the
1102  * move assignment operator of the queue item might throw, or if it
1103  * has no move assignment operator and its copy assignment operator
1104  * throws. In order to complete pop operations atomically under a
1105  * single lock and to retain strong exception safety, the object into
1106  * which the popped data is to be placed is passed as an argument by
1107  * reference (this avoids a move from a temporary object after the
1108  * data has been extracted from the queue, which would occur if the
1109  * item extracted were returned by value). It might also throw if the
1110  * destructor of the queue item might throw (but that should never
1111  * happen), or if the empty() method of the container type throws
1112  * (which would not happen on any sane implementation).
1113  * @param millisec The timeout interval, in milliseconds.
1114  * @return If the timeout expires without an item becoming available,
1115  * the method will return true. If an item from the queue is
1116  * extracted, it returns false.
1117  * @note This method calls Thread::Cond::timed_wait(). Between
1118  * versions 2.2.3 and 2.2.13 inclusive, Thread::Cond::timed_wait() was
1119  * marked 'noexcept'. This was a mistake because it prevented a
1120  * thread being cancelled while in a wait, including in this method
1121  * (the cancellation pseudo-exception conflicted with the noexcept
1122  * specifier). This was fixed in version 2.2.14.
1123  *
1124  * Since 2.0.11
1125  */
1126  bool move_pop_timed_dispatch(value_type& obj, unsigned int millisec) {
1127  timespec ts;
1128  Thread::Cond::get_abs_time(ts, millisec);
1129  Thread::Mutex::Lock lock{mutex};
1130  while (q.empty()) {
1131  if (cond.timed_wait(mutex, ts)) return true;
1132  }
1134  obj = std::move(q.front());
1135  q.pop();
1136  return false;
1137  }
1138 
1139 /**
1140  * Pops an item from the queue using the contained type's move
1141  * assignment operator, if it has one, or if not using its copy
1142  * assignment operator. This method is only available where the
1143  * queue's container is a std::list object (which is the default), and
1144  * does the same as the move_pop_timed_dispatch() method except that
1145  * when popping the only thing which is done to the queue's
1146  * container's internal state within the queue object's mutex is to
1147  * swap some pointers: in particular, deallocation of the list node at
1148  * the front of the queue occurs outside that mutex, as does
1149  * assignment to this method's argument. Given that, if the queue's
1150  * container is a list, any new node is also constructed outside the
1151  * mutex when pushing or emplacing an item onto the queue, this
1152  * minimizes contention between threads: it gets as close to lock-free
1153  * performance as it is possible to get with the standard containers.
1154  *
1155  * However this minimizing of contention comes at a cost, which is
1156  * that if the contained item's move assignment operator (or if it has
1157  * none, its copy assignment operator) throws, then only the basic
1158  * exception guarantee is offered (hence the name of this method).
1159  * This means that although the AsyncQueueDispatch object would be
1160  * left in a valid state in the event of such throwing, the item at
1161  * the front of the queue would be lost. As in the case of the
1162  * pop_timed_dispatch() and move_pop_timed_dispatch() methods, in
1163  * addition only the basic exception guarantee is offered if the
1164  * destructor of the contained item throws. Only use this method if
1165  * the queue's container is a std::list object, and if either it is
1166  * known that the contained item's move assignment operator (or if it
1167  * has none, its copy assignment operator) does not throw, or the use
1168  * case does not require strong exception safety. This method is
1169  * thread safe.
1170  *
1171  * If the queue is empty, it will block until an item becomes
1172  * available or until the timeout expires. If it blocks, the wait
1173  * comprises a cancellation point. This method is cancellation safe
1174  * if the stack unwinds on cancellation, as cancellation is blocked
1175  * while the queue is being operated on after coming out of a wait.
1176  *
1177  * If this method is called for an AsyncQueueDispatch object whose
1178  * container is not a std::list object, it will hand off to the
1179  * move_pop_timed_dispatch() method, which is separately documented.
1180  * @param obj A value type reference to which the item at the front of
1181  * the queue will be move assigned. This method might throw if the
1182  * move assignment operator of the queue item might throw or it has no
1183  * move assignment operator and its copy assignment operator throws
1184  * (in which case only the basic exception guarantee is offered). It
1185  * might also throw if the destructor of the queue item might throw
1186  * (but that should never happen), if the empty() method of the
1187  * container type throws (which would not happen on any sane
1188  * implementation) or if the constructor of the implementation's list
1189  * allocator throws (which would be highly unusual). In the event of
1190  * any of the last two throwing, the strong exception guarantee is
1191  * offered.
1192  * @param millisec The timeout interval, in milliseconds.
1193  * @return If the timeout expires without an item becoming available,
1194  * the method will return true. If an item from the queue is
1195  * extracted, it returns false.
1196  * @note This method calls Thread::Cond::timed_wait(). Between
1197  * versions 2.2.3 and 2.2.13 inclusive, Thread::Cond::timed_wait() was
1198  * marked 'noexcept'. This was a mistake because it prevented a
1199  * thread being cancelled while in a wait, including in this method
1200  * (the cancellation pseudo-exception conflicted with the noexcept
1201  * specifier). This was fixed in version 2.2.14.
1202  *
1203  * Since 2.0.26 and 2.2.9
1204  */
1205 // See the specialisation for lists for the implementation of this
1206 // method. For queues which do not use a list as their container,
1207 // this hands off to move_pop_timed_dispatch().
1208  bool move_pop_timed_dispatch_basic(value_type& obj, unsigned int millisec) {
1209  return move_pop_timed_dispatch(obj, millisec);
1210  }
1211 
1212 /**
1213  * Discards the item at the front of the queue. This method has
1214  * strong exception safety if the container is a std::deque or
1215  * std::list container (the default is std::list), provided the
1216  * destructor of a contained item does not throw. It is thread safe.
1217  * @exception AsyncQueuePopError If the queue is empty when a pop is
1218  * attempted, this method will throw AsyncQueuePopError. It might
1219  * also throw if the destructor of the queue item might throw (but
1220  * that should never happen), or if the empty() method of the
1221  * container type throws (which would not happen on any sane
1222  * implementation).
1223  */
1224  void pop() {
1225  Thread::Mutex::Lock lock{mutex};
1226  if (q.empty()) throw AsyncQueuePopError();
1227  q.pop();
1228  }
1229 
1230 /**
1231  * @return Whether the queue is empty. It will not throw assuming
1232  * that the empty() method of the container type does not throw, as it
1233  * will not on any sane implementation.
1234  * @note This method is thread safe, but the return value may not be
1235  * valid if another thread has pushed to or popped from the queue
1236  * before the value returned by the method is acted on. It is
1237  * provided as a utility, but may not be meaningful, depending on the
1238  * intended usage.
1239  */
1240  bool empty() const {
1241  Thread::Mutex::Lock lock{mutex};
1242  return q.empty();
1243  }
1244 
1245 /**
1246  * @return The number of items currently in the queue. It will not
1247  * throw assuming that the size() method of the container type does
1248  * not throw, as it will not on any sane implementation.
1249  * @note This method is thread safe, but the return value may not be
1250  * valid if another thread has pushed to or popped from the queue
1251  * before the value returned by the method is acted on. It is
1252  * provided as a utility, but may not be meaningful, depending on the
1253  * intended usage.
1254  *
1255  * Since 2.0.8
1256  */
1257  size_type size() const {
1258  Thread::Mutex::Lock lock{mutex};
1259  return q.size();
1260  }
1261 
1262 /**
1263  * Swaps the contents of 'this' and 'other'. It will not throw
1264  * assuming that the swap method of the container type does not throw
1265  * (which the C++11/14 standard requires not to happen with the
1266  * standard sequence containers). It is thread safe and the swap is
1267  * thread-wise atomic. A non-class function
1268  * Cgu::swap(Cgu::AsyncQueueDispatch&, Cgu::AsyncQueueDispatch&)
1269  * method is also provided which will call this method.
1270  * @param other The object to be swapped with this one.
1271  * @note An object swapped does not, by virtue of the swap, inherit
1272  * any threads waiting on the other one. However if threads were
1273  * waiting on a swapped object prior to the swap, and it acquires
1274  * items by virtue of the swap, the waiting threads will unblock and
1275  * extract those items.
1276  *
1277  * Since 2.0.8
1278  */
1279  void swap(AsyncQueueDispatch& other) {
1280  if (this != &other) {
1281  lock2(mutex, other.mutex); // doesn't throw
1283  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
1284  q.swap(other.q);
1285  if (!q.empty()) cond.broadcast();
1286  if (!other.q.empty()) other.cond.broadcast();
1287  }
1288  }
1289 
1290 /**
1291  * The copy assignment operator is strongly exception safe with the
1292  * standard sequence containers (it uses copy and swap). It is also
1293  * thread safe, as it safely locks both the assignor's and assignee's
1294  * mutex to provide a thread-wise atomic assignment.
1295  * @param rhs The assignor.
1296  * @return The AsyncQueueDispatch object after assignment.
1297  * @exception std::bad_alloc The copy constructor of the queue's
1298  * container type, and so this assignment operator, might throw
1299  * std::bad_alloc if memory is exhausted and the system throws in that
1300  * case. This assignment operator will also throw if the copy
1301  * constructor of the queue's container type throws any other
1302  * exceptions, including if any copy or move constructor or copy or
1303  * move assignment operator of a contained item throws.
1304  * @exception Thread::MutexError This assignment operator might
1305  * throw Thread::MutexError if initialization of a transitional
1306  * object's contained mutex fails. (It is often not worth checking
1307  * for this, as it means either memory is exhausted or pthread has run
1308  * out of other resources to create new mutexes.)
1309  * @exception Thread::CondError This assignment operator might throw
1310  * Thread::CondError if initialisation of a transitional object's
1311  * contained condition variable fails. (It is often not worth
1312  * checking for this, as it means either memory is exhausted or
1313  * pthread has run out of other resources to create new condition
1314  * variables.)
1315  * @note The assignee does not, by virtue of the assignment, inherit
1316  * any threads waiting on the assignor. However, if prior to the
1317  * assignment threads were waiting on the assignee and the assignee
1318  * acquires items from the assignor as a result of the assignment, the
1319  * waiting threads will unblock and extract those items.
1320  *
1321  * Since 2.0.8
1322  */
1324  if (this != &rhs) {
1325  lock2(mutex, rhs.mutex); // doesn't throw
1327  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
1328  std::queue<T, Container> temp{rhs.q};
1329  q.swap(temp);
1330  if (!q.empty()) cond.broadcast();
1331  }
1332  return *this;
1333  }
1334 
1335 /**
1336  * This move assignment operator is thread safe as regards the
1337  * assignee (the object moved to), but no synchronization is carried
1338  * out with respect to the rvalue assignor/movant. This is because
1339  * temporaries are only visible and accessible in the thread carrying
1340  * out the move operation and synchronization for them would represent
1341  * pointless overhead. In a case where the user uses std::move to
1342  * force a move from a named object, and that named object's lifetime
1343  * is managed by (or the object is otherwise accessed by) a different
1344  * thread than the one making the move, the user must carry out her
1345  * own synchronization with respect to that different thread, both to
1346  * ensure that a consistent view of the the named object is obtained
1347  * and because that object will be mutated by the move. This method
1348  * invokes std::queue's move assignment operator, and therefore has
1349  * the same exception safety as the standard library's implementation
1350  * of that operator. It will not normally throw unless a custom
1351  * allocator is used which throws on move assignment, or the
1352  * destructor of a contained item throws.
1353  * @param rhs The assignor/movant.
1354  * @return The AsyncQueueDispatch object after move assignment.
1355  * @note The assignee does not, by virtue of the move, inherit any
1356  * threads waiting on the assignor/movant. However, if prior to the
1357  * move threads were waiting on the assignee and the assignee acquires
1358  * items from the assignor/movant as a result of the move, from
1359  * version 2.0.9 the waiting threads will unblock and extract those
1360  * items (such unblocking on move assignment did not happen with
1361  * version 2.0.8, which was a bug).
1362  *
1363  * Since 2.0.8
1364  */
1366  Thread::Mutex::Lock lock{mutex};
1367  q = std::move(rhs.q);
1368  if (!q.empty()) cond.broadcast();
1369  return *this;
1370  }
1371 
1372 /**
1373  * @exception std::bad_alloc The default constructor might throw this
1374  * exception if memory is exhausted and the system throws in that
1375  * case.
1376  * @exception Thread::MutexError The default constructor might throw
1377  * this exception if initialisation of the contained mutex fails. (It
1378  * is often not worth checking for this, as it means either memory is
1379  * exhausted or pthread has run out of other resources to create new
1380  * mutexes.)
1381  * @exception Thread::CondError The default constructor might throw
1382  * this exception if initialisation of the contained condition
1383  * variable fails. (It is often not worth checking for this, as it
1384  * means either memory is exhausted or pthread has run out of other
1385  * resources to create new condition variables.)
1386  */
1387  AsyncQueueDispatch() = default;
1388 
1389 /**
1390  * As regards thread safety, the move constructor does not synchronize
1391  * with respect to the initializing rvalue. This is because
1392  * temporaries are only visible and accessible in the thread carrying
1393  * out the move operation and synchronization for them would represent
1394  * pointless overhead. In a case where a user uses std::move to force
1395  * a move from a named object, and that named object's lifetime is
1396  * managed by (or the object is otherwise accessed by) a different
1397  * thread than the one making the move, the user must carry out her
1398  * own synchronization with respect to that different thread, both to
1399  * ensure that a consistent view of the the named object is obtained
1400  * and because that object will be mutated by the move.
1401  * @param rhs The AsyncQueueDispatch object to be moved.
1402  * @exception Thread::MutexError The move constructor might throw
1403  * Thread::MutexError if initialization of the contained mutex fails.
1404  * If it does so this move constructor is strongly exception safe (if
1405  * it is thrown, the object passed as an argument will be unchanged).
1406  * (It is often not worth checking for this, as it means either memory
1407  * is exhausted or pthread has run out of other resources to create
1408  * new mutexes.)
1409  * @exception Thread::CondError The move constructor might throw
1410  * Thread::CondError exception if initialisation of the contained
1411  * condition variable fails. If it does so this move constructor is
1412  * strongly exception safe (if it is thrown, the object passed as an
1413  * argument will be unchanged). (It is often not worth checking for
1414  * this, as it means either memory is exhausted or pthread has run out
1415  * of other resources to create new condition variables.)
1416  * @note The move constructor might also throw if the queue's
1417  * container type's move constructor might throw, but it should not do
1418  * that unless a custom allocator is in use.
1419  *
1420  * Since 2.0.8
1421  */
1422  AsyncQueueDispatch(AsyncQueueDispatch&& rhs): q(std::move(rhs.q)) {}
1423 
1424 /**
1425  * The copy constructor is thread safe, as it locks the initializing
1426  * object's mutex to obtain a consistent view of it.
1427  * @param rhs The AsyncQueueDispatch object to be copied.
1428  * @exception std::bad_alloc The copy constructor of the queue's
1429  * container type, and so this constructor, might throw std::bad_alloc
1430  * if memory is exhausted and the system throws in that case. It will
1431  * also throw if the copy constructor of the queue's container type
1432  * throws any other exceptions, including if any copy or move
1433  * constructor or copy or move assignment operator of a contained item
1434  * throws.
1435  * @exception Thread::MutexError The copy constructor might throw
1436  * Thread::MutexError if initialization of the contained mutex fails.
1437  * (It is often not worth checking for this, as it means either memory
1438  * is exhausted or pthread has run out of other resources to create
1439  * new mutexes.)
1440  * @exception Thread::CondError The copy constructor might throw this
1441  * exception if initialisation of the contained condition variable
1442  * fails. (It is often not worth checking for this, as it means
1443  * either memory is exhausted or pthread has run out of other
1444  * resources to create new condition variables.)
1445  *
1446  * Since 2.0.8
1447  */
1448  // we use the comma operator here to lock the mutex and call the
1449  // copy constructor: the lock will be retained until the end of the
1450  // full expression in which it is lexically situated, namely until
1451  // the end of q's constructor - see C++11 1.9/10 and 12.2/3
1453  q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1454 
1455 /**
1456  * The destructor does not throw unless the destructor of a contained
1457  * item throws. It is thread safe (any thread may delete the
1458  * AsyncQueueDispatch object). Destroying an AsyncQueueDispatch
1459  * object on which another thread is currently blocked results in
1460  * undefined behavior.
1461  */
1463  // lock and unlock the mutex in the destructor so that we have an
1464  // acquire operation to ensure that when the std::queue object is
1465  // destroyed memory is synchronised, so any thread may destroy the
1466  // AsyncQueueDispatch object
1467  Thread::Mutex::Lock lock{mutex};
1468  }
1469 
1470 /* Only has effect if --with-glib-memory-slices-compat or
1471  * --with-glib-memory-slices-no-compat option picked */
1473 };
1474 
1475 /**
1476  * Swaps the contents of two AsyncQueue objects. It will not throw
1477  * assuming that the swap method of the container type does not throw
1478  * (which the C++11/14 standard requires not to happen with the
1479  * standard sequence containers). It is thread safe and the swap is
1480  * thread-wise atomic.
1481  * @param q1 An object to be swapped with the other.
1482  * @param q2 An object to be swapped with the other.
1483  * @note Calling std::swap on AsyncQueue objects is thread safe but
1484  * does not provide a thread-wise atomic swap (the swapped objects may
1485  * not be mirror images if during the execution of std::swap's default
1486  * algorithm one of them has been modified), although in many cases
1487  * that doesn't matter. If swap() is called without a namespace
1488  * qualifier, argument dependent look-up will pick this one correctly.
1489  *
1490  * Since 2.0.8
1491  */
1492 template <class T, class Container>
1495  q1.swap(q2);
1496 }
1497 
1498 /**
1499  * Swaps the contents of two AsyncQueueDispatch objects. It will not
1500  * throw assuming that the swap method of the container type does not
1501  * throw (which the C++11/14 standard requires not to happen with the
1502  * standard sequence containers). It is thread safe and the swap is
1503  * thread-wise atomic.
1504  * @param q1 An object to be swapped with the other.
1505  * @param q2 An object to be swapped with the other.
1506  * @note 1. An object swapped does not, by virtue of the swap, inherit
1507  * any threads waiting on the other one. However if threads were
1508  * waiting on a swapped object prior to the swap, and it acquires
1509  * items by virtue of the swap, the waiting threads will unblock and
1510  * extract those items.
1511  * @note 2. Calling std::swap on AsyncQueueDispatch objects is thread
1512  * safe but does not provide a thread-wise atomic swap (the swapped
1513  * objects may not be mirror images if during the execution of
1514  * std::swap's default algorithm one of them has been modified),
1515  * although in many cases that doesn't matter. If swap() is called
1516  * without a namespace qualifier, argument dependent look-up will pick
1517  * this one correctly.
1518  *
1519  * Since 2.0.8
1520  */
1521 template <class T, class Container>
1524  q1.swap(q2);
1525 }
1526 
1527 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1528 
1529 /* This is a specialization of AsyncQueue for std::list objects, which
1530  uses std::list::splice() to push or emplace a new item on the
1531  queue. This means that allocation for the push or emplacement
1532  occurs outside the mutex, so reducing contention (a tip from a talk
1533  by Sean Parent of Adobe). This is first available in version
1534  2.0.20/2.2.3.
1535  */
1536 template <class T, class Allocator>
1537 class AsyncQueue<T, std::list<T, Allocator> > {
1538 public:
1539  typedef std::list<T, Allocator> Container;
1540  typedef typename Container::value_type value_type;
1541  typedef typename Container::size_type size_type;
1542  typedef Container container_type;
1543 private:
1544  mutable Thread::Mutex mutex;
1545  // 23.6.3.1 of C++11 requires std::queue to have a protected
1546  // container member called 'c' for the purposes of derivation. This
1547  // specialisation will have the same binary layout as the
1548  // unspecialized version on any practical implementation: all we do
1549  // is add splice_end() and unsplice_beginning() members
1550  class Q: public std::queue<T, Container> {
1551  public:
1552  void splice_end(Container&& lst) {
1553  this->c.splice(this->c.end(), std::move(lst));
1554  }
1555  void unsplice_beginning(Container& lst) {
1556  lst.splice(lst.begin(), this->c, this->c.begin());
1557  }
1558  } q;
1559 
1560 // TODO: at the next ABI break make this method explicitly static
1561  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1562  m1.lock();
1563  for(;;) {
1564  if (!m2.trylock()) {
1565  return;
1566  }
1567  m1.unlock();
1568  // spin nicely
1569 #ifdef CGU_USE_SCHED_YIELD
1570  sched_yield();
1571 #else
1572  usleep(10);
1573 #endif
1574  m1.lock();
1575  }
1576  }
1577 public:
1578  void push(const value_type& obj) {
1579  Container temp{obj};
1580  Thread::Mutex::Lock lock{mutex};
1581  // splice_end doesn't throw
1582  q.splice_end(std::move(temp));
1583  }
1584 
1585  void push(value_type&& obj) {
1586  // although move intialisation of a std::list object via an
1587  // initializer list with a single element is almost certain to be
1588  // strongly exception safe, it is not mandated by the standard so
1589  // use push_back() (which is required to be strongly exception
1590  // safe)
1591  Container temp;
1592  temp.push_back(std::move(obj));
1593  Thread::Mutex::Lock lock{mutex};
1594  // splice_end doesn't throw
1595  q.splice_end(std::move(temp));
1596  }
1597 
1598  template<class... Args>
1599  void emplace(Args&&... args) {
1600  Container temp;
1601  temp.emplace_back(std::forward<Args>(args)...);
1602  Thread::Mutex::Lock lock{mutex};
1603  // splice_end doesn't throw
1604  q.splice_end(std::move(temp));
1605  }
1606 
1607  void pop(value_type& obj) {
1608  Thread::Mutex::Lock lock{mutex};
1609  if (q.empty()) throw AsyncQueuePopError();
1610  obj = q.front();
1611  q.pop();
1612  }
1613 
1614  void move_pop(value_type& obj) {
1615  Thread::Mutex::Lock lock{mutex};
1616  if (q.empty()) throw AsyncQueuePopError();
1617  obj = std::move(q.front());
1618  q.pop();
1619  }
1620 
1621  void move_pop_basic(value_type& obj) {
1622  // the standard does not require it, but in practice constructing
1623  // an empty list will not throw unless constructing its allocator
1624  // throws
1625  Container temp;
1626  {
1627  Thread::Mutex::Lock lock{mutex};
1628  if (q.empty()) throw AsyncQueuePopError();
1629  // unsplice_beginning doesn't throw
1630  q.unsplice_beginning(temp);
1631  }
1632  obj = std::move(temp.front());
1633  }
1634 
1635  void pop() {
1636  Thread::Mutex::Lock lock{mutex};
1637  if (q.empty()) throw AsyncQueuePopError();
1638  q.pop();
1639  }
1640 
1641  bool empty() const {
1642  Thread::Mutex::Lock lock{mutex};
1643  return q.empty();
1644  }
1645 
1646  size_type size() const {
1647  Thread::Mutex::Lock lock{mutex};
1648  return q.size();
1649  }
1650 
1651  void swap(AsyncQueue& other) {
1652  if (this != &other) {
1653  lock2(mutex, other.mutex); // doesn't throw
1654  Thread::Mutex::Lock l1{mutex, Thread::locked};
1655  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
1656  q.swap(other.q);
1657  }
1658  }
1659 
1660  AsyncQueue& operator=(const AsyncQueue& rhs) {
1661  if (this != &rhs) {
1662  lock2(mutex, rhs.mutex); // doesn't throw
1663  Thread::Mutex::Lock l1{mutex, Thread::locked};
1664  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
1665  Q temp{rhs.q};
1666  q.swap(temp);
1667  }
1668  return *this;
1669  }
1670 
1671  AsyncQueue& operator=(AsyncQueue&& rhs) {
1672  Thread::Mutex::Lock lock{mutex};
1673  q = std::move(rhs.q);
1674  return *this;
1675  }
1676 
1677  AsyncQueue() = default;
1678 
1679  AsyncQueue(AsyncQueue&& rhs): q(std::move(rhs.q)) {}
1680 
1681  AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1682 
1683  ~AsyncQueue() {
1684  Thread::Mutex::Lock lock{mutex};
1685  }
1686 
1688 };
1689 
1690 /* This is a specialization of AsyncQueueDispatch for std::list
1691  objects, which uses std::list::splice() to push or emplace a new
1692  item on the queue. This means that allocation for the push or
1693  emplacement occurs outside the mutex, so reducing contention (a tip
1694  from a talk by Sean Parent of Adobe). This is first available in
1695  version 2.0.20/2.2.3.
1696  */
1697 template <class T, class Allocator>
1698 class AsyncQueueDispatch<T, std::list<T, Allocator> > {
1699 public:
1700  typedef std::list<T, Allocator> Container;
1701  typedef typename Container::value_type value_type;
1702  typedef typename Container::size_type size_type;
1703  typedef Container container_type;
1704 private:
1705  mutable Thread::Mutex mutex;
1706  Thread::Cond cond;
1707  // 23.6.3.1 of C++11 requires std::queue to have a protected
1708  // container member called 'c' for the purposes of derivation. This
1709  // specialisation will have the same binary layout as the
1710  // unspecialized version on any practical implementation: all we do
1711  // is add splice_end() and unsplice_beginning() members
1712  class Q: public std::queue<T, Container> {
1713  public:
1714  void splice_end(Container&& lst) {
1715  this->c.splice(this->c.end(), std::move(lst));
1716  }
1717  void unsplice_beginning(Container& lst) {
1718  lst.splice(lst.begin(), this->c, this->c.begin());
1719  }
1720  } q;
1721 
1722 // TODO: at the next ABI break make this method explicitly static
1723  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1724  m1.lock();
1725  for(;;) {
1726  if (!m2.trylock()) {
1727  return;
1728  }
1729  m1.unlock();
1730  // spin nicely
1731 #ifdef CGU_USE_SCHED_YIELD
1732  sched_yield();
1733 #else
1734  usleep(10);
1735 #endif
1736  m1.lock();
1737  }
1738  }
1739 public:
1740  void push(const value_type& obj) {
1741  Container temp{obj};
1742  Thread::Mutex::Lock lock{mutex};
1743  // splice_end doesn't throw
1744  q.splice_end(std::move(temp));
1745  cond.signal();
1746  }
1747 
1748  void push(value_type&& obj) {
1749  // although move intialisation of a std::list object via an
1750  // initializer list with a single element is almost certain to be
1751  // strongly exception safe, it is not mandated by the standard so
1752  // use push_back() (which is required to be strongly exception
1753  // safe)
1754  Container temp;
1755  temp.push_back(std::move(obj));
1756  Thread::Mutex::Lock lock{mutex};
1757  // splice_end doesn't throw
1758  q.splice_end(std::move(temp));
1759  cond.signal();
1760  }
1761 
1762  template<class... Args>
1763  void emplace(Args&&... args) {
1764  Container temp;
1765  temp.emplace_back(std::forward<Args>(args)...);
1766  Thread::Mutex::Lock lock{mutex};
1767  // splice_end doesn't throw
1768  q.splice_end(std::move(temp));
1769  cond.signal();
1770  }
1771 
1772  void pop(value_type& obj) {
1773  Thread::Mutex::Lock lock{mutex};
1774  if (q.empty()) throw AsyncQueuePopError();
1775  obj = q.front();
1776  q.pop();
1777  }
1778 
1779  void move_pop(value_type& obj) {
1780  Thread::Mutex::Lock lock{mutex};
1781  if (q.empty()) throw AsyncQueuePopError();
1782  obj = std::move(q.front());
1783  q.pop();
1784  }
1785 
1786  void move_pop_basic(value_type& obj) {
1787  // the standard does not require it, but in practice constructing
1788  // an empty list will not throw unless constructing its allocator
1789  // throws
1790  Container temp;
1791  {
1792  Thread::Mutex::Lock lock{mutex};
1793  if (q.empty()) throw AsyncQueuePopError();
1794  // unsplice_beginning doesn't throw
1795  q.unsplice_beginning(temp);
1796  }
1797  obj = std::move(temp.front());
1798  }
1799 
1800  void pop_dispatch(value_type& obj) {
1801  Thread::Mutex::Lock lock{mutex};
1802  while (q.empty()) cond.wait(mutex);
1803  Thread::CancelBlock b;
1804  obj = q.front();
1805  q.pop();
1806  }
1807 
1808  void move_pop_dispatch(value_type& obj) {
1809  Thread::Mutex::Lock lock{mutex};
1810  while (q.empty()) cond.wait(mutex);
1811  Thread::CancelBlock b;
1812  obj = std::move(q.front());
1813  q.pop();
1814  }
1815 
1816  void move_pop_dispatch_basic(value_type& obj) {
1817  int old_state;
1818  int ignore;
1819  bool cancelstate_restored = false;
1820  try {
1821  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1822  // the standard does not require it, but in practice
1823  // constructing an empty list will not throw unless constructing
1824  // its allocator throws
1825  Container temp;
1826  pthread_setcancelstate(old_state, &ignore);
1827  cancelstate_restored = true;
1828  Thread::Mutex::TrackLock lock{mutex};
1829  while (q.empty()) cond.wait(mutex);
1830  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1831  cancelstate_restored = false;
1832  // unsplice_beginning doesn't throw
1833  q.unsplice_beginning(temp);
1834  lock.unlock();
1835  obj = std::move(temp.front());
1836  pthread_setcancelstate(old_state, &ignore);
1837  }
1838  catch (...) {
1839  // in practice we could only enter here with
1840  // cancelstate_restored set as true if there has been a
1841  // cancellation pseudo-exception (but the code doesn't depend on
1842  // that). We could only enter here with a normal exception if
1843  // the construction of temp has thrown or if a queue element's
1844  // move/copy assignment operator has thrown (but again the code
1845  // doesn't depend on that).
1846  if (!cancelstate_restored) {
1847  pthread_setcancelstate(old_state, &ignore);
1848  }
1849  throw;
1850  }
1851  }
1852 
1853  bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
1854  timespec ts;
1855  Thread::Cond::get_abs_time(ts, millisec);
1856  Thread::Mutex::Lock lock{mutex};
1857  while (q.empty()) {
1858  if (cond.timed_wait(mutex, ts)) return true;
1859  }
1860  Thread::CancelBlock b;
1861  obj = q.front();
1862  q.pop();
1863  return false;
1864  }
1865 
1866  bool move_pop_timed_dispatch(value_type& obj, unsigned int millisec) {
1867  timespec ts;
1868  Thread::Cond::get_abs_time(ts, millisec);
1869  Thread::Mutex::Lock lock{mutex};
1870  while (q.empty()) {
1871  if (cond.timed_wait(mutex, ts)) return true;
1872  }
1873  Thread::CancelBlock b;
1874  obj = std::move(q.front());
1875  q.pop();
1876  return false;
1877  }
1878 
1879  bool move_pop_timed_dispatch_basic(value_type& obj, unsigned int millisec) {
1880  timespec ts;
1881  Thread::Cond::get_abs_time(ts, millisec);
1882  int old_state;
1883  int ignore;
1884  bool cancelstate_restored = false;
1885  try {
1886  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1887  // the standard does not require it, but in practice
1888  // constructing an empty list will not throw unless constructing
1889  // its allocator throws
1890  Container temp;
1891  pthread_setcancelstate(old_state, &ignore);
1892  cancelstate_restored = true;
1893  Thread::Mutex::TrackLock lock{mutex};
1894  while (q.empty()) {
1895  if (cond.timed_wait(mutex, ts)) return true;
1896  }
1897  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1898  cancelstate_restored = false;
1899  // unsplice_beginning doesn't throw
1900  q.unsplice_beginning(temp);
1901  lock.unlock();
1902  obj = std::move(temp.front());
1903  pthread_setcancelstate(old_state, &ignore);
1904  return false;
1905  }
1906  catch (...) {
1907  // in practice we could only enter here with
1908  // cancelstate_restored set as true if there has been a
1909  // cancellation pseudo-exception (but the code doesn't depend on
1910  // that). We could only enter here with a normal exception if
1911  // the construction of temp has thrown or if a queue element's
1912  // move/copy assignment operator has thrown (but again the code
1913  // doesn't depend on that).
1914  if (!cancelstate_restored) {
1915  pthread_setcancelstate(old_state, &ignore);
1916  }
1917  throw;
1918  }
1919  }
1920 
1921  void pop() {
1922  Thread::Mutex::Lock lock{mutex};
1923  if (q.empty()) throw AsyncQueuePopError();
1924  q.pop();
1925  }
1926 
1927  bool empty() const {
1928  Thread::Mutex::Lock lock{mutex};
1929  return q.empty();
1930  }
1931 
1932  size_type size() const {
1933  Thread::Mutex::Lock lock{mutex};
1934  return q.size();
1935  }
1936 
1937  void swap(AsyncQueueDispatch& other) {
1938  if (this != &other) {
1939  lock2(mutex, other.mutex); // doesn't throw
1940  Thread::Mutex::Lock l1{mutex, Thread::locked};
1941  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
1942  q.swap(other.q);
1943  if (!q.empty()) cond.broadcast();
1944  if (!other.q.empty()) other.cond.broadcast();
1945  }
1946  }
1947 
1949  if (this != &rhs) {
1950  lock2(mutex, rhs.mutex); // doesn't throw
1951  Thread::Mutex::Lock l1{mutex, Thread::locked};
1952  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
1953  Q temp{rhs.q};
1954  q.swap(temp);
1955  if (!q.empty()) cond.broadcast();
1956  }
1957  return *this;
1958  }
1959 
1961  Thread::Mutex::Lock lock{mutex};
1962  q = std::move(rhs.q);
1963  if (!q.empty()) cond.broadcast();
1964  return *this;
1965  }
1966 
1967  AsyncQueueDispatch() = default;
1968 
1969  AsyncQueueDispatch(AsyncQueueDispatch&& rhs): q(std::move(rhs.q)) {}
1970 
1972  q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1973 
1975  Thread::Mutex::Lock lock{mutex};
1976  }
1977 
1979 };
1980 
1981 #endif // CGU_USE_INHERITABLE_QUEUE
1982 
1983 } // namespace Cgu
1984 
1985 #endif
Cgu::Thread::Cond::get_abs_time
static void get_abs_time(timespec &ts, unsigned int millisec)
Cgu::Thread::Cond::timed_wait
int timed_wait(Mutex &mutex, const timespec &abs_time)
Definition: mutex.h:578
Cgu::AsyncQueue::operator=
AsyncQueue & operator=(const AsyncQueue &rhs)
Definition: async_queue.h:474
Cgu::AsyncQueue::container_type
Container container_type
Definition: async_queue.h:155
Cgu::AsyncQueueDispatch
A thread-safe asynchronous queue with a blocking pop() method.
Definition: async_queue.h:640
Cgu
Definition: application.h:44
Cgu::AsyncQueueDispatch::move_pop_timed_dispatch
bool move_pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1126
Cgu::AsyncQueueDispatch::pop
void pop(value_type &obj)
Definition: async_queue.h:767
Cgu::Thread::Mutex::lock
int lock() noexcept
Definition: mutex.h:147
Cgu::AsyncQueueDispatch::pop
void pop()
Definition: async_queue.h:1224
Cgu::Thread::locked
@ locked
Definition: mutex.h:196
Cgu::AsyncQueueDispatch::emplace
void emplace(Args &&... args)
Definition: async_queue.h:740
Cgu::AsyncQueue::AsyncQueue
AsyncQueue(const AsyncQueue &rhs)
Definition: async_queue.h:576
Cgu::Thread::Mutex::unlock
int unlock() noexcept
Definition: mutex.h:170
Cgu::AsyncQueueDispatch::AsyncQueueDispatch
AsyncQueueDispatch(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:1452
Cgu::Thread::Cond::broadcast
int broadcast() noexcept
Definition: mutex.h:483
Cgu::AsyncQueue::move_pop
void move_pop(value_type &obj)
Definition: async_queue.h:316
Cgu::AsyncQueue
A thread-safe asynchronous queue.
Definition: async_queue.h:151
Cgu::Thread::Cond
A wrapper class for pthread condition variables.
Definition: mutex.h:449
Cgu::AsyncQueuePopError
An exception thrown if calling pop() on a AsyncQueue or AsyncQueueDispatch object fails because the q...
Definition: async_queue.h:104
Cgu::AsyncQueueDispatch::push
void push(value_type &&obj)
Definition: async_queue.h:710
Cgu::AsyncQueueDispatch::size
size_type size() const
Definition: async_queue.h:1257
Cgu::AsyncQueueDispatch::move_pop_dispatch
void move_pop_dispatch(value_type &obj)
Definition: async_queue.h:954
Cgu::AsyncQueueDispatch::operator=
AsyncQueueDispatch & operator=(AsyncQueueDispatch &&rhs)
Definition: async_queue.h:1365
Cgu::AsyncQueue::AsyncQueue
AsyncQueue()=default
Cgu::AsyncQueue::push
void push(const value_type &obj)
Definition: async_queue.h:196
Cgu::Thread::Cond::wait
int wait(Mutex &mutex)
Definition: mutex.h:513
Cgu::AsyncQueue::size_type
Container::size_type size_type
Definition: async_queue.h:154
Cgu::AsyncQueueDispatch::~AsyncQueueDispatch
~AsyncQueueDispatch()
Definition: async_queue.h:1462
Cgu::swap
void swap(Cgu::AsyncQueue< T, Container > &q1, Cgu::AsyncQueue< T, Container > &q2)
Definition: async_queue.h:1493
Cgu::Thread::Mutex::trylock
int trylock() noexcept
Definition: mutex.h:157
Cgu::AsyncQueue::~AsyncQueue
~AsyncQueue()
Definition: async_queue.h:583
Cgu::AsyncQueueDispatch::value_type
Container::value_type value_type
Definition: async_queue.h:642
Cgu::AsyncQueueDispatch::operator=
AsyncQueueDispatch & operator=(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:1323
Cgu::AsyncQueueDispatch::size_type
Container::size_type size_type
Definition: async_queue.h:643
Cgu::AsyncQueue::move_pop_basic
void move_pop_basic(value_type &obj)
Definition: async_queue.h:377
Cgu::AsyncQueue::pop
void pop(value_type &obj)
Definition: async_queue.h:274
Cgu::AsyncQueue::empty
bool empty() const
Definition: async_queue.h:409
Cgu::AsyncQueueDispatch::move_pop_timed_dispatch_basic
bool move_pop_timed_dispatch_basic(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1208
Cgu::Thread::Cond::signal
int signal() noexcept
Definition: mutex.h:472
Cgu::AsyncQueueDispatch::swap
void swap(AsyncQueueDispatch &other)
Definition: async_queue.h:1279
Cgu::Thread::Mutex::Lock
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:207
Cgu::AsyncQueueDispatch::empty
bool empty() const
Definition: async_queue.h:1240
CGU_GLIB_MEMORY_SLICES_FUNCS
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Cgu::AsyncQueue::operator=
AsyncQueue & operator=(AsyncQueue &&rhs)
Definition: async_queue.h:508
Cgu::AsyncQueue::emplace
void emplace(Args &&... args)
Definition: async_queue.h:248
Cgu::Thread::CancelBlock
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:723
Cgu::AsyncQueue::AsyncQueue
AsyncQueue(AsyncQueue &&rhs)
Definition: async_queue.h:551
mutex.h
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
Cgu::AsyncQueueDispatch::pop_dispatch
void pop_dispatch(value_type &obj)
Definition: async_queue.h:903
Cgu::AsyncQueue::value_type
Container::value_type value_type
Definition: async_queue.h:153
Cgu::AsyncQueueDispatch::pop_timed_dispatch
bool pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:1065
Cgu::AsyncQueueDispatch::push
void push(const value_type &obj)
Definition: async_queue.h:686
Cgu::AsyncQueueDispatch::container_type
Container container_type
Definition: async_queue.h:644
Cgu::AsyncQueueDispatch::move_pop
void move_pop(value_type &obj)
Definition: async_queue.h:809
Cgu::AsyncQueuePopError::what
virtual const char * what() const
Definition: async_queue.h:105
Cgu::AsyncQueueDispatch::AsyncQueueDispatch
AsyncQueueDispatch(AsyncQueueDispatch &&rhs)
Definition: async_queue.h:1422
Cgu::AsyncQueueDispatch::AsyncQueueDispatch
AsyncQueueDispatch()=default
Cgu::AsyncQueueDispatch::move_pop_basic
void move_pop_basic(value_type &obj)
Definition: async_queue.h:870
Cgu::AsyncQueueDispatch::move_pop_dispatch_basic
void move_pop_dispatch_basic(value_type &obj)
Definition: async_queue.h:1027
thread.h
Cgu::AsyncQueue::pop
void pop()
Definition: async_queue.h:393
Cgu::AsyncQueue::swap
void swap(AsyncQueue &other)
Definition: async_queue.h:443
Cgu::Thread::Mutex
A wrapper class for pthread mutexes.
Definition: mutex.h:117
Cgu::AsyncQueue::size
size_type size() const
Definition: async_queue.h:426
cgu_config.h
Cgu::AsyncQueue::push
void push(value_type &&obj)
Definition: async_queue.h:219