
vllm_omni.engine.async_omni_engine

Async Omni Engine for vLLM-Omni multi-stage runtime.

AsyncOmniEngine runs in the caller's thread as a thin proxy that communicates with the Orchestrator (which runs in a background thread) via janus queues.

logger module-attribute

logger = init_logger(__name__)

AsyncOmniEngine

Thin proxy that launches an Orchestrator in a background thread.

All stage clients, input/output processors, and stage-to-stage transfer logic live inside the Orchestrator coroutine, which runs in its own thread with a dedicated asyncio event loop. This class communicates with it via janus queues (the sync side for callers, the async side for the Orchestrator).
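The proxy/orchestrator split described above can be sketched with the standard library alone. This is a minimal, hypothetical model, not the vllm-omni implementation: `ToyOrchestrator` and `ToyProxy` are invented names, plain `queue.Queue` objects stand in for janus queues, and the "pipeline" simply upper-cases the prompt. The point is only the shape: the caller's thread enqueues messages and reads results, while a background thread runs its own asyncio loop.

```python
import asyncio
import queue
import threading
from typing import Any


class ToyOrchestrator:
    """Hypothetical orchestrator: runs in its own thread on a private event loop."""

    def __init__(self, request_q: queue.Queue, output_q: queue.Queue) -> None:
        self.request_q = request_q
        self.output_q = output_q

    async def run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            # Wait for the next message in a worker thread so the loop stays free.
            msg = await loop.run_in_executor(None, self.request_q.get)
            if msg["type"] == "shutdown":
                break
            if msg["type"] == "add_request":
                # Stand-in for the multi-stage pipeline: emit one final output.
                self.output_q.put(
                    {"request_id": msg["request_id"], "text": msg["prompt"].upper()}
                )


class ToyProxy:
    """Caller-side thin proxy: it only enqueues messages and reads results."""

    def __init__(self) -> None:
        self.request_queue: queue.Queue = queue.Queue()
        self.output_queue: queue.Queue = queue.Queue()
        orchestrator = ToyOrchestrator(self.request_queue, self.output_queue)
        self.thread = threading.Thread(
            target=lambda: asyncio.run(orchestrator.run()),
            daemon=True,
            name="orchestrator",
        )
        self.thread.start()

    def add_request(self, request_id: str, prompt: str) -> None:
        self.request_queue.put(
            {"type": "add_request", "request_id": request_id, "prompt": prompt}
        )

    def get_output(self, timeout: float = 1.0) -> dict[str, Any]:
        return self.output_queue.get(timeout=timeout)

    def shutdown(self) -> None:
        self.request_queue.put({"type": "shutdown"})
        self.thread.join(timeout=5)
```

With janus, the caller would use the `sync_q` side and the orchestrator would `await` on the `async_q` side instead of going through `run_in_executor`.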

Parameters:

Name                Type  Description                                                Default
model               str   Model name or path.                                        required
init_timeout        int   Total timeout waiting for orchestrator startup (seconds).  600
stage_init_timeout  int   Timeout for stage initialization (seconds).                300
**kwargs            Any   Additional arguments.                                      {}

async_chunk instance-attribute

async_chunk = bool(
    getattr(stage0_args, "async_chunk", False)
)

default_sampling_params_list instance-attribute

default_sampling_params_list: list[OmniSamplingParams] = []

diffusion_batch_size instance-attribute

diffusion_batch_size = diffusion_batch_size

input_processor instance-attribute

input_processor: InputProcessor | None = None

model instance-attribute

model = model

num_stages instance-attribute

num_stages = len(stage_configs)

orchestrator_thread instance-attribute

orchestrator_thread = Thread(
    target=_bootstrap_orchestrator,
    args=(stage_init_timeout, startup_future),
    daemon=True,
    name="orchestrator",
)
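The thread above receives `stage_init_timeout` and a `startup_future`, which suggests a startup handshake: the caller blocks on the future (bounded by `init_timeout`) until the background thread reports that stages are ready or that initialization failed. The sketch below shows that pattern under stated assumptions; `_init_stages`, `_bootstrap`, and `start_orchestrator` are hypothetical helpers, not the library's `_bootstrap_orchestrator`.

```python
import asyncio
import concurrent.futures
import threading


async def _init_stages(num_stages: int, stage_init_timeout: float) -> list[str]:
    # Hypothetical stage startup; real stages would load models here.
    async def init_one(stage_id: int) -> str:
        await asyncio.sleep(0.01)
        return f"stage-{stage_id}"

    return await asyncio.wait_for(
        asyncio.gather(*(init_one(i) for i in range(num_stages))),
        timeout=stage_init_timeout,
    )


def _bootstrap(stage_init_timeout: float, startup_future: concurrent.futures.Future) -> None:
    """Thread target: create a private event loop and report startup status."""

    async def main() -> None:
        try:
            stages = await _init_stages(2, stage_init_timeout)
        except Exception as exc:  # hand the failure to the waiting caller
            startup_future.set_exception(exc)
            return
        startup_future.set_result(stages)
        # The orchestrator's main message loop would start here.

    asyncio.run(main())


def start_orchestrator(init_timeout: float = 600, stage_init_timeout: float = 300):
    startup_future: concurrent.futures.Future = concurrent.futures.Future()
    thread = threading.Thread(
        target=_bootstrap,
        args=(stage_init_timeout, startup_future),
        daemon=True,
        name="orchestrator",
    )
    thread.start()
    # Blocks the caller; raises a timeout error if startup exceeds init_timeout,
    # or re-raises whatever exception the bootstrap reported.
    stages = startup_future.result(timeout=init_timeout)
    return thread, stages
```

Using a `concurrent.futures.Future` (rather than an asyncio future) matters here because it is safe to resolve from one thread and wait on from another.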

output_queue instance-attribute

output_queue: Queue[EngineQueueMessage] = Queue()

request_queue instance-attribute

request_queue: Queue[EngineQueueMessage] = Queue()

rpc_output_queue instance-attribute

rpc_output_queue: Queue[EngineQueueMessage] = Queue()

single_stage_mode instance-attribute

single_stage_mode: bool = single_stage_mode

stage_clients instance-attribute

stage_clients: list[StageClient] = []

stage_metadata instance-attribute

stage_metadata: list[StageRuntimeInfo] = []

stage_pools instance-attribute

stage_pools: list[StagePool] = []

supported_tasks instance-attribute

supported_tasks: tuple[str, ...] = ('generate',)

tokenizer instance-attribute

tokenizer = tokenizer

abort

abort(request_ids: list[str]) -> None

Send an abort message for the given request IDs to the Orchestrator.
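Since `abort` returns `None`, one plausible reading is a fire-and-forget message: the caller enqueues the request IDs and the Orchestrator later drops whichever requests are still in flight. The sketch below models that assumption; the `("abort", ids)` message shape, `handle_abort`, and the `in_flight` dict are all hypothetical.

```python
import queue


def abort(request_queue: queue.Queue, request_ids: list[str]) -> None:
    # Fire-and-forget: the caller does not wait for the orchestrator to act.
    request_queue.put(("abort", list(request_ids)))


def handle_abort(in_flight: dict[str, object], request_ids: list[str]) -> list[str]:
    """Orchestrator side: drop requests that are still known and ignore the rest."""
    return [rid for rid in request_ids if in_flight.pop(rid, None) is not None]
```

Ignoring unknown IDs keeps abort idempotent, so aborting a request that already finished is harmless.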

abort_async async

abort_async(request_ids: list[str]) -> None

Async version of abort().

add_request

add_request(
    request_id: str,
    prompt: EngineCoreRequest | PromptType,
    prompt_text: str | None = None,
    sampling_params_list: Sequence[Any] | None = None,
    final_stage_id: int = 0,
    final_output_stage_ids: Sequence[int] | None = None,
    arrival_time: float | None = None,
    lora_request: Any = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    reasoning_ended: bool | None = None,
    *,
    resumable: bool = False,
) -> None

Process the stage-0 input locally, then send the request to the Orchestrator.

Input processing and output-processor registration happen here, in the caller's thread, which avoids a round trip through the queue plus a coroutine switch. The Orchestrator receives a ready-to-submit OmniEngineCoreRequest.
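The "process locally, then enqueue" flow can be sketched as three steps in the caller's thread. Everything here is a hypothetical stand-in: `ReadyRequest` plays the role of OmniEngineCoreRequest, `toy_tokenize` replaces the real input processor, and `ToyOutputProcessor` only records registered IDs.

```python
import queue
from dataclasses import dataclass, field


@dataclass
class ReadyRequest:
    # Hypothetical stand-in for OmniEngineCoreRequest.
    request_id: str
    prompt_token_ids: list[int]
    final_stage_id: int = 0


@dataclass
class ToyOutputProcessor:
    registered: set[str] = field(default_factory=set)

    def add_request(self, request_id: str) -> None:
        self.registered.add(request_id)


def toy_tokenize(prompt: str) -> list[int]:
    # Hypothetical tokenizer: one "token id" (the word length) per word.
    return [len(word) for word in prompt.split()]


def add_request(request_queue: queue.Queue, output_processor: ToyOutputProcessor,
                request_id: str, prompt: str, final_stage_id: int = 0) -> None:
    # 1. Stage-0 input processing runs in the caller's thread.
    request = ReadyRequest(request_id, toy_tokenize(prompt), final_stage_id)
    # 2. Output-processor registration also happens here.
    output_processor.add_request(request_id)
    # 3. The orchestrator only receives a ready-to-submit request.
    request_queue.put(("add_request", request))
```

Registering with the output processor before enqueueing means no output for the request can arrive before the processor knows about it.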

add_request_async async

add_request_async(
    request_id: str,
    prompt: EngineCoreRequest | PromptType,
    prompt_text: str | None = None,
    sampling_params_list: Sequence[Any] | None = None,
    final_stage_id: int = 0,
    final_output_stage_ids: Sequence[int] | None = None,
    arrival_time: float | None = None,
    lora_request: Any = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    reasoning_ended: bool | None = None,
    *,
    resumable: bool = False,
) -> None

Async version of add_request().

add_streaming_update

add_streaming_update(
    request_id: str,
    prompt: EngineCoreRequest | PromptType,
    prompt_text: str | None = None,
    sampling_params_list: Sequence[Any] | None = None,
    final_stage_id: int = 0,
    final_output_stage_ids: Sequence[int] | None = None,
    arrival_time: float | None = None,
    *,
    resumable: bool = True,
) -> None

Send an incremental streaming update for an existing request.

add_streaming_update_async async

add_streaming_update_async(
    request_id: str,
    prompt: EngineCoreRequest | PromptType,
    prompt_text: str | None = None,
    sampling_params_list: Sequence[Any] | None = None,
    final_stage_id: int = 0,
    final_output_stage_ids: Sequence[int] | None = None,
    arrival_time: float | None = None,
    *,
    resumable: bool = True,
) -> None

Async wrapper for add_streaming_update().

collective_rpc

collective_rpc(
    method: str,
    timeout: float | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    stage_ids: list[int] | None = None,
) -> list[Any]

Send a control RPC to the Orchestrator and wait for aggregated results.

This uses a dedicated RPC output queue so control-plane messages do not race with the normal request output polling loop.
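The benefit of a dedicated RPC output queue is that a control call never consumes, or gets stuck behind, a normal request output. The sketch below models this under stated assumptions: the message tuples, the `rpc_id` matching, `ToyStage` (with hypothetical stage names), and the `start_orchestrator` harness are invented for illustration. Omitting `stage_ids` fans the call out to every stage and returns one result per stage.

```python
from __future__ import annotations

import queue
import threading
import uuid
from typing import Any


class ToyStage:
    """Hypothetical stage exposing a method that can be called via RPC."""

    def __init__(self, name: str) -> None:
        self.name = name

    def describe(self, suffix: str = "") -> str:
        return self.name + suffix


def orchestrator_step(request_q, output_q, rpc_output_q, stages: dict[int, ToyStage]) -> None:
    """Handle one message from the request queue."""
    kind, payload = request_q.get()
    if kind == "collective_rpc":
        ids = payload["stage_ids"] if payload["stage_ids"] is not None else sorted(stages)
        results = [
            getattr(stages[i], payload["method"])(*payload["args"], **payload["kwargs"])
            for i in ids
        ]
        # Control-plane replies get their own queue ...
        rpc_output_q.put({"rpc_id": payload["rpc_id"], "results": results})
    else:
        # ... so they never interleave with normal request outputs.
        output_q.put(payload)


def start_orchestrator(request_q, output_q, rpc_output_q, stages, n_messages: int):
    """Hypothetical harness: process n_messages in a background thread."""
    def loop() -> None:
        for _ in range(n_messages):
            orchestrator_step(request_q, output_q, rpc_output_q, stages)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


def collective_rpc(request_q, rpc_output_q, method: str, timeout: float | None = None,
                   args: tuple[Any, ...] = (), kwargs: dict[str, Any] | None = None,
                   stage_ids: list[int] | None = None) -> list[Any]:
    rpc_id = uuid.uuid4().hex
    request_q.put(("collective_rpc", {
        "rpc_id": rpc_id, "method": method, "args": args,
        "kwargs": kwargs or {}, "stage_ids": stage_ids,
    }))
    reply = rpc_output_q.get(timeout=timeout)  # raises queue.Empty on timeout
    if reply["rpc_id"] != rpc_id:
        raise RuntimeError("received a reply for a different RPC")
    return reply["results"]
```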

collective_rpc_async async

collective_rpc_async(
    method: str,
    timeout: float | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    stage_ids: list[int] | None = None,
) -> list[Any]

Async wrapper around collective_rpc().
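One common way to build an async wrapper around a blocking call is to run it in a worker thread with `asyncio.to_thread`, so the caller's event loop keeps serving other tasks while it waits. Whether vllm-omni does exactly this is an assumption; the blocking `collective_rpc` below is a hypothetical stand-in that sleeps briefly and fakes one result per stage.

```python
from __future__ import annotations

import asyncio
import time
from typing import Any


def collective_rpc(method: str, timeout: float | None = None, args: tuple[Any, ...] = (),
                   kwargs: dict[str, Any] | None = None,
                   stage_ids: list[int] | None = None) -> list[Any]:
    # Hypothetical blocking call standing in for the real queue round trip.
    time.sleep(0.05)
    ids = stage_ids if stage_ids is not None else [0, 1]
    return [f"{method}@stage{i}" for i in ids]


async def collective_rpc_async(method: str, timeout: float | None = None,
                               args: tuple[Any, ...] = (),
                               kwargs: dict[str, Any] | None = None,
                               stage_ids: list[int] | None = None) -> list[Any]:
    # Offload the blocking wait so other coroutines keep running meanwhile.
    return await asyncio.to_thread(collective_rpc, method, timeout, args, kwargs, stage_ids)
```

The same pattern would apply to the other `*_async` wrappers in this class, since each mirrors a synchronous method.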

get_stage_metadata

get_stage_metadata(stage_id: int) -> StageRuntimeInfo

Get cached metadata for a stage.

is_alive

is_alive() -> bool

Whether the orchestrator thread is alive.

shutdown

shutdown() -> None

Send a shutdown message and wait for the Orchestrator thread to exit.
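A shutdown message sent through the same request queue works as a sentinel: because the queue is FIFO, requests queued earlier are handled first, and `join` then waits for the thread to finish. This is a hypothetical sketch (`_SHUTDOWN`, `orchestrator_loop`, and `ToyEngine` are invented names) that also shows how `is_alive()` can simply delegate to the thread.

```python
import queue
import threading

_SHUTDOWN = ("shutdown", None)  # hypothetical sentinel message


def orchestrator_loop(request_queue: queue.Queue, handled: list) -> None:
    while True:
        msg = request_queue.get()
        if msg == _SHUTDOWN:
            return  # leaving the loop lets the thread exit
        handled.append(msg)


class ToyEngine:
    def __init__(self) -> None:
        self.request_queue: queue.Queue = queue.Queue()
        self.handled: list = []
        self.orchestrator_thread = threading.Thread(
            target=orchestrator_loop,
            args=(self.request_queue, self.handled),
            daemon=True,
            name="orchestrator",
        )
        self.orchestrator_thread.start()

    def is_alive(self) -> bool:
        return self.orchestrator_thread.is_alive()

    def shutdown(self, join_timeout: float = 10.0) -> None:
        if not self.is_alive():
            return  # already stopped; calling shutdown twice is harmless
        self.request_queue.put(_SHUTDOWN)
        self.orchestrator_thread.join(timeout=join_timeout)
```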

try_get_output

try_get_output(
    timeout: float = 0.001,
) -> EngineQueueMessage | None

Read one output message from the Orchestrator output queue.
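Given the `timeout` parameter and the `EngineQueueMessage | None` return type, a reasonable reading is "wait up to `timeout` seconds, then return `None` if nothing arrived". The sketch below assumes that behavior with a plain `queue.Queue`; `drain_outputs` is a hypothetical polling helper built on top of it.

```python
import queue
from typing import Any, Optional


def try_get_output(output_queue: queue.Queue, timeout: float = 0.001) -> Optional[Any]:
    """Return one message, or None if none arrives within `timeout` seconds."""
    try:
        return output_queue.get(timeout=timeout)
    except queue.Empty:
        return None


def drain_outputs(output_queue: queue.Queue, timeout: float = 0.001) -> list[Any]:
    # Hypothetical polling loop: collect messages until one poll comes back empty.
    messages = []
    while (msg := try_get_output(output_queue, timeout)) is not None:
        messages.append(msg)
    return messages
```

The small default timeout keeps a polling caller responsive without spinning the CPU on an empty queue.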

try_get_output_async async

try_get_output_async() -> EngineQueueMessage | None

Async read from the Orchestrator output queue.

StageRuntimeInfo dataclass

final_output instance-attribute

final_output: bool

final_output_type instance-attribute

final_output_type: FinalOutputModalityType | None

model_stage class-attribute instance-attribute

model_stage: str | None = None

stage_type instance-attribute

stage_type: str
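The attributes above can be reassembled into a dataclass. The listing appears to be ordered alphabetically, and a dataclass must place the defaulted field (`model_stage`) after the required ones, so the field order here is an assumption. `FinalOutputModalityType` is defined elsewhere in vllm-omni; the `Literal` below is a hypothetical placeholder, as are the stage names and `stage_type` values in the usage helpers.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal, Optional

# Hypothetical placeholder for the real FinalOutputModalityType.
FinalOutputModalityType = Literal["text", "audio", "image"]


@dataclass
class StageRuntimeInfo:
    final_output: bool
    final_output_type: Optional[FinalOutputModalityType]
    stage_type: str
    model_stage: Optional[str] = None


def get_stage_metadata(stage_metadata: list[StageRuntimeInfo], stage_id: int) -> StageRuntimeInfo:
    # Cached per-stage metadata indexed by stage id.
    return stage_metadata[stage_id]


def final_output_stage_ids(stage_metadata: list[StageRuntimeInfo]) -> list[int]:
    # Stages whose outputs are returned to the caller.
    return [i for i, info in enumerate(stage_metadata) if info.final_output]
```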