Diffusion Module Architecture Design
The vLLM-Omni diffusion module (vllm_omni/diffusion) is a high-performance inference engine for diffusion models, designed with a modular architecture that separates concerns across multiple components. It provides efficient execution for non-autoregressive generation tasks such as image and video generation.
This document describes the architecture design of the diffusion module, including the diffusion engine, scheduler, worker, diffusion pipeline, and acceleration components.
[Figure: Main Components of the Diffusion Module]
Table of Contents:
- Architecture Overview
- Diffusion Engine
- Scheduler
- Worker
- Diffusion Pipeline
- Acceleration Components
- Data Flow
Architecture Overview
The diffusion module follows a multi-process, distributed architecture with clear separation of concerns:
[Figure: Diffusion Architecture Overview]
1. Diffusion Engine
Location: vllm_omni/diffusion/diffusion_engine.py
Responsibilities
The DiffusionEngine is the orchestrator of the diffusion inference system. It manages the lifecycle of worker processes and coordinates the execution flow.
Key Components
1.1 Initialization
```python
class DiffusionEngine:
    def __init__(self, od_config: OmniDiffusionConfig):
        self.od_config = od_config
        self.post_process_func = get_diffusion_post_process_func(od_config)
        self.pre_process_func = get_diffusion_pre_process_func(od_config)
        self._processes: list[mp.Process] = []
        self._make_client()
```
Key Features:
- Pre/Post Processing: Registers model-specific pre-processing and post-processing functions via a registry pattern
- Worker Management: Launches and manages multiple worker processes (one per GPU)
- Process Isolation: Uses separate processes (multiprocessing) rather than threads for true parallelism
1.2 Worker Launch Process
The engine launches one worker process per GPU, using the spawn start method:
```python
def _launch_workers(self, broadcast_handle):
    # num_gpus, od_config, and writer are defined in the surrounding
    # engine code (elided here).
    # Creates one process per GPU
    for i in range(num_gpus):
        process = mp.Process(
            target=worker_proc.worker_main,
            args=(i, od_config, writer, broadcast_handle),
            name=f"DiffusionWorker-{i}",
        )
        process.start()
```
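Design Decisions:
- Spawn Method: Each worker starts in a fresh interpreter instead of inheriting the parent's memory, which avoids shared-state issues (for example, CUDA cannot be re-initialized in a forked child process)
- Pipe Communication: Uses mp.Pipe for the initialization handshake
- Device Selection: Each worker is assigned a specific GPU (cuda:{rank})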
1.3 Request Processing Flow
```python
def step(self, requests: list[OmniDiffusionRequest]):
    # 1. Pre-process requests
    requests = self.pre_process_func(requests)
    # 2. Send to scheduler and wait for response
    output = self.add_req_and_wait_for_response(requests)
    # 3. Post-process results
    result = self.post_process_func(output.output)
    return result
```
Flow:
- Pre-processing: Applies model-specific transformations
- Scheduling: Delegates to the scheduler for distribution
- Post-processing: Converts raw outputs to the final format (e.g., PIL images)
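To make the three-stage flow concrete, here is a toy engine whose `step` follows the same structure as DiffusionEngine.step. `ToyRequest`, `ToyOutput`, `ToyEngine`, and the stubbed `run` function (standing in for scheduling plus worker execution) are all hypothetical.

```python
from dataclasses import dataclass, replace


@dataclass
class ToyRequest:  # hypothetical stand-in for OmniDiffusionRequest
    prompt: str
    num_inference_steps: int = 50


@dataclass
class ToyOutput:  # hypothetical stand-in for the executor's output object
    output: list[str]


class ToyEngine:
    def __init__(self, pre_process_func, run, post_process_func):
        self.pre_process_func = pre_process_func
        self.run = run  # stands in for scheduling + worker execution
        self.post_process_func = post_process_func

    def step(self, requests: list[ToyRequest]) -> list[str]:
        requests = self.pre_process_func(requests)    # 1. model-specific pre-processing
        output = self.run(requests)                   # 2. schedule and execute (stubbed)
        return self.post_process_func(output.output)  # 3. raw output -> final format


def pre(reqs):
    return [replace(r, prompt=r.prompt.strip()) for r in reqs]


def run(reqs):
    return ToyOutput([f"latents({r.prompt}, {r.num_inference_steps})" for r in reqs])


def post(raw):
    return [s.upper() for s in raw]


engine = ToyEngine(pre, run, post)
print(engine.step([ToyRequest("  a cat  ", num_inference_steps=4)]))  # ['LATENTS(A CAT, 4)']
```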
2. Scheduler
Location: vllm_omni/diffusion/sched/
Architecture
The scheduler is a request-state scheduler. It owns request lifecycle management and scheduling decisions, while execution stays in DiffusionEngine and the executor.
Key Components
2.1 Scheduler Interface
```python
class SchedulerInterface(ABC):
    def add_request(self, request: OmniDiffusionRequest) -> str: ...
    def schedule(self) -> DiffusionSchedulerOutput: ...
    def update_from_output(
        self,
        sched_output: DiffusionSchedulerOutput,
        output: DiffusionOutput,
    ) -> set[str]: ...
```
Responsibilities:
- Lifecycle contract: Defines how the engine adds requests, triggers one scheduling cycle, and feeds executor results back.
- Stable boundary: DiffusionSchedulerOutput is the only scheduling result consumed by DiffusionEngine.
- Pluggability: Different scheduler policies can reuse the same engine integration path.
2.2 Request State Model
```python
class DiffusionRequestStatus(enum.IntEnum):
    WAITING = ...
    RUNNING = ...
    PREEMPTED = ...
    FINISHED_COMPLETED = ...
    FINISHED_ABORTED = ...
    FINISHED_ERROR = ...


@dataclass
class DiffusionRequestState:
    request_id: str
    req: OmniDiffusionRequest
    status: DiffusionRequestStatus = DiffusionRequestStatus.WAITING
```
Design Features:
- Explicit lifecycle: Requests move through waiting, running, optional preemption, and terminal states.
- Centralized error handling: Completion, abort, and error states are all normalized in the scheduler layer.
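One way to enforce the lifecycle above is an explicit transition table. This sketch copies the status names, but the integer values, the `is_finished` helper, and the allowed transitions (in particular, that a preempted request resumes as RUNNING) are assumptions for illustration.

```python
import enum


class ToyRequestStatus(enum.IntEnum):
    """Illustrative copy of the status names; the real enum values are elided."""
    WAITING = 0
    RUNNING = 1
    PREEMPTED = 2
    FINISHED_COMPLETED = 3
    FINISHED_ABORTED = 4
    FINISHED_ERROR = 5

    def is_finished(self) -> bool:
        # Terminal states are ordered last, so one comparison suffices.
        return self >= ToyRequestStatus.FINISHED_COMPLETED


S = ToyRequestStatus
# Hypothetical transition table; terminal states have no outgoing edges.
ALLOWED = {
    S.WAITING: {S.RUNNING, S.FINISHED_ABORTED},
    S.RUNNING: {S.PREEMPTED, S.FINISHED_COMPLETED, S.FINISHED_ABORTED, S.FINISHED_ERROR},
    S.PREEMPTED: {S.RUNNING, S.FINISHED_ABORTED},
}


def transition(current: ToyRequestStatus, new: ToyRequestStatus) -> ToyRequestStatus:
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition {current.name} -> {new.name}")
    return new


state = S.WAITING
for nxt in (S.RUNNING, S.PREEMPTED, S.RUNNING, S.FINISHED_COMPLETED):
    state = transition(state, nxt)
print(state.name, state.is_finished())  # FINISHED_COMPLETED True
```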
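2.3 Shared Bookkeeping in _BaseScheduler
```python
class _BaseScheduler(SchedulerInterface):
    def __init__(self) -> None:
        self._request_states = {}      # per-request state, keyed by request id
        self._waiting = deque()        # FIFO queue of waiting requests
        self._running = []             # currently admitted requests
        self._finished_req_ids = set()
        self.max_num_running_reqs = 1  # initial value; later derived from max_num_seqs
```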
Design Features:
- Common state storage: Shared request maps and waiting/running collections live in the base class.
- Shared cleanup logic: Duplicate-request checks, finish handling, and state removal are centralized instead of duplicated in each policy.
- Current constraint boundary: _BaseScheduler derives max_num_running_reqs from max_num_seqs, but request-mode diffusion is still clamped back to 1 by the engine. The step-wise path can keep this above 1 for compatible-request batching.
2.4 Current RequestScheduler Policy
```python
class RequestScheduler(_BaseScheduler):
    def schedule(self) -> DiffusionSchedulerOutput:
        # 1. keep existing RUNNING requests in the scheduling result
        # 2. pull WAITING requests while capacity remains
        # 3. move newly admitted requests into RUNNING
        ...  # implementation elided
```
Behavior:
- FIFO request scheduling: Waiting requests are promoted in queue order.
- Single-request admission: RequestScheduler still admits one active request at a time because request-mode execution completes a whole request per dispatch.
- Executor result feedback: update_from_output() converts executor output into FINISHED_COMPLETED or FINISHED_ERROR and returns finished request ids.
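A minimal FIFO policy with the same three-method shape as SchedulerInterface is sketched below. `ToyFIFOScheduler` and `ToySchedulerOutput` are hypothetical stand-ins that track only request ids and status strings; with `max_num_running_reqs=1` the sketch admits one request per cycle, matching request-mode behavior.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class ToySchedulerOutput:
    """Hypothetical stand-in for DiffusionSchedulerOutput."""
    scheduled_req_ids: list[str] = field(default_factory=list)


class ToyFIFOScheduler:
    """Hypothetical FIFO request scheduler; status strings mirror DiffusionRequestStatus."""

    def __init__(self, max_num_running_reqs: int = 1) -> None:
        self.max_num_running_reqs = max_num_running_reqs
        self.status: dict[str, str] = {}
        self._waiting: deque[str] = deque()
        self._running: list[str] = []

    def add_request(self, request_id: str) -> str:
        if request_id in self.status:  # duplicate-request check
            raise ValueError(f"duplicate request id: {request_id}")
        self.status[request_id] = "WAITING"
        self._waiting.append(request_id)
        return request_id

    def schedule(self) -> ToySchedulerOutput:
        # Promote WAITING requests in queue order while capacity remains.
        while self._waiting and len(self._running) < self.max_num_running_reqs:
            req_id = self._waiting.popleft()
            self.status[req_id] = "RUNNING"
            self._running.append(req_id)
        # Existing and newly admitted RUNNING requests form the result.
        return ToySchedulerOutput(list(self._running))

    def update_from_output(self, sched_output: ToySchedulerOutput, succeeded: bool) -> set[str]:
        # Request mode: one dispatch runs every scheduled request to completion.
        for req_id in sched_output.scheduled_req_ids:
            self.status[req_id] = "FINISHED_COMPLETED" if succeeded else "FINISHED_ERROR"
            self._running.remove(req_id)
        return set(sched_output.scheduled_req_ids)

    def has_unfinished(self) -> bool:
        return bool(self._waiting or self._running)


sched = ToyFIFOScheduler()  # max_num_running_reqs=1, as in request mode
for rid in ("a", "b", "c"):
    sched.add_request(rid)
order = []
while sched.has_unfinished():
    out = sched.schedule()
    order.extend(out.scheduled_req_ids)
    sched.update_from_output(out, succeeded=True)
print(order)  # ['a', 'b', 'c']
```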
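2.5 Engine-Driven Execution Loop
```python
request_id = scheduler.add_request(request)
while True:
    sched_output = scheduler.schedule()
    # req: the request selected in sched_output (extraction elided)
    output = executor.add_req(req)
    finished_req_ids = scheduler.update_from_output(sched_output, output)
    # Stop once this request has reached a terminal state
    if request_id in finished_req_ids:
        break
```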
Design Decisions:
- Separation of concerns: Scheduler manages state and policy; executor handles runtime execution.
- No scheduler-owned IPC: Scheduler no longer talks to workers directly.
- Split concurrency model: Request-mode diffusion remains single-active-request, while the step-wise path can keep multiple compatible requests running and advance them independently between denoise steps.
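The step-wise path can be illustrated with a toy loop that advances every running request by one denoising step per iteration and lets finished requests leave while the others keep running. `StepState` and `run_stepwise` are hypothetical; the real step-wise path must also check that requests are compatible before batching them, which this sketch ignores.

```python
from dataclasses import dataclass


@dataclass
class StepState:  # hypothetical per-request denoising state
    request_id: str
    total_steps: int
    step_index: int = 0

    @property
    def done(self) -> bool:
        return self.step_index >= self.total_steps


def run_stepwise(states: list[StepState], max_running: int) -> list[str]:
    """Advance up to `max_running` requests by one step per iteration.

    Returns request ids in the order they finished.
    """
    waiting = list(states)
    running: list[StepState] = []
    finished: list[str] = []
    while waiting or running:
        # Admit waiting requests (FIFO) while capacity remains.
        while waiting and len(running) < max_running:
            running.append(waiting.pop(0))
        # One shared denoise step: every running request advances by one.
        for st in running:
            st.step_index += 1
        # Finished requests leave; unfinished ones stay for the next step.
        for st in [s for s in running if s.done]:
            running.remove(st)
            finished.append(st.request_id)
    return finished


order = run_stepwise([StepState("a", 3), StepState("b", 1), StepState("c", 2)], max_running=2)
print(order)  # ['b', 'a', 'c']
```

Because "b" finishes after one step, its slot is refilled by "c" before "a" completes, which is the behavior request mode cannot express.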
3. Worker
Location: vllm_omni/diffusion/worker/gpu_worker.py
Architecture
Workers are independent processes that execute the actual model inference. Each worker runs on a dedicated GPU and participates in distributed inference.
Key Components
3.1 Worker Process Structure
```python
class WorkerProc:
    def __init__(self, od_config, gpu_id, broadcast_handle):
        # Initialize ZMQ context for IPC
        self.context = zmq.Context(io_threads=2)
        # Connect to broadcast queue (receive requests)
        self.mq = MessageQueue.create_from_handle(broadcast_handle, gpu_id)
        # Create result queue (only rank 0)
        if gpu_id == 0:
            self.result_mq = MessageQueue(n_reader=1, ...)
        # Initialize GPU worker
        self.worker = GPUWorker(local_rank=gpu_id, rank=gpu_id, od_config=od_config)
```
Initialization Steps:
- IPC Setup: Creates the ZMQ context and message queues
- Distributed Environment Setup: Initializes PyTorch distributed communication
  - For CUDA GPUs: Uses NCCL (fast GPU communication)
  - For NPU: Uses HCCL (Huawei Collective Communications Library)
  - For other devices: Uses the appropriate backend (GLOO, MCCL, etc.)
- Model Loading: Loads the diffusion pipeline on the assigned GPU
- Cache Setup: Enables the cache backend if configured
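The per-device backend choice amounts to a small lookup whose result would be passed to `torch.distributed.init_process_group(backend=...)`. The function name and the exact mapping below are an illustrative assumption based on the list above, not the worker's actual code.

```python
def pick_dist_backend(device_type: str) -> str:
    """Map a torch device type to a torch.distributed backend name (illustrative)."""
    backends = {
        "cuda": "nccl",  # NVIDIA GPUs; ROCm builds also report "cuda" and use RCCL via "nccl"
        "npu": "hccl",   # Huawei Ascend NPUs (requires torch_npu)
        "musa": "mccl",  # Moore Threads GPUs (requires torch_musa)
    }
    return backends.get(device_type, "gloo")  # CPU-capable fallback


for dev in ("cuda", "npu", "cpu"):
    print(dev, pick_dist_backend(dev))
```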
3.2 GPU Worker
```python
class GPUWorker:
    def init_device_and_model(self):
        # Set distributed environment variables
        os.environ["RANK"] = str(rank)
        os.environ["WORLD_SIZE"] = str(world_size)
        # Initialize PyTorch distributed
        init_distributed_environment(world_size, rank)
        parallel_config = self.od_config.parallel_config
        initialize_model_parallel(
            data_parallel_size=parallel_config.data_parallel_size,
            cfg_parallel_size=parallel_config.cfg_parallel_size,
            sequence_parallel_size=parallel_config.sequence_parallel_size,
            tensor_parallel_size=parallel_config.tensor_parallel_size,
            pipeline_parallel_size=parallel_config.pipeline_parallel_size,
        )
        # Load model
        model_loader = DiffusersPipelineLoader(load_config)
        self.pipeline = model_loader.load_model(od_config, load_device=f"cuda:{rank}")
        # Setup cache backend
        from vllm_omni.diffusion.cache.selector import get_cache_backend
        self.cache_backend = get_cache_backend(od_config.cache_backend, od_config.cache_config)
        if self.cache_backend is not None:
            self.cache_backend.enable(self.pipeline)
```
Key Features:
- Multi-dimensional Parallelism: Initializes data, CFG, sequence, tensor, and pipeline parallel groups via PyTorch distributed
- Model Loading: Uses DiffusersPipelineLoader for efficient weight loading
- Cache Integration: Enables cache backends (TeaCache, cache-dit, etc.) transparently
3.3 Worker Busy Loop
```python
def worker_busy_loop(self):
    while self._running:
        # 1. Receive unified message (generation request, RPC request, or shutdown)
        msg = self.recv_message()
        # 2. Route message based on type
        if isinstance(msg, dict) and msg.get("type") == "rpc":
            # Handle RPC request
            result, should_reply = self.execute_rpc(msg)
            if should_reply:
                self.return_result(result)
        elif isinstance(msg, dict) and msg.get("type") == "shutdown":
            # Handle shutdown message
            self._running = False
        else:
            # Handle generation request (OmniDiffusionRequest list)
            output = self.worker.execute_model(msg, self.od_config)
            self.return_result(output)
```
Execution Flow:
- Receive: Dequeues unified messages from the shared-memory queue
- Route: Handles different message types (generation, RPC, shutdown)
- Execute: Runs the forward pass through the pipeline for generation requests
- Respond: Sends results back (rank 0 for generation, the specified rank for RPC)
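The routing logic can be exercised without GPUs or shared memory by substituting `queue.Queue` for the message queues. `ToyWorker`, its `ping` RPC, and the `method`/`reply` message fields are hypothetical; only the `type` values mirror the loop above.

```python
import queue
from typing import Any


class ToyWorker:
    """Hypothetical worker mirroring the routing in worker_busy_loop."""

    def __init__(self, inbox: "queue.Queue[Any]", outbox: "queue.Queue[Any]") -> None:
        self.inbox, self.outbox = inbox, outbox
        self._running = True

    def ping(self) -> str:
        return "pong"

    def execute_rpc(self, msg: dict) -> tuple[Any, bool]:
        # Illustrative RPC: call a named method; reply unless the caller opted out.
        result = getattr(self, msg["method"])(*msg.get("args", ()))
        return result, msg.get("reply", True)

    def execute_model(self, reqs: list[str]) -> list[str]:
        return [f"image for {r}" for r in reqs]

    def busy_loop(self) -> None:
        while self._running:
            msg = self.inbox.get()
            if isinstance(msg, dict) and msg.get("type") == "rpc":
                result, should_reply = self.execute_rpc(msg)
                if should_reply:
                    self.outbox.put(result)
            elif isinstance(msg, dict) and msg.get("type") == "shutdown":
                self._running = False
            else:  # anything else is treated as a generation request
                self.outbox.put(self.execute_model(msg))


inbox, outbox = queue.Queue(), queue.Queue()
for m in ({"type": "rpc", "method": "ping"}, ["a cat"], {"type": "shutdown"}):
    inbox.put(m)
ToyWorker(inbox, outbox).busy_loop()
print(outbox.get(), outbox.get())  # pong ['image for a cat']
```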
3.4 Model Execution
```python
@torch.inference_mode()
def execute_model(self, reqs: list[OmniDiffusionRequest], od_config):
    req = reqs[0]  # TODO: support batching
    # Refresh cache backend if enabled
    if self.cache_backend is not None and self.cache_backend.is_enabled():
        self.cache_backend.refresh(self.pipeline, req.num_inference_steps)
    # Set forward context for parallelism
    with set_forward_context(
        vllm_config=self.vllm_config,
        omni_diffusion_config=self.od_config,
    ):
        output = self.pipeline.forward(req)
    return output
```
The model execution leverages multiple parallelism strategies that are transparently applied during the forward pass. The set_forward_context() context manager makes parallel group information available throughout the forward pass:
```python
# Inside transformer layers, parallel groups are accessed via:
from vllm_omni.diffusion.distributed.parallel_state import (
    get_sp_group, get_dp_group, get_cfg_group, get_pp_group
)
```
Optimizations:
- Cache Refresh: Resets cache state before each generation so that no cached activations carry over from the previous request
- Context Management: The forward context makes parallel groups available during execution
- Single Request: Currently processes one request at a time (batching is a TODO)
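A forward context is essentially scoped global state that deep layers can read without it being passed through every call. The sketch below shows one way to build such a context manager with `contextvars`; `ToyForwardContext`, its fields, `toy_forward_context`, and `get_toy_forward_context` are hypothetical and do not necessarily reflect how set_forward_context is implemented.

```python
from __future__ import annotations

import contextvars
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass(frozen=True)
class ToyForwardContext:  # hypothetical; fields are illustrative
    sp_world_size: int
    cfg_world_size: int


_FORWARD_CONTEXT: contextvars.ContextVar[Optional[ToyForwardContext]] = contextvars.ContextVar(
    "toy_forward_context", default=None
)


@contextmanager
def toy_forward_context(ctx: ToyForwardContext) -> Iterator[None]:
    token = _FORWARD_CONTEXT.set(ctx)
    try:
        yield
    finally:
        _FORWARD_CONTEXT.reset(token)  # restore the previous context even on error


def get_toy_forward_context() -> ToyForwardContext:
    ctx = _FORWARD_CONTEXT.get()
    if ctx is None:
        raise RuntimeError("called outside toy_forward_context()")
    return ctx


def attention_layer(num_tokens: int) -> int:
    # A deep layer reads parallel info without it being threaded through arguments.
    return num_tokens // get_toy_forward_context().sp_world_size


with toy_forward_context(ToyForwardContext(sp_world_size=4, cfg_world_size=2)):
    print(attention_layer(4096))  # 1024
```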
4. Diffusion Pipeline
Location: vllm_omni/diffusion/models/*/pipeline_*.py
The pipeline is the model-specific implementation that orchestrates the diffusion process. Different models (QwenImage, Wan2.2, Z-Image) have their own pipeline implementations.
Most pipeline implementations are adapted from diffusers. The multi-step diffusion loop, defined by the diffuse function in the pipeline class, is usually the most time-consuming part of the overall inference process. An example:
```python
def diffuse(self, ...):
    for i, t in enumerate(timesteps):
        # Broadcast the current timestep to the batch
        timestep = t.expand(latents.shape[0]).to(latents.dtype)
        # Forward pass for positive prompt
        transformer_kwargs = {
            "hidden_states": latents,
            "timestep": timestep / 1000,
            "encoder_hidden_states": prompt_embeds,
        }
        noise_pred = self.transformer(**transformer_kwargs)[0]
        # Forward pass for negative prompt (CFG)
        if do_true_cfg:
            neg_transformer_kwargs = {...}
            neg_transformer_kwargs["cache_branch"] = "negative"
            neg_noise_pred = self.transformer(**neg_transformer_kwargs)[0]
            # Combine predictions, then rescale to the conditional prediction's norm
            comb_pred = neg_noise_pred + true_cfg_scale * (noise_pred - neg_noise_pred)
            cond_norm = torch.norm(noise_pred, dim=-1, keepdim=True)
            noise_norm = torch.norm(comb_pred, dim=-1, keepdim=True)
            noise_pred = comb_pred * (cond_norm / noise_norm)
        # Denoising scheduler step (the diffusers noise scheduler, not the request scheduler)
        latents = self.scheduler.step(noise_pred, t, latents)[0]
    return latents
```
Key Features:
- CFG Support: Handles classifier-free guidance with separate forward passes
- Cache Branching: Uses the cache_branch parameter for cache-aware execution
- True CFG: Implements true CFG with norm preservation (the combined prediction is rescaled to the norm of the conditional prediction)
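The norm-preserving combination in diffuse() can be checked on plain vectors: the result keeps the direction of the CFG extrapolation but takes the magnitude of the conditional prediction. This assumes cond_norm and noise_norm are the norms of the conditional and combined predictions over the last dimension, as in the diffusers QwenImage pipeline. `true_cfg_combine` is a hypothetical helper, and Python lists stand in for that last tensor dimension.

```python
import math


def true_cfg_combine(pos: list[float], neg: list[float], scale: float) -> list[float]:
    """CFG extrapolation followed by rescaling to the conditional prediction's norm."""
    comb = [n + scale * (p - n) for p, n in zip(pos, neg)]  # comb_pred
    cond_norm = math.sqrt(sum(p * p for p in pos))          # ||noise_pred||
    noise_norm = math.sqrt(sum(c * c for c in comb))        # ||comb_pred||
    return [c * (cond_norm / noise_norm) for c in comb]


out = true_cfg_combine(pos=[3.0, 4.0], neg=[1.0, 0.0], scale=4.0)
print(round(math.hypot(*out), 6))  # 5.0
```

Without the rescaling, a large guidance scale would inflate the prediction's magnitude (here to about 18.4 instead of 5), which is what norm preservation guards against.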
To learn more about the diffusion pipeline and how to add a new one, see Adding a Diffusion Model.
5. Acceleration Components
5.1 Attention Backends
Location: vllm_omni/diffusion/attention/
Architecture
The attention system uses a role-aware backend selector pattern. Each Attention site declares a semantic role ("self", "cross", or a model-specific string). The selector consults the user's AttentionConfig to pick a backend per role and falls back to the platform default when nothing is configured.
Backend Selection
Location: vllm_omni/diffusion/attention/selector.py
```python
class Attention(nn.Module):
    def __init__(self, num_heads, head_size, causal, softmax_scale, *,
                 role="self", role_category=None, ...):
        # Resolve backend for this role from the active OmniDiffusionConfig
        config = get_current_diffusion_config_or_none()
        attention_config = config.attention_config if config is not None else None
        attn_backend_cls, spec = get_attn_backend_for_role(
            role=role,
            head_size=head_size,
            attention_config=attention_config,
            role_category=role_category,
        )
        self.attn_impl_cls = attn_backend_cls.get_impl_cls()
        self.attention = self.attn_impl_cls(..., backend_kwargs=spec.extra if spec else None)
```
Available Backends:
- FlashAttention (FLASH_ATTN): Memory-efficient kernel that dispatches to FA2/FA3 on CUDA, AITER on ROCm, and mindiesd on Ascend NPU.
- SDPA (TORCH_SDPA): PyTorch's scaled dot-product attention, the cross-platform fallback.
- SageAttention (SAGE_ATTN): Quantized attention implementation from the SageAttention library.
These backends provide the kernel implementations for attention computation. For attention-level sequence parallelism strategies (Ring Attention, Ulysses), see Parallel Attention.
Backend Selection Mechanism
Selection is driven by AttentionConfig on OmniDiffusionConfig, which carries a global default plus a per_role map of AttentionSpec entries. A role string is resolved in this order:
```python
def get_attn_backend_for_role(role, head_size, attention_config=None, role_category=None):
    # 1. attention_config.per_role[role]: exact match
    # 2. attention_config.per_role[role_category]: category fallback
    # 3. attention_config.default: global default
    # 4. Platform default: hardware-specific
    if attention_config is not None:
        spec, source = attention_config.resolve_with_source(
            role=role, role_category=role_category,
        )
        if spec is not None:
            return load_backend(spec.backend), spec
    return current_omni_platform.get_diffusion_attn_backend_cls(...), None
```
Selection Priority:
- Per-role override (--diffusion-attention-config.per_role.<role>.backend): finest control; matched on the layer's exact role string.
- Role category fallback (--diffusion-attention-config.per_role.<category>.backend): used when an exact match is missing and the layer declared role_category (e.g. a "mymodel.audio_to_video" site can fall back to "cross").
- Global default (--diffusion-attention-backend FLASH_ATTN, or --diffusion-attention-config.default.backend, or the DIFFUSION_ATTENTION_BACKEND env var).
- Platform default: current_omni_platform.get_diffusion_attn_backend_cls(...) picks the best-available kernel for the hardware.
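The four-level resolution order can be sketched with two small dataclasses. `ToyAttentionSpec`, `ToyAttentionConfig`, and `pick_backend` are hypothetical stand-ins for AttentionSpec, AttentionConfig, and get_attn_backend_for_role; the `"TORCH_SDPA"` platform default is a placeholder, since the real platform picks the best-available kernel.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyAttentionSpec:  # hypothetical stand-in for AttentionSpec
    backend: str
    extra: dict = field(default_factory=dict)  # would be forwarded as backend_kwargs


@dataclass
class ToyAttentionConfig:  # hypothetical stand-in for AttentionConfig
    default: ToyAttentionSpec | None = None
    per_role: dict[str, ToyAttentionSpec] = field(default_factory=dict)

    def resolve_with_source(self, role: str, role_category: str | None = None):
        if role in self.per_role:  # 1. exact role match
            return self.per_role[role], "per_role"
        if role_category is not None and role_category in self.per_role:  # 2. category
            return self.per_role[role_category], "role_category"
        if self.default is not None:  # 3. global default
            return self.default, "default"
        return None, None


def pick_backend(config: ToyAttentionConfig | None, role: str,
                 role_category: str | None = None,
                 platform_default: str = "TORCH_SDPA") -> tuple[str, str]:
    if config is not None:
        spec, source = config.resolve_with_source(role, role_category)
        if spec is not None:
            return spec.backend, source
    return platform_default, "platform"  # 4. platform default (placeholder value)


cfg = ToyAttentionConfig(default=ToyAttentionSpec("FLASH_ATTN"),
                         per_role={"cross": ToyAttentionSpec("SAGE_ATTN")})
print(pick_backend(cfg, "self"))                                           # ('FLASH_ATTN', 'default')
print(pick_backend(cfg, "mymodel.audio_to_video", role_category="cross"))  # ('SAGE_ATTN', 'role_category')
print(pick_backend(None, "self"))                                          # ('TORCH_SDPA', 'platform')
```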
AttentionSpec.extra is forwarded to the backend constructor as backend_kwargs, so backend-specific parameters (e.g. SparseBlock block_size) can be set without changing model code.
For the user-facing CLI surface, see Diffusion Attention Backends. For the role declaration contract on the model side, see Adding a Diffusion Model.
Backend Availability:
- SDPA: Always available (PyTorch built-in)
- FlashAttention: Requires flash-attn on CUDA, aiter on ROCm, or mindiesd on Ascend NPU
- SageAttention: Requires the sage-attention package (from THU-ML GitHub)
Attention Backend Registry
Location: vllm_omni/diffusion/attention/selector.py + vllm_omni/platforms/<device>/platform.py
Backend resolution is delegated to the active platform: current_omni_platform.get_diffusion_attn_backend_cls(selected_backend, head_size) returns a fully-qualified class path that _load_backend_cls imports lazily, with the result cached by (backend_name, head_size).
```python
from functools import cache


@cache
def _cached_get_backend_cls(backend_name: str | None, head_size: int) -> type[AttentionBackend]:
    backend_cls_path = current_omni_platform.get_diffusion_attn_backend_cls(
        selected_backend=backend_name,
        head_size=head_size,
    )
    return _load_backend_cls(backend_cls_path)
```
Each platform (cuda, rocm, xpu, musa, npu) maps backend names like "FLASH_ATTN", "TORCH_SDPA", "SAGE_ATTN" to the right backend class path for that hardware, including head-size compatibility checks. Passing selected_backend=None lets the platform pick its own default.
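The caching and lazy-import pattern can be reproduced with `functools.cache` and `importlib`. The class-path table, `get_backend_cls_path`, `load_backend_cls`, and `cached_get_backend_cls` are hypothetical; standard-library classes stand in for real backend classes so the sketch runs anywhere, and falling back to the default on an unsupported head size is an assumed behavior (a platform might raise instead).

```python
from __future__ import annotations

import importlib
from functools import cache

# Hypothetical platform table: backend name -> fully-qualified class path.
_CLASS_PATHS = {
    None: "collections.OrderedDict",  # "platform default"
    "TORCH_SDPA": "collections.OrderedDict",
    "FLASH_ATTN": "collections.Counter",
}
_FLASH_HEAD_SIZES = (64, 96, 128, 192, 256)


def get_backend_cls_path(selected_backend: str | None, head_size: int) -> str:
    # Assumed behavior: an incompatible head size falls back to the default.
    if selected_backend == "FLASH_ATTN" and head_size not in _FLASH_HEAD_SIZES:
        selected_backend = None
    return _CLASS_PATHS[selected_backend]


def load_backend_cls(path: str) -> type:
    # Import lazily: the module is loaded only when the backend is first needed.
    module_name, _, cls_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), cls_name)


@cache
def cached_get_backend_cls(backend_name: str | None, head_size: int) -> type:
    return load_backend_cls(get_backend_cls_path(backend_name, head_size))


print(cached_get_backend_cls("FLASH_ATTN", 128).__name__)  # Counter
print(cached_get_backend_cls("FLASH_ATTN", 80).__name__)   # OrderedDict
print(cached_get_backend_cls.cache_info().currsize)        # 2
```

Caching on (backend_name, head_size) means each Attention layer with the same head size reuses the resolved class instead of repeating the import and compatibility check.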
Attention Backend Integration
The Attention layer integrates backends through a unified interface. Here's how FlashAttentionBackend is integrated as an example:
```python
# attention/backends/flash_attn.py
class FlashAttentionBackend(AttentionBackend):
    @staticmethod
    def get_name() -> str:
        return "FLASH_ATTN"

    @staticmethod
    def get_impl_cls() -> type["FlashAttentionImpl"]:
        return FlashAttentionImpl

    @staticmethod
    def get_supported_head_sizes() -> list[int]:
        return [64, 96, 128, 192, 256]  # FlashAttention supports these head sizes


class FlashAttentionImpl(AttentionImpl):
    def __init__(self, num_heads, head_size, softmax_scale, causal, ...):
        self.num_heads = num_heads
        self.causal = causal
        self.softmax_scale = softmax_scale

    def forward(self, query, key, value, attn_metadata=None):
        # Call FlashAttention kernel
        out = flash_attn_func(
            query, key, value,
            causal=self.causal,
            softmax_scale=self.softmax_scale,
        )
        return out
```
5.2 Parallel Attention
Location: vllm_omni/diffusion/attention/parallel/
Architecture
Parallel attention strategies implement Sequence Parallelism (SP) at the attention layer level. These strategies distribute attention computation across multiple GPUs by splitting the sequence dimension, using different communication patterns. They work on top of AttentionBackend implementations (FlashAttention, SDPA, etc.), handling the parallelization/communication while the backends handle the actual attention computation.
Key Distinction: Unlike AttentionBackend (which provides kernel implementations), ParallelAttentionStrategy provides communication patterns for multi-GPU attention parallelism. These strategies implement the ParallelAttentionStrategy interface and use AttentionBackend implementations internally.
Both Ring Attention and Ulysses are forms of Sequence Parallelism (SP) that:
- Split the sequence dimension across GPUs
- Contribute to sequence_parallel_size (via ring_degree and ulysses_degree)
- Work at the attention layer level (not the model/pipeline level)
Ulysses Sequence Parallelism (USP)
Location: vllm_omni/diffusion/attention/parallel/ulysses.py
USP is a sequence-parallel attention strategy that distributes attention computation across multiple GPUs over both the sequence dimension and the attention heads. It uses all-to-all collective operations to redistribute Q/K/V tensors before attention computation and to gather the results afterward, which keeps communication efficient for very long sequences.
Ulysses splits attention computation in two dimensions:
- Sequence Dimension: Outside attention, each GPU holds a shard of the sequence for all heads
- Head Dimension: The all-to-all before attention gives each GPU the full sequence for a subset of the heads, so each GPU computes exact attention for its heads; a second all-to-all afterward restores the sequence sharding
Configuration: ulysses_degree contributes to sequence_parallel_size
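The layout change performed by the Ulysses all-to-all can be simulated on nested lists of labeled tokens. `ulysses_all_to_all` and `ulysses_all_to_all_inverse` are hypothetical helpers that model only the data movement between P ranks, not real collectives.

```python
def ulysses_all_to_all(shards):
    """Pre-attention all-to-all: sequence-sharded -> head-sharded.

    shards[r][s][h] is local token s, head h on rank r (every rank holds all H heads).
    Returns out[r][t][j]: rank r holds all tokens t for its contiguous block of H/P heads.
    """
    P = len(shards)
    H = len(shards[0][0])
    assert H % P == 0, "number of heads must be divisible by ulysses_degree"
    hp = H // P
    return [
        [row[r * hp:(r + 1) * hp] for src in shards for row in src]  # every rank's tokens
        for r in range(P)
    ]


def ulysses_all_to_all_inverse(out):
    """Post-attention all-to-all: head-sharded -> sequence-sharded again."""
    P = len(out)
    s_local = len(out[0]) // P
    return [
        [[h for r in range(P) for h in out[r][t]] for t in range(d * s_local, (d + 1) * s_local)]
        for d in range(P)
    ]


full = [[f"t{s}h{h}" for h in range(4)] for s in range(4)]  # 4 tokens x 4 heads
shards = [full[0:2], full[2:4]]                             # ulysses_degree P = 2
out = ulysses_all_to_all(shards)
print(out[1][3])                                  # ['t3h2', 't3h3']
print(ulysses_all_to_all_inverse(out) == shards)  # True
```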
Ring Sequence Parallelism
Location: vllm_omni/diffusion/attention/parallel/ring.py
Ring Attention is a parallel attention strategy that implements sequence parallelism using ring-based point-to-point (P2P) communication. Unlike attention backends, which provide the attention kernel implementation, Ring Attention is a communication pattern that works on top of attention backends (FlashAttention or SDPA).
Ring Attention splits the sequence dimension across GPUs arranged in a ring and is implemented via the ParallelAttentionStrategy interface instead of AttentionBackend. Key/Value blocks circulate around the ring over P2P communication, and each GPU merges the partial attention results for its local queries. Internally, ring_flash_attn_func or ring_pytorch_attn_func is used depending on the available backends.
Architecture:
```python
class RingParallelAttention:
    """Ring sequence-parallel strategy."""

    def run_attention(self, query, key, value, attn_metadata, ...):
        # Selects underlying attention kernel (FlashAttention or SDPA);
        # backend_pref is the configured backend preference (set elsewhere)
        if backend_pref == "sdpa":
            return ring_pytorch_attn_func(...)  # Uses SDPA kernel
        else:
            return ring_flash_attn_func(...)  # Uses FlashAttention kernel
```
Integration:
- Ring Attention is activated when ring_degree > 1 in the parallel config
- It is selected by build_parallel_attention_strategy() in the attention layer
- The Attention layer routes to _run_ring_attention() when Ring is enabled
- It works alongside attention backends: Ring handles communication, backends handle computation
Configuration: ring_degree contributes to sequence_parallel_size
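Ring attention stays exact because partial softmax results can be merged incrementally (an online softmax). The simulation below runs P logical ranks in one process and checks that the merged result matches ordinary attention. It is a pure-Python sketch of the general technique (non-causal, no real P2P sends), with hypothetical helper names, and is not the ring_flash_attn_func or ring_pytorch_attn_func implementation.

```python
import math


def _dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def full_attention(q, k, v, scale):
    """Reference single-device softmax attention (non-causal)."""
    out = []
    for qi in q:
        scores = [_dot(qi, kj) * scale for kj in k]
        m = max(scores)
        w = [math.exp(s - m) for s in scores]
        z = sum(w)
        out.append([sum(wj * vj[c] for wj, vj in zip(w, v)) / z for c in range(len(v[0]))])
    return out


def ring_attention(q_blocks, k_blocks, v_blocks, scale):
    """Simulated ring attention: rank r keeps its query block; at step i it holds
    the K/V block of rank (r - i) % P, as if blocks moved one hop per step."""
    P = len(q_blocks)
    dv = len(v_blocks[0][0])
    outputs = []
    for r in range(P):
        # Per query: (running max, running denominator, running weighted sum of V)
        state = [(-math.inf, 0.0, [0.0] * dv) for _ in q_blocks[r]]
        for step in range(P):
            src = (r - step) % P
            for i, qi in enumerate(q_blocks[r]):
                m, denom, acc = state[i]
                scores = [_dot(qi, kj) * scale for kj in k_blocks[src]]
                m_new = max(m, max(scores))
                alpha = math.exp(m - m_new)  # rescale earlier partial sums to the new max
                w = [math.exp(s - m_new) for s in scores]
                denom = denom * alpha + sum(w)
                acc = [a * alpha + sum(wj * vj[c] for wj, vj in zip(w, v_blocks[src]))
                       for c, a in enumerate(acc)]
                state[i] = (m_new, denom, acc)
        outputs.extend([a / denom for a in acc] for _, denom, acc in state)
    return outputs


def split(x, p):
    n = len(x) // p
    return [x[i * n:(i + 1) * n] for i in range(p)]


q = [[0.1, 0.2], [0.3, -0.1], [0.5, 0.4], [-0.2, 0.6]]
k = [[0.2, 0.1], [-0.3, 0.4], [0.6, 0.0], [0.1, -0.5]]
v = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [0.5, -0.5]]
scale = 1 / math.sqrt(len(q[0]))
ring = ring_attention(split(q, 2), split(k, 2), split(v, 2), scale)
ref = full_attention(q, k, v, scale)
print(all(math.isclose(a, b, abs_tol=1e-9) for ra, rb in zip(ring, ref) for a, b in zip(ra, rb)))  # True
```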
Relationship with AttentionBackend
Parallel attention strategies (Ring, Ulysses) work on top of AttentionBackend implementations:
- They use AttentionBackend for the actual attention computation (FlashAttention, SDPA, etc.)
- They handle the multi-GPU communication/parallelization layer
- They implement the ParallelAttentionStrategy interface (not AttentionBackend)
For general parallelism strategies (Data Parallelism, Tensor Parallelism, Pipeline Parallelism), see Parallel Strategies.
5.3 Cache Backends
Location: vllm_omni/diffusion/cache/
Architecture
Cache backends provide a unified interface for applying different caching strategies to accelerate diffusion inference. The system supports multiple backends (TeaCache, cache-dit) with a consistent API for enabling and refreshing cache state.
Cache Backend Interface
```python
class CacheBackend(ABC):
    def __init__(self, config: DiffusionCacheConfig):
        self.config = config
        self.enabled = False

    @abstractmethod
    def enable(self, pipeline: Any) -> None:
        """Enable cache on the pipeline."""
        raise NotImplementedError

    @abstractmethod
    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True) -> None:
        """Refresh cache state for new generation."""
        raise NotImplementedError

    def is_enabled(self) -> bool:
        """Check if cache is enabled."""
        return self.enabled
```
Design Pattern:
- Abstract Base Class: Defines the contract for all cache backends
- Pipeline-based: Works with pipeline instances (not just transformers)
- State Management: Provides a refresh mechanism for clean state between generations
Available Backends
1. TeaCache Backend
Location: vllm_omni/diffusion/cache/teacache/backend.py
```python
class TeaCacheBackend(CacheBackend):
    def enable(self, pipeline: Any):
        # Extract transformer from pipeline
        transformer = pipeline.transformer
        transformer_type = transformer.__class__.__name__
        # Create TeaCacheConfig from DiffusionCacheConfig
        teacache_config = TeaCacheConfig(
            transformer_type=transformer_type,
            rel_l1_thresh=self.config.rel_l1_thresh,
            coefficients=self.config.coefficients,
        )
        # Apply hooks to transformer
        apply_teacache_hook(transformer, teacache_config)
        self.enabled = True

    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True):
        transformer = pipeline.transformer
        if hasattr(transformer, "_hook_registry"):
            transformer._hook_registry.reset_hook(TeaCacheHook._HOOK_NAME)
```
TeaCache Features:
- Timestep-aware: Caches based on timestep embedding similarity
- Adaptive: Dynamically decides when to reuse cached computations
- CFG-aware: Handles positive/negative branches separately
- Custom Hook System: Uses a custom forward interception mechanism (via HookRegistry) that wraps the module's forward method, allowing transparent integration without modifying model code
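The adaptive reuse decision can be sketched as follows: accumulate the polynomial-rescaled relative L1 change of the timestep-modulated input across steps, recompute once the total reaches `rel_l1_thresh`, and otherwise add a cached residual to the new input. `ToyTeaCache` is hypothetical and simplified (one branch, lists instead of tensors, no hooks); it only mirrors the rel_l1_thresh and coefficients fields passed to TeaCacheConfig.

```python
from __future__ import annotations

from typing import Callable, Sequence


class ToyTeaCache:
    """Hypothetical TeaCache-style wrapper around one step's transformer forward."""

    def __init__(self, forward: Callable[[list[float]], list[float]],
                 rel_l1_thresh: float, coefficients: Sequence[float]) -> None:
        self.forward = forward
        self.rel_l1_thresh = rel_l1_thresh
        self.coefficients = list(coefficients)  # rescaling polynomial, highest degree first
        self.reset()

    def reset(self) -> None:
        # Roughly what refresh() achieves between generations: forget all cached state.
        self.prev_input: list[float] | None = None
        self.cached_residual: list[float] | None = None
        self.accumulated = 0.0
        self.num_computed = 0

    def _rescale(self, x: float) -> float:
        result = 0.0
        for c in self.coefficients:  # Horner's rule
            result = result * x + c
        return result

    def __call__(self, modulated_input: list[float]) -> list[float]:
        should_compute = self.prev_input is None  # always compute the first step
        if not should_compute:
            rel_l1 = (sum(abs(a - b) for a, b in zip(modulated_input, self.prev_input))
                      / sum(abs(b) for b in self.prev_input))
            self.accumulated += self._rescale(rel_l1)
            should_compute = self.accumulated >= self.rel_l1_thresh
        self.prev_input = list(modulated_input)
        if should_compute:
            self.accumulated = 0.0
            out = self.forward(modulated_input)
            self.cached_residual = [o - x for o, x in zip(out, modulated_input)]
            self.num_computed += 1
            return out
        # Reuse: add the cached residual instead of running the transformer.
        return [x + r for x, r in zip(modulated_input, self.cached_residual)]


cache = ToyTeaCache(lambda x: [2 * v for v in x], rel_l1_thresh=0.3,
                    coefficients=[1.0, 0.0])  # identity rescaling: p(x) = x
for step_input in ([1.0, 1.0], [1.1, 1.0], [1.2, 1.0], [2.0, 2.0]):
    cache(step_input)
print(cache.num_computed)  # 2
```

The two small input changes accumulate to about 0.1 and are skipped; the large jump at the last step pushes the total past the threshold and triggers a real forward pass.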
2. Cache-DiT Backend
Location: vllm_omni/diffusion/cache/cache_dit_backend.py
```python
class CacheDiTBackend(CacheBackend):
    def enable(self, pipeline: Any):
        # Uses cache-dit library for acceleration
        # Supports DBCache, SCM (Step Computation Masking), TaylorSeer
        # Works with single and dual-transformer architectures
        ...
        self.enabled = True

    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True):
        # Updates cache context with new num_inference_steps
        ...
```
Cache-DiT Features:
- DBCache: Dynamic block caching with configurable compute blocks
- SCM: Step Computation Masking for additional speedup
- TaylorSeer: Advanced calibration for cache accuracy
- Dual-transformer Support: Handles models like Wan2.2 with two transformers
Cache Backend Selector
Location: vllm_omni/diffusion/cache/selector.py
```python
def get_cache_backend(
    cache_backend: str | None,
    cache_config: dict | DiffusionCacheConfig,
) -> CacheBackend | None:
    """Get cache backend instance based on cache_backend string.

    Args:
        cache_backend: Cache backend name ("cache_dit", "tea_cache", or None)
        cache_config: Cache configuration (dict or DiffusionCacheConfig)

    Returns:
        Cache backend instance, or None if cache_backend is None or "none"

    Raises:
        ValueError: If cache_backend names an unsupported backend
    """
    if cache_backend is None or cache_backend == "none":
        return None
    if isinstance(cache_config, dict):
        cache_config = DiffusionCacheConfig.from_dict(cache_config)
    if cache_backend == "cache_dit":
        return CacheDiTBackend(cache_config)
    elif cache_backend == "tea_cache":
        return TeaCacheBackend(cache_config)
    else:
        raise ValueError(f"Unsupported cache backend: {cache_backend}")
```
Usage Flow:
- Selection: get_cache_backend() returns the appropriate backend instance
- Enable: backend.enable(pipeline) is called during worker initialization
- Refresh: backend.refresh(pipeline, num_inference_steps) is called before each generation
- Check: backend.is_enabled() verifies that the cache is active
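The usage flow can be traced with a toy subclass of a trimmed copy of the interface. `CacheBackendSketch`, `StepCountingBackend`, and `FakePipeline` are hypothetical; the sketch shows only the order of calls and the per-generation reset, not any real caching.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class CacheBackendSketch(ABC):
    """Trimmed copy of the CacheBackend contract (config simplified to a dict)."""

    def __init__(self, config: dict) -> None:
        self.config = config
        self.enabled = False

    @abstractmethod
    def enable(self, pipeline: Any) -> None: ...

    @abstractmethod
    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True) -> None: ...

    def is_enabled(self) -> bool:
        return self.enabled


class StepCountingBackend(CacheBackendSketch):
    """Hypothetical backend that only tracks per-generation state on the pipeline."""

    def enable(self, pipeline: Any) -> None:
        pipeline.cache_state = {"num_inference_steps": None, "cached_blocks": 0}
        self.enabled = True

    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True) -> None:
        # Start each generation from a clean slate sized for its step count.
        pipeline.cache_state = {"num_inference_steps": num_inference_steps, "cached_blocks": 0}
        if verbose:
            print(f"cache refreshed for {num_inference_steps} steps")


class FakePipeline:
    cache_state: dict | None = None


pipeline = FakePipeline()
backend = StepCountingBackend({})  # Selection (get_cache_backend() in the real flow)
backend.enable(pipeline)           # Enable once, at worker initialization
for steps in (20, 50):             # two consecutive generations
    if backend.is_enabled():       # Check
        backend.refresh(pipeline, steps, verbose=False)  # Refresh before each generation
print(pipeline.cache_state)  # {'num_inference_steps': 50, 'cached_blocks': 0}
```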
5.4 Parallel Strategies
Location: vllm_omni/diffusion/distributed/parallel_state.py
Parallelism Types
The system supports multiple orthogonal parallelism strategies:
Sequence Parallelism (SP)
- Purpose: Split the sequence dimension across GPUs
- Attention-level SP: Ring Attention and Ulysses (USP) implement SP at the attention layer level
  - See Parallel Attention for details
  - Configuration: ulysses_degree × ring_degree = sequence_parallel_size
- Use Case: Very long sequences (e.g., high-resolution images)
Data Parallelism (DP)
- Purpose: Replicate the model across GPUs and split the batch
- Use Case: Batch processing, throughput optimization
Tensor Parallelism (TP) (Experimental)
- Purpose: Split model weights across GPUs
- Implementation: Uses vLLM's tensor parallel groups
- Use Case: Large models that don't fit on a single GPU
CFG Parallelism (under development)
- Purpose: Parallelize Classifier-Free Guidance (positive/negative prompts)
- Infrastructure: CFG parallel groups are initialized and available via get_cfg_group()
Parallel Group Management
```python
def initialize_model_parallel(
    data_parallel_size: int = 1,
    cfg_parallel_size: int = 1,
    sequence_parallel_size: int | None = None,
    ulysses_degree: int = 1,
    ring_degree: int = 1,
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
    vae_parallel_size: int = 0,
):
    # Generate orthogonal parallel groups
    rank_generator = RankGenerator(
        tensor_parallel_size,
        sequence_parallel_size,
        pipeline_parallel_size,
        cfg_parallel_size,
        data_parallel_size,
        "tp-sp-pp-cfg-dp",
    )
    # Initialize each parallel group
    _DP = init_model_parallel_group(rank_generator.get_ranks("dp"), ...)
    _CFG = init_model_parallel_group(rank_generator.get_ranks("cfg"), ...)
    _SP = init_model_parallel_group(rank_generator.get_ranks("sp"), ...)
    _PP = init_model_parallel_group(rank_generator.get_ranks("pp"), ...)
    _TP = init_model_parallel_group(rank_generator.get_ranks("tp"), ...)
```
Rank Order: tp-sp-pp-cfg-dp (tensor → sequence → pipeline → cfg → data)
Note: For attention-level Sequence Parallelism implementations (Ring Attention and Ulysses), see Parallel Attention. This section covers higher-level parallelism strategies.
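The rank layout behind the tp-sp-pp-cfg-dp order can be sketched as a mixed-radix decomposition of the global rank. `parallel_groups` is a hypothetical helper; it assumes the first dimension in the order varies fastest (so tensor-parallel peers get adjacent ranks), which may differ in detail from RankGenerator. The usage also applies ulysses_degree × ring_degree = sequence_parallel_size.

```python
import math


def parallel_groups(sizes: dict[str, int], order: str = "tp-sp-pp-cfg-dp") -> dict[str, list[list[int]]]:
    """Build rank groups for each parallel dimension (a RankGenerator-style sketch).

    A group for dimension d holds the ranks whose coordinates agree on every
    other dimension; missing dimensions default to size 1.
    """
    dims = order.split("-")
    radices = [sizes.get(d, 1) for d in dims]
    groups: dict[str, dict[tuple[int, ...], list[int]]] = {d: {} for d in dims}
    for rank in range(math.prod(radices)):
        coords, rest = [], rank
        for n in radices:  # mixed-radix decomposition, first dimension fastest
            coords.append(rest % n)
            rest //= n
        for i, d in enumerate(dims):
            key = tuple(coords[:i] + coords[i + 1:])
            groups[d].setdefault(key, []).append(rank)
    return {d: sorted(g.values()) for d, g in groups.items()}


ulysses_degree, ring_degree = 2, 1
sizes = {"sp": ulysses_degree * ring_degree, "cfg": 2}  # world size = 2 * 2 = 4
groups = parallel_groups(sizes)
print(groups["sp"], groups["cfg"])  # [[0, 1], [2, 3]] [[0, 2], [1, 3]]
```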
6. Data Flow
Complete Request Flow
[Figure: End-to-end Data Flow in the vLLM-Omni Diffusion Module]
```text
1. User Request
   └─> OmniDiffusion.generate(prompt)
       └─> Prepare OmniDiffusionRequest
           └─> DiffusionEngine.step(requests)
2. Pre-processing
   └─> pre_process_func(requests)
       └─> Model-specific transformations
3. Scheduling
   └─> scheduler.add_request(request)
       └─> scheduler.schedule()
           └─> DiffusionEngine submits the scheduled request via executor.add_req(req)
4. Worker Execution
   └─> WorkerProc.worker_busy_loop()
       └─> GPUWorker.execute_model(reqs)
           └─> Pipeline.forward(req)
               ├─> encode_prompt()
               ├─> prepare_latents()
               ├─> diffuse() [loop]
               │   ├─> transformer.forward() [with cache backend hooks]
               │   └─> scheduler.step() [denoising scheduler, not the request scheduler]
               └─> vae.decode()
5. Result Collection
   └─> Executor returns DiffusionOutput
       └─> scheduler.update_from_output(...)
           └─> DiffusionEngine pops finished request state
6. Post-processing
   └─> post_process_func(output)
       └─> Convert to PIL images / final format
```