chronon::sender::MultiProducerQueueAdapter
#include <MessageQueue.hpp>
Inherits from chronon::sender::IMessageQueue< T >
Public Functions
| Name | |
|---|---|
| virtual std::optional< T > | tryPop(uint64_t current_cycle) override |
| virtual size_t | size() const override |
| virtual void | setCapacity(size_t cap) override |
| bool | pushFromThread(size_t queue_id, T data, uint64_t arrive_cycle, uint32_t sender_id =0) |
| virtual bool | push(T data, uint64_t arrive_cycle) override |
| virtual std::vector< T > | popAll(uint64_t current_cycle) override |
| virtual std::optional< uint64_t > | minArrivalCycle() const override |
| virtual bool | hasReady(uint64_t current_cycle) const override |
| size_t | getQueueIdForThread(size_t thread_id) const |
| bool | fullForThread(size_t queue_id) const |
| virtual bool | full() const override |
| virtual bool | empty() const override |
| virtual void | clear() override |
| virtual size_t | capacity() const override |
| virtual size_t | available() const override |
| size_t | addProducerThread(size_t thread_id) |
| MultiProducerQueueAdapter(size_t capacity =std::numeric_limits< size_t >::max()) |
Additional inherited members
Public Functions inherited from chronon::sender::IMessageQueue< T >
| Name | |
|---|---|
| virtual | ~IMessageQueue() =default |
Detailed Description
template <typename T >
class chronon::sender::MultiProducerQueueAdapter;
MultiProducerQueueAdapter - Lock-free MPSC via per-thread SPSC queues.
Each source thread gets its own LockFreeMessageQueue to the consumer. This is thread-safe because:
- Each thread has its own dedicated queue (no concurrent push to same queue)
- Units on the same thread share a queue but execute sequentially
- Consumer pops via a k-way merge keyed on (arrive_cycle, queue_id) for deterministic, simulated-time-ordered delivery.
Use when multiple threads write to the same InPort.
Ordering guarantees:
- Primary key: arrive_cycle (lowest first).
- Tiebreak key: queue_id (lowest first). queue_id is assigned by MultiProducerQueueAdapter::addProducerThread in the order producer threads are discovered during TickSimulation::optimizeConnectionQueuesForThreads, which iterates a std::set<size_t>. This makes ordering reproducible run-to-run for a fixed num_workers, but NOT num_workers-invariant: same-cycle cross-thread ties may resolve differently between single-threaded (priority_queue sequence) and multi-threaded (queue_id) modes. For full num_workers-invariant replay, use PortPolicy::LegacyFastPath with MessageQueueAdapter (priority_queue), at the cost of a mutex on push.
Correctness prerequisite:
- Each per-thread LockFreeMessageQueue must be pushed with non-decreasing arrive_cycle (i.e., producer pushes arrive_cycle = X + const_delay where X is monotonically advancing). This holds for any TickableUnit whose push site is
OutPort::send(data)with fixed delay — the standard pattern. Mixed-delay producers must not route through MultiProducerQueueAdapter.
Public Functions Documentation
function tryPop
inline virtual std::optional< T > tryPop(
uint64_t current_cycle
) override
Reimplements: chronon::sender::IMessageQueue::tryPop
function size
inline virtual size_t size() const override
Reimplements: chronon::sender::IMessageQueue::size
function setCapacity
inline virtual void setCapacity(
size_t cap
) override
Reimplements: chronon::sender::IMessageQueue::setCapacity
Set the soft user-visible capacity.
Per-thread physical ring capacity (USABLE_CAPACITY) is unchanged. Only the soft aggregate gate is updated. pushFromThread() still does NOT enforce this cap — per-thread ring fullness (fullForThread()) continues to govern push failure on the push path to avoid wall-clock races between producer threads. The MPSC arbiter consults full() before admitting a message.
function pushFromThread
inline bool pushFromThread(
size_t queue_id,
T data,
uint64_t arrive_cycle,
uint32_t sender_id =0
)
Push using thread's queue ID.
Thread-safe: each thread has its own queue. Multiple units on same thread share same queue - OK because they run sequentially.
function push
inline virtual bool push(
T data,
uint64_t arrive_cycle
) override
Reimplements: chronon::sender::IMessageQueue::push
function popAll
inline virtual std::vector< T > popAll(
uint64_t current_cycle
) override
Reimplements: chronon::sender::IMessageQueue::popAll
function minArrivalCycle
inline virtual std::optional< uint64_t > minArrivalCycle() const override
Reimplements: chronon::sender::IMessageQueue::minArrivalCycle
function hasReady
inline virtual bool hasReady(
uint64_t current_cycle
) const override
Reimplements: chronon::sender::IMessageQueue::hasReady
function getQueueIdForThread
inline size_t getQueueIdForThread(
size_t thread_id
) const
Get the queue ID for a given thread.
function fullForThread
inline bool fullForThread(
size_t queue_id
) const
function full
inline virtual bool full() const override
Reimplements: chronon::sender::IMessageQueue::full
function empty
inline virtual bool empty() const override
Reimplements: chronon::sender::IMessageQueue::empty
function clear
inline virtual void clear() override
Reimplements: chronon::sender::IMessageQueue::clear
function capacity
inline virtual size_t capacity() const override
Reimplements: chronon::sender::IMessageQueue::capacity
function available
inline virtual size_t available() const override
Reimplements: chronon::sender::IMessageQueue::available
function addProducerThread
inline size_t addProducerThread(
size_t thread_id
)
Parameters:
- thread_id The thread ID (from cluster assignment)
Return: Queue ID for this thread (used in pushFromThread)
Register a new source thread and create its queue.
function MultiProducerQueueAdapter
inline explicit MultiProducerQueueAdapter(
size_t capacity =std::numeric_limits< size_t >::max()
)
Construct an MPSC adapter with an initial user-visible capacity.
The per-thread physical LockFreeMessageQueue ring capacity (USABLE_CAPACITY, typically 4095 slots) is fixed and remains unchanged. user_capacity_ is a soft aggregate gate consulted by capacity()/full(); it is only honored by the arbiter/canAccept layer before admitting a push. The push path itself (pushFromThread) does NOT enforce user_capacity_ — enforcing it there would reintroduce a wall-clock race where two producer threads racing against each other could both see size < cap and both succeed even though their joint push exceeds the soft cap.
If the user does not override capacity explicitly, we still want full() to never fire for a well-sized system, so the default behaves like the legacy physical aggregate (effectively unlimited for typical configurations).
Updated on 2026-05-26 at 05:42:32 +0000