
vllm_omni.diffusion.distributed.cfg_parallel

Base pipeline class for diffusion models with shared CFG functionality.

logger module-attribute

logger = init_logger(__name__)

CFGParallelMixin

Base mixin class for diffusion pipelines that provides shared CFG methods.

All pipelines should inherit from this class to reuse classifier-free guidance logic.

CFG Parallel Architecture

When cfg_world_size > 1, each rank computes one branch (positive or negative), and all_gather exchanges the results. Every rank then computes the CFG combine and the scheduler step locally. No broadcast is needed because these operations are deterministic (given identically seeded generators for stochastic schedulers), so every rank ends up with the same result.
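The flow above can be illustrated with a single-process toy simulation. This is a sketch, not the library's implementation: plain Python lists stand in for torch tensors, `predict_branch`, `cfg_combine`, `euler_step` and `run_cfg_parallel` are hypothetical helpers, and all_gather is simulated by handing every rank the full list of branch outputs.

```python
# Single-process toy simulation of CFG parallel with cfg_world_size == 2.
# Assumptions: lists of floats stand in for tensors; all_gather is simulated.


def predict_branch(branch: str, latents: list[float]) -> list[float]:
    """Hypothetical stand-in for predict_noise(); deterministic per branch."""
    offset = 1.0 if branch == "positive" else 0.0
    return [0.5 * x + offset for x in latents]


def cfg_combine(pos: list[float], neg: list[float], scale: float) -> list[float]:
    """Standard CFG formula: neg + scale * (pos - neg)."""
    return [n + scale * (p - n) for p, n in zip(pos, neg)]


def euler_step(noise: list[float], latents: list[float], dt: float) -> list[float]:
    """Deterministic toy scheduler step."""
    return [x - dt * e for x, e in zip(latents, noise)]


def run_cfg_parallel(latents: list[float], scale: float, dt: float) -> list[list[float]]:
    branches = ["positive", "negative"]  # rank 0 -> positive, rank 1 -> negative
    # Each rank computes only its own branch.
    local_outputs = [predict_branch(branch, latents) for branch in branches]
    # Simulated all_gather: every rank receives all branch outputs, in rank order.
    gathered_per_rank = [list(local_outputs) for _ in branches]
    # Every rank combines and steps locally; no broadcast is involved.
    new_latents_per_rank = []
    for pos, neg in gathered_per_rank:
        noise = cfg_combine(pos, neg, scale)
        new_latents_per_rank.append(euler_step(noise, latents, dt))
    return new_latents_per_rank


per_rank = run_cfg_parallel([1.0, 2.0], scale=4.0, dt=0.5)
print(per_rank)  # [[-1.25, -0.5], [-1.25, -0.5]]
```

Because the combine and the step are pure functions of the gathered inputs, the two ranks produce identical latents without any extra communication.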

Multi-output models

Models that return a tuple from predict_noise() (e.g., video + audio) should override combine_cfg_noise() to define per-element combine logic and set self.scheduler to a composite scheduler that handles tuples.

cfg_normalize_function

cfg_normalize_function(
    noise_pred: Tensor, comb_pred: Tensor
) -> Tensor

Normalize the combined noise prediction.

Parameters:

- noise_pred (Tensor, required): Positive noise prediction.
- comb_pred (Tensor, required): Combined noise prediction after CFG.

Returns:

- Tensor: Normalized noise prediction tensor.
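The text above does not state the normalization rule. The sketch below assumes the norm-rescaling used for true CFG in some diffusers pipelines (e.g., Qwen-Image): the combined prediction is rescaled so its L2 norm along the last dimension matches that of the positive prediction. Nested lists stand in for tensors, and the zero-norm guard is an addition of this sketch.

```python
import math


def cfg_normalize_function(
    noise_pred: list[list[float]], comb_pred: list[list[float]]
) -> list[list[float]]:
    """Rescale each row of comb_pred so its L2 norm matches the same row of noise_pred.

    Assumption: each inner list plays the role of the last tensor dimension.
    """
    out = []
    for pos_row, comb_row in zip(noise_pred, comb_pred):
        pos_norm = math.sqrt(sum(v * v for v in pos_row))
        comb_norm = math.sqrt(sum(v * v for v in comb_row))
        # Guard against division by zero (added for this sketch).
        ratio = pos_norm / comb_norm if comb_norm > 0 else 1.0
        out.append([v * ratio for v in comb_row])
    return out


pos = [[3.0, 4.0]]   # norm 5
comb = [[6.0, 8.0]]  # norm 10, same direction
print(cfg_normalize_function(pos, comb))  # [[3.0, 4.0]]
```

The rescaling keeps the direction chosen by CFG while preventing large guidance scales from inflating the magnitude of the prediction.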

combine_cfg_noise

combine_cfg_noise(
    positive_noise_pred: Tensor | tuple[Tensor, ...],
    negative_noise_pred: Tensor | tuple[Tensor, ...],
    true_cfg_scale: float,
    cfg_normalize: bool = False,
) -> Tensor | tuple[Tensor, ...]

Combine conditional and unconditional noise predictions with CFG.

Accepts both plain tensors (backward-compatible, used by LTX2 etc.) and tuples (multi-output models). The default implementation applies the standard CFG formula to every element.

Multi-output models can override this to apply different logic per element.

Example override for a model returning (video_pred, audio_pred):

def combine_cfg_noise(self, positive_noise_pred, negative_noise_pred, scale, normalize):
    (video_pos, audio_pos) = positive_noise_pred
    (video_neg, audio_neg) = negative_noise_pred
    video_combined = super().combine_cfg_noise(video_pos, video_neg, scale, normalize)
    return (video_combined, audio_pos)  # audio: positive only, no CFG

Parameters:

- positive_noise_pred (Tensor | tuple[Tensor, ...], required): Positive/conditional prediction(s), either a Tensor or a tuple.
- negative_noise_pred (Tensor | tuple[Tensor, ...], required): Negative/unconditional prediction(s), either a Tensor or a tuple.
- true_cfg_scale (float, required): CFG scale factor.
- cfg_normalize (bool, default False): Whether to normalize the combined prediction.

Returns:

- Tensor | tuple[Tensor, ...]: Combined noise prediction(s), of the same type as the inputs.
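A minimal runnable sketch of the default element-wise behavior and the video + audio override, assuming lists of floats in place of tensors. `ToyCFGMixin` and `ToyVideoAudioPipeline` are hypothetical classes, and normalization is omitted (the flag is accepted but ignored).

```python
class ToyCFGMixin:
    def combine_cfg_noise(self, positive_noise_pred, negative_noise_pred,
                          true_cfg_scale, cfg_normalize=False):
        # Tuples (multi-output models): apply the formula element by element.
        if isinstance(positive_noise_pred, tuple):
            return tuple(
                self.combine_cfg_noise(p, n, true_cfg_scale, cfg_normalize)
                for p, n in zip(positive_noise_pred, negative_noise_pred)
            )
        # Single prediction: neg + scale * (pos - neg). Normalization omitted.
        return [n + true_cfg_scale * (p - n)
                for p, n in zip(positive_noise_pred, negative_noise_pred)]


class ToyVideoAudioPipeline(ToyCFGMixin):
    def combine_cfg_noise(self, positive_noise_pred, negative_noise_pred,
                          true_cfg_scale, cfg_normalize=False):
        video_pos, audio_pos = positive_noise_pred
        video_neg, _ = negative_noise_pred
        video = super().combine_cfg_noise(video_pos, video_neg, true_cfg_scale, cfg_normalize)
        return (video, audio_pos)  # audio: positive branch only, no CFG


base = ToyCFGMixin()
print(base.combine_cfg_noise([1.0, 2.0], [0.0, 1.0], 3.0))  # [3.0, 4.0]

av = ToyVideoAudioPipeline()
print(av.combine_cfg_noise(([1.0], [5.0]), ([0.0], [9.0]), 3.0))  # ([3.0], [5.0])
```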

combine_multi_branch_cfg_noise

combine_multi_branch_cfg_noise(
    predictions: list[Tensor | tuple[Tensor, ...]],
    true_cfg_scale: float | dict[str, float],
    cfg_normalize: bool = False,
) -> Tensor | tuple[Tensor, ...]

Combine N branch predictions. Default: standard 2-branch CFG formula.

Override this method for custom multi-branch combine logic.

Parameters:

- predictions (list[Tensor | tuple[Tensor, ...]], required): List of N predictions; predictions[0] is always the positive/conditional branch.
- true_cfg_scale (float | dict[str, float], required): CFG scale factor (a float for 2-branch CFG, a dict for multi-branch CFG).
- cfg_normalize (bool, default False): Whether to normalize the combined prediction.

Returns:

- Tensor | tuple[Tensor, ...]: Combined noise prediction.
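A sketch of what an override might look like, assuming lists of floats in place of tensors. The 2-branch path follows the documented default; the 3-branch path is a hypothetical dual-guidance rule in the style of InstructPix2Pix, and the branch order and the dict keys "text" and "image" are illustrative assumptions, not keys used by any particular model here.

```python
def combine_multi_branch_cfg_noise(predictions, true_cfg_scale, cfg_normalize=False):
    """Toy multi-branch combine; cfg_normalize is accepted but ignored here."""
    if len(predictions) == 2:
        # Default behavior: standard 2-branch CFG with a float scale.
        pos, neg = predictions
        return [n + true_cfg_scale * (p - n) for p, n in zip(pos, neg)]
    if len(predictions) == 3:
        # Hypothetical 3-branch override: [0] fully conditioned (positive),
        # [1] image-conditioned only, [2] unconditional.
        text_scale = true_cfg_scale["text"]
        image_scale = true_cfg_scale["image"]
        pos, image_only, uncond = predictions
        return [
            u + image_scale * (i - u) + text_scale * (p - i)
            for p, i, u in zip(pos, image_only, uncond)
        ]
    raise ValueError(f"unsupported number of branches: {len(predictions)}")


print(combine_multi_branch_cfg_noise([[1.0], [0.0]], 5.0))  # [5.0]
print(combine_multi_branch_cfg_noise(
    [[2.0], [1.0], [0.0]], {"text": 2.0, "image": 1.5}))    # [3.5]
```

Keeping predictions[0] as the positive branch lets every override locate the conditional prediction without extra bookkeeping.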

diffuse

diffuse(*args: Any, **kwargs: Any) -> Any

Diffusion loop with optional classifier-free guidance.

Subclasses MUST implement this method to define the complete diffusion/denoising loop for their specific model.

Typical implementation pattern (single output):

def diffuse(self, latents, timesteps, prompt_embeds, negative_embeds, ...):
    for t in timesteps:
        positive_kwargs = {...}
        negative_kwargs = {...}

        noise_pred = self.predict_noise_maybe_with_cfg(
            do_true_cfg=do_true_cfg,
            true_cfg_scale=self.guidance_scale,
            positive_kwargs=positive_kwargs,
            negative_kwargs=negative_kwargs,
        )

        latents = self.scheduler_step_maybe_with_cfg(
            noise_pred, t, latents, do_true_cfg=do_true_cfg
        )

    return latents

Multi-output models (e.g., video + audio) should:

1. Override predict_noise() to return a tuple.
2. Override combine_cfg_noise() for per-element CFG logic.
3. Set self.scheduler to a composite scheduler that handles tuples.

def diffuse(self, video_latents, audio_latents, timesteps_video, timesteps_audio, ...):
    for t_v, t_a in zip(timesteps_video, timesteps_audio):
        positive_kwargs = {...}
        negative_kwargs = {...}

        # Returns tuple: (video_pred, audio_pred)
        video_pred, audio_pred = self.predict_noise_maybe_with_cfg(
            do_true_cfg=do_true_cfg,
            true_cfg_scale=self.guidance_scale,
            positive_kwargs=positive_kwargs,
            negative_kwargs=negative_kwargs,
        )

        # self.scheduler = VideoAudioScheduler(video_sched, audio_sched)
        # which accepts and returns tuples
        video_latents, audio_latents = self.scheduler_step_maybe_with_cfg(
            (video_pred, audio_pred),
            (t_v, t_a),
            (video_latents, audio_latents),
            do_true_cfg=do_true_cfg,
        )

    return video_latents, audio_latents
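The single-output pattern can be exercised end to end with a toy, single-process pipeline. This is a sketch under stated assumptions: `ToyPipeline` and `ToyEulerScheduler` are hypothetical, lists of floats replace tensors, there is no CFG parallelism, and the toy `step()` returns latents directly, whereas diffusers-style schedulers usually return an output object.

```python
class ToyEulerScheduler:
    """Hypothetical scheduler: x_next = x - dt * noise."""

    def __init__(self, dt: float):
        self.dt = dt

    def step(self, noise_pred, t, latents):
        return [x - self.dt * e for x, e in zip(latents, noise_pred)]


class ToyPipeline:
    def __init__(self, guidance_scale: float):
        self.guidance_scale = guidance_scale
        self.scheduler = ToyEulerScheduler(dt=0.5)

    def predict_noise(self, latents, cond):
        # Stand-in for self.transformer(...): output depends on latents and condition.
        return [x + cond for x in latents]

    def predict_noise_maybe_with_cfg(self, do_true_cfg, true_cfg_scale,
                                     positive_kwargs, negative_kwargs):
        pos = self.predict_noise(**positive_kwargs)
        if not do_true_cfg:
            return pos
        neg = self.predict_noise(**negative_kwargs)
        return [n + true_cfg_scale * (p - n) for p, n in zip(pos, neg)]

    def scheduler_step_maybe_with_cfg(self, noise_pred, t, latents, do_true_cfg):
        return self.scheduler.step(noise_pred, t, latents)

    def diffuse(self, latents, timesteps, do_true_cfg=True):
        for t in timesteps:
            noise_pred = self.predict_noise_maybe_with_cfg(
                do_true_cfg=do_true_cfg,
                true_cfg_scale=self.guidance_scale,
                positive_kwargs={"latents": latents, "cond": 1.0},
                negative_kwargs={"latents": latents, "cond": 0.0},
            )
            latents = self.scheduler_step_maybe_with_cfg(
                noise_pred, t, latents, do_true_cfg=do_true_cfg
            )
        return latents


pipe = ToyPipeline(guidance_scale=2.0)
print(pipe.diffuse([2.0], [1, 0]))                     # [-1.0]
print(pipe.diffuse([2.0], [1, 0], do_true_cfg=False))  # [-0.25]
```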

predict_noise

predict_noise(
    *args: Any, **kwargs: Any
) -> Tensor | tuple[Tensor, ...] | IntermediateTensors

Forward pass through the transformer to predict noise.

The default implementation calls self.transformer; subclasses can override this method when they need custom behavior.

Returns:

- Tensor | tuple[Tensor, ...] | IntermediateTensors: A single Tensor for standard models, or a tuple of Tensors for multi-output models (e.g., video + audio). Multi-output models must also override combine_cfg_noise() and set self.scheduler to a composite scheduler that handles tuples. Non-last pipeline-parallel stages return IntermediateTensors instead of final noise tensors wrapped in a tuple.

predict_noise_maybe_with_cfg

predict_noise_maybe_with_cfg(
    do_true_cfg: bool,
    true_cfg_scale: float,
    positive_kwargs: dict[str, Any],
    negative_kwargs: dict[str, Any] | None,
    cfg_normalize: bool = True,
    output_slice: int | None = None,
) -> Tensor | tuple[Tensor, ...]

Predict noise with optional classifier-free guidance.

Parameters:

- do_true_cfg (bool, required): Whether to apply CFG.
- true_cfg_scale (float, required): CFG scale factor.
- positive_kwargs (dict[str, Any], required): Kwargs for the positive/conditional prediction.
- negative_kwargs (dict[str, Any] | None, required): Kwargs for the negative/unconditional prediction.
- cfg_normalize (bool, default True): Whether to normalize the CFG output.
- output_slice (int | None, default None): If set, slice each output to [:, :output_slice] (used for image editing).

Returns:

- Tensor | tuple[Tensor, ...]: Predicted noise tensor or tuple of tensors. In CFG parallel mode, the result is valid on ALL ranks (not just rank 0).

Note

For multi-output models (e.g., video + audio where predict_noise returns a tuple), override combine_cfg_noise() for per-element CFG logic and set self.scheduler to a composite scheduler.
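The output_slice behavior can be mimicked on nested lists. A plausible motivation for image editing, assumed here rather than stated above, is that reference-image latents are concatenated to the target latents along the sequence dimension, so the transformer output carries extra positions that must be dropped. `apply_output_slice` is a hypothetical helper.

```python
def apply_output_slice(pred, output_slice):
    """Mimic pred[:, :output_slice] on nested lists shaped (batch, seq, ...).

    Tuples (multi-output models) are sliced element by element.
    """
    if output_slice is None:
        return pred
    if isinstance(pred, tuple):
        return tuple(apply_output_slice(p, output_slice) for p in pred)
    return [row[:output_slice] for row in pred]


# Batch of 1: the first 2 positions are target tokens, the last 2 are
# (assumed) appended reference-image tokens.
pred = [[0.1, 0.2, 9.0, 9.0]]
print(apply_output_slice(pred, 2))  # [[0.1, 0.2]]
```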

predict_noise_with_multi_branch_cfg

predict_noise_with_multi_branch_cfg(
    do_true_cfg: bool,
    true_cfg_scale: float | dict[str, float],
    branches_kwargs: list[dict[str, Any]],
    cfg_normalize: bool = False,
    output_slice: int | None = None,
) -> Tensor | tuple[Tensor, ...]

Predict noise with N-branch CFG dispatch across M GPUs.

This is the multi-branch counterpart of predict_noise_maybe_with_cfg(). Use this for models with 3 or more CFG branches (e.g., OmniGen2, Bagel, DreamID). Existing 2-branch models should continue using predict_noise_maybe_with_cfg().

Parameters:

- do_true_cfg (bool, required): Whether to apply CFG.
- true_cfg_scale (float | dict[str, float], required): CFG scale factor (passed to combine_multi_branch_cfg_noise).
- branches_kwargs (list[dict[str, Any]], required): List of N dicts, each containing the kwargs for one predict_noise() call. branches_kwargs[0] is always the positive/conditional branch.
- cfg_normalize (bool, default False): Whether to normalize (passed to combine_multi_branch_cfg_noise).
- output_slice (int | None, default None): If set, slice each output to [:, :output_slice].

Returns:

- Tensor | tuple[Tensor, ...]: Combined noise prediction, identical on all ranks in CFG parallel mode.
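The text does not say how N branches are distributed over M ranks. The sketch below assumes a simple round-robin assignment and shows how gathered outputs can be reassembled in branch order so that predictions[0] stays the positive branch. Both helpers are hypothetical.

```python
def assign_branches(num_branches: int, cfg_world_size: int) -> list[list[int]]:
    """Round-robin (assumed strategy): rank r handles branches r, r + M, r + 2M, ..."""
    return [list(range(rank, num_branches, cfg_world_size))
            for rank in range(cfg_world_size)]


def gather_in_order(per_rank_outputs, assignment, num_branches):
    """Rebuild the full branch list from per-rank outputs (simulated all_gather)."""
    ordered = [None] * num_branches
    for rank_branches, outputs in zip(assignment, per_rank_outputs):
        for idx, out in zip(rank_branches, outputs):
            ordered[idx] = out
    return ordered


assignment = assign_branches(3, 2)
print(assignment)  # [[0, 2], [1]]

# Each rank "predicts" only its own branches.
per_rank_outputs = [[f"b{i}" for i in branches] for branches in assignment]
print(gather_in_order(per_rank_outputs, assignment, 3))  # ['b0', 'b1', 'b2']
```

With more ranks than branches, some ranks in this scheme receive an empty list and stay idle for the prediction step.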

scheduler_step

scheduler_step(
    noise_pred: Tensor | tuple[Tensor, ...],
    t: Tensor | tuple[Tensor, ...],
    latents: Tensor | tuple[Tensor, ...],
    per_request_scheduler: Any | None = None,
    generator: Generator | None = None,
) -> Tensor | tuple[Tensor, ...]

Step the scheduler.

Default implementation passes inputs directly to self.scheduler.step(). For multi-output models, set self.scheduler to a composite scheduler that handles tuples (e.g., VideoAudioScheduler).

Parameters:

- noise_pred (Tensor | tuple[Tensor, ...], required): Predicted noise (a Tensor, or a tuple for multi-output models).
- t (Tensor | tuple[Tensor, ...], required): Current timestep (a Tensor, or a tuple when schedulers differ per output).
- latents (Tensor | tuple[Tensor, ...], required): Current latents (a Tensor, or a tuple for multi-output models).
- per_request_scheduler (Any | None, default None): Optional request-scoped scheduler that overrides self.scheduler for this call. This is primarily used by step-wise execution, where each request may keep scheduler state in its own runner-managed state object. Request-level execution should usually leave this as None and continue using self.scheduler.
- generator (Generator | None, default None): Optional torch Generator for reproducible sampling. When using CFG parallel, all CFG ranks should receive generators initialized with the same seed so that stochastic schedulers (e.g., DDPM) produce identical results.

Returns:

- Tensor | tuple[Tensor, ...]: Updated latents after the scheduler step.
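A minimal sketch of the default dispatch and of a composite scheduler. VideoAudioScheduler is only named above as an example, so `ToyVideoAudioScheduler`, `ToyScheduler` and `ToyPipeline` are hypothetical stand-ins; their `step()` returns latents directly for brevity, while diffusers-style schedulers usually return an output object.

```python
class ToyScheduler:
    def __init__(self, dt: float):
        self.dt = dt

    def step(self, noise_pred, t, latents, generator=None):
        return [x - self.dt * e for x, e in zip(latents, noise_pred)]


class ToyVideoAudioScheduler:
    """Hypothetical composite scheduler: one sub-scheduler per modality."""

    def __init__(self, video_sched, audio_sched):
        self.video_sched = video_sched
        self.audio_sched = audio_sched

    def step(self, noise_pred, t, latents, generator=None):
        (v_noise, a_noise), (t_v, t_a), (v_lat, a_lat) = noise_pred, t, latents
        return (
            self.video_sched.step(v_noise, t_v, v_lat, generator=generator),
            self.audio_sched.step(a_noise, t_a, a_lat, generator=generator),
        )


class ToyPipeline:
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def scheduler_step(self, noise_pred, t, latents,
                       per_request_scheduler=None, generator=None):
        # A request-scoped scheduler, when given, overrides self.scheduler for this call.
        scheduler = per_request_scheduler if per_request_scheduler is not None else self.scheduler
        return scheduler.step(noise_pred, t, latents, generator=generator)


pipe = ToyPipeline(ToyVideoAudioScheduler(ToyScheduler(0.5), ToyScheduler(0.25)))
print(pipe.scheduler_step(([2.0], [2.0]), (10, 10), ([1.0], [1.0])))  # ([0.0], [0.5])
print(pipe.scheduler_step([2.0], 10, [1.0],
                          per_request_scheduler=ToyScheduler(0.25)))  # [0.5]
```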

scheduler_step_maybe_with_cfg

scheduler_step_maybe_with_cfg(
    noise_pred: Tensor | tuple[Tensor, ...],
    t: Tensor | tuple[Tensor, ...],
    latents: Tensor | tuple[Tensor, ...],
    do_true_cfg: bool,
    per_request_scheduler: Any | None = None,
    generator: Generator | None = None,
) -> Tensor | tuple[Tensor, ...]

Step the scheduler with automatic CFG parallel handling.

All ranks compute the scheduler step locally. No broadcast is needed because predict_noise_maybe_with_cfg already ensures that all ranks hold an identical noise_pred after the all_gather and local combine.

Parameters:

- noise_pred (Tensor | tuple[Tensor, ...], required): Predicted noise (a Tensor or a tuple, valid on all ranks).
- t (Tensor | tuple[Tensor, ...], required): Current timestep (a Tensor, or a tuple when schedulers differ per output).
- latents (Tensor | tuple[Tensor, ...], required): Current latents (a Tensor or a tuple).
- do_true_cfg (bool, required): Whether CFG is enabled.
- per_request_scheduler (Any | None, default None): Optional request-scoped scheduler that overrides self.scheduler for this call. This is mainly needed by step-wise execution, where scheduler state may be stored per request. Request-level execution should normally leave this as None.
- generator (Generator | None, default None): Optional torch Generator for reproducible sampling. When using CFG parallel, all CFG ranks should receive generators initialized with the same seed so that stochastic schedulers (e.g., DDPM) produce identical results.

Returns:

- Tensor | tuple[Tensor, ...]: Updated latents (identical across all CFG ranks).
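Why the generator seeds must match can be shown with a toy stochastic step. Here `random.Random` stands in for a torch.Generator (which real code would seed via `manual_seed`), and `stochastic_step` is a hypothetical DDPM-like update that adds generator-drawn noise; it is not the library's scheduler.

```python
import random


def stochastic_step(noise_pred, latents, generator: random.Random, dt=0.1, sigma=0.05):
    """Toy DDPM-like step: deterministic update plus generator-drawn Gaussian noise."""
    return [x - dt * e + sigma * generator.gauss(0.0, 1.0)
            for x, e in zip(latents, noise_pred)]


noise_pred = [1.0, -1.0]  # identical on all ranks after all_gather + local combine
latents = [0.5, 0.5]
seed = 1234

# Same seed on every CFG rank: the ranks stay in lockstep.
rank0 = stochastic_step(noise_pred, latents, random.Random(seed))
rank1 = stochastic_step(noise_pred, latents, random.Random(seed))
print(rank0 == rank1)  # True

# Different seeds: the ranks silently diverge, breaking the no-broadcast design.
rank1_bad = stochastic_step(noise_pred, latents, random.Random(seed + 1))
print(rank0 == rank1_bad)  # False
```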