SPSC

For real-time audio processing, it is necessary to communicate to your audio thread in such a way that the audio thread can never be blocked. Audio threads are hard-real-time: any delays will lead to loud pops/crackles/echoes. This is proper engineering! Fun!

So here’s my take on a simple C Lockless Single-Producer Single-Consumer queue that can be used for this (and other) applications.

This algorithm was written from a combination of reading existing C++ SPSC libraries (I couldn’t find any C ones), Wikipedia, the C atomics documentation, and Gemini. Even though Gemini helped, this was by no means remotely vibe coded, as will be explained later under the ‘heads or tails’ heading. Performance is good but note that there are faster options out there.

Have a look at the github repo: github.com/chrishulbert/spsc.

Theory

Basically we’re creating a ‘circular buffer’ where the reader chases the writer. The writer puts an item in the buffer if there’s a vacant position, and the reader consumes items from the buffer.

Seriously, go read the Wikipedia article on circular buffers, it’s surprisingly clear for a Wiki article, and does a better job than I at explaining the idea.

I’d also recommend skim-reading the references at the end of this article to understand the theory.

Heads or Tails?

These queues are often referred to as having a head and a tail. But there doesn’t seem to be a consensus as to whether you add to the tail and read from the head, or vice-versa. So there’s a terminology problem:

  • Linux says you write (insert) to the head: Queue.
  • Rust says you write (push) to the tail (back): VecDeque.
  • C++ STL says you write (push) to the tail (back): std::queue.
  • Java says you write (insert) at the tail: java.util.Queue.

In everday life, it is usual terminology that if a person is ‘at the head of the queue at the bank’ that they will be served next. Being served is analogous to being ‘read’ in our context. I suspect Linux is the one being unintuitive here.

Anyway, this is more important than a tomahto-tomayto thing, because this confusion makes it very difficult to use AI to make suggestions around queuing code, because half its training data treats head indices as writers, and the other half of its data treats heads as readers, so it tends to mix them up in code. AI aside, it makes it difficult to understand when reading examples!

Wikipedia avoids this confusion entirely by referring to them as the ‘write pointer’ and ‘read pointer’, so I’ll follow that terminology to make things unambiguous.

Circular representation in memory

Now since RAM isn’t circular, how do we represent this ring buffer?

It is a fixed-size array, with an index for where to write to, and an index for where to read from.

Since it is represented as a fixed-size array, this means that the buffer can be filled, and writes can fail. Since the writer is often the non-real-time side of the equation, if writes must not fail, some form of slower dynamic array could be used to keep track and retry later. But then you have to worry about “backpressure” when the consumer cannot keep up… 😀

It looks like this initially, for a size-4 queue. When the indices are equal (not necessarily 0, just equal) as follows, the queue is considered to be empty:

  ┌───┐
0 │   │ ← Reader index, Writer index
  ├───┤
1 │   │
  ├───┤
2 │   │
  ├───┤
3 │   │
  └───┘

To add item A, it is written at the current writer index, then the writer is incremented:

  ┌───┐
0 │ A │ ← Reader
  ├───┤
1 │   │ ← Writer
  ├───┤
2 │   │
  ├───┤
3 │   │
  └───┘

Then we add item B:

  ┌───┐
0 │ A │ ← Reader
  ├───┤
1 │ B │
  ├───┤
2 │   │ ← Writer
  ├───┤
3 │   │
  └───┘

Then we read. Since the reader index != writer, it considers it may read an item. It reads from the read index position, then increments the index:

  ┌───┐
0 │   │ A is read
  ├───┤
1 │ B │ ← Reader
  ├───┤
2 │   │ ← Writer
  ├───┤
3 │   │
  └───┘

We can read again, because the indices are unequal:

  ┌───┐
0 │   │ 
  ├───┤
1 │   │ B is read
  ├───┤
2 │   │ ← Reader, Writer
  ├───┤
3 │   │
  └───┘

There is nothing left to read, because the Reader == Writer:

  ┌───┐
0 │   │ 
  ├───┤
1 │   │
  ├───┤
2 │   │ ← Reader, Writer
  ├───┤
3 │   │
  └───┘

Now let’s add a few more, so we can see how it wraps around in a circular fashion. Let’s add X:

  ┌───┐
0 │   │ 
  ├───┤
1 │   │
  ├───┤
2 │ X │ ← Reader
  ├───┤
3 │   │ ← Writer
  └───┘

Now we add Y. Notice that the writer wraps around (circles) to index 0:

  ┌───┐
0 │   │ ← Writer
  ├───┤
1 │   │
  ├───┤
2 │ X │ ← Reader
  ├───┤
3 │ Y │
  └───┘

Now we add Z:

  ┌───┐
0 │ Z │ 
  ├───┤
1 │   │ ← Writer
  ├───┤
2 │ X │ ← Reader
  ├───┤
3 │ Y │
  └───┘

Even though the array above has an empty element, this queue is considered ‘full’ because (Writer + 1) % Length == Reader.

This ‘wasted’ element is a simple common technique to know when the queue is full, because if that extra element was filled, then Writer == Reader, which is the trigger for being considered empty, and another variable would be needed to disambiguate empty vs full.

To the best of my knowledge, this ‘wasted’ element isn’t reserved for multithreading safety, as the memory barriers (discussed later) solve that issue, it is just the way to determine full vs empty.

Atomics

Now: How is this to be made thread-safe?

Atomics are used for synchronising the threads in a lockless way, taking advantage of hardware CPU features to ensure neither thread will have to wait. In particular, this queue uses “release-acquire ordering”. More about atomics and release-acquire ordering can be read here.

In short:

  • An atomic_size_t foo; variable exists.
  • Values are ‘acquired’/loaded/read from this.
  • Values are ‘released’/stored/written to this.
  • When this acquire/release pattern is followed, the CPU guarantees: Any writes to other variables above the ‘release’ line of code will be visible to the ‘acquiring’ (reading) thread, they won’t be partially written or reordered by Out-Of-Order CPU optimisation or anything.

Thread Safety

Now we’re familiar with Atomics, how do they apply here?

  • A new entry is stored before the incremented write index is ‘released’.
  • The reader ‘acquires’ the write index, guaranteeing the data for the new entry is in a consistent state, because the entry was stored before the ‘release’.
  • The reader reads all the entries, then ‘releases’ the incremented read index.
  • The writer ‘acquires’ the read index, guaranteeing the reader has finished reading any queue entries, because the queue reading was completed before the ‘release’.

Code

Code is available at github.com/chrishulbert/spsc with benchmarking; however the important parts are here:

#include <stdatomic.h>

// You may want to have a 'type' for your queue entries:
typedef enum {
    QUEUE_ENTRY_TYPE_PLAY,
    QUEUE_ENTRY_TYPE_STOP,
} QueueEntryType;

// Put whatever you want to have in your queue entries here:
typedef struct {
    QueueEntryType type;
    int a;
    int b;
    int c;
} QueueEntry;

// If the queue size is a power of two eg 16,
// the '%' calculations later will be optimised to '& 0xF'.
#define QUEUE_SIZE 16

// This is the SPSC queue:
static struct {
    // Producer's index (writer thread, perhaps the game):
    atomic_size_t writeIndex;

    // Since the producer is the only thread that updates writeIndex,
    // it can use this 'mirror' as its source of truth,
    // thus no synchronisation is needed when reading.
    size_t writeIndexMirror; 

    // Consumer's index (reader thread, perhaps the audio thread).
    atomic_size_t readIndex;

    // Since the consumer is the only thread that updates readIndex,
    // it can use this 'mirror' as the source of truth,
    // thus no synchronisation is needed when reading.
    size_t readIndexMirror; 

    // The ring buffer.
    QueueEntry buffer[QUEUE_SIZE]; 
} queue;

// Write an item to the queue.
// To be called from the producer thread.
// Returns true on success.
bool queue_write(QueueEntry entry) {
    // Since only this thread updates the write index, use a non-atomic
    // mirror for a potentially-quicker read:
    size_t writeIndex = queue.writeIndexMirror;
    size_t nextWriteIndex = (writeIndex + 1) % QUEUE_SIZE;

    // Acquire the read index, so that any updates the reader made are visible.
    // Not that the reader makes any changes to the buffer, though.
    // Open to feedback here if this is necessary.
    size_t readIndex = atomic_load_explicit(
                           &queue.readIndex,
                           memory_order_acquire);

    // Check if queue is full.
    if (nextWriteIndex == readIndex) {
        return false; // Full!
    }

    // Store this entry in the queue.
    queue.buffer[writeIndex] = entry;
    
    // Use 'release' to ensure the entry in the buffer is visible
    // to the reader after it 'acquires' the write index.
    atomic_store_explicit(&queue.writeIndex, nextWriteIndex, memory_order_release);

    // Use this mirror as the source of truth, so it can be read next time
    // without needing synchronisation, since only this thread uses it.
    queue.writeIndexMirror = nextWriteIndex;

    return true; // Success, there was room!
}

// Read items from the queue, handling each one.
// To be called from the consumer thread.
void queue_read() {
    size_t readIndex = queue.readIndexMirror;

    // Acquire the write index from the writer thread.
    // Once acquired, any buffer updates made before updating the
    // write index will be visible to this thread.
    size_t writeIndex = atomic_load_explicit(
                            &queue.writeIndex, 
                            memory_order_acquire);

    // Loop through all entries:
    while (readIndex != writeIndex) {
        // Get this entry:
        QueueEntry* entry = &queue.buffer[readIndex];

        // Increment the read index, wrapping to 0 at the end of the queue buffer:
        readIndex = (readIndex + 1) % QUEUE_SIZE;
        
        // Deal with this entry:
        switch (entry->type) {
            case QUEUE_ENTRY_TYPE_PLAY:
                // Do something.
                break;

            case QUEUE_ENTRY_TYPE_STOP:
                // Do something.
                break;
        }
    }

    // Release the read index so the producer knows space has been cleared in
    // the buffer that it can use to store incoming entries:
    atomic_store_explicit(&queue.readIndex, readIndex, memory_order_release);

    queue.readIndexMirror = readIndex;
}

Benchmark

In my testing on a base model M4 Macbook Air:

Speed (lower is better): 14.886 nanos per entry
Throughput (higher is better): 67179 ops / millisecond

This is about 1/5th as fast as the 362Kops/sec that SPSCQueue achieves. I’m not sure why the speed difference? Perhaps because I’m running on macOS. Nevertheless, this is plenty fast enough for eg audio threads.

References

Thanks for reading, I pinky promise this was written by a human, not AI, hope you found this fascinating, at least a tiny bit, God bless!

Thanks for reading! And if you want to get in touch, I'd love to hear from you: chris.hulbert at gmail.

Chris Hulbert

(Comp Sci, Hons - UTS)

Software Developer (Freelancer / Contractor) in Australia.

I have worked at places such as Google, Cochlear, CommBank, Assembly Payments, News Corp, Fox Sports, NineMSN, FetchTV, Coles, Woolworths, Trust Bank, and Westpac, among others. If you're looking for help developing an iOS app, drop me a line!

Get in touch:
[email protected]
github.com/chrishulbert
linkedin



 Subscribe via RSS