
vllm_omni.diffusion.distributed.pipeline_parallel

AsyncLatents

Transparent async wrapper returned by scheduler_step_maybe_with_cfg on rank 0.

Wraps a pending irecv_tensor_dict and defers handle.wait() until the underlying tensor is actually consumed, either through attribute access (e.g. latents.to(dtype), latents.shape) or through a torch operation (e.g. mask * latents). This keeps the first PP rank non-blocking after it posts the receive, consistent with the asynchronous design used throughout the PP communication layer.
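The deferred-wait idea can be illustrated with a small stdlib-only sketch. FakeRecvHandle and LazyLatents are hypothetical names, not the vllm-omni API, and the payload is a plain float rather than a tensor. The real class also intercepts torch operations; in PyTorch one common way to do that is the `__torch_function__` protocol, but whether AsyncLatents uses it is not stated here, so this sketch covers attribute access plus `*` via `__mul__`/`__rmul__` instead.

```python
from typing import Any


class FakeRecvHandle:
    """Hypothetical stand-in for a pending receive handle."""

    def __init__(self, payload: Any) -> None:
        self._payload = payload
        self.waited = False

    def wait(self) -> Any:
        # A real handle would block here until the data has arrived.
        self.waited = True
        return self._payload


class LazyLatents:
    """Hypothetical wrapper that defers handle.wait() until first use."""

    def __init__(self, handle: FakeRecvHandle) -> None:
        self._handle = handle
        self._value = None
        self._resolved = False

    def _resolve(self) -> Any:
        # Wait at most once; later uses reuse the received value.
        if not self._resolved:
            self._value = self._handle.wait()
            self._resolved = True
        return self._value

    def __getattr__(self, name: str) -> Any:
        # Only called for attributes missing on the wrapper itself,
        # e.g. latents.shape or latents.to(...) on a real tensor.
        return getattr(self._resolve(), name)

    def __mul__(self, other: Any) -> Any:
        return self._resolve() * other

    def __rmul__(self, other: Any) -> Any:
        # Handles `mask * latents` when the left operand does not know the wrapper.
        return other * self._resolve()
```

Creating the wrapper never blocks; the first attribute read or multiplication triggers the single wait() call.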

PipelineParallelMixin

Mixin providing Pipeline Parallelism for diffusion pipelines.

All PP ranks run the full denoising loop in forward(). predict_noise_maybe_with_cfg and scheduler_step_maybe_with_cfg encapsulate all inter-rank communication.

Communication pattern per denoising step

  • Forward chain: rank 0 → 1 → … → N-1 via async isend/irecv (AsyncIntermediateTensors)
  • Next timestep: last rank → rank 0 via async isend/irecv (AsyncLatents)

All communication is asynchronous, using isend_tensor_dict/irecv_tensor_dict. Only rank 0 needs the updated latents, because it starts the next forward pass.
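The per-step pattern can be emulated in one process to see the message order. In this toy sketch (simulate_pp and its arithmetic "model" are hypothetical), each PP rank is a thread with its own queue.Queue inbox: put() never blocks, loosely like an isend, and get() blocks until data arrives, loosely like waiting on an irecv. Each stage adds rank + 1 to the activation, and the last rank's toy scheduler step simply takes noise_pred as the new latents before sending them back to rank 0.

```python
import queue
import threading


def simulate_pp(num_ranks: int, num_steps: int, init_latents: int = 0):
    """Toy single-process emulation of the per-step PP message pattern."""
    inboxes = [queue.Queue() for _ in range(num_ranks)]
    log = []  # (step, src, dst) tuples in send order
    log_lock = threading.Lock()
    result = {}
    last = num_ranks - 1

    def send(step, src, dst, payload):
        # Log before put() so a receiver's own sends are always logged later.
        with log_lock:
            log.append((step, src, dst))
        inboxes[dst].put(payload)

    def stage(rank, hidden):
        # Toy "model slice": stage r adds r + 1 to the activation.
        return hidden + rank + 1

    def run_rank(rank):
        latents = init_latents
        for step in range(num_steps):
            if rank == 0:
                hidden = stage(0, latents)  # first stage starts from latents
            else:
                hidden = stage(rank, inboxes[rank].get())  # forward-chain recv
            if rank < last:
                send(step, rank, rank + 1, hidden)  # forward-chain send
            else:
                # Only the last rank has noise_pred, so only it runs the
                # (toy) scheduler step, then ships the latents to rank 0.
                noise_pred = hidden
                latents = noise_pred
                send(step, last, 0, latents)
            if rank == 0:
                latents = inboxes[0].get()  # updated latents for the next step
        if rank == 0:
            result["latents"] = latents

    threads = [threading.Thread(target=run_rank, args=(r,)) for r in range(num_ranks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return result["latents"], log
```

With 4 ranks, each step logs (step, 0, 1), (step, 1, 2), (step, 2, 3) and then (step, 3, 0), and rank 0 is the only rank that receives the updated latents.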

For sequential CFG (cfg_parallel_size=1) with PP, two full forward chains are executed per denoising step, one for the positive pass and one for the negative pass, so that each PP stage operates on the correct encoder_hidden_states.
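A quick count shows why sequential CFG doubles forward-chain traffic. This sketch uses a hypothetical helper, forward_chain_sends, with made-up mode labels for the three modes listed under predict_noise_maybe_with_cfg. It counts only rank-to-rank forward-chain sends within one PP pipeline and deliberately excludes the last-rank → rank 0 latents transfer and any CFG all-gather, which are not quantified here.

```python
def forward_chain_sends(num_pp_ranks: int, num_steps: int, mode: str) -> int:
    """Count forward-chain sends in one PP pipeline over a full denoising run.

    mode (hypothetical labels):
      "seq_cfg"      - PP only, sequential CFG (cond and uncond chains)
      "cfg_parallel" - PP + CFG-parallel (one branch per pipeline)
      "no_cfg"       - PP only, cond branch only
    """
    chains_per_step = {"seq_cfg": 2, "cfg_parallel": 1, "no_cfg": 1}[mode]
    hops_per_chain = num_pp_ranks - 1  # rank 0 -> 1 -> ... -> N-1
    return num_steps * chains_per_step * hops_per_chain
```

For example, 4 PP ranks and 50 steps give 300 sends with sequential CFG versus 150 per pipeline with CFG-parallel.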

predict_noise_maybe_with_cfg

predict_noise_maybe_with_cfg(
    do_true_cfg: bool,
    true_cfg_scale: float,
    positive_kwargs: dict[str, Any],
    negative_kwargs: dict[str, Any] | None,
    cfg_normalize: bool = True,
    output_slice: int | None = None,
) -> Tensor | tuple[Tensor, ...] | None

Drop-in replacement for predict_noise_maybe_with_cfg that also handles PP.

Supports three modes:
  • PP only, sequential CFG: both branches (cond and uncond) run through this PP pipeline, which doubles the communication volume per denoising step compared to PP + CFG-parallel.
  • PP + CFG-parallel: each PP pipeline carries one branch. The last PP rank all-gathers the predictions across the CFG group and combines them, exactly mirroring CFGParallelMixin.predict_noise_maybe_with_cfg.
  • PP only, no CFG: only the cond branch runs.

Returns:

Tensor | tuple[Tensor, ...] | None: noise_pred on the last PP rank (on all CFG ranks when CFG-parallel is active); None on all other ranks.
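The combine step performed on the last PP rank can be sketched on a single vector of plain floats. combine_cfg is a hypothetical helper; the first line is the standard classifier-free guidance formula neg + scale * (pos - neg). The exact normalization behind cfg_normalize is not described here, so the sketch assumes it rescales the combined prediction to the norm of the conditional one, as some diffusers pipelines do.

```python
import math


def combine_cfg(pos, neg, true_cfg_scale, cfg_normalize=True):
    """Hypothetical CFG combine for one vector given as lists of floats."""
    # Standard classifier-free guidance.
    comb = [n + true_cfg_scale * (p - n) for p, n in zip(pos, neg)]
    if cfg_normalize:
        # Assumed normalization: rescale so ||comb|| equals ||pos||.
        pos_norm = math.sqrt(sum(p * p for p in pos))
        comb_norm = math.sqrt(sum(c * c for c in comb))
        if comb_norm > 0:
            comb = [c * pos_norm / comb_norm for c in comb]
    return comb
```

With true_cfg_scale=1.0 and no normalization the result is just the conditional prediction; larger scales push it further from the unconditional one.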

scheduler_step_maybe_with_cfg

scheduler_step_maybe_with_cfg(
    noise_pred: Tensor | tuple[Tensor, ...] | None,
    t: Tensor | tuple[Tensor, ...],
    latents: Tensor | tuple[Tensor, ...],
    do_true_cfg: bool,
    per_request_scheduler: Any | None = None,
    generator: Generator | None = None,
) -> Tensor | tuple[Tensor, ...] | AsyncLatents

Drop-in replacement for scheduler_step_maybe_with_cfg that also handles PP.

Only the last PP rank runs the scheduler, since it already holds noise_pred; the resulting latents are sent to rank 0, which needs them for the next forward pass.

Returns an AsyncLatents on rank 0 that transparently defers handle.wait() until the tensor is actually consumed (through attribute access or a torch operation), keeping the rank non-blocking after the irecv is posted.
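The rank-dependent behaviour described above can be summarized as a dispatch function. This is a sketch with hypothetical names (scheduler_step_pp, step_fn, send_to_rank0, post_recv_from_last), with the communication passed in as plain callables. The text only specifies rank 0 and the last rank, so returning the input latents unchanged on intermediate ranks is an assumption of this sketch.

```python
from typing import Any, Callable


def scheduler_step_pp(
    rank: int,
    world_size: int,
    noise_pred: Any,
    latents: Any,
    step_fn: Callable[[Any, Any], Any],
    send_to_rank0: Callable[[Any], None],
    post_recv_from_last: Callable[[], Any],
) -> Any:
    """Hypothetical per-rank dispatch for a PP scheduler step."""
    last = world_size - 1
    if rank == last:
        # Only the last rank holds noise_pred, so only it runs the scheduler.
        new_latents = step_fn(noise_pred, latents)
        if rank != 0:
            send_to_rank0(new_latents)  # rank 0 starts the next forward pass
        return new_latents
    if rank == 0:
        # Post the receive and return at once; a lazy wrapper would defer
        # the actual wait until the latents are used.
        return post_recv_from_last()
    # Assumption: intermediate ranks have nothing to update.
    return latents
```

With a single PP rank, rank 0 is also the last rank, so it runs the scheduler locally and nothing is sent.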