c++-gtk-utils
async_channel.h
Go to the documentation of this file.
1 /* Copyright (C) 2016 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 /**
40  * @file async_channel.h
41  * @brief This file provides a "channel" class for inter-thread communication.
42  *
43  * AsyncChannel is similar to the AsyncQueueDispatch class, in that it
44  * provides a means of sending data between threads. Producer threads
45  * push data onto the queue and consumer threads pop them off in a
46  * thread safe way, and if there are no data in the channel any
47  * consumer thread will block until a producer thread pushes an item
48  * onto it. However, unlike the AsyncQueueDispatch class, it has a
49  * fixed maximum size as part of its type, which may be any size
50  * greater than 0, and if the number of data items still in the
51  * channel is such as to make the channel full, then producer threads
52  * will block on the channel until a consumer thread pops an item from
53  * it.
54  *
55  * It therefore provides for back pressure on producer threads which
56  * will automatically prevent the channel being overwhelmed by
57  * producer threads pushing more items onto the queue than consumer
58  * threads have the capacity to take off it.
59  *
60  * AsyncChannel is useful where this feature is important, and an
61  * AsyncChannel object can be used with any number of producer threads
62  * and consumer threads. However, under heavy contention with complex
63  * data item types AsyncQueueDispatch objects will usually be faster.
64  * Under lower contention and with simpler data types, an AsyncChannel
65  * object may be faster as it can benefit from its fixed buffer size
66  * (the AsyncChannel implementation uses a circular buffer with
67  * in-buffer construction of data items).
68  *
69  * This class is available from version 1.2.42.
70  */
71 
72 #ifndef CGU_ASYNC_CHANNEL_H
73 #define CGU_ASYNC_CHANNEL_H
74 
75 #include <new> // for std::bad_alloc
76 #include <cstddef> // for std::size_t
77 #include <cstdlib> // for std::malloc
78 
79 #include <pthread.h>
80 
81 #include <c++-gtk-utils/mutex.h>
82 #include <c++-gtk-utils/thread.h>
84 
85 #ifdef CGU_USE_SCHED_YIELD
86 #include <sched.h>
87 #else
88 #include <unistd.h>
89 #endif
90 
91 namespace Cgu {
92 
// Cleanup handler registered with pthread_cleanup_push() by the
// AsyncChannel::push() and AsyncChannel::pop() methods.  'arg' must
// point to the std::size_t waiters count of the channel concerned;
// the count is reduced by one.
extern "C" {
inline void cgu_async_channel_waiters_dec(void* arg) {
  std::size_t* const count = static_cast<std::size_t*>(arg);
  --*count;
}
} // extern "C"
100 
101 /**
102  * @class AsyncChannel async_channel.h c++-gtk-utils/async_channel.h
103  * @brief A thread-safe "channel" class for inter-thread communication.
104  * @sa AsyncQueue AsyncQueueDispatch AsyncChannel AsyncResult
105  *
106  * AsyncChannel is similar to the AsyncQueueDispatch class, in that it
107  * provides a means of sending data between threads. Producer threads
108  * push data onto the queue and consumer threads pop them off in a
109  * thread safe way, and if there are no data in the channel any
110  * consumer thread will block until a producer thread pushes an item
111  * onto it. However, unlike the AsyncQueueDispatch class, it has a
112  * fixed maximum size as part of its type, which may be any size
113  * greater than 0, and if the number of data items still in the
114  * channel is such as to make the channel full, then producer threads
115  * will block on the channel until a consumer thread pops an item from
116  * it.
117  *
118  * It therefore provides for back pressure on producer threads which
119  * will automatically prevent the channel being overwhelmed by
120  * producer threads pushing more items onto the queue than consumer
121  * threads have the capacity to take off it.
122  *
123  * AsyncChannel is useful where this feature is important, and an
124  * AsyncChannel object can be used with any number of producer threads
125  * and consumer threads. However, under heavy contention with complex
126  * data item types AsyncQueueDispatch objects will usually be faster.
127  * Under lower contention and with simpler data types, an AsyncChannel
128  * object may be faster as it can benefit from its fixed buffer size
129  * (the AsyncChannel implementation uses a circular buffer with
130  * in-buffer construction of data items).
131  *
132  * AsyncChannel objects are instantiated with firstly a template type
133  * 'T' and secondly a template integer value 'n'. 'T' is the type of
134  * the data items to be placed on the queue. 'n' is the size of the
135  * channel, which as mentioned may be any size greater than 0.
136  *
137  * This class is available from version 1.2.42.
138  */
139 
140 /*
141  * We have to use Thread::Cond::broadcast() and not
142  * Thread::Cond::signal(), because it is possible to have at any one
143  * time both a producer and a consumer waiting on the AsyncChannel's
144  * condition variable. This can create a "thundering herd" problem if
145  * there are a large number of threads waiting on the channel, but it
146  * is within the range of the acceptable. As an example of the issue
147  * requiring this approach, take this case:
148  *
149  * Let there be a channel which has a capacity of one. Let the
150  * channel already have an item in it from some past producer. In
151  * addition, let two producer threads be currently in a cond-wait,
152  * waiting for the channel to become empty in order to put an item in
153  * it.
154  *
155  * A first consumer thread starts to remove the item in the channel.
156  * It acquires the channel's mutex without blocking because it is not
157  * locked. It sees there is an item in the channel so does not begin
158  * a cond-wait. Meanwhile, immediately after the first consumer
159  * thread acquires the mutex a second consumer thread tries to obtain
160  * an item from the channel and thus will block when trying to acquire
161  * the locked mutex.
162  *
163  * The first consumer thread then removes the item from the channel's
164  * queue and signals the cond-var - which causes one of the producer
165  * threads to wake-up and block on trying to acquire the mutex ("the
166  * first producer"). So for the producers, we now have the first
167  * producer contending on the mutex after being so awoken and the
168  * second producer still waiting on the cond-var. And we have two
169  * threads now contending on the mutex - the thread of the second
170  * consumer and the thread of the first producer. One of these
171  * threads will acquire the mutex when the first consumer releases it
172  * after signalling the cond-var, and it is unspecified which one.
173  *
174  * If it is the second consumer, it will find the channel empty and
175  * thus enter a cond-wait (so we now have the second producer and the
176  * second consumer waiting on the same cond-var) and release the
177  * mutex. This will cause the first producer to acquire the mutex,
178  * which will then add the item to the channel, signal the cond-var
179  * (which will cause either the second producer or second consumer to
180  * awaken), and release the mutex. If it is the second producer which
181  * awakens, its thread will acquire the mutex, find the channel full,
182 * enter a cond-wait again and release the mutex. At that point the
183 * channel is deadlocked: nothing remains to awaken the second consumer
184 * even though there is an item in the channel for it to take.
185  */
186 template <class T, std::size_t n> class AsyncChannel {
187 public:
188  typedef T value_type;
189  typedef std::size_t size_type;
190 private:
191  mutable Thread::Mutex mutex;
192  Thread::Cond cond;
193  size_type size; // number of available items in channel
194  size_type idx; // index of first available item in channel (this
195  // value is meaningless when size == 0)
196  size_type waiters;
197  enum Status {normal, closed, destructing} status;
198  T* buf;
199 
200 // this class cannot be copied
201  AsyncChannel(const AsyncChannel&);
202  AsyncChannel& operator=(const AsyncChannel&);
203 public:
204 /**
205  * Closes the channel. This means that (i) any threads blocking on a
206  * full channel with a call to push() will unblock with a false return
207  * value, and any call to that method after the closure will return
208  * immediately with a false return value, (ii) any threads blocking on
209  * an empty channel with a call to pop() will unblock with a false
210  * return value, (iii) any data items remaining in the channel which
211  * were pushed to the channel prior to the closure of the channel can
212  * be popped after that closure by further calls to pop(), which will
213  * return normally with a true return value, and (iv) after any such
214  * remaining data items have been removed, any subsequent calls to
215  * pop() will return with a false return value.
216  *
217  * If called more than once, this method will do nothing.
218  *
219  * This method will not throw. It is thread safe - any thread may
220  * call it.
221  *
222  * One of the main purposes of this method is to enable a producer
223  * thread to inform a consumer thread that nothing more will be put in
224  * the channel by it for the consumer: for such cases, as mentioned
225  * once everything pushed to the channel prior to its closure has been
226  * extracted from the channel by pop() calls, any further pop() calls
227  * will return false. At that point the consumer thread can abandon
228  * and destroy the AsyncChannel object.
229  *
230  * Since 1.2.42
231  */
232  void close() {
233  Thread::Mutex::Lock lock(mutex);
234  if (status == normal) {
235  status = closed;
236  cond.broadcast();
237  }
238  }
239 
240 /**
241  * Pushes an item onto the channel. This method will only throw if the
242  * copy constructor of the pushed item throws, and has strong
243  * exception safety in such a case. It is thread safe.
244  *
245  * If the number of items already in the channel is equal to the size
246  * of the channel, then this method blocks until either room becomes
247  * available for the item by virtue of another thread calling the
248  * pop() method, or another thread calls the close() method. If it
249  * blocks, the wait comprises a cancellation point. This method is
250  * cancellation safe if the stack unwinds on cancellation, as
251  * cancellation is blocked while the channel is being operated on
252  * after coming out of a wait.
253  *
254  * @param obj The item to be pushed onto the channel.
255  * @return If the push succeeds (whether after blocking or not
256  * blocking) this method returns true. If this method unblocks
257  * because the channel has been closed, or any subsequent calls to
258  * this method are made after the channel has been closed, this method
259  * returns false.
260  *
261  * Since 1.2.42
262  */
263  bool push(const value_type& obj) {
264  bool waiting = false;
265  Thread::Mutex::Lock lock(mutex);
266  if (status != normal) return false;
267 
268  // the only function call that could be a cancellation point
269  // within this pthread_cleanup_push/pop block is the call to
270  // Cond::wait() - if there is a cancellation request while
271  // blocking in that call we need to decrement the waiters count
272  // with a cancellation handler. The push is efficient enough not
273  // to have to unlock the mutex - the cleanup_push/pop block
274  // enables the cleanup macros to construct all the cleanup datum
275  // on the function stack set up at function entry
276  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
277  while (size >= n && status == normal) {
278  if (!waiting) {
279  ++waiters;
280  waiting = true;
281  }
282  cond.wait(mutex); // cancellation point
283  }
285  pthread_cleanup_pop(false);
286  // we need to keep a local copy of 'status' because as soon as
287  // 'waiters' is decremented the AsyncChannel object could be
288  // destroyed if status == destructing
289  Status local_status = status;
290  if (waiting) --waiters;
291  if (local_status != normal) return false;
292  // next is the index of the next available space in the channel
293  size_type next = (idx + size) % n;
294  new (static_cast<void*>(buf + next)) T(obj);
295  ++size;
296  cond.broadcast();
297  return true;
298  }
299 
300 /**
301  * Pops an item from the channel using the item type's copy assignment
302  * operator. This method will only throw if that operator throws or
303  * the contained item's destructor throws. It has strong exception
304  * safety, provided the destructor of the contained item does not
305  * throw. It is thread safe.
306  *
307  * If the channel is empty, then this method blocks until either an
308  * item becomes available by virtue of another thread calling the
309  * push() method, or another thread calls the close() method. If it
310  * blocks, the wait comprises a cancellation point. This method is
311  * cancellation safe if the stack unwinds on cancellation, as
312  * cancellation is blocked while the channel is being operated on
313  * after coming out of a wait.
314  *
315  * @param obj A value type reference to which the item at the front of
316  * the channel will be copy assigned.
317  * @return If the pop succeeds (whether after blocking or not
318  * blocking) this method returns true. If this method unblocks
319  * because the channel has been closed or any subsequent calls to this
320  * method are made, and there are no remaining items in the channel,
321  * this method returns false.
322  *
323  * Since 1.2.42
324  */
325  bool pop(value_type& obj) {
326  bool waiting = false;
327  Thread::Mutex::Lock lock(mutex);
328 
329  // the only function call that could be a cancellation point
330  // within this pthread_cleanup_push/pop block is the call to
331  // Cond::wait() - if there is a cancellation request while
332  // blocking in that call we need to decrement the waiters count
333  // with a cancellation handler. The push is efficient enough not
334  // to have to unlock the mutex - the cleanup_push/pop block
335  // enables the cleanup macros to construct all the cleanup datum
336  // on the function stack set up at function entry
337  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
338  while (!size && status == normal) {
339  if (!waiting) {
340  ++waiters;
341  waiting = true;
342  }
343  cond.wait(mutex);
344  }
346  pthread_cleanup_pop(false);
347 
348  if (status == destructing) {
349  // decrementing waiters must be the last thing we do as it might
350  // cause the destructor to return
351  if (waiting) --waiters;
352  return false;
353  }
354  else if (size) { // status == normal, or status == closed && size > 0
355  size_type old_idx = idx;
356  obj = buf[old_idx];
357  ++idx;
358  if (idx == n) idx = 0;
359  --size;
360  try {
361  buf[old_idx].~T();
362  }
363  catch (...) {
364  // we might as well keep the AsyncChannel object's waiters in
365  // a workable state even if the item's destructor throws
366  if (waiting) --waiters;
367  cond.broadcast();
368  throw;
369  }
370  if (waiting) --waiters;
371  cond.broadcast();
372  return true;
373  }
374  else { // status == closed and size == 0
375  if (waiting) --waiters;
376  return false;
377  }
378  }
379 
380 /**
381  * AsyncChannel objects are instantiated with firstly a template type
382  * 'T' and secondly a template integer value 'n'. 'T' is the type of
383  * the data items to be placed on the queue. 'n' is the size of the
384  * channel, which must be greater than 0. However, a circular buffer
385  * for that size will be allocated at construction time by this
386  * default constructor, so the given size should not be unnecessarily
387  * large. Where a large AsyncChannel object would be required,
388  * consider using AsyncQueueDispatch instead, which sizes itself
389  * dynamically.
390  *
391  * @exception std::bad_alloc The default constructor might throw this
392  * exception if memory is exhausted and the system throws in that
393  * case.
394  * @exception Thread::MutexError The default constructor might throw
395  * this exception if initialisation of the contained mutex fails. (It
396  * is often not worth checking for this, as it means either memory is
397  * exhausted or pthread has run out of other resources to create new
398  * mutexes.)
399  * @exception Thread::CondError The default constructor might throw
400  * this exception if initialisation of the contained condition
401  * variable fails. (It is often not worth checking for this, as it
402  * means either memory is exhausted or pthread has run out of other
403  * resources to create new condition variables.)
404  *
405  * Since 1.2.42
406  */
407  AsyncChannel(): size(0), idx(0), waiters(0), status(normal),
408  buf(static_cast<T*>(std::malloc(sizeof(T) * n))) {
409  if (!buf) throw std::bad_alloc();
410  }
411 
412  /**
413  * The destructor does not throw unless the destructor of a data item
414  * in the channel throws. It is thread safe (any thread may delete
415  * the AsyncChannel object).
416  *
417  * It is not an error for a thread to destroy the AsyncChannel object
418  * and so invoke this destructor while another thread is blocking on
419  * it: instead the destructor will release any blocking threads. The
420  * destructor will not return until all threads (if any) blocking on
421  * the AsyncChannel object have been released.
422  *
423  * Since 1.2.42
424  */
426  mutex.lock();
427  status = destructing;
428  mutex.unlock();
429  cond.broadcast();
430 
431  // since all a waiting thread does upon status == destructing is
432  // to unblock and return, it is more efficient to spin gracefully
433  // until 'waiters' is 0 instead of starting another condition
434  // variable wait
435  for (;;) {
436  mutex.lock();
437  if (waiters) {
438  mutex.unlock();
439 #ifdef CGU_USE_SCHED_YIELD
440  sched_yield();
441 #else
442  usleep(10);
443 #endif
444  }
445  else {
446  mutex.unlock();
447  break;
448  }
449  }
450  while (size) {
451  buf[idx].~T();
452  ++idx;
453  if (idx == n) idx = 0;
454  --size;
455  }
456  std::free(buf);
457  }
458 
459 /* Only has effect if --with-glib-memory-slices-compat or
460  * --with-glib-memory-slices-no-compat option picked */
462 };
463 
464 #ifndef DOXYGEN_PARSING
465 
466 /* This is a specialization of AsyncChannel when instantiated with a
467  size of 1. This specialization allows a number of optimizations
468  for that case.
469 */
470 template <class T>
471 class AsyncChannel<T, 1> {
472 public:
473  typedef T value_type;
474  typedef std::size_t size_type;
475 private:
476  mutable Thread::Mutex mutex;
477  Thread::Cond cond;
478  size_type waiters;
479  bool full;
480  enum Status {normal, closed, destructing} status;
481  T* datum;
482 
483  AsyncChannel(const AsyncChannel&);
484  AsyncChannel& operator=(const AsyncChannel&);
485 public:
486  void close() {
487  Thread::Mutex::Lock lock(mutex);
488  if (status == normal) {
489  status = closed;
490  cond.broadcast();
491  }
492  }
493 
494  bool push(const value_type& obj) {
495  bool waiting = false;
496  Thread::Mutex::Lock lock(mutex);
497  if (status != normal) return false;
498 
499  // the only function call that could be a cancellation point
500  // within this pthread_cleanup_push/pop block is the call to
501  // Cond::wait() - if there is a cancellation request while
502  // blocking in that call we need to decrement the waiters count
503  // with a cancellation handler. The push is efficient enough not
504  // to have to unlock the mutex - the cleanup_push/pop block
505  // enables the cleanup macros to construct all the cleanup datum
506  // on the function stack set up at function entry
507  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
508  while (full && status == normal) {
509  if (!waiting) {
510  ++waiters;
511  waiting = true;
512  }
513  cond.wait(mutex); // cancellation point
514  }
515  Thread::CancelBlock b;
516  pthread_cleanup_pop(false);
517  // we need to keep a local copy of 'status' because as soon as
518  // 'waiters' is decremented the AsyncChannel object could be
519  // destroyed if status == destructing
520  Status local_status = status;
521  if (waiting) --waiters;
522  if (local_status != normal) return false;
523  new (static_cast<void*>(datum)) T(obj);
524  full = true;
525  cond.broadcast();
526  return true;
527  }
528 
529  bool pop(value_type& obj) {
530  bool waiting = false;
531  Thread::Mutex::Lock lock(mutex);
532 
533  // the only function call that could be a cancellation point
534  // within this pthread_cleanup_push/pop block is the call to
535  // Cond::wait() - if there is a cancellation request while
536  // blocking in that call we need to decrement the waiters count
537  // with a cancellation handler. The push is efficient enough not
538  // to have to unlock the mutex - the cleanup_push/pop block
539  // enables the cleanup macros to construct all the cleanup datum
540  // on the function stack set up at function entry
541  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
542  while (!full && status == normal) {
543  if (!waiting) {
544  ++waiters;
545  waiting = true;
546  }
547  cond.wait(mutex);
548  }
549  Thread::CancelBlock b;
550  pthread_cleanup_pop(false);
551 
552  if (status == destructing) {
553  // decrementing waiters must be the last thing we do as it might
554  // cause the destructor to return
555  if (waiting) --waiters;
556  return false;
557  }
558  else if (full) { // status == normal, or status == closed && full
559  obj = *datum;
560  full = false;
561  try {
562  datum->~T();
563  }
564  catch (...) {
565  // we might as well keep the AsyncChannel object's waiters in
566  // a workable state even if the item's destructor throws
567  if (waiting) --waiters;
568  cond.broadcast();
569  throw;
570  }
571  if (waiting) --waiters;
572  cond.broadcast();
573  return true;
574  }
575  else { // status == closed and !full
576  if (waiting) --waiters;
577  return false;
578  }
579  }
580 
581  AsyncChannel(): waiters(0), full(false), status(normal),
582  datum(static_cast<T*>(std::malloc(sizeof(T)))) {
583  if (!datum) throw std::bad_alloc();
584  }
585 
586  ~AsyncChannel() {
587  mutex.lock();
588  status = destructing;
589  mutex.unlock();
590  cond.broadcast();
591 
592  // since all a waiting thread does upon status == destructing is
593  // to unblock and return, it is more efficient to spin gracefully
594  // until 'waiters' is 0 instead of starting another condition
595  // variable wait
596  for (;;) {
597  mutex.lock();
598  if (waiters) {
599  mutex.unlock();
600 #ifdef CGU_USE_SCHED_YIELD
601  sched_yield();
602 #else
603  usleep(10);
604 #endif
605  }
606  else {
607  mutex.unlock();
608  break;
609  }
610  }
611  if (full) datum->~T();
612  std::free(datum);
613  }
614 
616 };
617 
618 #endif // DOXYGEN_PARSING
619 
620 } // namespace Cgu
621 
622 #endif // CGU_ASYNC_CHANNEL_H
Cgu
Definition: application.h:45
Cgu::Thread::Cond::broadcast
int broadcast()
Definition: mutex.h:451
Cgu::AsyncChannel::size_type
std::size_t size_type
Definition: async_channel.h:189
Cgu::Thread::Cond
A wrapper class for pthread condition variables.
Definition: mutex.h:424
Cgu::AsyncChannel::~AsyncChannel
~AsyncChannel()
Definition: async_channel.h:425
Cgu::AsyncChannel::pop
bool pop(value_type &obj)
Definition: async_channel.h:325
Cgu::AsyncChannel
A thread-safe "channel" class for inter-thread communication.
Definition: async_channel.h:186
Cgu::Thread::Cond::wait
int wait(Mutex &mutex)
Definition: mutex.h:476
Cgu::AsyncChannel::push
bool push(const value_type &obj)
Definition: async_channel.h:263
Cgu::Thread::Mutex::lock
int lock()
Definition: mutex.h:132
Cgu::AsyncChannel::AsyncChannel
AsyncChannel()
Definition: async_channel.h:407
Cgu::Thread::Mutex::Lock
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:192
CGU_GLIB_MEMORY_SLICES_FUNCS
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Cgu::cgu_async_channel_waiters_dec
void cgu_async_channel_waiters_dec(void *arg)
Definition: async_channel.h:96
Cgu::Thread::CancelBlock
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:571
mutex.h
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
Cgu::Thread::Mutex::unlock
int unlock()
Definition: mutex.h:155
Cgu::AsyncChannel::close
void close()
Definition: async_channel.h:232
Cgu::AsyncChannel::value_type
T value_type
Definition: async_channel.h:188
thread.h
Cgu::Thread::Mutex
A wrapper class for pthread mutexes.
Definition: mutex.h:109
cgu_config.h