vllm_omni.entrypoints.omni_base

OutputMessageHandleResult module-attribute

OutputMessageHandleResult = (
    tuple[Literal[True], None, None, None]
    | tuple[Literal[False], str, int, ClientRequestState]
)

logger module-attribute

logger = init_logger(__name__)

OmniBase

Bases: PDDisaggregationMixin

Shared runtime foundation for AsyncOmni and Omni.

async_chunk instance-attribute

async_chunk = bool(getattr(engine, 'async_chunk', False))

default_sampling_params_list instance-attribute

default_sampling_params_list = default_sampling_params_list

engine instance-attribute

engine = AsyncOmniEngine(
    model=model,
    init_timeout=init_timeout,
    stage_init_timeout=stage_init_timeout,
    diffusion_batch_size=diffusion_batch_size,
    transfer_emitter=transfer_metrics,
    log_stats=log_stats,
    **kwargs,
)

errored property

errored: bool

Whether the engine is in a non-recoverable error state.

True when the orchestrator thread is dead or any stage client has been marked dead (e.g. diffusion worker OOM / process death).

Checks both _engine_dead (StageDiffusionClient) and resources.engine_dead (StageEngineCoreClient / AsyncMPClient) since the two client types store the flag differently.
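
The two-location check described above can be sketched as follows. This is an illustration, not the library's implementation: the helper names `client_is_dead` and `any_errored` are hypothetical, and the clients are modelled with `SimpleNamespace` objects that carry only the two attributes named in the description.

```python
from types import SimpleNamespace
from typing import Any


def client_is_dead(client: Any) -> bool:
    # Flag stored directly on the client (the StageDiffusionClient layout).
    if getattr(client, "_engine_dead", False):
        return True
    # Flag stored on a nested `resources` object
    # (the StageEngineCoreClient / AsyncMPClient layout).
    resources = getattr(client, "resources", None)
    return bool(getattr(resources, "engine_dead", False))


def any_errored(orchestrator_alive: bool, clients: list[Any]) -> bool:
    # Errored if the orchestrator thread is gone or any client is marked dead.
    return (not orchestrator_alive) or any(client_is_dead(c) for c in clients)


# Hypothetical clients, one per storage layout.
diffusion_client = SimpleNamespace(_engine_dead=True)
core_client = SimpleNamespace(resources=SimpleNamespace(engine_dead=False))
```

Using `getattr` with a default lets one function cover both client types without knowing which one it was given.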

is_running property

is_running: bool

log_stats instance-attribute

log_stats = log_stats

mod_metrics instance-attribute

mod_metrics = OmniModalityMetrics(
    model_name=model, log_stats=log_stats
)

model instance-attribute

model = model

num_stages property

num_stages: int

output_modalities instance-attribute

output_modalities = output_modalities or []

prom_metrics instance-attribute

prom_metrics = OmniPrometheusMetrics(
    model_name=model, log_stats=log_stats
)

request_states instance-attribute

request_states: dict[str, ClientRequestState] = {}

stage_configs property

stage_configs: list

Expose engine stage configs for PD disaggregation detection and validation.

transfer_metrics instance-attribute

transfer_metrics = OmniTransferMetrics(
    model_name=model, log_stats=log_stats
)

tts_batch_max_items instance-attribute

tts_batch_max_items: int = pop('tts_batch_max_items', 32)

check_health

check_health() -> None

close

close() -> None

from_cli_args classmethod

from_cli_args(
    args: TrackingNamespace, model: str | None = None
) -> OmniBase

Build from a TrackingNamespace parsed by TrackingArgumentParser. Only args that are explicitly passed to parse_args are forwarded.
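
The idea of forwarding only explicitly passed arguments can be illustrated with the standard library alone. The sketch below does not use TrackingArgumentParser, whose internals are not shown on this page. It relies on `argparse.SUPPRESS` to get a similar effect, and the flag names `--model` and `--init-timeout` are chosen for illustration.

```python
import argparse


def explicit_kwargs(argv: list[str]) -> dict:
    # argument_default=SUPPRESS leaves unset options out of the namespace,
    # so only flags that actually appear in argv end up in the result.
    parser = argparse.ArgumentParser(argument_default=argparse.SUPPRESS)
    parser.add_argument("--model")
    parser.add_argument("--init-timeout", type=int)
    return vars(parser.parse_args(argv))
```

A constructor that receives `**explicit_kwargs(argv)` then applies its own defaults to everything the user did not set.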

resolve_sampling_params_list

resolve_sampling_params_list(
    sampling_params_list: Sequence[Any] | Any | None,
    allow_delta_coercion: bool = False,
) -> Sequence[Any]

shutdown

shutdown(timeout: float | None = None) -> None

start_profile

start_profile(
    profile_prefix: str | None = None,
    stages: list[int] | None = None,
) -> list[Any]

Start profiling specified stages.

Uses vLLM-compatible profile(is_start=True, profile_prefix) interface.

Parameters:

Name            Type              Default  Description
profile_prefix  str | None        None     Optional prefix for the trace file names.
stages          list[int] | None  None     List of stage IDs to profile. If None, profiles all stages.

Returns:

Type       Description
list[Any]  List of results from each stage.

stop_profile

stop_profile(stages: list[int] | None = None) -> list[Any]

Stop profiling specified stages.

Uses vLLM-compatible profile(is_start=False) interface.

Parameters:

Name    Type              Default  Description
stages  list[int] | None  None     List of stage IDs to stop profiling. If None, stops all stages.

Returns:

Type       Description
list[Any]  List of results from each stage.

OmniEngineDeadError

Bases: EngineDeadError

error_stage_id instance-attribute

error_stage_id: int | None = error_stage_id

omni_snapshot_download

omni_snapshot_download(model_id: str) -> str