vllm_omni.distributed

Modules:

  • kv_transfer: Patched KV transfer connectors for PD disaggregation.
  • omni_connectors
  • omni_coordinator
  • ray_utils

MooncakeConnector module-attribute

MooncakeConnector = MooncakeStoreConnector

ConnectorSpec dataclass

Specification for a connector instance.

extra class-attribute instance-attribute

extra: dict[str, Any] = field(default_factory=dict)

name instance-attribute

name: str

MooncakeStoreConnector

Bases: OmniConnectorBase

Mooncake-based distributed connector for OmniConnector.

config instance-attribute

config = config

host instance-attribute

host = get('host', '127.0.0.1')

localbuf instance-attribute

localbuf = get('localbuf', 64 * 1024 * 1024)

master instance-attribute

master = get('master', '127.0.0.1:50051')

metadata instance-attribute

metadata = get(
    "metadata_server", "http://127.0.0.1:8080/metadata"
)

pin instance-attribute

pin: ReplicateConfig | None = None

proto instance-attribute

proto = get('proto', 'tcp')

rdma instance-attribute

rdma = get('rdma', '')

segment instance-attribute

segment = get('segment', 512 * 1024 * 1024)

store instance-attribute

store: MooncakeDistributedStore | None = None
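For orientation, the defaults listed above can be gathered into a single configuration dictionary. This is only a sketch: it assumes the bare get(...) calls read from the connector's config mapping, and the names MOONCAKE_STORE_DEFAULTS, overrides, and effective, as well as the override values, are illustrative and do not come from the source.

```python
from typing import Any

# Defaults as listed in the attribute reference above (config key -> default).
MOONCAKE_STORE_DEFAULTS: dict[str, Any] = {
    "host": "127.0.0.1",
    "metadata_server": "http://127.0.0.1:8080/metadata",
    "master": "127.0.0.1:50051",
    "segment": 512 * 1024 * 1024,  # 512 MiB
    "localbuf": 64 * 1024 * 1024,  # 64 MiB
    "proto": "tcp",
    "rdma": "",
}

# Hypothetical overrides; the address and size are made up for illustration.
overrides = {"host": "10.0.0.5", "segment": 1024 ** 3}

# Later keys win, so overrides replace the matching defaults.
effective = {**MOONCAKE_STORE_DEFAULTS, **overrides}
```

Keys that are not overridden keep the documented defaults, which mirrors how get(key, default) behaves on a plain dict.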

cleanup

cleanup(request_id: str) -> None

close

close()

Clean shutdown.

get

get(
    from_stage: str,
    to_stage: str,
    get_key: str,
    metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None

health

health() -> dict[str, Any]

put

put(
    from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]

MooncakeTransferEngineConnector

Bases: OmniConnectorBase

OmniConnector implementation using Mooncake Transfer Engine with a managed memory pool. Supports both CPU (Pinned) and GPU memory pools, and both RDMA and TCP protocols.

Topology limitations (current implementation): Current design focuses on peer-to-peer communication between stages.

  • 1 sender → 1 receiver per key: After a successful RDMA write the sender immediately cleans up the buffer (cleanup()), so only the first receiver to pull a given key will succeed. Broadcast / multicast (1 sender → N receivers sharing the same data) is not yet supported.
  • 1 receiver → N senders: Supported via partial metadata. The manager constructs metadata with the target sender's source_host / source_port (computed from from_rank) and passes it to get(metadata=...). The connector detects that data_size is missing, queries the specified sender at the given address to fill it in, then performs the RDMA pull. This enables heterogeneous TP (sender TP > receiver TP) where a single receiver must pull KV shards from multiple sender ranks.

Future work
  • Support 1 sender → N receivers (e.g. reference-counted buffers, or explicit retain() / release() semantics so the buffer survives multiple pulls).
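The reference-counting idea mentioned in the future-work item could look roughly like the sketch below. RefCountedBuffer, its on_free callback, and the receivers argument are hypothetical names invented for illustration; nothing like this exists in the connector today according to the text above.

```python
import threading
from typing import Callable


class RefCountedBuffer:
    """Hypothetical handle that frees its buffer only after the last release()."""

    def __init__(self, key: str, on_free: Callable[[str], None], receivers: int = 1) -> None:
        if receivers < 1:
            raise ValueError("receivers must be >= 1")
        self.key = key
        self._on_free = on_free
        self._refs = receivers
        self._lock = threading.Lock()

    def retain(self) -> None:
        """Register one more pending receiver."""
        with self._lock:
            if self._refs == 0:
                raise RuntimeError("buffer already freed")
            self._refs += 1

    def release(self) -> None:
        """Drop one reference; free the buffer when none remain."""
        with self._lock:
            if self._refs == 0:
                raise RuntimeError("release() called on a freed buffer")
            self._refs -= 1
            last = self._refs == 0
        if last:
            self._on_free(self.key)


freed = []
buf = RefCountedBuffer("req-1", on_free=freed.append, receivers=2)
buf.release()              # first receiver done; buffer must survive
alive_after_first = not freed
buf.release()              # second receiver done; buffer is freed now
```

With such a handle, the sender would call the free routine from on_free instead of cleaning up immediately after the first successful pull.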

allocator instance-attribute

allocator = BufferAllocator(pool_size, alignment=4096)

base_ptr instance-attribute

base_ptr = data_ptr()

can_put instance-attribute

can_put = role == 'sender'

config instance-attribute

config = config

device_name instance-attribute

device_name = get('device_name', '')

engine instance-attribute

engine = TransferEngine()

engine_id instance-attribute

engine_id = str(uuid4())

host instance-attribute

host = _get_local_ip()

pool instance-attribute

pool = pin_memory()

pool_device instance-attribute

pool_device = get('memory_pool_device', 'cpu')

pool_size instance-attribute

pool_size = get('memory_pool_size', 1024 ** 3)

protocol instance-attribute

protocol = get('protocol', 'rdma')

rpc_port instance-attribute

rpc_port = get_rpc_port()

sender_host instance-attribute

sender_host = get('sender_host', None)

sender_zmq_port instance-attribute

sender_zmq_port = get('sender_zmq_port', None)

supports_raw_data class-attribute instance-attribute

supports_raw_data: bool = True

zmq_ctx instance-attribute

zmq_ctx = Context()

zmq_port instance-attribute

zmq_port = get('zmq_port', 50051)

cleanup

cleanup(
    request_id: str,
    from_stage: str | None = None,
    to_stage: str | None = None,
) -> None

Release the producer-side buffer associated with the request.

Parameters:

  • request_id (str, required): The key used in put().
  • from_stage (str | None, default None): Optional source stage. When both from_stage and to_stage are provided the method applies _make_key() internally, so callers can pass the same raw key they used in put() / get() without knowing the internal format. When omitted the request_id is used as-is (suitable for internal callers that already hold the transformed key).
  • to_stage (str | None, default None): Optional destination stage (see from_stage).
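The key handling described for cleanup() can be sketched as follows. The make_key() function is a stand-in for the internal _make_key(); its output format is an assumption made purely for illustration, since the real format is not documented here.

```python
from __future__ import annotations


def make_key(from_stage: str, to_stage: str, raw_key: str) -> str:
    """Hypothetical stand-in for _make_key(); the real format is internal."""
    return f"{from_stage}->{to_stage}:{raw_key}"


def resolve_cleanup_key(
    request_id: str,
    from_stage: str | None = None,
    to_stage: str | None = None,
) -> str:
    """Return the key whose producer-side buffer should be released."""
    if from_stage is not None and to_stage is not None:
        # Caller passed the raw key used in put() / get(); transform it.
        return make_key(from_stage, to_stage, request_id)
    # Caller already holds the transformed key; use it unchanged.
    return request_id


external = resolve_cleanup_key("req-7", from_stage="prefill", to_stage="decode")
internal = resolve_cleanup_key(external)
```

Both calls end up at the same key, which is the point of the optional stage arguments: external callers never need to know the transformed form.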

close

close() -> None

Gracefully shut down the connector and release all resources. This method should be called when the connector is no longer needed. Idempotent: safe to call multiple times.

get

get(
    from_stage: str,
    to_stage: str,
    get_key: str,
    metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None

Consumer Side. Allocates from local pool and pulls data via RDMA.

Metadata resolution:

  1. metadata provided with data_size → use directly (RDMA pull).
  2. metadata provided with source_host/source_port but without data_size → query that specific sender for data_size / is_fast_path, then RDMA pull. This is the heterogeneous-TP path where the manager knows the target sender endpoint but not the payload size.
  3. metadata=None → query the default sender (set via update_sender_info()) for the full metadata.
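The three resolution cases can be expressed as a small decision function. This is a sketch of the branching only, not the connector's code: resolution_path and the returned labels are made-up names, and raising ValueError for metadata that matches none of the cases is an assumption, since the source does not say what happens then.

```python
from __future__ import annotations

from typing import Any


def resolution_path(metadata: dict[str, Any] | None) -> str:
    """Classify a get() call by how its metadata would be resolved."""
    if metadata is None:
        # Case 3: ask the default sender for the full metadata.
        return "query_default_sender"
    if "data_size" in metadata:
        # Case 1: metadata is complete; pull directly.
        return "direct_pull"
    if "source_host" in metadata and "source_port" in metadata:
        # Case 2: heterogeneous-TP path; ask that sender for data_size.
        return "query_specific_sender"
    # Assumption: incomplete metadata without an endpoint is an error.
    raise ValueError("metadata needs data_size or source_host/source_port")


full = resolution_path({"source_host": "10.0.0.2", "source_port": 50051, "data_size": 4096})
partial = resolution_path({"source_host": "10.0.0.2", "source_port": 50051})
default = resolution_path(None)
```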

Returns:

tuple[Any, int] | None: (data, size) on success, None on failure.

  • is_fast_path=True (tensor or bytes payload): Returns (ManagedBuffer, size). CALLER MUST call ManagedBuffer.release() after consuming.
  • is_fast_path=False (serialized Python object): Returns (DeserializedObject, size). Buffer is auto-released internally after deserialization.
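Because the fast path hands ownership of a pooled buffer to the caller, a try/finally wrapper is one way to make sure release() always runs. The sketch below uses FakeManagedBuffer as a stand-in for ManagedBuffer (only release() is modeled), and consume() is a hypothetical helper, not part of the connector.

```python
from __future__ import annotations

from typing import Any, Callable


class FakeManagedBuffer:
    """Stand-in for ManagedBuffer; only release() is modeled."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload
        self.released = False

    def release(self) -> None:
        self.released = True


def consume(result: tuple[Any, int] | None, handler: Callable[[Any, int], Any]) -> Any:
    """Run handler on a get() result and release pooled buffers afterwards."""
    if result is None:
        return None  # get() reported failure
    data, size = result
    try:
        return handler(data, size)
    finally:
        # Fast path: the caller owns the buffer and must release it.
        if isinstance(data, FakeManagedBuffer):
            data.release()


buf = FakeManagedBuffer(b"kv-shard")
length = consume((buf, 8), lambda d, n: len(d.payload[:n]))
plain = consume(({"tokens": [1, 2]}, 32), lambda d, n: d["tokens"])
```

The deserialized-object case needs no release call, so the helper leaves such values untouched.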

get_connection_info

get_connection_info() -> dict[str, Any]

Get connection info for this connector (useful for sender to share with receivers).

health

health() -> dict[str, Any]

put

put(
    from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]

Producer Side. Exposes data for RDMA transfer.

Behavior by data type:

  • ManagedBuffer (from this pool): Zero-copy, is_fast_path=True
  • ManagedBuffer (different pool): Fallback to tensor copy path
  • torch.Tensor / bytes: Copy to pool, is_fast_path=True
  • Other (dict, etc.): Serialize to bytes, copy to pool, is_fast_path=False (receiver will deserialize automatically)
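The dispatch by data type can be illustrated with a small classifier. This is a sketch under assumptions: FakeManagedBuffer and its pool_id field stand in for ManagedBuffer, classify_put and the path labels are invented names, and torch.Tensor is left out so the example has no third-party dependency.

```python
from __future__ import annotations

from typing import Any


class FakeManagedBuffer:
    """Stand-in for ManagedBuffer; pool_id identifies the owning pool."""

    def __init__(self, pool_id: int) -> None:
        self.pool_id = pool_id


def classify_put(data: Any, local_pool_id: int) -> tuple[str, bool]:
    """Return (path, is_fast_path) for a payload handed to put()."""
    if isinstance(data, FakeManagedBuffer):
        if data.pool_id == local_pool_id:
            return "zero_copy", True
        # Different pool: falls back to the tensor copy path, which is
        # documented as is_fast_path=True.
        return "copy_to_pool", True
    if isinstance(data, bytes):
        # torch.Tensor would take this branch too (omitted here).
        return "copy_to_pool", True
    # Anything else is serialized first and deserialized by the receiver.
    return "serialize_then_copy", False


same_pool = classify_put(FakeManagedBuffer(pool_id=1), local_pool_id=1)
other_pool = classify_put(FakeManagedBuffer(pool_id=2), local_pool_id=1)
raw = classify_put(b"\x00\x01", local_pool_id=1)
obj = classify_put({"a": 1}, local_pool_id=1)
```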

update_sender_info

update_sender_info(
    sender_host: str,
    sender_zmq_port: int,
    sender_rank: int | None = None,
) -> None

Inject a sender's ZMQ endpoint into the receiver connector.

When sender_rank is None (default), sets the single default sender used by get() when no rank is specified. This preserves backward-compatible 1:1 semantics.

When sender_rank is an integer, the endpoint is stored in a per-rank registry for internal use (e.g. by _query_metadata_from_sender(sender_rank=R)).
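The split between a default sender and a per-rank registry can be sketched as below. SenderRegistry and endpoint_for() are hypothetical names; the real connector stores these values on itself and looks them up through internal helpers.

```python
from __future__ import annotations


class SenderRegistry:
    """Hypothetical holder for a default sender and per-rank senders."""

    def __init__(self) -> None:
        self.default: tuple[str, int] | None = None
        self.by_rank: dict[int, tuple[str, int]] = {}

    def update_sender_info(
        self,
        sender_host: str,
        sender_zmq_port: int,
        sender_rank: int | None = None,
    ) -> None:
        endpoint = (sender_host, sender_zmq_port)
        if sender_rank is None:
            # 1:1 case: a single default sender.
            self.default = endpoint
        else:
            # 1:N case: remember the endpoint for this sender rank.
            self.by_rank[sender_rank] = endpoint

    def endpoint_for(self, sender_rank: int | None = None) -> tuple[str, int] | None:
        if sender_rank is None:
            return self.default
        return self.by_rank.get(sender_rank)


reg = SenderRegistry()
reg.update_sender_info("10.0.0.1", 50051)
reg.update_sender_info("10.0.0.2", 50052, sender_rank=1)
```

Registering a ranked endpoint leaves the default untouched, which is what keeps existing 1:1 callers working.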

OmniConnectorBase

Bases: ABC

Base class for all OmniConnectors.

supports_raw_data class-attribute instance-attribute

supports_raw_data: bool = False

cleanup abstractmethod

cleanup(request_id: str) -> None

Clean up resources for a request.

close abstractmethod

close() -> None

Release resources held by this connector.

Subclasses must implement this to clean up transport-specific resources (connections, memory pools, threads, etc.). Implementations should be idempotent (safe to call multiple times).
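One common way to meet the idempotency requirement is a closed flag that turns repeated calls into no-ops. ClosableResource below is a hypothetical fragment written for illustration; the counter merely stands in for freeing real transport resources.

```python
class ClosableResource:
    """Hypothetical connector fragment showing an idempotent close()."""

    def __init__(self) -> None:
        self._closed = False
        self.release_calls = 0

    def close(self) -> None:
        if self._closed:
            return  # second and later calls do nothing
        self._closed = True
        # Stands in for releasing connections, memory pools, threads.
        self.release_calls += 1


res = ClosableResource()
res.close()
res.close()
```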

deserialize_obj staticmethod

deserialize_obj(data: bytes) -> Any

Deserialize bytes to Python object using centralized serializer.

get abstractmethod

get(
    from_stage: str,
    to_stage: str,
    get_key: str,
    metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None

Retrieve Python object and payload size (bytes).

Parameters:

  • from_stage (str, required): Source stage identifier
  • to_stage (str, required): Destination stage identifier
  • get_key (str, required): Unique request identifier
  • metadata (dict[str, Any] | None, default None): Optional transport-specific metadata. When provided, the connector uses it directly (e.g. source_host, source_port, data_size) instead of querying the sender. For heterogeneous TP the manager may supply partial metadata (host/port only); the connector will query the sender at that address to fill in data_size.

Returns:

  • tuple[Any, int] | None: Tuple of (Python object, serialized byte size) if found, None otherwise

health abstractmethod

health() -> dict[str, Any]

Return health status and metrics.

put abstractmethod

put(
    from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]

Store a Python object; serialization is handled internally by the connector.

Parameters:

  • from_stage (str, required): Source stage identifier
  • to_stage (str, required): Destination stage identifier
  • put_key (str, required): Unique request identifier
  • data (Any, required): Python object to store

Returns:

  • tuple[bool, int, dict[str, Any] | None]: (success: bool, serialized_size: int, metadata: Optional[dict]) Metadata may contain transport-specific handles or inline data.
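To show how the abstract interface fits together, here is a toy in-process connector. It is a sketch under clear assumptions: ConnectorBase is a local stand-in that mirrors the abstract methods listed for OmniConnectorBase, InMemoryConnector is invented for illustration, pickle replaces the centralized serializer, and the keys returned by health() are made up.

```python
from __future__ import annotations

import pickle
from abc import ABC, abstractmethod
from typing import Any


class ConnectorBase(ABC):
    """Local stand-in mirroring the abstract methods documented above."""

    @abstractmethod
    def put(self, from_stage: str, to_stage: str, put_key: str,
            data: Any) -> tuple[bool, int, dict[str, Any] | None]: ...

    @abstractmethod
    def get(self, from_stage: str, to_stage: str, get_key: str,
            metadata: dict[str, Any] | None = None) -> tuple[Any, int] | None: ...

    @abstractmethod
    def cleanup(self, request_id: str) -> None: ...

    @abstractmethod
    def health(self) -> dict[str, Any]: ...

    @abstractmethod
    def close(self) -> None: ...


class InMemoryConnector(ConnectorBase):
    """Toy connector that keeps serialized payloads in a dict."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str, str], bytes] = {}

    def put(self, from_stage, to_stage, put_key, data):
        blob = pickle.dumps(data)  # assumption: pickle, not the real serializer
        self._store[(from_stage, to_stage, put_key)] = blob
        return True, len(blob), None

    def get(self, from_stage, to_stage, get_key, metadata=None):
        blob = self._store.get((from_stage, to_stage, get_key))
        if blob is None:
            return None  # nothing stored under this key
        return pickle.loads(blob), len(blob)

    def cleanup(self, request_id):
        for key in [k for k in self._store if k[2] == request_id]:
            del self._store[key]

    def health(self):
        return {"status": "healthy", "entries": len(self._store)}

    def close(self):
        self._store.clear()  # safe to call repeatedly


conn = InMemoryConnector()
ok, size, meta = conn.put("stage0", "stage1", "req-1", {"tokens": [1, 2, 3]})
obj, got_size = conn.get("stage0", "stage1", "req-1")
conn.cleanup("req-1")
missing = conn.get("stage0", "stage1", "req-1")
```

The put() result carries the serialized size, and get() reports the same size alongside the reconstructed object, matching the return shapes described above.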

serialize_obj staticmethod

serialize_obj(obj: Any) -> bytes

Serialize a Python object to bytes using centralized serializer.

OmniConnectorFactory

Factory for creating OmniConnectors.

create_connector classmethod

create_connector(spec: ConnectorSpec) -> OmniConnectorBase

Create a connector from specification.

list_registered_connectors classmethod

list_registered_connectors() -> list[str]

List all registered connector names.

register_connector classmethod

register_connector(
    name: str,
    constructor: Callable[
        [dict[str, Any]], OmniConnectorBase
    ],
) -> None

Register a connector constructor.
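The register/create/list trio suggests a name-to-constructor registry. The sketch below is a guess at that pattern rather than the library's code: Spec stands in for ConnectorSpec, Factory stands in for OmniConnectorFactory, the "echo" connector is made up, and passing spec.extra to the constructor is an assumption based on the constructor's dict[str, Any] argument type.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Spec:
    """Stand-in for ConnectorSpec (same two fields)."""

    name: str
    extra: dict[str, Any] = field(default_factory=dict)


class Factory:
    """Stand-in registry; the real OmniConnectorFactory may differ."""

    _registry: dict[str, Callable[[dict[str, Any]], Any]] = {}

    @classmethod
    def register_connector(cls, name: str,
                           constructor: Callable[[dict[str, Any]], Any]) -> None:
        cls._registry[name] = constructor

    @classmethod
    def create_connector(cls, spec: Spec) -> Any:
        if spec.name not in cls._registry:
            raise ValueError(f"unknown connector: {spec.name!r}")
        # Assumption: the constructor receives spec.extra as its config dict.
        return cls._registry[spec.name](spec.extra)

    @classmethod
    def list_registered_connectors(cls) -> list[str]:
        return list(cls._registry)


# "echo" is a made-up connector that just returns its config.
Factory.register_connector("echo", lambda cfg: ("echo", cfg))
created = Factory.create_connector(Spec("echo", {"threshold": 1}))
names = Factory.list_registered_connectors()
```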

OmniTransferConfig dataclass

Top-level configuration for OmniConnector system.

Members:

  • connectors: A dictionary of connectors, keyed by (from_stage, to_stage).
  • default_connector: The default connector to use if no connector is specified for an edge.

connectors class-attribute instance-attribute

connectors: dict[tuple[str, str], ConnectorSpec] = field(
    default_factory=dict
)

default_connector class-attribute instance-attribute

default_connector: ConnectorSpec | None = None

get_connector_for_edge

get_connector_for_edge(
    from_stage: str, to_stage: str
) -> ConnectorSpec | None

Get connector spec for a specific edge.

has_connector_for_edge

has_connector_for_edge(
    from_stage: str, to_stage: str
) -> bool

Check if there's a connector configured for the edge.
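A per-edge lookup with a default fallback could be written as shown below. Spec and TransferConfig are stand-ins for ConnectorSpec and OmniTransferConfig; the fallback order, and the choice that has_connector_for_edge() also counts the default, are assumptions drawn from the field descriptions. The connector names "rdma" and "shm" are placeholders.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Spec:
    """Stand-in for ConnectorSpec."""

    name: str
    extra: dict[str, Any] = field(default_factory=dict)


@dataclass
class TransferConfig:
    """Stand-in for OmniTransferConfig with the same two fields."""

    connectors: dict[tuple[str, str], Spec] = field(default_factory=dict)
    default_connector: Spec | None = None

    def get_connector_for_edge(self, from_stage: str, to_stage: str) -> Spec | None:
        # Assumption: an explicit edge entry wins, else use the default.
        return self.connectors.get((from_stage, to_stage), self.default_connector)

    def has_connector_for_edge(self, from_stage: str, to_stage: str) -> bool:
        return self.get_connector_for_edge(from_stage, to_stage) is not None


cfg = TransferConfig(
    connectors={("prefill", "decode"): Spec("rdma")},
    default_connector=Spec("shm"),
)
explicit = cfg.get_connector_for_edge("prefill", "decode")
fallback = cfg.get_connector_for_edge("decode", "vocoder")
empty = TransferConfig().has_connector_for_edge("a", "b")
```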

SharedMemoryConnector

Bases: OmniConnectorBase

Key-addressed local shared-memory connector.

SHM is a local-only transport: it reads/writes POSIX shared memory segments identified purely by key. It does not understand remote-transport metadata such as source_host / source_port (that is the RDMA connector's job). When such metadata is passed in, the connector silently falls back to key-based lookup.

config instance-attribute

config = config

device instance-attribute

device = get('device', 'cuda:0')

stage_id instance-attribute

stage_id = get('stage_id', -1)

threshold instance-attribute

threshold = int(get('shm_threshold_bytes', 65536))

cleanup

cleanup(request_id: str) -> None

Best-effort cleanup of unconsumed SHM segments for request_id.

Matches pending keys where request_id appears as the full key, as a _-delimited prefix, or as a _-delimited suffix. If get() was never called for a segment, it is unlinked here so that /dev/shm does not leak.
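The matching rule can be written as a three-way check. The function name matches_request and the sample keys are hypothetical; only the full-key, prefix, and suffix conditions come from the description above.

```python
def matches_request(key: str, request_id: str) -> bool:
    """True if key equals request_id or has it as a _-delimited prefix/suffix."""
    return (
        key == request_id
        or key.startswith(request_id + "_")
        or key.endswith("_" + request_id)
    )


pending = ["req1", "req1_0_1", "0_1_req1", "req10_0_1", "a_req1_b"]
to_unlink = [k for k in pending if matches_request(k, "req1")]
```

Requiring the underscore delimiter keeps a request such as req1 from matching keys that belong to req10, and a request ID in the middle of a key is not matched.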

close

close() -> None

Unlink all remaining tracked SHM segments.

get

get(
    from_stage: str,
    to_stage: str,
    get_key: str,
    metadata=None,
) -> tuple[Any, int] | None

health

health() -> dict[str, Any]

put

put(
    from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]

YuanrongConnector

Bases: OmniConnectorBase

Datasystem-based distributed connector for OmniConnector.

client instance-attribute

client = None

config instance-attribute

config = config

get_sub_timeout_ms instance-attribute

get_sub_timeout_ms = max(
    0, int(get("get_sub_timeout_ms", 1000))
)

set_param instance-attribute

set_param = SetParam()

cleanup

cleanup(request_id: str) -> None

close

close() -> None

get

get(
    from_stage: str,
    to_stage: str,
    get_key: str,
    metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None

health

health() -> dict[str, Any]

put

put(
    from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]

YuanrongTransferEngineConnector

Bases: OmniConnectorBase

Pull-based connector backed by Yuanrong transfer_engine.

allocator instance-attribute

allocator = BufferAllocator(pool_size, alignment=4096)

base_ptr instance-attribute

base_ptr = int(data_ptr())

can_put instance-attribute

can_put = role == 'sender'

config instance-attribute

config = {
    key: (expand_env_value(value))
    for key, value in (items())
}

device_name instance-attribute

device_name = _resolve_device_name(
    get("device_name", "auto"), protocol
)

engine instance-attribute

engine = TransferEngine()

host instance-attribute

host = (
    _get_local_ip()
    if lower() in AUTO_HOST_VALUES
    else host_config
)

pool instance-attribute

pool = empty(pool_size, dtype=uint8, device=pool_device)

pool_device instance-attribute

pool_device = _resolve_pool_device(
    get("memory_pool_device", "npu")
)

pool_size instance-attribute

pool_size = int(get('memory_pool_size', 1024 ** 3))

protocol instance-attribute

protocol = str(get('protocol', 'ascend'))

rpc_port instance-attribute

rpc_port = _resolve_port(get('rpc_port'), host, 'rpc_port')

sender_host instance-attribute

sender_host = get('sender_host')

sender_zmq_port instance-attribute

sender_zmq_port = _resolve_optional_port(
    sender_zmq_port, "sender_zmq_port"
)

supports_raw_data class-attribute instance-attribute

supports_raw_data: bool = True

zmq_ctx instance-attribute

zmq_ctx = Context()

zmq_port instance-attribute

zmq_port = _resolve_port(get('zmq_port'), host, 'zmq_port')

cleanup

cleanup(
    request_id: str,
    from_stage: str | None = None,
    to_stage: str | None = None,
) -> None

close

close() -> None

get

get(
    from_stage: str,
    to_stage: str,
    get_key: str,
    metadata: dict[str, Any] | None = None,
) -> tuple[Any, int] | None

get_connection_info

get_connection_info() -> dict[str, Any]

health

health() -> dict[str, Any]

put

put(
    from_stage: str, to_stage: str, put_key: str, data: Any
) -> tuple[bool, int, dict[str, Any] | None]

update_sender_info

update_sender_info(
    sender_host: str, sender_zmq_port: int
) -> None

load_omni_transfer_config

load_omni_transfer_config(
    config_path: str | Path | None = None,
    config_dict: dict[str, Any] | None = None,
    default_shm_threshold: int = 65536,
) -> OmniTransferConfig | None

Load OmniTransferConfig from file or dict.