c++-gtk-utils
async_channel.h
Go to the documentation of this file.
1 /* Copyright (C) 2016 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 /**
40  * @file async_channel.h
41  * @brief This file provides a "channel" class for inter-thread communication.
42  *
43  * AsyncChannel is similar to the AsyncQueueDispatch class, in that it
44  * provides a means of sending data between threads. Producer threads
45  * push data onto the queue and consumer threads pop them off in a
46  * thread safe way, and if there are no data in the channel any
47  * consumer thread will block until a producer thread pushes an item
48  * onto it. However, unlike the AsyncQueueDispatch class, it has a
49  * fixed maximum size as part of its type, which may be any size
50  * greater than 0, and if the number of data items still in the
51  * channel is such as to make the channel full, then producer threads
52  * will block on the channel until a consumer thread pops an item from
53  * it.
54  *
55  * It therefore provides for back pressure on producer threads which
56  * will automatically prevent the channel being overwhelmed by
57  * producer threads pushing more items onto the queue than consumer
58  * threads have the capacity to take off it.
59  *
60  * AsyncChannel is useful where this feature is important, and an
61  * AsyncChannel object can be used with any number of producer threads
62  * and consumer threads. However, under heavy contention with complex
63  * data item types AsyncQueueDispatch objects will usually be faster.
64  * Under lower contention and with simpler data types (or where
65  * temporary objects with non-complex move constructors are pushed),
66  * an AsyncChannel object may be faster as it can benefit from its
67  * fixed buffer size (the AsyncChannel implementation uses a circular
68  * buffer with in-buffer construction of data items).
69  *
70  * This class is available from version 2.0.31.
71  */
72 
73 #ifndef CGU_ASYNC_CHANNEL_H
74 #define CGU_ASYNC_CHANNEL_H
75 
76 #include <utility> // for std::move and std::forward
77 #include <new> // for std::bad_alloc
78 #include <cstddef> // for std::size_t
79 #include <cstdlib> // for std::malloc
80 
81 #include <pthread.h>
82 
83 #include <c++-gtk-utils/mutex.h>
84 #include <c++-gtk-utils/thread.h>
86 
87 #ifdef CGU_USE_SCHED_YIELD
88 #include <sched.h>
89 #else
90 #include <unistd.h>
91 #endif
92 
93 namespace Cgu {
94 
95 // function for pthread_cleanup_push() in AsyncChannel::push() and
96 // AsyncChannel::pop() methods
97 extern "C" {
98 inline void cgu_async_channel_waiters_dec(void* arg) {
99  --(*static_cast<std::size_t*>(arg));
100 }
101 } // extern "C"
102 
103 /**
104  * @class AsyncChannel async_channel.h c++-gtk-utils/async_channel.h
105  * @brief A thread-safe "channel" class for inter-thread communication.
106  * @sa AsyncQueue AsyncQueueDispatch AsyncChannel AsyncResult
107  *
108  * AsyncChannel is similar to the AsyncQueueDispatch class, in that it
109  * provides a means of sending data between threads. Producer threads
110  * push data onto the queue and consumer threads pop them off in a
111  * thread safe way, and if there are no data in the channel any
112  * consumer thread will block until a producer thread pushes an item
113  * onto it. However, unlike the AsyncQueueDispatch class, it has a
114  * fixed maximum size as part of its type, which may be any size
115  * greater than 0, and if the number of data items still in the
116  * channel is such as to make the channel full, then producer threads
117  * will block on the channel until a consumer thread pops an item from
118  * it.
119  *
120  * It therefore provides for back pressure on producer threads which
121  * will automatically prevent the channel being overwhelmed by
122  * producer threads pushing more items onto the queue than consumer
123  * threads have the capacity to take off it.
124  *
125  * AsyncChannel is useful where this feature is important, and an
126  * AsyncChannel object can be used with any number of producer threads
127  * and consumer threads. However, under heavy contention with complex
128  * data item types AsyncQueueDispatch objects will usually be faster.
129  * Under lower contention and with simpler data types (or where
130  * temporary objects with non-complex move constructors are pushed),
131  * an AsyncChannel object may be faster as it can benefit from its
132  * fixed buffer size (the AsyncChannel implementation uses a circular
133  * buffer with in-buffer construction of data items).
134  *
135  * AsyncChannel objects are instantiated with firstly a template type
136  * 'T' and secondly a template integer value 'n'. 'T' is the type of
137  * the data items to be placed on the queue. 'n' is the size of the
138  * channel, which as mentioned may be any size greater than 0.
139  *
140  * This class is available from version 2.0.31.
141  */
142 
143 /*
144  * We have to use Thread::Cond::broadcast() and not
145  * Thread::Cond::signal(), because it is possible to have at any one
146  * time both a producer and a consumer waiting on the AsyncChannel's
147  * condition variable. This can create a "thundering herd" problem if
148  * there are a large number of threads waiting on the channel, but it
149  * is within the range of the acceptable. As an example of the issue
150  * requiring this approach, take this case:
151  *
152  * Let there be a channel which has a capacity of one. Let the
153  * channel already have an item in it from some past producer. In
154  * addition, let two producer threads be currently in a cond-wait,
155  * waiting for the channel to become empty in order to put an item in
156  * it.
157  *
158  * A first consumer thread starts to remove the item in the channel.
159  * It acquires the channel's mutex without blocking because it is not
160  * locked. It sees there is an item in the channel so does not begin
161  * a cond-wait. Meanwhile, immediately after the first consumer
162  * thread acquires the mutex a second consumer thread tries to obtain
163  * an item from the channel and thus will block when trying to acquire
164  * the locked mutex.
165  *
166  * The first consumer thread then removes the item from the channel's
167  * queue and signals the cond-var - which causes one of the producer
168  * threads to wake-up and block on trying to acquire the mutex ("the
169  * first producer"). So for the producers, we now have the first
170  * producer contending on the mutex after being so awoken and the
171  * second producer still waiting on the cond-var. And we have two
172  * threads now contending on the mutex - the thread of the second
173  * consumer and the thread of the first producer. One of these
174  * threads will acquire the mutex when the first consumer releases it
175  * after signalling the cond-var, and it is unspecified which one.
176  *
177  * If it is the second consumer, it will find the channel empty and
178  * thus enter a cond-wait (so we now have the second producer and the
179  * second consumer waiting on the same cond-var) and release the
180  * mutex. This will cause the first producer to acquire the mutex,
181  * which will then add the item to the channel, signal the cond-var
182  * (which will cause either the second producer or second consumer to
183  * awaken), and release the mutex. If it is the second producer which
184  * awakens, its thread will acquire the mutex, find the channel full,
185  * enter a cond-wait again and release the mutex. Now we are stuffed
186  * because there is nothing to awaken the remaining consumer even
187  * though there is something in the channel.
188  */
189 template <class T, std::size_t n> class AsyncChannel {
190 public:
191  typedef T value_type;
192  typedef std::size_t size_type;
193 private:
194  mutable Thread::Mutex mutex;
195  Thread::Cond cond;
196  size_type size; // number of available items in channel
197  size_type idx; // index of first available item in channel (this
198  // value is meaningless when size == 0)
199  size_type waiters;
200  enum Status {normal, closed, destructing} status;
201  // TODO: when this library moves to a minimum requirement of
202  // gcc-4.8, use an object-resident char array for 'buf' with
203  // alignas<T>, to improve locality
204  T* buf;
205 
206 public:
207 /**
208  * This class cannot be copied. The copy constructor is deleted.
209  */
210  AsyncChannel(const AsyncChannel&) = delete;
211 
212 /**
213  * This class cannot be copied. The assignment operator is deleted.
214  */
215  AsyncChannel& operator=(const AsyncChannel&) = delete;
216 
217 /**
218  * Closes the channel. This means that (i) any threads blocking on a
219  * full channel with a call to push() or emplace() will unblock with a
220  * false return value, and any calls to those methods after the
221  * closure will return immediately with a false return value, (ii) any
222  * threads blocking on an empty channel with calls to pop() or
223  * move_pop() will unblock with a false return value, (iii) any data
224  * items remaining in the channel which were pushed to the channel
225  * prior to the closure of the channel can be popped after that
226  * closure by further calls to pop() or move_pop(), which will return
227  * normally with a true return value, and (iv) after any such
228  * remaining data items have been removed, any subsequent calls to
229  * pop() or move_pop() will return with a false return value.
230  *
231  * If called more than once, this method will do nothing.
232  *
233  * This method will not throw. It is thread safe - any thread may
234  * call it.
235  *
236  * One of the main purposes of this method is to enable a producer
237  * thread to inform a consumer thread that nothing more will be put in
238  * the channel by it for the consumer: for such cases, as mentioned
239  * once everything pushed to the channel prior to its closure has been
240  * extracted from the channel by pop() or move_pop() calls, any
241  * further pop() or move_pop() calls will return false. At that point
242  * the consumer thread can abandon and destroy the AsyncChannel
243  * object.
244  *
245  * Since 2.0.31
246  */
247  void close() {
248  Thread::Mutex::Lock lock{mutex};
249  if (status == normal) {
250  status = closed;
251  cond.broadcast();
252  }
253  }
254 
255 /**
256  * Pushes an item onto the channel. This method will only throw if the
257  * copy constructor of the pushed item throws, and has strong
258  * exception safety in such a case. It is thread safe.
259  *
260  * If the number of items already in the channel is equal to the size
261  * of the channel, then this method blocks until either room becomes
262  * available for the item by virtue of another thread calling the
263  * pop() or move_pop() methods, or another thread calls the close()
264  * method. If it blocks, the wait comprises a cancellation
265  * point. This method is cancellation safe if the stack unwinds on
266  * cancellation, as cancellation is blocked while the channel is being
267  * operated on after coming out of a wait.
268  *
269  * @param obj The item to be pushed onto the channel.
270  * @return If the push succeeds (whether after blocking or not
271  * blocking) this method returns true. If this method unblocks
272  * because the channel has been closed, or any subsequent calls to
273  * this method are made after the channel has been closed, this method
274  * returns false.
275  *
276  * Since 2.0.31
277  */
278  bool push(const value_type& obj) {
279  bool waiting = false;
280  Thread::Mutex::Lock lock{mutex};
281  if (status != normal) return false;
282 
283  // the only function call that could be a cancellation point
284  // within this pthread_cleanup_push/pop block is the call to
285  // Cond::wait() - if there is a cancellation request while
286  // blocking in that call we need to decrement the waiters count
287  // with a cancellation handler. The push is efficient enough not
288  // to have to unlock the mutex - the cleanup_push/pop block
289  // enables the cleanup macros to construct all the cleanup datum
290  // on the function stack set up at function entry
291  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
292  while (size >= n && status == normal) {
293  if (!waiting) {
294  ++waiters;
295  waiting = true;
296  }
297  cond.wait(mutex); // cancellation point
298  }
300  pthread_cleanup_pop(false);
301  // we need to keep a local copy of 'status' because as soon as
302  // 'waiters' is decremented the AsyncChannel object could be
303  // destroyed if status == destructing
304  Status local_status = status;
305  if (waiting) --waiters;
306  if (local_status != normal) return false;
307  // next is the index of the next available space in the channel
308  size_type next = (idx + size) % n;
309  new (static_cast<void*>(buf + next)) T{obj};
310  ++size;
311  cond.broadcast();
312  return true;
313  }
314 
315 /**
316  * Pushes an item onto the channel. This method will only throw if the
317  * move constructor, or if none the copy constructor, of the pushed
318  * item throws, and has strong exception safety in such a case. It is
319  * thread safe.
320  *
321  * If the number of items already in the channel is equal to the size
322  * of the channel, then this method blocks until either room becomes
323  * available for the item by virtue of another thread calling the
324  * pop() or move_pop() methods, or another thread calls the close()
325  * method. If it blocks, the wait comprises a cancellation
326  * point. This method is cancellation safe if the stack unwinds on
327  * cancellation, as cancellation is blocked while the channel is being
328  * operated on after coming out of a wait.
329  *
330  * @param obj The item to be pushed onto the channel.
331  * @return If the push succeeds (whether after blocking or not
332  * blocking) this method returns true. If this method unblocks
333  * because the channel has been closed, or any subsequent calls to
334  * this method are made after the channel has been closed, this method
335  * returns false.
336  *
337  * Since 2.0.31
338  */
339  bool push(value_type&& obj) {
340  bool waiting = false;
341  Thread::Mutex::Lock lock{mutex};
342  if (status != normal) return false;
343 
344  // the only function call that could be a cancellation point
345  // within this pthread_cleanup_push/pop block is the call to
346  // Cond::wait() - if there is a cancellation request while
347  // blocking in that call we need to decrement the waiters count
348  // with a cancellation handler. The push is efficient enough not
349  // to have to unlock the mutex - the cleanup_push/pop block
350  // enables the cleanup macros to construct all the cleanup datum
351  // on the function stack set up at function entry
352  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
353  while (size >= n && status == normal) {
354  if (!waiting) {
355  ++waiters;
356  waiting = true;
357  }
358  cond.wait(mutex); // cancellation point
359  }
361  pthread_cleanup_pop(false);
362  // we need to keep a local copy of 'status' because as soon as
363  // 'waiters' is decremented the AsyncChannel object could be
364  // destroyed if status == destructing
365  Status local_status = status;
366  if (waiting) --waiters;
367  if (local_status != normal) return false;
368  // next is the index of the next available space in the channel
369  size_type next = (idx + size) % n;
370  new (static_cast<void*>(buf + next)) T{std::move(obj)};
371  ++size;
372  cond.broadcast();
373  return true;
374  }
375 
376 /**
377  * Pushes an item onto the channel by constructing it in place from
378  * the given arguments. This method will only throw if the constructor
379  * of the emplaced item throws, and has strong exception safety in
380  * such a case. It is thread safe.
381  *
382  * If the number of items already in the channel is equal to the size
383  * of the channel, then this method blocks until either room becomes
384  * available for the item by virtue of another thread calling the
385  * pop() or move_pop() methods, or another thread calls the close()
386  * method. If it blocks, the wait comprises a cancellation
387  * point. This method is cancellation safe if the stack unwinds on
388  * cancellation, as cancellation is blocked while the channel is being
389  * operated on after coming out of a wait.
390  *
391  * @param args The constructor arguments for the item to be pushed
392  * onto the channel.
393  * @return If the push succeeds (whether after blocking or not
394  * blocking) this method returns true. If this method unblocks
395  * because the channel has been closed, or any subsequent calls to
396  * this method are made after the channel has been closed, this method
397  * returns false.
398  *
399  * Since 2.0.31
400  */
401  template<class... Args>
402  bool emplace(Args&&... args) {
403  bool waiting = false;
404  Thread::Mutex::Lock lock{mutex};
405  if (status != normal) return false;
406 
407  // the only function call that could be a cancellation point
408  // within this pthread_cleanup_push/pop block is the call to
409  // Cond::wait() - if there is a cancellation request while
410  // blocking in that call we need to decrement the waiters count
411  // with a cancellation handler. The push is efficient enough not
412  // to have to unlock the mutex - the cleanup_push/pop block
413  // enables the cleanup macros to construct all the cleanup datum
414  // on the function stack set up at function entry
415  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
416  while (size >= n && status == normal) {
417  if (!waiting) {
418  ++waiters;
419  waiting = true;
420  }
421  cond.wait(mutex); // cancellation point
422  }
424  pthread_cleanup_pop(false);
425  // we need to keep a local copy of 'status' because as soon as
426  // 'waiters' is decremented the AsyncChannel object could be
427  // destroyed if status == destructing
428  Status local_status = status;
429  if (waiting) --waiters;
430  if (local_status != normal) return false;
431  // next is the index of the next available space in the channel
432  size_type next = (idx + size) % n;
433  new (static_cast<void*>(buf + next)) T{std::forward<Args>(args)...};
434  ++size;
435  cond.broadcast();
436  return true;
437  }
438 
439 /**
440  * Pops an item from the channel using the item type's copy assignment
441  * operator. This method will only throw if that operator throws or
442  * the contained item's destructor throws. It has strong exception
443  * safety, provided the destructor of the contained item does not
444  * throw. See also the move_pop() method. It is thread safe.
445  *
446  * If the channel is empty, then this method blocks until either an
447  * item becomes available by virtue of another thread calling the
448  * emplace() or push() methods, or another thread calls the close()
449  * method. If it blocks, the wait comprises a cancellation
450  * point. This method is cancellation safe if the stack unwinds on
451  * cancellation, as cancellation is blocked while the channel is being
452  * operated on after coming out of a wait.
453  *
454  * @param obj A value type reference to which the item at the front of
455  * the channel will be copy assigned.
456  * @return If the pop succeeds (whether after blocking or not
457  * blocking) this method returns true. If this method unblocks
458  * because the channel has been closed or any subsequent calls to this
459  * method are made, and there are no remaining items in the channel,
460  * this method returns false.
461  *
462  * Since 2.0.31
463  */
464  bool pop(value_type& obj) {
465  bool waiting = false;
466  Thread::Mutex::Lock lock{mutex};
467 
468  // the only function call that could be a cancellation point
469  // within this pthread_cleanup_push/pop block is the call to
470  // Cond::wait() - if there is a cancellation request while
471  // blocking in that call we need to decrement the waiters count
472  // with a cancellation handler. The push is efficient enough not
473  // to have to unlock the mutex - the cleanup_push/pop block
474  // enables the cleanup macros to construct all the cleanup datum
475  // on the function stack set up at function entry
476  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
477  while (!size && status == normal) {
478  if (!waiting) {
479  ++waiters;
480  waiting = true;
481  }
482  cond.wait(mutex);
483  }
485  pthread_cleanup_pop(false);
486 
487  if (status == destructing) {
488  // decrementing waiters must be the last thing we do as it might
489  // cause the destructor to return
490  if (waiting) --waiters;
491  return false;
492  }
493  else if (size) { // status == normal, or status == closed && size > 0
494  size_type old_idx = idx;
495  obj = buf[old_idx];
496  ++idx;
497  if (idx == n) idx = 0;
498  --size;
499  try {
500  buf[old_idx].~T();
501  }
502  catch (...) {
503  // we might as well keep the AsyncChannel object's waiters in
504  // a workable state even if the item's destructor throws
505  if (waiting) --waiters;
506  cond.broadcast();
507  throw;
508  }
509  if (waiting) --waiters;
510  cond.broadcast();
511  return true;
512  }
513  else { // status == closed and size == 0
514  if (waiting) --waiters;
515  return false;
516  }
517  }
518 
519 /**
520  * Pops an item from the channel using the item type's move assignment
521  * operator if it has one, or if not its copy assignment operator
522  * (this method is identical to the pop() method if that type has no
523  * move assignment operator). This method will only throw if that
524  * operator throws or the contained item's destructor throws. It has
525  * strong exception safety, provided the destructor of the contained
526  * item does not throw and the move assignment operator of the
527  * contained item has strong exception safety. Use this method in
528  * preference to the pop() method if it is known that the contained
529  * items' move assignment operator does not throw or is strongly
530  * exception safe, or if the use case does not require strong
531  * exception safety. This method must be used in place of the pop()
532  * method if the contained item has a move assignment operator but no
533  * copy assignment operator (such as a std::unique_ptr object). It is
534  * thread safe.
535  *
536  * If the channel is empty, then this method blocks until either an
537  * item becomes available by virtue of another thread calling the
538  * emplace() or push() methods, or another thread calls the close()
539  * method. If it blocks, the wait comprises a cancellation
540  * point. This method is cancellation safe if the stack unwinds on
541  * cancellation, as cancellation is blocked while the channel is being
542  * operated on after coming out of a wait.
543  *
544  * @param obj A value type reference to which the item at the front of
545  * the channel will be move assigned.
546  * @return If the pop succeeds (whether after blocking or not
547  * blocking) this method returns true. If this method unblocks
548  * because the channel has been closed or any subsequent calls to this
549  * method are made, and there are no remaining items in the channel,
550  * this method returns false.
551  *
552  * Since 2.0.31
553  */
554  bool move_pop(value_type& obj) {
555  bool waiting = false;
556  Thread::Mutex::Lock lock{mutex};
557 
558  // the only function call that could be a cancellation point
559  // within this pthread_cleanup_push/pop block is the call to
560  // Cond::wait() - if there is a cancellation request while
561  // blocking in that call we need to decrement the waiters count
562  // with a cancellation handler. The push is efficient enough not
563  // to have to unlock the mutex - the cleanup_push/pop block
564  // enables the cleanup macros to construct all the cleanup datum
565  // on the function stack set up at function entry
566  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
567  while (!size && status == normal) {
568  if (!waiting) {
569  ++waiters;
570  waiting = true;
571  }
572  cond.wait(mutex);
573  }
575  pthread_cleanup_pop(false);
576 
577  if (status == destructing) {
578  // decrementing waiters must be the last thing we do as it might
579  // cause the destructor to return
580  if (waiting) --waiters;
581  return false;
582  }
583  else if (size) { // status == normal, or status == closed && size > 0
584  size_type old_idx = idx;
585  obj = std::move(buf[old_idx]);
586  ++idx;
587  if (idx == n) idx = 0;
588  --size;
589  try {
590  buf[old_idx].~T();
591  }
592  catch (...) {
593  // we might as well keep the AsyncChannel object's waiters in
594  // a workable state even if the item's destructor throws
595  if (waiting) --waiters;
596  cond.broadcast();
597  throw;
598  }
599  if (waiting) --waiters;
600  cond.broadcast();
601  return true;
602  }
603  else { // status == closed and size == 0
604  if (waiting) --waiters;
605  return false;
606  }
607  }
608 
609 /**
610  * AsyncChannel objects are instantiated with firstly a template type
611  * 'T' and secondly a template integer value 'n'. 'T' is the type of
612  * the data items to be placed on the queue. 'n' is the size of the
613  * channel, which must be greater than 0. However, a circular buffer
614  * for that size will be allocated at construction time by this
615  * default constructor, so the given size should not be unnecessarily
616  * large. Where a large AsyncChannel object would be required,
617  * consider using AsyncQueueDispatch instead, which sizes itself
618  * dynamically.
619  *
620  * @exception std::bad_alloc The default constructor might throw this
621  * exception if memory is exhausted and the system throws in that
622  * case.
623  * @exception Thread::MutexError The default constructor might throw
624  * this exception if initialisation of the contained mutex fails. (It
625  * is often not worth checking for this, as it means either memory is
626  * exhausted or pthread has run out of other resources to create new
627  * mutexes.)
628  * @exception Thread::CondError The default constructor might throw
629  * this exception if initialisation of the contained condition
630  * variable fails. (It is often not worth checking for this, as it
631  * means either memory is exhausted or pthread has run out of other
632  * resources to create new condition variables.)
633  *
634  * Since 2.0.31
635  */
636  AsyncChannel(): size(0), idx(0), waiters(0), status(normal),
637  // locality would be better if we could use an
638  // object-resident char array for 'buf' with
639  // alignas(T), instead of allocating on the heap
640  // with malloc, but this is not supported by gcc
641  // until gcc-4.8, and we support gcc-4.6 onwards.
642  // Sometimes unions are used in pre-C++11 code to
643  // get round this, but this is only guaranteed to
644  // work where T is a POD (C++98/03) or has standard
645  // layout (C++11/14). Bummer.
646  buf(static_cast<T*>(std::malloc(sizeof(T) * n))) {
647  static_assert(n != 0, "AsyncChannel objects may not be created with size 0");
648  if (!buf) throw std::bad_alloc();
649  }
650 
651  /**
652  * The destructor does not throw unless the destructor of a data item
653  * in the channel throws. It is thread safe (any thread may delete
654  * the AsyncChannel object).
655  *
656  * It is not an error for a thread to destroy the AsyncChannel object
657  * and so invoke this destructor while another thread is blocking on
658  * it: instead the destructor will release any blocking threads. The
659  * destructor will not return until all threads (if any) blocking on
660  * the AsyncChannel object have been released.
661  *
662  * Since 2.0.31
663  */
665  mutex.lock();
666  status = destructing;
667  mutex.unlock();
668  cond.broadcast();
669 
670  // since all a waiting thread does upon status == destructing is
671  // to unblock and return, it is more efficient to spin gracefully
672  // until 'waiters' is 0 instead of starting another condition
673  // variable wait
674  for (;;) {
675  mutex.lock();
676  if (waiters) {
677  mutex.unlock();
678 #ifdef CGU_USE_SCHED_YIELD
679  sched_yield();
680 #else
681  usleep(10);
682 #endif
683  }
684  else {
685  mutex.unlock();
686  break;
687  }
688  }
689  while (size) {
690  buf[idx].~T();
691  ++idx;
692  if (idx == n) idx = 0;
693  --size;
694  }
695  std::free(buf);
696  }
697 
698 /* Only has effect if --with-glib-memory-slices-compat or
699  * --with-glib-memory-slices-no-compat option picked */
701 };
702 
703 #ifndef DOXYGEN_PARSING
704 
705 /* This is a specialization of AsyncChannel when instantiated with a
706  size of 1. This specialization allows a number of optimizations
707  for that case.
708 */
709 template <class T>
710 class AsyncChannel<T, 1> {
711 public:
712  typedef T value_type;
713  typedef std::size_t size_type;
714 private:
715  mutable Thread::Mutex mutex;
716  Thread::Cond cond;
717  size_type waiters;
718  bool full;
719  enum Status {normal, closed, destructing} status;
720  // TODO: when this library moves to a minimum requirement of
721  // gcc-4.8, use an object-resident char array for 'datum' with
722  // alignas<T>, to improve locality
723  T* datum;
724 
725 public:
726  AsyncChannel(const AsyncChannel&) = delete;
727 
728  AsyncChannel& operator=(const AsyncChannel&) = delete;
729 
730  void close() {
731  Thread::Mutex::Lock lock{mutex};
732  if (status == normal) {
733  status = closed;
734  cond.broadcast();
735  }
736  }
737 
738  bool push(const value_type& obj) {
739  bool waiting = false;
740  Thread::Mutex::Lock lock{mutex};
741  if (status != normal) return false;
742 
743  // the only function call that could be a cancellation point
744  // within this pthread_cleanup_push/pop block is the call to
745  // Cond::wait() - if there is a cancellation request while
746  // blocking in that call we need to decrement the waiters count
747  // with a cancellation handler. The push is efficient enough not
748  // to have to unlock the mutex - the cleanup_push/pop block
749  // enables the cleanup macros to construct all the cleanup datum
750  // on the function stack set up at function entry
751  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
752  while (full && status == normal) {
753  if (!waiting) {
754  ++waiters;
755  waiting = true;
756  }
757  cond.wait(mutex); // cancellation point
758  }
759  Thread::CancelBlock b;
760  pthread_cleanup_pop(false);
761  // we need to keep a local copy of 'status' because as soon as
762  // 'waiters' is decremented the AsyncChannel object could be
763  // destroyed if status == destructing
764  Status local_status = status;
765  if (waiting) --waiters;
766  if (local_status != normal) return false;
767  new (static_cast<void*>(datum)) T{obj};
768  full = true;
769  cond.broadcast();
770  return true;
771  }
772 
773  bool push(value_type&& obj) {
774  bool waiting = false;
775  Thread::Mutex::Lock lock{mutex};
776  if (status != normal) return false;
777 
778  // the only function call that could be a cancellation point
779  // within this pthread_cleanup_push/pop block is the call to
780  // Cond::wait() - if there is a cancellation request while
781  // blocking in that call we need to decrement the waiters count
782  // with a cancellation handler. The push is efficient enough not
783  // to have to unlock the mutex - the cleanup_push/pop block
784  // enables the cleanup macros to construct all the cleanup datum
785  // on the function stack set up at function entry
786  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
787  while (full && status == normal) {
788  if (!waiting) {
789  ++waiters;
790  waiting = true;
791  }
792  cond.wait(mutex); // cancellation point
793  }
794  Thread::CancelBlock b;
795  pthread_cleanup_pop(false);
796  // we need to keep a local copy of 'status' because as soon as
797  // 'waiters' is decremented the AsyncChannel object could be
798  // destroyed if status == destructing
799  Status local_status = status;
800  if (waiting) --waiters;
801  if (local_status != normal) return false;
802  new (static_cast<void*>(datum)) T{std::move(obj)};
803  full = true;
804  cond.broadcast();
805  return true;
806  }
807 
808  template<class... Args>
809  bool emplace(Args&&... args) {
810  bool waiting = false;
811  Thread::Mutex::Lock lock{mutex};
812  if (status != normal) return false;
813 
814  // the only function call that could be a cancellation point
815  // within this pthread_cleanup_push/pop block is the call to
816  // Cond::wait() - if there is a cancellation request while
817  // blocking in that call we need to decrement the waiters count
818  // with a cancellation handler. The push is efficient enough not
819  // to have to unlock the mutex - the cleanup_push/pop block
820  // enables the cleanup macros to construct all the cleanup datum
821  // on the function stack set up at function entry
822  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
823  while (full && status == normal) {
824  if (!waiting) {
825  ++waiters;
826  waiting = true;
827  }
828  cond.wait(mutex); // cancellation point
829  }
830  Thread::CancelBlock b;
831  pthread_cleanup_pop(false);
832  // we need to keep a local copy of 'status' because as soon as
833  // 'waiters' is decremented the AsyncChannel object could be
834  // destroyed if status == destructing
835  Status local_status = status;
836  if (waiting) --waiters;
837  if (local_status != normal) return false;
838  new (static_cast<void*>(datum)) T{std::forward<Args>(args)...};
839  full = true;
840  cond.broadcast();
841  return true;
842  }
843 
844  bool pop(value_type& obj) {
845  bool waiting = false;
846  Thread::Mutex::Lock lock{mutex};
847 
848  // the only function call that could be a cancellation point
849  // within this pthread_cleanup_push/pop block is the call to
850  // Cond::wait() - if there is a cancellation request while
851  // blocking in that call we need to decrement the waiters count
852  // with a cancellation handler. The push is efficient enough not
853  // to have to unlock the mutex - the cleanup_push/pop block
854  // enables the cleanup macros to construct all the cleanup datum
855  // on the function stack set up at function entry
856  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
857  while (!full && status == normal) {
858  if (!waiting) {
859  ++waiters;
860  waiting = true;
861  }
862  cond.wait(mutex);
863  }
864  Thread::CancelBlock b;
865  pthread_cleanup_pop(false);
866 
867  if (status == destructing) {
868  // decrementing waiters must be the last thing we do as it might
869  // cause the destructor to return
870  if (waiting) --waiters;
871  return false;
872  }
873  else if (full) { // status == normal, or status == closed && full
874  obj = *datum;
875  full = false;
876  try {
877  datum->~T();
878  }
879  catch (...) {
880  // we might as well keep the AsyncChannel object's waiters in
881  // a workable state even if the item's destructor throws
882  if (waiting) --waiters;
883  cond.broadcast();
884  throw;
885  }
886  if (waiting) --waiters;
887  cond.broadcast();
888  return true;
889  }
890  else { // status == closed and !full
891  if (waiting) --waiters;
892  return false;
893  }
894  }
895 
896  bool move_pop(value_type& obj) {
897  bool waiting = false;
898  Thread::Mutex::Lock lock{mutex};
899 
900  // the only function call that could be a cancellation point
901  // within this pthread_cleanup_push/pop block is the call to
902  // Cond::wait() - if there is a cancellation request while
903  // blocking in that call we need to decrement the waiters count
904  // with a cancellation handler. The push is efficient enough not
905  // to have to unlock the mutex - the cleanup_push/pop block
906  // enables the cleanup macros to construct all the cleanup datum
907  // on the function stack set up at function entry
908  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
909  while (!full && status == normal) {
910  if (!waiting) {
911  ++waiters;
912  waiting = true;
913  }
914  cond.wait(mutex);
915  }
916  Thread::CancelBlock b;
917  pthread_cleanup_pop(false);
918 
919  if (status == destructing) {
920  // decrementing waiters must be the last thing we do as it might
921  // cause the destructor to return
922  if (waiting) --waiters;
923  return false;
924  }
925  else if (full) { // status == normal, or status == closed && full
926  obj = std::move(*datum);
927  full = false;
928  try {
929  datum->~T();
930  }
931  catch (...) {
932  // we might as well keep the AsyncChannel object's waiters in
933  // a workable state even if the item's destructor throws
934  if (waiting) --waiters;
935  cond.broadcast();
936  throw;
937  }
938  if (waiting) --waiters;
939  cond.broadcast();
940  return true;
941  }
942  else { // status == closed and !full
943  if (waiting) --waiters;
944  return false;
945  }
946  }
947 
948  AsyncChannel(): waiters(0), full(false), status(normal),
949  // locality would be better if we could use an
950  // object-resident char array for 'datum' with
951  // alignas(T), instead of allocating on the heap
952  // with malloc, but this is not supported by gcc
953  // until gcc-4.8, and we support gcc-4.6 onwards.
954  // Sometimes unions are used in pre-C++11 code to
955  // get round this, but this is only guaranteed to
956  // work where T is a POD (C++98/03) or has standard
957  // layout (C++11/14). Bummer.
958  datum(static_cast<T*>(std::malloc(sizeof(T)))) {
959  if (!datum) throw std::bad_alloc();
960  }
961 
962  ~AsyncChannel() {
963  mutex.lock();
964  status = destructing;
965  mutex.unlock();
966  cond.broadcast();
967 
968  // since all a waiting thread does upon status == destructing is
969  // to unblock and return, it is more efficient to spin gracefully
970  // until 'waiters' is 0 instead of starting another condition
971  // variable wait
972  for (;;) {
973  mutex.lock();
974  if (waiters) {
975  mutex.unlock();
976 #ifdef CGU_USE_SCHED_YIELD
977  sched_yield();
978 #else
979  usleep(10);
980 #endif
981  }
982  else {
983  mutex.unlock();
984  break;
985  }
986  }
987  if (full) datum->~T();
988  std::free(datum);
989  }
990 
992 };
993 
994 #endif // DOXYGEN_PARSING
995 
996 } // namespace Cgu
997 
998 #endif // CGU_ASYNC_CHANNEL_H
Cgu
Definition: application.h:44
Cgu::AsyncChannel::operator=
AsyncChannel & operator=(const AsyncChannel &)=delete
Cgu::Thread::Cond::broadcast
int broadcast()
Definition: mutex.h:483
Cgu::AsyncChannel::size_type
std::size_t size_type
Definition: async_channel.h:192
Cgu::Thread::Cond
A wrapper class for pthread condition variables.
Definition: mutex.h:449
Cgu::AsyncChannel::~AsyncChannel
~AsyncChannel()
Definition: async_channel.h:664
Cgu::AsyncChannel::pop
bool pop(value_type &obj)
Definition: async_channel.h:464
Cgu::AsyncChannel
A thread-safe "channel" class for inter-thread communication.
Definition: async_channel.h:189
Cgu::Thread::Cond::wait
int wait(Mutex &mutex)
Definition: mutex.h:508
Cgu::AsyncChannel::emplace
bool emplace(Args &&... args)
Definition: async_channel.h:402
Cgu::AsyncChannel::push
bool push(const value_type &obj)
Definition: async_channel.h:278
Cgu::Thread::Mutex::lock
int lock()
Definition: mutex.h:147
Cgu::AsyncChannel::AsyncChannel
AsyncChannel()
Definition: async_channel.h:636
Cgu::Thread::Mutex::Lock
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:207
CGU_GLIB_MEMORY_SLICES_FUNCS
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Cgu::cgu_async_channel_waiters_dec
void cgu_async_channel_waiters_dec(void *arg)
Definition: async_channel.h:98
Cgu::Thread::CancelBlock
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:686
mutex.h
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
Cgu::AsyncChannel::move_pop
bool move_pop(value_type &obj)
Definition: async_channel.h:554
Cgu::Thread::Mutex::unlock
int unlock()
Definition: mutex.h:170
Cgu::AsyncChannel::close
void close()
Definition: async_channel.h:247
Cgu::AsyncChannel::value_type
T value_type
Definition: async_channel.h:191
thread.h
Cgu::AsyncChannel::push
bool push(value_type &&obj)
Definition: async_channel.h:339
Cgu::Thread::Mutex
A wrapper class for pthread mutexes.
Definition: mutex.h:117
cgu_config.h