
vllm_omni.entrypoints.async_omni

AsyncOmni - Refactored async orchestrator using AsyncOmniEngine.

This is the new implementation that uses AsyncOmniEngine (which manages StageEngineCoreClient instances) instead of OmniStage with worker processes.

logger module-attribute

logger = init_logger(__name__)

AsyncEventResolver

A generic signal aggregator designed for synchronized handshakes in distributed or multi-stage environments. Supports waiting for a specified number (expected_count) of worker signals in both inline and multiprocess modes.

orchestrator instance-attribute

orchestrator = orchestrator

resolve async

resolve(ack: OmniACK)

watch_task

watch_task(task_id: str, expected_count: int = 1) -> Future
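The handshake described above can be pictured with a single-event-loop sketch. It assumes an acknowledgement object that carries only a task_id (Ack below is a hypothetical stand-in for OmniACK). It also assumes that watch_task() resolves its Future with the collected acks once expected_count of them have arrived. The real resolver's multiprocess mode and result type are not modeled here.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Ack:
    """Hypothetical stand-in for OmniACK; only task_id is assumed."""
    task_id: str


class EventResolverSketch:
    """Resolves a Future once `expected_count` acks for a task have arrived."""

    def __init__(self) -> None:
        # task_id -> (future to resolve, expected count, acks received so far)
        self._pending: dict[str, tuple[asyncio.Future, int, list[Ack]]] = {}

    def watch_task(self, task_id: str, expected_count: int = 1) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._pending[task_id] = (fut, expected_count, [])
        return fut

    async def resolve(self, ack: Ack) -> None:
        entry = self._pending.get(ack.task_id)
        if entry is None:
            return  # No watcher registered for this task: drop the ack.
        fut, expected, received = entry
        received.append(ack)
        if len(received) >= expected:
            del self._pending[ack.task_id]
            if not fut.done():
                fut.set_result(received)


async def demo() -> list[Ack]:
    resolver = EventResolverSketch()
    fut = resolver.watch_task("sleep-1", expected_count=2)
    await resolver.resolve(Ack("sleep-1"))
    assert not fut.done()  # Still waiting for the second worker.
    await resolver.resolve(Ack("sleep-1"))
    return await fut
```

Registering the watcher before any signal is sent avoids a race where an early ack arrives with nobody listening.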

AsyncOmni

Bases: EngineClient, OmniBase

Asynchronous unified entry point for multi-stage pipelines using AsyncOmniEngine.

This is the refactored version that uses AsyncOmniEngine instead of OmniStage workers. It keeps the public interface of the previous OmniStage-based AsyncOmni while providing a cleaner architecture.

Parameters:

- model (str, default ''): Model name or path to load.
- **kwargs (Any, default {}): Additional keyword arguments:
  - stage_configs_path: Optional path to a YAML file containing stage configurations. If None, configurations are resolved from the model pipeline factory.
  - log_stats: Whether to enable statistics logging.
  - stage_init_timeout: Timeout for per-stage initialization.
  - init_timeout: Total timeout for orchestrator startup.
  - async_chunk: Whether to enable async chunk mode.
  - output_modalities: Requested output modalities.
  - Additional keyword arguments are passed to the stage engines.

Example

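import asyncio
from vllm import SamplingParams
from vllm_omni.entrypoints.async_omni import AsyncOmni

async def main() -> None:
    async_omni = AsyncOmni(model="Qwen/Qwen2.5-Omni-7B")
    async for output in async_omni.generate(
        prompt="Hello",
        request_id="req-1",
        sampling_params_list=[SamplingParams(), SamplingParams()],  # one per stage
    ):
        print(output)

asyncio.run(main())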

config_path instance-attribute

config_path = config_path

dead_error property

dead_error: BaseException

EngineClient abstract property implementation.

errored property

errored: bool

Whether the engine is in a non-recoverable error state.

Delegates to OmniBase.errored, which checks the orchestrator thread and all stage clients. It is redeclared here to satisfy the EngineClient abstract-property requirement: because EngineClient precedes OmniBase in the MRO, attribute lookup finds EngineClient's abstract declaration first, so Python's ABC machinery would otherwise still treat errored as abstract.
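A minimal standard-library sketch of why the redeclaration is needed. It uses hypothetical stand-in classes (EngineClientLike, OmniBaseLike) rather than the real EngineClient and OmniBase, but keeps the same base-class order.

```python
from abc import ABC, abstractmethod


class EngineClientLike(ABC):
    """Stand-in for EngineClient: declares `errored` as an abstract property."""

    @property
    @abstractmethod
    def errored(self) -> bool: ...


class OmniBaseLike:
    """Stand-in for OmniBase: provides a concrete `errored`."""

    @property
    def errored(self) -> bool:
        return False


class WithoutRedeclare(EngineClientLike, OmniBaseLike):
    # MRO: WithoutRedeclare -> EngineClientLike -> ABC -> OmniBaseLike -> object.
    # ABCMeta looks `errored` up through the MRO, hits the abstract property
    # first, and keeps it in __abstractmethods__.
    pass


class WithRedeclare(EngineClientLike, OmniBaseLike):
    @property
    def errored(self) -> bool:
        # Explicitly delegate to the sibling base's concrete implementation.
        return OmniBaseLike.errored.fget(self)
```

Reversing the base order would also make the concrete property win the lookup, but redeclaring keeps the intent explicit regardless of base order.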

event_resolver instance-attribute

event_resolver = AsyncEventResolver(orchestrator=self)

final_output_task instance-attribute

final_output_task: Task | None = None

input_processor instance-attribute

input_processor = input_processor

io_processor instance-attribute

io_processor = None

is_running property

is_running: bool

Check if the engine is running.

is_stopped property

is_stopped: bool

EngineClient abstract property implementation.

model_config property

model_config

Return the model config for the comprehension stage when present.

renderer property

renderer

Return the renderer from the engine input processor when available.

tts_max_instructions_length instance-attribute

tts_max_instructions_length = get(
    "tts_max_instructions_length", None
)

vllm_config property

vllm_config

Return the vLLM config for the comprehension stage when present.

abort async

abort(request_id: str | Iterable[str]) -> None

Abort request(s) via the Orchestrator.
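abort() accepts either a single ID or an iterable of IDs. The hypothetical helper below shows the usual normalization. It is a sketch, not the module's actual code. The key detail is that a str is itself an Iterable[str], so it has to be checked first.

```python
from __future__ import annotations

from collections.abc import Iterable


def normalize_request_ids(request_id: str | Iterable[str]) -> list[str]:
    """Turn abort()'s `str | Iterable[str]` argument into a list of IDs."""
    # Check str first: iterating "req-1" directly would yield single characters.
    if isinstance(request_id, str):
        return [request_id]
    return list(request_id)
```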

add_lora async

add_lora(lora_request: LoRARequest) -> bool

Load a new LoRA adapter into all stages.

Returns True only if all concretely-implemented stages report success.
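One way to read the success rule above, as a sketch. The StageResult.UNSUPPORTED marker and the aggregate_lora_results() name are hypothetical. Treating "no stage implements LoRA" as failure is an assumption, not documented behavior.

```python
from __future__ import annotations

from enum import Enum


class StageResult(Enum):
    """Hypothetical marker for a stage with no LoRA implementation."""
    UNSUPPORTED = "unsupported"


def aggregate_lora_results(results: list[bool | StageResult]) -> bool:
    """Combine per-stage add_lora results into a single bool."""
    # Ignore stages that do not implement LoRA at all.
    implemented = [r for r in results if r is not StageResult.UNSUPPORTED]
    # Assumption: with no implementing stage, report failure instead of the
    # vacuous truth that all([]) would return.
    return bool(implemented) and all(implemented)
```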

check_health async

check_health() -> None

Check engine health by verifying the Orchestrator process is alive.

collective_rpc async

collective_rpc(
    method: str,
    timeout: float | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    stage_ids: list[int] | None = None,
) -> list[Any]

Execute a best-effort control RPC on selected stages.

Unsupported stages currently return a TODO-style result dict instead of failing the entire call. This keeps AsyncOmni usable while the orchestrator control plane is still being filled out.
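The best-effort behavior can be sketched with asyncio. The stage-handler callables, the NotImplementedError convention for unsupported stages, and the keys of the placeholder dict are assumptions made for illustration. The real TODO-style result dict may look different. The parameter order mirrors the collective_rpc signature above.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Hypothetical stage handler: (method, args, kwargs) -> awaitable result.
# A stage that lacks the method is assumed to raise NotImplementedError.
StageHandler = Callable[[str, tuple, dict], Awaitable[Any]]


async def best_effort_rpc(
    stages: dict[int, StageHandler],
    method: str,
    timeout: float | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    stage_ids: list[int] | None = None,
) -> list[Any]:
    """Run `method` on the selected stages without letting one unsupported
    stage fail the whole call."""
    call_kwargs = kwargs or {}
    selected = sorted(stages) if stage_ids is None else stage_ids

    async def call_one(stage_id: int) -> Any:
        try:
            return await asyncio.wait_for(
                stages[stage_id](method, args, call_kwargs), timeout
            )
        except NotImplementedError:
            # Placeholder result instead of an exception (keys are illustrative).
            return {"stage_id": stage_id, "method": method, "status": "TODO"}

    return list(await asyncio.gather(*(call_one(s) for s in selected)))


# Two demo stages: one supports every method, one supports none.
async def echo_stage(method: str, args: tuple, kwargs: dict) -> str:
    return f"{method}:ok"


async def legacy_stage(method: str, args: tuple, kwargs: dict) -> Any:
    raise NotImplementedError(method)
```

Other exceptions (including a timeout) still propagate in this sketch. Only "not supported" is downgraded to a placeholder.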

do_log_stats async

do_log_stats() -> None

Log statistics.

TODO: Forward to Orchestrator process via message.

encode async

encode(
    prompt: Any,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: LoRARequest | None = None,
    trace_headers: dict[str, str] | None = None,
    priority: int = 0,
    tokenization_kwargs: dict[str, Any] | None = None,
    reasoning_ended: bool | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]

EngineClient.encode() stub.

The Omni pipeline currently exposes only the generate() API at the orchestrator level.

finish_weight_update async

finish_weight_update() -> None

Finish the current weight update.

Omni does not currently support weight transfer, so this is a no-op.

generate async

generate(
    prompt: OmniPromptType
    | AsyncGenerator[StreamingInput, None]
    | list[OmniPromptType],
    sampling_params: Any = None,
    request_id: str = "",
    *,
    prompt_text: str | None = None,
    lora_request: Any = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    sampling_params_list: Sequence[OmniSamplingParams]
    | None = None,
    output_modalities: list[str] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    reasoning_ended: bool | None = None,
    reasoning_parser_kwargs: dict[str, Any] | None = None,
    arrival_time: float | None = None,
) -> AsyncGenerator[OmniRequestOutput, None]

Generate outputs for the given prompt(s) asynchronously.

Coordinates multi-stage pipeline execution. Processes the prompt through all stages in the pipeline and yields outputs as they become available.

Batch mode (diffusion only): When prompt is a list, all prompts are dispatched in a single DiffusionEngine.step() call at the diffusion stage. The combined result is yielded as one OmniRequestOutput with all generated images. Only a single request_id is used for the whole batch.

Parameters:

- prompt (OmniPromptType | AsyncGenerator[StreamingInput, None] | list[OmniPromptType], required): A single prompt or a list of prompts. A list triggers batch mode when the diffusion stage is reached.
- request_id (str, default ''): Unique identifier for this request. If one is not provided, a random one will be generated.
- sampling_params_list (Sequence[OmniSamplingParams] | None, default None): List of SamplingParams, one per stage. Must have the same length as the number of stages. If None, uses the default sampling params for each stage.
- output_modalities (list[str] | None, default None): Optional list of output modalities.


Yields:

- AsyncGenerator[OmniRequestOutput, None]: OmniRequestOutput objects as they are produced by each stage. In batch mode, the diffusion stage yields one output containing all generated images.


Raises:

- ValueError: If sampling_params_list has an incorrect length.
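The documented sampling_params_list contract can be sketched as a small validation helper: None selects per-stage defaults, and a wrong length raises ValueError. resolve_sampling_params() is a hypothetical name, and plain strings stand in for SamplingParams objects.

```python
from __future__ import annotations

from collections.abc import Sequence
from typing import Any


def resolve_sampling_params(
    sampling_params_list: Sequence[Any] | None,
    stage_defaults: Sequence[Any],
) -> list[Any]:
    """Return exactly one sampling-params object per pipeline stage."""
    if sampling_params_list is None:
        # Fall back to each stage's own defaults.
        return list(stage_defaults)
    if len(sampling_params_list) != len(stage_defaults):
        raise ValueError(
            f"Expected {len(stage_defaults)} sampling params (one per stage), "
            f"got {len(sampling_params_list)}"
        )
    return list(sampling_params_list)
```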

get_diffusion_od_config

get_diffusion_od_config() -> Any | None

Return the diffusion-stage config when the pipeline has one.

get_input_preprocessor async

get_input_preprocessor() -> InputPreprocessor

Get input preprocessor.

get_supported_tasks async

get_supported_tasks() -> tuple[SupportedTask, ...]

Return the task set exposed by the orchestrator-backed engine.

get_tokenizer async

get_tokenizer() -> TokenizerLike

Get tokenizer for the comprehension stage.

get_vllm_config async

get_vllm_config() -> Any

Compatibility helper for call sites that expect to access the vLLM config asynchronously.

is_paused async

is_paused() -> bool

Check if paused.

is_sleeping async

is_sleeping() -> bool

Return whether all stages are sleeping.

TODO(AsyncOmni): query the orchestrator once all stage backends expose a real sleeping-state RPC. For now we track the requested state locally.
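Since sleeping state is currently tracked locally, the bookkeeping might look like the sketch below. SleepStateTracker is hypothetical. Treating stage_ids=None as "all stages" for sleep() and wake_up() is an assumption based on their signatures.

```python
from __future__ import annotations


class SleepStateTracker:
    """Hypothetical local bookkeeping behind sleep()/wake_up()/is_sleeping()."""

    def __init__(self, num_stages: int) -> None:
        self._num_stages = num_stages
        self._sleeping: set[int] = set()

    def _select(self, stage_ids: list[int] | None) -> set[int]:
        # Assumption: None means every stage in the pipeline.
        return set(range(self._num_stages)) if stage_ids is None else set(stage_ids)

    def mark_sleep(self, stage_ids: list[int] | None = None) -> None:
        self._sleeping |= self._select(stage_ids)

    def mark_wake_up(self, stage_ids: list[int] | None = None) -> None:
        self._sleeping -= self._select(stage_ids)

    def is_sleeping(self) -> bool:
        # True only when all stages are sleeping, as is_sleeping() describes.
        return self._sleeping == set(range(self._num_stages))
```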

is_tracing_enabled async

is_tracing_enabled() -> bool

Check if tracing is enabled.

list_loras async

list_loras() -> list[int]

List all loaded LoRA adapter IDs across stages.

notify_kv_transfer_request_rejected async

notify_kv_transfer_request_rejected(
    request_id: str,
    kv_transfer_params: dict[str, Any],
    *,
    data_parallel_rank: int | None = None,
) -> None

Notify engine that a KV-transfer request was rejected before admission.

Omni does not currently use KV-transfer pre-admission resources, so this is a no-op.

pause_generation async

pause_generation(
    *,
    mode: PauseMode = "abort",
    wait_for_inflight_requests: bool = False,
    clear_cache: bool = True,
) -> None

Pause generation.

pin_lora async

pin_lora(adapter_id: int) -> bool

Pin a LoRA adapter across stages.

remove_lora async

remove_lora(adapter_id: int) -> bool

Remove a LoRA adapter from all stages.

TODO(AsyncOmni): add richer per-stage error reporting to the public API.

reset_encoder_cache async

reset_encoder_cache() -> None

Reset the encoder cache for all stages.

TODO: Forward to Orchestrator process via message.

reset_mm_cache async

reset_mm_cache() -> None

Reset the multi-modal cache for all stages.

TODO: Forward to Orchestrator process via message.

reset_prefix_cache async

reset_prefix_cache(
    reset_running_requests: bool = False,
    reset_connector: bool = False,
) -> bool

Reset the prefix cache for all stages.

TODO: Forward to Orchestrator process via message.

resume_generation async

resume_generation() -> None

Resume generation.

shutdown

shutdown(timeout: float | None = None) -> None

Shutdown the engine.

sleep async

sleep(
    stage_ids: list[int] | None = None,
    level: int = 2,
    mode: PauseMode = "abort",
) -> list[OmniACK]

start_profile async

start_profile(
    profile_prefix: str | None = None,
    stages: list[int] | None = None,
) -> list[Any]

Start profiling specified stages.

Uses vLLM-compatible profile(is_start=True, profile_prefix) interface.

Parameters:

- profile_prefix (str | None, default None): Optional prefix for the trace file names.
- stages (list[int] | None, default None): List of stage IDs to profile. If None, profiles all stages.


start_weight_update async

start_weight_update(
    is_checkpoint_format: bool = True,
) -> None

Start a new weight update.

Omni does not currently support weight transfer, so this is a no-op.

stop_profile async

stop_profile(stages: list[int] | None = None) -> list[Any]

Stop profiling specified stages.

Uses vLLM-compatible profile(is_start=False) interface.

Parameters:

- stages (list[int] | None, default None): List of stage IDs for which to stop profiling. If None, stops profiling on all stages.


wake_up async

wake_up(
    stage_ids: list[int] | None = None,
    tags: list[str] | None = None,
) -> list[OmniACK]