
vllm_omni.diffusion.worker.diffusion_worker

Diffusion Worker for vLLM-Omni.

Handles GPU infrastructure initialization and delegates model operations to DiffusionModelRunner.

logger module-attribute

logger = init_logger(__name__)

CustomPipelineWorkerExtension

re_init_pipeline

re_init_pipeline(
    custom_pipeline_args: dict[str, Any],
) -> None

Re-initialize the pipeline with custom arguments.

Parameters:

custom_pipeline_args (dict[str, Any], required): Dictionary of arguments for custom pipeline initialization.

DiffusionWorker

A worker that manages GPU infrastructure and delegates to the model runner.

This class handles infrastructure initialization only:
- Device setup (CUDA device selection)
- Distributed environment (NCCL, model parallel)
- Memory management (sleep/wake)

All model-related operations (loading, compilation, execution) are delegated to DiffusionModelRunner.
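The split between infrastructure and model logic can be pictured with a toy pair of classes. This is a minimal sketch, not the vLLM-Omni code: ToyWorker, ToyModelRunner and ToyRequest are hypothetical stand-ins for DiffusionWorker, DiffusionModelRunner and OmniDiffusionRequest, and a device string replaces real CUDA/NCCL setup.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyRequest:
    prompt: str  # hypothetical stand-in for OmniDiffusionRequest


class ToyModelRunner:
    """Owns all model logic: loading and execution."""

    def __init__(self) -> None:
        self.loaded = False

    def load_model(self) -> None:
        self.loaded = True

    def execute_model(self, req: ToyRequest) -> dict[str, Any]:
        if not self.loaded:
            raise RuntimeError("load_model() must be called first")
        return {"prompt": req.prompt, "image": f"<image for {req.prompt!r}>"}


class ToyWorker:
    """Owns infrastructure state only and forwards model calls to the runner."""

    def __init__(self, rank: int, local_rank: int) -> None:
        self.rank = rank
        self.local_rank = local_rank
        self.device: str | None = None
        self.model_runner = ToyModelRunner()

    def init_device(self) -> None:
        # The real worker selects a CUDA device and sets up NCCL here.
        self.device = f"cuda:{self.local_rank}"

    def load_model(self) -> None:
        self.model_runner.load_model()  # delegated

    def execute_model(self, req: ToyRequest) -> dict[str, Any]:
        return self.model_runner.execute_model(req)  # delegated


worker = ToyWorker(rank=0, local_rank=0)
worker.init_device()
worker.load_model()
out = worker.execute_model(ToyRequest("a red fox"))
print(worker.device, out["image"])  # cuda:0 <image for 'a red fox'>
```

Keeping the runner behind a single attribute means the worker can be tested or swapped without touching model code.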

device instance-attribute

device: device | None = None

local_rank instance-attribute

local_rank = local_rank

lora_manager instance-attribute

lora_manager: DiffusionLoRAManager | None = None

model_runner instance-attribute

model_runner: DiffusionModelRunner | None = (
    model_runner_cls(
        vllm_config=vllm_config,
        od_config=od_config,
        device=device,
    )
)

od_config instance-attribute

od_config = od_config

profiler instance-attribute

profiler: WorkerProfiler | None = _create_profiler()

rank instance-attribute

rank = rank

stage_id instance-attribute

stage_id = getattr(od_config, 'stage_id', 0)

vllm_config instance-attribute

vllm_config: VllmConfig | None = None

add_lora

add_lora(lora_request: LoRARequest) -> bool

execute_model

execute_model(
    req: OmniDiffusionRequest,
    od_config: OmniDiffusionConfig,
) -> DiffusionOutput

Execute a forward pass by delegating to the model runner.

execute_stepwise

execute_stepwise(
    scheduler_output: DiffusionSchedulerOutput,
) -> BaseRunnerOutput

Execute one diffusion step by delegating to the model runner.

generate

generate(request: OmniDiffusionRequest) -> DiffusionOutput

Generate output for the given request.

handle_sleep_task

handle_sleep_task(task: OmniSleepTask) -> OmniACK

handle_wake_task

handle_wake_task(task: OmniWakeTask) -> OmniACK

init_device

init_device() -> None

Initialize the device and distributed environment.

init_lora_manager

init_lora_manager() -> None

Initialize the LoRA manager for this worker.

list_loras

list_loras() -> list[int]

load_model

load_model(
    load_format: str = "default",
    custom_pipeline_name: str | None = None,
    **kwargs,
) -> None

Load the diffusion model using DiffusionModelRunner.

load_weights

load_weights(weights) -> set[str]

Load weights by delegating to the model runner.

pin_lora

pin_lora(adapter_id: int) -> bool

profile

profile(
    is_start: bool = True, profile_prefix: str | None = None
) -> None

Start or stop profiling for this GPU worker.

Parameters:

is_start (bool, default True): True to start profiling, False to stop.

profile_prefix (str | None, default None): Optional prefix for the trace filename.

remove_lora

remove_lora(adapter_id: int) -> bool

shutdown

shutdown() -> None

Shut down the worker and clean up the distributed environment.

sleep

sleep(level: int = 1) -> bool

Put the worker to sleep, offloading model weights.

Parameters:

level (int, default 1): Sleep level. Level 1 offloads weights; level 2 also saves buffers.

wake_up

wake_up(tags: list[str] | None = None) -> bool

Wake up the worker from sleep mode.

Re-activates the memory allocator for the specified tags and restores model buffers from CPU back to GPU if they were saved during Level 2 sleep.

Parameters:

tags (list[str] | None, default None): List of memory pool tags to re-activate (e.g., ["weights"] to match Level 1 sleep). If None, all pools are re-activated.

WorkerProc

Wrapper that runs one Worker in a separate process.

context instance-attribute

context = Context(io_threads=2)

gpu_id instance-attribute

gpu_id = gpu_id

mq instance-attribute

mq = create_from_handle(broadcast_handle, gpu_id)

od_config instance-attribute

od_config = od_config

result_mq instance-attribute

result_mq = None

result_mq_handle instance-attribute

result_mq_handle = None

wake_event instance-attribute

wake_event = wake_event

worker instance-attribute

worker = _create_worker(
    gpu_id,
    od_config,
    worker_extension_cls,
    custom_pipeline_args,
)

execute_rpc

execute_rpc(
    rpc_request: dict,
) -> tuple[object | None, bool]

Execute an RPC request and indicate whether to reply.

recv_message

recv_message()

Receive messages from the broadcast queue.

return_result

return_result(output: Any)

Reply to the client. Only rank 0 sends replies.

worker_busy_loop

worker_busy_loop() -> None

Main busy loop for Multiprocessing Workers.
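The WorkerProc pieces fit together in the busy loop: receive a broadcast message, execute it, and reply only on rank 0. This toy version uses a queue.Queue in place of the real broadcast message queue and a plain list as the reply channel; the SHUTDOWN sentinel is invented for the example.

```python
import queue
from typing import Any

SHUTDOWN = {"method": "__shutdown__"}  # hypothetical sentinel message


def worker_busy_loop(rank: int, inbox: queue.Queue, replies: list, worker: Any) -> None:
    """Receive broadcast messages until shutdown; only rank 0 replies."""
    while True:
        msg = inbox.get()  # blocks, like recv_message() on the broadcast queue
        if msg == SHUTDOWN:
            break
        result = getattr(worker, msg["method"])(*msg.get("args", ()))
        if rank == 0:  # mirrors return_result(): other ranks stay silent
            replies.append(result)


class Doubler:
    def double(self, x: int) -> int:
        return 2 * x


def run(rank: int) -> list:
    inbox: queue.Queue = queue.Queue()
    # Every rank receives the same broadcast messages.
    for msg in ({"method": "double", "args": (3,)}, {"method": "double", "args": (5,)}, SHUTDOWN):
        inbox.put(msg)
    replies: list = []
    worker_busy_loop(rank, inbox, replies, Doubler())
    return replies


print(run(0))  # [6, 10]
print(run(1))  # []
```

All ranks still execute every message, so collective operations stay in lockstep; only the reply is restricted to rank 0.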

worker_main staticmethod

worker_main(
    rank: int,
    od_config: OmniDiffusionConfig,
    pipe_writer: Connection,
    broadcast_handle,
    wake_event: Event,
    worker_extension_cls: str | None = None,
    custom_pipeline_args: dict[str, Any] | None = None,
) -> None

Worker initialization and execution loops.

WorkerWrapperBase

Wrapper base class that creates a DiffusionWorker with optional worker_extension_cls support. This enables dynamic inheritance, so DiffusionWorker can be extended with custom functionality.
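Dynamic inheritance means the concrete worker class is assembled at runtime from DiffusionWorker and the extension class. A minimal sketch follows; BaseWorker and PipelineExtension are hypothetical stand-ins, and building a fresh subclass with the three-argument type() plus a name-clash check is one possible approach, not necessarily what WorkerWrapperBase does. Because worker_main types worker_extension_cls as str | None, the sketch also resolves dotted-path strings.

```python
import importlib
from typing import Optional, Union


class BaseWorker:
    """Hypothetical stand-in for DiffusionWorker."""

    def __init__(self, rank: int) -> None:
        self.rank = rank

    def describe(self) -> str:
        return f"worker rank={self.rank}"


class PipelineExtension:
    """Hypothetical extension mixin; it may call methods defined on the base worker."""

    def re_init_pipeline(self, custom_pipeline_args: dict) -> str:
        return f"{self.describe()} re-initialized with {sorted(custom_pipeline_args)}"


def resolve_class(spec: Union[str, type]) -> type:
    """Accept a class object or a dotted path such as 'pkg.module.ClassName'."""
    if isinstance(spec, type):
        return spec
    module_name, _, class_name = spec.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


def build_worker_class(base: type, extension: Optional[Union[str, type]]) -> type:
    if extension is None:
        return base
    ext = resolve_class(extension)
    # Refuse silent overrides: the extension must not redefine base attributes.
    clashes = sorted(n for n in vars(ext) if not n.startswith("__") and hasattr(base, n))
    if clashes:
        raise TypeError(f"extension redefines worker attributes: {clashes}")
    # New subclass with MRO (base, ext, object): base methods and __init__ come first.
    return type(f"{base.__name__}With{ext.__name__}", (base, ext), {})


WorkerCls = build_worker_class(BaseWorker, PipelineExtension)
worker = WorkerCls(rank=0)
print(worker.re_init_pipeline({"steps": 20, "guidance": 7.5}))
# worker rank=0 re-initialized with ['guidance', 'steps']
```

Creating a new subclass leaves the base class untouched for other callers, and the clash check keeps an extension from silently shadowing a worker method.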

base_worker_class instance-attribute

base_worker_class = base_worker_class

custom_pipeline_args instance-attribute

custom_pipeline_args = custom_pipeline_args

gpu_id instance-attribute

gpu_id = gpu_id

od_config instance-attribute

od_config = od_config

worker instance-attribute

worker = worker_class(
    local_rank=gpu_id,
    rank=gpu_id,
    od_config=od_config,
    skip_load_model=custom_pipeline_args is not None,
)

worker_extension_cls instance-attribute

worker_extension_cls = worker_extension_cls

execute_method

execute_method(method: str | bytes, *args, **kwargs) -> Any

Execute a method on the worker.

Parameters:

method (str | bytes, required): Method name (str) or serialized callable (bytes).

Returns:

Any: Result of the method execution (type depends on the method).

Raises:

Exception: If method execution fails.

execute_model

execute_model(
    reqs: list[OmniDiffusionRequest],
    od_config: OmniDiffusionConfig,
) -> DiffusionOutput

Execute a forward pass.

Parameters:

reqs (list[OmniDiffusionRequest], required): List of diffusion requests.

od_config (OmniDiffusionConfig, required): OmniDiffusionConfig configuration.

Returns:

DiffusionOutput: DiffusionOutput with generated results.

execute_stepwise

execute_stepwise(
    scheduler_output: DiffusionSchedulerOutput,
) -> BaseRunnerOutput

Execute one diffusion step.

generate

generate(
    requests: list[OmniDiffusionRequest],
) -> DiffusionOutput

Generate output for the given requests.

Parameters:

requests (list[OmniDiffusionRequest], required): List of diffusion requests.

Returns:

DiffusionOutput: DiffusionOutput with generated results.

handle_sleep_task

handle_sleep_task(task)

handle_wake_task

handle_wake_task(task)

load_weights

load_weights(
    weights: Iterable[tuple[str, Tensor]],
) -> set[str]

Load model weights.

Parameters:

weights (Iterable[tuple[str, Tensor]], required): Iterable of (name, tensor) tuples.

Returns:

set[str]: Set of loaded weight names.
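load_weights consumes a stream of (name, tensor) pairs and reports which names it actually loaded. In this sketch plain Python lists stand in for tensors and a dict stands in for the model's parameters; skipping unknown names and rejecting size mismatches are choices of the sketch, since this page does not say how the real method treats them.

```python
from typing import Iterable


def load_weights(params: dict[str, list[float]],
                 weights: Iterable[tuple[str, list[float]]]) -> set[str]:
    loaded: set[str] = set()
    for name, value in weights:  # works with a one-shot generator or iterator
        if name not in params:
            continue  # sketch choice: ignore weights the model does not define
        if len(value) != len(params[name]):
            raise ValueError(f"size mismatch for {name}: {len(value)} != {len(params[name])}")
        params[name][:] = value  # in-place copy, in the spirit of Tensor.copy_
        loaded.add(name)
    return loaded


params = {"unet.conv.weight": [0.0, 0.0], "vae.decoder.bias": [0.0]}
stream = iter([("unet.conv.weight", [0.5, -0.5]), ("text_encoder.extra", [1.0])])
loaded = load_weights(params, stream)
print(sorted(loaded))              # ['unet.conv.weight']
print(params["unet.conv.weight"])  # [0.5, -0.5]
print(set(params) - loaded)        # {'vae.decoder.bias'}
```

Comparing the returned set with the expected parameter names, as in the last line, is a cheap way to spot weights that never arrived.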

shutdown

shutdown() -> None

Shut down the worker and clean up resources.

sleep

sleep(level: int = 1) -> bool

Put the worker to sleep. While asleep, the worker must not process any requests; the caller must guarantee that no requests are processed between sleep and the subsequent wake_up call.

Parameters:

level (int, default 1): The sleep level. Level 1 offloads the model weights and discards the KV cache. Currently only level 1 is supported.

Returns:

bool: True on success.

wake_up

wake_up(tags: list[str] | None = None) -> bool

Wake up the worker from sleep mode. See sleep for more details.

Parameters:

tags (list[str] | None, default None): An optional list of tags selecting which memory allocations to reallocate. The only supported value is "weights". If None, all memory is reallocated. wake_up must be called with all tags (or None) before the worker is used again.

Returns:

bool: True on success.