When programming device drivers, network stacks, and similar systems, you often need a queue and storage management. FifoEmbed is a library that provides facilities for thread-safe basic queues, packet queues, and FIFO allocators.
October 01, 2004
URL:http://drdobbs.com/the-fifoembed-library/184401860
Dan Muresan is an engineer in the R&D group of Topex Public Switching in Bucharest, Romania. He can be contacted at muresan@stanfordalumni.org.
When programming device drivers and network stacks, you usually need a queue and some form of storage management. In real-time systems, you'd like your queue and storage manager primitives to run in deterministic time. FifoEmbed, the C library I present here, can help. The complete source code for FifoEmbed is available at http://www.cuj.com/code/.
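FifoEmbed provides three abstract data types (classes), all of which are implemented on top of circular arrays: basic queues (fifo); packet queues (pktq), which accept out-of-sequence insertions; and memory pool queues (mpoolq), which combine a queue with a FIFO storage allocator.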
The three classes offer constant-time primitives and are inherently thread-safe, requiring (almost) no locks when used in a producer/consumer framework.
FifoEmbed is designed to be used concurrently by one producer and one consumer thread. A typical scenario is a TCP/IP or RTP (Real-time Transport Protocol) implementation. The producer receives packets from the network, allocates storage out of an mpoolq, and inserts pointers in a packet queue. Packets may arrive out of order; the packet queue places them into their proper slots. The consumer dequeues pointers from the packet queue, processes the data, and frees the storage.
All classes store data in circular arrays and use two pointers, a head and a tail, to keep track of where the actual queue resides. The two pointers start out at the beginning of the array and increment until they reach the array boundary, at which point they wrap around to the beginning. When the producer enqueues an element, the tail increments; when the consumer dequeues an element, the head increments. The queue is empty when the head and tail coincide, and full when the tail is one slot behind the head (the dummy slot convention; without that one unused slot, a full queue would be indistinguishable from an empty one). The slots between the head pointer and tail pointer form the active window (the actual queue), while the rest of the array is called the "inactive part" (see Figure 1).
For flexibility, I have defined the classes as pseudotemplates, basing the underlying arrays on any previously defined type. For example, the mpoolq is defined in the mpoolq.h header file as:
typedef struct {
    MPOOLQ_CELL_T *base, *end, *head, *tail;
    int size;  // == end - base
} MPOOLQ_T;
Users must define MPOOLQ_CELL_T (and optionally MPOOLQ_T) before including mpoolq.h:
#define MPOOLQ_CELL_T int
#include "mpoolq.h"
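In real-time systems, lock-free algorithms are desirable because they avoid the problem of priority inversion. I designed FifoEmbed around the venerable single-reader/single-writer lock-free ring buffer algorithm, which is used in many software packages (audio drivers in particular). Locks are eliminated by partitioning primitives and class members into three categories and enforcing restrictions on which side may write what. In the listings, producer primitives write only the tail pointer and cells in the inactive part of the array, consumer primitives write only the head pointer (and, for packet queues, cells in the active window), and the remaining members (base, end, size) are set once by init().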
Access to the head and tail pointers is routed via a special union type that prevents the compiler from optimizing inappropriately (for example, by caching values in registers or reordering stores):
union {
    volatile T * volatile w;
    volatile T * volatile const r;
    volatile T * const l;
} head, tail;
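Here, T is the appropriate cell type. The syntax means that for the r and w fields, both the pointer itself and the cells it points to are volatile, while for l, only the pointed-to cells are volatile. Primitives always write via the w field and read via the r field when reading the pointer owned by the other thread (which may change at any moment), or via the l field when reading their own pointer (which no other thread modifies, so the compiler may keep it in a register).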
On the hardware side, the lock-free implementation requires CPU/compiler support for sequentially consistent, atomic access to pointers and array slots. In particular, multiprocessor systems may present an inconsistent view of memory in the absence of explicit synchronization instructions (memory barriers); for such targets, FifoEmbed requires extra porting effort.
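In simple systems, in which both the consumer and producer are polled alternately from an endless main loop, there might be no need for synchronization. In most systems, however, the consumer and producer must sleep when the queue is empty or full, respectively, and signal each other when changes occur. Listing 5, which presents the solution implemented in pthreads_framework.c, uses a POSIX counting semaphore (q_count) that tracks the number of queued elements, so the consumer sleeps in sem_wait() while the queue is empty, and a condition variable (more_room_cv), together with a mutex and a more_room flag, on which the producer sleeps while the queue is full.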
Listing 5 uses the basic queue. The pthread_cond_wait() operation takes a mutex and a condition variable and atomically unlocks the mutex and waits for the condition variable to be signaled; right before pthread_cond_wait() exits, it locks the mutex again [1].
FifoEmbed is implemented in four C header files (see Table 1). Most primitives are defined as both macros (Listing 1) and inline functions; the inline function is often a trivial stub that references the corresponding macro. The partial listings show only one of the versions.
Function and macro names are prefixed with the name of the pertinent class (mpoolq_alloc() and pktq_ins_at(), for instance); in discussion, I often omit the prefix when the class is clear from context. I also use the following suffixes frequently:
Instances of all FifoEmbed classes are initialized by passing the base address and the size of the underlying array to the appropriate init() primitive; you must allocate the array beforehand.
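The size of the active window should be checked using the counting primitives (such as full(), used(), avail(), and lavail()), which are built on the counting macros in circ_arr.h.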
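To minimize code duplication, all primitives related to circular arrays are defined as C macros in one header file, circ_arr.h. The macros fall into three categories: wrapping macros, which bring a pointer that has moved past either end of the array back inside it; counting macros, which compute the number of used or available slots (the DS_ variants assume the dummy slot convention); and "power of 2" macros, faster variants that wrap with a bit mask when the array size is a power of two.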
Basic queues (Listing 2) support two access models: copying access and direct access. With copying access, elements are copied into and out of the queue using the ins() and read() primitive families.
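With direct access, the producer and/or consumer access data in the circular array directly, and call the swallow() and drop() primitives to commit the changes when done. For example, to insert 10 elements into a queue q using direct access, a producer checks with lavail() that at least 10 contiguous slots are free, writes the elements starting at the tail pointer, and then calls fifo_swallow(q, 10) to move the tail past them.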
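Similarly, to sum up the oldest 10 elements in q using direct access, the consumer checks that at least 10 elements are queued contiguously starting at the head pointer, adds them up in place, and then calls FIFO_DROP(q, 10) to release the slots.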
An application using fixed-size blocks exclusively should make the queue size a multiple of the block size to ensure that all blocks can be enqueued and/or dequeued with direct access.
The direct-access primitives facilitate zero-copy networking implementations.
Packet queues (Listing 3) are implemented as a basic queue holding pointers to packets; there is no special data structure. Basic queue primitives can still be used, but there are also packet queue-specific primitives (their names start with the pktq_ prefix).
The packet queue can accommodate empty slots, marked by NULL pointers. This makes out-of-sequence insertion possible: A new primitive, ins_oo(), takes both an offset argument and a value argument. When packets arrive in sequence, the caller should supply an offset of 1. When early packets arrive, the offset should be two or more. When late packets arrive, the offset should be a negative number (see Table 2 and Figure 2).
The rules for converting sequence numbers to offsets follow from the way the insertion primitives modify the data structure. Insertion with a positive offset o amounts to inserting o-1 empty slots before performing the actual insertion (the tail pointer increments by o slots overall). The insertion primitives, striving to run in constant time, do not actually clear the o-1 newly incorporated slots, but rely on pktq_init() and pktq_read() to keep slots in the inactive part of the array empty at all times.
Insertion with a negative offset o does not change the tail pointer, but fills a slot within the active window: the slot |o| positions behind the last slot of the window (that is, |o| + 1 slots behind the tail pointer). Such late insertions are not thread-safe: If they are ever used, you must protect all late insertions and all pktq_read() invocations with critical sections.
The ins_oo() primitive may fail if the target slot is already occupied (the current packet is a duplicate) or if the target slot is outside the active window (the offset is too large or too small). The offset is too small when the packet has arrived too late (its target slot has already been passed by the consumer). The offset is too large when the insertion would force the tail pointer to cross the dummy slot. In all cases, supplied offsets must be less than the size of the array; the application may need extra logic to handle large gaps in the packet sequence.
Since ins_oo() may fail, applications may end up transferring unusable packets. To avoid this problem, there is a two-step alternative to ins_oo(): ins_valid() returns a pointer to the target slot if the insertion would succeed and NULL otherwise; ins_at() writes to a target slot directly. If ins_valid() fails, the packet can be skipped; if ins_valid() succeeds, the application can transfer the packet and call ins_at(). The two steps must be enclosed in a critical section.
Consumers can call either fifo_read() or, if out-of-sequence insertions are ever used, pktq_read(). The pktq version skips empty slots; thus, pktq_read() runs in linear time with respect to the largest number of contiguous empty slots in the active window. It also fills serviced slots with NULL.
A memory pool queue (mpoolq, Listing 4) is similar to a regular queue; the difference is that its elements are not slots, but contiguous blocks of slots from the circular array. The circular array is called the "pool," and its slots are called "cells." The user-defined cell type, MPOOLQ_CELL_T, determines alignment, as well as the maximum possible size of a block. Blocks are aligned on a cell boundary and cover an integer number of cells. Size arguments and return values for the mpoolq primitives are specified in user units instead of cells, unless otherwise implied by the primitive's name; a user unit defaults to a byte (char) but can be redefined if required (MPOOLQ_USER_T).
The first cell of each block is a header storing the length of the block. Each block can be either in use or free (a hole). To distinguish between the two kinds, the header contains a positive number (counted in user units, excluding the header itself) for allocated blocks and a negative number (counted in cells, including the header) for holes; see Figure 3 (where cells are equal to user units for simplicity). MPOOLQ_CELL_T must be a signed type. The maximum size of a block is:
2^(8 * sizeof (MPOOLQ_CELL_T) - 1) - 1 user units, the largest value a signed MPOOLQ_CELL_T can hold (assuming 8-bit bytes).
The mpoolq combines the roles of a queue and a storage manager. The producer allocates consecutive blocks out of the pool by calling alloc() (the blocks are automatically enqueued). The consumer dequeues blocks with the help of two primitives: oldest() returns the address of the oldest block in the queue, while free() marks blocks as free and possibly dequeues them. If the sequence of free() calls matches the original sequence of alloc() calls, blocks are automatically dequeued. If blocks are freed out of sequence, dequeuing is delayed. When many recent blocks have been freed but an old block is still in use, the pool may appear full.
On the producer side, the alloc() primitive tries to return blocks starting at the tail pointer; if there isn't enough space there, the allocator tries to leave a hole at the end of the pool and wrap around (Figure 4). In both cases, the resulting block must not cross either the dummy slot or the array boundary.
This algorithm works well only when the requested size is less than half the size of the pool. A request for more may fail even when the pool is empty: If the head and the tail pointer are both at the half-point of the pool, the block cannot be allocated starting at the tail pointer (it would cross outside the pool); neither can the block be allocated at the base of the pool after creating a hole (it would cross the dummy slot). Nothing can be done, short of reinitializing the mpoolq to reset the head and tail pointers.
The mpoolq version of the lavail() primitive, lavail_cells(), returns the maximum number of cells available for allocation without creating a hole at the end of the pool, unless that number is zero. When the tail pointer is exactly one slot behind the end of the array, no block can be allocated without wraparound (the single slot would be taken by the header); rather than return zero, lavail_cells() forces the tail to wrap around by creating a hole of length zero and tries again. The result may still be zero if the pool is full.
On the consumer side, the free() primitive turns a used block into a hole by modifying its header; afterwards, it calls next_blk(), a primitive that sweeps away holes at the beginning of the queue (by moving the head pointer to the beginning of the first allocated block, if any). The other consumer-side primitives (oldest(), used_cells(), and empty()) call next_blk() before examining the active window. Thus, they return accurate results regardless of the structure of the active window.
In specialized applications, an mpoolq is better than a classic heap managed by malloc() and free() because of its deterministic behavior. When accesses occur in a FIFO pattern, alloc(), free(), and all the other primitives run in constant time with respect to the pool size and the number of allocated blocks. When blocks are freed out of order, only consumer-side primitives are impacted (they run in linear time with respect to the largest deviation from FIFO order). No fragmentation occurs after a large number of blocks have been allocated and freed, so performance does not degrade over time. Additionally, mpoolq.h exposes functions (not included in the partial listing) that query the size of a block, free parts of a block, and shrink the most recently allocated block.
[1] The Open Group, Single Unix Specification (http://www.opengroup.org/products/publications/catalog/un.htm).
Listing 1: circ_arr.h (partial)

/* Wrapping macros */
#define CIRC_ARR_WRAP_END( ptr, base, end ) \
    ((ptr) == (end) ? (base) : (ptr))
#define CIRC_ARR_WRAP_FWD( ptr, end, sz ) \
    ((ptr) >= (end) ? (ptr) - (sz) : (ptr))
#define CIRC_ARR_WRAP_REV( ptr, base, sz ) \
    ((ptr) < (base) ? (ptr) + (sz) : (ptr))
#define CIRC_ARR_WRAP( ptr, base, end, sz ) \
    ((ptr) < (base) ? (ptr) + (sz) : ((ptr) >= (end) ? (ptr) - (sz) : (ptr)))
#define CIRC_ARR_ADV_PTR( ptr, base, end ) \
    CIRC_ARR_WRAP_END ((ptr) + 1, base, end)

/* Counting macros */
#define CIRC_ARR_EMPTY( head, tail ) ((head) == (tail))
#define CIRC_ARR_USED( head, tail, sz ) \
    CIRC_ARR_WRAP_REV ((tail) - (head), 0, sz)
#define CIRC_ARR_LUSED( head, tail, end ) \
    ((tail) >= (head) ? (tail) - (head) : (end) - (head))
#define CIRC_ARR_DS_FULL( head, tail, sz ) \
    (((tail) + 1 - (head) == 0) || ((tail) + 1 - (head) == (sz)))
#define CIRC_ARR_DS_AVAIL( head, tail, sz ) \
    CIRC_ARR_WRAP_REV ((head) - 1 - (tail), 0, sz)
#define CIRC_ARR_DS_LAVAIL( base, end, head, tail, sz ) \
    ((head) <= (tail) \
     ? (end) - (tail) - ((head) == (base) ? 1 : 0) \
     : CIRC_ARR_DS_AVAIL (head, tail, sz))

/* "Power of 2" macros (array size == mask + 1 == 2^n) */
#define CIRC_ARR_P2_OFFSET( ptr, base, mask ) \
    (((ptr) - (base)) & (mask))
#define CIRC_ARR_P2_WRAP( ptr, base, mask ) \
    ((base) + CIRC_ARR_P2_OFFSET (ptr, base, mask))
#define CIRC_ARR_P2_ADV_PTR( ptr, base, mask ) \
    CIRC_ARR_P2_WRAP ((ptr) + 1, base, mask)
#define CIRC_ARR_P2_USED( head, tail, mask ) \
    CIRC_ARR_P2_OFFSET (tail, head, mask)
/* full when tail + 1 == head (mod size); used by fifo_full() */
#define CIRC_ARR_P2_DS_FULL( head, tail, mask ) \
    (CIRC_ARR_P2_OFFSET ((tail) + 1, head, mask) == 0)
/* free slots = (head - 1 - tail) mod size */
#define CIRC_ARR_P2_DS_AVAIL( head, tail, mask ) \
    CIRC_ARR_P2_OFFSET ((head) - 1, tail, mask)
Listing 2: fifo.h (partial)

#if 0 /* REQUIRED DEFINITIONS */
#define FIFO_CELL_T int   /* any type */
#define FIFO_T myq        /* optional name of queue type */
#define FIFO_P2_SIZE      /* optional, optimize for size 2^n */
#endif

#include "circ_arr.h"

typedef struct {
    FIFO_CELL_T *base, *end;
    union {
        volatile FIFO_CELL_T * volatile w;
        volatile FIFO_CELL_T * volatile const r;
        volatile FIFO_CELL_T * const l;
    } head, tail;
    int size, mask;
} FIFO_T;

/* Initialization */
#define FIFO_INIT( q, b, sz ) ( \
    (q)->base = (b), (q)->end = (b) + (sz), \
    (q)->tail.w = (q)->head.w = (b), \
    (q)->size = (sz), (q)->mask = (sz) - 1 \
)

/* Producer primitives */
static inline BOOL fifo_full (const FIFO_T *q)
{
    volatile const FIFO_CELL_T *head = q->head.r;   /* consumer's pointer: read once */
#ifdef FIFO_P2_SIZE
    return CIRC_ARR_P2_DS_FULL (head, q->tail.l, q->mask);
#else
    return CIRC_ARR_DS_FULL (head, q->tail.l, q->size);
#endif
}

static inline void fifo_ins (FIFO_T *q, FIFO_CELL_T x)
{
    *q->tail.l = x;   /* fill the slot first, then publish it */
    q->tail.w = CIRC_ARR_ADV_PTR (q->tail.l, q->base, q->end);
}

#define FIFO_INS_CHK( q, x ) \
    (fifo_full (q) ? FALSE : (fifo_ins (q, x), TRUE))

static inline void fifo_swallow (FIFO_T *q, int delta)
{
    q->tail.w = CIRC_ARR_WRAP_END (q->tail.l + delta, q->base, q->end);
}

/* Consumer primitives */
static inline FIFO_CELL_T fifo_read (FIFO_T *q)
{
    FIFO_CELL_T x = *q->head.l;   /* copy the element out before releasing its slot */
    q->head.w = CIRC_ARR_ADV_PTR (q->head.l, q->base, q->end);
    return x;
}

#define FIFO_DROP( q, n ) ((q)->head.w = \
    CIRC_ARR_WRAP_FWD ((q)->head.l + (n), (q)->end, (q)->size))
Listing 3: packet queue primitives (partial)

#include "fifo.h"

/* Initialization: every slot starts out empty (FIFO_CELL_T is a pointer type) */
static inline void pktq_init (FIFO_T *q, FIFO_CELL_T *base, int size)
{
    volatile FIFO_CELL_T *ptr;
    int n;

    FIFO_INIT (q, base, size);
    for (ptr = q->head.l, n = size; n--; ptr++)
        *ptr = NULL;
}

/* Producers */

/* Returns the target slot for offset n if an insertion would succeed,
   NULL otherwise; FIFO_SIZE_T must be a signed type */
static inline FIFO_CELL_T *pktq_ins_valid (FIFO_T *q, FIFO_SIZE_T n)
{
    FIFO_SIZE_T avail = FIFO_AVAIL (q);

    /* late packets must land inside the active window;
       early or in-sequence packets must not cross the dummy slot */
    if (n != 0 && (n < 0 ? n >= avail - q->size + 2 : n <= avail)) {
        volatile FIFO_CELL_T *ptr = q->tail.l + n - 1;
        ptr = CIRC_ARR_WRAP (ptr, q->base, q->end, q->size);
        if (*ptr == NULL)   /* an occupied slot means a duplicate */
            return (FIFO_CELL_T *) ptr;
    }
    return NULL;
}

static inline void pktq_ins_at (FIFO_T *q, int n, FIFO_CELL_T pkt,
                                volatile FIFO_CELL_T *ptr)
{
    *ptr = pkt;
    if (n > 0)   /* early or in-sequence: move the tail past the packet */
        q->tail.w = CIRC_ARR_ADV_PTR (ptr, q->base, q->end);
}

static inline BOOL pktq_ins_oo (FIFO_T *q, FIFO_SIZE_T n, FIFO_CELL_T pkt)
{
    volatile FIFO_CELL_T *ptr = pktq_ins_valid (q, n);

    if (ptr == NULL)
        return FALSE;   /* duplicate, too late, or too early */
    pktq_ins_at (q, n, pkt, ptr);
    return TRUE;
}

/* Consumers */
static inline void pktq_drop (FIFO_T *q, FIFO_SIZE_T n)
{
    FIFO_SIZE_T total;
    volatile FIFO_CELL_T *ptr;

    /* discard the next n packets, counting the empty slots in between */
    for (ptr = q->head.l, total = 0; n;
         ptr = CIRC_ARR_ADV_PTR (ptr, q->base, q->end), total++) {
        if (ptr == q->tail.r)
            break;
        if (*ptr) {
            *ptr = NULL;
            n--;
        }
    }
    FIFO_DROP (q, total);
}

static inline FIFO_CELL_T pktq_read (FIFO_T *q)
{
    FIFO_CELL_T result = NULL;
    FIFO_SIZE_T total;
    volatile FIFO_CELL_T *ptr;

    /* skip empty slots; return the first packet, if any */
    for (ptr = q->head.l, total = 0; ptr != q->tail.r;
         ptr = CIRC_ARR_ADV_PTR (ptr, q->base, q->end), total++)
        if (*ptr) {
            result = *ptr;
            *ptr = NULL;   /* keep the inactive part empty */
            total++;       /* break skips the loop increment */
            break;
        }
    FIFO_DROP (q, total);
    return result;
}
Listing 4: mpoolq.h (partial)

#if 0 /* REQUIRED DEFINITIONS */
#define MPOOLQ_CELL_T int   /* some signed type */
#endif

#include "circ_arr.h"

typedef struct {
    MPOOLQ_CELL_T *base, *end;
    union {
        volatile MPOOLQ_CELL_T * volatile w;
        volatile MPOOLQ_CELL_T * volatile const r;
        volatile MPOOLQ_CELL_T * const l;
    } head, tail;
    int size;
} MPOOLQ_T;

#ifndef MPOOLQ_USER_T
#define MPOOLQ_USER_T char
#endif

#define MPOOLQ_UUS_PER_CELL \
    ((MPOOLQ_CELL_T) (sizeof (MPOOLQ_CELL_T) / sizeof (MPOOLQ_USER_T)))
/* round up to a whole number of cells */
#define MPOOLQ_UU_TO_CELLS( n ) \
    (((n) + MPOOLQ_UUS_PER_CELL - 1) / MPOOLQ_UUS_PER_CELL)
#define MPOOLQ_CELLS_TO_UU( n ) ((n) * MPOOLQ_UUS_PER_CELL)
/* the header cell sits just before the block's first data cell */
#define MPOOLQ_BLK_SIZE( blk ) \
    (((volatile MPOOLQ_CELL_T *) (blk)) [-1])
#define MPOOLQ_BLK_CELLS( blk ) MPOOLQ_UU_TO_CELLS (MPOOLQ_BLK_SIZE (blk))

/* Producers (allocator and related functions) */

/* Turns everything from ctail to the end of the pool into a hole and
   wraps the tail around; evaluates to the new tail (p->base) */
#define MPOOLQ_CREATE_HOLE( p, ctail ) ( \
    (ctail) [0] = -((p)->end - (ctail)), \
    (p)->tail.w = (p)->base \
)

static inline void *mpoolq_alloc (MPOOLQ_T *p, MPOOLQ_CELL_T UUs)
{
    MPOOLQ_CELL_T cells = MPOOLQ_UU_TO_CELLS (UUs);
    volatile MPOOLQ_CELL_T *head = p->head.r, *old_tail = p->tail.l;
    volatile MPOOLQ_CELL_T *end = old_tail + cells + 1;   /* + 1 for the header */

    if (end >= head) {
        if (head > old_tail)
            return NULL;   /* would cross the dummy slot */
        if (! (end < p->end || (end == p->end && head != p->base))) {
            /* no room before the end of the pool: actually wrap around */
            if ((end = p->base + cells + 1) >= head)
                return NULL;
            old_tail = MPOOLQ_CREATE_HOLE (p, old_tail);
            end = old_tail + cells + 1;
        }
    }
    *old_tail = UUs;   /* header: positive size in user units */
    p->tail.w = CIRC_ARR_WRAP_END (end, p->base, p->end);
    return (void *) (1 + old_tail);
}

static inline MPOOLQ_CELL_T mpoolq_lavail_cells (MPOOLQ_T *p)
{
    volatile MPOOLQ_CELL_T *head = p->head.r, *tail = p->tail.l;

    if (tail + 1 == p->end) {
        /* a single cell before the end can hold only a header */
        if (head == p->base)
            return 0;   /* pool is full */
        tail = MPOOLQ_CREATE_HOLE (p, tail);
    }
    return CIRC_ARR_DS_LAVAIL (p->base, p->end, head, tail, p->size);
}

/* Consumers */

/* Changes *blk to the address of the next allocated block, if any,
   or to the tail pointer; returns TRUE if a block was found */
static inline BOOL mpoolq_next_blk (const MPOOLQ_T *p, MPOOLQ_CELL_T **blk)
{
    MPOOLQ_CELL_T n;

    for (;; *blk = CIRC_ARR_WRAP_END (*blk + n, p->base, p->end)) {
        if (*blk == p->tail.r)
            return FALSE;
        if ((n = - (*blk) [0]) <= 0)   /* header >= 0: allocated block */
            return TRUE;
        /* header < 0: skip a hole of n cells */
    }
}

static inline BOOL mpoolq_empty (MPOOLQ_T *p)
{
    MPOOLQ_CELL_T *blk = (MPOOLQ_CELL_T *) p->head.l;
    BOOL empty = ! mpoolq_next_blk (p, &blk);

    p->head.w = blk;   /* sweep away leading holes */
    return empty;
}

#define MPOOLQ_OLDEST( p ) \
    (mpoolq_empty (p) ? NULL : (void *) ((p)->head.l + 1))

static inline void mpoolq_free (MPOOLQ_T *p, void *blk)
{
    /* a hole's header holds minus its length in cells, header included */
    MPOOLQ_BLK_SIZE (blk) = - MPOOLQ_BLK_CELLS (blk) - 1;
    mpoolq_empty (p);   /* calls mpoolq_next_blk() */
}
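Listing 5: pthreads_framework.c

#include <pthread.h>
#include <semaphore.h>

#define FIFO_CELL_T int
#define FIFO_T int_queue
#include "fifo.h"

#define Q_SIZE 64   /* illustrative queue size */

int_queue q;
int q_buf [Q_SIZE];
sem_t q_count;      /* number of queued elements */
BOOL more_room;     /* set by the consumer after each read */
pthread_cond_t more_room_cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t more_room_lock = PTHREAD_MUTEX_INITIALIZER;

int get_data (void) { /* ... */ }
void process_data (int x) { /* ... */ }

/* hypothetical helper: call once before starting the threads */
void framework_init (void)
{
    FIFO_INIT (&q, q_buf, Q_SIZE);
    sem_init (&q_count, 0, 0);
}

void producer (void)
{
    for (;;) {
        int x = get_data ();

        pthread_mutex_lock (&more_room_lock);
        for (;;) {
            more_room = FALSE;   /* ignore reads that happened before this attempt */
            pthread_mutex_unlock (&more_room_lock);
            if (FIFO_INS_CHK (&q, x))
                break;
            pthread_mutex_lock (&more_room_lock);
            while (! more_room)   /* queue full: sleep until the consumer reads */
                pthread_cond_wait (&more_room_cv, &more_room_lock);
        }
        sem_post (&q_count);   /* wake the consumer */
    }
}

void consumer (void)
{
    for (;;) {
        sem_wait (&q_count);   /* sleep while the queue is empty */
        process_data (FIFO_READ (&q));
        pthread_mutex_lock (&more_room_lock);
        more_room = TRUE;
        pthread_mutex_unlock (&more_room_lock);
        pthread_cond_signal (&more_room_cv);
    }
}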