chronon::sender::Connection
#include <Connection.hpp>
Inherits from chronon::sender::ConnectionBase
Public Functions
| Type | Name |
|---|---|
| bool | transfer(T data, uint64_t send_cycle) |
| InPort< T > * | to() const |
| virtual Unit * | source() const override Source unit (for dependency analysis). |
| virtual void | setThreadQueueId(size_t queue_id) override |
| virtual void | setConnId(uint32_t conn_id) override |
| virtual size_t | registerProducerThread(size_t thread_id) override |
| virtual IArbitratablePort * | registerOnDestMPSC() override |
| virtual void | optimizeForSameThread() override |
| virtual void | optimizeForSPSC() override |
| virtual void | optimizeForMPSC() override |
| bool | isDestinationFull() const |
| virtual bool | hasThreadQueueId() const override True if this connection uses thread-specific queue (MPSC mode). |
| OutPort< T > * | from() const |
| virtual Unit * | destination() const override Destination unit (for dependency analysis). |
| virtual void * | destPortPtr() const override Destination port pointer (type-erased) for port-level grouping. |
| virtual uint32_t | delay() const override Delay in cycles between push and arrival. |
| virtual uint32_t | connId() const override |
| virtual void | cancelInFlight() override |
| bool | canTransfer() const True if the destination can accept data (back-pressure preflight). |
| virtual size_t | arbitrateAdmitErased(size_t budget) override |
| virtual size_t | arbitrateAdmitBoundedErased(size_t budget, uint64_t max_send_cycle) override |
| | Connection(OutPort< T > * from, InPort< T > * to, uint32_t delay) |
Additional inherited members
Public Functions inherited from chronon::sender::ConnectionBase
| Type | Name |
|---|---|
| virtual | ~ConnectionBase() =default |
| bool | isTight() const True if this is a zero-delay (tight) connection. |
Detailed Description
template <typename T >
class chronon::sender::Connection;
Connection - Typed connection between OutPort and InPort.
Connections specify the communication delay:
- delay=0: Tight coupling, same-cycle delivery (delta cycles)
- delay>0: Loose coupling, future delivery (lookahead possible)
Usage:

    auto conn = sim.connect(producer->out, consumer->in, 5);
    // Messages sent at cycle N arrive at cycle N+5
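The delay semantics can be illustrated with a stand-alone model. The sketch below does not use the chronon API: `DelayedLink`, `send`, and `receive` are hypothetical names chosen for illustration, and the model assumes send cycles never decrease. It only shows that a message sent at cycle N becomes visible at cycle N + delay, and that a delay of 0 makes it visible in the same cycle.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>
#include <utility>

// Hypothetical stand-in for a typed connection; not the chronon implementation.
template <typename T>
class DelayedLink {
public:
    explicit DelayedLink(uint32_t delay) : delay_(delay) {}

    // Record the message together with the cycle at which it becomes visible.
    // Assumption of this sketch: send cycles are non-decreasing, so the
    // in-flight buffer stays sorted by arrival cycle.
    void send(T data, uint64_t send_cycle) {
        assert(in_flight_.empty() || in_flight_.back().first <= send_cycle + delay_);
        in_flight_.emplace_back(send_cycle + delay_, std::move(data));
    }

    // Return the oldest message whose arrival cycle has been reached, if any.
    std::optional<T> receive(uint64_t current_cycle) {
        if (in_flight_.empty() || in_flight_.front().first > current_cycle) {
            return std::nullopt;
        }
        T data = std::move(in_flight_.front().second);
        in_flight_.pop_front();
        return data;
    }

    // Zero delay corresponds to a tight connection.
    bool isTight() const { return delay_ == 0; }

private:
    uint32_t delay_;
    std::deque<std::pair<uint64_t, T>> in_flight_;  // (arrival cycle, payload)
};
```

With `DelayedLink<int> link(5)`, a value sent at cycle 10 is not returned by `receive(14)` but is returned by `receive(15)`.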
Public Functions Documentation
function transfer
inline bool transfer(
T data,
uint64_t send_cycle
)
Parameters:
- data The data to transfer
- send_cycle The cycle at which the data is sent
Return: true if the transfer succeeded, false if the destination is full (back pressure)
Transfer data through the connection.
The data arrives at the destination port after the configured delay. The thread-specific queue is used when the connection is in multi-producer mode.
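The back-pressure contract (a `false` return means the caller still owns the data and should retry later) can be sketched without the real port types. `BoundedInbox`, `tryTransfer`, and `flushPending` below are hypothetical helpers, and the fixed capacity is an assumption; how chronon's InPort actually bounds its queue is not described here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <queue>

// Hypothetical bounded destination; chronon's InPort capacity handling may differ.
struct BoundedInbox {
    explicit BoundedInbox(std::size_t cap) : capacity(cap) {}

    bool full() const { return slots.size() >= capacity; }

    std::size_t capacity;
    std::queue<int> slots;
};

// Mirrors the documented contract: true on success, false on back pressure.
inline bool tryTransfer(BoundedInbox& dest, int data, uint64_t /*send_cycle*/) {
    assert(dest.capacity > 0);
    if (dest.full()) {
        return false;  // caller keeps the data and retries on a later cycle
    }
    dest.slots.push(data);
    return true;
}

// Producer-side pattern: send as many pending items as the destination accepts
// this cycle and keep the remainder queued locally.
inline std::size_t flushPending(BoundedInbox& dest, std::queue<int>& pending,
                                uint64_t cycle) {
    std::size_t sent = 0;
    while (!pending.empty() && tryTransfer(dest, pending.front(), cycle)) {
        pending.pop();
        ++sent;
    }
    return sent;
}
```

The producer pops an item from its local queue only after the transfer has succeeded, so nothing is lost when the destination is full.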
function to
inline InPort< T > * to() const
function source
virtual Unit * source() const override
Source unit (for dependency analysis).
Reimplements: chronon::sender::ConnectionBase::source
function setThreadQueueId
inline virtual void setThreadQueueId(
size_t queue_id
) override
Parameters:
- queue_id The queue ID for this connection's source thread
Reimplements: chronon::sender::ConnectionBase::setThreadQueueId
Set the thread queue ID for multi-producer mode.
Called during initialization when the destination InPort is in multi-producer mode (multiple source threads writing to it). All connections from the same source thread share the same queue_id.
function setConnId
inline virtual void setConnId(
uint32_t conn_id
) override
Reimplements: chronon::sender::ConnectionBase::setConnId
Sets the stable connection identifier assigned at simulation build time.
The identifier equals this connection's index in TickSimulation::connections_. MultiProducerQueueAdapter uses it as the tiebreaker in its k-way merge, replacing the partition-dependent queue_id, so that the merge order stays the same across different num_workers settings. The value is deterministic for a fixed topology, regardless of thread count or cluster assignment.
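To see why a topology-derived tiebreaker keeps the merge order independent of partitioning, consider the following sketch. It is not the MultiProducerQueueAdapter code: `Entry`, its fields, and `mergeQueues` are hypothetical, and the sketch assumes each per-producer queue is already sorted by (arrival cycle, connection id).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Hypothetical staged message; the field names are illustrative only.
struct Entry {
    uint64_t arrival_cycle;
    uint32_t conn_id;  // topology-derived, independent of thread partitioning
    int payload;
};

// k-way merge of per-producer queues. Ties on arrival_cycle are broken by
// conn_id; the queue index and position only identify where an entry lives.
inline std::vector<Entry> mergeQueues(const std::vector<std::vector<Entry>>& queues) {
    // (arrival cycle, conn id, queue index, position in queue)
    using Cursor = std::tuple<uint64_t, uint32_t, std::size_t, std::size_t>;
    std::priority_queue<Cursor, std::vector<Cursor>, std::greater<Cursor>> heads;
    for (std::size_t q = 0; q < queues.size(); ++q) {
        if (!queues[q].empty()) {
            heads.emplace(queues[q][0].arrival_cycle, queues[q][0].conn_id, q, 0);
        }
    }
    std::vector<Entry> merged;
    while (!heads.empty()) {
        const Cursor top = heads.top();
        heads.pop();
        const std::size_t q = std::get<2>(top);
        const std::size_t pos = std::get<3>(top);
        assert(pos < queues[q].size());
        merged.push_back(queues[q][pos]);
        if (pos + 1 < queues[q].size()) {
            const Entry& next = queues[q][pos + 1];
            heads.emplace(next.arrival_cycle, next.conn_id, q, pos + 1);
        }
    }
    return merged;
}
```

Because a given connection always lives in exactly one queue, regrouping the same entries into a different number of queues yields the same merged sequence.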
function registerProducerThread
inline virtual size_t registerProducerThread(
size_t thread_id
) override
Parameters:
- thread_id Source thread identifier
Return: Queue ID for this producer thread, or SIZE_MAX on failure
Reimplements: chronon::sender::ConnectionBase::registerProducerThread
Register a producer thread for MPSC mode.
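The documented behaviour (one queue ID per source thread, SIZE_MAX on failure) could look roughly like the sketch below. `ProducerRegistrySketch` is a hypothetical class, and the failure condition used here, running out of a fixed number of queues, is an assumption; the conditions under which the real function fails are not stated in this reference.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical registry mapping source threads to producer queues.
class ProducerRegistrySketch {
public:
    explicit ProducerRegistrySketch(std::size_t max_queues) : max_queues_(max_queues) {}

    // Returns the queue ID for thread_id, allocating one on first use.
    // Repeated registration of the same thread returns the same ID, so all
    // connections from one source thread share a queue.
    std::size_t registerProducerThread(std::size_t thread_id) {
        auto it = queue_of_thread_.find(thread_id);
        if (it != queue_of_thread_.end()) {
            return it->second;
        }
        if (queue_of_thread_.size() >= max_queues_) {
            return SIZE_MAX;  // assumed failure mode: no free producer queue
        }
        const std::size_t queue_id = queue_of_thread_.size();
        assert(queue_id < max_queues_);
        queue_of_thread_.emplace(thread_id, queue_id);
        return queue_id;
    }

private:
    std::size_t max_queues_;
    std::unordered_map<std::size_t, std::size_t> queue_of_thread_;
};
```

Callers should compare the result against SIZE_MAX before using it as a queue index.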
function registerOnDestMPSC
virtual IArbitratablePort * registerOnDestMPSC() override
Reimplements: chronon::sender::ConnectionBase::registerOnDestMPSC
Registers this MPSC connection on its destination InPort and returns the InPort's type-erased IArbitratablePort interface, so that TickSimulation can drive cycle-boundary arbitration without knowing the Connection's message type.
Returns nullptr when the connection is not in MPSC mode.
function optimizeForSameThread
inline virtual void optimizeForSameThread() override
Reimplements: chronon::sender::ConnectionBase::optimizeForSameThread
Optimize destination port for same-thread access.
Switches InPort to use lock-free SingleThreadMessageQueue. Call this during initialization when both source and destination are determined to be on the same thread (same cluster).
This eliminates mutex overhead (~18% of execution time) for intra-cluster connections.
function optimizeForSPSC
inline virtual void optimizeForSPSC() override
Reimplements: chronon::sender::ConnectionBase::optimizeForSPSC
Optimize destination port for cross-thread SPSC access.
Switches InPort to use lock-free LockFreeMessageQueue. Call this during initialization when there is exactly ONE source thread writing to the destination port on a different thread.
function optimizeForMPSC
inline virtual void optimizeForMPSC() override
Reimplements: chronon::sender::ConnectionBase::optimizeForMPSC
Optimize destination port for cross-thread MPSC access.
Switches InPort to use MultiProducerQueueAdapter with per-thread producer queues.
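The three optimize calls correspond to three thread topologies. The sketch below shows one plausible way to choose between them during initialization; `QueueMode` and `chooseQueueMode` are hypothetical, and treating any port with more than one source thread as MPSC (even when one of those threads is the destination's own) is an assumption rather than documented behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// Hypothetical labels for the three documented optimizations.
enum class QueueMode { SameThread, SPSC, MPSC };

// Pick a queue mode from the thread that owns the destination port and the
// set of threads that write to it.
inline QueueMode chooseQueueMode(std::size_t dest_thread,
                                 const std::set<std::size_t>& source_threads) {
    assert(!source_threads.empty());
    if (source_threads.size() > 1) {
        return QueueMode::MPSC;  // several producer threads: per-thread queues
    }
    const std::size_t only_source = *source_threads.begin();
    return only_source == dest_thread ? QueueMode::SameThread  // intra-cluster
                                      : QueueMode::SPSC;       // one remote producer
}
```

A builder would then call optimizeForSameThread(), optimizeForSPSC(), or optimizeForMPSC() on the connection according to the returned mode.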
function isDestinationFull
inline bool isDestinationFull() const
function hasThreadQueueId
inline virtual bool hasThreadQueueId() const override
True if this connection uses thread-specific queue (MPSC mode).
Reimplements: chronon::sender::ConnectionBase::hasThreadQueueId
function from
inline OutPort< T > * from() const
function destination
virtual Unit * destination() const override
Destination unit (for dependency analysis).
Reimplements: chronon::sender::ConnectionBase::destination
function destPortPtr
inline virtual void * destPortPtr() const override
Destination port pointer (type-erased) for port-level grouping.
Reimplements: chronon::sender::ConnectionBase::destPortPtr
function delay
inline virtual uint32_t delay() const override
Delay in cycles between push and arrival.
Reimplements: chronon::sender::ConnectionBase::delay
function connId
inline virtual uint32_t connId() const override
Reimplements: chronon::sender::ConnectionBase::connId
function cancelInFlight
inline virtual void cancelInFlight() override
Reimplements: chronon::sender::ConnectionBase::cancelInFlight
Cancel all in-flight messages previously sent on this connection.
Only bumps the cancellation epoch. Staged entries are dropped lazily by the arbiter on drain (it compares each entry's epoch_snapshot against the current epoch). This matters under consumer-tick-driven arbitration (Option 1): the producer thread and the consumer-thread arbiter may access staging_ concurrently, so a producer-side staging_.clear() would race with the arbiter's drain. See R3 in docs/mpsc-atomic-publish.md.
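The epoch-based lazy drop can be sketched as follows. This is a single-threaded illustration, not the chronon code: `EpochCancelSketch`, `Staged`, `stage`, and `drain` are hypothetical, and the synchronization that the real staging buffer needs between producer and arbiter is deliberately left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical staging entry; epoch_snapshot is the epoch at enqueue time.
struct Staged {
    int payload;
    uint64_t epoch_snapshot;
};

class EpochCancelSketch {
public:
    // Producer side: tag each entry with the epoch that is current right now.
    void stage(int payload) {
        staging_.push_back({payload, cancel_epoch_.load(std::memory_order_acquire)});
    }

    // Cancellation touches only the epoch counter, never the staging buffer.
    void cancelInFlight() { cancel_epoch_.fetch_add(1, std::memory_order_acq_rel); }

    // Arbiter side: admit entries from the current epoch, drop stale ones.
    std::vector<int> drain() {
        const uint64_t current = cancel_epoch_.load(std::memory_order_acquire);
        std::vector<int> admitted;
        for (const Staged& s : staging_) {
            assert(s.epoch_snapshot <= current);  // holds in this single-threaded sketch
            if (s.epoch_snapshot == current) {
                admitted.push_back(s.payload);
            }
        }
        staging_.clear();  // only the draining side ever clears the buffer
        return admitted;
    }

private:
    std::atomic<uint64_t> cancel_epoch_{0};
    std::vector<Staged> staging_;  // real code needs its own synchronization here
};
```

Entries staged before a call to `cancelInFlight()` carry an older snapshot and are discarded by the next `drain()`, while entries staged afterwards are admitted.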
function canTransfer
inline bool canTransfer() const
True if the destination can accept data (back-pressure preflight).
function arbitrateAdmitErased
inline virtual size_t arbitrateAdmitErased(
size_t budget
) override
Reimplements: chronon::sender::ConnectionBase::arbitrateAdmitErased
Type-erased MPSC admission helper invoked by the InPort arbiter.
Legacy unbounded variant: drains every staging entry whose epoch matches the current cancel_epoch_. Called by the main-thread arbiter path (Sequential per-cycle loop, Barrier sync_wait, lookahead epoch-end flush).
function arbitrateAdmitBoundedErased
inline virtual size_t arbitrateAdmitBoundedErased(
size_t budget,
uint64_t max_send_cycle
) override
Reimplements: chronon::sender::ConnectionBase::arbitrateAdmitBoundedErased
Cycle-bounded admission: drains every staging entry whose epoch matches AND whose enqueue_cycle <= max_send_cycle. Used by the consumer-tick-driven arbiter (Option 1): the consumer at its own localCycle computes S = min(predecessor-thread completed_cycle) and passes S here, so only entries that every producer has finished writing are admitted this tick.
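The bound S and the admission rule can be sketched as below. The names `StagedEntry`, `safeCycle`, and `admitBounded` are hypothetical, and two points are assumptions: that staging is ordered by enqueue_cycle, and that budget caps the number of admitted entries without being consumed by cancelled ones.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical staging entry for the sketch.
struct StagedEntry {
    int payload;
    uint64_t enqueue_cycle;
    uint64_t epoch_snapshot;
};

// S = minimum completed cycle over all predecessor (producer) threads.
inline uint64_t safeCycle(const std::vector<uint64_t>& completed_cycles) {
    assert(!completed_cycles.empty());
    return *std::min_element(completed_cycles.begin(), completed_cycles.end());
}

// Admit entries from the front of staging while enqueue_cycle <= max_send_cycle.
// Entries from an older epoch are dropped; at most `budget` entries are admitted.
inline std::size_t admitBounded(std::deque<StagedEntry>& staging,
                                std::vector<int>& admitted,
                                std::size_t budget,
                                uint64_t max_send_cycle,
                                uint64_t current_epoch) {
    std::size_t count = 0;
    while (!staging.empty() && staging.front().enqueue_cycle <= max_send_cycle) {
        const StagedEntry entry = staging.front();
        if (entry.epoch_snapshot == current_epoch) {
            if (count == budget) {
                break;  // budget exhausted; the entry stays staged
            }
            admitted.push_back(entry.payload);
            ++count;
        }
        staging.pop_front();  // admitted or cancelled, either way it leaves staging
    }
    return count;
}
```

Entries with an enqueue_cycle above S stay staged until a later tick, when every producer thread has advanced past that cycle.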
function Connection
inline Connection(
OutPort< T > * from,
InPort< T > * to,
uint32_t delay
)
Parameters:
- from Source output port
- to Destination input port
- delay Number of cycles for message delivery
Create a connection with specified delay.
Updated on 2026-05-26 at 05:42:32 +0000