Listing 1: The Channel class.
template < typename _Tp, typename _queueTp = std::deque<_Tp> > class Channel : private _queueTp { private: Mutex monitor_; Condition bufferNotFull_, bufferNotEmpty_; volatile bool bMayStop_; ... public: // for consumer thread... _Tp poll(long msecs = -1) // ignore msecs for now, { Lock lk(monitor_); while (!bMayStop_ && 0 == ((_queueTp *)this)->size()) { bufferNotEmpty_.wait(lk); } // check if caller intentionally calls for stop if(bMayStop_ && 0 == ((_queueTp *)this)->size()) throw pc_exception("consumer to end"); // pop back _Tp item = pop(); bufferNotFull_.notify_one(); return item; } // for producer thread... bool offer(_Tp item, long msecs = -1) // ignore msecs for now { Lock lk(monitor_); while (maxSize_ == ((_queueTp *)this)->size()) { bufferNotFull_.wait(lk); } // push front push(item); bufferNotEmpty_.notify_one(); return true; } virtual void mayStop(bool bMayStop = true) { Lock lk(monitor_); bMayStop_ = bMayStop; if(bMayStop) { // if producers exit, wake up the // waiting consumer threads... bufferNotEmpty_.notify_all(); } } ... }; // Channel