vllm_omni.distributed.omni_connectors
Modules:
| Name | Description |
|---|---|
adapter | |
connectors | |
factory | |
kv_transfer_manager | Unified OmniConnector and KV cache transfer management. |
transfer_adapter | |
utils | |
ConnectorSpec dataclass
MooncakeStoreConnector
MooncakeTransferEngineConnector
Bases: OmniConnectorBase
OmniConnector implementation using the Mooncake Transfer Engine with a managed memory pool. Supports both CPU (pinned) and GPU memory pools, and both RDMA and TCP protocols.
Topology limitations (current implementation): the design focuses on peer-to-peer communication between stages.
- 1 sender → 1 receiver per key: after a successful RDMA write, the sender immediately cleans up the buffer (cleanup()), so only the first receiver to pull a given key succeeds. Broadcast/multicast (1 sender → N receivers sharing the same data) is not yet supported.
- 1 receiver → N senders: supported via partial metadata. The manager constructs metadata with the target sender's source_host / source_port (computed from from_rank) and passes it to get(metadata=...). The connector detects that data_size is missing, queries the specified sender at that address to fill it in, then performs the RDMA pull. This enables heterogeneous TP (sender TP > receiver TP), where a single receiver must pull KV shards from multiple sender ranks.
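The 1 receiver → N senders case can be pictured with a small sketch. This is not the manager's code: `sender_ranks_for_receiver` and `partial_metadata` are hypothetical helpers, and the sketch assumes that sender TP is a multiple of receiver TP, that each receiver owns a contiguous block of sender shards, and that each sender rank listens on `base_port + from_rank`. None of those three details is confirmed by these docs.

```python
def sender_ranks_for_receiver(recv_rank: int, sender_tp: int, recv_tp: int) -> list[int]:
    """Sender ranks whose KV shards one receiver rank must pull (hypothetical layout)."""
    # Assumption: sender_tp is a multiple of recv_tp and each receiver owns a
    # contiguous block of sender shards.
    if sender_tp % recv_tp != 0:
        raise ValueError("sender_tp must be a multiple of recv_tp")
    ratio = sender_tp // recv_tp
    start = recv_rank * ratio
    return list(range(start, start + ratio))


def partial_metadata(sender_host: str, base_port: int, from_rank: int) -> dict:
    """Host/port-only metadata; data_size is left out on purpose."""
    # Assumption: one port per sender rank at base_port + from_rank. The real
    # rule for deriving source_port from from_rank is not documented here.
    return {"source_host": sender_host, "source_port": base_port + from_rank}


# Receiver rank 1 of 2 with senders at TP=4: pull from sender ranks 2 and 3.
for rank in sender_ranks_for_receiver(recv_rank=1, sender_tp=4, recv_tp=2):
    md = partial_metadata("10.0.0.7", 50000, rank)
    # In real code this dict would be passed as get(..., metadata=md).
    print(rank, md)
```

Because `data_size` is absent from each dict, the connector takes the "query that specific sender first" path before pulling.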
Future work
- Support 1 sender → N receivers (e.g. reference-counted buffers, or explicit retain()/release() semantics so the buffer survives multiple pulls).
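One of the future-work options, a reference-counted buffer, could look roughly like this. It is purely illustrative: `RefCountedBuffer` and `expected_pulls` are hypothetical names, and the sketch assumes the sender knows up front how many receivers will pull the key.

```python
import threading


class RefCountedBuffer:
    """Hypothetical producer-side buffer freed only after the last expected pull."""

    def __init__(self, data: bytes, expected_pulls: int) -> None:
        if expected_pulls < 1:
            raise ValueError("expected_pulls must be >= 1")
        self._data = data
        self._refs = expected_pulls
        self._lock = threading.Lock()  # receivers may pull concurrently

    def pull(self) -> bytes:
        with self._lock:
            if self._data is None:
                raise KeyError("buffer already released")
            data = self._data
            self._refs -= 1
            if self._refs == 0:
                self._data = None  # last receiver releases the buffer
            return data

    @property
    def released(self) -> bool:
        return self._data is None
```

Compared with the current "clean up after the first pull" rule, the cleanup is deferred until the reference count reaches zero.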
cleanup
Release the producer-side buffer associated with the request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
request_id | str | The key used in | required |
from_stage | str \| None | Optional source stage. When both from_stage and to_stage are provided the method applies | None |
to_stage | str \| None | Optional destination stage (see from_stage). | None |
close
Gracefully shut down the connector and release all resources. This method should be called when the connector is no longer needed. Idempotent: safe to call multiple times.
get
get(
from_stage: str,
to_stage: str,
get_key: str,
metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None
Consumer Side. Allocates from local pool and pulls data via RDMA.
Metadata resolution:
- metadata provided with data_size → use it directly (RDMA pull).
- metadata provided with source_host / source_port but without data_size → query that specific sender for data_size / is_fast_path, then RDMA pull. This is the heterogeneous-TP path, where the manager knows the target sender endpoint but not the payload size.
- metadata=None → query the default sender (set via update_sender_info()) for the full metadata.
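The three resolution rules can be restated as a small decision function. This is a sketch for reading purposes only: `resolve_metadata_path` and its return labels are hypothetical, and raising on metadata that matches none of the documented cases is an assumption rather than documented behavior.

```python
from __future__ import annotations

from typing import Any


def resolve_metadata_path(metadata: dict[str, Any] | None) -> str:
    """Classify a get() call into one of the three documented resolution paths."""
    if metadata is None:
        # No metadata: ask the default sender (set via update_sender_info()).
        return "query_default_sender"
    if "data_size" in metadata:
        # Complete metadata: RDMA pull directly.
        return "direct_pull"
    if "source_host" in metadata and "source_port" in metadata:
        # Partial metadata (heterogeneous TP): ask this sender for
        # data_size / is_fast_path, then pull.
        return "query_specified_sender"
    # Assumption: anything else is treated as a caller error in this sketch.
    raise ValueError("metadata needs data_size or source_host/source_port")
```

Checking data_size first mirrors the order of the list: complete metadata always wins, even if host/port are also present.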
Returns:
| Type | Description |
|---|---|
tuple[Any, int] \| None | Tuple of (Python object, serialized byte size) if found, None otherwise. |
get_connection_info
Get connection info for this connector (useful for a sender to share with its receivers).
put
put(
from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]
Producer Side. Exposes data for RDMA transfer.
Behavior by data type:
- ManagedBuffer (from this pool): zero-copy, is_fast_path=True
- ManagedBuffer (from a different pool): falls back to the tensor copy path
- torch.Tensor / bytes: copied into the pool, is_fast_path=True
- Other (dict, etc.): serialized to bytes and copied into the pool, is_fast_path=False (the receiver deserializes automatically)
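The dispatch rules above can be sketched as a pure function. This is a minimal model under stated assumptions: `ManagedBuffer` here is a hypothetical stand-in with a `pool_id` field (the real class's attributes are not documented on this page), `plan_put` and `decode_on_receiver` are hypothetical names, and `pickle` stands in for the connector's centralized serializer.

```python
import pickle
from dataclasses import dataclass
from typing import Any


@dataclass
class ManagedBuffer:
    """Hypothetical stand-in for a pool-owned buffer; pool_id names its pool."""
    pool_id: int
    payload: bytes


def _is_tensor(obj: Any) -> bool:
    try:
        import torch
    except ImportError:  # the sketch still runs without torch installed
        return False
    return isinstance(obj, torch.Tensor)


def plan_put(data: Any, local_pool_id: int) -> tuple[str, bool]:
    """Return (strategy, is_fast_path) following the documented put() rules."""
    if isinstance(data, ManagedBuffer):
        if data.pool_id == local_pool_id:
            return "zero_copy", True
        return "copy", True  # foreign pool: fall back to the copy path
    if isinstance(data, (bytes, bytearray, memoryview)) or _is_tensor(data):
        return "copy", True
    return "serialize", False  # dicts and other Python objects


def decode_on_receiver(payload: bytes, is_fast_path: bool) -> Any:
    # pickle stands in for the centralized serializer (assumption).
    return payload if is_fast_path else pickle.loads(payload)
```

The is_fast_path flag is what lets the receiver decide whether to hand back raw bytes or deserialize.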
update_sender_info
update_sender_info(
sender_host: str,
sender_zmq_port: int,
sender_rank: int | None = None,
) -> None
Inject a sender's ZMQ endpoint into the receiver connector.
When sender_rank is None (default), sets the single default sender used by get() when no rank is specified — this preserves backward-compatible 1:1 semantics.
When sender_rank is an integer, the endpoint is stored in a per-rank registry for internal use (e.g. by _query_metadata_from_sender(sender_rank=R)).
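The two storage modes of update_sender_info() can be modeled with a tiny registry. This is an illustration only: `SenderRegistry` and `endpoint_for` are hypothetical names, and the sketch assumes that registering a per-rank endpoint leaves the default sender untouched.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SenderRegistry:
    """Hypothetical model of how sender ZMQ endpoints could be stored."""
    default: tuple[str, int] | None = None
    by_rank: dict[int, tuple[str, int]] = field(default_factory=dict)

    def update_sender_info(self, sender_host: str, sender_zmq_port: int,
                           sender_rank: int | None = None) -> None:
        endpoint = (sender_host, sender_zmq_port)
        if sender_rank is None:
            self.default = endpoint  # backward-compatible 1:1 path
        else:
            self.by_rank[sender_rank] = endpoint  # per-rank registry

    def endpoint_for(self, sender_rank: int | None = None) -> tuple[str, int]:
        if sender_rank is None:
            if self.default is None:
                raise LookupError("no default sender configured")
            return self.default
        return self.by_rank[sender_rank]
```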
OmniConnectorBase
Bases: ABC
Base class for all OmniConnectors.
close abstractmethod
Release resources held by this connector.
Subclasses must implement this to clean up transport-specific resources (connections, memory pools, threads, etc.). Implementations should be idempotent (safe to call multiple times).
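One common way to meet the idempotency requirement is a lock-guarded "closed" flag, so only the first call performs teardown. The class below is a hypothetical example, not an existing connector; `_release_resources` and the `teardowns` counter exist only to make the behavior observable.

```python
import threading


class ExampleConnector:
    """Hypothetical connector illustrating an idempotent, thread-safe close()."""

    def __init__(self) -> None:
        self._closed = False
        self._lock = threading.Lock()
        self.teardowns = 0  # how many times resources were actually released

    def close(self) -> None:
        with self._lock:
            if self._closed:
                return  # already closed: later calls are no-ops
            self._closed = True
        self._release_resources()

    def _release_resources(self) -> None:
        # A real connector would close sockets, free memory pools and join
        # background threads here.
        self.teardowns += 1
```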
deserialize_obj staticmethod
Deserialize bytes to Python object using centralized serializer.
get abstractmethod
get(
from_stage: str,
to_stage: str,
get_key: str,
metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None
Retrieve Python object and payload size (bytes).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
from_stage | str | Source stage identifier | required |
to_stage | str | Destination stage identifier | required |
get_key | str | Unique request identifier | required |
metadata | dict[str, Any] \| None | Optional transport-specific metadata. When provided, the connector uses it directly (e.g. source_host, source_port, data_size) instead of querying the sender. For heterogeneous TP the manager may supply partial metadata (host/port only); the connector will query the sender at that address to fill in data_size. | None |
Returns:
| Type | Description |
|---|---|
tuple[Any, int] \| None | Tuple of (Python object, serialized byte size) if found, None otherwise |
put abstractmethod
put(
from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]
Store a Python object; serialization is handled internally by the connector.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
from_stage | str | Source stage identifier | required |
to_stage | str | Destination stage identifier | required |
put_key | str | Unique request identifier | required |
data | Any | Python object to store | required |
Returns:
| Name | Type | Description |
|---|---|---|
tuple | tuple[bool, int, dict[str, Any] \| None] | (success: bool, serialized_size: int, metadata: Optional[dict]). Metadata may contain transport-specific handles or inline data. |
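To make the put/get contract concrete, here is a toy single-process implementation. It does not import vllm_omni: `ConnectorContract` is a local stand-in that mirrors the documented signatures, `InMemoryConnector` is hypothetical, and `pickle` replaces the real centralized serializer. Popping the entry on get() imitates the "1 sender → 1 receiver per key" behavior described for the Mooncake connector.

```python
from __future__ import annotations

import pickle
from abc import ABC, abstractmethod
from typing import Any


class ConnectorContract(ABC):
    """Local stand-in mirroring the documented OmniConnectorBase signatures."""

    @abstractmethod
    def put(self, from_stage: str, to_stage: str, put_key: str,
            data: Any) -> tuple[bool, int, dict[str, Any] | None]: ...

    @abstractmethod
    def get(self, from_stage: str, to_stage: str, get_key: str,
            metadata: dict[str, Any] | None = None) -> tuple[Any, int] | None: ...

    @abstractmethod
    def close(self) -> None: ...


class InMemoryConnector(ConnectorContract):
    """Toy connector; pickle stands in for the real serializer."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str, str], bytes] = {}

    def put(self, from_stage, to_stage, put_key, data):
        payload = pickle.dumps(data)
        self._store[(from_stage, to_stage, put_key)] = payload
        return True, len(payload), None  # no transport-specific metadata

    def get(self, from_stage, to_stage, get_key, metadata=None):
        # metadata is ignored: this toy transport is purely key-addressed.
        payload = self._store.pop((from_stage, to_stage, get_key), None)
        if payload is None:
            return None
        return pickle.loads(payload), len(payload)

    def close(self):
        self._store.clear()  # safe to call repeatedly
```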
OmniConnectorFactory
OmniTransferConfig dataclass
Top-level configuration for the OmniConnector system.
Members:
- connectors: a dictionary of connectors, keyed by (from_stage, to_stage).
- default_connector: the default connector to use if no connector is specified for an edge.
connectors class-attribute instance-attribute
default_connector class-attribute instance-attribute
default_connector: ConnectorSpec | None = None
get_connector_for_edge
get_connector_for_edge(
from_stage: str, to_stage: str
) -> ConnectorSpec | None
Get connector spec for a specific edge.
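A sketch of the per-edge lookup with fallback to default_connector. The classes are local stand-ins, not the library's: the `name` / `extra` fields of `ConnectorSpec` are inferred from the config shape shown under get_connectors_config_for_stage, and falling back to the default when an edge has no entry is an assumption based on the default_connector description.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ConnectorSpec:
    name: str
    extra: dict[str, Any] = field(default_factory=dict)


@dataclass
class TransferConfigSketch:
    connectors: dict[tuple[str, str], ConnectorSpec] = field(default_factory=dict)
    default_connector: ConnectorSpec | None = None

    def get_connector_for_edge(self, from_stage: str, to_stage: str) -> ConnectorSpec | None:
        # An explicit per-edge entry wins; otherwise use the default (may be None).
        return self.connectors.get((from_stage, to_stage), self.default_connector)
```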
SharedMemoryConnector
Bases: OmniConnectorBase
Key-addressed local shared-memory connector.
SHM is a local-only transport: it reads/writes POSIX shared memory segments identified purely by key. It does not understand remote-transport metadata such as source_host / source_port (that is the RDMA connector's job). When such metadata is passed in, the connector silently falls back to key-based lookup.
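The key-addressed model can be illustrated with Python's `multiprocessing.shared_memory`. This is a sketch, not the connector's implementation: `shm_put` / `shm_get` / `shm_cleanup` are hypothetical helpers, the 8-byte length header is an assumption added because a segment may be larger than requested, and the metadata argument is accepted only to show that it is ignored.

```python
from __future__ import annotations

import struct
from multiprocessing import shared_memory
from typing import Any

_HEADER = struct.Struct("<Q")  # 8-byte little-endian payload length


def shm_put(key: str, payload: bytes) -> int:
    """Write payload into a new shared-memory segment named by key."""
    seg = shared_memory.SharedMemory(name=key, create=True,
                                     size=_HEADER.size + len(payload))
    try:
        _HEADER.pack_into(seg.buf, 0, len(payload))
        seg.buf[_HEADER.size:_HEADER.size + len(payload)] = payload
    finally:
        seg.close()  # unmap our view; the segment persists until unlinked
    return len(payload)


def shm_get(key: str, metadata: dict[str, Any] | None = None) -> tuple[bytes, int]:
    # metadata (e.g. source_host / source_port) is ignored: lookup is by key only.
    seg = shared_memory.SharedMemory(name=key, create=False)
    try:
        (size,) = _HEADER.unpack_from(seg.buf, 0)
        data = bytes(seg.buf[_HEADER.size:_HEADER.size + size])
    finally:
        seg.close()
    return data, size


def shm_cleanup(key: str) -> None:
    seg = shared_memory.SharedMemory(name=key, create=False)
    seg.close()
    seg.unlink()  # remove the segment name from the system
```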
YuanrongConnector
YuanrongTransferEngineConnector
Bases: OmniConnectorBase
Pull-based connector backed by Yuanrong transfer_engine.
device_name instance-attribute
host instance-attribute
host = (
_get_local_ip()
if lower() in AUTO_HOST_VALUES
else host_config
)
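The host expression above resolves an "auto" setting to a local IP. A hedged sketch of that pattern follows; it assumes the lowercased value compared is `host_config.lower()`, and both `AUTO_HOST_VALUES` contents and `get_local_ip` here are hypothetical stand-ins, since neither is shown on this page. The UDP `connect()` trick asks the OS which local address would route to a target without sending any packet.

```python
import socket

# Hypothetical contents: the real AUTO_HOST_VALUES is not shown in these docs.
AUTO_HOST_VALUES = {"auto", ""}


def get_local_ip(fallback: str = "127.0.0.1") -> str:
    """Best-effort local IP; UDP connect() picks a route without sending data."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("10.255.255.255", 1))
            return s.getsockname()[0]
    except OSError:
        return fallback  # e.g. no usable network route


def resolve_host(host_config: str) -> str:
    return get_local_ip() if host_config.lower() in AUTO_HOST_VALUES else host_config
```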
pool_device instance-attribute
sender_zmq_port instance-attribute
cleanup
get
get(
from_stage: str,
to_stage: str,
get_key: str,
metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None
build_stage_connectors
build_stage_connectors(
stage_id: int,
connectors_config: dict[str, Any],
purpose: str = "request_forwarding",
) -> dict[tuple[str, str], Any] | None
Instantiate OmniConnectors for a stage based on config.
Deprecated: prefer get_stage_connector_config plus the unified connector factory. Kept as a thin shim so legacy callers keep working.
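A "thin shim" usually means: emit a DeprecationWarning, then delegate unchanged to the preferred path. The sketch below shows that pattern with hypothetical names (`build_legacy`, `build_from_stage_config`); it does not reproduce the actual body of build_stage_connectors.

```python
import warnings
from typing import Any


def build_from_stage_config(stage_id: int, connectors_config: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for the preferred config-lookup + factory path."""
    return {"stage_id": stage_id, "connectors": dict(connectors_config)}


def build_legacy(stage_id: int, connectors_config: dict[str, Any]) -> dict[str, Any]:
    """Thin shim: warn, then delegate unchanged so legacy callers keep working."""
    warnings.warn(
        "build_legacy is deprecated; use the config lookup + factory path",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not the shim
    )
    return build_from_stage_config(stage_id, connectors_config)
```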
get_connectors_config_for_stage
get_connectors_config_for_stage(
transfer_config: OmniTransferConfig | None,
stage_id: str | int,
) -> dict[str, Any]
Extract connector configurations relevant for a specific stage worker.
Returns a dict compatible with worker initialization: { "from_stage_X": { "spec": { "name": "ConnectorName", "extra": {...} } }, ... }
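A sketch of how that worker-facing dict could be assembled. It is not the library function: `connectors_config_for_stage` is hypothetical, it takes plain dicts instead of an OmniTransferConfig, and it assumes only edges that deliver into the given stage are included (the "..." in the documented shape leaves that open).

```python
from __future__ import annotations

from typing import Any


def connectors_config_for_stage(
    edges: dict[tuple[str, str], dict[str, Any]], stage_id: str | int
) -> dict[str, Any]:
    """Hypothetical: worker-facing config for the edges that feed stage_id."""
    stage = str(stage_id)
    out: dict[str, Any] = {}
    for (from_stage, to_stage), spec in edges.items():
        if to_stage != stage:
            continue  # assumption: only incoming edges are relevant here
        out[f"from_stage_{from_stage}"] = {
            "spec": {"name": spec["name"], "extra": dict(spec.get("extra", {}))}
        }
    return out
```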
get_stage_connector_config
get_stage_connector_config(
transfer_config: OmniTransferConfig | None,
stage_id: int,
) -> dict[str, Any]
Return the serialized connector config payload for a specific stage.
initialize_connectors_from_config
initialize_connectors_from_config(
config_path: str | Path | None = None,
default_shm_threshold: int = 65536,
purpose: str = "request_forwarding",
caller_stage_id: int | str | None = None,
is_sender: bool | None = None,
) -> tuple[
OmniTransferConfig | None,
dict[tuple[str, str], OmniConnectorBase],
]
Initialize connectors from configuration file.
Returns:
| Name | Type | Description |
|---|---|---|
tuple | tuple[OmniTransferConfig \| None, dict[tuple[str, str], OmniConnectorBase]] | (OmniTransferConfig, dict of {(from, to): connector_instance}) |
initialize_orchestrator_connectors
initialize_orchestrator_connectors(
config_path: str | None,
worker_backend: str | None = "multi_process",
shm_threshold_bytes: int = 65536,
) -> tuple[
OmniTransferConfig | None,
dict[tuple[str, str], OmniConnectorBase],
]
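Initialize connectors shared at the orchestrator level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config_path | str \| None | The path to the configuration file. | required |
worker_backend | str \| None | The backend to use for the worker. | "multi_process" |
Returns:
| Type | Description |
|---|---|
tuple[OmniTransferConfig \| None, dict[tuple[str, str], OmniConnectorBase]] | A tuple containing the OmniTransferConfig and a dictionary of connectors. |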