Skip to content

vllm_omni.diffusion.diffusion_engine

logger module-attribute

logger = init_logger(__name__)

DiffusionEngine

The diffusion engine for vLLM-Omni diffusion models.

abort_queue instance-attribute

abort_queue: Queue[str] = Queue()

execute_fn instance-attribute

execute_fn = (
    execute_step if step_execution else execute_request
)

executor instance-attribute

executor = executor_class(od_config)

main_loop instance-attribute

main_loop: AbstractEventLoop | None = None

od_config instance-attribute

od_config = od_config

post_process_func instance-attribute

post_process_func = get_diffusion_post_process_func(
    od_config
)

pre_process_func instance-attribute

pre_process_func = get_diffusion_pre_process_func(od_config)

scheduler instance-attribute

scheduler: SchedulerInterface = scheduler or (
    StepScheduler()
    if step_execution
    else RequestScheduler()
)

step_execution instance-attribute

step_execution = bool(
    getattr(od_config, "step_execution", False)
)

stop_event instance-attribute

stop_event: Event | None = None

worker_thread instance-attribute

worker_thread: Thread | None = None

abort

abort(request_id: str | Iterable[str]) -> None

add_req_and_wait_for_response

add_req_and_wait_for_response(
    request: OmniDiffusionRequest,
) -> DiffusionOutput

add_request

add_request(request: OmniDiffusionRequest) -> str

async_add_req_and_wait_for_response async

async_add_req_and_wait_for_response(
    request: OmniDiffusionRequest,
) -> DiffusionOutput

async_collective_rpc async

async_collective_rpc(
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
    unique_reply_rank: int | None = None,
) -> Any

Async variant of `collective_rpc` for event-loop callers.

Mirrors `async_add_req_and_wait_for_response`: it enqueues a task keyed by a future and awaits the result without blocking the event loop.

close

close() -> None

collective_rpc

collective_rpc(
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
    unique_reply_rank: int | None = None,
) -> Any

Call a method on the worker processes and return their results.

The call is enqueued and executed by the engine's busy loop between scheduler steps, so it is naturally serialized against per-request execute_fn() invocations without any explicit mutual-exclusion lock.

Parameters:

Name Type Description Default
method str

Name of the method to execute on the workers

required
timeout float | None

Optional timeout in seconds

None
args tuple

Positional arguments for the method

()
kwargs dict | None

Keyword arguments for the method

None
unique_reply_rank int | None

If set, return only the reply from this rank

None

Returns:

Type Description
Any

Single result if unique_reply_rank is provided, otherwise list of results

get_result async

get_result(request_id: str) -> DiffusionOutput

make_engine staticmethod

make_engine(
    config: OmniDiffusionConfig,
    scheduler: SchedulerInterface | None = None,
) -> DiffusionEngine

Factory method to create a DiffusionEngine instance.

Parameters:

Name Type Description Default
config OmniDiffusionConfig

The configuration for the diffusion engine.

required

Returns:

Type Description
DiffusionEngine

An instance of DiffusionEngine.

profile

profile(
    is_start: bool = True, profile_prefix: str | None = None
) -> None

Start or stop profiling on all diffusion workers.

Parameters:

Name Type Description Default
is_start bool

True to start profiling, False to stop.

True
profile_prefix str | None

Optional prefix for trace filename.

None

step async

get_dummy_run_num_frames

get_dummy_run_num_frames(
    model_class_name: str, supports_audio_input: bool
) -> int

Get the `num_frames` value for the dummy warmup run. Returns 0 to skip warmup.

get_extra_body_params

get_extra_body_params(
    model_class_name: str,
) -> frozenset[str]

Return the set of extra_body keys accepted by a pipeline.

Each pipeline can declare `EXTRA_BODY_PARAMS: ClassVar[frozenset[str]]` to advertise which request-level parameters should be forwarded from `extra_body` to `OmniDiffusionSamplingParams.extra_args`. Returns an empty `frozenset` when the pipeline does not declare any.

get_extra_output_params

get_extra_output_params(
    model_class_name: str,
) -> frozenset[str]

Return the set of custom_output keys to expose in API response metrics.

Each pipeline can declare `EXTRA_OUTPUT_PARAMS: ClassVar[frozenset[str]]` to advertise which `DiffusionOutput.custom_output` keys should be copied into the response metrics dict. Returns an empty `frozenset` when the pipeline does not declare any.

image_color_format

image_color_format(model_class_name: str) -> str

supports_audio_output

supports_audio_output(model_class_name: str) -> bool

supports_multimodal_input

supports_multimodal_input(
    od_config: OmniDiffusionConfig,
) -> tuple[bool, bool]