vllm_omni.entrypoints.async_omni ¶
AsyncOmni - Refactored async orchestrator using AsyncOmniEngine.
This is the new implementation that uses AsyncOmniEngine (which manages StageEngineCoreClient instances) instead of OmniStage with worker processes.
AsyncEventResolver ¶
A generic signal aggregator designed for synchronized handshakes in distributed or multi-stage environments. Supports waiting for a specified number (expected_count) of worker signals in both inline and multiprocess modes.
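The handshake pattern described above can be illustrated with a minimal asyncio sketch. `SignalAggregator`, `signal`, `wait`, and `demo` are hypothetical names chosen for illustration; they are not the AsyncEventResolver API, and the sketch covers only the inline (single-process) case.

```python
from __future__ import annotations

import asyncio


class SignalAggregator:
    """Hypothetical stand-in: resolves once `expected_count` signals arrive."""

    def __init__(self, expected_count: int) -> None:
        if expected_count < 1:
            raise ValueError("expected_count must be positive")
        self.expected_count = expected_count
        self.received = 0
        self._done = asyncio.Event()

    def signal(self) -> None:
        # Each worker calls this once when it is ready.
        self.received += 1
        if self.received >= self.expected_count:
            self._done.set()

    async def wait(self, timeout: float | None = None) -> None:
        # Raises asyncio.TimeoutError if the signals do not arrive in time.
        await asyncio.wait_for(self._done.wait(), timeout)


async def demo() -> int:
    aggregator = SignalAggregator(expected_count=3)

    async def worker(delay: float) -> None:
        await asyncio.sleep(delay)
        aggregator.signal()

    workers = [asyncio.create_task(worker(0.01 * i)) for i in range(3)]
    await aggregator.wait(timeout=1.0)  # returns after the third signal
    await asyncio.gather(*workers)
    return aggregator.received
```

A multiprocess variant would need a cross-process primitive in place of `asyncio.Event`; that part is not sketched here.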
AsyncOmni ¶
Bases: EngineClient, OmniBase
Asynchronous unified entry point for multi-stage pipelines using AsyncOmniEngine.
This is the refactored version that uses AsyncOmniEngine instead of OmniStage workers. It provides the same interface as the previous AsyncOmni implementation but with a cleaner architecture.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `str` | Model name or path to load. | `''` |
| `**kwargs` | `Any` | Additional keyword arguments, listed below. | `{}` |

Recognized keyword arguments:

- `stage_configs_path`: Optional path to a YAML file containing stage configurations. If None, configurations are resolved from the model pipeline factory.
- `log_stats`: Whether to enable statistics logging.
- `stage_init_timeout`: Timeout for per-stage initialization.
- `init_timeout`: Total timeout for orchestrator startup.
- `async_chunk`: Whether to enable async chunk mode.
- `output_modalities`: Requested output modalities.
- Any additional keyword arguments are passed to the stage engines.
Example
    >>> async_omni = AsyncOmni(model="Qwen/Qwen2.5-Omni-7B")
    >>> async for output in async_omni.generate(
    ...     prompt="Hello",
    ...     request_id="req-1",
    ...     sampling_params_list=[SamplingParams(), SamplingParams()]
    ... ):
    ...     print(output)
errored property ¶
errored: bool
Whether the engine is in a non-recoverable error state.
Delegates to OmniBase.errored which checks the orchestrator thread and all stage clients. Redeclared here to satisfy the EngineClient abstract-property requirement (Python's ABC mechanism does not resolve abstract methods from sibling MRO entries).
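The ABC behaviour behind this redeclaration can be reproduced with standard-library code alone. In the sketch below, `Client`, `Base`, `Broken`, and `Fixed` are hypothetical stand-ins for EngineClient, OmniBase, and AsyncOmni; only the `abc` mechanics are taken from Python itself.

```python
from abc import ABC, abstractmethod


class Client(ABC):
    """Stand-in for an abstract client that requires `errored`."""

    @property
    @abstractmethod
    def errored(self) -> bool: ...


class Base:
    """Stand-in for a sibling base with a concrete `errored`."""

    @property
    def errored(self) -> bool:
        return False


class Broken(Client, Base):
    # Client precedes Base in the MRO, so attribute lookup finds the
    # abstract property first and the class stays abstract.
    pass


class Fixed(Client, Base):
    # Redeclaring the property makes it concrete; it delegates to Base.
    @property
    def errored(self) -> bool:
        return Base.errored.fget(self)
```

Calling `Broken()` raises `TypeError` because `errored` is still considered abstract, while `Fixed().errored` evaluates the concrete implementation inherited from `Base`.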
model_config property ¶
Return the model config for the comprehension stage when present.
tts_max_instructions_length instance-attribute ¶
add_lora async ¶
add_lora(lora_request: LoRARequest) -> bool
Load a new LoRA adapter into all stages.
Returns True only if all concretely-implemented stages report success.
check_health async ¶
Check engine health by verifying the Orchestrator process is alive.
collective_rpc async ¶
collective_rpc(
method: str,
timeout: float | None = None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
stage_ids: list[int] | None = None,
) -> list[Any]
Execute a best-effort control RPC on selected stages.
Unsupported stages currently return a TODO-style result dict instead of failing the entire call. This keeps AsyncOmni usable while the orchestrator control plane is still being filled out.
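The best-effort behaviour can be sketched as follows. This is an illustration under stated assumptions, not the orchestrator's implementation: `EchoStage`, `BareStage`, `best_effort_rpc`, and the keys of the placeholder dict (`stage_id`, `supported`, `todo`) are all hypothetical, since the source does not specify the shape of the TODO-style result.

```python
from __future__ import annotations

import asyncio
from typing import Any


class EchoStage:
    """Hypothetical stage that implements a `ping` control method."""

    def __init__(self, stage_id: int) -> None:
        self.stage_id = stage_id

    async def ping(self, payload: str) -> str:
        return f"stage {self.stage_id}: {payload}"


class BareStage:
    """Hypothetical stage with no control methods at all."""

    def __init__(self, stage_id: int) -> None:
        self.stage_id = stage_id


async def best_effort_rpc(
    stages: list[Any],
    method: str,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    stage_ids: list[int] | None = None,
) -> list[Any]:
    # stage_ids=None selects every stage.
    selected = [s for s in stages if stage_ids is None or s.stage_id in stage_ids]
    results: list[Any] = []
    for stage in selected:
        handler = getattr(stage, method, None)
        if handler is None:
            # Assumed placeholder shape; the call as a whole still succeeds.
            results.append(
                {"stage_id": stage.stage_id, "supported": False, "todo": method}
            )
            continue
        results.append(await handler(*args, **(kwargs or {})))
    return results
```

Callers of such an API need to inspect each entry in the returned list rather than rely on an exception to learn that a stage skipped the call.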
do_log_stats async ¶
Log statistics.
TODO: Forward to Orchestrator process via message.
encode async ¶
encode(
prompt: Any,
pooling_params: PoolingParams,
request_id: str,
lora_request: LoRARequest | None = None,
trace_headers: dict[str, str] | None = None,
priority: int = 0,
tokenization_kwargs: dict[str, Any] | None = None,
reasoning_ended: bool | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]
EngineClient.encode() stub.
The Omni pipeline currently exposes only the generate() API at the orchestrator level.
finish_weight_update async ¶
Finish the current weight update.
Omni does not currently support weight transfer, so this is a no-op.
generate async ¶
generate(
prompt: OmniPromptType
| AsyncGenerator[StreamingInput, None]
| list[OmniPromptType],
sampling_params: Any = None,
request_id: str = "",
*,
prompt_text: str | None = None,
lora_request: Any = None,
tokenization_kwargs: dict[str, Any] | None = None,
sampling_params_list: Sequence[OmniSamplingParams]
| None = None,
output_modalities: list[str] | None = None,
trace_headers: Mapping[str, str] | None = None,
priority: int = 0,
data_parallel_rank: int | None = None,
reasoning_ended: bool | None = None,
reasoning_parser_kwargs: dict[str, Any] | None = None,
arrival_time: float | None = None,
) -> AsyncGenerator[OmniRequestOutput, None]
Generate outputs for the given prompt(s) asynchronously.
Coordinates multi-stage pipeline execution. Processes the prompt through all stages in the pipeline and yields outputs as they become available.
Batch mode (diffusion only): When prompt is a list, all prompts are dispatched in a single DiffusionEngine.step() call at the diffusion stage. The combined result is yielded as one OmniRequestOutput with all generated images. Only a single request_id is used for the whole batch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `prompt` | `OmniPromptType \| AsyncGenerator[StreamingInput, None] \| list[OmniPromptType]` | A single prompt or a list of prompts. A list triggers batch mode when the diffusion stage is reached. | required |
| `request_id` | `str` | Unique identifier for this request. If one is not provided, a random one will be generated. | `''` |
| `sampling_params_list` | `Sequence[OmniSamplingParams] \| None` | List of SamplingParams, one per stage. Must have the same length as the number of stages. If None, uses default sampling params for each stage. | `None` |
| `output_modalities` | `list[str] \| None` | Optional list of output modalities. | `None` |
Yields:
| Type | Description |
|---|---|
| `AsyncGenerator[OmniRequestOutput, None]` | OmniRequestOutput objects as they are produced by each stage. In batch mode the diffusion stage yields one output containing all generated images. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If sampling_params_list has incorrect length. |
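The calling pattern documented above (one sampling-params entry per stage, a generated request ID when none is given, and a `ValueError` on a length mismatch) can be exercised without a model using a stand-in generator. Everything in this sketch is hypothetical: `fake_generate`, `collect`, `NUM_STAGES`, the dict-shaped outputs, the UUID-based request ID, and the assumption of exactly one output per stage are illustration only and do not reflect the real OmniRequestOutput or the actual ID format.

```python
from __future__ import annotations

import asyncio
import uuid
from collections.abc import AsyncGenerator, Sequence
from typing import Any

NUM_STAGES = 2  # assumed pipeline depth for this sketch


async def fake_generate(
    prompt: str,
    request_id: str = "",
    *,
    sampling_params_list: Sequence[dict[str, Any]] | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
    if not request_id:
        request_id = uuid.uuid4().hex  # assumed format for a random ID
    if sampling_params_list is None:
        sampling_params_list = [{} for _ in range(NUM_STAGES)]
    if len(sampling_params_list) != NUM_STAGES:
        raise ValueError(
            f"expected {NUM_STAGES} sampling params, got {len(sampling_params_list)}"
        )
    for stage_id, _params in enumerate(sampling_params_list):
        await asyncio.sleep(0)  # yield control, as a real stage would
        yield {"request_id": request_id, "stage_id": stage_id, "prompt": prompt}


async def collect(prompt: str, **kwargs: Any) -> list[dict[str, Any]]:
    # Consume the async generator the same way a caller of generate() would.
    return [output async for output in fake_generate(prompt, **kwargs)]
```

Because the stand-in is an async generator, the length check runs on the first iteration rather than at call time, so the `ValueError` surfaces inside the `async for` loop.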
get_diffusion_od_config ¶
get_diffusion_od_config() -> Any | None
Return the diffusion-stage config when the pipeline has one.
get_input_preprocessor async ¶
Get input preprocessor.
get_supported_tasks async ¶
get_supported_tasks() -> tuple[SupportedTask, ...]
Return the task set exposed by the orchestrator-backed engine.
get_vllm_config async ¶
get_vllm_config() -> Any
Compatibility helper for call sites expecting async vllm config access.
is_sleeping async ¶
is_sleeping() -> bool
Return whether all stages are sleeping.
TODO(AsyncOmni): query the orchestrator once all stage backends expose a real sleeping-state RPC. For now we track the requested state locally.
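Tracking the requested state locally, as the TODO describes, might look like the sketch below. `SleepStateTracker` and its methods are hypothetical names; the real attribute layout is not shown in this reference, and the sketch records only what was requested, not what each stage backend actually did.

```python
from __future__ import annotations


class SleepStateTracker:
    """Hypothetical local bookkeeping of which stages were asked to sleep."""

    def __init__(self, num_stages: int) -> None:
        self._all_stages = set(range(num_stages))
        self._sleeping: set[int] = set()

    def mark_sleeping(self, stage_ids: list[int] | None = None) -> None:
        # stage_ids=None means every stage; unknown IDs are ignored.
        targets = self._all_stages if stage_ids is None else set(stage_ids)
        self._sleeping |= targets & self._all_stages

    def mark_awake(self, stage_ids: list[int] | None = None) -> None:
        targets = self._all_stages if stage_ids is None else set(stage_ids)
        self._sleeping -= targets

    def is_sleeping(self) -> bool:
        # True only when every stage has been asked to sleep.
        return self._sleeping == self._all_stages
```

The limitation of this approach is the one the TODO names: if a stage fails to sleep or wakes on its own, the local record and the real state diverge until a sleeping-state RPC is available.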
notify_kv_transfer_request_rejected async ¶
notify_kv_transfer_request_rejected(
request_id: str,
kv_transfer_params: dict[str, Any],
*,
data_parallel_rank: int | None = None,
) -> None
Notify engine that a KV-transfer request was rejected before admission.
Omni does not currently use KV-transfer pre-admission resources, so this is a no-op.
pause_generation async ¶
pause_generation(
*,
mode: PauseMode = "abort",
wait_for_inflight_requests: bool = False,
clear_cache: bool = True,
) -> None
Pause generation.
remove_lora async ¶
Remove a LoRA adapter from all stages.
TODO(AsyncOmni): add richer per-stage error reporting to the public API.
reset_encoder_cache async ¶
Reset the encoder cache for all stages.
TODO: Forward to Orchestrator process via message.
reset_mm_cache async ¶
Reset the multi-modal cache for all stages.
TODO: Forward to Orchestrator process via message.
reset_prefix_cache async ¶
Reset the prefix cache for all stages.
TODO: Forward to Orchestrator process via message.
sleep async ¶
sleep(
stage_ids: list[int] | None = None,
level: int = 2,
mode: PauseMode = "abort",
) -> list[OmniACK]
start_profile async ¶
Start profiling specified stages.
Uses the vLLM-compatible profile(is_start=True, profile_prefix) interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `profile_prefix` | `str \| None` | Optional prefix for the trace file names. | `None` |
| `stages` | `list[int] \| None` | List of stage IDs to profile. If None, profiles all stages. | `None` |
start_weight_update async ¶
start_weight_update(
is_checkpoint_format: bool = True,
) -> None
Start a new weight update.
Omni does not currently support weight transfer, so this is a no-op.