
chronon::sender::InPort


#include <InPort.hpp>

Inherits from chronon::sender::PortBase, chronon::sender::IArbitratablePort

Public Types

Name
using detail::PortEnvelope< T > StoredMessage

Public Functions

Name
void useSingleThreadQueue()
void useMultiProducerQueue()
void useLockFreeQueue()
std::optional< T > tryReceive(uint64_t current_cycle)
void setCapacity(size_t capacity)
virtual void setArbitrationProgressPointers(std::vector< const std::atomic< uint64_t > * > ptrs) override
void resetSelectiveCancellation()
size_t registerProducerThread(size_t thread_id)
void registerMPSCConnection(ConnectionBase * conn)
std::vector< T > receiveAll(uint64_t current_cycle)
std::vector< T > receiveAll()
Receive all messages ready at the owning Unit's current local cycle.
size_t queuedMessageCount() const
bool pushToThreadQueueCancelable(size_t queue_id, T data, uint64_t arrive_cycle, const std::atomic< uint64_t > * cancel_epoch, uint64_t epoch_snapshot, uint64_t enqueue_cycle =0, uint32_t sender_id =0)
Enqueue a cancelable message to a specific thread queue (MPSC mode).
bool pushToThreadQueue(size_t queue_id, T data, uint64_t arrive_cycle, uint64_t enqueue_cycle =0, uint32_t sender_id =0)
PortPolicy policy() const
std::optional< uint64_t > minArrivalCycle() const
Earliest arrival cycle of pending messages, used for lookahead.
bool isMultiProducerMode() const
True if the port is in multi-producer mode.
bool isFull() const
bool hasMessages() const
True if messages are ready at the owning Unit's current local cycle.
bool hasMPSCConnections() const
bool hasData(uint64_t current_cycle) const
size_t getQueueIdForThread(size_t thread_id) const
Get the queue ID for a given source thread (multi-producer mode only).
void flush()
Drop all queued messages (including future arrivals).
bool enqueueCancelable(T data, uint64_t arrive_cycle, const std::atomic< uint64_t > * cancel_epoch, uint64_t epoch_snapshot)
bool enqueueCancelable(T data, uint64_t arrive_cycle, const std::atomic< uint64_t > * cancel_epoch, uint64_t epoch_snapshot, uint64_t enqueue_cycle)
size_t effectiveCapacityPublic() const
void configureForSourceThreadCount(size_t source_thread_count)
virtual void clearPendingMessages() override
size_t capacity() const
template <auto KeyFn, typename K > void cancelYoungerThan(K watermark)
Selectively cancel in-flight messages where KeyFn(data) > watermark.
template <auto KeyFn, typename MinK, typename MaxK > void cancelOutsideInclusive(MinK min_keep, MaxK max_keep)
Keep only keys in [min_keep, max_keep] for current in-flight generation.
template <auto KeyFn, typename K > void cancelOlderThan(K watermark)
bool canAcceptThreadQueueFromProducer(size_t queue_id, size_t pending) const
bool canAcceptThreadQueue(size_t queue_id) const
bool canAcceptOnThreadQueue(size_t queue_id, size_t pending =0) const
bool canAcceptFromProducer(size_t pending) const
bool canAccept(size_t pending =0) const
True if this producer can push again in the current simulated cycle.
size_t available() const
virtual void arbitrateMPSCConsumerDriven() override
virtual void arbitrateMPSC() override
virtual void * arbitratablePortKey() override
size_t admissionCapacity() const
InPort(Unit * owner, std::string name, size_t capacity =UNLIMITED_CAPACITY, PortPolicy policy =PortPolicy::LegacyFastPath)
InPort(Unit * owner, std::string name, PortPolicy policy)
Convenience constructor: specify policy without setting capacity.

Public Attributes

Name
size_t UNLIMITED_CAPACITY

Additional inherited members

Public Functions inherited from chronon::sender::PortBase

Name
virtual ~PortBase() =default
Unit * owner() const
const std::string & name() const

Protected Functions inherited from chronon::sender::PortBase

Name
PortBase(Unit * owner, std::string name)

Protected Attributes inherited from chronon::sender::PortBase

Name
Unit * owner_
std::string name_

Public Functions inherited from chronon::sender::IArbitratablePort

Name
virtual ~IArbitratablePort() =default

Detailed Description

template <typename T >
class chronon::sender::InPort;

InPort - Receives data from connected OutPorts.

Features:

  • Timestamped message queue for deterministic delivery
  • Synchronous tryReceive for tick-based units

Usage:

InPort<T> in{this, "in"};  // T is the message type

// In tick() method
if (auto value = in.tryReceive(localCycle())) {
    process(*value);
}

Public Types Documentation

using StoredMessage

using chronon::sender::InPort< T >::StoredMessage = detail::PortEnvelope<T>;

Public Functions Documentation

function useSingleThreadQueue

inline void useSingleThreadQueue()

Switch to single-thread queue (no mutex overhead).

Call this during initialization when both producer and consumer are determined to be on the same thread.

WARNING: Must be called before simulation starts (queue must be empty).

function useMultiProducerQueue

inline void useMultiProducerQueue()

Switch to multi-producer queue mode.

Call this during initialization for cross-thread connections where multiple source threads write to this port. This creates per-thread SPSC queues that the consumer polls.

function useLockFreeQueue

inline void useLockFreeQueue()

Switch to lock-free SPSC queue.

Call this during initialization for cross-thread connections where there is only ONE source thread writing to this port. Uses atomic operations instead of mutex.

function tryReceive

inline std::optional< T > tryReceive(
uint64_t current_cycle
)

Parameters:

  • current_cycle The current simulation cycle

Return: The message if available, std::nullopt otherwise

Try to receive a message synchronously.
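
The timestamped delivery that tryReceive relies on can be illustrated with a small standalone model. This is only a sketch, not the chronon implementation: TimestampedQueueSketch is a hypothetical type, and it assumes that a message is "ready" once its arrive_cycle is less than or equal to current_cycle and that messages are enqueued in non-decreasing arrival order.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>
#include <utility>

// Hypothetical stand-in for a timestamped port queue; not the chronon type.
template <typename T>
class TimestampedQueueSketch {
public:
    // Assumption: callers enqueue in non-decreasing arrive_cycle order.
    void enqueue(T data, std::uint64_t arrive_cycle) {
        assert(items_.empty() || items_.back().arrive_cycle <= arrive_cycle);
        items_.push_back({std::move(data), arrive_cycle});
    }

    // Returns the head message only once its arrival cycle has been reached
    // (assumed readiness rule: arrive_cycle <= current_cycle).
    std::optional<T> tryReceive(std::uint64_t current_cycle) {
        if (items_.empty() || items_.front().arrive_cycle > current_cycle) {
            return std::nullopt;
        }
        T out = std::move(items_.front().data);
        items_.pop_front();
        return out;
    }

private:
    struct Entry {
        T data;
        std::uint64_t arrive_cycle;
    };
    std::deque<Entry> items_;
};
```

With this model, a message enqueued for cycle 3 is invisible at cycle 2 and is returned exactly once from cycle 3 onward.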

function setCapacity

inline void setCapacity(
size_t capacity
)

function setArbitrationProgressPointers

inline virtual void setArbitrationProgressPointers(
std::vector< const std::atomic< uint64_t > * > ptrs
) override

Reimplements: chronon::sender::IArbitratablePort::setArbitrationProgressPointers

Install the set of completed_cycle atomics for the predecessor threads feeding this InPort's MPSC connections. Called at TickSimulation::initialize() once the thread_progress_array_ has been allocated. An empty set (e.g. under Sequential or Barrier execution) makes arbitrateMPSCConsumerDriven() degrade to the legacy unbounded arbiter — safe because those modes only call it when producers are known to have finished their cycle.

function resetSelectiveCancellation

inline void resetSelectiveCancellation()

Clear receiver-side selective cancellation bounds and extractor.

Under the StageSelective policy this call is a no-op. Predicates are scoped per flush and retired automatically, driven by pops through shouldCancel(). Clearing live predicates here would re-open the #7 overlapping-flush zombie bug: a second flush's reset would erase the first flush's strict max_keep, allowing zombies enqueued between the two flushes to bypass the stricter predicate. Callers are still expected to invoke resetSelectiveCancellation() and then cancelYoungerThan(), in that order, for backward compatibility with the LegacyFastPath path; under StageSelective the reset is ignored and install() accumulates predicates into the slots array.

function registerProducerThread

inline size_t registerProducerThread(
size_t thread_id
)

Parameters:

  • thread_id The source thread ID

Return: Queue ID for this thread (used in pushToThreadQueue)

Register a source thread and get its queue ID.

Only valid in multi-producer mode.
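
One way to picture the thread-to-queue mapping is a small registry that hands out queue IDs. The sketch below is an assumption, not the library's code: ProducerRegistrySketch is hypothetical, and it assumes queue IDs are dense indices assigned in registration order and that registering the same thread twice returns the same ID.

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_map>

// Hypothetical registry; the real InPort may store this mapping differently.
class ProducerRegistrySketch {
public:
    // Assigns the next dense queue index to a new thread_id, or returns
    // the existing index if the thread was already registered.
    std::size_t registerProducerThread(std::size_t thread_id) {
        auto it = ids_.try_emplace(thread_id, ids_.size()).first;
        return it->second;
    }

    // Looks up a previously registered thread.
    std::size_t getQueueIdForThread(std::size_t thread_id) const {
        auto it = ids_.find(thread_id);
        assert(it != ids_.end());  // precondition: thread was registered
        return it->second;
    }

private:
    std::unordered_map<std::size_t, std::size_t> ids_;
};
```

The returned index is what a producer would then pass as queue_id when pushing to its own queue.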

function registerMPSCConnection

inline void registerMPSCConnection(
ConnectionBase * conn
)

Register an MPSC-mode Connection with this InPort so that the cycle-boundary arbiter can drain its staging deque. Connections are kept sorted by conn_id to give the arbiter a topology-stable fixed-priority order independent of num_workers or partition layout.
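
Keeping the connection list sorted at registration time can be sketched as an ordered insert. ConnSketch and registerSorted are hypothetical names, and the integer type of conn_id is assumed; only the "sorted by conn_id" property comes from the description above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical connection record; only the ordering key is modeled.
struct ConnSketch {
    std::uint32_t conn_id;  // assumed type
};

// Inserts conn so that the vector stays ordered by ascending conn_id,
// regardless of the order in which connections are registered.
inline void registerSorted(std::vector<ConnSketch*>& conns, ConnSketch* conn) {
    assert(conn != nullptr);
    auto pos = std::upper_bound(
        conns.begin(), conns.end(), conn,
        [](const ConnSketch* lhs, const ConnSketch* rhs) {
            return lhs->conn_id < rhs->conn_id;
        });
    conns.insert(pos, conn);
}
```

Because the order depends only on conn_id, an arbiter iterating this vector visits connections in the same sequence however the registration calls were interleaved.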

function receiveAll

inline std::vector< T > receiveAll(
uint64_t current_cycle
)

Parameters:

  • current_cycle The current simulation cycle

Return: Vector of all ready messages

Receive all available messages.

function receiveAll

inline std::vector< T > receiveAll()

Receive all messages ready at the owning Unit's current local cycle.

function queuedMessageCount

inline size_t queuedMessageCount() const

function pushToThreadQueueCancelable

inline bool pushToThreadQueueCancelable(
size_t queue_id,
T data,
uint64_t arrive_cycle,
const std::atomic< uint64_t > * cancel_epoch,
uint64_t epoch_snapshot,
uint64_t enqueue_cycle =0,
uint32_t sender_id =0
)

Enqueue a cancelable message to a specific thread queue (MPSC mode).

function pushToThreadQueue

inline bool pushToThreadQueue(
size_t queue_id,
T data,
uint64_t arrive_cycle,
uint64_t enqueue_cycle =0,
uint32_t sender_id =0
)

Parameters:

  • queue_id The queue ID (from registerProducerThread)
  • data The message data
  • arrive_cycle When the message should be delivered

Return: true if enqueued, false if queue full

Push to a specific thread's queue in multi-producer mode.

function policy

inline PortPolicy policy() const

Return: the cancellation policy for this InPort.

function minArrivalCycle

inline std::optional< uint64_t > minArrivalCycle() const

Earliest arrival cycle of pending messages, used for lookahead.

function isMultiProducerMode

inline bool isMultiProducerMode() const

True if the port is in multi-producer mode.

function isFull

inline bool isFull() const

True if the zero-pending admission check would reject another push. Prefer !canAccept() in new code.

function hasMessages

inline bool hasMessages() const

True if messages are ready at the owning Unit's current local cycle.

function hasMPSCConnections

inline bool hasMPSCConnections() const

function hasData

inline bool hasData(
uint64_t current_cycle
) const

function getQueueIdForThread

inline size_t getQueueIdForThread(
size_t thread_id
) const

Get the queue ID for a given source thread (multi-producer mode only).

function flush

inline void flush()

Drop all queued messages (including future arrivals).

function enqueueCancelable

inline bool enqueueCancelable(
T data,
uint64_t arrive_cycle,
const std::atomic< uint64_t > * cancel_epoch,
uint64_t epoch_snapshot
)

Enqueue a cancelable message for delivery.

Used by Connection to support OutPort::cancelInFlight().

function enqueueCancelable

inline bool enqueueCancelable(
T data,
uint64_t arrive_cycle,
const std::atomic< uint64_t > * cancel_epoch,
uint64_t epoch_snapshot,
uint64_t enqueue_cycle
)

Cancelable enqueue with explicit enqueue_cycle stamp.

Connection::transfer() uses this overload (computed as send_cycle, which is the producer's localCycle) so that StageSelective predicates can decide whether the message predates the most recent flush.
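
The cancel_epoch and epoch_snapshot parameters suggest an epoch-comparison scheme. The sketch below shows one plausible reading, not the confirmed chronon behavior: it assumes the sender bumps an atomic epoch counter to cancel, and that a message is stale when the counter no longer equals the snapshot taken at enqueue time. CancelableEnvelopeSketch and stampSketch are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical envelope carrying the fields named in enqueueCancelable.
template <typename T>
struct CancelableEnvelopeSketch {
    T data;
    std::uint64_t arrive_cycle;
    const std::atomic<std::uint64_t>* cancel_epoch;
    std::uint64_t epoch_snapshot;

    // Assumed rule: stale once the sender's epoch differs from the snapshot.
    bool isCanceled() const {
        return cancel_epoch->load(std::memory_order_acquire) != epoch_snapshot;
    }
};

// Captures the current epoch at enqueue time.
template <typename T>
CancelableEnvelopeSketch<T> stampSketch(T data, std::uint64_t arrive_cycle,
                                        const std::atomic<std::uint64_t>* epoch) {
    assert(epoch != nullptr);
    return {std::move(data), arrive_cycle, epoch,
            epoch->load(std::memory_order_acquire)};
}
```

Under this reading, cancelling every in-flight message costs the sender a single atomic increment, and the receiver discovers staleness lazily when it inspects each envelope.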

function effectiveCapacityPublic

inline size_t effectiveCapacityPublic() const

function configureForSourceThreadCount

inline void configureForSourceThreadCount(
size_t source_thread_count
)

Select a queue implementation from the number of source threads.

This is a semantic wrapper for manual setup. TickSimulation performs this optimization automatically during initialization.
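
The selection can be pictured as a mapping from source-thread count to the three queue modes described earlier. This is a guess at the mapping rather than the documented one: QueueModeSketch and chooseQueueMode are hypothetical, and the sketch assumes source_thread_count counts producer threads other than the consumer's own thread.

```cpp
#include <cstddef>

// Hypothetical labels for the three queue implementations.
enum class QueueModeSketch { SingleThread, LockFreeSpsc, MultiProducer };

// Assumed mapping:
//   0 cross-thread sources  -> single-thread queue (no synchronization)
//   1 cross-thread source   -> lock-free SPSC queue
//   2+ cross-thread sources -> multi-producer mode (per-thread SPSC queues)
constexpr QueueModeSketch chooseQueueMode(std::size_t source_thread_count) {
    if (source_thread_count == 0) return QueueModeSketch::SingleThread;
    if (source_thread_count == 1) return QueueModeSketch::LockFreeSpsc;
    return QueueModeSketch::MultiProducer;
}
```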

function clearPendingMessages

inline virtual void clearPendingMessages() override

Reimplements: chronon::sender::PortBase::clearPendingMessages

Clear all pending messages (type-erased override).

Enables iterating unit->ports() and clearing all InPort queues without knowing the template type. Used for post-profiling reset.

function capacity

inline size_t capacity() const

function cancelYoungerThan

template <auto KeyFn,
typename K >
inline void cancelYoungerThan(
K watermark
)

Selectively cancel in-flight messages where KeyFn(data) > watermark.

function cancelOutsideInclusive

template <auto KeyFn,
typename MinK ,
typename MaxK >
inline void cancelOutsideInclusive(
MinK min_keep,
MaxK max_keep
)

Keep only keys in [min_keep, max_keep] for current in-flight generation.

function cancelOlderThan

template <auto KeyFn,
typename K >
inline void cancelOlderThan(
K watermark
)

Selectively cancel in-flight messages where KeyFn(data) < watermark.

Receiver-side selective cancellation defaults to in-flight scope: the first cancellation call captures the current enqueue generation so future messages are unaffected.
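
The KeyFn template parameter is a compile-time key extractor. The sketch below shows how such a parameter can be applied to a plain container. It is illustrative only: UopSketch and the free functions are hypothetical, they filter a std::vector eagerly, and they do not model the in-flight generation scoping described above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical payload; seq plays the role of an age key.
struct UopSketch {
    std::uint64_t seq;
};

// Drops entries whose key is strictly greater than the watermark.
template <auto KeyFn, typename T, typename K>
void cancelYoungerThanSketch(std::vector<T>& in_flight, K watermark) {
    in_flight.erase(
        std::remove_if(in_flight.begin(), in_flight.end(),
                       [&](const T& m) { return std::invoke(KeyFn, m) > watermark; }),
        in_flight.end());
}

// Keeps only entries whose key lies in [min_keep, max_keep].
template <auto KeyFn, typename T, typename MinK, typename MaxK>
void cancelOutsideInclusiveSketch(std::vector<T>& in_flight, MinK min_keep,
                                  MaxK max_keep) {
    assert(!(max_keep < min_keep));  // precondition: non-empty range
    in_flight.erase(
        std::remove_if(in_flight.begin(), in_flight.end(),
                       [&](const T& m) {
                           const auto key = std::invoke(KeyFn, m);
                           return key < min_keep || key > max_keep;
                       }),
        in_flight.end());
}
```

A call such as cancelYoungerThanSketch<&UopSketch::seq>(queue, watermark) passes a pointer to member as KeyFn; std::invoke also accepts a free function pointer, so either style of extractor works in this sketch.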

function canAcceptThreadQueueFromProducer

inline bool canAcceptThreadQueueFromProducer(
size_t queue_id,
size_t pending
) const

function canAcceptThreadQueue

inline bool canAcceptThreadQueue(
size_t queue_id
) const

function canAcceptOnThreadQueue

inline bool canAcceptOnThreadQueue(
size_t queue_id,
size_t pending =0
) const

Check if a specific producer queue can accept data (MPSC mode).

Falls back to port-level canAccept in non-MPSC mode.

function canAcceptFromProducer

inline bool canAcceptFromProducer(
size_t pending
) const

Producer-side canAccept: is there room for another push this cycle?

Rate-based per-cycle admission: a producer may push up to effectiveCapacity_() items per simulated cycle. The bound is tracked by Connection::pushes_this_cycle_ (passed as pending) and reset at each cycle advance. This matches RTL pipeline-register semantics — each clock edge captures up to capacity parallel values regardless of whether the receiver has drained previous cycles' captures — and is intrinsically race-free: the value of pending is a per-producer-connection cycle-local counter, never read from the live queue that the consumer mutates on a parallel thread.

This replaces the previous queue_->size() < capacity live-size check, which had two failure modes:

  • In parallel mode (num_workers>=2), the consumer's mid-cycle pops could let the producer "win the race" and push beyond the intended bound — exposing cycle-count drift across num_workers.
  • In sequential mode (num_workers=1), the producer ticks before the consumer, so the live queue still holds last cycle's batch (capacity items waiting for this cycle's pop). The producer saw size == capacity and back-pressured itself every cycle, inserting a spurious 1-cycle stall per pipeline stage.

Total queue growth is still bounded: the consumer's per-cycle drain keeps the queue at steady-state occupancy. Pathological consumer stalls are caught by the underlying ring's physical capacity (USABLE_CAPACITY in LockFreeMessageQueue). For MPSC fan-in, the per-connection staging deque is bounded by the InPort's capacity, giving multi-producer back-pressure without a shared-queue race.
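
The rate-based rule can be modeled without any real queue, which is the point of the design: admission depends only on a producer-local counter. The sketch below assumes the check has the form pending < capacity and that the counter is reset whenever the cycle changes. RateAdmissionSketch is a hypothetical class whose counter stands in for Connection::pushes_this_cycle_.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical producer-side model of per-cycle admission.
class RateAdmissionSketch {
public:
    explicit RateAdmissionSketch(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Assumed form of the check: admit while pending < capacity.
    bool canAcceptFromProducer(std::size_t pending) const {
        return pending < capacity_;
    }

    // Attempts one push in the given cycle. The counter is reset when the
    // cycle advances, so the live queue size is never consulted.
    bool tryPush(std::uint64_t cycle) {
        if (cycle != last_cycle_) {
            last_cycle_ = cycle;
            pushes_this_cycle_ = 0;
        }
        if (!canAcceptFromProducer(pushes_this_cycle_)) {
            return false;
        }
        ++pushes_this_cycle_;
        return true;
    }

private:
    std::size_t capacity_;
    std::size_t pushes_this_cycle_ = 0;
    std::uint64_t last_cycle_ = 0;
};
```

With a capacity of 2, the third push in a cycle is refused and the next cycle starts with a full budget again, whether or not the consumer has drained anything in between.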

function canAccept

inline bool canAccept(
size_t pending =0
) const

True if this producer can push again in the current simulated cycle.

function available

inline size_t available() const

function arbitrateMPSCConsumerDriven

inline virtual void arbitrateMPSCConsumerDriven() override

Reimplements: chronon::sender::PortBase::arbitrateMPSCConsumerDriven

Consumer-tick-driven arbitration (Option 1, see docs/mpsc-atomic-publish.md). Called by TickableUnit::executeTick on the consumer thread, immediately before the user's tick() body. Computes the safe drain bound S = min over predecessor threads of completed_cycle (acquire) and drains staging entries with enqueue_cycle <= S in conn_id order.

A no-op if the port has no MPSC connections, if the producer-thread set was never resolved (progress-based sync off), or if S has not advanced since the last arbitration.
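
The safe drain bound and the ordered drain can be sketched in isolation. This is a simplified model under stated assumptions: StagedSketch, safeDrainBound, and drainUpTo are hypothetical, staging deques are assumed to be ordered by enqueue_cycle, the outer vector is assumed to be already sorted by conn_id, and the "S has not advanced" short-circuit is omitted.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <limits>
#include <optional>
#include <vector>

// Hypothetical staged entry: a payload plus the cycle it was enqueued in.
struct StagedSketch {
    int payload;
    std::uint64_t enqueue_cycle;
};

// S = min over predecessor threads of completed_cycle, using acquire loads.
// Returns nullopt when no progress pointers were installed.
inline std::optional<std::uint64_t> safeDrainBound(
    const std::vector<const std::atomic<std::uint64_t>*>& progress) {
    if (progress.empty()) {
        return std::nullopt;
    }
    std::uint64_t s = std::numeric_limits<std::uint64_t>::max();
    for (const auto* p : progress) {
        assert(p != nullptr);
        s = std::min(s, p->load(std::memory_order_acquire));
    }
    return s;
}

// Drains, in connection order, every staged entry with enqueue_cycle <= bound.
inline void drainUpTo(std::vector<std::deque<StagedSketch>>& staging_by_conn,
                      std::uint64_t bound, std::vector<int>& shared_queue) {
    for (auto& staging : staging_by_conn) {
        while (!staging.empty() && staging.front().enqueue_cycle <= bound) {
            shared_queue.push_back(staging.front().payload);
            staging.pop_front();
        }
    }
}
```

Entries enqueued after S stay in staging, so the consumer never observes a cycle that some predecessor thread has not yet finished producing.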

function arbitrateMPSC

inline virtual void arbitrateMPSC() override

Reimplements: chronon::sender::IArbitratablePort::arbitrateMPSC

Per-cycle MPSC admission arbitration.

Called by the scheduler at each cycle boundary on InPorts that have at least one MPSC connection registered. Iterates the connections in conn_id-ascending order (stable across num_workers since conn_id is assigned at topology construction) and lets each drain its full staging deque into the shared queue.

The user_cap on the InPort governs per-producer staging depth (a Connection's staging deque is bounded at user_cap, giving the producer back-pressure when full); the arbiter itself does NOT cap aggregate per-cycle admission. A strict aggregate-cap budget here would starve lower-conn_id connections under heavy multi-producer fan-in: with N producers each pushing at the cap and a budget of cap/cycle, only the highest-priority producer makes progress. Bounding admission via per-producer staging plus consumer drain preserves both determinism (conn_id ordering of admission) and fairness (every producer that has unblocked staging makes progress every cycle).

function arbitratablePortKey

inline virtual void * arbitratablePortKey() override

Reimplements: chronon::sender::IArbitratablePort::arbitratablePortKey

Opaque identity for this arbitrable port. Returned by InPort as its this pointer (same value as destPortPtr() on connections). Used by TickSimulation to join MPSC InPorts against per-port producer-thread tables at init time.

function admissionCapacity

inline size_t admissionCapacity() const

Effective per-cycle admission bound visible to producer-side logic.

Connection::transfer consults this to enforce the same rate limit as canAccept() on the push path.

function InPort

inline InPort(
Unit * owner,
std::string name,
size_t capacity =UNLIMITED_CAPACITY,
PortPolicy policy =PortPolicy::LegacyFastPath
)

Parameters:

  • owner The unit that owns this port
  • name The port name (for debugging)
  • capacity Maximum queue capacity (default unlimited)
  • policy The cancellation policy for this port (default PortPolicy::LegacyFastPath)

Create an input port.

Automatically registers with PortDirectory when owner's TreeNode is set.

function InPort

inline InPort(
Unit * owner,
std::string name,
PortPolicy policy
)

Convenience constructor: specify policy without setting capacity.

Public Attributes Documentation

variable UNLIMITED_CAPACITY

static size_t UNLIMITED_CAPACITY = MessageQueue<StoredMessage>::UNLIMITED_CAPACITY;

Updated on 2026-05-26 at 05:42:32 +0000