vllm_omni.distributed.omni_connectors.utils.initialization

Utilities for OmniConnector configuration and validation.

KV_RANK_PORT_STRIDE module-attribute

KV_RANK_PORT_STRIDE = 16

KV_REPLICA_PORT_STRIDE module-attribute

KV_REPLICA_PORT_STRIDE = 1024

KV_TRANSFER_PORT_OFFSET module-attribute

KV_TRANSFER_PORT_OFFSET = 100
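
This page lists the three port constants but does not say how they are combined. As a rough illustration only, the sketch below assumes one plausible layout: a fixed offset from a base port, one block of ports per replica, and one slot per rank inside that block. The function `sketch_kv_port` and the base port `20000` are hypothetical and not part of the module.

```python
# Values copied from the module attributes documented above.
KV_RANK_PORT_STRIDE = 16
KV_REPLICA_PORT_STRIDE = 1024
KV_TRANSFER_PORT_OFFSET = 100


def sketch_kv_port(base_port: int, replica: int, rank: int) -> int:
    """Hypothetical layout (an assumption, not the library's formula):
    offset from the base port, one block per replica, one slot per rank."""
    return (
        base_port
        + KV_TRANSFER_PORT_OFFSET
        + replica * KV_REPLICA_PORT_STRIDE
        + rank * KV_RANK_PORT_STRIDE
    )


# Under this assumed layout, one replica block holds 1024 // 16 rank slots.
ranks_per_replica = KV_REPLICA_PORT_STRIDE // KV_RANK_PORT_STRIDE

print(sketch_kv_port(20000, replica=0, rank=0))  # 20100
print(sketch_kv_port(20000, replica=1, rank=2))  # 21156
print(ranks_per_replica)  # 64
```

If the real scheme differs, the arithmetic still shows why the replica stride is a multiple of the rank stride: it keeps rank slots from spilling into the next replica's block.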

logger module-attribute

logger = get_connector_logger(__name__)

build_stage_connectors

build_stage_connectors(
    stage_id: int,
    connectors_config: dict[str, Any],
    purpose: str = "request_forwarding",
) -> dict[tuple[str, str], Any] | None

Instantiate OmniConnectors for a stage based on config.

Deprecated: prefer get_stage_connector_config plus the unified connector factory. Kept as a thin shim so legacy callers keep working.

create_connectors_from_config

create_connectors_from_config(
    connectors_config: dict[tuple[str, str], ConnectorSpec],
    purpose: str = "request_forwarding",
    caller_stage_id: int | str | None = None,
    is_sender: bool | None = None,
) -> dict[tuple[str, str], OmniConnectorBase]

Create connectors from config.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| connectors_config | dict[tuple[str, str], ConnectorSpec] | A dictionary of connector configurations. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[tuple[str, str], OmniConnectorBase] | A dictionary of connectors. |

get_connectors_config_for_stage

get_connectors_config_for_stage(
    transfer_config: OmniTransferConfig | None,
    stage_id: str | int,
) -> dict[str, Any]

Extract connector configurations relevant for a specific stage worker.

Returns a dict compatible with worker initialization:

    {
        "from_stage_X": {
            "spec": {
                "name": "ConnectorName",
                "extra": {...}
            }
        },
        ...
    }
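
To make the documented return shape concrete, here is a minimal sketch that walks a payload of that form. It does not call the library; the connector name `ExampleConnector`, the `example_option` key, and the helper `summarize_connectors` are placeholders invented for illustration.

```python
from typing import Any

# Example payload in the documented shape. The connector name and the
# contents of "extra" are placeholders, not values from the library.
stage_connectors: dict[str, Any] = {
    "from_stage_0": {
        "spec": {"name": "ExampleConnector", "extra": {"example_option": 1}},
    },
}


def summarize_connectors(
    config: dict[str, Any],
) -> list[tuple[str, str, dict[str, Any]]]:
    """Flatten the nested payload into (edge_key, connector_name, extra) rows."""
    rows = []
    for edge_key, entry in config.items():
        spec = entry.get("spec", {})
        rows.append((edge_key, spec.get("name", ""), spec.get("extra", {})))
    return rows


for edge_key, name, extra in summarize_connectors(stage_connectors):
    print(f"{edge_key}: {name} {extra}")
```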

get_stage_connector_config

get_stage_connector_config(
    transfer_config: OmniTransferConfig | None,
    stage_id: int,
) -> dict[str, Any]

Return the serialized connector config payload for a specific stage.

initialize_connectors_from_config

initialize_connectors_from_config(
    config_path: str | Path | None = None,
    default_shm_threshold: int = 65536,
    purpose: str = "request_forwarding",
    caller_stage_id: int | str | None = None,
    is_sender: bool | None = None,
) -> tuple[
    OmniTransferConfig | None,
    dict[tuple[str, str], OmniConnectorBase],
]

Initialize connectors from configuration file.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| tuple | tuple[OmniTransferConfig \| None, dict[tuple[str, str], OmniConnectorBase]] | (OmniTransferConfig, dict of {(from, to): connector_instance}) |
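
The returned pair can be unpacked and the connector mapping queried by its `(from, to)` key. The sketch below uses stand-ins so that it runs without the library: `StubConnector` and `fake_initialize` are hypothetical, and the string stage ids are an assumption based on the `tuple[str, str]` key type.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StubConnector:
    """Stand-in for an OmniConnectorBase instance; not a library class."""

    label: str


def fake_initialize() -> tuple[object | None, dict[tuple[str, str], StubConnector]]:
    """Hypothetical stand-in that returns the documented (config, connectors) shape."""
    connectors = {
        ("0", "1"): StubConnector("stage 0 -> stage 1"),
        ("1", "2"): StubConnector("stage 1 -> stage 2"),
    }
    # The first element may be None according to the documented return type.
    return None, connectors


transfer_config, connectors = fake_initialize()

# Look up the connector for one edge; .get() returns None for unknown edges.
connector = connectors.get(("0", "1"))
missing = connectors.get(("2", "3"))

print(connector.label if connector else "no connector")
print(missing)
```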

initialize_orchestrator_connectors

initialize_orchestrator_connectors(
    config_path: str | None,
    worker_backend: str | None = "multi_process",
    shm_threshold_bytes: int = 65536,
) -> tuple[
    OmniTransferConfig | None,
    dict[tuple[str, str], OmniConnectorBase],
]

Initialize connectors shared at orchestrator level.

Args:
    config_path: The path to the configuration file.
    worker_backend: The backend to use for the worker.

Returns:
    A tuple containing the OmniTransferConfig and a dictionary of connectors.

load_omni_transfer_config

load_omni_transfer_config(
    config_path: str | Path | None = None,
    config_dict: dict[str, Any] | None = None,
    default_shm_threshold: int = 65536,
) -> OmniTransferConfig | None

Load OmniTransferConfig from file or dict.

resolve_omni_kv_config_for_stage

resolve_omni_kv_config_for_stage(
    transfer_cfg: OmniTransferConfig | None,
    stage_id: int | str,
) -> tuple[dict[str, Any] | None, str | None, str | None]

Resolve connector configuration for a specific stage (Sender/Receiver).

This determines the primary connector configuration to be injected into the engine arguments, prioritizing outgoing edges (Sender role).