73 #ifndef CGU_ASYNC_CHANNEL_H
74 #define CGU_ASYNC_CHANNEL_H
87 #ifdef CGU_USE_SCHED_YIELD
99 --(*
static_cast<std::size_t*
>(arg));
200 enum Status {normal, closed, destructing} status;
249 if (status == normal) {
279 bool waiting =
false;
281 if (status != normal)
return false;
292 while (size >= n && status == normal) {
300 pthread_cleanup_pop(
false);
304 Status local_status = status;
305 if (waiting) --waiters;
306 if (local_status != normal)
return false;
309 new (
static_cast<void*
>(buf + next)) T{obj};
340 bool waiting =
false;
342 if (status != normal)
return false;
353 while (size >= n && status == normal) {
361 pthread_cleanup_pop(
false);
365 Status local_status = status;
366 if (waiting) --waiters;
367 if (local_status != normal)
return false;
370 new (
static_cast<void*
>(buf + next)) T{std::move(obj)};
401 template<
class... Args>
403 bool waiting =
false;
405 if (status != normal)
return false;
416 while (size >= n && status == normal) {
424 pthread_cleanup_pop(
false);
428 Status local_status = status;
429 if (waiting) --waiters;
430 if (local_status != normal)
return false;
433 new (
static_cast<void*
>(buf + next)) T{std::forward<Args>(args)...};
465 bool waiting =
false;
477 while (!size && status == normal) {
485 pthread_cleanup_pop(
false);
487 if (status == destructing) {
490 if (waiting) --waiters;
497 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
498 T* ptr = std::launder(buf + idx);
504 if (idx == n) idx = 0;
512 if (waiting) --waiters;
516 if (waiting) --waiters;
521 if (waiting) --waiters;
562 bool waiting =
false;
574 while (!size && status == normal) {
582 pthread_cleanup_pop(
false);
584 if (status == destructing) {
587 if (waiting) --waiters;
594 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
595 T* ptr = std::launder(buf + idx);
599 obj = std::move(*ptr);
601 if (idx == n) idx = 0;
609 if (waiting) --waiters;
613 if (waiting) --waiters;
618 if (waiting) --waiters;
660 buf(static_cast<T*>(std::malloc(sizeof(T) * n))) {
661 static_assert(n != 0,
"AsyncChannel objects may not be created with size 0");
662 if (!buf)
throw std::bad_alloc();
680 status = destructing;
692 #ifdef CGU_USE_SCHED_YIELD
704 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
705 T* ptr = std::launder(buf + idx);
711 if (idx == n) idx = 0;
722 #ifndef DOXYGEN_PARSING
729 class AsyncChannel<T, 1> {
734 mutable Thread::Mutex mutex;
738 enum Status {normal, closed, destructing} status;
749 void close() noexcept {
750 Thread::Mutex::Lock lock{mutex};
751 if (status == normal) {
758 bool waiting =
false;
759 Thread::Mutex::Lock lock{mutex};
760 if (status != normal)
return false;
771 while (full && status == normal) {
778 Thread::CancelBlock b;
779 pthread_cleanup_pop(
false);
783 Status local_status = status;
784 if (waiting) --waiters;
785 if (local_status != normal)
return false;
786 new (
static_cast<void*
>(datum)) T{obj};
793 bool waiting =
false;
794 Thread::Mutex::Lock lock{mutex};
795 if (status != normal)
return false;
806 while (full && status == normal) {
813 Thread::CancelBlock b;
814 pthread_cleanup_pop(
false);
818 Status local_status = status;
819 if (waiting) --waiters;
820 if (local_status != normal)
return false;
821 new (
static_cast<void*
>(datum)) T{std::move(obj)};
827 template<
class... Args>
829 bool waiting =
false;
830 Thread::Mutex::Lock lock{mutex};
831 if (status != normal)
return false;
842 while (full && status == normal) {
849 Thread::CancelBlock b;
850 pthread_cleanup_pop(
false);
854 Status local_status = status;
855 if (waiting) --waiters;
856 if (local_status != normal)
return false;
857 new (
static_cast<void*
>(datum)) T{std::forward<Args>(args)...};
864 bool waiting =
false;
865 Thread::Mutex::Lock lock{mutex};
876 while (!full && status == normal) {
883 Thread::CancelBlock b;
884 pthread_cleanup_pop(
false);
886 if (status == destructing) {
889 if (waiting) --waiters;
896 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
897 T* ptr = std::launder(datum);
909 if (waiting) --waiters;
913 if (waiting) --waiters;
918 if (waiting) --waiters;
924 bool waiting =
false;
925 Thread::Mutex::Lock lock{mutex};
936 while (!full && status == normal) {
943 Thread::CancelBlock b;
944 pthread_cleanup_pop(
false);
946 if (status == destructing) {
949 if (waiting) --waiters;
953 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
954 T* ptr = std::launder(datum);
958 obj = std::move(*ptr);
966 if (waiting) --waiters;
970 if (waiting) --waiters;
975 if (waiting) --waiters;
980 AsyncChannel(): waiters(0), full(false), status(normal),
990 datum(static_cast<T*>(std::malloc(sizeof(T)))) {
991 if (!datum)
throw std::bad_alloc();
996 status = destructing;
1008 #ifdef CGU_USE_SCHED_YIELD
1019 #if defined(__cpp_lib_launder) && __cpp_lib_launder >= 201606
1020 if (full) std::launder(datum)->~T();
1022 if (full) datum->~T();
1030 #endif // DOXYGEN_PARSING
1034 #endif // CGU_ASYNC_CHANNEL_H