
chronon::sender::MultiProducerQueueAdapter


#include <MessageQueue.hpp>

Inherits from chronon::sender::IMessageQueue< T >

Public Functions

Name
virtual std::optional< T > tryPop(uint64_t current_cycle) override
virtual size_t size() const override
virtual void setCapacity(size_t cap) override
bool pushFromThread(size_t queue_id, T data, uint64_t arrive_cycle, uint32_t sender_id = 0)
virtual bool push(T data, uint64_t arrive_cycle) override
virtual std::vector< T > popAll(uint64_t current_cycle) override
virtual std::optional< uint64_t > minArrivalCycle() const override
virtual bool hasReady(uint64_t current_cycle) const override
size_t getQueueIdForThread(size_t thread_id) const
bool fullForThread(size_t queue_id) const
virtual bool full() const override
virtual bool empty() const override
virtual void clear() override
virtual size_t capacity() const override
virtual size_t available() const override
size_t addProducerThread(size_t thread_id)
MultiProducerQueueAdapter(size_t capacity = std::numeric_limits< size_t >::max())

Additional inherited members

Public Functions inherited from chronon::sender::IMessageQueue< T >

Name
virtual ~IMessageQueue() = default

Detailed Description

template <typename T >
class chronon::sender::MultiProducerQueueAdapter;

MultiProducerQueueAdapter - Lock-free MPSC via per-thread SPSC queues.

Each source thread gets its own LockFreeMessageQueue to the consumer. This is thread-safe because:

  • Each thread has its own dedicated queue (no concurrent push to same queue)
  • Units on the same thread share a queue but execute sequentially
  • Consumer pops via a k-way merge keyed on (arrive_cycle, queue_id) for deterministic, simulated-time-ordered delivery.

Use when multiple threads write to the same InPort.

Ordering guarantees:

  • Primary key: arrive_cycle (lowest first).
  • Tiebreak key: queue_id (lowest first). MultiProducerQueueAdapter::addProducerThread assigns queue_id in the order in which TickSimulation::optimizeConnectionQueuesForThreads discovers producer threads; that function iterates a std::set<size_t>, so the order is deterministic. Ordering is therefore reproducible run-to-run for a fixed num_workers, but it is NOT num_workers-invariant: same-cycle cross-thread ties may resolve differently in single-threaded mode (priority_queue sequence) than in multi-threaded mode (queue_id). For full num_workers-invariant replay, use PortPolicy::LegacyFastPath with MessageQueueAdapter (priority_queue), at the cost of a mutex on push.
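
The (arrive_cycle, queue_id) ordering can be illustrated with a small single-threaded model. This is a minimal sketch, not the adapter's implementation: MergeSketch, TimedEntry, and addQueue are hypothetical names, std::deque stands in for the per-thread LockFreeMessageQueue, and a linear scan over queue heads replaces whatever merge structure the real class uses.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for one queued message; not the library's type.
template <typename T>
struct TimedEntry {
    T data;
    uint64_t arrive_cycle;
};

// Toy model of the consumer side: one FIFO per producer queue_id.
// std::deque replaces the lock-free ring purely for illustration.
template <typename T>
class MergeSketch {
public:
    // Returns the new queue_id, assigned in registration order.
    size_t addQueue() {
        queues_.emplace_back();
        return queues_.size() - 1;
    }

    void push(size_t queue_id, T data, uint64_t arrive_cycle) {
        queues_[queue_id].push_back({std::move(data), arrive_cycle});
    }

    // Pop the ready entry with the smallest (arrive_cycle, queue_id).
    std::optional<T> tryPop(uint64_t current_cycle) {
        std::optional<size_t> best;
        for (size_t q = 0; q < queues_.size(); ++q) {
            if (queues_[q].empty()) continue;
            const uint64_t head = queues_[q].front().arrive_cycle;
            if (head > current_cycle) continue;  // not ready yet
            // Strict '<' keeps the lowest queue_id when arrive_cycle ties.
            if (!best || head < queues_[*best].front().arrive_cycle) best = q;
        }
        if (!best) return std::nullopt;
        T out = std::move(queues_[*best].front().data);
        queues_[*best].pop_front();
        return out;
    }

private:
    std::vector<std::deque<TimedEntry<T>>> queues_;
};
```

With two queues holding entries for the same cycle, the model always delivers the entry from the lower queue_id first, regardless of the order in which the pushes happened in wall-clock time.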

Correctness prerequisite:

  • Each per-thread LockFreeMessageQueue must be pushed with non-decreasing arrive_cycle, i.e., the producer pushes arrive_cycle = X + const_delay, where X advances monotonically. This holds for any TickableUnit whose push site is OutPort::send(data) with a fixed delay, which is the standard pattern. Mixed-delay producers must not route through MultiProducerQueueAdapter.
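
One way to see why this matters: a merge that compares only the head of each FIFO cannot see a later entry with a smaller arrive_cycle sitting behind the head. The following debug helper is a sketch of how a caller might check the prerequisite. MonotonicArrivalGuard and arrivalFor are hypothetical names and are not part of the documented API.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical debug helper, not part of the documented API: tracks the last
// arrive_cycle pushed per queue_id and reports pushes that go backwards.
class MonotonicArrivalGuard {
public:
    explicit MonotonicArrivalGuard(size_t num_queues) : last_(num_queues, 0) {}

    // Returns true if arrive_cycle is >= the previous push on this queue.
    bool check(size_t queue_id, uint64_t arrive_cycle) {
        if (arrive_cycle < last_[queue_id]) return false;
        last_[queue_id] = arrive_cycle;
        return true;
    }

private:
    std::vector<uint64_t> last_;
};

// Fixed-delay producer: now + const_delay is non-decreasing whenever
// `now` is non-decreasing and const_delay never changes.
inline uint64_t arrivalFor(uint64_t now, uint64_t const_delay) {
    return now + const_delay;
}
```

For example, a producer that sends with delay 3 at cycle 2 (arrival 5) and then with delay 1 at cycle 3 (arrival 4) fails the check, which is the mixed-delay case the prerequisite rules out.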

Public Functions Documentation

function tryPop

inline virtual std::optional< T > tryPop(
uint64_t current_cycle
) override

Reimplements: chronon::sender::IMessageQueue::tryPop

function size

inline virtual size_t size() const override

Reimplements: chronon::sender::IMessageQueue::size

function setCapacity

inline virtual void setCapacity(
size_t cap
) override

Reimplements: chronon::sender::IMessageQueue::setCapacity

Set the soft user-visible capacity.

The per-thread physical ring capacity (USABLE_CAPACITY) is unchanged; only the soft aggregate gate is updated. pushFromThread() still does NOT enforce this cap: on the push path, only per-thread ring fullness (fullForThread()) governs push failure, which avoids wall-clock races between producer threads. The MPSC arbiter consults full() before admitting a message.
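
The split between the soft aggregate gate and the hard per-thread limit can be modeled as follows. This is a toy sketch under stated assumptions: the method names mirror the documented ones, but every body is an assumption, plain counters replace the lock-free rings, and CapacityGateSketch and admit are hypothetical names.

```cpp
#include <cstddef>
#include <vector>

// Toy model only: method names mirror the documented ones, but the bodies
// are assumptions and plain counters replace the lock-free rings.
class CapacityGateSketch {
public:
    CapacityGateSketch(size_t num_queues, size_t ring_capacity, size_t soft_cap)
        : sizes_(num_queues, 0), ring_capacity_(ring_capacity), soft_cap_(soft_cap) {}

    // Updates only the soft gate; ring_capacity_ is untouched.
    void setCapacity(size_t cap) { soft_cap_ = cap; }

    size_t size() const {
        size_t total = 0;
        for (size_t s : sizes_) total += s;
        return total;
    }

    // Soft aggregate gate, consulted by the arbiter.
    bool full() const { return size() >= soft_cap_; }

    // Hard per-thread limit.
    bool fullForThread(size_t queue_id) const { return sizes_[queue_id] >= ring_capacity_; }

    // The push path checks only the per-thread ring, never the soft cap.
    bool pushFromThread(size_t queue_id) {
        if (fullForThread(queue_id)) return false;
        ++sizes_[queue_id];
        return true;
    }

private:
    std::vector<size_t> sizes_;
    size_t ring_capacity_;
    size_t soft_cap_;
};

// Hypothetical arbiter step: consult the soft gate first, then push.
inline bool admit(CapacityGateSketch& q, size_t queue_id) {
    if (q.full()) return false;
    return q.pushFromThread(queue_id);
}
```

In this model a direct pushFromThread() call can exceed the soft cap, while admit() cannot, which is why the gate belongs in the arbiter layer rather than on the push path.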

function pushFromThread

inline bool pushFromThread(
size_t queue_id,
T data,
uint64_t arrive_cycle,
uint32_t sender_id = 0
)

Push using the calling thread's queue ID.

Thread-safe: each thread has its own queue. Multiple units on the same thread share the same queue, which is safe because they run sequentially.

function push

inline virtual bool push(
T data,
uint64_t arrive_cycle
) override

Reimplements: chronon::sender::IMessageQueue::push

function popAll

inline virtual std::vector< T > popAll(
uint64_t current_cycle
) override

Reimplements: chronon::sender::IMessageQueue::popAll

function minArrivalCycle

inline virtual std::optional< uint64_t > minArrivalCycle() const override

Reimplements: chronon::sender::IMessageQueue::minArrivalCycle

function hasReady

inline virtual bool hasReady(
uint64_t current_cycle
) const override

Reimplements: chronon::sender::IMessageQueue::hasReady

function getQueueIdForThread

inline size_t getQueueIdForThread(
size_t thread_id
) const

Get the queue ID for a given thread.

function fullForThread

inline bool fullForThread(
size_t queue_id
) const

function full

inline virtual bool full() const override

Reimplements: chronon::sender::IMessageQueue::full

function empty

inline virtual bool empty() const override

Reimplements: chronon::sender::IMessageQueue::empty

function clear

inline virtual void clear() override

Reimplements: chronon::sender::IMessageQueue::clear

function capacity

inline virtual size_t capacity() const override

Reimplements: chronon::sender::IMessageQueue::capacity

function available

inline virtual size_t available() const override

Reimplements: chronon::sender::IMessageQueue::available

function addProducerThread

inline size_t addProducerThread(
size_t thread_id
)

Parameters:

  • thread_id The thread ID (from cluster assignment)

Return: Queue ID for this thread (used in pushFromThread)

Register a new source thread and create its queue.
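
The relationship between thread_id and queue_id can be sketched with a small registry. This is an illustration only: ProducerRegistrySketch and registerAll are hypothetical names, the returned queue_id is assumed to be the registration index, and returning the existing ID on a repeated registration is an assumption rather than documented behavior.

```cpp
#include <cstddef>
#include <map>
#include <set>

// Hypothetical registry, not the library's implementation: hands out
// queue_ids in registration order and remembers the thread_id mapping.
class ProducerRegistrySketch {
public:
    size_t addProducerThread(size_t thread_id) {
        auto it = ids_.find(thread_id);
        if (it != ids_.end()) return it->second;  // assumption: idempotent
        const size_t queue_id = ids_.size();
        ids_.emplace(thread_id, queue_id);
        return queue_id;
    }

    size_t getQueueIdForThread(size_t thread_id) const { return ids_.at(thread_id); }

private:
    std::map<size_t, size_t> ids_;
};

// std::set iterates in ascending key order, so registration order (and thus
// queue_id) depends only on the set's contents, not on insertion order.
inline ProducerRegistrySketch registerAll(const std::set<size_t>& producer_threads) {
    ProducerRegistrySketch reg;
    for (size_t tid : producer_threads) reg.addProducerThread(tid);
    return reg;
}
```

Registering the thread set {7, 2, 5} through registerAll maps thread 2 to queue 0, thread 5 to queue 1, and thread 7 to queue 2, which is the kind of reproducible assignment the tiebreak key relies on.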

function MultiProducerQueueAdapter

inline explicit MultiProducerQueueAdapter(
size_t capacity = std::numeric_limits< size_t >::max()
)

Construct an MPSC adapter with an initial user-visible capacity.

The per-thread physical LockFreeMessageQueue ring capacity (USABLE_CAPACITY, typically 4095 slots) is fixed and remains unchanged. user_capacity_ is a soft aggregate gate consulted by capacity()/full(); it is only honored by the arbiter/canAccept layer before admitting a push. The push path itself (pushFromThread) does NOT enforce user_capacity_ — enforcing it there would reintroduce a wall-clock race where two producer threads racing against each other could both see size < cap and both succeed even though their joint push exceeds the soft cap.

If the user does not override capacity explicitly, we still want full() to never fire for a well-sized system, so the default behaves like the legacy physical aggregate (effectively unlimited for typical configurations).


Updated on 2026-05-26 at 05:42:32 +0000