c++-gtk-utils
async_queue.h
Go to the documentation of this file.
1 /* Copyright (C) 2006 to 2013 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 /**
40  * @file async_queue.h
41  * @brief This file provides thread-safe asynchronous queue classes.
42  *
43  * AsyncQueue is a class which provides some of the functionality of a
44  * std::queue object (but note that the AsyncQueue::pop(value_type&
45  * obj) method provides the pop()ed element by reference - see the
46  * comments on that method for the reason), except that it has mutex
47  * locking of the data container so as to permit push()ing and
48  * pop()ing from different threads. It is therefore useful for
49  * passing data between threads, perhaps in response to a signal being
50  * emitted from a Notifier object.
51  *
52  * AsyncQueueDispatch is a class which has a blocking pop_dispatch() method,
53  * which allows it to be waited on by a dedicated event/message
54  * dispatching thread for incoming work (represented by the data
55  * pushed onto the queue). In the same way, it can be used to
56  * implement thread pools, by having threads in the pool waiting on
57  * the queue.
58  *
59  * By default the queues use a std::list object as their container
60  * because when adding an item to the queue all allocation can take
61  * place outside the queue object's mutex. However, for types which
62  * have low overhead copy constructors, this can be changed to, say, a
63  * std::deque object by specifying it as the second template
64  * parameter.
65  *
66  * If data pushed and popped from a queue are held by a reference
67  * counted smart pointer, the reference count must be thread-safe,
68  * such as by using SharedLockPtr or IntrusiveLockCounter.
69  */
70 
71 #ifndef CGU_ASYNC_QUEUE_H
72 #define CGU_ASYNC_QUEUE_H
73 
74 #include <queue>
75 #include <list>
76 #include <exception>
77 #include <algorithm> // for std::swap
78 #include <time.h>
79 #include <c++-gtk-utils/mutex.h>
80 #include <c++-gtk-utils/thread.h>
82 
83 #ifdef CGU_USE_SCHED_YIELD
84 #include <sched.h>
85 #else
86 #include <unistd.h>
87 #endif
88 
89 namespace Cgu {
90 
91 /**
92  * @class AsyncQueuePopError async_queue.h c++-gtk-utils/async_queue.h
93  * @brief An exception thrown if calling pop() on an AsyncQueue or
94  * AsyncQueueDispatch object fails because the queue is empty.
95  * @sa AsyncQueue AsyncQueueDispatch
96  */
97 
struct AsyncQueuePopError: std::exception {
  // Fixed diagnostic text handed to whoever catches the exception.
  virtual const char* what() const throw() {
    return "AsyncQueuePopError: popping from empty AsyncQueue object\n";
  }
};
101 
102 
103 /**
104  * @class AsyncQueue async_queue.h c++-gtk-utils/async_queue.h
105  * @brief A thread-safe asynchronous queue.
106  * @sa AsyncQueueDispatch AsyncChannel AsyncResult
107  *
108  * AsyncQueue is a class which provides some of the functionality of a
109  * std::queue object (but note that the AsyncQueue::pop(value_type&
110  * obj) method provides the pop()ed element by reference - see the
111  * comments on that method for the reason), except that it has mutex
112  * locking of the data container so as to permit push()ing and
113  * pop()ing from different threads. It is therefore useful for
114  * passing data between threads, perhaps in response to a signal being
115  * emitted from a Notifier object.
116  *
117  * By default the queue uses a std::list object as its container
118  * because when adding an item to the queue all allocation can take
119  * place outside the queue object's mutex. However, for types which
120  * have low overhead copy constructors, this can be changed to, say, a
121  * std::deque object by specifying it as the second template
122  * parameter.
123  *
124  * If data pushed and popped from the queue are held by a reference
125  * counted smart pointer, the reference count must be thread-safe,
126  * such as by using SharedLockPtr or IntrusiveLockCounter.
127  *
128  * If the library is installed using the
129  * \--with-glib-memory-slices-compat or
130  * \--with-glib-memory-slices-no-compat configuration options, any
131  * AsyncQueue objects constructed on free store will be constructed in
132  * glib memory slices. This does not affect the queue container
133  * itself: to change the allocator of the C++ container, a custom
134  * allocator type can be provided when the AsyncQueue object is
135  * instantiated offering the standard allocator interface. If glib
136  * memory slices are not used or no AsyncQueue objects are constructed
137  * on free store, it is not necessary to call g_thread_init() before
138  * manipulating or using an AsyncQueue object in multiple threads, but
139  * prior to glib version 2.32 glib itself (and thus glib memory
140  * slices) are not thread safe unless that function has been called.
141  */
142 
143 template <class T, class Container = std::list<T> > class AsyncQueue {
144 public:
145  typedef typename Container::value_type value_type;
146  typedef typename Container::size_type size_type;
147  typedef Container container_type;
148 private:
149  std::queue<T, Container> q;
150  mutable Thread::Mutex mutex;
151 
152 // TODO: at the next ABI break make this method explicitly static
153 // this method won't throw: it is for the user to ensure the arguments
154 // do not refer to the same mutex object
155  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
156  m1.lock();
157  for(;;) {
158  if (!m2.trylock()) {
159  return;
160  }
161  m1.unlock();
162  // spin nicely
163 #ifdef CGU_USE_SCHED_YIELD
164  sched_yield();
165 #else
166  usleep(10);
167 #endif
168  m1.lock();
169  }
170  }
171 public:
172 /**
173  * Pushes an item onto the queue. This method has strong exception
174  * safety if the container is a std::list or std::deque container (the
175  * default is std::list), except that if std::deque is used as the
176  * container under C++11 and the copy constructor, move constructor,
177  * assignment operator or move assignment operator of the queue item
178  * throws, it only gives the basic exception guarantee (and the basic
179  * guarantee is not given by std::deque under C++11 if the queue
180  * item's move constructor throws and it uses a non-default allocator
181  * which does not provide for it to be CopyInsertable). It is thread
182  * safe.
183  * @param obj The item to be pushed onto the queue.
184  * @exception std::bad_alloc The method might throw std::bad_alloc if
185  * memory is exhausted and the system throws in that case. It might
186  * also throw if the copy constructor or assignment operator of the
187  * queue item might throw (or under C++11 if its move constructor or
188  * move assignment operator throws).
189  */
190  void push(const value_type& obj) {
191  Thread::Mutex::Lock lock(mutex);
192  q.push(obj);
193  }
194 
195 /**
196  * Pops an item from the queue. This method has strong exception
197  * safety if the container is a std::deque or std::list container (the
198  * default is std::list), provided the destructor of a contained item
199  * does not throw. It is thread safe.
200  * @param obj A value type reference to which the item at the front of
201  * the queue will be assigned.
202  * @exception AsyncQueuePopError If the queue is empty when a pop is
203  * attempted, this method will throw AsyncQueuePopError. It might
204  * also throw if the assignment operator of the queue item might
205  * throw. In order to complete pop() operations atomically under a
206  * single lock and to retain strong exception safety, the object into
207  * which the pop()ed data is to be placed is passed as an argument by
208  * reference (this avoids a copy from a temporary object after the
209  * data has been extracted from the queue, which would occur if the
210  * item extracted were returned by value). It might also throw if the
211  * destructor of the queue item might throw (but that should never
212  * happen), or if the empty() method of the container type throws
213  * (which would not happen on any sane implementation).
214  */
215  void pop(value_type& obj) {
216  Thread::Mutex::Lock lock(mutex);
217  if (q.empty()) throw AsyncQueuePopError();
218  obj = q.front();
219  q.pop();
220  }
221 
222 /**
223  * Discards the item at the front of the queue. This method has
224  * strong exception safety if the container is a std::deque or
225  * std::list container (the default is std::list), provided the
226  * destructor of a contained item does not throw. It is thread safe.
227  * @exception AsyncQueuePopError If the queue is empty when a pop is
228  * attempted, this method will throw AsyncQueuePopError. It might
229  * also throw if the destructor of the queue item might throw (but
230  * that should never happen), or if the empty() method of the
231  * container type throws (which would not happen on any sane
232  * implementation).
233  */
234  void pop() {
235  Thread::Mutex::Lock lock(mutex);
236  if (q.empty()) throw AsyncQueuePopError();
237  q.pop();
238  }
239 
240 /**
241  * @return Whether the queue is empty. It will not throw assuming
242  * that the empty() method of the container type does not throw, as it
243  * will not on any sane implementation.
244  * @note This method is thread safe, but the return value may not be
245  * valid if another thread has pushed to or popped from the queue
246  * before the value returned by the method is acted on. It is
247  * provided as a utility, but may not be meaningful, depending on the
248  * intended usage.
249  */
250  bool empty() const {
251  Thread::Mutex::Lock lock(mutex);
252  return q.empty();
253  }
254 
255 /**
256  * @return The number of items currently in the queue. It will not
257  * throw assuming that the size() method of the container type does
258  * not throw, as it will not on any sane implementation.
259  * @note This method is thread safe, but the return value may not be
260  * valid if another thread has pushed to or popped from the queue
261  * before the value returned by the method is acted on. It is
262  * provided as a utility, but may not be meaningful, depending on the
263  * intended usage.
264  *
265  * Since 1.2.22
266  */
267  size_type size() const {
268  Thread::Mutex::Lock lock(mutex);
269  return q.size();
270  }
271 
272 /**
273  * Swaps the contents of 'this' and 'other'. It will not throw
274  * assuming that the swap method of the container type does not throw
275  * (which the C++ standard requires not to happen with the standard
276  * sequence containers). It is thread safe and the swap is
277  * thread-wise atomic. A non-member function
278  * Cgu::swap(Cgu::AsyncQueue&, Cgu::AsyncQueue&) is also provided
279  * which will call this method.
280  * @param other The object to be swapped with this one.
281  *
282  * Since 1.2.22
283  */
284  void swap(AsyncQueue& other) {
285  if (this != &other) {
286  lock2(mutex, other.mutex); // doesn't throw
288  Thread::Mutex::Lock l2(other.mutex, Thread::locked);
289  std::swap(q, other.q);
290  }
291  }
292 
293 /**
294  * The assignment operator is strongly exception safe with the
295  * standard sequence containers (it uses copy and swap). It is also
296  * thread safe, as it safely locks both the assignor's and assignee's
297  * mutex to provide a thread-wise atomic assignment.
298  * @param rhs The assignor.
299  * @return The AsyncQueue object after assignment.
300  * @exception std::bad_alloc The copy constructor of the queue's
301  * container type, and so this assignment operator, might throw
302  * std::bad_alloc if memory is exhausted and the system throws in that
303  * case. This assignment operator will also throw if the copy
304  * constructor of the queue's container type throws any other
305  * exceptions, including if any copy or move constructor or copy or
306  * move assignment operator of a contained item throws.
307  * @exception Thread::MutexError The assignment operator might throw
308  * Thread::MutexError if initialization of a transitional object's
309  * contained mutex fails. (It is often not worth checking for this,
310  * as it means either memory is exhausted or pthread has run out of
311  * other resources to create new mutexes.)
312  *
313  * Since 1.2.22
314  */
316  if (this != &rhs) {
317  lock2(mutex, rhs.mutex); // doesn't throw
319  Thread::Mutex::Lock l2(rhs.mutex, Thread::locked);
320  std::queue<T, Container> temp(rhs.q);
321  std::swap(q, temp);
322  }
323  return *this;
324  }
325 
326 /**
327  * @exception std::bad_alloc The default constructor might throw
328  * std::bad_alloc if memory is exhausted and the system throws in that
329  * case.
330  * @exception Thread::MutexError The default constructor might throw
331  * Thread::MutexError if initialisation of the contained mutex fails.
332  * (It is often not worth checking for this, as it means either memory
333  * is exhausted or pthread has run out of other resources to create
334  * new mutexes.)
335 */
337 
338 /**
339  * The copy constructor is thread safe, as it locks the initializing
340  * object's mutex to obtain a consistent view of it.
341  * @param rhs The AsyncQueue object to be copied.
342  * @exception std::bad_alloc The copy constructor of the queue's
343  * container type, and so this constructor, might throw std::bad_alloc
344  * if memory is exhausted and the system throws in that case. It will
345  * also throw if the copy constructor of the queue's container type
346  * throws any other exceptions, including if any copy or move
347  * constructor or copy or move assignment operator of a contained item
348  * throws.
349  * @exception Thread::MutexError The copy constructor might throw
350  * Thread::MutexError if initialization of the contained mutex fails.
351  * (It is often not worth checking for this, as it means either memory
352  * is exhausted or pthread has run out of other resources to create
353  * new mutexes.)
354  *
355  * Since 1.2.22
356  */
357  // we use the comma operator here to lock the mutex and call the
358  // copy constructor: the lock will be retained until the end of the
359  // full expression in which it is lexically situated, namely until
360  // the end of q's constructor - see C++98 1.9/12 and 12.2/3
361  AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
362 
363 /**
364  * The destructor does not throw unless the destructor of a contained
365  * item throws. It is thread safe (any thread may delete the
366  * AsyncQueue object).
367  */
369  // lock and unlock the mutex in the destructor so that we have an
370  // acquire operation to ensure that when the std::queue object is
371  // destroyed memory is synchronised, so any thread may destroy the
372  // AsyncQueue object
373  Thread::Mutex::Lock lock(mutex);
374  }
375 
376 /* Only has effect if --with-glib-memory-slices-compat or
377  * --with-glib-memory-slices-no-compat option picked */
379 };
380 
381 /**
382  * @class AsyncQueueDispatch async_queue.h c++-gtk-utils/async_queue.h
383  * @brief A thread-safe asynchronous queue with a blocking pop()
384  * method.
385  * @sa AsyncQueue AsyncChannel AsyncResult
386  *
387  * AsyncQueueDispatch is similar to the AsyncQueue class, except that
388  * it has a blocking pop_dispatch() method, which allows it to be
389  * waited on by a dedicated event/message dispatching thread for
390  * incoming work (represented by the data pushed onto the queue). In
391  * the same way, it can be used to implement thread pools, by having
392  * threads in the pool waiting on the queue. The AsyncResult class
393  * can be useful for passing results between threads in conjunction
394  * with AsyncQueueDispatch (the documentation on AsyncResult gives an
395  * example).
396  *
397  * By default the queue uses a std::list object as its container
398  * because when adding an item to the queue all allocation can take
399  * place outside the queue object's mutex. However, for types which
400  * have low overhead copy constructors, this can be changed to, say, a
401  * std::deque object by specifying it as the second template
402  * parameter.
403  *
404  * If data pushed and popped from the queue are held by a reference
405  * counted smart pointer, the reference count must be thread-safe,
406  * such as by using SharedLockPtr or IntrusiveLockCounter.
407  *
408  * If the library is installed using the
409  * \--with-glib-memory-slices-compat or
410  * \--with-glib-memory-slices-no-compat configuration options, any
411  * AsyncQueueDispatch objects constructed on free store will be
412  * constructed in glib memory slices. This does not affect the queue
413  * container itself: to change the allocator of the C++ container, a
414  * custom allocator type can be provided when the AsyncQueueDispatch
415  * object is instantiated offering the standard allocator interface.
416  * If glib memory slices are not used or no AsyncQueueDispatch objects
417  * are constructed on free store, it is not necessary to call
418  * g_thread_init() before manipulating or using an AsyncQueueDispatch
419  * object in multiple threads, but prior to glib version 2.32 glib
420  * itself (and thus glib memory slices) are not thread safe unless
421  * that function has been called.
422  */
423 
424 template <class T, class Container = std::list<T> > class AsyncQueueDispatch {
425 public:
426  typedef typename Container::value_type value_type;
427  typedef typename Container::size_type size_type;
428  typedef Container container_type;
429 private:
430  std::queue<T, Container> q;
431  mutable Thread::Mutex mutex;
432  Thread::Cond cond;
433 
434 // TODO: at the next ABI break make this method explicitly static
435 // this method won't throw: it is for the user to ensure the arguments
436 // do not refer to the same mutex object
437  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
438  m1.lock();
439  for(;;) {
440  if (!m2.trylock()) {
441  return;
442  }
443  m1.unlock();
444  // spin nicely
445 #ifdef CGU_USE_SCHED_YIELD
446  sched_yield();
447 #else
448  usleep(10);
449 #endif
450  m1.lock();
451  }
452  }
453 public:
454 /**
455  * Pushes an item onto the queue. This method has strong exception
456  * safety if the container is a std::list or std::deque container (the
457  * default is std::list), except that if std::deque is used as the
458  * container under C++11 and the copy constructor, move constructor,
459  * assignment operator or move assignment operator of the queue item
460  * throws, it only gives the basic exception guarantee (and the basic
461  * guarantee is not given by std::deque under C++11 if the queue
462  * item's move constructor throws and it uses a non-default allocator
463  * which does not provide for it to be CopyInsertable). It is thread
464  * safe.
465  * @param obj The item to be pushed onto the queue.
466  * @exception std::bad_alloc The method might throw std::bad_alloc if
467  * memory is exhausted and the system throws in that case. It might
468  * also throw if the copy constructor or assignment operator of the
469  * queue item might throw (or under C++11 if its move constructor or
470  * move assignment operator throws).
471  */
472  void push(const value_type& obj) {
473  Thread::Mutex::Lock lock(mutex);
474  q.push(obj);
475  cond.signal();
476  }
477 
478 /**
479  * Pops an item from the queue. This method has strong exception
480  * safety if the container is a std::deque or std::list container (the
481  * default is std::list), provided the destructor of a contained item
482  * does not throw. It is thread safe.
483  * @param obj A value type reference to which the item at the front of
484  * the queue will be assigned.
485  * @exception AsyncQueuePopError If the queue is empty when a pop is
486  * attempted, this method will throw AsyncQueuePopError. It might
487  * also throw if the assignment operator of the queue item might
488  * throw. In order to complete pop() operations atomically under a
489  * single lock and to retain strong exception safety, the object into
490  * which the pop()ed data is to be placed is passed as an argument by
491  * reference (this avoids a copy from a temporary object after the
492  * data has been extracted from the queue, which would occur if the
493  * item extracted were returned by value). It might also throw if the
494  * destructor of the queue item might throw (but that should never
495  * happen), or if the empty() method of the container type throws
496  * (which would not happen on any sane implementation).
497  */
498  void pop(value_type& obj) {
499  Thread::Mutex::Lock lock(mutex);
500  if (q.empty()) throw AsyncQueuePopError();
501  obj = q.front();
502  q.pop();
503  }
504 
505 /**
506  * Pops an item from the queue. If the queue is empty, it will block
507  * until an item becomes available. If it blocks, the wait comprises
508  * a cancellation point. This method is cancellation safe if the
509  * stack unwinds on cancellation, as cancellation is blocked while the
510  * queue is being operated on after coming out of a wait. This method
511  * has strong exception safety if the container is a std::deque or
512  * std::list container (the default is std::list), provided the
513  * destructor of a contained item does not throw. It is thread safe.
514  * @param obj A value type reference to which the item at the front of
515  * the queue will be assigned. This method might throw if the
516  * assignment operator of the queue item might throw. In order to
517  * complete pop() operations atomically under a single lock and to
518  * retain strong exception safety, the object into which the pop()ed
519  * data is to be placed is passed as an argument by reference (this
520  * avoids a copy from a temporary object after the data has been
521  * extracted from the queue, which would occur if the item extracted
522  * were returned by value). It might also throw if the destructor of
523  * the queue item might throw (but that should never happen), or if
524  * the empty() method of the container type throws (which would not
525  * happen on any sane implementation).
526  */
528  Thread::Mutex::Lock lock(mutex);
529  while (q.empty()) cond.wait(mutex);
531  obj = q.front();
532  q.pop();
533  }
534 
535 /**
536  * Pops an item from the queue. If the queue is empty, it will block
537  * until an item becomes available or until the timeout expires. If
538  * it blocks, the wait comprises a cancellation point. This method is
539  * cancellation safe if the stack unwinds on cancellation, as
540  * cancellation is blocked while the queue is being operated on after
541  * coming out of a wait. This method has strong exception safety if
542  * the container is a std::deque or std::list container (the default
543  * is std::list), provided the destructor of a contained item does not
544  * throw. It is thread safe.
545  * @param obj A value type reference to which the item at the front of
546  * the queue will be assigned. This method might throw if the
547  * assignment operator of the queue item might throw. In order to
548  * complete pop() operations atomically under a single lock and to
549  * retain strong exception safety, the object into which the pop()ed
550  * data is to be placed is passed as an argument by reference (this
551  * avoids a copy from a temporary object after the data has been
552  * extracted from the queue, which would occur if the item extracted
553  * were returned by value). It might also throw if the destructor of
554  * the queue item might throw (but that should never happen), or if
555  * the empty() method of the container type throws (which would not
556  * happen on any sane implementation).
557  * @param millisec The timeout interval, in milliseconds.
558  * @return If the timeout expires without an item becoming available,
559  * the method will return true. If an item from the queue is
560  * extracted, it returns false.
561  */
562  bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
563  timespec ts;
564  Thread::Cond::get_abs_time(ts, millisec);
565  Thread::Mutex::Lock lock(mutex);
566  while (q.empty()) {
567  if (cond.timed_wait(mutex, ts)) return true;
568  }
570  obj = q.front();
571  q.pop();
572  return false;
573  }
574 
575 /**
576  * Discards the item at the front of the queue. This method has
577  * strong exception safety if the container is a std::deque or
578  * std::list container (the default is std::list), provided the
579  * destructor of a contained item does not throw. It is thread safe.
580  * @exception AsyncQueuePopError If the queue is empty when a pop is
581  * attempted, this method will throw AsyncQueuePopError. It might
582  * also throw if the destructor of the queue item might throw (but
583  * that should never happen), or if the empty() method of the
584  * container type throws (which would not happen on any sane
585  * implementation).
586  */
587  void pop() {
588  Thread::Mutex::Lock lock(mutex);
589  if (q.empty()) throw AsyncQueuePopError();
590  q.pop();
591  }
592 
593 /**
594  * @return Whether the queue is empty. It will not throw assuming
595  * that the empty() method of the container type does not throw, as it
596  * will not on any sane implementation.
597  * @note This method is thread safe, but the return value may not be
598  * valid if another thread has pushed to or popped from the queue
599  * before the value returned by the method is acted on. It is
600  * provided as a utility, but may not be meaningful, depending on the
601  * intended usage.
602  */
603  bool empty() const {
604  Thread::Mutex::Lock lock(mutex);
605  return q.empty();
606  }
607 
608 /**
609  * @return The number of items currently in the queue. It will not
610  * throw assuming that the size() method of the container type does
611  * not throw, as it will not on any sane implementation.
612  * @note This method is thread safe, but the return value may not be
613  * valid if another thread has pushed to or popped from the queue
614  * before the value returned by the method is acted on. It is
615  * provided as a utility, but may not be meaningful, depending on the
616  * intended usage.
617  *
618  * Since 1.2.22
619  */
620  size_type size() const {
621  Thread::Mutex::Lock lock(mutex);
622  return q.size();
623  }
624 
625 /**
626  * Swaps the contents of 'this' and 'other'. It will not throw
627  * assuming that the swap method of the container type does not throw
628  * (which the C++ standard requires not to happen with the standard
629  * sequence containers). It is thread safe and the swap is
630  * thread-wise atomic. A non-member function
631  * Cgu::swap(Cgu::AsyncQueueDispatch&, Cgu::AsyncQueueDispatch&)
632  * is also provided which will call this method.
633  * @param other The object to be swapped with this one.
634  * @note An object swapped does not, by virtue of the swap, inherit
635  * any threads waiting on the other one. However if threads were
636  * waiting on a swapped object prior to the swap, and it acquires
637  * items by virtue of the swap, the waiting threads will unblock and
638  * extract those items.
639  *
640  * Since 1.2.22
641  */
642  void swap(AsyncQueueDispatch& other) {
643  if (this != &other) {
644  lock2(mutex, other.mutex); // doesn't throw
646  Thread::Mutex::Lock l2(other.mutex, Thread::locked);
647  std::swap(q, other.q);
648  if (!q.empty()) cond.broadcast();
649  if (!other.q.empty()) other.cond.broadcast();
650  }
651  }
652 
653 /**
654  * The assignment operator is strongly exception safe with the
655  * standard sequence containers (it uses copy and swap). It is also
656  * thread safe, as it safely locks both the assignor's and assignee's
657  * mutex to provide a thread-wise atomic assignment.
658  * @param rhs The assignor.
659  * @return The AsyncQueueDispatch object after assignment.
660  * @exception std::bad_alloc The copy constructor of the queue's
661  * container type, and so this assignment operator, might throw
662  * std::bad_alloc if memory is exhausted and the system throws in that
663  * case. This assignment operator will also throw if the copy
664  * constructor of the queue's container type throws any other
665  * exceptions, including if any copy or move constructor or copy or
666  * move assignment operator of a contained item throws.
667  * @exception Thread::MutexError The assignment operator might throw
668  * Thread::MutexError if initialization of a transitional object's
669  * contained mutex fails. (It is often not worth checking for this,
670  * as it means either memory is exhausted or pthread has run out of
671  * other resources to create new mutexes.)
672  * @exception Thread::CondError The assignment operator might throw
673  * this exception if initialisation of a transitional object's
674  * contained condition variable fails. (It is often not worth
675  * checking for this, as it means either memory is exhausted or
676  * pthread has run out of other resources to create new condition
677  * variables.)
678  * @note The assignee does not, by virtue of the assignment, inherit
679  * any threads waiting on the assignor. However, if prior to the
680  * assignment threads were waiting on the assignee and the assignee
681  * acquires items from the assignor as a result of the assignment, the
682  * waiting threads will unblock and extract those items.
683  *
684  * Since 1.2.22
685  */
687  if (this != &rhs) {
688  lock2(mutex, rhs.mutex); // doesn't throw
690  Thread::Mutex::Lock l2(rhs.mutex, Thread::locked);
691  std::queue<T, Container> temp(rhs.q);
692  std::swap(q, temp);
693  if (!q.empty()) cond.broadcast();
694  }
695  return *this;
696  }
697 
698 /**
699  * @exception std::bad_alloc The default constructor might throw this
700  * exception if memory is exhausted and the system throws in that
701  * case.
702  * @exception Thread::MutexError The default constructor might throw
703  * this exception if initialisation of the contained mutex fails. (It
704  * is often not worth checking for this, as it means either memory is
705  * exhausted or pthread has run out of other resources to create new
706  * mutexes.)
707  * @exception Thread::CondError The default constructor might throw
708  * this exception if initialisation of the contained condition
709  * variable fails. (It is often not worth checking for this, as it
710  * means either memory is exhausted or pthread has run out of other
711  * resources to create new condition variables.)
712  */
714 
715 /**
716  * The copy constructor is thread safe, as it locks the initializing
717  * object's mutex to obtain a consistent view of it.
718  * @param rhs The AsyncQueue object to be copied.
719  * @exception std::bad_alloc The copy constructor of the queue's
720  * container type, and so this constructor, might throw std::bad_alloc
721  * if memory is exhausted and the system throws in that case. It will
722  * also throw if the copy constructor of the queue's container type
723  * throws any other exceptions, including if any copy or move
724  * constructor or copy or move assignment operator of a contained item
725  * throws.
726  * @exception Thread::MutexError The copy constructor might throw
727  * Thread::MutexError if initialization of the contained mutex fails.
728  * (It is often not worth checking for this, as it means either memory
729  * is exhausted or pthread has run out of other resources to create
730  * new mutexes.)
731  * @exception Thread::CondError The copy constructor might throw this
732  * exception if initialisation of the contained condition variable
733  * fails. (It is often not worth checking for this, as it means
734  * either memory is exhausted or pthread has run out of other
735  * resources to create new condition variables.)
736  *
737  * Since 1.2.22
738  */
739  // we use the comma operator here to lock the mutex and call the
740  // copy constructor: the lock will be retained until the end of the
741  // full expression in which it is lexically situated, namely until
742  // the end of q's constructor - see C++98 1.9/12 and 12.2/3
744  q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
745 
746 /**
747  * The destructor does not throw unless the destructor of a contained
748  * item throws. It is thread safe (any thread may delete the
749  * AsyncQueueDispatch object). Destroying an AsyncQueueDispatch
750  * object on which another thread is currently blocked results in
751  * undefined behavior.
752  */
754  // lock and unlock the mutex in the destructor so that we have an
755  // acquire operation to ensure that when the std::queue object is
756  // destroyed memory is synchronised, so any thread may destroy the
757  // AsyncQueueDispatch object
758  Thread::Mutex::Lock lock(mutex);
759  }
760 
761 /* Only has effect if --with-glib-memory-slices-compat or
762  * --with-glib-memory-slices-no-compat option picked */
764 };
765 
766 /**
767  * Swaps the contents of two AsyncQueue objects. It will not throw
768  * assuming that the swap method of the container type does not throw
769  * (which the C++ standard requires not to happen with the standard
770  * sequence containers). It is thread safe and the swap is
771  * thread-wise atomic.
772  * @param q1 An object to be swapped with the other.
773  * @param q2 An object to be swapped with the other.
774  * @note Calling std::swap on AsyncQueue objects is thread safe but
775  * does not provide a thread-wise atomic swap (the swapped objects may
776  * not be mirror images if during the execution of std::swap's default
777  * algorithm one of them has been modified), although in many cases
778  * that doesn't matter. If swap() is called without a namespace
779  * qualifier, argument dependent look-up will pick this one correctly.
780  *
781  * Since 1.2.22
782  */
783 template <class T, class Container>
786  q1.swap(q2);
787 }
788 
789 /**
790  * Swaps the contents of two AsyncQueueDispatch objects. It will not
791  * throw assuming that the swap method of the container type does not
792  * throw (which the C++ standard requires not to happen with the
793  * standard sequence containers). It is thread safe and the swap is
794  * thread-wise atomic.
795  * @param q1 An object to be swapped with the other.
796  * @param q2 An object to be swapped with the other.
797  * @note 1. An object swapped does not, by virtue of the swap, inherit
798  * any threads waiting on the other one. However if threads were
799  * waiting on a swapped object prior to the swap, and it acquires
800  * items by virtue of the swap, the waiting threads will unblock and
801  * extract those items.
802  * @note 2. Calling std::swap on AsyncQueueDispatch objects is thread
803  * safe but does not provide a thread-wise atomic swap (the swapped
804  * objects may not be mirror images if during the execution of
805  * std::swap's default algorithm one of them has been modified),
806  * although in many cases that doesn't matter. If swap() is called
807  * without a namespace qualifier, argument dependent look-up will pick
808  * this one correctly.
809  *
810  * Since 1.2.22
811  */
812 template <class T, class Container>
815  q1.swap(q2);
816 }
817 
818 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
819 
820 /* This is a specialization of AsyncQueue for std::list objects, which
821  uses std::list::splice() to push a new item on the queue. This
822  means that allocation for the push occurs outside the mutex, so
823  reducing contention (a tip from a talk by Sean Parent of Adobe).
824  This is first available in version 1.2.32.
825  */
826 template <class T, class Allocator>
827 class AsyncQueue<T, std::list<T, Allocator> > {
828 public:
829  typedef std::list<T, Allocator> Container;
830  typedef typename Container::value_type value_type;
831  typedef typename Container::size_type size_type;
832  typedef Container container_type;
833 private:
834  // 23.2.3.1 of C++98 requires std::queue to have a protected
835  // container member called 'c' for the purposes of derivation. This
836  // specialisation will have the same binary layout as the
837  // unspecialized version on any practical implementation: all we do
838  // is add a splice_end() member
839  class Q: public std::queue<T, Container> {
840  public:
841  void splice_end(Container& lst) {
842  this->c.splice(this->c.end(), lst);
843  }
844  } q;
845  mutable Thread::Mutex mutex;
846 
847 // TODO: at the next ABI break make this method explicitly static
848  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
849  m1.lock();
850  for(;;) {
851  if (!m2.trylock()) {
852  return;
853  }
854  m1.unlock();
855  // spin nicely
856 #ifdef CGU_USE_SCHED_YIELD
857  sched_yield();
858 #else
859  usleep(10);
860 #endif
861  m1.lock();
862  }
863  }
864 public:
865  void push(const value_type& obj) {
866  Container temp;
867  temp.push_back(obj);
868  Thread::Mutex::Lock lock(mutex);
869  q.splice_end(temp);
870  }
871 
872  void pop(value_type& obj) {
873  Thread::Mutex::Lock lock(mutex);
874  if (q.empty()) throw AsyncQueuePopError();
875  obj = q.front();
876  q.pop();
877  }
878 
879  void pop() {
880  Thread::Mutex::Lock lock(mutex);
881  if (q.empty()) throw AsyncQueuePopError();
882  q.pop();
883  }
884 
885  bool empty() const {
886  Thread::Mutex::Lock lock(mutex);
887  return q.empty();
888  }
889 
890  size_type size() const {
891  Thread::Mutex::Lock lock(mutex);
892  return q.size();
893  }
894 
895  void swap(AsyncQueue& other) {
896  if (this != &other) {
897  lock2(mutex, other.mutex); // doesn't throw
898  Thread::Mutex::Lock l1(mutex, Thread::locked);
899  Thread::Mutex::Lock l2(other.mutex, Thread::locked);
900  std::swap(q, other.q);
901  }
902  }
903 
904  AsyncQueue& operator=(const AsyncQueue& rhs) {
905  if (this != &rhs) {
906  lock2(mutex, rhs.mutex); // doesn't throw
907  Thread::Mutex::Lock l1(mutex, Thread::locked);
908  Thread::Mutex::Lock l2(rhs.mutex, Thread::locked);
909  Q temp(rhs.q);
910  std::swap(q, temp);
911  }
912  return *this;
913  }
914 
915  AsyncQueue() {}
916 
917  AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
918 
919  ~AsyncQueue() {
920  Thread::Mutex::Lock lock(mutex);
921  }
922 
924 };
925 
926 /* This is a specialization of AsyncQueueDispatch for std::list
927  objects, which uses std::list::splice() to push a new item on the
928  queue. This means that allocation for the push occurs outside the
929  mutex, so reducing contention (a tip from a talk by Sean Parent of
930  Adobe). This is first available in version 1.2.32.
931  */
932 template <class T, class Allocator>
933 class AsyncQueueDispatch<T, std::list<T, Allocator> > {
934 public:
935  typedef std::list<T, Allocator> Container;
936  typedef typename Container::value_type value_type;
937  typedef typename Container::size_type size_type;
938  typedef Container container_type;
939 private:
940  // 23.2.3.1 of C++98 requires std::queue to have a protected
941  // container member called 'c' for the purposes of derivation. This
942  // specialisation will have the same binary layout as the
943  // unspecialized version on any practical implementation: all we do
944  // is add a splice_end() member
945  class Q: public std::queue<T, Container> {
946  public:
947  void splice_end(Container& lst) {
948  this->c.splice(this->c.end(), lst);
949  }
950  } q;
951  mutable Thread::Mutex mutex;
952  Thread::Cond cond;
953 
954 // TODO: at the next ABI break make this method explicitly static
955  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
956  m1.lock();
957  for(;;) {
958  if (!m2.trylock()) {
959  return;
960  }
961  m1.unlock();
962  // spin nicely
963 #ifdef CGU_USE_SCHED_YIELD
964  sched_yield();
965 #else
966  usleep(10);
967 #endif
968  m1.lock();
969  }
970  }
971 public:
972  void push(const value_type& obj) {
973  Container temp;
974  temp.push_back(obj);
975  Thread::Mutex::Lock lock(mutex);
976  q.splice_end(temp);
977  cond.signal();
978  }
979 
980  void pop(value_type& obj) {
981  Thread::Mutex::Lock lock(mutex);
982  if (q.empty()) throw AsyncQueuePopError();
983  obj = q.front();
984  q.pop();
985  }
986 
987  void pop_dispatch(value_type& obj) {
988  Thread::Mutex::Lock lock(mutex);
989  while (q.empty()) cond.wait(mutex);
990  Thread::CancelBlock b;
991  obj = q.front();
992  q.pop();
993  }
994 
995  bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
996  timespec ts;
997  Thread::Cond::get_abs_time(ts, millisec);
998  Thread::Mutex::Lock lock(mutex);
999  while (q.empty()) {
1000  if (cond.timed_wait(mutex, ts)) return true;
1001  }
1002  Thread::CancelBlock b;
1003  obj = q.front();
1004  q.pop();
1005  return false;
1006  }
1007 
1008  void pop() {
1009  Thread::Mutex::Lock lock(mutex);
1010  if (q.empty()) throw AsyncQueuePopError();
1011  q.pop();
1012  }
1013 
1014  bool empty() const {
1015  Thread::Mutex::Lock lock(mutex);
1016  return q.empty();
1017  }
1018 
1019  size_type size() const {
1020  Thread::Mutex::Lock lock(mutex);
1021  return q.size();
1022  }
1023 
1024  void swap(AsyncQueueDispatch& other) {
1025  if (this != &other) {
1026  lock2(mutex, other.mutex); // doesn't throw
1027  Thread::Mutex::Lock l1(mutex, Thread::locked);
1028  Thread::Mutex::Lock l2(other.mutex, Thread::locked);
1029  std::swap(q, other.q);
1030  if (!q.empty()) cond.broadcast();
1031  if (!other.q.empty()) other.cond.broadcast();
1032  }
1033  }
1034 
1036  if (this != &rhs) {
1037  lock2(mutex, rhs.mutex); // doesn't throw
1038  Thread::Mutex::Lock l1(mutex, Thread::locked);
1039  Thread::Mutex::Lock l2(rhs.mutex, Thread::locked);
1040  Q temp(rhs.q);
1041  std::swap(q, temp);
1042  if (!q.empty()) cond.broadcast();
1043  }
1044  return *this;
1045  }
1046 
1047  AsyncQueueDispatch() {}
1048 
1050  q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1051 
1053  Thread::Mutex::Lock lock(mutex);
1054  }
1055 
1057 };
1058 
1059 #endif // CGU_USE_INHERITABLE_QUEUE
1060 
1061 } // namespace Cgu
1062 
1063 #endif
Cgu::Thread::Cond::get_abs_time
static void get_abs_time(timespec &ts, unsigned int millisec)
Cgu::Thread::Cond::timed_wait
int timed_wait(Mutex &mutex, const timespec &abs_time)
Definition: mutex.h:528
Cgu::AsyncQueueDispatch::AsyncQueueDispatch
AsyncQueueDispatch()
Definition: async_queue.h:713
Cgu::AsyncQueue::operator=
AsyncQueue & operator=(const AsyncQueue &rhs)
Definition: async_queue.h:315
Cgu::AsyncQueue::container_type
Container container_type
Definition: async_queue.h:147
Cgu::AsyncQueueDispatch
A thread-safe asynchronous queue with a blocking pop() method.
Definition: async_queue.h:424
Cgu
Definition: application.h:45
Cgu::AsyncQueueDispatch::pop
void pop(value_type &obj)
Definition: async_queue.h:498
Cgu::AsyncQueueDispatch::pop
void pop()
Definition: async_queue.h:587
Cgu::Thread::locked
@ locked
Definition: mutex.h:181
Cgu::AsyncQueue::AsyncQueue
AsyncQueue(const AsyncQueue &rhs)
Definition: async_queue.h:361
Cgu::Thread::Cond::broadcast
int broadcast()
Definition: mutex.h:451
Cgu::AsyncQueueDispatch::AsyncQueueDispatch
AsyncQueueDispatch(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:743
Cgu::AsyncQueue
A thread-safe asynchronous queue.
Definition: async_queue.h:143
Cgu::Thread::Cond
A wrapper class for pthread condition variables.
Definition: mutex.h:424
Cgu::AsyncQueuePopError
An exception thrown if calling pop() on an AsyncQueue or AsyncQueueDispatch object fails because the queue is empty.
Definition: async_queue.h:98
Cgu::Thread::Cond::signal
int signal()
Definition: mutex.h:440
Cgu::AsyncQueueDispatch::size
size_type size() const
Definition: async_queue.h:620
Cgu::AsyncQueue::push
void push(const value_type &obj)
Definition: async_queue.h:190
Cgu::Thread::Cond::wait
int wait(Mutex &mutex)
Definition: mutex.h:476
Cgu::AsyncQueue::size_type
Container::size_type size_type
Definition: async_queue.h:146
Cgu::AsyncQueueDispatch::~AsyncQueueDispatch
~AsyncQueueDispatch()
Definition: async_queue.h:753
Cgu::swap
void swap(Cgu::AsyncQueue< T, Container > &q1, Cgu::AsyncQueue< T, Container > &q2)
Definition: async_queue.h:784
Cgu::AsyncQueue::~AsyncQueue
~AsyncQueue()
Definition: async_queue.h:368
Cgu::AsyncQueueDispatch::value_type
Container::value_type value_type
Definition: async_queue.h:426
Cgu::AsyncQueueDispatch::operator=
AsyncQueueDispatch & operator=(const AsyncQueueDispatch &rhs)
Definition: async_queue.h:686
Cgu::AsyncQueueDispatch::size_type
Container::size_type size_type
Definition: async_queue.h:427
Cgu::AsyncQueue::pop
void pop(value_type &obj)
Definition: async_queue.h:215
Cgu::AsyncQueue::empty
bool empty() const
Definition: async_queue.h:250
Cgu::swap
void swap(Cgu::AsyncQueueDispatch< T, Container > &q1, Cgu::AsyncQueueDispatch< T, Container > &q2)
Definition: async_queue.h:813
Cgu::AsyncQueueDispatch::swap
void swap(AsyncQueueDispatch &other)
Definition: async_queue.h:642
Cgu::Thread::Mutex::lock
int lock()
Definition: mutex.h:132
Cgu::Thread::Mutex::Lock
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:192
Cgu::AsyncQueueDispatch::empty
bool empty() const
Definition: async_queue.h:603
CGU_GLIB_MEMORY_SLICES_FUNCS
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Cgu::Thread::CancelBlock
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:571
mutex.h
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
Cgu::AsyncQueueDispatch::pop_dispatch
void pop_dispatch(value_type &obj)
Definition: async_queue.h:527
Cgu::AsyncQueue::AsyncQueue
AsyncQueue()
Definition: async_queue.h:336
Cgu::AsyncQueue::value_type
Container::value_type value_type
Definition: async_queue.h:145
Cgu::AsyncQueueDispatch::pop_timed_dispatch
bool pop_timed_dispatch(value_type &obj, unsigned int millisec)
Definition: async_queue.h:562
Cgu::AsyncQueueDispatch::push
void push(const value_type &obj)
Definition: async_queue.h:472
Cgu::AsyncQueueDispatch::container_type
Container container_type
Definition: async_queue.h:428
Cgu::Thread::Mutex::unlock
int unlock()
Definition: mutex.h:155
Cgu::AsyncQueuePopError::what
virtual const char * what() const
Definition: async_queue.h:99
Cgu::Thread::Mutex::trylock
int trylock()
Definition: mutex.h:142
thread.h
Cgu::AsyncQueue::pop
void pop()
Definition: async_queue.h:234
Cgu::AsyncQueue::swap
void swap(AsyncQueue &other)
Definition: async_queue.h:284
Cgu::Thread::Mutex
A wrapper class for pthread mutexes.
Definition: mutex.h:109
Cgu::AsyncQueue::size
size_type size() const
Definition: async_queue.h:267
cgu_config.h