73 #ifndef CGU_ASYNC_QUEUE_H
74 #define CGU_ASYNC_QUEUE_H
89 #ifdef CGU_USE_SCHED_YIELD
105 virtual const char*
what()
const throw() {
return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
151 template <
class T,
class Container = std::list<T> >
class AsyncQueue {
159 std::queue<T, Container> q;
173 #ifdef CGU_USE_SCHED_YIELD
223 q.push(std::move(obj));
249 template<
class... Args>
252 q.emplace(std::forward<Args>(args)...);
321 obj = std::move(q.front());
446 if (
this != &other) {
447 lock2(mutex, other.mutex);
478 lock2(mutex, rhs.mutex);
481 std::queue<T, Container> temp{rhs.q};
512 q = std::move(rhs.q);
660 std::queue<T, Container> q;
675 #ifdef CGU_USE_SCHED_YIELD
726 q.push(std::move(obj));
753 template<
class... Args>
756 q.emplace(std::forward<Args>(args)...);
826 obj = std::move(q.front());
913 while (q.empty()) cond.
wait(mutex);
958 while (q.empty()) cond.
wait(mutex);
960 obj = std::move(q.front());
1117 obj = std::move(q.front());
1257 if (
this != &other) {
1258 lock2(mutex, other.mutex);
1263 if (!other.q.empty()) other.cond.
broadcast();
1302 lock2(mutex, rhs.mutex);
1305 std::queue<T, Container> temp{rhs.q};
1344 q = std::move(rhs.q);
1441 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1480 template <
class T,
class Container>
1509 template <
class T,
class Container>
1515 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1524 template <
class T,
class Allocator>
1525 class AsyncQueue<T, std::list<T, Allocator> > {
1527 typedef std::list<T, Allocator> Container;
1528 typedef typename Container::value_type
value_type;
1529 typedef typename Container::size_type
size_type;
1537 class Q:
public std::queue<T, Container> {
1539 void splice_end(Container&& lst) {
1540 this->c.splice(this->c.end(), std::move(lst));
1542 void unsplice_beginning(Container& lst) {
1543 lst.splice(lst.begin(), this->c, this->c.begin());
1546 mutable Thread::Mutex mutex;
1549 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1552 if (!m2.trylock()) {
1557 #ifdef CGU_USE_SCHED_YIELD
1567 Container temp{obj};
1568 Thread::Mutex::Lock lock{mutex};
1570 q.splice_end(std::move(temp));
1580 temp.push_back(std::move(obj));
1581 Thread::Mutex::Lock lock{mutex};
1583 q.splice_end(std::move(temp));
1586 template<
class... Args>
1587 void emplace(Args&&... args) {
1589 temp.emplace_back(std::forward<Args>(args)...);
1590 Thread::Mutex::Lock lock{mutex};
1592 q.splice_end(std::move(temp));
1596 Thread::Mutex::Lock lock{mutex};
1597 if (q.empty())
throw AsyncQueuePopError();
1603 Thread::Mutex::Lock lock{mutex};
1604 if (q.empty())
throw AsyncQueuePopError();
1605 obj = std::move(q.front());
1615 Thread::Mutex::Lock lock{mutex};
1616 if (q.empty())
throw AsyncQueuePopError();
1618 q.unsplice_beginning(temp);
1620 obj = std::move(temp.front());
1624 Thread::Mutex::Lock lock{mutex};
1625 if (q.empty())
throw AsyncQueuePopError();
1629 bool empty()
const {
1630 Thread::Mutex::Lock lock{mutex};
1635 Thread::Mutex::Lock lock{mutex};
1640 if (
this != &other) {
1641 lock2(mutex, other.mutex);
1650 lock2(mutex, rhs.mutex);
1660 Thread::Mutex::Lock lock{mutex};
1661 q = std::move(rhs.q);
1672 Thread::Mutex::Lock lock{mutex};
1685 template <
class T,
class Allocator>
1686 class AsyncQueueDispatch<T, std::list<T, Allocator> > {
1688 typedef std::list<T, Allocator> Container;
1689 typedef typename Container::value_type
value_type;
1690 typedef typename Container::size_type
size_type;
1698 class Q:
public std::queue<T, Container> {
1700 void splice_end(Container&& lst) {
1701 this->c.splice(this->c.end(), std::move(lst));
1703 void unsplice_beginning(Container& lst) {
1704 lst.splice(lst.begin(), this->c, this->c.begin());
1707 mutable Thread::Mutex mutex;
1711 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1714 if (!m2.trylock()) {
1719 #ifdef CGU_USE_SCHED_YIELD
1729 Container temp{obj};
1730 Thread::Mutex::Lock lock{mutex};
1732 q.splice_end(std::move(temp));
1743 temp.push_back(std::move(obj));
1744 Thread::Mutex::Lock lock{mutex};
1746 q.splice_end(std::move(temp));
1750 template<
class... Args>
1751 void emplace(Args&&... args) {
1753 temp.emplace_back(std::forward<Args>(args)...);
1754 Thread::Mutex::Lock lock{mutex};
1756 q.splice_end(std::move(temp));
1761 Thread::Mutex::Lock lock{mutex};
1762 if (q.empty())
throw AsyncQueuePopError();
1768 Thread::Mutex::Lock lock{mutex};
1769 if (q.empty())
throw AsyncQueuePopError();
1770 obj = std::move(q.front());
1780 Thread::Mutex::Lock lock{mutex};
1781 if (q.empty())
throw AsyncQueuePopError();
1783 q.unsplice_beginning(temp);
1785 obj = std::move(temp.front());
1789 Thread::Mutex::Lock lock{mutex};
1790 while (q.empty()) cond.wait(mutex);
1791 Thread::CancelBlock b;
1797 Thread::Mutex::Lock lock{mutex};
1798 while (q.empty()) cond.wait(mutex);
1799 Thread::CancelBlock b;
1800 obj = std::move(q.front());
1807 bool cancelstate_restored =
false;
1809 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1814 pthread_setcancelstate(old_state, &ignore);
1815 cancelstate_restored =
true;
1816 Thread::Mutex::TrackLock lock{mutex};
1817 while (q.empty()) cond.wait(mutex);
1818 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1819 cancelstate_restored =
false;
1821 q.unsplice_beginning(temp);
1823 obj = std::move(temp.front());
1824 pthread_setcancelstate(old_state, &ignore);
1834 if (!cancelstate_restored) {
1835 pthread_setcancelstate(old_state, &ignore);
1844 Thread::Mutex::Lock lock{mutex};
1846 if (cond.timed_wait(mutex, ts))
return true;
1848 Thread::CancelBlock b;
1857 Thread::Mutex::Lock lock{mutex};
1859 if (cond.timed_wait(mutex, ts))
return true;
1861 Thread::CancelBlock b;
1862 obj = std::move(q.front());
1872 bool cancelstate_restored =
false;
1874 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1879 pthread_setcancelstate(old_state, &ignore);
1880 cancelstate_restored =
true;
1881 Thread::Mutex::TrackLock lock{mutex};
1883 if (cond.timed_wait(mutex, ts))
return true;
1885 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1886 cancelstate_restored =
false;
1888 q.unsplice_beginning(temp);
1890 obj = std::move(temp.front());
1891 pthread_setcancelstate(old_state, &ignore);
1902 if (!cancelstate_restored) {
1903 pthread_setcancelstate(old_state, &ignore);
1910 Thread::Mutex::Lock lock{mutex};
1911 if (q.empty())
throw AsyncQueuePopError();
1915 bool empty()
const {
1916 Thread::Mutex::Lock lock{mutex};
1921 Thread::Mutex::Lock lock{mutex};
1926 if (
this != &other) {
1927 lock2(mutex, other.mutex);
1931 if (!q.empty()) cond.broadcast();
1932 if (!other.q.empty()) other.cond.broadcast();
1938 lock2(mutex, rhs.mutex);
1943 if (!q.empty()) cond.broadcast();
1949 Thread::Mutex::Lock lock{mutex};
1950 q = std::move(rhs.q);
1951 if (!q.empty()) cond.broadcast();
1960 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1963 Thread::Mutex::Lock lock{mutex};
1969 #endif // CGU_USE_INHERITABLE_QUEUE