vllm_omni.engine.orchestrator

Orchestrator for vLLM-Omni multi-stage runtime.

Runs inside a background thread with its own asyncio event loop. Owns logical request progression across stage pools and handles stage-to-stage transfer logic.

In distributed mode (when coordinator_pub_address is provided), the Orchestrator also owns the single OmniCoordClientForHub, runs a _watch_replica_list task that converts replica disappearances into unregister_remote_replica control messages, and handles the register_remote_replica / unregister_remote_replica flow that attaches and detaches head-side stage clients for headless replicas.
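The "background thread with its own asyncio event loop" arrangement described above can be sketched with the standard library alone. This is a minimal illustration of the general pattern, not the vLLM-Omni implementation: the helper names (start_loop_in_thread, handle, submit, stop_loop) are hypothetical, and handle is only a stand-in for real per-request work.

```python
import asyncio
import threading


def start_loop_in_thread() -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Start a daemon thread that owns a private asyncio event loop."""
    loop = asyncio.new_event_loop()

    def _runner() -> None:
        # Bind the loop to this thread, then run it until stop() is requested.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    thread = threading.Thread(target=_runner, name="orchestrator-sketch", daemon=True)
    thread.start()
    return loop, thread


async def handle(request_id: str) -> str:
    # Hypothetical stand-in for work executed on the background loop.
    await asyncio.sleep(0)
    return f"done:{request_id}"


def submit(loop: asyncio.AbstractEventLoop, request_id: str) -> str:
    """Schedule a coroutine on the background loop from the calling thread."""
    future = asyncio.run_coroutine_threadsafe(handle(request_id), loop)
    return future.result(timeout=5)


def stop_loop(loop: asyncio.AbstractEventLoop, thread: threading.Thread) -> None:
    """Ask the loop to stop, wait for its thread to exit, then close the loop."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()
```

The calling thread never touches the loop directly. It hands work over with asyncio.run_coroutine_threadsafe and stops the loop with call_soon_threadsafe, which are the thread-safe entry points asyncio provides for this purpose.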

RemoteReplicaFactory module-attribute

RemoteReplicaFactory = Callable[[int, int], Awaitable[Any]]
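As a rough illustration of what satisfies this alias, any async function that takes two integers matches it. The sketch below is an assumption-heavy example: the meaning of the two integers is not documented here, so the parameter names stage_id and replica_id are guesses, and make_fake_client and attach are hypothetical helpers rather than part of the library.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Same shape as the alias documented above.
RemoteReplicaFactory = Callable[[int, int], Awaitable[Any]]


async def make_fake_client(stage_id: int, replica_id: int) -> dict[str, int]:
    # Assumption: the two ints identify a stage and a replica.
    # A real factory would presumably build a stage client; this returns a dict.
    await asyncio.sleep(0)
    return {"stage_id": stage_id, "replica_id": replica_id}


async def attach(factory: RemoteReplicaFactory, stage_id: int, replica_id: int) -> Any:
    """Call the factory and await whatever object it produces."""
    return await factory(stage_id, replica_id)


# An async def with two int parameters type-checks against the alias.
factory: RemoteReplicaFactory = make_fake_client
```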

logger module-attribute

logger = init_logger(__name__)

Orchestrator

Runs inside a background thread's asyncio event loop.

async_chunk instance-attribute

async_chunk = bool(async_chunk)

num_stages instance-attribute

num_stages = len(stage_pools)

output_async_queue instance-attribute

output_async_queue = output_async_queue

request_async_queue instance-attribute

request_async_queue = request_async_queue

request_states instance-attribute

request_states: dict[str, OrchestratorRequestState] = {}

rpc_async_queue instance-attribute

rpc_async_queue = rpc_async_queue

stage_pools instance-attribute

stage_pools: list[StagePool] = stage_pools

run async

run() -> None

Main entry point for the Orchestrator event loop.
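The body of run() is not shown on this page. As a loose sketch of what a queue-driven entry point can look like, the example below reads messages from a request queue and publishes results to an output queue until it sees a shutdown sentinel. Everything here is assumed for illustration: plain asyncio.Queue objects stand in for request_async_queue and output_async_queue, and the message shape, the _SHUTDOWN sentinel, and the names run_sketch and demo are hypothetical.

```python
import asyncio
from typing import Any

# Hypothetical sentinel used to end the loop in this sketch.
_SHUTDOWN = object()


async def run_sketch(request_q: asyncio.Queue, output_q: asyncio.Queue) -> None:
    """Consume requests until the shutdown sentinel arrives."""
    while True:
        msg = await request_q.get()
        if msg is _SHUTDOWN:
            break
        # Stand-in for routing the request through the stage pools.
        await output_q.put({"request_id": msg["request_id"], "finished": True})


async def demo() -> list[dict[str, Any]]:
    """Feed two requests through run_sketch and collect the outputs."""
    request_q: asyncio.Queue = asyncio.Queue()
    output_q: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(run_sketch(request_q, output_q))
    await request_q.put({"request_id": "r1"})
    await request_q.put({"request_id": "r2"})
    await request_q.put(_SHUTDOWN)
    await task
    return [output_q.get_nowait() for _ in range(output_q.qsize())]
```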

OrchestratorRequestState dataclass

Per-request bookkeeping inside the Orchestrator.

final_output_stage_ids class-attribute instance-attribute

final_output_stage_ids: set[int] = field(
    default_factory=set
)

final_stage_id class-attribute instance-attribute

final_stage_id: int = -1

finished_final_output_stage_ids class-attribute instance-attribute

finished_final_output_stage_ids: set[int] = field(
    default_factory=set
)

mm_features class-attribute instance-attribute

mm_features: list | None = None

mm_processor_kwargs class-attribute instance-attribute

mm_processor_kwargs: dict | None = None

pd_prefill_multimodal_output class-attribute instance-attribute

pd_prefill_multimodal_output: dict[str, Any] | None = None

pipeline_timings class-attribute instance-attribute

pipeline_timings: dict[str, float] = field(
    default_factory=dict
)

prompt class-attribute instance-attribute

prompt: Any = None

request_id instance-attribute

request_id: str

request_timestamp class-attribute instance-attribute

request_timestamp: float = 0.0

sampling_params_list class-attribute instance-attribute

sampling_params_list: list[Any] = field(
    default_factory=list
)

stage_submit_ts class-attribute instance-attribute

stage_submit_ts: dict[int, float] = field(
    default_factory=dict
)

streaming class-attribute instance-attribute

streaming: StreamingInputState = field(
    default_factory=lambda: StreamingInputState()
)
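Several fields above use field(default_factory=...) so that each request state gets its own set or dict rather than sharing one mutable default. The sketch below reproduces a few of those fields in a stand-in dataclass and adds a hypothetical mark_finished helper. The completion rule it implements (a request is complete once every id in final_output_stage_ids also appears in finished_final_output_stage_ids) is an assumption inferred from the field names, not documented behavior.

```python
from dataclasses import dataclass, field


@dataclass
class RequestStateSketch:
    """Stand-in mirroring a subset of OrchestratorRequestState fields."""

    request_id: str
    final_output_stage_ids: set[int] = field(default_factory=set)
    finished_final_output_stage_ids: set[int] = field(default_factory=set)
    stage_submit_ts: dict[int, float] = field(default_factory=dict)

    def mark_finished(self, stage_id: int) -> bool:
        """Record a finished stage; return True once all final stages are done.

        Hypothetical helper: the real completion logic is not shown in the docs.
        """
        if stage_id in self.final_output_stage_ids:
            self.finished_final_output_stage_ids.add(stage_id)
        # Guard against the empty case, where the subset test is trivially true.
        return bool(self.final_output_stage_ids) and (
            self.final_output_stage_ids <= self.finished_final_output_stage_ids
        )
```

Because the containers come from default_factory, mutating the sets of one instance leaves every other instance untouched.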

StreamingInputState dataclass

bridge_states class-attribute instance-attribute

bridge_states: dict[str, Any] = field(default_factory=dict)

enabled class-attribute instance-attribute

enabled: bool = False

new_prompt_len_snapshot class-attribute instance-attribute

new_prompt_len_snapshot: int | None = None

segment_finished class-attribute instance-attribute

segment_finished: bool = False

build_engine_core_request_from_tokens

build_engine_core_request_from_tokens(
    request_id: str,
    prompt: dict[str, Any],
    params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    model_config: ModelConfig | None = None,
    resumable: bool = False,
    mm_features: list | None = None,
) -> OmniEngineCoreRequest

Build an OmniEngineCoreRequest directly from an OmniTokensPrompt.