vllm_omni.diffusion.data

SHUTDOWN_MESSAGE module-attribute

SHUTDOWN_MESSAGE = {'type': 'shutdown'}

logger module-attribute

logger = init_logger(__name__)

AttentionConfig dataclass

Per-role attention backend configuration.

Lookup precedence for a given (role, role_category):

  1. per_role[role]: exact match
  2. per_role[role_category]: category fallback (e.g. "ltx2.audio_to_video" → "cross")
  3. default: global default
  4. platform default: unchanged platform logic

default class-attribute instance-attribute

default: AttentionSpec | None = None

per_role class-attribute instance-attribute

per_role: dict[str, AttentionSpec] = field(
    default_factory=dict
)

resolve_with_source

resolve_with_source(
    role: str = "self", role_category: str | None = None
) -> tuple[AttentionSpec | None, str | None]

Resolve the AttentionSpec and report which config entry matched.
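The lookup precedence can be sketched with a pair of stand-in dataclasses. This is a minimal illustration and not the library's implementation: the field names and signature follow this page, while the backend names and the format of the returned source string are assumptions made for the example.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class AttentionSpec:
    backend: str
    extra: dict[str, Any] = field(default_factory=dict)


@dataclass
class AttentionConfig:
    default: AttentionSpec | None = None
    per_role: dict[str, AttentionSpec] = field(default_factory=dict)

    def resolve_with_source(
        self, role: str = "self", role_category: str | None = None
    ) -> tuple[AttentionSpec | None, str | None]:
        # 1. Exact role match.
        if role in self.per_role:
            return self.per_role[role], f"per_role[{role}]"
        # 2. Category fallback.
        if role_category is not None and role_category in self.per_role:
            return self.per_role[role_category], f"per_role[{role_category}]"
        # 3. Global default.
        if self.default is not None:
            return self.default, "default"
        # 4. Nothing configured: the caller falls through to platform logic.
        return None, None


# "backend_a" and "backend_b" are placeholder names, not real backends.
config = AttentionConfig(
    default=AttentionSpec("backend_a"),
    per_role={"cross": AttentionSpec("backend_b")},
)
spec, source = config.resolve_with_source("ltx2.audio_to_video", "cross")
```

Here the role "ltx2.audio_to_video" has no exact entry, so the lookup falls back to the "cross" category before the global default is considered.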

AttentionSpec dataclass

Specifies a backend and its backend-specific parameters for one attention role.

backend instance-attribute

backend: str

extra class-attribute instance-attribute

extra: dict[str, Any] = field(default_factory=dict)

DiffusionCacheConfig dataclass

Configuration for cache adapters (TeaCache, cache-dit, MagCache, etc.).

This dataclass provides a unified interface for cache configuration parameters. It can be initialized from a dictionary and accessed via attributes.

Common parameters
  • TeaCache: rel_l1_thresh, coefficients (optional)
  • cache-dit: Fn_compute_blocks, Bn_compute_blocks, max_warmup_steps, residual_diff_threshold, enable_taylorseer, taylorseer_order, scm_steps_mask_policy, scm_steps_policy
  • MagCache: mag_threshold, mag_max_skip_steps, mag_retention_ratio, mag_ratios, mag_calibrate
Example

# From dict (user-facing API) - partial config uses defaults for missing keys
config = DiffusionCacheConfig.from_dict({"rel_l1_thresh": 0.3})

# Access via attribute
print(config.rel_l1_thresh)  # 0.3 (from dict)
print(config.Fn_compute_blocks)  # 1 (default)

# Empty dict uses all defaults
default_config = DiffusionCacheConfig.from_dict({})
print(default_config.rel_l1_thresh)  # 0.2 (default)

Bn_compute_blocks class-attribute instance-attribute

Bn_compute_blocks: int = 0

Fn_compute_blocks class-attribute instance-attribute

Fn_compute_blocks: int = 1

coefficients class-attribute instance-attribute

coefficients: list[float] | None = None

enable_taylorseer class-attribute instance-attribute

enable_taylorseer: bool = False

force_refresh_step_hint class-attribute instance-attribute

force_refresh_step_hint: int | None = None

force_refresh_step_policy class-attribute instance-attribute

force_refresh_step_policy: str = 'once'

mag_calibrate class-attribute instance-attribute

mag_calibrate: bool = False

mag_max_skip_steps class-attribute instance-attribute

mag_max_skip_steps: int = 5

mag_ratios class-attribute instance-attribute

mag_ratios: list[float] | None = None

mag_retention_ratio class-attribute instance-attribute

mag_retention_ratio: float = 0.1

mag_threshold class-attribute instance-attribute

mag_threshold: float = 0.24

max_cached_steps class-attribute instance-attribute

max_cached_steps: int = -1

max_continuous_cached_steps class-attribute instance-attribute

max_continuous_cached_steps: int = 3

max_warmup_steps class-attribute instance-attribute

max_warmup_steps: int = 4

num_inference_steps class-attribute instance-attribute

num_inference_steps: int | None = None

rel_l1_thresh class-attribute instance-attribute

rel_l1_thresh: float = 0.2

residual_diff_threshold class-attribute instance-attribute

residual_diff_threshold: float = 0.24

scm_steps_mask_policy class-attribute instance-attribute

scm_steps_mask_policy: str | None = None

scm_steps_policy class-attribute instance-attribute

scm_steps_policy: str = 'dynamic'

taylorseer_order class-attribute instance-attribute

taylorseer_order: int = 1

from_dict classmethod

from_dict(data: dict[str, Any]) -> DiffusionCacheConfig

Create DiffusionCacheConfig from a dictionary.

Parameters:

Name | Type | Description | Default
data | dict[str, Any] | Dictionary containing cache configuration parameters | required

Returns:

Type | Description
DiffusionCacheConfig | DiffusionCacheConfig instance with parameters set from dict
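A minimal sketch of how a dict-to-dataclass constructor of this kind can fall back to defaults. CacheConfigSketch is a hypothetical, trimmed stand-in that copies three documented fields; dropping unknown keys is an assumption, and the real method may keep or reject them.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from typing import Any


@dataclass
class CacheConfigSketch:
    # Three of the documented fields, with the defaults listed on this page.
    rel_l1_thresh: float = 0.2
    Fn_compute_blocks: int = 1
    max_warmup_steps: int = 4

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "CacheConfigSketch":
        known = {f.name for f in fields(cls)}
        # Assumption: keys that are not dataclass fields are ignored.
        return cls(**{k: v for k, v in data.items() if k in known})


partial = CacheConfigSketch.from_dict({"rel_l1_thresh": 0.3})
empty = CacheConfigSketch.from_dict({})
```

Keys missing from the dict keep their dataclass defaults, which is what lets a partial config stay short.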

DiffusionOutput dataclass

Final output (after pipeline completion)

abort_message class-attribute instance-attribute

abort_message: str | None = None

aborted class-attribute instance-attribute

aborted: bool = False

custom_output class-attribute instance-attribute

custom_output: dict[str, Any] = field(default_factory=dict)

error class-attribute instance-attribute

error: str | None = None

output class-attribute instance-attribute

output: Tensor | tuple[Any, ...] | dict[str, Any] | None = (
    None
)

peak_memory_mb class-attribute instance-attribute

peak_memory_mb: float = 0.0

post_process_func class-attribute instance-attribute

post_process_func: Callable[..., Any] | None = None

stage_durations class-attribute instance-attribute

stage_durations: dict[str, float] = field(
    default_factory=dict
)

to_cpu class-attribute instance-attribute

to_cpu: bool = False

trajectory_decoded class-attribute instance-attribute

trajectory_decoded: list[Image] | None = None

trajectory_latents class-attribute instance-attribute

trajectory_latents: Tensor | dict[str, Any] | None = None

trajectory_log_probs class-attribute instance-attribute

trajectory_log_probs: Tensor | dict[str, Any] | None = None

trajectory_timesteps class-attribute instance-attribute

trajectory_timesteps: Tensor | dict[str, Any] | None = None

DiffusionParallelConfig dataclass

Configuration for diffusion model distributed execution.

cfg_parallel_size class-attribute instance-attribute

cfg_parallel_size: int = 1

Number of Classifier-Free Guidance (CFG) parallel groups.

data_parallel_size class-attribute instance-attribute

data_parallel_size: int = 1

Number of data parallel groups.

enable_expert_parallel class-attribute instance-attribute

enable_expert_parallel: bool = False

Enable expert parallelism for MoE layers (TP is still used for non-MoE layers).

hsdp_replicate_size class-attribute instance-attribute

hsdp_replicate_size: int = 1

Number of replica groups for HSDP. Each replica holds a full sharded copy.

hsdp_shard_size class-attribute instance-attribute

hsdp_shard_size: int = -1

Number of GPUs to shard weights across within each replica group. -1 means auto-calculate.

pipeline_parallel_size class-attribute instance-attribute

pipeline_parallel_size: int = 1

Number of pipeline parallel stages.

ring_degree class-attribute instance-attribute

ring_degree: int = 1

Number of GPUs used for ring sequence parallelism.

sequence_parallel_size class-attribute instance-attribute

sequence_parallel_size: int | None = None

Number of sequence parallel groups. sequence_parallel_size = ring_degree * ulysses_degree
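The relation between the three fields can be written as a small helper. This is a sketch only: resolve_sequence_parallel_size is a hypothetical name, and both deriving the value when it is None and raising on a mismatch are assumptions about how the config might be validated.

```python
from __future__ import annotations


def resolve_sequence_parallel_size(
    ring_degree: int = 1,
    ulysses_degree: int = 1,
    sequence_parallel_size: int | None = None,
) -> int:
    """Return the sequence-parallel size implied by the two degrees."""
    product = ring_degree * ulysses_degree
    if sequence_parallel_size is None:
        # Assumption: an unset size is derived from the degrees.
        return product
    if sequence_parallel_size != product:
        raise ValueError(
            f"sequence_parallel_size={sequence_parallel_size} does not equal "
            f"ring_degree * ulysses_degree = {product}"
        )
    return sequence_parallel_size


derived = resolve_sequence_parallel_size(ring_degree=2, ulysses_degree=4)
```

With ring_degree=2 and ulysses_degree=4 the derived size is 8.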

tensor_parallel_size class-attribute instance-attribute

tensor_parallel_size: int = 1

Number of tensor parallel groups.

ulysses_degree class-attribute instance-attribute

ulysses_degree: int = 1

Number of GPUs used for Ulysses sequence parallelism.

ulysses_mode class-attribute instance-attribute

ulysses_mode: str = 'strict'

Ulysses sequence-parallel mode.

  • "strict": Require divisibility constraints (fastest, default).
  • "advanced_uaa": Enable UAA ("Ulysses Anything Attention") to support uneven sequence lengths and non-divisible head counts.

Note:

  • Ring attention does not support attention_mask, so models that rely on mask-based auto-padding are still incompatible with Ring.
  • When used in hybrid Ulysses+Ring, Ring requires consistent per-rank sequence shapes across the ring group.
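The page does not list the exact constraints that "strict" mode enforces. Going by the description of "advanced_uaa", a plausible reading is that both the sequence length and the head count must divide evenly by ulysses_degree. The helper below is a hypothetical pre-flight check written under that assumption, not part of the library.

```python
from __future__ import annotations


def strict_ulysses_problems(
    seq_len: int, num_heads: int, ulysses_degree: int
) -> list[str]:
    """List the reasons a shape would not split evenly across ranks."""
    problems = []
    if seq_len % ulysses_degree != 0:
        problems.append(
            f"sequence length {seq_len} is not divisible by {ulysses_degree}"
        )
    if num_heads % ulysses_degree != 0:
        problems.append(
            f"head count {num_heads} is not divisible by {ulysses_degree}"
        )
    return problems


even = strict_ulysses_problems(seq_len=4096, num_heads=24, ulysses_degree=4)
uneven = strict_ulysses_problems(seq_len=4097, num_heads=24, ulysses_degree=4)
```

An empty list suggests the shape fits the strict mode; a non-empty list is the kind of case "advanced_uaa" is described as supporting.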

use_hsdp class-attribute instance-attribute

use_hsdp: bool = False

Enable Hybrid Sharded Data Parallel (HSDP) for model weight sharding.

vae_patch_parallel_size class-attribute instance-attribute

vae_patch_parallel_size: int = 1

Number of ranks used for VAE patch/tile parallelism (decode/encode).

from_dict classmethod

from_dict(data: dict[str, Any]) -> DiffusionParallelConfig

Create DiffusionParallelConfig from a dictionary.

Parameters:

Name | Type | Description | Default
data | dict[str, Any] | Dictionary containing parallel configuration parameters | required

Returns:

Type | Description
DiffusionParallelConfig | DiffusionParallelConfig instance with parameters set from dict

DiffusionRequestAbortedError

Bases: RuntimeError

Raised when a diffusion request ends via user-visible abort.

OmniACK dataclass

Handshake payload from Workers to Orchestrator.

error_msg class-attribute instance-attribute

error_msg: str | None = None

freed_bytes class-attribute instance-attribute

freed_bytes: int = 0

metadata class-attribute instance-attribute

metadata: dict[str, Any] = field(default_factory=dict)

Additional telemetry such as:

  • max_contiguous_block: for fragmentation analysis.
  • cuda_graph_recalled: boolean indicating whether graphs were successfully destroyed/rebuilt.
  • latency_ms: time taken for the D2H/H2D transfer.

rank class-attribute instance-attribute

rank: int | None = None

stage_id class-attribute instance-attribute

stage_id: int | None = None

status instance-attribute

status: str

task_id instance-attribute

task_id: str

OmniDiffusionConfig dataclass

VSA_sparsity class-attribute instance-attribute

VSA_sparsity: float = 0.0

additional_config class-attribute instance-attribute

additional_config: dict[str, Any] = field(
    default_factory=dict
)

boundary_ratio class-attribute instance-attribute

boundary_ratio: float | None = None

cache_backend class-attribute instance-attribute

cache_backend: str = 'none'

cache_config class-attribute instance-attribute

cache_config: DiffusionCacheConfig | dict[str, Any] = field(
    default_factory=dict
)

cache_strategy class-attribute instance-attribute

cache_strategy: str = 'none'

cfg_kv_collect_func class-attribute instance-attribute

cfg_kv_collect_func: Any | None = None

custom_pipeline_args class-attribute instance-attribute

custom_pipeline_args: dict[str, Any] | None = None

diffusers_call_kwargs class-attribute instance-attribute

diffusers_call_kwargs: dict[str, Any] = field(
    default_factory=dict
)

diffusers_load_kwargs class-attribute instance-attribute

diffusers_load_kwargs: dict[str, Any] = field(
    default_factory=dict
)

diffusers_pipeline_cls class-attribute instance-attribute

diffusers_pipeline_cls: type[DiffusionPipeline] | None = (
    None
)

diffusion_attention_config class-attribute instance-attribute

diffusion_attention_config: AttentionConfig = field(
    default_factory=lambda: AttentionConfig()
)

diffusion_kv_cache_dtype class-attribute instance-attribute

diffusion_kv_cache_dtype: str | None = None

diffusion_kv_cache_skip_layer_indices class-attribute instance-attribute

diffusion_kv_cache_skip_layer_indices: set[int] | None = (
    None
)

diffusion_kv_cache_skip_layers class-attribute instance-attribute

diffusion_kv_cache_skip_layers: str | None = None

diffusion_kv_cache_skip_step_indices class-attribute instance-attribute

diffusion_kv_cache_skip_step_indices: set[int] | None = None

diffusion_kv_cache_skip_steps class-attribute instance-attribute

diffusion_kv_cache_skip_steps: str | None = None

diffusion_load_format class-attribute instance-attribute

diffusion_load_format: str = 'default'

disable_autocast class-attribute instance-attribute

disable_autocast: bool = False

dist_timeout class-attribute instance-attribute

dist_timeout: int | None = None

distributed_executor_backend class-attribute instance-attribute

distributed_executor_backend: str = 'mp'

dtype class-attribute instance-attribute

dtype: dtype = bfloat16

enable_cache_dit_summary class-attribute instance-attribute

enable_cache_dit_summary: bool = False

enable_cpu_offload class-attribute instance-attribute

enable_cpu_offload: bool = False

enable_diffusion_pipeline_profiler class-attribute instance-attribute

enable_diffusion_pipeline_profiler: bool = False

enable_layerwise_offload class-attribute instance-attribute

enable_layerwise_offload: bool = False

enable_multithread_weight_load class-attribute instance-attribute

enable_multithread_weight_load: bool = True

enable_prompt_embed_cache class-attribute instance-attribute

enable_prompt_embed_cache: bool = False

enable_sleep_mode class-attribute instance-attribute

enable_sleep_mode: bool = False

enable_stage_verification class-attribute instance-attribute

enable_stage_verification: bool = True

enforce_eager class-attribute instance-attribute

enforce_eager: bool = False

extras class-attribute instance-attribute

extras: dict[str, Any] = Field(default_factory=dict)

flow_shift class-attribute instance-attribute

flow_shift: float | None = None

force_cutlass_fp8 class-attribute instance-attribute

force_cutlass_fp8: bool = False

host class-attribute instance-attribute

host: str | None = None

is_moe property

is_moe: bool

log_level class-attribute instance-attribute

log_level: str = 'info'

lora_path class-attribute instance-attribute

lora_path: str | None = None

lora_scale class-attribute instance-attribute

lora_scale: float = 1.0

mask_strategy_file_path class-attribute instance-attribute

mask_strategy_file_path: str | None = None

master_port class-attribute instance-attribute

master_port: int | None = None

max_cpu_loras class-attribute instance-attribute

max_cpu_loras: int | None = None

max_multimodal_image_inputs class-attribute instance-attribute

max_multimodal_image_inputs: int | None = None

max_num_seqs class-attribute instance-attribute

max_num_seqs: int = 1

moba_config_path class-attribute instance-attribute

moba_config_path: str | None = None

model class-attribute instance-attribute

model: str | None = None

model_class_name class-attribute instance-attribute

model_class_name: str | None = None

model_config class-attribute instance-attribute

model_config: dict[str, Any] = field(default_factory=dict)

model_loaded class-attribute instance-attribute

model_loaded: dict[str, bool] = field(
    default_factory=lambda: {
        "transformer": True,
        "vae": True,
    }
)

model_paths class-attribute instance-attribute

model_paths: dict[str, str] = field(default_factory=dict)

nccl_port class-attribute instance-attribute

nccl_port: int | None = None

num_gpus class-attribute instance-attribute

num_gpus: int | None = None

num_weight_load_threads class-attribute instance-attribute

num_weight_load_threads: int = 4

omni_kv_config class-attribute instance-attribute

omni_kv_config: dict[str, Any] = field(default_factory=dict)

output_type class-attribute instance-attribute

output_type: str = 'pil'

override_transformer_cls_name class-attribute instance-attribute

override_transformer_cls_name: str | None = None

parallel_config class-attribute instance-attribute

parallel_config: DiffusionParallelConfig = field(
    default_factory=DiffusionParallelConfig
)

pin_cpu_memory class-attribute instance-attribute

pin_cpu_memory: bool = True

port class-attribute instance-attribute

port: int | None = None

profiler_config class-attribute instance-attribute

profiler_config: ProfilerConfig | dict[str, Any] | None = (
    None
)

prompt_embed_cache_size class-attribute instance-attribute

prompt_embed_cache_size: int = 32

prompt_file_path class-attribute instance-attribute

prompt_file_path: str | None = None

quantization_config class-attribute instance-attribute

quantization_config: (
    str | QuantizationConfig | dict[str, Any] | None
) = None

revision class-attribute instance-attribute

revision: str | None = None

scheduler_port class-attribute instance-attribute

scheduler_port: int = 5555

skip_time_steps class-attribute instance-attribute

skip_time_steps: int = 15

stage_id class-attribute instance-attribute

stage_id: int = 0

step_execution class-attribute instance-attribute

step_execution: bool = False

supports_multimodal_inputs class-attribute instance-attribute

supports_multimodal_inputs: bool = False

tf_model_config class-attribute instance-attribute

tf_model_config: TransformerConfig = field(
    default_factory=TransformerConfig
)

trust_remote_code class-attribute instance-attribute

trust_remote_code: bool = False

vae_use_slicing class-attribute instance-attribute

vae_use_slicing: bool = False

vae_use_tiling class-attribute instance-attribute

vae_use_tiling: bool = False

worker_extension_cls class-attribute instance-attribute

worker_extension_cls: str | None = None

enrich_config

enrich_config() -> None

Load model metadata from HuggingFace and populate config fields.

Diffusers-style models expose model_index.json with _class_name. Non-diffusers models (e.g. Bagel, NextStep) only have config.json, so we fall back to reading that and mapping model_type manually.

from_kwargs classmethod

from_kwargs(**kwargs: Any) -> OmniDiffusionConfig

set_tf_model_config

set_tf_model_config(tf_config: TransformerConfig) -> None

Assign tf_model_config and propagate quantization if detected.

In the normal startup flow OmniDiffusionConfig is created before the transformer config.json is loaded from disk, so __post_init__ sees an empty TransformerConfig. Callers that load the config later should use this method instead of bare assignment so that an embedded quant_config is propagated to self.quantization_config automatically.

Parameters:

Name | Type | Description | Default
tf_config | TransformerConfig | Transformer configuration, typically built via TransformerConfig.from_dict. | required

settle_port

settle_port(
    port: int, port_inc: int = 42, max_attempts: int = 100
) -> int

Find an available port with retry logic.

Parameters:

Name | Type | Description | Default
port | int | Initial port to check | required
port_inc | int | Port increment for each attempt | 42
max_attempts | int | Maximum number of attempts to find an available port | 100

Returns:

Type | Description
int | An available port number

Raises:

Type | Description
RuntimeError | If no available port is found after max_attempts
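The retry behavior described above can be sketched as a standalone function. settle_port_sketch and port_is_free are hypothetical names, and the bind-based availability check and the injectable is_free parameter are assumptions made so the example can run without touching real ports; the library may probe ports differently.

```python
import socket
from typing import Callable


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can bind to (host, port)."""
    if not 0 < port <= 65535:
        return False
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
    return True


def settle_port_sketch(
    port: int,
    port_inc: int = 42,
    max_attempts: int = 100,
    is_free: Callable[[int], bool] = port_is_free,
) -> int:
    """Step from `port` by `port_inc` until a free port is found."""
    candidate = port
    for _ in range(max_attempts):
        if is_free(candidate):
            return candidate
        candidate += port_inc
    raise RuntimeError(
        f"no available port found after {max_attempts} attempts from {port}"
    )


# Deterministic demo: pretend the first two candidates are taken.
busy = {5555, 5597}
chosen = settle_port_sketch(5555, is_free=lambda p: p not in busy)
```

A check-then-use scheme like this is inherently racy: another process can claim the port between the check and the real bind, so callers usually still handle a bind failure.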

update_multimodal_support

update_multimodal_support() -> None

OmniSleepTask dataclass

Structured sleep instruction.

level class-attribute instance-attribute

level: int = 2

metadata class-attribute instance-attribute

metadata: dict[str, Any] = field(default_factory=dict)

task_id instance-attribute

task_id: str

OmniWakeTask dataclass

Structured wake-up instruction.

tags class-attribute instance-attribute

tags: list[str] | None = None

task_id instance-attribute

task_id: str

TransformerConfig dataclass

Container for raw transformer configuration dictionaries.

params class-attribute instance-attribute

params: dict[str, Any] = field(default_factory=dict)

quant_config class-attribute instance-attribute

quant_config: QuantizationConfig | None = None

quant_method class-attribute instance-attribute

quant_method: str | None = None

from_dict classmethod

from_dict(data: dict[str, Any]) -> TransformerConfig

get

get(key: str, default: Any | None = None) -> Any

to_dict

to_dict() -> dict[str, Any]

build_attention_config

build_attention_config(
    attention_config: AttentionConfig
    | Mapping[str, Any]
    | None = None,
) -> AttentionConfig

Normalize diffusion attention config — the single authoritative entry point.

Called exactly once in OmniDiffusionConfig.__post_init__. Handles type-conversion and env-var fallback (DIFFUSION_ATTENTION_BACKEND).

parse_attention_config

parse_attention_config(
    attention_config: AttentionConfig
    | Mapping[str, Any]
    | None = None,
    *,
    attention_backend: str | None = None,
) -> AttentionConfig

Pure type-conversion: coerce attention_config to an AttentionConfig.

Optionally merges an attention_backend shorthand into the config's default field. This does not read environment variables; use build_attention_config for the full normalisation that should happen exactly once in OmniDiffusionConfig.__post_init__.

parse_kv_cache_skip_selector

parse_kv_cache_skip_selector(
    selector: str
    | list[int]
    | tuple[int, ...]
    | set[int]
    | None,
) -> set[int] | None

Parse a non-negative index selector such as "0-9,20,25-30".
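A selector string such as "0-9,20,25-30" could be parsed along the following lines. parse_index_selector is a hypothetical stand-in, and several details are assumptions rather than documented behavior: ranges are treated as inclusive, an empty string yields an empty set, and malformed or negative input raises ValueError.

```python
from __future__ import annotations


def parse_index_selector(
    selector: str | list[int] | tuple[int, ...] | set[int] | None,
) -> set[int] | None:
    """Expand a selector into a set of non-negative indices."""
    if selector is None:
        return None
    if isinstance(selector, str):
        indices: set[int] = set()
        for part in selector.split(","):
            part = part.strip()
            if not part:
                continue
            if "-" in part:
                # Assumption: "a-b" is an inclusive range.
                start_text, end_text = part.split("-", 1)
                start, end = int(start_text), int(end_text)
                if start > end:
                    raise ValueError(f"descending range: {part!r}")
                indices.update(range(start, end + 1))
            else:
                indices.add(int(part))
    else:
        indices = set(selector)
    if any(index < 0 for index in indices):
        raise ValueError("indices must be non-negative")
    return indices


layers = parse_index_selector("0-9,20,25-30")
```

Under these assumptions the example selector expands to indices 0 through 9, 20, and 25 through 30.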