vllm_omni.distributed.omni_connectors.utils.initialization ¶
Utilities for OmniConnector configuration and validation.
build_stage_connectors ¶
build_stage_connectors(
stage_id: int,
connectors_config: dict[str, Any],
purpose: str = "request_forwarding",
) -> dict[tuple[str, str], Any] | None
Instantiate OmniConnectors for a stage based on config.
Deprecated: prefer get_stage_connector_config plus the unified connector factory. Kept as a thin shim so legacy callers keep working.
create_connectors_from_config ¶
create_connectors_from_config(
connectors_config: dict[tuple[str, str], ConnectorSpec],
purpose: str = "request_forwarding",
caller_stage_id: int | str | None = None,
is_sender: bool | None = None,
) -> dict[tuple[str, str], OmniConnectorBase]
Create connectors from config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connectors_config | dict[tuple[str, str], ConnectorSpec] | A dictionary of connector configurations. | required |
Returns:
| Type | Description |
|---|---|
dict[tuple[str, str], OmniConnectorBase] | A dictionary of connectors. |
get_connectors_config_for_stage ¶
get_connectors_config_for_stage(
transfer_config: OmniTransferConfig | None,
stage_id: str | int,
) -> dict[str, Any]
Extract connector configurations relevant for a specific stage worker.
Returns a dict compatible with worker initialization: { "from_stage_X": { "spec": { "name": "ConnectorName", "extra": {...} } }, ... }
get_stage_connector_config ¶
get_stage_connector_config(
transfer_config: OmniTransferConfig | None,
stage_id: int,
) -> dict[str, Any]
Return the serialized connector config payload for a specific stage.
initialize_connectors_from_config ¶
initialize_connectors_from_config(
config_path: str | Path | None = None,
default_shm_threshold: int = 65536,
purpose: str = "request_forwarding",
caller_stage_id: int | str | None = None,
is_sender: bool | None = None,
) -> tuple[
OmniTransferConfig | None,
dict[tuple[str, str], OmniConnectorBase],
]
Initialize connectors from configuration file.
Returns:
| Name | Type | Description |
|---|---|---|
tuple | tuple[OmniTransferConfig | None, dict[tuple[str, str], OmniConnectorBase]] | (OmniTransferConfig, dict of {(from, to): connector_instance}) |
initialize_orchestrator_connectors ¶
initialize_orchestrator_connectors(
config_path: str | None,
worker_backend: str | None = "multi_process",
shm_threshold_bytes: int = 65536,
) -> tuple[
OmniTransferConfig | None,
dict[tuple[str, str], OmniConnectorBase],
]
Initialize connectors shared at orchestrator level. Args: config_path: The path to the configuration file. worker_backend: The backend to use for the worker. Returns: A tuple containing the OmniTransferConfig and a dictionary of connectors.
load_omni_transfer_config ¶
load_omni_transfer_config(
config_path: str | Path | None = None,
config_dict: dict[str, Any] | None = None,
default_shm_threshold: int = 65536,
) -> OmniTransferConfig | None
Load OmniTransferConfig from file or dict.
resolve_omni_kv_config_for_stage ¶
resolve_omni_kv_config_for_stage(
transfer_cfg: OmniTransferConfig | None,
stage_id: int | str,
) -> tuple[dict[str, Any] | None, str | None, str | None]
Resolve connector configuration for a specific stage (Sender/Receiver).
This determines the primary connector configuration to be injected into the engine arguments, prioritizing outgoing edges (Sender role).