vllm_omni.engine.async_omni_engine
Async Omni Engine for the vLLM-Omni multi-stage runtime.
In the caller's thread, AsyncOmniEngine is a thin proxy that communicates through janus queues with the Orchestrator, which runs in a background thread.
AsyncOmniEngine
Thin proxy that launches an Orchestrator in a background thread.
All stage clients, input/output processors, and stage-to-stage transfer logic live inside the Orchestrator coroutine, which runs in its own thread with a dedicated asyncio event loop. This class communicates with it through janus queues: callers use the sync side, and the Orchestrator uses the async side.
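The thread-and-queue arrangement can be modeled with the standard library alone. This is a simplified sketch, not vLLM-Omni's code: it swaps janus for an asyncio.Queue fed through loop.call_soon_threadsafe (caller to Orchestrator) plus a queue.Queue (Orchestrator to caller), and ThinProxy, submit, get_output, and the upper-casing "work" are all hypothetical.

```python
import asyncio
import queue
import threading
from typing import Tuple


class ThinProxy:
    """Hypothetical caller-side proxy driving an orchestrator coroutine on a background thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._outputs: "queue.Queue[Tuple[str, str]]" = queue.Queue()  # orchestrator -> caller
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True, name="orchestrator")
        self._thread.start()
        self._ready.wait()  # block until the loop thread has created its input queue

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._inputs: asyncio.Queue = asyncio.Queue()  # caller -> orchestrator, used on loop thread only
        self._task = self._loop.create_task(self._orchestrator())
        self._ready.set()
        self._loop.run_forever()

    async def _orchestrator(self) -> None:
        while True:
            item = await self._inputs.get()
            if item is None:  # shutdown sentinel
                break
            request_id, payload = item
            self._outputs.put((request_id, payload.upper()))  # stand-in for real stage work
        self._loop.stop()

    def submit(self, request_id: str, payload: str) -> None:
        # asyncio.Queue is not thread-safe, so schedule the put on the loop's own thread.
        self._loop.call_soon_threadsafe(self._inputs.put_nowait, (request_id, payload))

    def get_output(self, timeout: float = 1.0) -> Tuple[str, str]:
        return self._outputs.get(timeout=timeout)

    def shutdown(self) -> None:
        self._loop.call_soon_threadsafe(self._inputs.put_nowait, None)
        self._thread.join()
        self._loop.close()
```

Usage: after proxy = ThinProxy() and proxy.submit("req-1", "hello"), proxy.get_output() returns ("req-1", "HELLO"); proxy.shutdown() stops the loop and joins the thread.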
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model | str | Model name or path. | required |
| init_timeout | int | Total time to wait for Orchestrator startup (seconds). | 600 |
| stage_init_timeout | int | Timeout for stage initialization (seconds). | 300 |
| **kwargs | Any | Additional arguments. | {} |
default_sampling_params_list instance-attribute
default_sampling_params_list: list[OmniSamplingParams] = []
orchestrator_thread instance-attribute
orchestrator_thread = Thread(
target=_bootstrap_orchestrator,
args=(stage_init_timeout, startup_future),
daemon=True,
name="orchestrator",
)
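The startup_future argument suggests a handshake in which the caller blocks until the background thread reports that startup finished. A minimal sketch of such a handshake with concurrent.futures.Future, assuming the bootstrap function resolves the future on success (or sets an exception on failure) and the caller waits up to init_timeout seconds. The names _bootstrap and start_orchestrator and the delay parameter are hypothetical; this is not the module's _bootstrap_orchestrator.

```python
import concurrent.futures
import threading
import time


def _bootstrap(stage_init_timeout: float, startup_future: concurrent.futures.Future, delay: float) -> None:
    """Hypothetical bootstrap: 'initialize' stages, then report the outcome."""
    try:
        if delay > stage_init_timeout:
            raise TimeoutError("stage initialization exceeded stage_init_timeout")
        time.sleep(delay)  # stand-in for launching stage clients
        startup_future.set_result("ready")
    except Exception as exc:  # surface startup failures to the waiting caller
        startup_future.set_exception(exc)


def start_orchestrator(init_timeout: float, stage_init_timeout: float, delay: float = 0.01) -> str:
    startup_future: concurrent.futures.Future = concurrent.futures.Future()
    thread = threading.Thread(
        target=_bootstrap,
        args=(stage_init_timeout, startup_future, delay),
        daemon=True,  # a daemon thread does not keep the interpreter alive at exit
        name="orchestrator",
    )
    thread.start()
    # Re-raises any exception set by the bootstrap thread, or raises a timeout
    # error if the future is not resolved within init_timeout seconds.
    return startup_future.result(timeout=init_timeout)
```

With this split, init_timeout bounds the caller's total wait, while stage_init_timeout is only checked inside the background thread.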
add_request
add_request(
request_id: str,
prompt: EngineCoreRequest | PromptType,
prompt_text: str | None = None,
sampling_params_list: Sequence[Any] | None = None,
final_stage_id: int = 0,
final_output_stage_ids: Sequence[int] | None = None,
arrival_time: float | None = None,
lora_request: Any = None,
tokenization_kwargs: dict[str, Any] | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
data_parallel_rank: int | None = None,
reasoning_ended: bool | None = None,
*,
resumable: bool = False,
) -> None
Process stage-0 input locally, then send the request to the Orchestrator.
Input processing and output-processor registration happen here, in the caller's thread, which avoids a queue hop and coroutine-switch round-trip. The Orchestrator receives a ready-to-submit OmniEngineCoreRequest.
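A hypothetical sketch of this split: the caller builds the complete request and registers for its outputs before a single hand-off to the Orchestrator queue. ReadyRequest, CallerSide, tokenize, and register_output are illustrative stand-ins for OmniEngineCoreRequest, the stage-0 input processor, and output-processor registration; none of these names come from vLLM-Omni.

```python
import queue
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class ReadyRequest:
    """Hypothetical stand-in for a ready-to-submit OmniEngineCoreRequest."""
    request_id: str
    prompt_token_ids: List[int]
    final_stage_id: int = 0


@dataclass
class CallerSide:
    to_orchestrator: queue.Queue = field(default_factory=queue.Queue)
    registered: Set[str] = field(default_factory=set)

    def tokenize(self, prompt: str) -> List[int]:
        # Toy tokenizer: uses each whitespace-separated word's length as its id.
        return [len(word) for word in prompt.split()]

    def register_output(self, request_id: str) -> None:
        # Register before submitting so no output can arrive for an unknown request.
        self.registered.add(request_id)

    def add_request(self, request_id: str, prompt: str, final_stage_id: int = 0) -> None:
        request = ReadyRequest(request_id, self.tokenize(prompt), final_stage_id)
        self.register_output(request_id)
        self.to_orchestrator.put(request)  # one hand-off; nothing comes back to the caller here
```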
add_request_async async
add_request_async(
request_id: str,
prompt: EngineCoreRequest | PromptType,
prompt_text: str | None = None,
sampling_params_list: Sequence[Any] | None = None,
final_stage_id: int = 0,
final_output_stage_ids: Sequence[int] | None = None,
arrival_time: float | None = None,
lora_request: Any = None,
tokenization_kwargs: dict[str, Any] | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
data_parallel_rank: int | None = None,
reasoning_ended: bool | None = None,
*,
resumable: bool = False,
) -> None
Async variant of add_request().
add_streaming_update
add_streaming_update(
request_id: str,
prompt: EngineCoreRequest | PromptType,
prompt_text: str | None = None,
sampling_params_list: Sequence[Any] | None = None,
final_stage_id: int = 0,
final_output_stage_ids: Sequence[int] | None = None,
arrival_time: float | None = None,
*,
resumable: bool = True,
) -> None
Send an incremental streaming update for an existing request.
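One way to picture streaming updates is a registry of open requests keyed by request_id, where each update appends to an existing request. This is a hypothetical model only: it assumes that resumable=True keeps a request open for further updates and resumable=False marks the last one, an interpretation consistent with the defaults above (False on add_request, True on add_streaming_update) but not confirmed by this page. StreamState and StreamingRegistry are illustrative names.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class StreamState:
    chunks: List[str] = field(default_factory=list)
    accepting_updates: bool = True


class StreamingRegistry:
    """Hypothetical model; ASSUMES resumable=False closes a request to further updates."""

    def __init__(self) -> None:
        self._streams: Dict[str, StreamState] = {}

    def add_request(self, request_id: str, prompt: str, *, resumable: bool = False) -> None:
        self._streams[request_id] = StreamState([prompt], accepting_updates=resumable)

    def add_streaming_update(self, request_id: str, prompt: str, *, resumable: bool = True) -> None:
        state = self._streams.get(request_id)
        if state is None:
            raise KeyError(f"unknown request {request_id!r}")
        if not state.accepting_updates:
            raise RuntimeError(f"request {request_id!r} no longer accepts updates")
        state.chunks.append(prompt)
        state.accepting_updates = resumable  # the final update is sent with resumable=False

    def full_prompt(self, request_id: str) -> str:
        return "".join(self._streams[request_id].chunks)
```

Usage: add_request("s1", "Hello", resumable=True), then updates ", wor" and "ld" (the latter with resumable=False), leaves full_prompt("s1") equal to "Hello, world"; any further update raises RuntimeError.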
add_streaming_update_async async
add_streaming_update_async(
request_id: str,
prompt: EngineCoreRequest | PromptType,
prompt_text: str | None = None,
sampling_params_list: Sequence[Any] | None = None,
final_stage_id: int = 0,
final_output_stage_ids: Sequence[int] | None = None,
arrival_time: float | None = None,
*,
resumable: bool = True,
) -> None
Async wrapper for add_streaming_update().
collective_rpc
collective_rpc(
method: str,
timeout: float | None = None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
stage_ids: list[int] | None = None,
) -> list[Any]
Send a control RPC to the Orchestrator and wait for aggregated results.
Replies come back on a dedicated RPC output queue, so control-plane messages do not race with the normal request-output polling loop.
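The benefit of a separate RPC queue can be shown with a toy model: request outputs and RPC replies travel on different queues, so a blocking RPC call never consumes an item meant for the output poller. FakeOrchestrator and this collective_rpc function are hypothetical stand-ins that mirror only the parameter names above; the sketch also assumes one RPC in flight at a time, since it carries no correlation ids.

```python
import queue
import threading
from typing import Any, Callable, Dict, List, Optional


class FakeOrchestrator:
    """Hypothetical orchestrator with separate request-output and RPC-output queues."""

    def __init__(self, stages: Dict[int, Callable[..., Any]]) -> None:
        self.stages = stages  # stage_id -> toy handler standing in for a stage client
        self.rpc_in: queue.Queue = queue.Queue()
        self.rpc_out: queue.Queue = queue.Queue()  # control-plane replies only
        self.request_out: queue.Queue = queue.Queue()  # normal request outputs only
        threading.Thread(target=self._serve_rpc, daemon=True).start()

    def _serve_rpc(self) -> None:
        while True:
            method, args, kwargs, stage_ids = self.rpc_in.get()
            targets = stage_ids if stage_ids is not None else sorted(self.stages)
            # Aggregate one result per targeted stage, in the order of `targets`.
            self.rpc_out.put([self.stages[sid](method, *args, **kwargs) for sid in targets])


def collective_rpc(
    orch: FakeOrchestrator,
    method: str,
    timeout: Optional[float] = None,
    args: tuple = (),
    kwargs: Optional[Dict[str, Any]] = None,
    stage_ids: Optional[List[int]] = None,
) -> List[Any]:
    orch.rpc_in.put((method, args, kwargs or {}, stage_ids))
    # Wait only on the RPC queue; request_out is left for the output poller.
    # Raises queue.Empty if no reply arrives within `timeout` seconds.
    return orch.rpc_out.get(timeout=timeout)
```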
collective_rpc_async async
collective_rpc_async(
method: str,
timeout: float | None = None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
stage_ids: list[int] | None = None,
) -> list[Any]
Async wrapper around collective_rpc().
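A common way to expose a blocking call as a coroutine is to run it on a worker thread so the event loop stays responsive. This sketch uses asyncio.to_thread (Python 3.9+); whether vLLM-Omni's async wrappers do the same or use janus's async side directly is not stated on this page, and blocking_rpc is a hypothetical stand-in for collective_rpc().

```python
import asyncio
import time
from typing import Any, List, Optional


def blocking_rpc(method: str, timeout: Optional[float] = None) -> List[Any]:
    """Hypothetical blocking call standing in for collective_rpc()."""
    time.sleep(0.05)  # simulate waiting on a control-plane reply
    return [f"stage-0:{method}", f"stage-1:{method}"]


async def blocking_rpc_async(method: str, timeout: Optional[float] = None) -> List[Any]:
    # Run the blocking call on a worker thread so the event loop keeps serving other tasks.
    return await asyncio.to_thread(blocking_rpc, method, timeout)
```

Usage: asyncio.run(blocking_rpc_async("ping")) returns ["stage-0:ping", "stage-1:ping"].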
get_stage_metadata
get_stage_metadata(stage_id: int) -> StageRuntimeInfo
Get cached metadata for a stage.
try_get_output
try_get_output(
timeout: float = 0.001,
) -> EngineQueueMessage | None
Read one output message from the Orchestrator output queue.
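The short default timeout and the "| None" return type suggest a non-blocking poll. A minimal sketch under that assumption, using queue.Queue.get with a timeout and mapping queue.Empty to None; drain and max_idle_polls are hypothetical additions showing how a caller might loop over such a poll.

```python
import queue
from typing import Any, List, Optional


def try_get_output(output_queue: queue.Queue, timeout: float = 0.001) -> Optional[Any]:
    """Return one message, or None if nothing arrives within `timeout` seconds (assumed behavior)."""
    try:
        return output_queue.get(timeout=timeout)
    except queue.Empty:
        return None


def drain(output_queue: queue.Queue, max_idle_polls: int = 3) -> List[Any]:
    """Hypothetical poll loop: collect messages until several consecutive polls come back empty."""
    messages: List[Any] = []
    idle = 0
    while idle < max_idle_polls:
        msg = try_get_output(output_queue)
        if msg is None:
            idle += 1
            continue
        idle = 0
        messages.append(msg)
    return messages
```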
try_get_output_async async
try_get_output_async() -> EngineQueueMessage | None
Async read from the Orchestrator output queue.
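On the async side, a read can yield to the event loop instead of blocking a thread. The sketch below assumes an asyncio.Queue and a short internal timeout that maps to None; the real try_get_output_async takes no timeout argument, so the timeout parameter here, like the demo coroutine, is hypothetical.

```python
import asyncio
from typing import Any, List, Optional


async def try_get_output_async(output_queue: asyncio.Queue, timeout: float = 0.001) -> Optional[Any]:
    """Await one message; return None if none arrives within `timeout` (assumed behavior)."""
    try:
        return await asyncio.wait_for(output_queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> List[Optional[Any]]:
    q: asyncio.Queue = asyncio.Queue()
    first = await try_get_output_async(q)  # nothing queued yet, so this yields None
    await q.put({"request_id": "r1", "finished": True})
    second = await try_get_output_async(q, timeout=0.05)
    return [first, second]
```

Usage: asyncio.run(demo()) returns [None, {"request_id": "r1", "finished": True}].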