vllm_omni.diffusion.distributed.cfg_parallel
Base pipeline class for Diffusion models with shared CFG functionality.
CFGParallelMixin
Base mixin class for diffusion pipelines that provides shared CFG methods.
All pipelines should inherit from this class to reuse classifier-free guidance logic.
CFG Parallel Architecture
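When cfg_world_size > 1, each rank computes one branch (positive or negative), then an all_gather exchanges the results. All ranks then perform the CFG combine and the scheduler step locally. No broadcast is needed because these operations are deterministic and every rank holds identical inputs after the all_gather (stochastic schedulers additionally need identically seeded generators on every rank).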
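The data flow described above can be simulated in a single process. This is a toy sketch, not the module's implementation: plain Python lists stand in for tensors, a list of per-rank results stands in for torch.distributed.all_gather, and predict_branch is a hypothetical stand-in for predict_noise().

```python
# Toy, single-process simulation of 2-way CFG parallelism.
# Lists of floats stand in for tensors; no torch.distributed is used.

BRANCHES = ("positive", "negative")  # assumed: rank 0 -> positive, rank 1 -> negative


def predict_branch(branch: str, x: list[float]) -> list[float]:
    """Hypothetical stand-in for predict_noise(): the branches differ by a bias."""
    bias = 1.0 if branch == "positive" else 0.0
    return [v + bias for v in x]


def cfg_combine(pos: list[float], neg: list[float], scale: float) -> list[float]:
    """Standard CFG combine: neg + scale * (pos - neg)."""
    return [n + scale * (p - n) for p, n in zip(pos, neg)]


def simulate_cfg_parallel(x: list[float], scale: float) -> list[list[float]]:
    world_size = len(BRANCHES)
    # 1. Each rank computes only its own branch.
    local = [predict_branch(BRANCHES[rank], x) for rank in range(world_size)]
    # 2. all_gather: every rank ends up holding the results of all ranks.
    gathered = [list(local) for _ in range(world_size)]
    # 3. Each rank combines locally; identical inputs give identical outputs,
    #    so no broadcast from rank 0 is required.
    return [cfg_combine(g[0], g[1], scale) for g in gathered]


per_rank = simulate_cfg_parallel([0.0, 0.5], scale=4.0)
print(per_rank)  # [[4.0, 4.5], [4.0, 4.5]]
```

Every simulated rank ends with the same combined prediction, and it matches what a single device computing both branches serially would produce.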
Multi-output models
Models whose predict_noise() returns a tuple (e.g., video + audio) should override combine_cfg_noise() to define per-element combine logic and set self.scheduler to a composite scheduler that handles tuples.
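A composite scheduler of this kind might route the i-th element of each tuple to the i-th sub-scheduler. In this sketch, CompositeScheduler and ToyEulerScheduler are hypothetical names (not vllm_omni classes), floats stand in for tensors, and the step(noise_pred, t, latents) -> latents interface is an assumption; diffusers schedulers, for example, return an output object by default.

```python
from dataclasses import dataclass


@dataclass
class ToyEulerScheduler:
    """Hypothetical single-stream scheduler: one explicit Euler update per step."""

    step_size: float

    def step(self, noise_pred: float, t: float, latents: float) -> float:
        # t is unused by this toy update rule.
        return latents - self.step_size * noise_pred


class CompositeScheduler:
    """Hypothetical tuple-aware scheduler: element i goes to sub-scheduler i."""

    def __init__(self, *schedulers: ToyEulerScheduler) -> None:
        self.schedulers = schedulers

    def step(self, noise_pred: tuple, t: tuple, latents: tuple) -> tuple:
        n = len(self.schedulers)
        if not (len(noise_pred) == len(t) == len(latents) == n):
            raise ValueError(f"expected {n}-tuples for noise_pred, t and latents")
        return tuple(
            s.step(pred, ti, x)
            for s, pred, ti, x in zip(self.schedulers, noise_pred, t, latents)
        )


scheduler = CompositeScheduler(ToyEulerScheduler(0.5), ToyEulerScheduler(0.25))
video, audio = scheduler.step((2.0, 4.0), (10, 20), (3.0, 2.0))
print(video, audio)  # 2.0 1.0
```

Keeping the per-stream schedulers separate lets the video and audio streams use different step sizes or timestep grids while the pipeline still passes a single tuple through scheduler_step().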
cfg_normalize_function
Normalize the combined noise prediction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
noise_pred | Tensor | positive noise prediction | required |
comb_pred | Tensor | combined noise prediction after CFG | required |
Returns:
| Type | Description |
|---|---|
Tensor | Normalized noise prediction tensor |
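The table above does not spell out the normalization rule. One common choice, used by the true-CFG path of diffusers' QwenImagePipeline, rescales the combined prediction so that its L2 norm along the last dimension matches that of the positive prediction. Assuming this method follows the same rule (check the source to confirm), a dependency-free sketch with nested lists standing in for tensors:

```python
import math


def _l2(row: list[float]) -> float:
    return math.sqrt(sum(v * v for v in row))


def cfg_normalize_function(noise_pred: list[list[float]],
                           comb_pred: list[list[float]]) -> list[list[float]]:
    """Rescale each row of comb_pred to the L2 norm of the matching noise_pred row.

    Rows play the role of the last tensor dimension; the torch analogue is
    comb_pred * (noise_pred.norm(dim=-1, keepdim=True) / comb_pred.norm(dim=-1, keepdim=True)).
    """
    out = []
    for pos_row, comb_row in zip(noise_pred, comb_pred):
        scale = _l2(pos_row) / _l2(comb_row)
        out.append([v * scale for v in comb_row])
    return out


positive = [[3.0, 4.0]]   # norm 5
combined = [[6.0, 8.0]]   # norm 10, e.g. after CFG with a large scale
print(cfg_normalize_function(positive, combined))  # [[3.0, 4.0]]
```

The rescaling keeps the direction chosen by CFG but caps its magnitude, which counteracts the over-saturation that large guidance scales can cause.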
combine_cfg_noise
combine_cfg_noise(
positive_noise_pred: Tensor | tuple[Tensor, ...],
negative_noise_pred: Tensor | tuple[Tensor, ...],
true_cfg_scale: float,
cfg_normalize: bool = False,
) -> Tensor | tuple[Tensor, ...]
Combine conditional and unconditional noise predictions with CFG.
Accepts both plain tensors (backward-compatible; used by LTX2, among others) and tuples (multi-output models). The default implementation applies the standard CFG formula to every element.
Multi-output models can override this to apply different logic per element.
Example override for a model returning (video_pred, audio_pred)::

    def combine_cfg_noise(self, positive_noise_pred, negative_noise_pred, true_cfg_scale, cfg_normalize=False):
        video_pos, audio_pos = positive_noise_pred
        video_neg, _audio_neg = negative_noise_pred
        # Apply standard CFG to the video stream only
        video_combined = super().combine_cfg_noise(video_pos, video_neg, true_cfg_scale, cfg_normalize)
        return (video_combined, audio_pos)  # audio: positive branch only, no CFG
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| positive_noise_pred | Tensor \| tuple[Tensor, ...] | Positive/conditional prediction(s): a Tensor or a tuple of Tensors | required |
| negative_noise_pred | Tensor \| tuple[Tensor, ...] | Negative/unconditional prediction(s): a Tensor or a tuple of Tensors | required |
true_cfg_scale | float | CFG scale factor | required |
cfg_normalize | bool | Whether to normalize the combined prediction (default: False) | False |
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] | Combined noise prediction(s), of the same type as the inputs |
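The standard CFG formula applied element-wise is noise = negative + true_cfg_scale * (positive - negative). A minimal sketch of the default behavior described above, assuming plain floats in place of tensors and omitting cfg_normalize; the helper names are hypothetical:

```python
from typing import Union

Pred = Union[float, tuple[float, ...]]


def _cfg(pos: float, neg: float, scale: float) -> float:
    # Move from the unconditional prediction towards (and past) the conditional one.
    return neg + scale * (pos - neg)


def combine_cfg_noise(positive: Pred, negative: Pred, true_cfg_scale: float) -> Pred:
    if isinstance(positive, tuple):
        # Multi-output model: apply the same formula to every element.
        if not isinstance(negative, tuple) or len(negative) != len(positive):
            raise TypeError("positive and negative must be tuples of equal length")
        return tuple(_cfg(p, n, true_cfg_scale) for p, n in zip(positive, negative))
    # Single-output model (backward-compatible path).
    return _cfg(positive, negative, true_cfg_scale)


print(combine_cfg_noise(1.0, 0.0, 3.0))                # 3.0
print(combine_cfg_noise((1.0, 2.0), (0.0, 1.0), 3.0))  # (3.0, 4.0)
```

With true_cfg_scale = 1.0 the formula returns the positive prediction unchanged, which is a quick sanity check for any override.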
combine_multi_branch_cfg_noise
combine_multi_branch_cfg_noise(
predictions: list[Tensor | tuple[Tensor, ...]],
true_cfg_scale: float | dict[str, float],
cfg_normalize: bool = False,
) -> Tensor | tuple[Tensor, ...]
Combine N branch predictions. The default implementation applies the standard 2-branch CFG formula.
Override this method for custom multi-branch combine logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| predictions | list[Tensor \| tuple[Tensor, ...]] | List of N predictions, where predictions[0] is always the positive/conditional branch. | required |
| true_cfg_scale | float \| dict[str, float] | CFG scale factor (float for 2-branch, dict for multi-branch). | required |
cfg_normalize | bool | Whether to normalize the combined prediction. | False |
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] | Combined noise prediction. |
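As an illustration of an override, a three-branch model with separate text and image guidance could use the dual-guidance formula popularized by InstructPix2Pix. The branch order, the "text"/"image" dictionary keys, and the formula are assumptions for this sketch, not the conventions of OmniGen2, Bagel, or DreamID in vllm_omni; floats stand in for tensors and cfg_normalize is omitted.

```python
from __future__ import annotations


def combine_multi_branch_cfg_noise(predictions: list[float],
                                   true_cfg_scale: float | dict[str, float]) -> float:
    if not isinstance(true_cfg_scale, dict):
        # 2-branch fallback: [positive, negative] with a single float scale.
        pos, neg = predictions
        return neg + true_cfg_scale * (pos - neg)
    # Assumed order: [text+image conditioned, image-only conditioned, unconditional].
    full, image_only, uncond = predictions
    return (
        uncond
        + true_cfg_scale["image"] * (image_only - uncond)
        + true_cfg_scale["text"] * (full - image_only)
    )


print(combine_multi_branch_cfg_noise([1.0, 0.0], 5.0))                               # 5.0
print(combine_multi_branch_cfg_noise([3.0, 2.0, 1.0], {"text": 2.0, "image": 1.5}))  # 4.5
```

Because predictions[0] is always the positive branch, an override only has to agree with its own pipeline on the order of the remaining branches.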
diffuse
Diffusion loop with optional classifier-free guidance.
Subclasses MUST implement this method to define the complete diffusion/denoising loop for their specific model.
Typical implementation pattern (single output):

    def diffuse(self, latents, timesteps, prompt_embeds, negative_embeds, do_true_cfg, **kwargs):
        for t in timesteps:
            positive_kwargs = {...}  # model-specific inputs built from prompt_embeds
            negative_kwargs = {...}  # model-specific inputs built from negative_embeds
            noise_pred = self.predict_noise_maybe_with_cfg(
                do_true_cfg=do_true_cfg,
                true_cfg_scale=self.guidance_scale,
                positive_kwargs=positive_kwargs,
                negative_kwargs=negative_kwargs,
            )
            latents = self.scheduler_step_maybe_with_cfg(
                noise_pred, t, latents, do_true_cfg=do_true_cfg
            )
        return latents
Multi-output models (e.g., video + audio) should:
1. Override predict_noise() to return a tuple.
2. Override combine_cfg_noise() for per-element CFG logic.
3. Set self.scheduler to a composite scheduler that handles tuples.
Typical implementation pattern (multi-output):

    def diffuse(self, video_latents, audio_latents, timesteps_video, timesteps_audio, do_true_cfg, **kwargs):
        for t_v, t_a in zip(timesteps_video, timesteps_audio):
            positive_kwargs = {...}  # model-specific conditional inputs
            negative_kwargs = {...}  # model-specific unconditional inputs
            # Returns a tuple: (video_pred, audio_pred)
            video_pred, audio_pred = self.predict_noise_maybe_with_cfg(
                do_true_cfg=do_true_cfg,
                true_cfg_scale=self.guidance_scale,
                positive_kwargs=positive_kwargs,
                negative_kwargs=negative_kwargs,
            )
            # Assumes self.scheduler = VideoAudioScheduler(video_sched, audio_sched),
            # which accepts and returns tuples
            video_latents, audio_latents = self.scheduler_step_maybe_with_cfg(
                (video_pred, audio_pred),
                (t_v, t_a),
                (video_latents, audio_latents),
                do_true_cfg=do_true_cfg,
            )
        return video_latents, audio_latents
predict_noise
Forward pass through the transformer to predict noise.
The default implementation calls self.transformer; subclasses should override this method if they need custom behavior.
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] \| IntermediateTensors | A single Tensor for standard models, or a tuple of Tensors for multi-output models (e.g., video + audio). Multi-output models must also override combine_cfg_noise() and set self.scheduler to a composite scheduler that handles tuples. Non-last Pipeline Parallel stages return IntermediateTensors instead of final noise tensors wrapped in a tuple. |
predict_noise_maybe_with_cfg
predict_noise_maybe_with_cfg(
do_true_cfg: bool,
true_cfg_scale: float,
positive_kwargs: dict[str, Any],
negative_kwargs: dict[str, Any] | None,
cfg_normalize: bool = True,
output_slice: int | None = None,
) -> Tensor | tuple[Tensor, ...]
Predict noise with optional classifier-free guidance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
do_true_cfg | bool | Whether to apply CFG | required |
true_cfg_scale | float | CFG scale factor | required |
positive_kwargs | dict[str, Any] | Kwargs for positive/conditional prediction | required |
| negative_kwargs | dict[str, Any] \| None | Kwargs for negative/unconditional prediction | required |
| cfg_normalize | bool | Whether to normalize CFG output (default: True) | True |
| output_slice | int \| None | If set, slice each output to [:, :output_slice] for image editing | None |
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] | Predicted noise tensor or tuple of tensors. In CFG parallel mode, the result is valid on ALL ranks (not just rank 0). |
Note
For multi-output models (e.g., video + audio where predict_noise returns a tuple), override combine_cfg_noise() for per-element CFG logic and set self.scheduler to a composite scheduler.
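The single-device path of this method can be sketched as follows: without CFG only the positive branch runs; with CFG both branches run and are combined with the standard formula. In CFG parallel mode the two predict_noise() calls would instead run on different ranks and be exchanged with all_gather before the same combine. Assumptions: nested (batch, seq) lists stand in for tensors, predict_noise is passed in as a callable, cfg_normalize is omitted, slicing happens per branch before combining, and toy_predict is hypothetical.

```python
from __future__ import annotations

from typing import Any, Callable

Batch = list[list[float]]  # (batch, seq) stand-in for a tensor


def predict_noise_maybe_with_cfg_serial(
    predict_noise: Callable[..., Batch],
    do_true_cfg: bool,
    true_cfg_scale: float,
    positive_kwargs: dict[str, Any],
    negative_kwargs: dict[str, Any] | None,
    output_slice: int | None = None,
) -> Batch:
    def maybe_slice(x: Batch) -> Batch:
        # Equivalent of x[:, :output_slice] on a (batch, seq) nested list.
        return x if output_slice is None else [row[:output_slice] for row in x]

    pos = maybe_slice(predict_noise(**positive_kwargs))
    if not do_true_cfg:
        return pos
    if negative_kwargs is None:
        raise ValueError("negative_kwargs is required when do_true_cfg=True")
    neg = maybe_slice(predict_noise(**negative_kwargs))
    return [
        [n + true_cfg_scale * (p - n) for p, n in zip(pos_row, neg_row)]
        for pos_row, neg_row in zip(pos, neg)
    ]


def toy_predict(x: Batch, bias: float) -> Batch:
    """Hypothetical stand-in for predict_noise()."""
    return [[v + bias for v in row] for row in x]


x = [[0.0, 1.0, 2.0]]
print(predict_noise_maybe_with_cfg_serial(
    toy_predict, True, 2.0, {"x": x, "bias": 1.0}, {"x": x, "bias": 0.0}, output_slice=2
))  # [[2.0, 3.0]]
```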
predict_noise_with_multi_branch_cfg
predict_noise_with_multi_branch_cfg(
do_true_cfg: bool,
true_cfg_scale: float | dict[str, float],
branches_kwargs: list[dict[str, Any]],
cfg_normalize: bool = False,
output_slice: int | None = None,
) -> Tensor | tuple[Tensor, ...]
Predict noise with N-branch CFG dispatch across M GPUs.
This is the multi-branch counterpart of predict_noise_maybe_with_cfg(). Use this for models with 3 or more CFG branches (e.g., OmniGen2, Bagel, DreamID). Existing 2-branch models should continue using predict_noise_maybe_with_cfg().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
do_true_cfg | bool | Whether to apply CFG. | required |
| true_cfg_scale | float \| dict[str, float] | CFG scale factor (passed to combine_multi_branch_cfg_noise). | required |
| branches_kwargs | list[dict[str, Any]] | List of N dicts, each containing kwargs for one predict_noise() call. branches_kwargs[0] is always the positive/conditional branch. | required |
| cfg_normalize | bool | Whether to normalize (passed to combine_multi_branch_cfg_noise). | False |
| output_slice | int \| None | If set, slice each output to [:, :output_slice]. | None |
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] | Combined noise prediction, identical on all ranks in CFG parallel mode. |
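How N branches are spread over M ranks is not documented here. One simple scheme, assumed purely for illustration, is round-robin: branch i runs on rank i % M, results are all-gathered, and every rank reassembles them in branch order so that predictions[0] stays the positive branch before the combine step. Everything below is a single-process simulation with hypothetical helper names.

```python
from typing import Callable


def assign_branches(num_branches: int, world_size: int) -> list[list[int]]:
    """Hypothetical round-robin plan: rank r owns branches r, r + M, r + 2M, ..."""
    return [list(range(r, num_branches, world_size)) for r in range(world_size)]


def simulate_multi_branch(
    branch_inputs: list[float],
    world_size: int,
    predict: Callable[[float], float],
    combine: Callable[[list[float]], float],
) -> list[float]:
    plan = assign_branches(len(branch_inputs), world_size)
    # Each rank runs only its own branches and tags results with the branch index.
    local = [[(i, predict(branch_inputs[i])) for i in plan[r]] for r in range(world_size)]
    # all_gather: every rank sees every rank's tagged results.
    gathered = [pair for rank_results in local for pair in rank_results]
    # Reassemble in branch order, then combine locally on every rank.
    predictions = [0.0] * len(branch_inputs)
    for i, pred in gathered:
        predictions[i] = pred
    return [combine(predictions) for _ in range(world_size)]


print(assign_branches(3, 2))  # [[0, 2], [1]]
print(simulate_multi_branch([3.0, 2.0, 1.0], 2, lambda v: 2 * v, lambda p: p[0] - p[2]))  # [4.0, 4.0]
```

Tagging each result with its branch index is what makes the reassembly independent of which rank computed which branch.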
scheduler_step
scheduler_step(
noise_pred: Tensor | tuple[Tensor, ...],
t: Tensor | tuple[Tensor, ...],
latents: Tensor | tuple[Tensor, ...],
per_request_scheduler: Any | None = None,
generator: Generator | None = None,
) -> Tensor | tuple[Tensor, ...]
Step the scheduler.
Default implementation passes inputs directly to self.scheduler.step(). For multi-output models, set self.scheduler to a composite scheduler that handles tuples (e.g., VideoAudioScheduler).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| noise_pred | Tensor \| tuple[Tensor, ...] | Predicted noise (Tensor, or tuple for multi-output models) | required |
| t | Tensor \| tuple[Tensor, ...] | Current timestep (Tensor, or tuple when schedulers differ per output) | required |
| latents | Tensor \| tuple[Tensor, ...] | Current latents (Tensor, or tuple for multi-output models) | required |
| per_request_scheduler | Any \| None | Optional request-scoped scheduler that overrides self.scheduler | None |
| generator | Generator \| None | Optional torch Generator for reproducible sampling. When using CFG parallel, all CFG ranks should receive generators initialized with the same seed so that stochastic schedulers (e.g., DDPM) produce identical results. | None |
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] | Updated latents after the scheduler step |
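The fallback to per_request_scheduler and the role of a shared seed can be sketched as follows. Assumptions: random.Random stands in for torch.Generator, ToyStochasticScheduler is a hypothetical DDPM-like scheduler that adds generator-driven noise, and the step(noise_pred, t, latents, generator=...) signature is illustrative rather than the exact interface of self.scheduler.step().

```python
from __future__ import annotations

import random
from typing import Any


class ToyStochasticScheduler:
    """Hypothetical DDPM-like scheduler: deterministic update plus sampled noise."""

    def __init__(self, step_size: float, noise_std: float) -> None:
        self.step_size = step_size
        self.noise_std = noise_std

    def step(self, noise_pred: list[float], t: int, latents: list[float],
             generator: random.Random | None = None) -> list[float]:
        rng = generator if generator is not None else random.Random()
        return [x - self.step_size * n + self.noise_std * rng.gauss(0.0, 1.0)
                for x, n in zip(latents, noise_pred)]


def scheduler_step(default_scheduler: Any, noise_pred: list[float], t: int,
                   latents: list[float], per_request_scheduler: Any | None = None,
                   generator: random.Random | None = None) -> list[float]:
    # A request-scoped scheduler, when given, takes precedence over the default one.
    scheduler = per_request_scheduler if per_request_scheduler is not None else default_scheduler
    return scheduler.step(noise_pred, t, latents, generator=generator)


sched = ToyStochasticScheduler(step_size=0.5, noise_std=0.1)
noise, latents = [0.2, -0.4], [1.0, 1.0]
# Two CFG ranks whose generators share a seed produce identical latents...
rank0 = scheduler_step(sched, noise, 10, latents, generator=random.Random(1234))
rank1 = scheduler_step(sched, noise, 10, latents, generator=random.Random(1234))
# ...while different seeds make the ranks drift apart.
rank1_bad = scheduler_step(sched, noise, 10, latents, generator=random.Random(999))
print(rank0 == rank1, rank0 == rank1_bad)  # True False
```

Once two ranks draw different noise, their latents diverge and later CFG combines mix predictions made from different states, which is why the seeds must match.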
scheduler_step_maybe_with_cfg
scheduler_step_maybe_with_cfg(
noise_pred: Tensor | tuple[Tensor, ...],
t: Tensor | tuple[Tensor, ...],
latents: Tensor | tuple[Tensor, ...],
do_true_cfg: bool,
per_request_scheduler: Any | None = None,
generator: Generator | None = None,
) -> Tensor | tuple[Tensor, ...]
Step the scheduler with automatic CFG parallel handling.
All ranks compute the scheduler step locally. No broadcast is needed because predict_noise_maybe_with_cfg already ensures that all ranks hold an identical noise_pred after the all_gather and local combine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| noise_pred | Tensor \| tuple[Tensor, ...] | Predicted noise (Tensor or tuple, valid on all ranks) | required |
| t | Tensor \| tuple[Tensor, ...] | Current timestep (Tensor, or tuple when schedulers differ per output) | required |
| latents | Tensor \| tuple[Tensor, ...] | Current latents (Tensor or tuple) | required |
| do_true_cfg | bool | Whether CFG is enabled | required |
| per_request_scheduler | Any \| None | Optional request-scoped scheduler that overrides self.scheduler | None |
| generator | Generator \| None | Optional torch Generator for reproducible sampling. When using CFG parallel, all CFG ranks should receive generators initialized with the same seed so that stochastic schedulers (e.g., DDPM) produce identical results. | None |
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] | Updated latents (identical across all CFG ranks) |