chronon::InPort
#include <InPort.hpp>
Inherits from chronon::sender::PortBase, chronon::sender::IArbitratablePort
Public Types
| | Name |
|---|---|
| using detail::PortEnvelope< T > | StoredMessage |
Public Functions
| | Name |
|---|---|
| void | useSingleThreadQueue() |
| void | useMultiProducerQueue() |
| void | useLockFreeQueue() |
| std::optional< T > | tryReceive(uint64_t current_cycle) |
| void | setCapacity(size_t capacity) |
| virtual void | setArbitrationProgressPointers(std::vector< const std::atomic< uint64_t > * > ptrs) override |
| void | resetSelectiveCancellation() |
| size_t | registerProducerThread(size_t thread_id) |
| void | registerMPSCConnection(ConnectionBase * conn) |
| std::vector< T > | receiveAll(uint64_t current_cycle) |
| std::vector< T > | receiveAll() Receive all messages ready at the owning Unit's current local cycle. |
| size_t | queuedMessageCount() const |
| bool | pushToThreadQueueCancelable(size_t queue_id, T data, uint64_t arrive_cycle, const std::atomic< uint64_t > * cancel_epoch, uint64_t epoch_snapshot, uint64_t enqueue_cycle =0, uint32_t sender_id =0) Enqueue a cancelable message to a specific thread queue (MPSC mode). |
| bool | pushToThreadQueue(size_t queue_id, T data, uint64_t arrive_cycle, uint64_t enqueue_cycle =0, uint32_t sender_id =0) |
| PortPolicy | policy() const |
| std::optional< uint64_t > | minArrivalCycle() const Earliest arrival cycle of pending messages, used for lookahead. |
| bool | isMultiProducerMode() const True if the port is in multi-producer mode. |
| bool | isFull() const |
| bool | hasMessages() const True if messages are ready at the owning Unit's current local cycle. |
| bool | hasMPSCConnections() const |
| bool | hasData(uint64_t current_cycle) const |
| size_t | getQueueIdForThread(size_t thread_id) const Get the queue ID for a given source thread (multi-producer mode only). |
| void | flush() Drop all queued messages (including future arrivals). |
| bool | enqueueCancelable(T data, uint64_t arrive_cycle, const std::atomic< uint64_t > * cancel_epoch, uint64_t epoch_snapshot) |
| bool | enqueueCancelable(T data, uint64_t arrive_cycle, const std::atomic< uint64_t > * cancel_epoch, uint64_t epoch_snapshot, uint64_t enqueue_cycle) |
| size_t | effectiveCapacityPublic() const |
| void | configureForSourceThreadCount(size_t source_thread_count) |
| virtual void | clearPendingMessages() override |
| size_t | capacity() const |
| template <auto KeyFn,typename K > void | cancelYoungerThan(K watermark) Selectively cancel in-flight messages where KeyFn(data) > watermark. |
| template <auto KeyFn,typename MinK ,typename MaxK > void | cancelOutsideInclusive(MinK min_keep, MaxK max_keep) Keep only keys in [min_keep, max_keep] for current in-flight generation. |
| template <auto KeyFn,typename K > void | cancelOlderThan(K watermark) |
| bool | canAcceptThreadQueueFromProducer(size_t queue_id, size_t pending) const |
| bool | canAcceptThreadQueue(size_t queue_id) const |
| bool | canAcceptOnThreadQueue(size_t queue_id, size_t pending =0) const |
| bool | canAcceptFromProducer(size_t pending) const |
| bool | canAccept(size_t pending =0) const True if this producer can push again in the current simulated cycle. |
| size_t | available() const |
| virtual void | arbitrateMPSCConsumerDriven() override |
| virtual void | arbitrateMPSC() override |
| virtual void * | arbitratablePortKey() override |
| size_t | admissionCapacity() const |
| | InPort(Unit * owner, std::string name, size_t capacity =UNLIMITED_CAPACITY, PortPolicy policy =PortPolicy::LegacyFastPath) |
| | InPort(Unit * owner, std::string name, PortPolicy policy) Convenience constructor: specify policy without setting capacity. |
Public Attributes
| | Name |
|---|---|
| size_t | UNLIMITED_CAPACITY |
Additional inherited members
Public Functions inherited from chronon::sender::PortBase
| | Name |
|---|---|
| virtual | ~PortBase() =default |
| Unit * | owner() const |
| const std::string & | name() const |
Protected Functions inherited from chronon::sender::PortBase
| | Name |
|---|---|
| | PortBase(Unit * owner, std::string name) |
Protected Attributes inherited from chronon::sender::PortBase
| | Name |
|---|---|
| Unit * | owner_ |
| std::string | name_ |
Public Functions inherited from chronon::sender::IArbitratablePort
| | Name |
|---|---|
| virtual | ~IArbitratablePort() =default |
Detailed Description
template <typename T >
class chronon::InPort;
InPort - Receives data from connected OutPorts.
Features:
- Timestamped message queue for deterministic delivery
- Synchronous tryReceive for tick-based units
Usage: InPort
// In tick() method
if (auto value = in.tryReceive(localCycle())) {
    process(*value);
}
Public Types Documentation
using StoredMessage
using chronon::sender::InPort< T >::StoredMessage = detail::PortEnvelope<T>;
Public Functions Documentation
function useSingleThreadQueue
inline void useSingleThreadQueue()
Switch to single-thread queue (no mutex overhead).
Call this during initialization when both producer and consumer are determined to be on the same thread.
WARNING: Must be called before simulation starts (queue must be empty).
function useMultiProducerQueue
inline void useMultiProducerQueue()
Switch to multi-producer queue mode.
Call this during initialization for cross-thread connections where multiple source threads write to this port. Creates per-thread SPSC queues polled by consumer.
function useLockFreeQueue
inline void useLockFreeQueue()
Switch to lock-free SPSC queue.
Call this during initialization for cross-thread connections where there is only ONE source thread writing to this port. Uses atomic operations instead of mutex.
function tryReceive
inline std::optional< T > tryReceive(
uint64_t current_cycle
)
Parameters:
- current_cycle The current simulation cycle
Return: The message if available, std::nullopt otherwise
Try to receive a message synchronously.
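The timestamped delivery behind tryReceive can be illustrated with a small stand-alone model. This is only a sketch: `ToyInPort` is a hypothetical class, not the chronon implementation, and it assumes a message becomes ready once `arrive_cycle <= current_cycle` and that ties are delivered in insertion order.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for InPort<T>; not the chronon implementation.
template <typename T>
class ToyInPort {
public:
    // Queue `data` for delivery at `arrive_cycle`.
    void enqueue(T data, std::uint64_t arrive_cycle) {
        queue_.push(Entry{arrive_cycle, next_seq_++, std::move(data)});
    }

    // Assumed readiness rule: arrive_cycle <= current_cycle.
    std::optional<T> tryReceive(std::uint64_t current_cycle) {
        if (queue_.empty() || queue_.top().arrive_cycle > current_cycle) {
            return std::nullopt;
        }
        T value = queue_.top().data;  // top() is const, so copy out
        queue_.pop();
        return value;
    }

private:
    struct Entry {
        std::uint64_t arrive_cycle;
        std::uint64_t seq;  // insertion order, breaks ties deterministically
        T data;
    };
    // Orders the heap so the earliest (arrive_cycle, seq) is on top.
    struct Later {
        bool operator()(const Entry& a, const Entry& b) const {
            if (a.arrive_cycle != b.arrive_cycle) {
                return a.arrive_cycle > b.arrive_cycle;
            }
            return a.seq > b.seq;
        }
    };
    std::priority_queue<Entry, std::vector<Entry>, Later> queue_;
    std::uint64_t next_seq_ = 0;
};
```

In this model a message enqueued for cycle 3 yields std::nullopt at cycle 2 and is returned at cycle 3 or later.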
function setCapacity
inline void setCapacity(
size_t capacity
)
function setArbitrationProgressPointers
inline virtual void setArbitrationProgressPointers(
std::vector< const std::atomic< uint64_t > * > ptrs
) override
Reimplements: chronon::sender::IArbitratablePort::setArbitrationProgressPointers
Install the set of completed_cycle atomics for the predecessor threads feeding this InPort's MPSC connections. Called at TickSimulation::initialize() once the thread_progress_array_ has been allocated. An empty set (e.g. under Sequential or Barrier execution) makes arbitrateMPSCConsumerDriven() degrade to the legacy unbounded arbiter — safe because those modes only call it when producers are known to have finished their cycle.
function resetSelectiveCancellation
inline void resetSelectiveCancellation()
Clear receiver-side selective cancellation bounds and extractor.
Under StageSelective this is a no-op. Predicates are scoped per flush and retired automatically, driven by pops through shouldCancel(). Clearing live predicates here would reintroduce the #7 overlapping-flush zombie bug: a second flush's reset would erase the first flush's strict max_keep, allowing zombies enqueued between the two flushes to bypass the stricter predicate. Callers are expected to keep invoking resetSelectiveCancellation() followed by cancelYoungerThan(), in that order, for backward compatibility with the LegacyFastPath path; under StageSelective the reset is ignored and install() accumulates predicates into the slots array.
function registerProducerThread
inline size_t registerProducerThread(
size_t thread_id
)
Parameters:
- thread_id The source thread ID
Return: Queue ID for this thread (used in pushToThreadQueue)
Register a source thread and get its queue ID.
Only valid in multi-producer mode.
function registerMPSCConnection
inline void registerMPSCConnection(
ConnectionBase * conn
)
Register an MPSC-mode Connection with this InPort so that the cycle-boundary arbiter can drain its staging deque. Connections are kept sorted by conn_id to give the arbiter a topology-stable fixed-priority order independent of num_workers or partition layout.
function receiveAll
inline std::vector< T > receiveAll(
uint64_t current_cycle
)
Parameters:
- current_cycle The current simulation cycle
Return: Vector of all ready messages
Receive all available messages.
function receiveAll
inline std::vector< T > receiveAll()
Receive all messages ready at the owning Unit's current local cycle.
function queuedMessageCount
inline size_t queuedMessageCount() const
function pushToThreadQueueCancelable
inline bool pushToThreadQueueCancelable(
size_t queue_id,
T data,
uint64_t arrive_cycle,
const std::atomic< uint64_t > * cancel_epoch,
uint64_t epoch_snapshot,
uint64_t enqueue_cycle =0,
uint32_t sender_id =0
)
Enqueue a cancelable message to a specific thread queue (MPSC mode).
function pushToThreadQueue
inline bool pushToThreadQueue(
size_t queue_id,
T data,
uint64_t arrive_cycle,
uint64_t enqueue_cycle =0,
uint32_t sender_id =0
)
Parameters:
- queue_id The queue ID (from registerProducerThread)
- data The message data
- arrive_cycle When the message should be delivered
Return: true if enqueued, false if queue full
Push to a specific thread's queue in multi-producer mode.
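The register-then-push flow in multi-producer mode can be modelled as follows. This is a hedged sketch: `ToyMultiProducerPort` is hypothetical, and it assumes queue IDs are handed out in registration order and that each per-thread queue has a fixed depth. Neither detail is stated on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <unordered_map>
#include <vector>

// Hypothetical model of per-thread queues. Method names mirror the API,
// but the behaviour (IDs in registration order, fixed depth) is assumed.
class ToyMultiProducerPort {
public:
    explicit ToyMultiProducerPort(std::size_t per_queue_capacity)
        : per_queue_capacity_(per_queue_capacity) {}

    // Returns the existing queue ID if the thread is already registered.
    std::size_t registerProducerThread(std::size_t thread_id) {
        auto it = queue_of_thread_.find(thread_id);
        if (it != queue_of_thread_.end()) return it->second;
        std::size_t queue_id = queues_.size();
        queues_.emplace_back();
        queue_of_thread_.emplace(thread_id, queue_id);
        return queue_id;
    }

    // true if enqueued, false if that thread's queue is full.
    bool pushToThreadQueue(std::size_t queue_id, int data,
                           std::uint64_t arrive_cycle) {
        auto& q = queues_.at(queue_id);
        if (q.size() >= per_queue_capacity_) return false;
        q.push_back(Message{data, arrive_cycle});
        return true;
    }

private:
    struct Message {
        int data;
        std::uint64_t arrive_cycle;
    };
    std::size_t per_queue_capacity_;
    std::vector<std::deque<Message>> queues_;
    std::unordered_map<std::size_t, std::size_t> queue_of_thread_;
};
```

Because each source thread writes only to its own queue, a full queue for one producer does not block pushes from another in this model.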
function policy
inline PortPolicy policy() const
Return: the cancellation policy for this InPort.
function minArrivalCycle
inline std::optional< uint64_t > minArrivalCycle() const
Earliest arrival cycle of pending messages, used for lookahead.
function isMultiProducerMode
inline bool isMultiProducerMode() const
True if the port is in multi-producer mode.
function isFull
inline bool isFull() const
True if the zero-pending admission check would reject another push. Prefer !canAccept() in new code.
function hasMessages
inline bool hasMessages() const
True if messages are ready at the owning Unit's current local cycle.
function hasMPSCConnections
inline bool hasMPSCConnections() const
function hasData
inline bool hasData(
uint64_t current_cycle
) const
function getQueueIdForThread
inline size_t getQueueIdForThread(
size_t thread_id
) const
Get the queue ID for a given source thread (multi-producer mode only).
function flush
inline void flush()
Drop all queued messages (including future arrivals).
function enqueueCancelable
inline bool enqueueCancelable(
T data,
uint64_t arrive_cycle,
const std::atomic< uint64_t > * cancel_epoch,
uint64_t epoch_snapshot
)
Enqueue a cancelable message for delivery.
Used by Connection to support OutPort::cancelInFlight().
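One plausible reading of the `cancel_epoch` / `epoch_snapshot` pair is an epoch comparison at delivery time. The sketch below illustrates that idea only. `ToyEnvelope` and `isCancelled` are hypothetical names, and the rule that a message is stale once the epoch differs from its snapshot is an assumption, not something this page confirms.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical envelope; the real detail::PortEnvelope layout is not
// shown on this page.
struct ToyEnvelope {
    int data;
    const std::atomic<std::uint64_t>* cancel_epoch;  // owned by the sender
    std::uint64_t epoch_snapshot;                    // epoch at enqueue time
};

// Assumed rule: a message is stale once the sender's epoch no longer
// matches the value captured when the message was enqueued. A null
// pointer marks a message that cannot be cancelled.
inline bool isCancelled(const ToyEnvelope& m) {
    return m.cancel_epoch != nullptr &&
           m.cancel_epoch->load(std::memory_order_acquire) != m.epoch_snapshot;
}
```

Under this assumption a sender cancels everything in flight by bumping a single counter, and the receiver discards stale entries as it pops them.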
function enqueueCancelable
inline bool enqueueCancelable(
T data,
uint64_t arrive_cycle,
const std::atomic< uint64_t > * cancel_epoch,
uint64_t epoch_snapshot,
uint64_t enqueue_cycle
)
Cancelable enqueue with explicit enqueue_cycle stamp.
Connection::transfer() uses this overload, passing send_cycle (the producer's localCycle) as enqueue_cycle, so that StageSelective predicates can decide whether the message predates the most recent flush.
function effectiveCapacityPublic
inline size_t effectiveCapacityPublic() const
function configureForSourceThreadCount
inline void configureForSourceThreadCount(
size_t source_thread_count
)
Select a queue implementation from the number of source threads.
This is a semantic wrapper for manual setup. TickSimulation performs this optimization automatically during initialization.
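The selection can be pictured as a mapping from thread count to queue mode. The sketch below is inferred from the descriptions of useSingleThreadQueue(), useLockFreeQueue() and useMultiProducerQueue(). The enum, the function name and the exact thresholds are assumptions, in particular that `source_thread_count` counts only source threads other than the consumer's.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical labels for the three queue modes described on this page.
enum class ToyQueueKind { SingleThread, LockFreeSpsc, MultiProducer };

// Assumed mapping: 0 cross-thread sources -> single-thread queue,
// exactly 1 -> lock-free SPSC, 2 or more -> multi-producer.
inline ToyQueueKind chooseQueueKind(std::size_t source_thread_count) {
    if (source_thread_count == 0) return ToyQueueKind::SingleThread;
    if (source_thread_count == 1) return ToyQueueKind::LockFreeSpsc;
    return ToyQueueKind::MultiProducer;
}
```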
function clearPendingMessages
inline virtual void clearPendingMessages() override
Reimplements: chronon::sender::PortBase::clearPendingMessages
Clear all pending messages (type-erased override).
Enables iterating unit->ports() and clearing all InPort queues without knowing the template type. Used for post-profiling reset.
function capacity
inline size_t capacity() const
function cancelYoungerThan
template <auto KeyFn,
typename K >
inline void cancelYoungerThan(
K watermark
)
Selectively cancel in-flight messages where KeyFn(data) > watermark.
function cancelOutsideInclusive
template <auto KeyFn,
typename MinK ,
typename MaxK >
inline void cancelOutsideInclusive(
MinK min_keep,
MaxK max_keep
)
Keep only keys in [min_keep, max_keep] for current in-flight generation.
function cancelOlderThan
template <auto KeyFn,
typename K >
inline void cancelOlderThan(
K watermark
)
Selectively cancel in-flight messages where KeyFn(data) < watermark.
Receiver-side selective cancellation defaults to in-flight scope: the first cancellation call captures the current enqueue generation so future messages are unaffected.
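The three predicates can be illustrated on a plain vector. This sketch shows only the key comparisons and the `template <auto KeyFn>` calling style (C++17). The `Uop` type, `seqOf`, and the `toy*` free functions are hypothetical, and the in-flight generation scoping described above is not modelled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical payload with a sequence number used as the key.
struct Uop {
    std::uint64_t seq;
    int payload;
};

// Key extractor passed as a non-type template argument.
inline std::uint64_t seqOf(const Uop& u) { return u.seq; }

// Drops entries where KeyFn(data) > watermark.
template <auto KeyFn, typename K>
void toyCancelYoungerThan(std::vector<Uop>& v, K watermark) {
    v.erase(std::remove_if(v.begin(), v.end(),
                           [&](const Uop& u) { return KeyFn(u) > watermark; }),
            v.end());
}

// Drops entries where KeyFn(data) < watermark.
template <auto KeyFn, typename K>
void toyCancelOlderThan(std::vector<Uop>& v, K watermark) {
    v.erase(std::remove_if(v.begin(), v.end(),
                           [&](const Uop& u) { return KeyFn(u) < watermark; }),
            v.end());
}

// Keeps only entries with a key in [min_keep, max_keep].
template <auto KeyFn, typename MinK, typename MaxK>
void toyCancelOutsideInclusive(std::vector<Uop>& v, MinK min_keep,
                               MaxK max_keep) {
    v.erase(std::remove_if(v.begin(), v.end(),
                           [&](const Uop& u) {
                               return KeyFn(u) < min_keep || KeyFn(u) > max_keep;
                           }),
            v.end());
}
```

A call such as `toyCancelYoungerThan<seqOf>(v, std::uint64_t{5})` keeps keys up to and including 5, which matches the strict `>` comparison in the documented predicate.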
function canAcceptThreadQueueFromProducer
inline bool canAcceptThreadQueueFromProducer(
size_t queue_id,
size_t pending
) const
function canAcceptThreadQueue
inline bool canAcceptThreadQueue(
size_t queue_id
) const
function canAcceptOnThreadQueue
inline bool canAcceptOnThreadQueue(
size_t queue_id,
size_t pending =0
) const
Check if a specific producer queue can accept data (MPSC mode).
Falls back to port-level canAccept in non-MPSC mode.
function canAcceptFromProducer
inline bool canAcceptFromProducer(
size_t pending
) const
Producer-side canAccept: is there room for another push this cycle?
Rate-based per-cycle admission: a producer may push up to effectiveCapacity_() items per simulated cycle. The bound is tracked by Connection::pushes_this_cycle_ (passed as pending) and reset at each cycle advance. This matches RTL pipeline-register semantics — each clock edge captures up to capacity parallel values regardless of whether the receiver has drained previous cycles' captures — and is intrinsically race-free: the value of pending is a per-producer-connection cycle-local counter, never read from the live queue that the consumer mutates on a parallel thread.
This replaces the previous queue_->size() < capacity live-size check, which had two failure modes:
- In parallel mode (num_workers>=2), the consumer's mid-cycle pops could let the producer "win the race" and push beyond the intended bound — exposing cycle-count drift across num_workers.
- In sequential mode (num_workers=1), the producer ticks before the consumer, so the live queue still holds last cycle's batch (capacity items waiting for this cycle's pop). The producer saw size == capacity and back-pressured itself every cycle, inserting a spurious 1-cycle stall per pipeline stage.
Total queue growth is still bounded: the consumer's per-cycle drain keeps the queue at steady-state occupancy. Pathological consumer stalls are caught by the underlying ring's physical capacity (USABLE_CAPACITY in LockFreeMessageQueue). For MPSC fan-in, the per-connection staging deque is bounded by the InPort's capacity, giving multi-producer back-pressure without a shared-queue race.
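The rate-based check described above can be reduced to a per-connection counter. The following is a minimal sketch: `ToyRateConnection` is a hypothetical type, with `pushes_this_cycle` standing in for Connection::pushes_this_cycle_ and `capacity` for the effective capacity.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical per-connection admission state. The check never reads the
// live queue size, so a consumer popping concurrently cannot affect it.
struct ToyRateConnection {
    std::size_t capacity;               // per-cycle admission bound
    std::size_t pushes_this_cycle = 0;  // cycle-local counter

    bool canAccept() const { return pushes_this_cycle < capacity; }

    // Counts the push only when it is admitted.
    bool tryPush() {
        if (!canAccept()) return false;
        ++pushes_this_cycle;
        return true;
    }

    // Reset at each cycle advance, as the text describes.
    void advanceCycle() { pushes_this_cycle = 0; }
};
```

With `capacity` set to 2, the third push in a cycle is rejected and the next cycle admits two more, whether or not the consumer has drained the earlier items.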
function canAccept
inline bool canAccept(
size_t pending =0
) const
True if this producer can push again in the current simulated cycle.
function available
inline size_t available() const
function arbitrateMPSCConsumerDriven
inline virtual void arbitrateMPSCConsumerDriven() override
Reimplements: chronon::sender::PortBase::arbitrateMPSCConsumerDriven
Consumer-tick-driven arbitration (Option 1, see docs/mpsc-atomic-publish.md). Called by TickableUnit::executeTick on the consumer thread, immediately before the user's tick() body. Computes the safe drain bound S = min over predecessor threads of completed_cycle (acquire) and drains staging entries with enqueue_cycle <= S in conn_id order.
A no-op if the port has no MPSC connections, if the producer-thread set was never resolved (progress-based sync off), or if S has not advanced since the last arbitration.
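The safe drain bound S can be sketched as a minimum over acquire loads, followed by a bounded drain. The names `safeDrainBound`, `ToyStaged` and `drainUpTo` are hypothetical, and the sketch assumes each staging deque is ordered by `enqueue_cycle`.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <limits>
#include <optional>
#include <vector>

// S = min over predecessor threads of completed_cycle (acquire loads).
// Returns nullopt when no producer-thread set was installed.
inline std::optional<std::uint64_t> safeDrainBound(
    const std::vector<const std::atomic<std::uint64_t>*>& completed) {
    if (completed.empty()) return std::nullopt;
    std::uint64_t s = std::numeric_limits<std::uint64_t>::max();
    for (const auto* p : completed) {
        s = std::min(s, p->load(std::memory_order_acquire));
    }
    return s;
}

// Hypothetical staging entry.
struct ToyStaged {
    std::uint64_t enqueue_cycle;
    int data;
};

// Moves entries with enqueue_cycle <= bound into `out`, preserving order.
// Assumes `staging` is ordered by enqueue_cycle.
inline void drainUpTo(std::deque<ToyStaged>& staging, std::uint64_t bound,
                      std::vector<int>& out) {
    while (!staging.empty() && staging.front().enqueue_cycle <= bound) {
        out.push_back(staging.front().data);
        staging.pop_front();
    }
}
```

Entries stamped later than S stay in staging until every predecessor thread has published a completed cycle at least that large.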
function arbitrateMPSC
inline virtual void arbitrateMPSC() override
Reimplements: chronon::sender::IArbitratablePort::arbitrateMPSC
Per-cycle MPSC admission arbitration.
Called by the scheduler at each cycle boundary on InPorts that have at least one MPSC connection registered. Iterates the connections in conn_id-ascending order (stable across num_workers since conn_id is assigned at topology construction) and lets each drain its full staging deque into the shared queue.
The user_cap on the InPort governs per-producer staging depth (a Connection's staging deque is bounded at user_cap, giving the producer back-pressure when full); the arbiter itself does NOT cap aggregate per-cycle admission. A strict aggregate-cap budget here would starve lower-priority connections (those later in conn_id order) under heavy multi-producer fan-in: with N producers each pushing at the cap and a budget of cap/cycle, only the highest-priority producer makes progress. Bounding admission via per-producer staging plus consumer drain preserves both determinism (conn_id ordering of admission) and fairness (every producer that has unblocked staging makes progress every cycle).
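The conn_id-ordered drain can be modelled in a few lines. This is a sketch under stated assumptions: `ToyMpscConnection` and `toyArbitrate` are hypothetical, and the shared queue is represented by a plain vector.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical connection with its own staging deque.
struct ToyMpscConnection {
    std::uint32_t conn_id;
    std::deque<int> staging;
};

// Visits connections in ascending conn_id and lets each drain its full
// staging deque. There is no aggregate per-cycle budget.
inline std::vector<int> toyArbitrate(std::vector<ToyMpscConnection>& conns) {
    std::sort(conns.begin(), conns.end(),
              [](const ToyMpscConnection& a, const ToyMpscConnection& b) {
                  return a.conn_id < b.conn_id;
              });
    std::vector<int> shared;
    for (auto& c : conns) {
        while (!c.staging.empty()) {
            shared.push_back(c.staging.front());
            c.staging.pop_front();
        }
    }
    return shared;
}
```

The admission order depends only on conn_id and on each deque's internal order, so it is the same for any thread layout, and every connection with staged entries is drained each cycle.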
function arbitratablePortKey
inline virtual void * arbitratablePortKey() override
Reimplements: chronon::sender::IArbitratablePort::arbitratablePortKey
Opaque identity for this arbitrable port. Returned by InPort as its this pointer (same value as destPortPtr() on connections). Used by TickSimulation to join MPSC InPorts against per-port producer-thread tables at init time.
function admissionCapacity
inline size_t admissionCapacity() const
Effective per-cycle admission bound visible to producer-side logic.
Connection::transfer consults this to enforce the same rate limit as canAccept() on the push path.
function InPort
inline InPort(
Unit * owner,
std::string name,
size_t capacity =UNLIMITED_CAPACITY,
PortPolicy policy =PortPolicy::LegacyFastPath
)
Parameters:
- owner The unit that owns this port
- name The port name (for debugging)
- capacity Maximum queue capacity (default unlimited)
- policy The cancellation policy for this InPort (default PortPolicy::LegacyFastPath)
Create an input port.
Automatically registers with PortDirectory when owner's TreeNode is set.
function InPort
inline InPort(
Unit * owner,
std::string name,
PortPolicy policy
)
Convenience constructor: specify policy without setting capacity.
Public Attributes Documentation
variable UNLIMITED_CAPACITY
static size_t UNLIMITED_CAPACITY = MessageQueue<StoredMessage>::UNLIMITED_CAPACITY;
Updated on 2026-05-26 at 05:42:32 +0000