Skip to main content

chronon::observe::SPSCQueue

Lock-free single-producer single-consumer ring buffer. More...

#include <SPSCQueue.hpp>

Public Functions

Name
std::byte *prepareWrite(size_t n)
Reserve n bytes for the producer.
std::byte *prepareRead()
SPSCQueue &operator=(const SPSCQueue & ) =delete
SPSCQueue &operator=(SPSCQueue && ) =delete
boolhasSpace(size_t n) const
voidforceCommitWrite()
Publish unconditionally; use for shutdown or explicit flush.
voidforceCommitRead()
voidfinishWrite(size_t n)
voidfinishRead(size_t n)
voidfinishAndCommitWrite(size_t n)
boolempty() const
voideagerCommitRead()
Eagerly publish so freed space becomes visible to producers without waiting for batch.
voidcommitWrite()
Publish to consumer once batch threshold reached.
voidcommitRead()
size_tcapacity() const
size_tbytesWritten() const
size_tbytesRead() const
size_tbytesAvailable() const
SPSCQueue(size_t capacity =DEFAULT_CAPACITY)
SPSCQueue(const SPSCQueue & ) =delete
SPSCQueue(SPSCQueue && ) =delete

Public Attributes

Name
size_tDEFAULT_CAPACITY
Default 128KB per thread; overridden by YAML queue_capacity when configured.
size_tDEFAULT_BYTES_PER_BATCH
Update writer/reader atomics every N bytes to amortize cache-coherency traffic.

Detailed Description

class chronon::observe::SPSCQueue;

Lock-free single-producer single-consumer ring buffer.

Cache-line-aligned atomics, cached position reads, power-of-2 capacity for bit-masking, batched commits, and 2x buffer mirroring to eliminate wrap-around handling. Synchronization via acquire/release atomics; no mutex.

Producer:

auto* ptr = queue.prepareWrite(size);
if (ptr) { memcpy(ptr, data, size); queue.finishAndCommitWrite(size); }

Consumer:

while (auto* ptr = queue.prepareRead()) {
auto* h = reinterpret_cast<const RecordHeader*>(ptr);
processEvent(h, ptr + sizeof(RecordHeader));
queue.finishRead(h->total_size);
}
queue.commitRead();

Public Functions Documentation

function prepareWrite

inline std::byte * prepareWrite(
size_t n
)

Reserve n bytes for the producer.

Return: Pointer to write location, or nullptr if full.

Note: Single producer thread only.

function prepareRead

inline std::byte * prepareRead()

Return: Pointer to next record, or nullptr if empty.

Note: Single consumer thread only.

function operator=

SPSCQueue & operator=(
const SPSCQueue &
) =delete

function operator=

SPSCQueue & operator=(
SPSCQueue &&
) =delete

function hasSpace

inline bool hasSpace(
size_t n
) const

function forceCommitWrite

inline void forceCommitWrite()

Publish unconditionally; use for shutdown or explicit flush.

function forceCommitRead

inline void forceCommitRead()

function finishWrite

inline void finishWrite(
size_t n
)

function finishRead

inline void finishRead(
size_t n
)

function finishAndCommitWrite

inline void finishAndCommitWrite(
size_t n
)

function empty

inline bool empty() const

function eagerCommitRead

inline void eagerCommitRead()

Eagerly publish so freed space becomes visible to producers without waiting for batch.

function commitWrite

inline void commitWrite()

Publish to consumer once batch threshold reached.

function commitRead

inline void commitRead()

function capacity

inline size_t capacity() const

function bytesWritten

inline size_t bytesWritten() const

function bytesRead

inline size_t bytesRead() const

function bytesAvailable

inline size_t bytesAvailable() const

function SPSCQueue

inline explicit SPSCQueue(
size_t capacity =DEFAULT_CAPACITY
)

Parameters:

  • capacity Queue capacity in bytes; rounded up to a power of 2.

function SPSCQueue

SPSCQueue(
const SPSCQueue &
) =delete

function SPSCQueue

SPSCQueue(
SPSCQueue &&
) =delete

Public Attributes Documentation

variable DEFAULT_CAPACITY

static size_t DEFAULT_CAPACITY = 128 * 1024;

Default 128KB per thread; overridden by YAML queue_capacity when configured.

variable DEFAULT_BYTES_PER_BATCH

static size_t DEFAULT_BYTES_PER_BATCH = 4096;

Update writer/reader atomics every N bytes to amortize cache-coherency traffic.


Updated on 2026-05-26 at 05:42:33 +0000