73 #ifndef CGU_ASYNC_QUEUE_H
74 #define CGU_ASYNC_QUEUE_H
89 #ifdef CGU_USE_SCHED_YIELD
105 virtual const char*
what()
const throw() {
return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
151 template <
class T,
class Container = std::list<T> >
class AsyncQueue {
158 std::queue<T, Container> q;
171 #ifdef CGU_USE_SCHED_YIELD
221 q.push(std::move(obj));
247 template<
class... Args>
250 q.emplace(std::forward<Args>(args)...);
319 obj = std::move(q.front());
444 if (
this != &other) {
445 lock2(mutex, other.mutex);
476 lock2(mutex, rhs.mutex);
479 std::queue<T, Container> temp{rhs.q};
510 q = std::move(rhs.q);
648 std::queue<T, Container> q;
661 #ifdef CGU_USE_SCHED_YIELD
712 q.push(std::move(obj));
739 template<
class... Args>
742 q.emplace(std::forward<Args>(args)...);
812 obj = std::move(q.front());
905 while (q.empty()) cond.
wait(mutex);
956 while (q.empty()) cond.
wait(mutex);
958 obj = std::move(q.front());
1134 obj = std::move(q.front());
1280 if (
this != &other) {
1281 lock2(mutex, other.mutex);
1286 if (!other.q.empty()) other.cond.
broadcast();
1325 lock2(mutex, rhs.mutex);
1328 std::queue<T, Container> temp{rhs.q};
1367 q = std::move(rhs.q);
1453 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1492 template <
class T,
class Container>
1521 template <
class T,
class Container>
1527 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1536 template <
class T,
class Allocator>
1537 class AsyncQueue<T, std::list<T, Allocator> > {
1539 typedef std::list<T, Allocator> Container;
1540 typedef typename Container::value_type
value_type;
1541 typedef typename Container::size_type
size_type;
1544 mutable Thread::Mutex mutex;
1550 class Q:
public std::queue<T, Container> {
1552 void splice_end(Container&& lst) {
1553 this->c.splice(this->c.end(), std::move(lst));
1555 void unsplice_beginning(Container& lst) {
1556 lst.splice(lst.begin(), this->c, this->c.begin());
1561 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1564 if (!m2.trylock()) {
1569 #ifdef CGU_USE_SCHED_YIELD
1579 Container temp{obj};
1580 Thread::Mutex::Lock lock{mutex};
1582 q.splice_end(std::move(temp));
1592 temp.push_back(std::move(obj));
1593 Thread::Mutex::Lock lock{mutex};
1595 q.splice_end(std::move(temp));
1598 template<
class... Args>
1599 void emplace(Args&&... args) {
1601 temp.emplace_back(std::forward<Args>(args)...);
1602 Thread::Mutex::Lock lock{mutex};
1604 q.splice_end(std::move(temp));
1608 Thread::Mutex::Lock lock{mutex};
1609 if (q.empty())
throw AsyncQueuePopError();
1615 Thread::Mutex::Lock lock{mutex};
1616 if (q.empty())
throw AsyncQueuePopError();
1617 obj = std::move(q.front());
1627 Thread::Mutex::Lock lock{mutex};
1628 if (q.empty())
throw AsyncQueuePopError();
1630 q.unsplice_beginning(temp);
1632 obj = std::move(temp.front());
1636 Thread::Mutex::Lock lock{mutex};
1637 if (q.empty())
throw AsyncQueuePopError();
1641 bool empty()
const {
1642 Thread::Mutex::Lock lock{mutex};
1647 Thread::Mutex::Lock lock{mutex};
1652 if (
this != &other) {
1653 lock2(mutex, other.mutex);
1662 lock2(mutex, rhs.mutex);
1672 Thread::Mutex::Lock lock{mutex};
1673 q = std::move(rhs.q);
1684 Thread::Mutex::Lock lock{mutex};
1697 template <
class T,
class Allocator>
1698 class AsyncQueueDispatch<T, std::list<T, Allocator> > {
1700 typedef std::list<T, Allocator> Container;
1701 typedef typename Container::value_type
value_type;
1702 typedef typename Container::size_type
size_type;
1705 mutable Thread::Mutex mutex;
1712 class Q:
public std::queue<T, Container> {
1714 void splice_end(Container&& lst) {
1715 this->c.splice(this->c.end(), std::move(lst));
1717 void unsplice_beginning(Container& lst) {
1718 lst.splice(lst.begin(), this->c, this->c.begin());
1723 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1726 if (!m2.trylock()) {
1731 #ifdef CGU_USE_SCHED_YIELD
1741 Container temp{obj};
1742 Thread::Mutex::Lock lock{mutex};
1744 q.splice_end(std::move(temp));
1755 temp.push_back(std::move(obj));
1756 Thread::Mutex::Lock lock{mutex};
1758 q.splice_end(std::move(temp));
1762 template<
class... Args>
1763 void emplace(Args&&... args) {
1765 temp.emplace_back(std::forward<Args>(args)...);
1766 Thread::Mutex::Lock lock{mutex};
1768 q.splice_end(std::move(temp));
1773 Thread::Mutex::Lock lock{mutex};
1774 if (q.empty())
throw AsyncQueuePopError();
1780 Thread::Mutex::Lock lock{mutex};
1781 if (q.empty())
throw AsyncQueuePopError();
1782 obj = std::move(q.front());
1792 Thread::Mutex::Lock lock{mutex};
1793 if (q.empty())
throw AsyncQueuePopError();
1795 q.unsplice_beginning(temp);
1797 obj = std::move(temp.front());
1801 Thread::Mutex::Lock lock{mutex};
1802 while (q.empty()) cond.wait(mutex);
1803 Thread::CancelBlock b;
1809 Thread::Mutex::Lock lock{mutex};
1810 while (q.empty()) cond.wait(mutex);
1811 Thread::CancelBlock b;
1812 obj = std::move(q.front());
1819 bool cancelstate_restored =
false;
1821 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1826 pthread_setcancelstate(old_state, &ignore);
1827 cancelstate_restored =
true;
1828 Thread::Mutex::TrackLock lock{mutex};
1829 while (q.empty()) cond.wait(mutex);
1830 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1831 cancelstate_restored =
false;
1833 q.unsplice_beginning(temp);
1835 obj = std::move(temp.front());
1836 pthread_setcancelstate(old_state, &ignore);
1846 if (!cancelstate_restored) {
1847 pthread_setcancelstate(old_state, &ignore);
1856 Thread::Mutex::Lock lock{mutex};
1858 if (cond.timed_wait(mutex, ts))
return true;
1860 Thread::CancelBlock b;
1869 Thread::Mutex::Lock lock{mutex};
1871 if (cond.timed_wait(mutex, ts))
return true;
1873 Thread::CancelBlock b;
1874 obj = std::move(q.front());
1884 bool cancelstate_restored =
false;
1886 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1891 pthread_setcancelstate(old_state, &ignore);
1892 cancelstate_restored =
true;
1893 Thread::Mutex::TrackLock lock{mutex};
1895 if (cond.timed_wait(mutex, ts))
return true;
1897 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1898 cancelstate_restored =
false;
1900 q.unsplice_beginning(temp);
1902 obj = std::move(temp.front());
1903 pthread_setcancelstate(old_state, &ignore);
1914 if (!cancelstate_restored) {
1915 pthread_setcancelstate(old_state, &ignore);
1922 Thread::Mutex::Lock lock{mutex};
1923 if (q.empty())
throw AsyncQueuePopError();
1927 bool empty()
const {
1928 Thread::Mutex::Lock lock{mutex};
1933 Thread::Mutex::Lock lock{mutex};
1938 if (
this != &other) {
1939 lock2(mutex, other.mutex);
1943 if (!q.empty()) cond.broadcast();
1944 if (!other.q.empty()) other.cond.broadcast();
1950 lock2(mutex, rhs.mutex);
1955 if (!q.empty()) cond.broadcast();
1961 Thread::Mutex::Lock lock{mutex};
1962 q = std::move(rhs.q);
1963 if (!q.empty()) cond.broadcast();
1972 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1975 Thread::Mutex::Lock lock{mutex};
1981 #endif // CGU_USE_INHERITABLE_QUEUE