vllm_omni.diffusion.distributed.pipeline_parallel
AsyncLatents
Transparent async wrapper returned by scheduler_step_maybe_with_cfg on rank 0.
Wraps a pending irecv_tensor_dict receive and defers handle.wait() until the underlying tensor is actually consumed, either through attribute access (e.g. latents.to(dtype), latents.shape) or through a torch operation (e.g. mask * latents). This keeps the first PP rank non-blocking after it posts the receive, matching the async approach used throughout the rest of the PP communication layer.
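The deferred-wait idea can be illustrated without torch. In the sketch below, FakeHandle and LazyLatents are hypothetical names: FakeHandle stands in for the work handle of a posted receive, and plain Python operator methods stand in for intercepting torch operations. A real tensor wrapper would more likely rely on torch's __torch_function__ protocol; that is an assumption, not a description of how AsyncLatents is implemented.

```python
from typing import Any


class FakeHandle:
    """Hypothetical stand-in for the handle of a posted async receive."""

    def __init__(self) -> None:
        self.wait_calls = 0

    def wait(self) -> None:
        self.wait_calls += 1


class LazyLatents:
    """Sketch of a wrapper that defers handle.wait() until first use."""

    def __init__(self, handles: list, buffers: dict[str, Any], key: str = "latents") -> None:
        self._handles = handles
        self._buffers = buffers  # filled in by the receive once it completes
        self._key = key
        self._value = None
        self._resolved = False

    def _resolve(self) -> Any:
        # Block only on first use; later accesses reuse the cached value.
        if not self._resolved:
            for handle in self._handles:
                handle.wait()
            self._value = self._buffers[self._key]
            self._resolved = True
        return self._value

    def __getattr__(self, name: str) -> Any:
        # Called only for attributes not found on the wrapper (e.g. .shape, .to).
        return getattr(self._resolve(), name)

    # Operators are looked up on the type, so they must be defined explicitly.
    def __mul__(self, other: Any) -> Any:
        return self._resolve() * other

    def __rmul__(self, other: Any) -> Any:
        return other * self._resolve()


handle = FakeHandle()
lat = LazyLatents([handle], {"latents": 3.0})  # receive "posted", nothing blocks yet
print(handle.wait_calls, 2 * lat, handle.wait_calls)
```

Creating the wrapper costs nothing; the first consumption (here `2 * lat`) performs the single wait, and later uses hit the cached value.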
PipelineParallelMixin
Mixin providing Pipeline Parallelism for diffusion pipelines.
All PP ranks run the full denoising loop in forward(). predict_noise_maybe_with_cfg and scheduler_step_maybe_with_cfg encapsulate all inter-rank communication.
Communication pattern per denoising step:
- Forward chain: rank 0 → 1 → … → N-1 via async isend/irecv (AsyncIntermediateTensors).
- Next timestep: last rank → rank 0 via async isend/irecv (AsyncLatents).
All communication is asynchronous, using isend_tensor_dict/irecv_tensor_dict. Only rank 0 needs the updated latents, because it starts the next forward pass.
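One way to picture a single denoising step (no CFG) is to list the point-to-point messages it produces. pp_step_messages is a hypothetical helper, and the "intermediate" and "latents" labels merely stand for the AsyncIntermediateTensors and AsyncLatents transfers described above; no actual isend/irecv is performed.

```python
def pp_step_messages(pp_size: int) -> list[tuple[int, int, str]]:
    """Return (src_rank, dst_rank, payload) for one denoising step, no CFG."""
    msgs: list[tuple[int, int, str]] = []
    # Forward chain: each stage passes its intermediate activations onward.
    for rank in range(pp_size - 1):
        msgs.append((rank, rank + 1, "intermediate"))
    # The last stage runs the scheduler and returns the new latents to rank 0.
    if pp_size > 1:
        msgs.append((pp_size - 1, 0, "latents"))
    return msgs


for msg in pp_step_messages(4):
    print(msg)
```

With four stages this yields three forward hops followed by one return hop to rank 0, which closes the loop for the next timestep.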
For sequential CFG (cfg_parallel_size=1) with PP, two full forward chains are executed, one for the positive pass and one for the negative pass, so that each PP stage operates on the correct encoder_hidden_states.
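The cost of sequential CFG can be seen by enumerating the forward-chain hops per step. The hypothetical helper below emits one complete chain per branch, so every stage sees the hops of the positive pass before those of the negative pass; it covers only the forward chain, not the latents returned to rank 0.

```python
def forward_chain_hops(pp_size: int, do_true_cfg: bool) -> list[tuple[str, int, int]]:
    """Return (branch, src_rank, dst_rank) forward hops for one denoising step."""
    branches = ("positive", "negative") if do_true_cfg else ("positive",)
    hops: list[tuple[str, int, int]] = []
    for branch in branches:
        # Each branch runs its own full chain with its own encoder_hidden_states.
        for rank in range(pp_size - 1):
            hops.append((branch, rank, rank + 1))
    return hops


print(len(forward_chain_hops(4, do_true_cfg=False)), len(forward_chain_hops(4, do_true_cfg=True)))
```

The hop count doubles (3 to 6 for four stages), which is the extra communication that PP + CFG-parallel avoids by giving each branch its own pipeline.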
predict_noise_maybe_with_cfg
predict_noise_maybe_with_cfg(
do_true_cfg: bool,
true_cfg_scale: float,
positive_kwargs: dict[str, Any],
negative_kwargs: dict[str, Any] | None,
cfg_normalize: bool = True,
output_slice: int | None = None,
) -> Tensor | tuple[Tensor, ...] | None
Drop-in replacement for predict_noise_maybe_with_cfg that also handles PP.
Supports three modes:
- PP only, sequential CFG: both branches (cond and uncond) run through this PP pipeline, one after the other. This doubles the communication volume per denoising step compared with PP + CFG-parallel.
- PP + CFG-parallel: each PP pipeline carries one branch. The last PP rank all-gathers across the CFG group and combines the branches, mirroring CFGParallelMixin.predict_noise_maybe_with_cfg exactly.
- PP only, no CFG: only the cond branch is run.
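The exact combination formula is not spelled out here. The sketch below assumes the common true-CFG rule `neg + scale * (pos - neg)` and, for cfg_normalize, a rescale of the combined prediction to the norm of the positive prediction; both are assumptions. combine_cfg is a hypothetical helper, plain lists stand in for tensors (one vector, so the norm is over the last dimension), and in CFG-parallel mode pos and neg would be the two branch outputs obtained from the all-gather.

```python
import math


def combine_cfg(pos: list[float], neg: list[float], scale: float, normalize: bool = True) -> list[float]:
    """Assumed true-CFG combination with optional norm rescaling."""
    comb = [n + scale * (p - n) for p, n in zip(pos, neg)]
    if normalize:
        pos_norm = math.sqrt(sum(p * p for p in pos))
        comb_norm = math.sqrt(sum(c * c for c in comb))
        if comb_norm > 0:  # avoid dividing by zero for an all-zero prediction
            # Keep the guided direction but restore the positive branch's magnitude.
            comb = [c * pos_norm / comb_norm for c in comb]
    return comb


print(combine_cfg([3.0, 4.0], [0.0, 0.0], 2.0, normalize=False))
print(combine_cfg([3.0, 4.0], [0.0, 0.0], 2.0))
```

Without normalization the guidance scale inflates the prediction's magnitude; the assumed rescale keeps the guided direction while matching the conditional prediction's norm.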
Returns:
| Type | Description |
|---|---|
| Tensor \| tuple[Tensor, ...] \| None | noise_pred on the last PP rank (on all CFG ranks when CFG-parallel is active). |
| Tensor \| tuple[Tensor, ...] \| None | None on all other ranks. |
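To make the return contract in the table concrete, the hypothetical helper below lays out a (pp_rank, cfg_rank) grid and marks which ranks receive a non-None noise_pred. The grid indexing is an assumption for illustration only; how vllm_omni actually numbers its ranks is not described here.

```python
def noise_pred_holders(pp_size: int, cfg_size: int = 1) -> dict[tuple[int, int], bool]:
    """Map (pp_rank, cfg_rank) -> whether that rank gets a non-None noise_pred."""
    return {
        # Only the last PP stage of each CFG group ends up with noise_pred.
        (pp_rank, cfg_rank): pp_rank == pp_size - 1
        for pp_rank in range(pp_size)
        for cfg_rank in range(cfg_size)
    }


holders = noise_pred_holders(pp_size=3, cfg_size=2)
print([rank for rank, has_pred in holders.items() if has_pred])
```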
scheduler_step_maybe_with_cfg
scheduler_step_maybe_with_cfg(
noise_pred: Tensor | tuple[Tensor, ...] | None,
t: Tensor | tuple[Tensor, ...],
latents: Tensor | tuple[Tensor, ...],
do_true_cfg: bool,
per_request_scheduler: Any | None = None,
generator: Generator | None = None,
) -> Tensor | tuple[Tensor, ...] | AsyncLatents
Drop-in replacement for scheduler_step_maybe_with_cfg that also handles PP.
Only the last rank runs the scheduler, since it already holds noise_pred; the result is sent to rank 0, which needs it to start the next forward pass.
Returns an AsyncLatents on rank 0 that transparently defers handle.wait() until the tensor is actually consumed (via attribute access or a torch operation), keeping the rank non-blocking after the irecv is posted.