vllm_omni.diffusion.worker
Worker classes for diffusion models.
Modules:
| Name | Description |
|---|---|
| diffusion_model_runner | Diffusion Model Runner for vLLM-Omni. |
| diffusion_worker | Diffusion Worker for vLLM-Omni. |
| input_batch | Diffusion input-batch structures following the MRV2-style vLLM layout. |
| utils | Per-request mutable state for step-wise diffusion execution. |
DiffusionModelRunner
Bases: OmniConnectorModelRunnerMixin
Model runner that handles model loading and execution for diffusion models.
This class follows the autoregressive (AR) runner pattern: the Runner handles all model-related operations, including loading, compilation, offloading, caching, and execution, while the Worker handles only infrastructure (device, distributed environment).
kv_transfer_manager instance-attribute
kv_transfer_manager = OmniKVTransferManager.from_od_config(
    od_config
)
clear_prompt_embed_cache
Evict all cached text-encoder outputs (e.g. between training epochs).
execute_model
execute_model(req: OmniDiffusionRequest) -> DiffusionOutput
Execute a forward pass for the given request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| req | OmniDiffusionRequest | A diffusion request containing a list of prompts to process. | required |
Returns:
| Type | Description |
|---|---|
| DiffusionOutput | DiffusionOutput with generated results. |
Note
We use torch.no_grad() for HSDP because HSDP2's fully_shard requires access to tensor version counters in pre_forward hooks, which inference tensors do not track. For non-HSDP inference, we use torch.inference_mode() for better performance.
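The note above amounts to choosing one gradient-disabling context per run. Below is a minimal sketch of that selection, written against injected factories so that it runs without PyTorch installed. `select_grad_context` and the recording stand-ins are hypothetical names for illustration, not part of vLLM-Omni; in real code the two factories would presumably be `torch.no_grad` and `torch.inference_mode`.

```python
from contextlib import contextmanager


def select_grad_context(uses_hsdp, no_grad_factory, inference_mode_factory):
    """Return the context manager that should wrap the forward pass.

    Per the note: HSDP needs tensor version counters, so it gets the
    no-grad context; every other case gets the inference-mode context.
    """
    factory = no_grad_factory if uses_hsdp else inference_mode_factory
    return factory()


# Stand-ins that record which context was entered (demo only, hypothetical).
entered = []


@contextmanager
def fake_no_grad():
    entered.append("no_grad")
    yield


@contextmanager
def fake_inference_mode():
    entered.append("inference_mode")
    yield


for uses_hsdp in (True, False):
    with select_grad_context(uses_hsdp, fake_no_grad, fake_inference_mode):
        pass  # the model forward pass would run here
```

Passing the factories in keeps the decision testable on machines without a GPU stack; whether the actual runner structures the choice this way is not stated in the source.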
execute_stepwise
execute_stepwise(
    scheduler_output: DiffusionSchedulerOutput,
) -> BatchRunnerOutput
Execute one step for one scheduled request and return runner output.
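Step-wise execution implies that the caller drives the denoising loop one step at a time while state for each request persists between calls. The following is a rough sketch of that control flow only. `StepState`, `run_one_step`, and their fields are hypothetical stand-ins, not the actual `DiffusionSchedulerOutput` or runner output types.

```python
from dataclasses import dataclass


@dataclass
class StepState:
    """Hypothetical per-request state kept between step calls."""

    request_id: str
    total_steps: int
    step_index: int = 0

    @property
    def finished(self) -> bool:
        return self.step_index >= self.total_steps


def run_one_step(state: StepState) -> dict:
    """Advance one denoising step and report progress (toy placeholder)."""
    if state.finished:
        raise RuntimeError(f"request {state.request_id} already finished")
    state.step_index += 1  # a real runner would call the denoiser here
    return {
        "request_id": state.request_id,
        "step": state.step_index,
        "finished": state.finished,
    }


# The caller (a scheduler, in practice) invokes one step per iteration.
state = StepState(request_id="req-0", total_steps=3)
outputs = []
while not state.finished:
    outputs.append(run_one_step(state))
```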
get_prompt_embed_cache_stats
get_prompt_embed_cache_stats() -> dict | None
Return hit/miss statistics for the prompt-embedding cache, if enabled.
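To make the hit/miss bookkeeping concrete, here is a small sketch of a prompt-embedding cache that exposes statistics and supports full eviction. `PromptEmbedCache`, `cache_stats_or_none`, and the dictionary keys are assumptions for illustration; the source does not document the shape of the returned dict.

```python
class PromptEmbedCache:
    """Toy prompt-embedding cache (hypothetical; not the vLLM-Omni class)."""

    def __init__(self):
        self._store = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, prompt, encode):
        """Return the cached embedding, encoding the prompt on a miss."""
        if prompt in self._store:
            self.hits += 1
            return self._store[prompt]
        self.misses += 1
        self._store[prompt] = encode(prompt)
        return self._store[prompt]

    def stats(self):
        return {"hits": self.hits, "misses": self.misses, "size": len(self._store)}

    def clear(self):
        """Evict every cached entry; this sketch keeps the counters."""
        self._store.clear()


def cache_stats_or_none(cache):
    """Mirror the `dict | None` return: None when caching is disabled."""
    return None if cache is None else cache.stats()


def toy_encode(prompt):
    # Placeholder for a text encoder; returns a fake one-element embedding.
    return [float(len(prompt))]


cache = PromptEmbedCache()
cache.get_or_compute("a cat", toy_encode)  # miss
cache.get_or_compute("a cat", toy_encode)  # hit
stats_before = cache_stats_or_none(cache)
cache.clear()
stats_after = cache_stats_or_none(cache)
```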
load_model
load_model(
    memory_pool_context_fn: callable | None = None,
    load_format: str = "default",
    custom_pipeline_name: str | None = None,
) -> None
Load the diffusion model, apply compilation and offloading.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| memory_pool_context_fn | callable \| None | Optional function that returns a context manager for memory pool allocation (used for sleep mode). | None |
| load_format | str | Format for loading model weights. Supported formats: - "default" (default): Automatically detect and use the default format based on configuration - "custom_pipeline": Init model from a custom pipeline class specified by | 'default' |
| custom_pipeline_name | str \| None | Optional custom pipeline class name to use. | None |
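The two documented `load_format` values suggest a simple dispatch. The sketch below shows one way such a dispatch could validate its inputs. `resolve_pipeline_source` is a hypothetical helper, and both the requirement that `custom_pipeline_name` be set for `"custom_pipeline"` and the error behaviour are assumptions, since the source description is truncated at that point.

```python
def resolve_pipeline_source(load_format="default", custom_pipeline_name=None):
    """Map load_format to a (mode, pipeline_name) pair (illustrative only)."""
    if load_format == "default":
        # Let configuration decide which pipeline class to build.
        return ("auto", None)
    if load_format == "custom_pipeline":
        # Assumption: the custom class is identified by custom_pipeline_name.
        if not custom_pipeline_name:
            raise ValueError(
                "custom_pipeline_name is required when load_format='custom_pipeline'"
            )
        return ("custom", custom_pipeline_name)
    raise ValueError(f"unsupported load_format: {load_format!r}")


default_source = resolve_pipeline_source()
# "my_pkg.MyPipeline" is a made-up name used only for this demo.
custom_source = resolve_pipeline_source("custom_pipeline", "my_pkg.MyPipeline")
```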
load_weights
Load weights into the pipeline.
DiffusionWorker
A worker that manages GPU infrastructure and delegates to the model runner.
This class handles infrastructure initialization only:
- Device setup (CUDA device selection)
- Distributed environment (NCCL, model parallel)
- Memory management (sleep/wake)
All model-related operations (loading, compilation, execution) are delegated to DiffusionModelRunner.
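The division of labour described above can be pictured as a thin worker that owns a runner and forwards model calls to it. This is a toy sketch of that delegation, using the hypothetical classes `SketchRunner` and `SketchWorker`; it does not reproduce the real constructors or configuration objects.

```python
class SketchRunner:
    """Stands in for the model runner: owns everything model-related."""

    def __init__(self, device):
        self.device = device
        self.loaded = False

    def load_model(self):
        self.loaded = True  # real code would load, compile, and offload here

    def execute_model(self, req):
        if not self.loaded:
            raise RuntimeError("load_model() must be called first")
        return {"prompts": list(req), "device": self.device}


class SketchWorker:
    """Stands in for the worker: sets up infrastructure, then delegates."""

    def __init__(self, device="cpu"):
        self.device = device  # device setup is the worker's responsibility
        self.model_runner = SketchRunner(device)

    def load_model(self):
        self.model_runner.load_model()

    def execute_model(self, req):
        return self.model_runner.execute_model(req)


worker = SketchWorker()
worker.load_model()
result = worker.execute_model(["a cat"])
```

Keeping the worker free of model logic means the runner can be swapped (as `model_runner_cls` suggests) without touching device or distributed setup.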
model_runner instance-attribute
model_runner: DiffusionModelRunner | None = (
    model_runner_cls(
        vllm_config=self.vllm_config,
        od_config=self.od_config,
        device=self.device,
    )
)
execute_model
execute_model(
    req: OmniDiffusionRequest,
    od_config: OmniDiffusionConfig,
) -> DiffusionOutput
Execute a forward pass by delegating to the model runner.
execute_stepwise
execute_stepwise(
    scheduler_output: DiffusionSchedulerOutput,
) -> BaseRunnerOutput
Execute one diffusion step by delegating to the model runner.
generate
generate(request: OmniDiffusionRequest) -> DiffusionOutput
Generate output for the given request.
load_model
load_model(
    load_format: str = "default",
    custom_pipeline_name: str | None = None,
    **kwargs,
) -> None
Load the diffusion model using DiffusionModelRunner.
profile
sleep
Put the worker to sleep, offloading model weights.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| level | int | Sleep level. Level 1 offloads weights, level 2 also saves buffers. | 1 |
wake_up
Wake up the worker from sleep mode.
Re-activates the memory allocator for the specified tags and restores model buffers from CPU back to GPU if they were saved during Level 2 sleep.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tags | list[str] \| None | List of memory pool tags to re-activate (e.g., ["weights"] to match Level 1 sleep). If None, all pools are re-activated. | None |
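The interaction between sleep levels and wake-up tags can be modelled as a small state machine. The sketch below tracks only which pools are active and whether buffers are parked on the CPU. `ToySleepState` is hypothetical, the `"scratch"` tag is invented for the demo, and the assumption that sleeping deactivates every pool is not confirmed by the source.

```python
class ToySleepState:
    """Tracks pool activation and buffer location (illustrative only)."""

    def __init__(self, tags=("weights", "scratch")):
        self.active = {tag: True for tag in tags}
        self.buffers_on_cpu = False

    def sleep(self, level=1):
        if level not in (1, 2):
            raise ValueError(f"unsupported sleep level: {level}")
        # Assumption: sleeping releases every pool, whatever the level.
        for tag in self.active:
            self.active[tag] = False
        # Level 2 additionally saves model buffers to the CPU.
        self.buffers_on_cpu = level == 2

    def wake_up(self, tags=None):
        # None means "re-activate all pools", as in the table above.
        for tag in list(self.active) if tags is None else tags:
            if tag not in self.active:
                raise KeyError(tag)
            self.active[tag] = True
        # Buffers saved during a Level 2 sleep are restored on wake-up.
        if self.buffers_on_cpu:
            self.buffers_on_cpu = False


state = ToySleepState()
state.sleep(level=2)
asleep = (dict(state.active), state.buffers_on_cpu)
state.wake_up(tags=["weights"])
awake = (dict(state.active), state.buffers_on_cpu)
```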
WorkerProc
Wrapper that runs one Worker in a separate process.
worker instance-attribute
execute_rpc
Execute an RPC request and indicate whether to reply.
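An RPC handler that also reports whether a reply is expected can be sketched as a function returning a `(result, should_reply)` pair. Everything here is an assumption made for illustration: the request dictionary shape, the `reply` key, and the names `execute_rpc_sketch` and `EchoWorker` do not come from vLLM-Omni.

```python
class EchoWorker:
    """Minimal worker exposing two callable methods (hypothetical)."""

    def ping(self):
        return "pong"

    def add(self, a, b):
        return a + b


def execute_rpc_sketch(worker, request):
    """Call request["method"] on the worker; return (result, should_reply)."""
    method = getattr(worker, request["method"])
    result = method(*request.get("args", ()), **request.get("kwargs", {}))
    # Assumed convention: the caller states whether it wants a reply.
    return result, request.get("reply", True)


worker = EchoWorker()
with_reply = execute_rpc_sketch(worker, {"method": "add", "args": (2, 3)})
no_reply = execute_rpc_sketch(worker, {"method": "ping", "reply": False})
```

Returning the flag alongside the result lets the process loop decide whether to write to its pipe, which fits a layout where not every worker needs to answer every call.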
worker_main staticmethod
worker_main(
    rank: int,
    od_config: OmniDiffusionConfig,
    pipe_writer: Connection,
    broadcast_handle,
    wake_event: Event,
    worker_extension_cls: str | None = None,
    custom_pipeline_args: dict[str, Any] | None = None,
) -> None
Worker initialization and execution loops.