c++-gtk-utils
async_channel.h
Go to the documentation of this file.
1 /* Copyright (C) 2016 and 2020 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 /**
40  * @file async_channel.h
41  * @brief This file provides a "channel" class for inter-thread communication.
42  *
43  * AsyncChannel is similar to the AsyncQueueDispatch class, in that it
44  * provides a means of sending data between threads. Producer threads
45  * push data onto the queue and consumer threads pop them off in a
46  * thread safe way, and if there are no data in the channel any
47  * consumer thread will block until a producer thread pushes an item
48  * onto it. However, unlike the AsyncQueueDispatch class, it has a
49  * fixed maximum size as part of its type, which may be any size
50  * greater than 0, and if the number of data items still in the
51  * channel is such as to make the channel full, then producer threads
52  * will block on the channel until a consumer thread pops an item from
53  * it.
54  *
55  * It therefore provides for back pressure on producer threads which
56  * will automatically prevent the channel being overwhelmed by
57  * producer threads pushing more items onto the queue than consumer
58  * threads have the capacity to take off it.
59  *
60  * AsyncChannel is useful where this feature is important, and an
61  * AsyncChannel object can be used with any number of producer threads
62  * and consumer threads. However, under heavy contention with complex
63  * data item types AsyncQueueDispatch objects will usually be faster.
64  * Under lower contention and with simpler data types (or where
65  * temporary objects with non-complex move constructors are pushed),
66  * an AsyncChannel object may be faster as it can benefit from its
67  * fixed buffer size (the AsyncChannel implementation uses a circular
68  * buffer with in-buffer construction of data items).
69  *
70  * This class is available from version 2.2.14.
71  */
72 
73 #ifndef CGU_ASYNC_CHANNEL_H
74 #define CGU_ASYNC_CHANNEL_H
75 
76 #include <utility> // for std::move and std::forward
77 #include <new> // for std::bad_alloc and __cpp_lib_launder
78 #include <cstddef> // for std::size_t
79 #include <cstdlib> // for std::malloc
80 
81 #include <pthread.h>
82 
83 #include <c++-gtk-utils/mutex.h>
84 #include <c++-gtk-utils/thread.h>
86 
87 #ifdef CGU_USE_SCHED_YIELD
88 #include <sched.h>
89 #else
90 #include <unistd.h>
91 #endif
92 
93 namespace Cgu {
94 
95 // function for pthread_cleanup_push() in AsyncChannel::push() and
96 // AsyncChannel::pop() methods
97 extern "C" {
98 inline void cgu_async_channel_waiters_dec(void* arg) {
99  --(*static_cast<std::size_t*>(arg));
100 }
101 } // extern "C"
102 
103 /**
104  * @class AsyncChannel async_channel.h c++-gtk-utils/async_channel.h
105  * @brief A thread-safe "channel" class for inter-thread communication.
106  * @sa AsyncQueue AsyncQueueDispatch AsyncChannel AsyncResult
107  *
108  * AsyncChannel is similar to the AsyncQueueDispatch class, in that it
109  * provides a means of sending data between threads. Producer threads
110  * push data onto the queue and consumer threads pop them off in a
111  * thread safe way, and if there are no data in the channel any
112  * consumer thread will block until a producer thread pushes an item
113  * onto it. However, unlike the AsyncQueueDispatch class, it has a
114  * fixed maximum size as part of its type, which may be any size
115  * greater than 0, and if the number of data items still in the
116  * channel is such as to make the channel full, then producer threads
117  * will block on the channel until a consumer thread pops an item from
118  * it.
119  *
120  * It therefore provides for back pressure on producer threads which
121  * will automatically prevent the channel being overwhelmed by
122  * producer threads pushing more items onto the queue than consumer
123  * threads have the capacity to take off it.
124  *
125  * AsyncChannel is useful where this feature is important, and an
126  * AsyncChannel object can be used with any number of producer threads
127  * and consumer threads. However, under heavy contention with complex
128  * data item types AsyncQueueDispatch objects will usually be faster.
129  * Under lower contention and with simpler data types (or where
130  * temporary objects with non-complex move constructors are pushed),
131  * an AsyncChannel object may be faster as it can benefit from its
132  * fixed buffer size (the AsyncChannel implementation uses a circular
133  * buffer with in-buffer construction of data items).
134  *
135  * AsyncChannel objects are instantiated with firstly a template type
136  * 'T' and secondly a template integer value 'n'. 'T' is the type of
137  * the data items to be placed on the queue. 'n' is the size of the
138  * channel, which as mentioned may be any size greater than 0.
139  *
140  * This class is available from version 2.2.14.
141  */
142 
143 /*
144  * We have to use Thread::Cond::broadcast() and not
145  * Thread::Cond::signal(), because it is possible to have at any one
146  * time both a producer and a consumer waiting on the AsyncChannel's
147  * condition variable. This can create a "thundering herd" problem if
148  * there are a large number of threads waiting on the channel, but it
149  * is within the range of the acceptable. As an example of the issue
150  * requiring this approach, take this case:
151  *
152  * Let there be a channel which has a capacity of one. Let the
153  * channel already have an item in it from some past producer. In
154  * addition, let two producer threads be currently in a cond-wait,
155  * waiting for the channel to become empty in order to put an item in
156  * it.
157  *
158  * A first consumer thread starts to remove the item in the channel.
159  * It acquires the channel's mutex without blocking because it is not
160  * locked. It sees there is an item in the channel so does not begin
161  * a cond-wait. Meanwhile, immediately after the first consumer
162  * thread acquires the mutex a second consumer thread tries to obtain
163  * an item from the channel and thus will block when trying to acquire
164  * the locked mutex.
165  *
166  * The first consumer thread then removes the item from the channel's
167  * queue and signals the cond-var - which causes one of the producer
168  * threads to wake-up and block on trying to acquire the mutex ("the
169  * first producer"). So for the producers, we now have the first
170  * producer contending on the mutex after being so awoken and the
171  * second producer still waiting on the cond-var. And we have two
172  * threads now contending on the mutex - the thread of the second
173  * consumer and the thread of the first producer. One of these
174  * threads will acquire the mutex when the first consumer releases it
175  * after signalling the cond-var, and it is unspecified which one.
176  *
177  * If it is the second consumer, it will find the channel empty and
178  * thus enter a cond-wait (so we now have the second producer and the
179  * second consumer waiting on the same cond-var) and release the
180  * mutex. This will cause the first producer to acquire the mutex,
181  * which will then add the item to the channel, signal the cond-var
182  * (which will cause either the second producer or second consumer to
183  * awaken), and release the mutex. If it is the second producer which
184  * awakens, its thread will acquire the mutex, find the channel full,
185  * enter a cond-wait again and release the mutex. Now we are stuffed
186  * because there is nothing to awaken the remaining consumer even
187  * though there is something in the channel.
188  */
189 template <class T, std::size_t n> class AsyncChannel {
190 public:
191  typedef T value_type;
192  typedef std::size_t size_type;
193 private:
194  mutable Thread::Mutex mutex;
195  Thread::Cond cond;
196  size_type size; // number of available items in channel
197  size_type idx; // index of first available item in channel (this
198  // value is meaningless when size == 0)
199  size_type waiters;
200  enum Status {normal, closed, destructing} status;
201  // TODO: when this library moves to a minimum requirement of
202  // gcc-4.8, use an object-resident char array for 'buf' with
203  // alignas<T>, to improve locality
204  T* buf;
205 
206 public:
207 /**
208  * This class cannot be copied. The copy constructor is deleted.
209  */
210  AsyncChannel(const AsyncChannel&) = delete;
211 
212 /**
213  * This class cannot be copied. The assignment operator is deleted.
214  */
216 
217 /**
218  * Closes the channel. This means that (i) any threads blocking on a
219  * full channel with a call to push() or emplace() will unblock with a
220  * false return value, and any calls to those methods after the
221  * closure will return immediately with a false return value, (ii) any
222  * threads blocking on an empty channel with calls to pop() or
223  * move_pop() will unblock with a false return value, (iii) any data
224  * items remaining in the channel which were pushed to the channel
225  * prior to the closure of the channel can be popped after that
226  * closure by further calls to pop() or move_pop(), which will return
227  * normally with a true return value, and (iv) after any such
228  * remaining data items have been removed, any subsequent calls to
229  * pop() or move_pop() will return with a false return value.
230  *
231  * If called more than once, this method will do nothing.
232  *
233  * This method will not throw. It is thread safe - any thread may
234  * call it.
235  *
236  * One of the main purposes of this method is to enable a producer
237  * thread to inform a consumer thread that nothing more will be put in
238  * the channel by it for the consumer: for such cases, as mentioned
239  * once everything pushed to the channel prior to its closure has been
240  * extracted from the channel by pop() or move_pop() calls, any
241  * further pop() or move_pop() calls will return false. At that point
242  * the consumer thread can abandon and destroy the AsyncChannel
243  * object.
244  *
245  * Since 2.0.31 and 2.2.14
246  */
247  void close() noexcept {
248  Thread::Mutex::Lock lock{mutex};
249  if (status == normal) {
250  status = closed;
251  cond.broadcast();
252  }
253  }
254 
255 /**
256  * Pushes an item onto the channel. This method will only throw if the
257  * copy constructor of the pushed item throws, and has strong
258  * exception safety in such a case. It is thread safe.
259  *
260  * If the number of items already in the channel is equal to the size
261  * of the channel, then this method blocks until either room becomes
262  * available for the item by virtue of another thread calling the
263  * pop() or move_pop() methods, or another thread calls the close()
264  * method. If it blocks, the wait comprises a cancellation
265  * point. This method is cancellation safe if the stack unwinds on
266  * cancellation, as cancellation is blocked while the channel is being
267  * operated on after coming out of a wait.
268  *
269  * @param obj The item to be pushed onto the channel.
270  * @return If the push succeeds (whether after blocking or not
271  * blocking) this method returns true. If this method unblocks
272  * because the channel has been closed, or any subsequent calls to
273  * this method are made after the channel has been closed, this method
274  * returns false.
275  *
276  * Since 2.0.31 and 2.2.14
277  */
278  bool push(const value_type& obj) {
279  bool waiting = false;
280  Thread::Mutex::Lock lock{mutex};
281  if (status != normal) return false;
282 
283  // the only function call that could be a cancellation point
284  // within this pthread_cleanup_push/pop block is the call to
285  // Cond::wait() - if there is a cancellation request while
286  // blocking in that call we need to decrement the waiters count
287  // with a cancellation handler. The push is efficient enough not
288  // to have to unlock the mutex - the cleanup_push/pop block
289  // enables the cleanup macros to construct all the cleanup datum
290  // on the function stack set up at function entry
291  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
292  while (size >= n && status == normal) {
293  if (!waiting) {
294  ++waiters;
295  waiting = true;
296  }
297  cond.wait(mutex); // cancellation point
298  }
300  pthread_cleanup_pop(false);
301  // we need to keep a local copy of 'status' because as soon as
302  // 'waiters' is decremented the AsyncChannel object could be
303  // destroyed if status == destructing
304  Status local_status = status;
305  if (waiting) --waiters;
306  if (local_status != normal) return false;
307  // next is the index of the next available space in the channel
308  size_type next = (idx + size) % n;
309  new (static_cast<void*>(buf + next)) T{obj};
310  ++size;
311  cond.broadcast();
312  return true;
313  }
314 
315 /**
316  * Pushes an item onto the channel. This method will only throw if the
317  * move constructor, or if none the copy constructor, of the pushed
318  * item throws, and has strong exception safety in such a case. It is
319  * thread safe.
320  *
321  * If the number of items already in the channel is equal to the size
322  * of the channel, then this method blocks until either room becomes
323  * available for the item by virtue of another thread calling the
324  * pop() or move_pop() methods, or another thread calls the close()
325  * method. If it blocks, the wait comprises a cancellation
326  * point. This method is cancellation safe if the stack unwinds on
327  * cancellation, as cancellation is blocked while the channel is being
328  * operated on after coming out of a wait.
329  *
330  * @param obj The item to be pushed onto the channel.
331  * @return If the push succeeds (whether after blocking or not
332  * blocking) this method returns true. If this method unblocks
333  * because the channel has been closed, or any subsequent calls to
334  * this method are made after the channel has been closed, this method
335  * returns false.
336  *
337  * Since 2.0.31 and 2.2.14
338  */
339  bool push(value_type&& obj) {
340  bool waiting = false;
341  Thread::Mutex::Lock lock{mutex};
342  if (status != normal) return false;
343 
344  // the only function call that could be a cancellation point
345  // within this pthread_cleanup_push/pop block is the call to
346  // Cond::wait() - if there is a cancellation request while
347  // blocking in that call we need to decrement the waiters count
348  // with a cancellation handler. The push is efficient enough not
349  // to have to unlock the mutex - the cleanup_push/pop block
350  // enables the cleanup macros to construct all the cleanup datum
351  // on the function stack set up at function entry
352  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
353  while (size >= n && status == normal) {
354  if (!waiting) {
355  ++waiters;
356  waiting = true;
357  }
358  cond.wait(mutex); // cancellation point
359  }
361  pthread_cleanup_pop(false);
362  // we need to keep a local copy of 'status' because as soon as
363  // 'waiters' is decremented the AsyncChannel object could be
364  // destroyed if status == destructing
365  Status local_status = status;
366  if (waiting) --waiters;
367  if (local_status != normal) return false;
368  // next is the index of the next available space in the channel
369  size_type next = (idx + size) % n;
370  new (static_cast<void*>(buf + next)) T{std::move(obj)};
371  ++size;
372  cond.broadcast();
373  return true;
374  }
375 
376 /**
377  * Pushes an item onto the channel by constructing it in place from
378  * the given arguments. This method will only throw if the constructor
379  * of the emplaced item throws, and has strong exception safety in
380  * such a case. It is thread safe.
381  *
382  * If the number of items already in the channel is equal to the size
383  * of the channel, then this method blocks until either room becomes
384  * available for the item by virtue of another thread calling the
385  * pop() or move_pop() methods, or another thread calls the close()
386  * method. If it blocks, the wait comprises a cancellation
387  * point. This method is cancellation safe if the stack unwinds on
388  * cancellation, as cancellation is blocked while the channel is being
389  * operated on after coming out of a wait.
390  *
391  * @param args The constructor arguments for the item to be pushed
392  * onto the channel.
393  * @return If the push succeeds (whether after blocking or not
394  * blocking) this method returns true. If this method unblocks
395  * because the channel has been closed, or any subsequent calls to
396  * this method are made after the channel has been closed, this method
397  * returns false.
398  *
399  * Since 2.0.31 and 2.2.14
400  */
401  template<class... Args>
402  bool emplace(Args&&... args) {
403  bool waiting = false;
404  Thread::Mutex::Lock lock{mutex};
405  if (status != normal) return false;
406 
407  // the only function call that could be a cancellation point
408  // within this pthread_cleanup_push/pop block is the call to
409  // Cond::wait() - if there is a cancellation request while
410  // blocking in that call we need to decrement the waiters count
411  // with a cancellation handler. The push is efficient enough not
412  // to have to unlock the mutex - the cleanup_push/pop block
413  // enables the cleanup macros to construct all the cleanup datum
414  // on the function stack set up at function entry
415  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
416  while (size >= n && status == normal) {
417  if (!waiting) {
418  ++waiters;
419  waiting = true;
420  }
421  cond.wait(mutex); // cancellation point
422  }
424  pthread_cleanup_pop(false);
425  // we need to keep a local copy of 'status' because as soon as
426  // 'waiters' is decremented the AsyncChannel object could be
427  // destroyed if status == destructing
428  Status local_status = status;
429  if (waiting) --waiters;
430  if (local_status != normal) return false;
431  // next is the index of the next available space in the channel
432  size_type next = (idx + size) % n;
433  new (static_cast<void*>(buf + next)) T{std::forward<Args>(args)...};
434  ++size;
435  cond.broadcast();
436  return true;
437  }
438 
439 /**
440  * Pops an item from the channel using the item type's copy assignment
441  * operator. This method will only throw if that operator throws or
442  * the contained item's destructor throws. It has strong exception
443  * safety, provided the destructor of the contained item does not
444  * throw. See also the move_pop() method. It is thread safe.
445  *
446  * If the channel is empty, then this method blocks until either an
447  * item becomes available by virtue of another thread calling the
448  * emplace() or push() methods, or another thread calls the close()
449  * method. If it blocks, the wait comprises a cancellation
450  * point. This method is cancellation safe if the stack unwinds on
451  * cancellation, as cancellation is blocked while the channel is being
452  * operated on after coming out of a wait.
453  *
454  * @param obj A value type reference to which the item at the front of
455  * the channel will be copy assigned.
456  * @return If the pop succeeds (whether after blocking or not
457  * blocking) this method returns true. If this method unblocks
458  * because the channel has been closed or any subsequent calls to this
459  * method are made, and there are no remaining items in the channel,
460  * this method returns false.
461  *
462  * Since 2.0.31 and 2.2.14
463  */
464  bool pop(value_type& obj) {
465  bool waiting = false;
466  Thread::Mutex::Lock lock{mutex};
467 
468  // the only function call that could be a cancellation point
469  // within this pthread_cleanup_push/pop block is the call to
470  // Cond::wait() - if there is a cancellation request while
471  // blocking in that call we need to decrement the waiters count
472  // with a cancellation handler. The push is efficient enough not
473  // to have to unlock the mutex - the cleanup_push/pop block
474  // enables the cleanup macros to construct all the cleanup datum
475  // on the function stack set up at function entry
476  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
477  while (!size && status == normal) {
478  if (!waiting) {
479  ++waiters;
480  waiting = true;
481  }
482  cond.wait(mutex);
483  }
485  pthread_cleanup_pop(false);
486 
487  if (status == destructing) {
488  // decrementing waiters must be the last thing we do as it might
489  // cause the destructor to return
490  if (waiting) --waiters;
491  return false;
492  }
493  else if (size) { // status == normal, or status == closed && size > 0
494  // if this library is employed in a program compiled in c++17 or
495  // greater, permit objects with references or const members to
496  // be stored in AsyncChannel objects via std::launder
497 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
498  T* ptr = std::launder(buf + idx);
499 #else
500  T* ptr = buf + idx;
501 #endif
502  obj = *ptr;
503  ++idx;
504  if (idx == n) idx = 0;
505  --size;
506  try {
507  ptr->~T();
508  }
509  catch (...) {
510  // we might as well keep the AsyncChannel object's waiters in
511  // a workable state even if the item's destructor throws
512  if (waiting) --waiters;
513  cond.broadcast();
514  throw;
515  }
516  if (waiting) --waiters;
517  cond.broadcast();
518  return true;
519  }
520  else { // status == closed and size == 0
521  if (waiting) --waiters;
522  return false;
523  }
524  }
525 
526 /**
527  * Pops an item from the channel using the item type's move assignment
528  * operator if it has one, or if not its copy assignment operator
529  * (this method is identical to the pop() method if that type has no
530  * move assignment operator). This method will only throw if that
531  * operator throws or the contained item's destructor throws. It has
532  * strong exception safety, provided the destructor of the contained
533  * item does not throw and the move assignment operator of the
534  * contained item has strong exception safety. Use this method in
535  * preference to the pop() method if it is known that the contained
536  * items' move assignment operator does not throw or is strongly
537  * exception safe, or if the use case does not require strong
538  * exception safety. This method must be used in place of the pop()
539  * method if the contained item has a move assignment operator but no
540  * copy assignment operator (such as a std::unique_ptr object). It is
541  * thread safe.
542  *
543  * If the channel is empty, then this method blocks until either an
544  * item becomes available by virtue of another thread calling the
545  * emplace() or push() methods, or another thread calls the close()
546  * method. If it blocks, the wait comprises a cancellation
547  * point. This method is cancellation safe if the stack unwinds on
548  * cancellation, as cancellation is blocked while the channel is being
549  * operated on after coming out of a wait.
550  *
551  * @param obj A value type reference to which the item at the front of
552  * the channel will be move assigned.
553  * @return If the pop succeeds (whether after blocking or not
554  * blocking) this method returns true. If this method unblocks
555  * because the channel has been closed or any subsequent calls to this
556  * method are made, and there are no remaining items in the channel,
557  * this method returns false.
558  *
559  * Since 2.0.31 and 2.2.14
560  */
561  bool move_pop(value_type& obj) {
562  bool waiting = false;
563  Thread::Mutex::Lock lock{mutex};
564 
565  // the only function call that could be a cancellation point
566  // within this pthread_cleanup_push/pop block is the call to
567  // Cond::wait() - if there is a cancellation request while
568  // blocking in that call we need to decrement the waiters count
569  // with a cancellation handler. The push is efficient enough not
570  // to have to unlock the mutex - the cleanup_push/pop block
571  // enables the cleanup macros to construct all the cleanup datum
572  // on the function stack set up at function entry
573  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
574  while (!size && status == normal) {
575  if (!waiting) {
576  ++waiters;
577  waiting = true;
578  }
579  cond.wait(mutex);
580  }
582  pthread_cleanup_pop(false);
583 
584  if (status == destructing) {
585  // decrementing waiters must be the last thing we do as it might
586  // cause the destructor to return
587  if (waiting) --waiters;
588  return false;
589  }
590  else if (size) { // status == normal, or status == closed && size > 0
591  // if this library is employed in a program compiled in c++17 or
592  // greater, permit objects with references or const members to
593  // be stored in AsyncChannel objects via std::launder
594 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
595  T* ptr = std::launder(buf + idx);
596 #else
597  T* ptr = buf + idx;
598 #endif
599  obj = std::move(*ptr);
600  ++idx;
601  if (idx == n) idx = 0;
602  --size;
603  try {
604  ptr->~T();
605  }
606  catch (...) {
607  // we might as well keep the AsyncChannel object's waiters in
608  // a workable state even if the item's destructor throws
609  if (waiting) --waiters;
610  cond.broadcast();
611  throw;
612  }
613  if (waiting) --waiters;
614  cond.broadcast();
615  return true;
616  }
617  else { // status == closed and size == 0
618  if (waiting) --waiters;
619  return false;
620  }
621  }
622 
623 /**
624  * AsyncChannel objects are instantiated with firstly a template type
625  * 'T' and secondly a template integer value 'n'. 'T' is the type of
626  * the data items to be placed on the queue. 'n' is the size of the
627  * channel, which must be greater than 0. However, a circular buffer
628  * for that size will be allocated at construction time by this
629  * default constructor, so the given size should not be unnecessarily
630  * large. Where a large AsyncChannel object would be required,
631  * consider using AsyncQueueDispatch instead, which sizes itself
632  * dynamically.
633  *
634  * @exception std::bad_alloc The default constructor might throw this
635  * exception if memory is exhausted and the system throws in that
636  * case.
637  * @exception Thread::MutexError The default constructor might throw
638  * this exception if initialisation of the contained mutex fails. (It
639  * is often not worth checking for this, as it means either memory is
640  * exhausted or pthread has run out of other resources to create new
641  * mutexes.)
642  * @exception Thread::CondError The default constructor might throw
643  * this exception if initialisation of the contained condition
644  * variable fails. (It is often not worth checking for this, as it
645  * means either memory is exhausted or pthread has run out of other
646  * resources to create new condition variables.)
647  *
648  * Since 2.0.31 and 2.2.14
649  */
650  AsyncChannel(): size(0), idx(0), waiters(0), status(normal),
651  // locality would be better if we could use an
652  // object-resident char array for 'buf' with
653  // alignas(T), instead of allocating on the heap
654  // with malloc, but this is not supported by gcc
655  // until gcc-4.8, and we support gcc-4.6 onwards.
656  // Sometimes unions are used in pre-C++11 code to
657  // get round this, but this is only guaranteed to
658  // work where T is a POD (C++98/03) or has standard
659  // layout (C++11/14). Bummer.
660  buf(static_cast<T*>(std::malloc(sizeof(T) * n))) {
661  static_assert(n != 0, "AsyncChannel objects may not be created with size 0");
662  if (!buf) throw std::bad_alloc();
663  }
664 
665  /**
666  * The destructor does not throw unless the destructor of a data item
667  * in the channel throws. It is thread safe (any thread may delete
668  * the AsyncChannel object).
669  *
670  * It is not an error for a thread to destroy the AsyncChannel object
671  * and so invoke this destructor while another thread is blocking on
672  * it: instead the destructor will release any blocking threads. The
673  * destructor will not return until all threads (if any) blocking on
674  * the AsyncChannel object have been released.
675  *
676  * Since 2.0.31 and 2.2.14
677  */
679  mutex.lock();
680  status = destructing;
681  mutex.unlock();
682  cond.broadcast();
683 
684  // since all a waiting thread does upon status == destructing is
685  // to unblock and return, it is more efficient to spin gracefully
686  // until 'waiters' is 0 instead of starting another condition
687  // variable wait
688  for (;;) {
689  mutex.lock();
690  if (waiters) {
691  mutex.unlock();
692 #ifdef CGU_USE_SCHED_YIELD
693  sched_yield();
694 #else
695  usleep(10);
696 #endif
697  }
698  else {
699  mutex.unlock();
700  break;
701  }
702  }
703  while (size) {
704 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
705  T* ptr = std::launder(buf + idx);
706 #else
707  T* ptr = buf + idx;
708 #endif
709  ptr->~T();
710  ++idx;
711  if (idx == n) idx = 0;
712  --size;
713  }
714  std::free(buf);
715  }
716 
717 /* Only has effect if --with-glib-memory-slices-compat or
718  * --with-glib-memory-slices-no-compat option picked */
720 };
721 
722 #ifndef DOXYGEN_PARSING
723 
724 /* This is a specialization of AsyncChannel when instantiated with a
725  size of 1. This specialization allows a number of optimizations
726  for that case.
727 */
728 template <class T>
729 class AsyncChannel<T, 1> {
730 public:
731  typedef T value_type;
732  typedef std::size_t size_type;
733 private:
734  mutable Thread::Mutex mutex;
735  Thread::Cond cond;
736  size_type waiters;
737  bool full;
738  enum Status {normal, closed, destructing} status;
739  // TODO: when this library moves to a minimum requirement of
740  // gcc-4.8, use an object-resident char array for 'datum' with
741  // alignas<T>, to improve locality
742  T* datum;
743 
744 public:
745  AsyncChannel(const AsyncChannel&) = delete;
746 
747  AsyncChannel& operator=(const AsyncChannel&) = delete;
748 
749  void close() noexcept {
750  Thread::Mutex::Lock lock{mutex};
751  if (status == normal) {
752  status = closed;
753  cond.broadcast();
754  }
755  }
756 
757  bool push(const value_type& obj) {
758  bool waiting = false;
759  Thread::Mutex::Lock lock{mutex};
760  if (status != normal) return false;
761 
762  // the only function call that could be a cancellation point
763  // within this pthread_cleanup_push/pop block is the call to
764  // Cond::wait() - if there is a cancellation request while
765  // blocking in that call we need to decrement the waiters count
766  // with a cancellation handler. The push is efficient enough not
767  // to have to unlock the mutex - the cleanup_push/pop block
768  // enables the cleanup macros to construct all the cleanup datum
769  // on the function stack set up at function entry
770  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
771  while (full && status == normal) {
772  if (!waiting) {
773  ++waiters;
774  waiting = true;
775  }
776  cond.wait(mutex); // cancellation point
777  }
778  Thread::CancelBlock b;
779  pthread_cleanup_pop(false);
780  // we need to keep a local copy of 'status' because as soon as
781  // 'waiters' is decremented the AsyncChannel object could be
782  // destroyed if status == destructing
783  Status local_status = status;
784  if (waiting) --waiters;
785  if (local_status != normal) return false;
786  new (static_cast<void*>(datum)) T{obj};
787  full = true;
788  cond.broadcast();
789  return true;
790  }
791 
792  bool push(value_type&& obj) {
793  bool waiting = false;
794  Thread::Mutex::Lock lock{mutex};
795  if (status != normal) return false;
796 
797  // the only function call that could be a cancellation point
798  // within this pthread_cleanup_push/pop block is the call to
799  // Cond::wait() - if there is a cancellation request while
800  // blocking in that call we need to decrement the waiters count
801  // with a cancellation handler. The push is efficient enough not
802  // to have to unlock the mutex - the cleanup_push/pop block
803  // enables the cleanup macros to construct all the cleanup datum
804  // on the function stack set up at function entry
805  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
806  while (full && status == normal) {
807  if (!waiting) {
808  ++waiters;
809  waiting = true;
810  }
811  cond.wait(mutex); // cancellation point
812  }
813  Thread::CancelBlock b;
814  pthread_cleanup_pop(false);
815  // we need to keep a local copy of 'status' because as soon as
816  // 'waiters' is decremented the AsyncChannel object could be
817  // destroyed if status == destructing
818  Status local_status = status;
819  if (waiting) --waiters;
820  if (local_status != normal) return false;
821  new (static_cast<void*>(datum)) T{std::move(obj)};
822  full = true;
823  cond.broadcast();
824  return true;
825  }
826 
827  template<class... Args>
828  bool emplace(Args&&... args) {
829  bool waiting = false;
830  Thread::Mutex::Lock lock{mutex};
831  if (status != normal) return false;
832 
833  // the only function call that could be a cancellation point
834  // within this pthread_cleanup_push/pop block is the call to
835  // Cond::wait() - if there is a cancellation request while
836  // blocking in that call we need to decrement the waiters count
837  // with a cancellation handler. The push is efficient enough not
838  // to have to unlock the mutex - the cleanup_push/pop block
839  // enables the cleanup macros to construct all the cleanup datum
840  // on the function stack set up at function entry
841  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
842  while (full && status == normal) {
843  if (!waiting) {
844  ++waiters;
845  waiting = true;
846  }
847  cond.wait(mutex); // cancellation point
848  }
849  Thread::CancelBlock b;
850  pthread_cleanup_pop(false);
851  // we need to keep a local copy of 'status' because as soon as
852  // 'waiters' is decremented the AsyncChannel object could be
853  // destroyed if status == destructing
854  Status local_status = status;
855  if (waiting) --waiters;
856  if (local_status != normal) return false;
857  new (static_cast<void*>(datum)) T{std::forward<Args>(args)...};
858  full = true;
859  cond.broadcast();
860  return true;
861  }
862 
863  bool pop(value_type& obj) {
864  bool waiting = false;
865  Thread::Mutex::Lock lock{mutex};
866 
867  // the only function call that could be a cancellation point
868  // within this pthread_cleanup_push/pop block is the call to
869  // Cond::wait() - if there is a cancellation request while
870  // blocking in that call we need to decrement the waiters count
871  // with a cancellation handler. The push is efficient enough not
872  // to have to unlock the mutex - the cleanup_push/pop block
873  // enables the cleanup macros to construct all the cleanup datum
874  // on the function stack set up at function entry
875  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
876  while (!full && status == normal) {
877  if (!waiting) {
878  ++waiters;
879  waiting = true;
880  }
881  cond.wait(mutex);
882  }
883  Thread::CancelBlock b;
884  pthread_cleanup_pop(false);
885 
886  if (status == destructing) {
887  // decrementing waiters must be the last thing we do as it might
888  // cause the destructor to return
889  if (waiting) --waiters;
890  return false;
891  }
892  else if (full) { // status == normal, or status == closed && full
893  // if this library is employed in a program compiled in c++17 or
894  // greater, permit objects with references or const members to
895  // be stored in AsyncChannel objects via std::launder
896 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
897  T* ptr = std::launder(datum);
898 #else
899  T* ptr = datum;
900 #endif
901  obj = *ptr;
902  full = false;
903  try {
904  ptr->~T();
905  }
906  catch (...) {
907  // we might as well keep the AsyncChannel object's waiters in
908  // a workable state even if the item's destructor throws
909  if (waiting) --waiters;
910  cond.broadcast();
911  throw;
912  }
913  if (waiting) --waiters;
914  cond.broadcast();
915  return true;
916  }
917  else { // status == closed and !full
918  if (waiting) --waiters;
919  return false;
920  }
921  }
922 
923  bool move_pop(value_type& obj) {
924  bool waiting = false;
925  Thread::Mutex::Lock lock{mutex};
926 
927  // the only function call that could be a cancellation point
928  // within this pthread_cleanup_push/pop block is the call to
929  // Cond::wait() - if there is a cancellation request while
930  // blocking in that call we need to decrement the waiters count
931  // with a cancellation handler. The push is efficient enough not
932  // to have to unlock the mutex - the cleanup_push/pop block
933  // enables the cleanup macros to construct all the cleanup datum
934  // on the function stack set up at function entry
935  pthread_cleanup_push(cgu_async_channel_waiters_dec, &this->waiters);
936  while (!full && status == normal) {
937  if (!waiting) {
938  ++waiters;
939  waiting = true;
940  }
941  cond.wait(mutex);
942  }
943  Thread::CancelBlock b;
944  pthread_cleanup_pop(false);
945 
946  if (status == destructing) {
947  // decrementing waiters must be the last thing we do as it might
948  // cause the destructor to return
949  if (waiting) --waiters;
950  return false;
951  }
952  else if (full) { // status == normal, or status == closed && full
953 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
954  T* ptr = std::launder(datum);
955 #else
956  T* ptr = datum;
957 #endif
958  obj = std::move(*ptr);
959  full = false;
960  try {
961  ptr->~T();
962  }
963  catch (...) {
964  // we might as well keep the AsyncChannel object's waiters in
965  // a workable state even if the item's destructor throws
966  if (waiting) --waiters;
967  cond.broadcast();
968  throw;
969  }
970  if (waiting) --waiters;
971  cond.broadcast();
972  return true;
973  }
974  else { // status == closed and !full
975  if (waiting) --waiters;
976  return false;
977  }
978  }
979 
980  AsyncChannel(): waiters(0), full(false), status(normal),
981  // locality would be better if we could use an
982  // object-resident char array for 'datum' with
983  // alignas(T), instead of allocating on the heap
984  // with malloc, but this is not supported by gcc
985  // until gcc-4.8, and we support gcc-4.6 onwards.
986  // Sometimes unions are used in pre-C++11 code to
987  // get round this, but this is only guaranteed to
988  // work where T is a POD (C++98/03) or has standard
989  // layout (C++11/14). Bummer.
990  datum(static_cast<T*>(std::malloc(sizeof(T)))) {
991  if (!datum) throw std::bad_alloc();
992  }
993 
994  ~AsyncChannel() {
995  mutex.lock();
996  status = destructing;
997  mutex.unlock();
998  cond.broadcast();
999 
1000  // since all a waiting thread does upon status == destructing is
1001  // to unblock and return, it is more efficient to spin gracefully
1002  // until 'waiters' is 0 instead of starting another condition
1003  // variable wait
1004  for (;;) {
1005  mutex.lock();
1006  if (waiters) {
1007  mutex.unlock();
1008 #ifdef CGU_USE_SCHED_YIELD
1009  sched_yield();
1010 #else
1011  usleep(10);
1012 #endif
1013  }
1014  else {
1015  mutex.unlock();
1016  break;
1017  }
1018  }
1019 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
1020  if (full) std::launder(datum)->~T();
1021 #else
1022  if (full) datum->~T();
1023 #endif
1024  std::free(datum);
1025  }
1026 
1028 };
1029 
1030 #endif // DOXYGEN_PARSING
1031 
1032 } // namespace Cgu
1033 
1034 #endif // CGU_ASYNC_CHANNEL_H
Cgu
Definition: application.h:44
Cgu::Thread::Mutex::lock
int lock() noexcept
Definition: mutex.h:147
Cgu::AsyncChannel::operator=
AsyncChannel & operator=(const AsyncChannel &)=delete
Cgu::Thread::Mutex::unlock
int unlock() noexcept
Definition: mutex.h:170
Cgu::AsyncChannel::size_type
std::size_t size_type
Definition: async_channel.h:192
Cgu::Thread::Cond::broadcast
int broadcast() noexcept
Definition: mutex.h:483
Cgu::Thread::Cond
A wrapper class for pthread condition variables.
Definition: mutex.h:449
Cgu::AsyncChannel::~AsyncChannel
~AsyncChannel()
Definition: async_channel.h:678
Cgu::AsyncChannel::pop
bool pop(value_type &obj)
Definition: async_channel.h:464
Cgu::AsyncChannel
A thread-safe "channel" class for inter-thread communication.
Definition: async_channel.h:189
Cgu::AsyncChannel::AsyncChannel
AsyncChannel(const AsyncChannel &)=delete
Cgu::AsyncChannel::close
void close() noexcept
Definition: async_channel.h:247
Cgu::Thread::Cond::wait
int wait(Mutex &mutex)
Definition: mutex.h:513
Cgu::AsyncChannel::emplace
bool emplace(Args &&... args)
Definition: async_channel.h:402
Cgu::AsyncChannel::push
bool push(const value_type &obj)
Definition: async_channel.h:278
Cgu::AsyncChannel::AsyncChannel
AsyncChannel()
Definition: async_channel.h:650
Cgu::Thread::Mutex::Lock
A scoped locking class for exception safe Mutex locking.
Definition: mutex.h:207
CGU_GLIB_MEMORY_SLICES_FUNCS
#define CGU_GLIB_MEMORY_SLICES_FUNCS
Definition: cgu_config.h:84
Cgu::cgu_async_channel_waiters_dec
void cgu_async_channel_waiters_dec(void *arg)
Definition: async_channel.h:98
Cgu::Thread::CancelBlock
A class enabling the cancellation state of a thread to be controlled.
Definition: thread.h:723
mutex.h
Provides wrapper classes for pthread mutexes and condition variables, and scoped locking classes for ...
Cgu::AsyncChannel::move_pop
bool move_pop(value_type &obj)
Definition: async_channel.h:561
Cgu::AsyncChannel::value_type
T value_type
Definition: async_channel.h:191
thread.h
Cgu::AsyncChannel::push
bool push(value_type &&obj)
Definition: async_channel.h:339
Cgu::Thread::Mutex
A wrapper class for pthread mutexes.
Definition: mutex.h:117
cgu_config.h