Skip to content

Diffusion Module Architecture Design

The vLLM-Omni diffusion module (vllm_omni/diffusion) is a high-performance inference engine for diffusion models, designed with a modular architecture that separates concerns across multiple components. It provides efficient execution for non-autoregressive generation tasks such as image and video generation.

This document describes the architecture design of the diffusion module, including the diffusion engine, scheduler, worker, diffusion pipeline, and acceleration components.

vLLM-Omni Diffusion Module Components

Main Components of the Diffusion Module

Table of Content:


Architecture Overview

The diffusion module follows a multi-process, distributed architecture with clear separation of concerns:

vLLM-Omni Diffusion Module Architecture

Diffusion Architecture Overview


1. Diffusion Engine

Location: vllm_omni/diffusion/diffusion_engine.py

Responsibilities

The DiffusionEngine is the orchestrator of the diffusion inference system. It manages the lifecycle of worker processes and coordinates the execution flow.

Key Components

1.1 Initialization

class DiffusionEngine:
    def __init__(self, od_config: OmniDiffusionConfig):
        self.od_config = od_config
        self.post_process_func = get_diffusion_post_process_func(od_config)
        self.pre_process_func = get_diffusion_pre_process_func(od_config)
        self._processes: list[mp.Process] = []
        self._make_client()

Key Features:

  • Pre/Post Processing: Registers model-specific pre-processing and post-processing functions via registry pattern

  • Worker Management: Launches and manages multiple worker processes (one per GPU)

  • Process Isolation: Uses multiprocessing for true parallelism

1.2 Worker Launch Process

The engine launches workers using a spawn method:

def _launch_workers(self, broadcast_handle):
    # Creates one process per GPU
    for i in range(num_gpus):
        process = mp.Process(
            target=worker_proc.worker_main,
            args=(i, od_config, writer, broadcast_handle),
            name=f"DiffusionWorker-{i}",
        )
        process.start()

Design Decisions:

  • Spawn Method: Ensures clean state for each worker (no shared memory issues)

  • Pipe Communication: Uses mp.Pipe for initialization handshake

  • Device Selection: Each worker is assigned a specific GPU (cuda:{rank})

1.3 Request Processing Flow

def step(self, requests: list[OmniDiffusionRequest]):
    # 1. Pre-process requests
    requests = self.pre_process_func(requests)

    # 2. Send to scheduler and wait for response
    output = self.add_req_and_wait_for_response(requests)

    # 3. Post-process results
    result = self.post_process_func(output.output)
    return result

Flow:

  1. Pre-processing: Applies model-specific transformations

  2. Scheduling: Delegates to scheduler for distribution

  3. Post-processing: Converts raw outputs to final format (e.g., PIL images)


2. Scheduler

Location: vllm_omni/diffusion/sched/

Architecture

The scheduler is a request-state scheduler. It owns request lifecycle management and scheduling decisions, while execution stays in DiffusionEngine and the executor.

Key Components

2.1 Scheduler Interface

class SchedulerInterface(ABC):
    def add_request(self, request: OmniDiffusionRequest) -> str: ...
    def schedule(self) -> DiffusionSchedulerOutput: ...
    def update_from_output(
        self,
        sched_output: DiffusionSchedulerOutput,
        output: DiffusionOutput,
    ) -> set[str]: ...

Responsibilities:

  • Lifecycle contract: Defines how the engine adds requests, triggers one scheduling cycle, and feeds executor results back.

  • Stable boundary: DiffusionSchedulerOutput is the only scheduling result consumed by DiffusionEngine.

  • Pluggability: Different scheduler policies can reuse the same engine integration path.

2.2 Request State Model

class DiffusionRequestStatus(enum.IntEnum):
    WAITING = ...
    RUNNING = ...
    PREEMPTED = ...
    FINISHED_COMPLETED = ...
    FINISHED_ABORTED = ...
    FINISHED_ERROR = ...

@dataclass
class DiffusionRequestState:
    request_id: str
    req: OmniDiffusionRequest
    status: DiffusionRequestStatus = DiffusionRequestStatus.WAITING

Design Features:

  • Explicit lifecycle: Requests move through waiting, running, optional preemption, and terminal states.

  • Centralized error handling: Completion, abort, and error states are all normalized in the scheduler layer.

2.3 Shared Bookkeeping in _BaseScheduler

class _BaseScheduler(SchedulerInterface):
    def __init__(self) -> None:
        self._request_states = {}
        self._waiting = deque()
        self._running = []
        self._finished_req_ids = set()
        self.max_num_running_reqs = 1

Design Features:

  • Common state storage: Shared request maps and waiting/running sets live in the base class.

  • Shared cleanup logic: Duplicate-request checks, finish handling, and state removal are centralized instead of duplicated in each policy.

  • Current constraint boundary: _BaseScheduler derives max_num_running_reqs from max_num_seqs, but request-mode diffusion is still clamped back to 1 by the engine. The step-wise path can keep this above 1 for compatible-request batching.

2.4 Current RequestScheduler Policy

class RequestScheduler(_BaseScheduler):
    def schedule(self) -> DiffusionSchedulerOutput:
        # 1. keep existing RUNNING requests in the scheduling result
        # 2. pull WAITING requests while capacity remains
        # 3. move newly admitted requests into RUNNING

Behavior:

  • FIFO request scheduling: Waiting requests are promoted in queue order.

  • Single-request admission: RequestScheduler still admits one active request at a time because request-mode execution completes a whole request per dispatch.

  • Executor result feedback: update_from_output() converts executor output into FINISHED_COMPLETED or FINISHED_ERROR and returns finished request ids.

2.5 Engine-Driven Execution Loop

request_id = scheduler.add_request(request)
while True:
    sched_output = scheduler.schedule()
    output = executor.add_req(req)
    finished_req_ids = scheduler.update_from_output(sched_output, output)

Design Decisions:

  • Separation of concerns: Scheduler manages state and policy; executor handles runtime execution.

  • No scheduler-owned IPC: Scheduler no longer talks to workers directly.

  • Split concurrency model: Request-mode diffusion remains single-active-request, while the step-wise path can keep multiple compatible requests running and advance them independently between denoise steps.


3. Worker

Location: vllm_omni/diffusion/worker/gpu_worker.py

Architecture

Workers are independent processes that execute the actual model inference. Each worker runs on a dedicated GPU and participates in distributed inference.

Key Components

3.1 Worker Process Structure

class WorkerProc:
    def __init__(self, od_config, gpu_id, broadcast_handle):
        # Initialize ZMQ context for IPC
        self.context = zmq.Context(io_threads=2)

        # Connect to broadcast queue (receive requests)
        self.mq = MessageQueue.create_from_handle(broadcast_handle, gpu_id)

        # Create result queue (only rank 0)
        if gpu_id == 0:
            self.result_mq = MessageQueue(n_reader=1, ...)

        # Initialize GPU worker
        self.worker = GPUWorker(local_rank=gpu_id, rank=gpu_id, od_config=od_config)

Initialization Steps:

  1. IPC Setup: Creates ZMQ context and message queues

  2. Distributed Environment Setup: Initializes PyTorch distributed communication

    • For CUDA GPUs: Uses NCCL (fast GPU communication)

    • For NPU: Uses HCCL (Huawei Collective Communications Library)

    • For other devices: Uses appropriate backend (GLOO, MCCL, etc.)

  3. Model Loading: Loads diffusion pipeline on assigned GPU

  4. Cache Setup: Enables cache backend if configured.

3.2 GPU Worker

class GPUWorker:
    def init_device_and_model(self):
        # Set distributed environment variables
        os.environ["RANK"] = str(rank)
        os.environ["WORLD_SIZE"] = str(world_size)

        # Initialize PyTorch distributed
        init_distributed_environment(world_size, rank)
        parallel_config = self.od_config.parallel_config
        initialize_model_parallel(
            data_parallel_size=parallel_config.data_parallel_size,
            cfg_parallel_size=parallel_config.cfg_parallel_size,
            sequence_parallel_size=parallel_config.sequence_parallel_size,
            tensor_parallel_size=parallel_config.tensor_parallel_size,
            pipeline_parallel_size=parallel_config.pipeline_parallel_size,
        )

        # Load model
        model_loader = DiffusersPipelineLoader(load_config)
        self.pipeline = model_loader.load_model(od_config, load_device=f"cuda:{rank}")

        # Setup cache backend
        from vllm_omni.diffusion.cache.selector import get_cache_backend
        self.cache_backend = get_cache_backend(od_config.cache_backend, od_config.cache_config)

        if self.cache_backend is not None:
            self.cache_backend.enable(self.pipeline)

Key Features:

  • Tensor Parallelism: Supports multi-GPU tensor parallelism via PyTorch distributed

  • Model Loading: Uses DiffusersPipelineLoader for efficient weight loading

  • Cache Integration: Enables cache backends (TeaCache, cache-dit, etc.) transparently

3.3 Worker Busy Loop

def worker_busy_loop(self):
    while self._running:
        # 1. Receive unified message (generation request, RPC request, or shutdown)
        msg = self.recv_message()

        # 2. Route message based on type
        if isinstance(msg, dict) and msg.get("type") == "rpc":
            # Handle RPC request
            result, should_reply = self.execute_rpc(msg)
            if should_reply:
                self.return_result(result)

        elif isinstance(msg, dict) and msg.get("type") == "shutdown":
            # Handle shutdown message
            self._running = False

        else:
            # Handle generation request (OmniDiffusionRequest list)
            output = self.worker.execute_model(msg, self.od_config)
            self.return_result(output)

Execution Flow:

  1. Receive: Dequeues unified messages from shared memory queue

  2. Route: Handles different message types (generation, RPC, shutdown)

  3. Execute: Runs forward pass through pipeline for generation requests

  4. Respond: Sends results back (rank 0 for generation, specified rank for RPC)

3.4 Model Execution

@torch.inference_mode()
def execute_model(self, reqs: list[OmniDiffusionRequest], od_config):
    req = reqs[0]  # TODO: support batching

    # Refresh cache backend if enabled
    if self.cache_backend is not None and self.cache_backend.is_enabled():
        self.cache_backend.refresh(self.pipeline, req.num_inference_steps)

    # Set forward context for parallelism
    with set_forward_context(
        vllm_config=self.vllm_config,
        omni_diffusion_config=self.od_config
    ):
        output = self.pipeline.forward(req)
    return output

The model execution leverages multiple parallelism strategies that are transparently applied during the forward pass. The set_forward_context() context manager makes parallel group information available throughout the forward pass:

# Inside transformer layers, parallel groups are accessed via:
from vllm_omni.diffusion.distributed.parallel_state import (
    get_sp_group, get_dp_group, get_cfg_group, get_pp_group
)

Optimizations:

  • Cache Refresh: Clears cache state before each generation for clean state

  • Context Management: Forward context ensures parallel groups are available during execution

  • Single Request: Currently processes one request at a time (batching TODO)


4. Diffusion Pipeline

Location: vllm_omni/diffusion/models/*/pipeline_*.py

The pipeline is the model-specific implementation that orchestrates the diffusion process. Different models (QwenImage, Wan2.2, Z-Image) have their own pipeline implementations.

Most pipeline implementation are referred from diffusers. The multi-step diffusion loop is usually the most time-consuming part during the overall inference process, which is defined by the diffuse function in the pipeline class. An example is as follows:

def diffuse(self, ...):
    for i, t in enumerate(timesteps):
        # Forward pass for positive prompt
        transformer_kwargs = {
            "hidden_states": latents,
            "timestep": timestep / 1000,
            "encoder_hidden_states": prompt_embeds,
        }
        noise_pred = self.transformer(**transformer_kwargs)[0]

        # Forward pass for negative prompt (CFG)
        if do_true_cfg:
            neg_transformer_kwargs = {...}
            neg_transformer_kwargs["cache_branch"] = "negative"
            neg_noise_pred = self.transformer(**neg_transformer_kwargs)[0]

            # Combine predictions
            comb_pred = neg_noise_pred + true_cfg_scale * (noise_pred - neg_noise_pred)
            noise_pred = comb_pred * (cond_norm / noise_norm)

        # Scheduler step
        latents = self.scheduler.step(noise_pred, t, latents)[0]

    return latents

Key Features:

  • CFG Support: Handles classifier-free guidance with separate forward passes

  • Cache Branching: Uses cache_branch parameter for cache-aware execution

  • True CFG: Implements advanced CFG with norm preservation

To learn more about the diffusion pipeline and how to add a new diffusion pipeline, please view Adding Diffusion Model


5. Acceleration Components

5.1 Attention Backends

Location: vllm_omni/diffusion/attention/

Architecture

The attention system uses a role-aware backend selector pattern. Each Attention site declares a semantic role ("self", "cross", or a model-specific string). The selector consults the user's AttentionConfig to pick a backend per role and falls back to the platform default when nothing is configured.

Backend Selection

Location: vllm_omni/diffusion/attention/selector.py

class Attention(nn.Module):
    def __init__(self, num_heads, head_size, causal, softmax_scale, *,
                 role="self", role_category=None, ...):
        # Resolve backend for this role from the active OmniDiffusionConfig
        config = get_current_diffusion_config_or_none()
        attention_config = config.attention_config if config is not None else None
        attn_backend_cls, spec = get_attn_backend_for_role(
            role=role,
            head_size=head_size,
            attention_config=attention_config,
            role_category=role_category,
        )
        self.attn_impl_cls = attn_backend_cls.get_impl_cls()
        self.attention = self.attn_impl_cls(..., backend_kwargs=spec.extra if spec else None)

Available Backends:

  • FlashAttention (FLASH_ATTN): Memory-efficient kernel that dispatches to FA2/FA3 on CUDA, AITER on ROCm, and mindiesd on Ascend NPU.

  • SDPA (TORCH_SDPA): PyTorch's scaled dot-product attention — cross-platform fallback.

  • SageAttention (SAGE_ATTN): Quantized attention implementation from the SageAttention library.

These backends provide the kernel implementations for attention computation. For attention-level sequence parallelism strategies (Ring Attention, Ulysses), see Parallel Attention.

Backend Selection Mechanism

Selection is driven by AttentionConfig on OmniDiffusionConfig, which carries a global default plus a per_role map of AttentionSpec entries. A role string is resolved in this order:

def get_attn_backend_for_role(role, head_size, attention_config=None, role_category=None):
    # 1. attention_config.per_role[role]            — exact match
    # 2. attention_config.per_role[role_category]   — category fallback
    # 3. attention_config.default                   — global default
    # 4. Platform default                           — hardware-specific
    if attention_config is not None:
        spec, source = attention_config.resolve_with_source(
            role=role, role_category=role_category,
        )
        if spec is not None:
            return load_backend(spec.backend), spec
    return current_omni_platform.get_diffusion_attn_backend_cls(...), None

Selection Priority:

  1. Per-role override (--diffusion-attention-config.per_role.<role>.backend) — finest control; matched on the layer's exact role string.

  2. Role category fallback (--diffusion-attention-config.per_role.<category>.backend) — used when an exact match is missing and the layer declared role_category (e.g. a "mymodel.audio_to_video" site can fall back to "cross").

  3. Global default (--diffusion-attention-backend FLASH_ATTN, or --diffusion-attention-config.default.backend, or DIFFUSION_ATTENTION_BACKEND env var).

  4. Platform defaultcurrent_omni_platform.get_diffusion_attn_backend_cls(...) picks the best-available kernel for the hardware.

AttentionSpec.extra is forwarded to the backend constructor as backend_kwargs, so backend-specific parameters (e.g. SparseBlock block_size) can be set without changing model code.

For the user-facing CLI surface, see Diffusion Attention Backends. For the role declaration contract on the model side, see Adding a Diffusion Model.

Backend Availability:

  • SDPA: Always available (PyTorch built-in)

  • FlashAttention: Requires flash-attn on CUDA, aiter on ROCm, or mindiesd on Ascend NPU

  • SageAttention: Requires sage-attention package (from THU-ML GitHub)

Attention Backend Registry

Location: vllm_omni/diffusion/attention/selector.py + vllm_omni/platforms/<device>/platform.py

Backend resolution is delegated to the active platform: current_omni_platform.get_diffusion_attn_backend_cls(selected_backend, head_size) returns a fully-qualified class path that _load_backend_cls imports lazily, with the result cached by (backend_name, head_size).

@cache
def _cached_get_backend_cls(backend_name: str | None, head_size: int) -> type[AttentionBackend]:
    backend_cls_path = current_omni_platform.get_diffusion_attn_backend_cls(
        selected_backend=backend_name,
        head_size=head_size,
    )
    return _load_backend_cls(backend_cls_path)

Each platform (cuda, rocm, xpu, musa, npu) maps backend names like "FLASH_ATTN", "TORCH_SDPA", "SAGE_ATTN" to the right backend class path for that hardware, including head-size compatibility checks. Passing selected_backend=None lets the platform pick its own default.

Attention Backend Integration

The Attention layer integrates backends through a unified interface. Here's how FlashAttentionBackend is integrated as an example:

# attention/backends/flash_attn.py

class FlashAttentionBackend(AttentionBackend):
    @staticmethod
    def get_name() -> str:
        return "FLASH_ATTN"

    @staticmethod
    def get_impl_cls() -> type["FlashAttentionImpl"]:
        return FlashAttentionImpl

    @staticmethod
    def get_supported_head_sizes() -> list[int]:
        return [64, 96, 128, 192, 256]  # FlashAttention supports these head sizes


class FlashAttentionImpl(AttentionImpl):
    def __init__(self, num_heads, head_size, softmax_scale, causal, ...):
        self.num_heads = num_heads
        self.causal = causal
        self.softmax_scale = softmax_scale

    def forward(self, query, key, value, attn_metadata=None):
        # Call FlashAttention kernel
        out = flash_attn_func(
            query, key, value,
            causal=self.causal,
            softmax_scale=self.softmax_scale,
        )
        return out

5.2 Parallel Attention

Location: vllm_omni/diffusion/attention/parallel/

Architecture

Parallel attention strategies implement Sequence Parallelism (SP) at the attention layer level. These strategies distribute attention computation across multiple GPUs by splitting the sequence dimension, using different communication patterns. They work on top of AttentionBackend implementations (FlashAttention, SDPA, etc.), handling the parallelization/communication while the backends handle the actual attention computation.

Key Distinction: Unlike AttentionBackend (which provides kernel implementations), ParallelAttentionStrategy provides communication patterns for multi-GPU attention parallelism. These strategies implement the ParallelAttentionStrategy interface and use AttentionBackend implementations internally.

Both Ring Attention and Ulysses are forms of Sequence Parallelism (SP) that:

  • Split the sequence dimension across GPUs

  • Contribute to sequence_parallel_size (via ring_degree and ulysses_degree)

  • Work at the attention layer level (not model/pipeline level)

Ulysses Sequence Parallelism (USP)

Location: vllm_omni/diffusion/attention/parallel/ulysses.py

USP is a sequence-parallel attention strategy that splits attention computation across multiple GPUs by distributing both the sequence dimension and attention heads. It uses all-to-all communication to efficiently parallelize attention for very long sequences. Specifically, it uses all-to-all collective operations to redistribute Q/K/V tensors before attention computation and gather results afterward.

Ulysses splits attention computation in two dimensions:

  1. Sequence Dimension: Splits the sequence length across GPUs

  2. Head Dimension: Splits attention heads across GPUs

Configuration: ulysses_degree contributes to sequence_parallel_size

Ring Sequence Parallelism

Location: vllm_omni/diffusion/attention/parallel/ring.py

Ring Attention is a parallel attention strategy that implements sequence parallelism using ring-based point-to-point (P2P) communication. Unlike attention backends that provide the attention kernel implementation, Ring Attention is a communication pattern that works on top of attention backends (FlashAttention or SDPA).

Ring Attention splits sequence dimension across GPUs in a ring topology, implemented via the ParallelAttentionStrategy interface, instead of AttentionBackend. P2P ring communication is applied to circulate Key/Value blocks across GPUs. Internally, ring_flash_attn_func or ring_pytorch_attn_func is used depending on available backends.

Architecture:

class RingParallelAttention:
    """Ring sequence-parallel strategy."""

    def run_attention(self, query, key, value, attn_metadata, ...):
        # Selects underlying attention kernel (FlashAttention or SDPA)
        if backend_pref == "sdpa":
            return ring_pytorch_attn_func(...)  # Uses SDPA kernel
        else:
            return ring_flash_attn_func(...)    # Uses FlashAttention kernel

Integration:

  • Ring Attention is activated when ring_degree > 1 in parallel config

  • It's selected by build_parallel_attention_strategy() in the attention layer

  • The Attention layer routes to _run_ring_attention() when Ring is enabled

  • Works alongside attention backends: Ring handles communication, backends handle computation

Configuration: ring_degree contributes to sequence_parallel_size

Relationship with AttentionBackend

Parallel attention strategies (Ring, Ulysses) work on top of AttentionBackend implementations:

  • They use AttentionBackend for the actual attention computation (FlashAttention, SDPA, etc.)

  • They handle the multi-GPU communication/parallelization layer

  • They implement ParallelAttentionStrategy interface (not AttentionBackend)

For general parallelism strategies (Data Parallelism, Tensor Parallelism, Pipeline Parallelism), see Parallel Strategies.


5.3 Cache Backends

Location: vllm_omni/diffusion/cache/

Architecture

Cache backends provide a unified interface for applying different caching strategies to accelerate diffusion inference. The system supports multiple backends (TeaCache, cache-dit) with a consistent API for enabling and refreshing cache state.

Cache Backend Interface

class CacheBackend(ABC):
    def __init__(self, config: DiffusionCacheConfig):
        self.config = config
        self.enabled = False

    @abstractmethod
    def enable(self, pipeline: Any) -> None:
        """Enable cache on the pipeline."""
        raise NotImplementedError

    @abstractmethod
    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True) -> None:
        """Refresh cache state for new generation."""
        raise NotImplementedError

    def is_enabled(self) -> bool:
        """Check if cache is enabled."""
        return self.enabled

Design Pattern:

  • Abstract Base Class: Defines contract for all cache backends

  • Pipeline-based: Works with pipeline instances (not just transformers)

  • State Management: Provides refresh mechanism for clean state between generations

Available Backends

1. TeaCache Backend

Location: vllm_omni/diffusion/cache/teacache/backend.py

class TeaCacheBackend(CacheBackend):
    def enable(self, pipeline: Any):
        # Extract transformer from pipeline
        transformer = pipeline.transformer
        transformer_type = transformer.__class__.__name__

        # Create TeaCacheConfig from DiffusionCacheConfig
        teacache_config = TeaCacheConfig(
            transformer_type=transformer_type,
            rel_l1_thresh=self.config.rel_l1_thresh,
            coefficients=self.config.coefficients,
        )

        # Apply hooks to transformer
        apply_teacache_hook(transformer, teacache_config)
        self.enabled = True

    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True):
        transformer = pipeline.transformer
        if hasattr(transformer, "_hook_registry"):
            transformer._hook_registry.reset_hook(TeaCacheHook._HOOK_NAME)

TeaCache Features:

  • Timestep-aware: Caches based on timestep embedding similarity

  • Adaptive: Dynamically decides when to reuse cached computations

  • CFG-aware: Handles positive/negative branches separately

  • Custom Hook System: Uses a custom forward interception mechanism (via HookRegistry) that wraps the module's forward method, allowing transparent integration without modifying model code

2. Cache-DiT Backend

Location: vllm_omni/diffusion/cache/cache_dit_backend.py

class CacheDiTBackend(CacheBackend):
    def enable(self, pipeline: Any):
        # Uses cache-dit library for acceleration
        # Supports DBCache, SCM (Step Computation Masking), TaylorSeer
        # Works with single and dual-transformer architectures
        ...
        self.enabled = True

    def refresh(self, pipeline: Any, num_inference_steps: int, verbose: bool = True):
        # Updates cache context with new num_inference_steps
        ...

Cache-DiT Features:

  • DBCache: Dynamic block caching with configurable compute blocks

  • SCM: Step Computation Masking for additional speedup

  • TaylorSeer: Advanced calibration for cache accuracy

  • Dual-transformer Support: Handles models like Wan2.2 with two transformers

Cache Backend Selector

Location: vllm_omni/diffusion/cache/selector.py

def get_cache_backend(
    cache_backend: str | None,
    cache_config: dict | DiffusionCacheConfig
) -> CacheBackend | None:
    """Get cache backend instance based on cache_backend string.

    Args:
        cache_backend: Cache backend name ("cache_dit", "tea_cache", or None)
        cache_config: Cache configuration (dict or DiffusionCacheConfig)

    Returns:
        Cache backend instance or None if cache_backend is "none"
    """
    if cache_backend is None or cache_backend == "none":
        return None

    if isinstance(cache_config, dict):
        cache_config = DiffusionCacheConfig.from_dict(cache_config)

    if cache_backend == "cache_dit":
        return CacheDiTBackend(cache_config)
    elif cache_backend == "tea_cache":
        return TeaCacheBackend(cache_config)
    else:
        raise ValueError(f"Unsupported cache backend: {cache_backend}")

Usage Flow:

  1. Selection: get_cache_backend() returns appropriate backend instance

  2. Enable: backend.enable(pipeline) called during worker initialization

  3. Refresh: backend.refresh(pipeline, num_inference_steps) called before each generation

  4. Check: backend.is_enabled() verifies cache is active

5.4 Parallel Strategies

Location: vllm_omni/diffusion/distributed/parallel_state.py

Parallelism Types

The system supports multiple orthogonal parallelism strategies:

Sequence Parallelism (SP)

  • Purpose: Split sequence dimension across GPUs

  • Attention-level SP: Ring Attention and Ulysses (USP) implement SP at the attention layer level

    • See Parallel Attention for details

    • Configuration: ulysses_degree × ring_degree = sequence_parallel_size

  • Use Case: Very long sequences (e.g., high-resolution images)

Data Parallelism (DP)

  • Purpose: Replicate model across GPUs, split batch

  • Use Case: Batch processing, throughput optimization

Tensor Parallelism (TP) (Experimental)

  • Purpose: Split model weights across GPUs

  • Implementation: Uses vLLM's tensor parallel groups

  • Use Case: Large models that don't fit on single GPU

CFG Parallelism (under development)

  • Purpose: Parallelize Classifier-Free Guidance (positive/negative prompts)

  • Infrastructure: CFG parallel groups are initialized and available via get_cfg_group()

Parallel Group Management

def initialize_model_parallel(
    data_parallel_size: int = 1,
    cfg_parallel_size: int = 1,
    sequence_parallel_size: int | None = None,
    ulysses_degree: int = 1,
    ring_degree: int = 1,
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
    vae_parallel_size: int = 0,
):
    # Generate orthogonal parallel groups
    rank_generator = RankGenerator(
        tensor_parallel_size,
        sequence_parallel_size,
        pipeline_parallel_size,
        cfg_parallel_size,
        data_parallel_size,
        "tp-sp-pp-cfg-dp",
    )

    # Initialize each parallel group
    _DP = init_model_parallel_group(rank_generator.get_ranks("dp"), ...)
    _CFG = init_model_parallel_group(rank_generator.get_ranks("cfg"), ...)
    _SP = init_model_parallel_group(rank_generator.get_ranks("sp"), ...)
    _PP = init_model_parallel_group(rank_generator.get_ranks("pp"), ...)
    _TP = init_model_parallel_group(rank_generator.get_ranks("tp"), ...)

Rank Order: tp-sp-pp-cfg-dp (tensor → sequence → pipeline → cfg → data)

Note: For attention-level Sequence Parallelism implementations (Ring Attention and Ulysses), see Parallel Attention. This section covers higher-level parallelism strategies.


6. Data Flow

Complete Request Flow

vLLM-Omni Diffusion Module Components

End-to-end Data Flow in the vLLM-Omni Diffusion Module

1. User Request
   └─> OmniDiffusion.generate(prompt)
       └─> Prepare OmniDiffusionRequest
           └─> DiffusionEngine.step(requests)

2. Pre-processing
   └─> pre_process_func(requests)
       └─> Model-specific transformations

3. Scheduling
   └─> scheduler.add_request(request)
       └─> scheduler.schedule()
           └─> DiffusionEngine submits scheduled request to executor.add_req(req)

4. Worker Execution
   └─> WorkerProc.worker_busy_loop()
       └─> GPUWorker.execute_model(reqs)
           └─> Pipeline.forward(req)
               ├─> encode_prompt()
               ├─> prepare_latents()
               ├─> diffuse() [loop]
               │   ├─> transformer.forward() [with cache backend hooks]
               │   └─> scheduler.step()
               └─> vae.decode()

5. Result Collection
   └─> Executor returns DiffusionOutput
       └─> scheduler.update_from_output(...)
           └─> DiffusionEngine pops finished request state

6. Post-processing
   └─> post_process_func(output)
       └─> Convert to PIL images / final format