vllm_omni.diffusion.worker.diffusion_worker ¶
Diffusion Worker for vLLM-Omni.
Handles GPU infrastructure initialization and delegates model operations to DiffusionModelRunner.
CustomPipelineWorkerExtension ¶
DiffusionWorker ¶
A worker that manages GPU infrastructure and delegates to the model runner.
This class handles infrastructure initialization only:
- Device setup (CUDA device selection)
- Distributed environment (NCCL, model parallel)
- Memory management (sleep/wake)
All model-related operations (loading, compilation, execution) are delegated to DiffusionModelRunner.
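The split of responsibilities described above can be illustrated with a minimal sketch. `ToyWorker` and `ToyModelRunner` are hypothetical stand-ins, not the real `DiffusionWorker` or `DiffusionModelRunner`; the only point is that the worker owns device setup while every model call is forwarded to the runner.

```python
from dataclasses import dataclass


@dataclass
class ToyModelRunner:
    """Hypothetical stand-in for DiffusionModelRunner."""

    device: str
    loaded: bool = False

    def load_model(self) -> None:
        # A real runner would load and possibly compile the pipeline here.
        self.loaded = True

    def execute_model(self, prompt: str) -> str:
        if not self.loaded:
            raise RuntimeError("model not loaded")
        return f"output for {prompt!r} on {self.device}"


class ToyWorker:
    """Hypothetical stand-in for DiffusionWorker."""

    def __init__(self, rank: int) -> None:
        # Infrastructure concern: choose the device for this rank.
        self.device = f"cuda:{rank}"
        self.model_runner = ToyModelRunner(device=self.device)

    def load_model(self) -> None:
        # Model concern: delegated to the runner.
        self.model_runner.load_model()

    def execute_model(self, prompt: str) -> str:
        # Model concern: delegated to the runner.
        return self.model_runner.execute_model(prompt)


worker = ToyWorker(rank=0)
worker.load_model()
result = worker.execute_model("a cat")
```

Keeping the worker free of model logic means the runner can in principle be swapped without touching device or distributed setup.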
model_runner instance-attribute ¶
model_runner: DiffusionModelRunner | None = (
    model_runner_cls(
        vllm_config=vllm_config,
        od_config=od_config,
        device=device,
    )
)
execute_model ¶
execute_model(
    req: OmniDiffusionRequest,
    od_config: OmniDiffusionConfig,
) -> DiffusionOutput
Execute a forward pass by delegating to the model runner.
execute_stepwise ¶
execute_stepwise(
    scheduler_output: DiffusionSchedulerOutput,
) -> BaseRunnerOutput
Execute one diffusion step by delegating to the model runner.
generate ¶
generate(request: OmniDiffusionRequest) -> DiffusionOutput
Generate output for the given request.
load_model ¶
load_model(
    load_format: str = "default",
    custom_pipeline_name: str | None = None,
    **kwargs,
) -> None
Load the diffusion model using DiffusionModelRunner.
profile ¶
sleep ¶
Put the worker to sleep, offloading model weights.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | int | Sleep level. Level 1 offloads weights; level 2 also saves buffers. | 1 |
wake_up ¶
Wake up the worker from sleep mode.
Re-activates the memory allocator for the specified tags and restores model buffers from CPU back to GPU if they were saved during Level 2 sleep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tags | list[str] \| None | List of memory pool tags to re-activate (e.g., ["weights"] to match Level 1 sleep). If None, all pools are re-activated. | None |
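The sleep levels and wake-up tags described above can be modeled with a toy state machine. This is a sketch under assumptions: `ToySleeper`, its `pools` dictionary, and the sample buffer contents are hypothetical, and no real GPU memory or allocator is involved. It only mirrors the documented behavior: level 1 offloads weights, level 2 additionally saves buffers, and `wake_up` re-activates the listed pools (or all pools when `tags` is None) and restores any saved buffers.

```python
class ToySleeper:
    """Hypothetical model of the documented sleep/wake behavior."""

    def __init__(self) -> None:
        # Where each memory pool currently lives: "gpu" or "offloaded".
        self.pools = {"weights": "gpu"}
        self.saved_buffers = None     # CPU copy, made only by level 2 sleep
        self.restored_buffers = None  # what the last wake_up copied back

    def sleep(self, level: int = 1) -> None:
        if level not in (1, 2):
            raise ValueError(f"unsupported sleep level: {level}")
        self.pools["weights"] = "offloaded"
        if level == 2:
            # Placeholder buffer contents, purely illustrative.
            self.saved_buffers = {"running_mean": [0.1, 0.2]}

    def wake_up(self, tags=None) -> None:
        # None means "re-activate every pool".
        for tag in (list(self.pools) if tags is None else tags):
            self.pools[tag] = "gpu"
        if self.saved_buffers is not None:
            # Buffers saved by level 2 sleep are copied back.
            self.restored_buffers = self.saved_buffers
            self.saved_buffers = None


sleeper = ToySleeper()
sleeper.sleep(level=2)
state_while_asleep = dict(sleeper.pools)
sleeper.wake_up(tags=["weights"])
```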
WorkerProc ¶
Wrapper that runs one Worker in a separate process.
worker instance-attribute ¶
execute_rpc ¶
Execute an RPC request and indicate whether to reply.
worker_main staticmethod ¶
worker_main(
    rank: int,
    od_config: OmniDiffusionConfig,
    pipe_writer: Connection,
    broadcast_handle,
    wake_event: Event,
    worker_extension_cls: str | None = None,
    custom_pipeline_args: dict[str, Any] | None = None,
) -> None
Worker initialization and execution loops.
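The general shape of an initialize-then-loop worker entry point can be sketched as follows. Everything here is an assumption made for illustration: the `{"status": "READY"}` message, the request dictionaries, the `None` shutdown sentinel, and the use of a thread with `queue.Queue` in place of a separate process and the real `broadcast_handle`. The `execute_rpc` helper below is likewise hypothetical and only mirrors the idea of returning a result together with a flag that says whether to reply.

```python
import queue
import threading
from multiprocessing import Pipe


class ToyWorker:
    """Hypothetical worker exposing a single method."""

    def ping(self) -> str:
        return "pong"


def execute_rpc(worker, request: dict):
    """Run one request and return (result, should_reply)."""
    method = getattr(worker, request["method"])
    result = method(*request.get("args", ()))
    return result, request.get("reply", True)


def worker_main(pipe_writer, requests: queue.Queue, replies: queue.Queue) -> None:
    # Initialization phase: build the worker, then signal readiness.
    worker = ToyWorker()
    pipe_writer.send({"status": "READY"})
    pipe_writer.close()
    # Execution loop: serve requests until the sentinel arrives.
    while True:
        request = requests.get()
        if request is None:  # hypothetical shutdown sentinel
            break
        result, should_reply = execute_rpc(worker, request)
        if should_reply:
            replies.put(result)


reader, writer = Pipe(duplex=False)
requests, replies = queue.Queue(), queue.Queue()
thread = threading.Thread(target=worker_main, args=(writer, requests, replies))
thread.start()

ready = reader.recv()                 # wait for the readiness message
requests.put({"method": "ping"})
reply = replies.get(timeout=5)
requests.put(None)                    # ask the loop to exit
thread.join(timeout=5)
```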
WorkerWrapperBase ¶
Wrapper base class that creates a DiffusionWorker with optional worker_extension_cls support. When an extension class is given, it is combined with DiffusionWorker through dynamic inheritance, so the worker gains the extension's custom functionality.
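One way dynamic inheritance of this kind can work is sketched below. This is not necessarily how `WorkerWrapperBase` does it: `BaseWorker`, `LoggingExtension`, `resolve_class`, and `build_worker_class` are hypothetical names, and building the combined class with `type()` is an assumption chosen for clarity. Because `worker_extension_cls` is passed as a string elsewhere in this module, the sketch also shows resolving a dotted path to a class.

```python
import importlib


class BaseWorker:
    """Hypothetical stand-in for DiffusionWorker."""

    def generate(self) -> str:
        return "generated"


class LoggingExtension:
    """Hypothetical extension that contributes one extra method."""

    def describe(self) -> str:
        return f"{type(self).__name__} with extension"


def resolve_class(qualname: str):
    """Import 'package.module.ClassName' and return the class object."""
    module_name, _, class_name = qualname.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


def build_worker_class(base, extension=None):
    """Return base unchanged, or a new subclass that mixes in the extension."""
    if extension is None:
        return base
    # The extension comes first so its methods take precedence in the MRO.
    return type(f"Extended{base.__name__}", (extension, base), {})


worker_cls = build_worker_class(BaseWorker, LoggingExtension)
worker = worker_cls()
description = worker.describe()
```

Placing the extension ahead of the base in the bases tuple lets it override base methods; reversing the order would make the base win instead.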
worker instance-attribute ¶
worker = worker_class(
    local_rank=gpu_id,
    rank=gpu_id,
    od_config=od_config,
    skip_load_model=custom_pipeline_args is not None,
)
execute_method ¶
Execute a method on the worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| method | str \| bytes | Method name (str) or serialized callable (bytes) | required |
Returns:
| Type | Description |
|---|---|
| Any | Result of the method execution (type depends on the method) |
Raises:
| Type | Description |
|---|---|
| Exception | If method execution fails |
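Dispatching on a method name or a serialized callable might look roughly like the sketch below. The assumptions are significant: the standard-library `pickle` stands in for whatever serializer the real code uses, the callable is assumed to take the worker as its first argument, and `ToyWorker` is hypothetical.

```python
import operator
import pickle
from typing import Any, Union


class ToyWorker:
    """Hypothetical worker with one method to call."""

    def add(self, a: int, b: int) -> int:
        return a + b


def execute_method(
    worker: Any, method: Union[str, bytes], *args: Any, **kwargs: Any
) -> Any:
    if isinstance(method, bytes):
        # Assumption: the bytes hold a pickled callable that takes the worker.
        func = pickle.loads(method)
        return func(worker, *args, **kwargs)
    # Otherwise treat it as the name of a method on the worker.
    return getattr(worker, method)(*args, **kwargs)


worker = ToyWorker()
by_name = execute_method(worker, "add", 2, 3)

# operator.methodcaller objects are picklable and take the target as their argument.
payload = pickle.dumps(operator.methodcaller("add", 2, 3))
by_bytes = execute_method(worker, payload)
```

Unpickling executes arbitrary code, so a scheme like this is only appropriate when the bytes come from a trusted process.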
execute_model ¶
execute_model(
    reqs: list[OmniDiffusionRequest],
    od_config: OmniDiffusionConfig,
) -> DiffusionOutput
Execute a forward pass.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| reqs | list[OmniDiffusionRequest] | List of diffusion requests | required |
| od_config | OmniDiffusionConfig | OmniDiffusionConfig configuration | required |
Returns:
| Type | Description |
|---|---|
| DiffusionOutput | DiffusionOutput with generated results |
execute_stepwise ¶
execute_stepwise(
    scheduler_output: DiffusionSchedulerOutput,
) -> BaseRunnerOutput
Execute one diffusion step.
generate ¶
generate(
    requests: list[OmniDiffusionRequest],
) -> DiffusionOutput
Generate output for the given requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| requests | list[OmniDiffusionRequest] | List of diffusion requests | required |
Returns:
| Type | Description |
|---|---|
| DiffusionOutput | DiffusionOutput with generated results |
load_weights ¶
sleep ¶
Put the worker to sleep. A sleeping worker must not process requests: the caller must guarantee that no requests are in flight from the time sleep is called until wake_up is called.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | int | The sleep level. Level 1 sleep offloads the model weights and discards the KV cache. Currently only level 1 is supported. | 1 |
Returns:
| Type | Description |
|---|---|
| bool | True on success |
wake_up ¶
Wake up the worker from sleep mode. See the sleep method for more details.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tags | list[str] \| None | An optional list of tags to reallocate the worker memory for specific memory allocations. Values must be in | None |
Returns:
| Type | Description |
|---|---|
| bool | True on success |