Example 2: One Producer, Many Consumers, Using Lock-Free Mailboxes
Next, let's consider a similar example, but instead of locks, we'll use ordered atomic variables to synchronize the Producer-to-Consumers handoff... and no synchronization at all for inter-Consumer contention.
The idea in this second approach is to use a set of ordered atomic variables (e.g., Java/.NET volatile, proposed C++0x atomic), one for each Consumer, that will serve as "mailbox" slots for the Consumers individually. Each mail slot can hold exactly one item at a time. As the Producer produces each new task, he puts it in the next mailbox that doesn't already contain an item using an atomic write; this lets the Consumers run concurrently with the Producer, as they check their slots and perform work while the Producer is still manufacturing and assigning later tasks. For further efficiency, to avoid making the Consumer busy-wait whenever its slot is initially or temporarily empty, we'll also give each Consumer a semaphore sem that the Producer will signal every time it puts a task into the Consumer's mail slot.
Finally, once all the tasks have been assigned, the Producer starts to put "done" dummy tasks into mailboxes that don't already contain an item until every mailbox has received one. Here's the code:
// One Producer thread: Changes any box from null to nonnull
//
curr = 0;                        // keep a finger on the current mailbox

// Phase 1: Build and distribute tasks (use next available box)
while( ThereAreMoreTasks() ) {
  task = AllocateAndBuildNewTask();
  while( slot[curr] != null )    // acquire null: look for next
    curr = (curr+1)%K;           // empty slot
  slot[curr] = task;             // release nonnull: "You have mail!"
  sem[curr].signal();
}

// Phase 2: Stuff the mailboxes with "done" signals
numNotified = 0;
while( numNotified < K ) {
  while( slot[curr] == done )    // acquire: look for next not-yet-
    curr = (curr+1)%K;           // notified slot
  if( slot[curr] == null ) {     // acquire null: check that the
                                 // mailbox is empty
    slot[curr] = done;           // release done: write sentinel
    sem[curr].signal();
    ++numNotified;
  }
}
On the other side of the mailbox wall, each Consumer checks only its own mailbox. If its slot holds a new task, the Consumer takes the task, resets its mail slot to "empty," and then processes the task. If the mailbox holds a "done" dummy task, the Consumer knows it can stop.
// K Consumer threads (mySlot = 0..K-1):
// Each changes its own box from non-null to null
//
myTask = null;
while( myTask != done ) {
  while( (myTask = slot[mySlot]) == null )  // acquire nonnull,
    sem[mySlot].wait();                     // without busy-waiting
  if( myTask != done ) {
    slot[mySlot] = null;                    // release null: tell
                                            // him we took it
    DoWork( myTask );
  }
}
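To see the Producer and Consumer halves working together, here is a minimal runnable sketch in Java. It is one possible rendering of the pseudocode, not the only one: it assumes java.util.concurrent.atomic.AtomicReferenceArray as the ordered atomic slots and java.util.concurrent.Semaphore for sem. The names MailboxDemo, Task, and run are hypothetical, and adding each task's value to a running total stands in for DoWork.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class MailboxDemo {
    // Hypothetical task type: each task just carries a number to add up.
    static final class Task {
        final int value;
        Task(int value) { this.value = value; }
    }

    static final Task DONE = new Task(-1);  // sentinel, compared by identity

    // Runs one Producer (the calling thread) and k Consumers over tasks
    // 1..numTasks and returns the sum of the values the Consumers processed.
    static long run(int k, int numTasks) {
        AtomicReferenceArray<Task> slot = new AtomicReferenceArray<>(k); // all null
        Semaphore[] sem = new Semaphore[k];
        for (int i = 0; i < k; i++) sem[i] = new Semaphore(0);
        AtomicLong total = new AtomicLong();

        Thread[] consumers = new Thread[k];
        for (int i = 0; i < k; i++) {
            final int mySlot = i;
            consumers[i] = new Thread(() -> {
                Task myTask = null;
                while (myTask != DONE) {
                    while ((myTask = slot.get(mySlot)) == null)  // acquire nonnull
                        sem[mySlot].acquireUninterruptibly();    // sleep, don't spin
                    if (myTask != DONE) {
                        slot.set(mySlot, null);                  // release null
                        total.addAndGet(myTask.value);           // stands in for DoWork
                    }
                }
            });
            consumers[i].start();
        }

        int curr = 0;
        // Phase 1: build and distribute tasks (use next available box).
        for (int t = 1; t <= numTasks; t++) {
            Task task = new Task(t);
            while (slot.get(curr) != null)   // acquire null: look for an empty slot
                curr = (curr + 1) % k;
            slot.set(curr, task);            // release nonnull: "You have mail!"
            sem[curr].release();
        }
        // Phase 2: stuff the mailboxes with "done" sentinels.
        int numNotified = 0;
        while (numNotified < k) {
            while (slot.get(curr) == DONE)   // skip already-notified slots
                curr = (curr + 1) % k;
            if (slot.get(curr) == null) {    // only write into an empty mailbox
                slot.set(curr, DONE);
                sem[curr].release();
                numNotified++;
            }
        }

        for (Thread c : consumers) {
            try {
                c.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10));  // prints 55 (1 + 2 + ... + 10)
    }
}
```

Because a Consumer can pick up a task without first waiting on its semaphore, permits can accumulate; a later wait then returns immediately, the Consumer sees an empty slot, and it simply waits again. That costs an extra loop iteration but does not affect correctness.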
A few notes: First, you can easily generalize this to replace each slot with a bounded queue; the previous example just expresses a bounded queue with a maximum size of 1. Second, this Producer code can needlessly busy-wait when it's trying to write a new task (or the remaining done sentinels) and there are no free slots. This can be addressed by adding another semaphore, which the Producer waits for when it has no free slots and that is signaled by each Consumer when it resets its slot to null. I've left that logic out for now to keep the example clearer.
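One way that extra semaphore might look is sketched below in Java. This is an assumption about the omitted logic, not the article's own code: a single counting semaphore, here called freeSlots (a hypothetical name), starts with K permits, the Producer takes one permit before every write, and each Consumer returns one permit after resetting its slot to null. The class name WaitingProducerDemo and the use of Integer values as tasks are also made up for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

final class WaitingProducerDemo {
    static final Object DONE = new Object();  // sentinel, compared by identity

    // One Producer (the calling thread), k Consumers, tasks are Integers 1..numTasks.
    // Returns the sum of the values the Consumers processed.
    static long run(int k, int numTasks) {
        AtomicReferenceArray<Object> slot = new AtomicReferenceArray<>(k); // all null
        Semaphore[] sem = new Semaphore[k];
        for (int i = 0; i < k; i++) sem[i] = new Semaphore(0);
        // Assumption: permits never exceed the number of slots that are null.
        Semaphore freeSlots = new Semaphore(k);
        AtomicLong total = new AtomicLong();

        Thread[] consumers = new Thread[k];
        for (int i = 0; i < k; i++) {
            final int mySlot = i;
            consumers[i] = new Thread(() -> {
                Object myTask = null;
                while (myTask != DONE) {
                    while ((myTask = slot.get(mySlot)) == null)
                        sem[mySlot].acquireUninterruptibly();
                    if (myTask != DONE) {
                        slot.set(mySlot, null);   // release null first...
                        freeSlots.release();      // ...then tell the Producer
                        total.addAndGet((Integer) myTask);
                    }
                }
            });
            consumers[i].start();
        }

        int curr = 0;
        // numTasks real tasks followed by k DONE sentinels, all written the same way.
        for (int t = 1; t <= numTasks + k; t++) {
            Object item = (t <= numTasks) ? Integer.valueOf(t) : DONE;
            freeSlots.acquireUninterruptibly();   // sleep until some slot is empty
            while (slot.get(curr) != null)        // at least one null slot exists now,
                curr = (curr + 1) % k;            // so this scan ends within one lap
            slot.set(curr, item);
            sem[curr].release();
        }

        for (Thread c : consumers) {
            try {
                c.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10));  // prints 55 (1 + 2 + ... + 10)
    }
}
```

In this variant the Producer holds a permit before every write, so an empty slot is guaranteed to exist and the sentinel phase needs no separate scanning logic: a slot holding a sentinel is never reset to null, so each of the K sentinels necessarily lands in a different mailbox.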
In Example 2, it's not quite as easy to check that we did it right as it was when we used locks back in Example 1, but we can reason about the code to convince ourselves that it is correct. First, note how each Consumer's read of its slot "acquires" whatever value the Producer last wrote there, and the Producer's read of each slot "acquires" whatever value the corresponding Consumer last wrote there. We avoid conflicts because while the slot is null, it's owned by the Producer (only the Producer can change a slot from null to non-null); and while it's nonnull, it's owned by its corresponding Consumer (only the Consumer can change its slot from nonnull to null).