A C++ Producer-Consumer Concurrency Template Library

Producer-consumer is a well-known C++ concurrency pattern that's been applied to applications ranging from scientific simulations to distributed parallel computing environments.


January 01, 2004
URL:http://drdobbs.com/a-c-producer-consumer-concurrency-templa/184401751

January 04:

Producer-consumer is a well-known C++ concurrency pattern that's been applied to applications ranging from scientific simulations (where multitasked computations are accomplished by using a pool of worker processes and a managing process that maintains the pace of progress) to image processing (where raster image data sets are filtered through multithreaded mask algorithms). It's also been applied to distributed parallel computing, where a concurrent network server responds to client requests with a pool of request handler threads. Existing implementations of the producer-consumer model (also referred to as the master-slave model) tend to be either narrowed to certain applications or bound to specific OS platforms (because of platform-specific threading and synchronization techniques). In this article, I present a C++ template-based and multithreaded producer-consumer implementation that's based on the C++ Standard Template Library (STL) and Boost::Threads library [1]. The complete source code for the implementation is available at http://www.cuj.com/code/.

My goal in developing this implementation was to abstract the producer-consumer algorithm, making it easier for other applications to adopt [2]. For instance, imagine a web server scenario in which a listening server accepts client connection requests, then processes the requests once connections are made. To be more responsive, the server may hand over an accepted client connection to a pool of worker threads by assigning the socket to a task list. The server then returns without blocking its listening mode and waits for new network connection requests. A worker thread removes the socket later from the list for further processing.

A producer-consumer pattern is chainable, making it useful in layered or multitier systems. In other words, the consumers of one producer-consumer (PC) process (call it a "production") can be the producers of a subsequent production. To extend the web server example, the worker threads that process the connections might merely read the request content, then relay the requests immediately to a back-end database-request queue. The subsequent database queries can be done by another PC process, where a pool of database access threads at the other end of the database-request queue picks up the query requests and fulfills the actual query tasks.
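The chaining described above can be sketched with two queues and one thread per stage. This is only an illustration under stated assumptions: it uses C++11 std::thread in place of Boost::Threads so that it is self-contained, and ClosableQueue, relay, and the "query N" strings are hypothetical names that do not come from the library.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical queue that can be closed to tell its consumers to finish.
template <typename T>
class ClosableQueue {
public:
    void put(const T& item) {
        std::lock_guard<std::mutex> lk(m_);
        q_.push_back(item);
        cv_.notify_one();
    }
    // Returns false once the queue has been closed and drained.
    bool take(T& out) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return closed_ || !q_.empty(); });
        if (q_.empty()) return false;
        out = q_.front();
        q_.pop_front();
        return true;
    }
    void close() {
        std::lock_guard<std::mutex> lk(m_);
        closed_ = true;
        cv_.notify_all();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
    bool closed_ = false;
};

// Two chained productions: connection ids flow into the first queue,
// and the worker relays them as query strings into the second queue.
std::vector<std::string> relay(const std::vector<int>& requests) {
    ClosableQueue<int> connections;        // queue of the first production
    ClosableQueue<std::string> queries;    // queue of the second production
    std::vector<std::string> done;

    // Consumer of the first production and producer of the second.
    std::thread worker([&] {
        int id = 0;
        while (connections.take(id)) queries.put("query " + std::to_string(id));
        queries.close();
    });
    // Consumer of the second production (stands in for database access).
    std::thread database([&] {
        std::string q;
        while (queries.take(q)) done.push_back(q);
    });

    for (int id : requests) connections.put(id);   // the listening server
    connections.close();
    worker.join();
    database.join();
    return done;
}
```

With a single thread per stage the relayed requests keep their order; with a pool of threads at either stage, ordering is no longer guaranteed.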

Generally, there can be multiple task schedulers (producers) that create task objects. Matching the number of producers to the number of consumers is often a runtime performance tuning requirement. The separation of producer and consumer in the PC model meets the requirement. A task holding and delivery queue is necessary for implementing a PC model. To summarize, producers in a PC model create and place tasks onto a predefined task-delivery queue without waiting for them to be processed. Consumers (task handlers) at the other end of the delivery queue retrieve the tasks and process them later. Figure 1 illustrates the PC workflow and lists the requirements.
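The workflow summarized above fits in a few lines of code. The following is a minimal sketch, not the library's implementation: it assumes C++11 std::thread, std::mutex, and std::condition_variable in place of Boost::Threads, and the names BoundedQueue and runOnce are hypothetical.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical bounded FIFO shared by producers and consumers.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t maxSize) : maxSize_(maxSize) {}

    // Producer side: blocks only while the queue is full.
    void put(const T& item) {
        std::unique_lock<std::mutex> lk(m_);
        notFull_.wait(lk, [this] { return q_.size() < maxSize_; });
        q_.push_back(item);
        notEmpty_.notify_one();
    }

    // Consumer side: blocks only while the queue is empty.
    T take() {
        std::unique_lock<std::mutex> lk(m_);
        notEmpty_.wait(lk, [this] { return !q_.empty(); });
        T item = q_.front();
        q_.pop_front();
        notFull_.notify_one();
        return item;
    }

private:
    std::mutex m_;
    std::condition_variable notFull_, notEmpty_;
    std::deque<T> q_;
    std::size_t maxSize_;
};

// One producer hands the tasks 1..n to one consumer through a queue of
// capacity 4; returns the sum of the tasks the consumer processed.
long runOnce(int n) {
    BoundedQueue<int> queue(4);
    long sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < n; ++i) sum += queue.take();
    });
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) queue.put(i);
    });
    producer.join();
    consumer.join();
    return sum;
}
```

The producer never waits for a task to be processed; it waits only when the queue is full, which is what decouples the two sides.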

A PC model does not dictate how consumers return results back to producers. In general, they may not. In multithreaded implementations of PC workflows, the delivery queue has to be accessed in a thread-safe manner. Finer delivery-queue tuning in terms of writer or reader access is outside the scope of the article.

The consumer pool and thread pool, two variations of the PC model, are also widely used. In a consumer pool case, there is no active producer — the task queue is created and prefilled before consumers start. In a thread pool, a simple producer and a linear task queue are the usual characteristics.

Implementation

The bulk of my work resides in two header files, ProCon.h and ThreadPool.h. To achieve reusability, I used C++ templates to separate the producer-consumer workflow from application-specific logic. STL's container classes are used as the base collection structures for the task-delivery queues that connect producers with consumers. Portability of the library to OS platforms other than Windows and Linux is assured by the STL and Boost libraries. Four classes make up the main portion of my PC implementation: Channel, Producer, Consumer, and Production.

The Channel class (see Listing 1) provides thread-safe access to an underlying STL container; it implements a delivery queue. Channel is declared as:

template < typename _Tp, typename _queueTp > class Channel;

The first template parameter, _Tp, is a user-defined task data structure. The second template parameter, _queueTp, represents an STL container. By default, _queueTp is std::deque<_Tp>, which is preferred over other container types here because of the frequent insertions and removals at the ends of the queue. Normally, task-delivery queues are FIFOs. Channel can be overridden to prioritize task retrieval according to application needs.
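To make the difference between FIFO and prioritized retrieval concrete, here is a small sketch. It is not the Channel class from Listing 1: FifoChannel, PriorityChannel, and drainOrder are hypothetical names, the channels do not block, and std::mutex is assumed in place of the Boost mutex.

```cpp
#include <deque>
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical FIFO channel: the container type is a template parameter
// that defaults to std::deque, mirroring the declaration above.
template <typename T, typename Queue = std::deque<T> >
class FifoChannel {
public:
    void put(const T& item) {
        std::lock_guard<std::mutex> lk(m_);
        q_.push_back(item);
    }
    // Returns false when the channel is empty (no blocking in this sketch).
    bool tryTake(T& out) {
        std::lock_guard<std::mutex> lk(m_);
        if (q_.empty()) return false;
        out = q_.front();
        q_.pop_front();
        return true;
    }
private:
    std::mutex m_;
    Queue q_;
};

// Hypothetical prioritized variant: same interface, but the largest
// task (by operator<) is retrieved first.
template <typename T>
class PriorityChannel {
public:
    void put(const T& item) {
        std::lock_guard<std::mutex> lk(m_);
        q_.push(item);
    }
    bool tryTake(T& out) {
        std::lock_guard<std::mutex> lk(m_);
        if (q_.empty()) return false;
        out = q_.top();
        q_.pop();
        return true;
    }
private:
    std::mutex m_;
    std::priority_queue<T> q_;
};

// Helper: put the given tasks, then drain the channel into a vector.
template <typename Channel>
std::vector<int> drainOrder(Channel& ch, const std::vector<int>& tasks) {
    for (int t : tasks) ch.put(t);
    std::vector<int> order;
    int t = 0;
    while (ch.tryTake(t)) order.push_back(t);
    return order;
}
```

Because both channels expose the same put/tryTake interface, code written against one can be switched to the other without changes, which is the point of making the queue a template parameter.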

STL containers have value semantics. When a task object is added to an STL container, the container allocates storage and calls the task object's copy constructor to clone the original. Similarly, when a task object is removed from an STL container, the copy's destructor is called and its storage is released. Value semantics may be a performance concern, especially if producers and consumers frequently add tasks to and remove tasks from a queue. To work around the issue, I suggest you use task pointers rather than task objects unless the tasks can be described with a primitive type or the task objects are reference counted and properly scoped. (See my ThreadPool implementation for an example.)
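The cost of value semantics is easy to observe by counting copies. The sketch below is illustrative only: Task, copiesByValue, and copiesByPointer are hypothetical names, and std::shared_ptr (C++11) is assumed as the reference-counted pointer; the article's ThreadPool passes plain pointers instead.

```cpp
#include <deque>
#include <memory>

// Hypothetical task type that counts how often it is copied.
struct Task {
    static int copies;
    int id;
    explicit Task(int i) : id(i) {}
    Task(const Task& other) : id(other.id) { ++copies; }
    Task& operator=(const Task& other) {
        id = other.id;
        ++copies;
        return *this;
    }
};
int Task::copies = 0;

// Passes one task through a queue by value; returns the number of
// Task copies that were made along the way.
int copiesByValue() {
    Task::copies = 0;
    std::deque<Task> q;
    Task t(1);
    q.push_back(t);          // copy into the container
    Task out = q.front();    // copy out of the container
    q.pop_front();
    (void)out;
    return Task::copies;
}

// Passes one task through a queue by reference-counted pointer; only
// the pointer is copied, never the Task itself.
int copiesByPointer() {
    Task::copies = 0;
    std::deque<std::shared_ptr<Task> > q;
    q.push_back(std::make_shared<Task>(1));
    std::shared_ptr<Task> out = q.front();
    q.pop_front();
    (void)out;
    return Task::copies;
}
```

For a task as small as an int the copies are harmless; for a task that owns buffers or other resources, each trip through the queue by value repeats that work.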

The Producer, Consumer, and Production classes implement the PC model in their operator()() methods. Consequently, they can be run by Boost::Threads directly. Part of what the Producer and Consumer classes do in their operator()() methods (see Listing 2) is loop, writing task objects to or reading them from a task queue. Their threads may be blocked briefly if the queue is temporarily full (for producers) or empty (for consumers). A class derived from Producer overrides its produce() method, which is called to create tasks. Subclasses of Consumer override the consume(...) method, which processes a task request.
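The division of labor between the base classes and their subclasses can be sketched as follows. This is not the code from ProCon.h: the signatures of produce() and consume(), the TaskQueue class, and the Counter and Summer subclasses are all assumptions made for illustration, and C++11 std::thread is used in place of Boost::Threads.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical unbounded queue with a "closed" state that ends the run.
template <typename T>
class TaskQueue {
public:
    void put(const T& item) {
        std::lock_guard<std::mutex> lk(m_);
        q_.push_back(item);
        cv_.notify_one();
    }
    // Returns false once the queue has been closed and drained.
    bool take(T& out) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return closed_ || !q_.empty(); });
        if (q_.empty()) return false;
        out = q_.front();
        q_.pop_front();
        return true;
    }
    void close() {
        std::lock_guard<std::mutex> lk(m_);
        closed_ = true;
        cv_.notify_all();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
    bool closed_ = false;
};

// The workflow loop lives in the base classes.
template <typename T>
class Producer {
public:
    explicit Producer(TaskQueue<T>& q) : q_(q) {}
    virtual ~Producer() {}
    void operator()() {
        T task;
        while (produce(task)) q_.put(task);
    }
protected:
    // Assumed signature: fill in the next task, or return false when done.
    virtual bool produce(T& task) = 0;
private:
    TaskQueue<T>& q_;
};

template <typename T>
class Consumer {
public:
    explicit Consumer(TaskQueue<T>& q) : q_(q) {}
    virtual ~Consumer() {}
    void operator()() {
        T task;
        while (q_.take(task)) consume(task);
    }
protected:
    virtual void consume(const T& task) = 0;
private:
    TaskQueue<T>& q_;
};

// Application-specific logic is confined to the overrides.
struct Counter : Producer<int> {
    Counter(TaskQueue<int>& q, int n) : Producer<int>(q), next_(1), n_(n) {}
    bool produce(int& task) override {
        if (next_ > n_) return false;
        task = next_++;
        return true;
    }
    int next_, n_;
};

struct Summer : Consumer<int> {
    explicit Summer(TaskQueue<int>& q) : Consumer<int>(q), sum(0) {}
    void consume(const int& task) override { sum += task; }
    long sum;
};

long sumOneToN(int n) {
    TaskQueue<int> q;
    Counter producer(q, n);
    Summer consumer(q);
    std::thread ct([&] { consumer(); });
    std::thread pt([&] { producer(); });
    pt.join();
    q.close();      // producers are done, so consumers may stop
    ct.join();
    return consumer.sum;
}
```

Neither Counter nor Summer touches a mutex or a condition variable; all synchronization stays in the queue and the base classes.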

There are two cases worth mentioning. First, it may be important to have the producers started before the consumers. This is an issue if consumers are started early but cannot decide whether they may exit after facing an empty queue for a period of time. I solved it in the Production class with a latch object that initially blocks the consumers from running; the latch also facilitates the synchronized release of the consumers when ready. Second, since there is no direct communication between producers and consumers, Production sets a stop flag to notify consumers after the producers are gone. The flag lets consumer threads finish under a predefined stop condition; for example, when the task queue becomes empty. A class derived from Consumer may override the cancel() method to implement a more rigorous stop condition specific to an application. In short, a consumer thread may exit only after the bMayStop_ flag is set and its cancel() method returns true.
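The latch mentioned above is a small synchronization object. The sketch below shows one way such a latch could work; it is an assumption about the idea, not the Latch class from the library. The names Latch, LatchResult, and runWithLatch are hypothetical, and C++11 threading primitives stand in for Boost::Threads.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical one-shot latch: wait() blocks until release() is called.
// A latch constructed as open never blocks.
class Latch {
public:
    explicit Latch(bool open = false) : open_(open) {}
    void wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return open_; });
    }
    void release() {
        std::lock_guard<std::mutex> lk(m_);
        open_ = true;
        cv_.notify_all();   // let every waiting consumer go at once
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    bool open_;
};

struct LatchResult {
    int before;   // consumers past the latch before release()
    int after;    // consumers past the latch after all have joined
};

// Starts nConsumers threads that are held at the latch, then releases
// them together once the manager is ready.
LatchResult runWithLatch(int nConsumers) {
    Latch latch;
    std::atomic<int> started(0);
    std::vector<std::thread> consumers;
    for (int i = 0; i < nConsumers; ++i) {
        consumers.emplace_back([&] {
            latch.wait();   // blocked until the manager releases the latch
            ++started;      // stands in for the consume loop
        });
    }
    LatchResult r;
    r.before = started.load();   // nobody can have passed the latch yet
    latch.release();
    for (std::thread& t : consumers) t.join();
    r.after = started.load();
    return r;
}
```

Passing an already-open latch, as Listing 2 does with noLatch for the producers, turns wait() into a no-op, so the same class serves both the blocked and the unblocked case.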

The Takable and Puttable classes are the access interfaces of a task queue for consumers and producers, respectively. They provide an extra layer of indirection for accessing an underlying task queue. In most cases, you can ignore their existence. Producer and Consumer each have two constructors: one that accepts a task queue and another that accepts a Takable or Puttable object. If you do not supply customized Takable and Puttable classes, the implementation creates defaults.

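In short, users are expected to derive a class from Producer and override produce(), derive a class from Consumer and override consume(...) (and cancel(), where an application-specific stop condition is needed), and run them through Production.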

In addition, Consuming and ThreadPool are built on top of the Producer and Consumer classes, and they represent two variant workflows of the PC model. Consuming maintains a pool of consumers that work against a static task queue. Consuming's task queue is expected to be prefilled with tasks that may not change during execution. In contrast, ThreadPool's execute(_Runnable*& runObj) function allows new _Runnable tasks to be submitted to its queue dynamically. ThreadPool's consumers retrieve the _Runnable pointers from the queue and invoke each task's operator()() method.
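The thread pool variant can be illustrated with a short sketch. It is not the ThreadPool class from ThreadPool.h: Runnable, SimplePool, AddTask, and poolSum are hypothetical names, the pool (rather than the task) deletes each task after running it, and C++11 std::thread is assumed in place of Boost::Threads.

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical task interface: anything with operator()().
struct Runnable {
    virtual ~Runnable() {}
    virtual void operator()() = 0;
};

// Hypothetical pool: execute() may be called at any time while the
// worker threads are running.
class SimplePool {
public:
    explicit SimplePool(int nThreads) : stopping_(false) {
        for (int i = 0; i < nThreads; ++i)
            workers_.emplace_back([this] { workLoop(); });
    }
    // Lets the workers drain the remaining tasks, then joins them.
    ~SimplePool() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (std::thread& t : workers_) t.join();
    }
    // The pool takes ownership of the task and deletes it after the run.
    void execute(Runnable* task) {
        std::lock_guard<std::mutex> lk(m_);
        tasks_.push_back(task);
        cv_.notify_one();
    }
private:
    void workLoop() {
        for (;;) {
            Runnable* task = nullptr;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;   // stopping and nothing left
                task = tasks_.front();
                tasks_.pop_front();
            }
            (*task)();      // run the task outside the lock
            delete task;
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Runnable*> tasks_;
    std::vector<std::thread> workers_;
    bool stopping_;
};

// Example task: adds a value to a shared counter.
struct AddTask : Runnable {
    AddTask(std::atomic<long>& total, int v) : total_(total), v_(v) {}
    void operator()() override { total_ += v_; }
    std::atomic<long>& total_;
    int v_;
};

long poolSum(int nThreads, int nTasks) {
    std::atomic<long> total(0);
    {
        SimplePool pool(nThreads);
        for (int i = 1; i <= nTasks; ++i) pool.execute(new AddTask(total, i));
    }   // the destructor waits for all submitted tasks
    return total.load();
}
```

Queuing pointers keeps the queue operations cheap regardless of how large a task object is, at the price of having to decide who deletes the task.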

Examples

To demonstrate how you can use the classes, my first example is a computation-intensive matrix multiplication application. In this case, I use Production and its consumer pool variant, Consuming. The second example illustrates a generic network server skeleton, where I use ThreadPool to launch worker threads that process network client connection requests. Both examples run on Windows and Linux platforms.

The product of an n-row by m-column matrix A and an m-row by p-column matrix B is an n-row by p-column matrix C. Since the products of the ith row of matrix A with the elements of matrix B determine the ith row of matrix C exclusively, I chose to create tasks according to the row index (slice_i in the program) of matrix A. As such, a total of n tasks are created and processed. Class Convoluter derives from Consumer and implements consume(...), which computes the ith row of matrix C. Scheduler is a subclass of Producer that returns the next row index when its produce() method is called. The row index values are the task objects that get delivered from the producers (Scheduler) to the consumers (Convoluter). In the first approach, I derived Convolution2 from Production to manage the computation. There is one producer thread and a maximum of nConsumers consumer threads. The consumer pool approach is simpler: Convolution sets up the row index values in the task queue prior to the start of the consumer threads; no producer is involved in this case. On a 500-MHz Pentium machine running Windows 2000, benchmark runs show that both approaches are about 10-30 percent slower than a single-threaded approach for matrices with sizes in the hundreds.
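The row-per-task decomposition of the consumer pool approach can be sketched as follows. This is an illustration, not the Convolution or Convoluter code: the names Matrix, multiplyRow, and multiply are hypothetical, the task queue is a plain std::deque guarded by a std::mutex, and B is assumed to be non-empty.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

typedef std::vector<std::vector<double> > Matrix;

// Computes row i of C = A * B; this is the work one task represents.
void multiplyRow(const Matrix& A, const Matrix& B, Matrix& C, std::size_t i) {
    const std::size_t m = B.size(), p = B[0].size();
    for (std::size_t j = 0; j < p; ++j) {
        double sum = 0.0;
        for (std::size_t k = 0; k < m; ++k) sum += A[i][k] * B[k][j];
        C[i][j] = sum;
    }
}

// Consumer pool variant: the queue of row indices is filled before any
// consumer starts, so an empty queue means the work is finished.
Matrix multiply(const Matrix& A, const Matrix& B, int nConsumers) {
    const std::size_t n = A.size(), p = B[0].size();
    Matrix C(n, std::vector<double>(p, 0.0));

    std::deque<std::size_t> rows;            // the prefilled task queue
    for (std::size_t i = 0; i < n; ++i) rows.push_back(i);
    std::mutex m;

    std::vector<std::thread> consumers;
    for (int c = 0; c < nConsumers; ++c) {
        consumers.emplace_back([&] {
            for (;;) {
                std::size_t i = 0;
                {
                    std::lock_guard<std::mutex> lk(m);
                    if (rows.empty()) return;
                    i = rows.front();
                    rows.pop_front();
                }
                multiplyRow(A, B, C, i);     // rows of C never overlap
            }
        });
    }
    for (std::thread& t : consumers) t.join();
    return C;
}
```

Because each task writes only its own row of C, the consumers need to synchronize only when taking a row index from the queue, not while computing.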

In my second example, I use ThreadPool to build a generic network server called NetworkService (see Listing 3). NetworkService can easily be integrated into an existing application by implementing its ServiceHandler(int socket) method, which processes a network socket request and closes the client socket when finished. I built a simple HTTP web server with NetworkService. Upon request, the web server outputs "Hello World" and a visitor count in HTML format. It can be tested using any web browser by sending a request to http://localhost:1234. The gist of the example is the separation of the network server logic and its threading from the actual request handling.

Conclusion

C++ is a powerful and versatile language that promotes layered and object-oriented programming through class derivation. For example, the web server example contains software layers such as the STL, Boost::Threads, the producer-consumer thread pool, the network service encapsulation, and the web server on top. Developers working on each layer will constantly improve each component, whereas enhancements at the lower layers are hidden and ready to be incorporated into higher level components over time. Imagine how complex it would be (not to mention the ongoing maintenance) if I had to write the web server in a procedural way with cross-platform threading ability and customizable function signatures.

Notes and References

[1] Boost libraries can be found at http://www.boost.org/. My producer-consumer implementation depends only on the Boost::Threads library.

[2] Lea, Doug. Concurrent Programming in Java, Second Edition (Addison-Wesley, 1999). Several key classes described in the article are inspired by Doug's util.concurrent Java package, which can be found at http://gee.cs.oswego.edu/dl/.


Ted Yuan received his Ph.D. in Physics from Northeastern University and he is currently a professional software developer with Spoke Software.


Figure 1: Producer-consumer model as a task-processing workflow.

Listing 1: The Channel class.

template < typename _Tp, typename _queueTp = std::deque<_Tp> >
class Channel : private _queueTp
{
private:
  Mutex monitor_; 
  Condition bufferNotFull_, bufferNotEmpty_;
  volatile bool bMayStop_;
  ...
public:
  // for consumer thread...
  _Tp poll(long msecs = -1) // ignore msecs for now
  {
    Lock lk(monitor_);
    while (!bMayStop_ && 0 == ((_queueTp *)this)->size())
    {
      bufferNotEmpty_.wait(lk);
    }
    // check if caller intentionally calls for stop
    if(bMayStop_ && 0 == ((_queueTp *)this)->size()) 
      throw pc_exception("consumer to end"); 
    // pop back
    _Tp item = pop();
    bufferNotFull_.notify_one();
    return item;
  }
  // for producer thread...
  bool offer(_Tp item, 
    long msecs = -1) // ignore msecs for now
  {
    Lock lk(monitor_);
    while (maxSize_ == ((_queueTp *)this)->size())
    {
      bufferNotFull_.wait(lk);
    }
    // push front
    push(item);
    bufferNotEmpty_.notify_one();
    return true;
  }
  virtual void mayStop(bool bMayStop = true) 
  {
    Lock lk(monitor_);
    bMayStop_ = bMayStop;
    if(bMayStop) 
    {
      // if producers exit, wake up the 
      // waiting consumer threads...
      bufferNotEmpty_.notify_all();
    }
  }
  ...
}; // Channel




Listing 2: Production::operator()().

template < typename _Tp,
  typename _ChannelTp = Channel<_Tp>, 
  typename _ProducerTp = Producer<_Tp, _ChannelTp>, 
  typename _ConsumerTp = Consumer<_Tp, _ChannelTp> >
class Production 
{
  ...
public:
  void operator()()
  {
    _ChannelTp chan(queueLen_);
    Latch theLatch, noLatch(true);
    _ProducerTp* producer = new _ProducerTp(chan, noLatch);
    // take ownership right away so nothing leaks if the next new throws
    std::auto_ptr<_ProducerTp> prodClean(producer);
    _ConsumerTp* consumer = new _ConsumerTp(chan, syncStart_ ? 
                                                     theLatch : noLatch);
    std::auto_ptr<_ConsumerTp> consClean(consumer);
    consumer->mayStop(false);
    // callback function for derived classes...
    pcModelCreated(*producer, *consumer);
    try {
      boost::thread_group pthreads, cthreads;
      int i;
      for (i = 0; i < nProducers_; ++i)
        pthreads.create_thread(*producer);
      for (i = 0; i < nConsumers_; ++i)
        cthreads.create_thread(*consumer);
      theLatch.release();
      beforeJoin();
      pthreads.join_all();
      while(chan.size() > 0) sleep(0, 10);
      // producers are done, consumers *may* stop...
      consumer->mayStop(true); 
      cthreads.join_all();
      afterJoin();
    } catch (boost::lock_error& err) {
      Logger::log(err.what());
    } catch (std::exception& err) { 
      Logger::log(err.what());
    } catch (...) {
      Logger::log("caught unknown exception");
    }
  }
  ...
};




Listing 3: NetworkService, a generic network server.

struct NetworkService
{
  ...
  // subclasses override this method and close the socket 
  virtual void ServiceHandler(int socket) = 0;
  virtual bool stop() { return false; } 
  void operator()()
  {
    int server = createServerSocket(
      sourcePort, lengthWaitList+1);
    if(server < 0)
    {
      std::cout << "Can't bind to port " 
        << sourcePort << std::endl;
      return;
    }
    RunPool<SocketHandler> pool(numThreads);
    ThreadPool<SocketHandler> thpool(pool, 
      numThreads > 256 ? 256 : numThreads);
    // start the pool...
    boost::thread thrd(thpool);
    std::cout << "Ready... service at port " 
      << sourcePort << std::endl;
    while (!stop())
    {
      struct   sockaddr_in pin;
      int source = acceptClientSocket(server, pin);
      if(source < 0) continue;
      // put the handler on the heap; it deletes itself when done...
      SocketHandler * linkage = new SocketHandler(*this, source);
      thpool.execute(linkage);
    }
    thrd.join();
    CloseConnection(server);  
    std::cout << "Shutting down network service " 
      << serviceId << " at port " << sourcePort 
      << std::endl; 
  }
  ...
};
void SocketHandler::operator()() 
{
  try 
  {
    ns_.ServiceHandler(client_);
  } catch (...) {}
  delete this;
}


        
