Condition Variables
Sometimes it's not enough to lock a shared resource and use it. Sometimes the shared resource needs to be in some specific state before it can be used. For example, a thread may try to pull data off a stack, waiting for data to arrive if none is present. A mutex alone is not enough to allow for this type of synchronization. Another synchronization type, known as a condition variable, can be used in this case.
A condition variable is always used in conjunction with a mutex and the shared resource(s). A thread first locks the mutex and then verifies that the shared resource is in a state that can be safely used in the manner needed. If it's not in the state needed, the thread waits on the condition variable. This operation causes the mutex to be unlocked during the wait so that another thread can actually change the state of the shared resource. It also ensures that the mutex is locked again when the thread returns from the wait operation. Because a return from the wait does not by itself guarantee that the resource is now in the needed state, the check is normally repeated in a loop. When another thread changes the state of the shared resource, it needs to notify the threads that may be waiting on the condition variable, enabling them to return from the wait operation.
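The lock, check, wait, and notify steps described above can be sketched as follows. This is only a minimal sketch, and it assumes the C++11 standard library types std::mutex and std::condition_variable in place of the Boost classes, on the assumption that their wait and notify_one operations behave analogously for this purpose. The Mailbox class and the sum_from_producer function are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical unbounded queue shared by two threads (illustration only).
class Mailbox
{
public:
    // Producer side: change the shared state under the mutex,
    // then notify a thread that may be waiting on the condition.
    void post(int value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(value);
        }
        ready_.notify_one();
    }

    // Consumer side: lock the mutex, check the state, and wait
    // on the condition variable until the state is usable.
    int take()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (items_.empty())   // re-check the state after every wakeup
            ready_.wait(lock);   // unlocks while blocked, relocks before returning
        // At this point the mutex is held and the queue is non-empty.
        assert(lock.owns_lock() && !items_.empty());
        int value = items_.front();
        items_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<int> items_;
};

// Starts a producer thread that posts 1..count and returns the sum
// of the values taken by the calling thread.
int sum_from_producer(int count)
{
    Mailbox box;
    std::thread producer([&box, count] {
        for (int i = 1; i <= count; ++i)
            box.post(i);
    });
    int total = 0;
    for (int i = 0; i < count; ++i)
        total += box.take();
    producer.join();
    return total;
}
```

Calling sum_from_producer(10) from main would start one producer thread and consume its ten values on the calling thread. The while loop around the wait is the important design choice: the consumer proceeds only after it has confirmed, with the mutex held, that the queue really is non-empty.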
Listing Four illustrates a very simple use of the boost::condition class. A class is defined implementing a bounded buffer, a container with a fixed size allowing FIFO input and output. This buffer is made thread-safe internally through the use of a boost::mutex. The put and get operations use a condition variable to ensure that a thread waits for the buffer to be in the state needed to complete the operation. Two threads are created, one that puts 100 integers into this buffer and another that pulls the integers back out. The bounded buffer can only hold 10 integers at one time, so each thread periodically has to wait for the other. To show that this is happening, the put and get operations output diagnostic strings to std::cout. Finally, the main thread waits for both threads to complete.
Listing Four: The boost::condition class
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <iostream>

const int BUF_SIZE = 10;
const int ITERS = 100;

boost::mutex io_mutex;  // serializes output to std::cout

class buffer
{
public:
    typedef boost::mutex::scoped_lock scoped_lock;

    buffer() : p(0), c(0), full(0) { }

    void put(int m)
    {
        scoped_lock lock(mutex);
        if (full == BUF_SIZE)
        {
            {
                boost::mutex::scoped_lock lock(io_mutex);
                std::cout << "Buffer is full. Waiting..." << std::endl;
            }
            // Wait until a get() has made room in the buffer.
            while (full == BUF_SIZE)
                cond.wait(lock);
        }
        buf[p] = m;
        p = (p+1) % BUF_SIZE;
        ++full;
        cond.notify_one();  // wake a thread waiting in get()
    }

    int get()
    {
        scoped_lock lk(mutex);
        if (full == 0)
        {
            {
                boost::mutex::scoped_lock lock(io_mutex);
                std::cout << "Buffer is empty. Waiting..." << std::endl;
            }
            // Wait until a put() has added data to the buffer.
            while (full == 0)
                cond.wait(lk);
        }
        int i = buf[c];
        c = (c+1) % BUF_SIZE;
        --full;
        cond.notify_one();  // wake a thread waiting in put()
        return i;
    }

private:
    boost::mutex mutex;
    boost::condition cond;
    unsigned int p, c, full;  // put index, get index, number of items stored
    int buf[BUF_SIZE];
};

buffer buf;

void writer()
{
    for (int n = 0; n < ITERS; ++n)
    {
        {
            boost::mutex::scoped_lock lock(io_mutex);
            std::cout << "sending: " << n << std::endl;
        }
        buf.put(n);
    }
}

void reader()
{
    for (int x = 0; x < ITERS; ++x)
    {
        int n = buf.get();
        {
            boost::mutex::scoped_lock lock(io_mutex);
            std::cout << "received: " << n << std::endl;
        }
    }
}

int main(int argc, char* argv[])
{
    boost::thread thrd1(&reader);
    boost::thread thrd2(&writer);
    thrd1.join();
    thrd2.join();
    return 0;
}