vllm_omni.diffusion.distributed.parallel_state

vLLM-Omni distributed state.

It takes over control of the distributed environment from PyTorch. The typical workflow is:

  • call init_distributed_environment to initialize the distributed environment.
  • call initialize_model_parallel or ensure_model_parallel_initialized to initialize the model parallel groups.
  • run any code that uses the distributed environment.
  • call destroy_model_parallel to destroy the model parallel groups.
  • call destroy_distributed_environment to destroy the distributed environment.

If you only need to use the distributed environment without model parallelism, you can skip the model parallel initialization and destruction steps.
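
The workflow above can be sketched as a context manager that guarantees teardown runs in the documented order. This is a minimal sketch, not part of the module: `distributed_session` and its `ps` parameter are hypothetical names, where `ps` stands for the imported parallel_state module. It also assumes the default arguments listed on this page are acceptable for your launcher.

```python
from contextlib import contextmanager


@contextmanager
def distributed_session(ps, use_model_parallel=True, **model_parallel_kwargs):
    """Hypothetical helper: run the documented setup/teardown steps in order.

    `ps` is assumed to be the parallel_state module, or any object that
    exposes the same function names.
    """
    ps.init_distributed_environment()
    try:
        if use_model_parallel:
            ps.initialize_model_parallel(**model_parallel_kwargs)
        try:
            # The caller's distributed code runs here.
            yield ps
        finally:
            if use_model_parallel:
                ps.destroy_model_parallel()
    finally:
        # Always torn down last, mirroring the workflow order.
        ps.destroy_distributed_environment()
```

Usage would look like `with distributed_session(parallel_state, tensor_parallel_size=2): ...`. Passing `use_model_parallel=False` skips the model parallel steps, as described above.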

HAS_FLASH_ATTN module-attribute

HAS_FLASH_ATTN = env_info['has_flash_attn']

env_info module-attribute

env_info = get_packages_info()

logger module-attribute

logger = init_logger(__name__)

RankGenerator

cfg instance-attribute

cfg = cfg

dp instance-attribute

dp = dp

ep instance-attribute

ep = tp * sp * cfg * dp

fs instance-attribute

fs = fs

name_to_size instance-attribute

name_to_size = {
    "tp": tp,
    "sp": sp,
    "pp": pp,
    "cfg": cfg,
    "dp": dp,
    "fs": fs,
}

order instance-attribute

order = order

ordered_size instance-attribute

ordered_size = []

pp instance-attribute

pp = pp

rank_offset instance-attribute

rank_offset = rank_offset

sp instance-attribute

sp = sp

tp instance-attribute

tp = tp

world_size instance-attribute

world_size = tp * sp * pp * cfg * dp
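
To make the size attributes concrete, the sketch below recomputes world_size and ep from the formulas listed above. `ParallelSizes` is a hypothetical stand-in, not the real RankGenerator; it only mirrors the arithmetic shown in the attribute listing. Note that fs is not a factor of world_size and pp is not a factor of ep.

```python
from dataclasses import dataclass


@dataclass
class ParallelSizes:
    """Hypothetical stand-in that mirrors the documented size formulas."""

    tp: int = 1
    sp: int = 1
    pp: int = 1
    cfg: int = 1
    dp: int = 1
    fs: int = 1

    @property
    def world_size(self) -> int:
        # world_size = tp * sp * pp * cfg * dp (fs is not a factor)
        return self.tp * self.sp * self.pp * self.cfg * self.dp

    @property
    def ep(self) -> int:
        # ep = tp * sp * cfg * dp (pp is not a factor)
        return self.tp * self.sp * self.cfg * self.dp


sizes = ParallelSizes(tp=2, sp=2, pp=2, cfg=1, dp=1)
print(sizes.world_size, sizes.ep)  # 8 4
```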

get_mask

get_mask(order: str, token: str)

get_ranks

get_ranks(token, independent_ranks: bool = False)

Get rank group by input token.

Parameters:

Name Type Description Default
token str

Specifies which rank groups to return. To combine several parallel types, join them with a hyphen '-'. For example, to obtain the TP-DP groups, the token should be 'tp-dp'.

required
independent_ranks bool

If True, generate independent rank groups that divide the world into groups of the specified size. Used for FS (fully shard) groups which operate independently from the main parallelism hierarchy.

False
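
The token format can be illustrated by converting a token into a boolean mask over the configured order. This is a sketch under the assumption that get_mask marks each parallel type named in token as True at its position in order; `token_to_mask` is a hypothetical name, the order string in the example is illustrative, and the real implementation may differ.

```python
def token_to_mask(order: str, token: str) -> list[bool]:
    """Hypothetical sketch: mark which entries of `order` appear in `token`."""
    ordered_names = order.split("-")
    mask = [False] * len(ordered_names)
    for name in token.split("-"):
        # list.index raises ValueError if `name` is not part of `order`.
        mask[ordered_names.index(name)] = True
    return mask


# Illustrative order string (an assumption, not taken from the module).
print(token_to_mask("tp-sp-pp-cfg-dp", "tp-dp"))
# [True, False, False, False, True]
```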

destroy_distributed_env

destroy_distributed_env()

destroy_distributed_environment

destroy_distributed_environment()

destroy_model_parallel

destroy_model_parallel()

Destroy the model parallel groups and reset them to None.

generate_masked_orthogonal_rank_groups

generate_masked_orthogonal_rank_groups(
    world_size: int,
    parallel_size: list[int],
    mask: list[bool],
) -> list[list[int]]

Generate orthogonal parallel groups based on the parallel size and mask.

Parameters:

Name Type Description Default
world_size int

Total number of ranks (world size).

required
parallel_size list[int]

The parallel size of each orthogonal parallel type. For example, if tensor_parallel_size = 2, pipeline_model_parallel_size = 3, data_parallel_size = 4, and the parallel mapping order is tp-pp-dp, then parallel_size = [2, 3, 4].

required
mask list[bool]

Controls which parallel types the generated groups span. If mask[i] is True, the generated groups include the i-th parallel type. For example, with parallel_size = [tp_size, pp_size, dp_size], mask = [True, False, True] generates the tp-dp groups and mask = [False, True, False] generates the pp groups.

required
Algorithm

For orthogonal parallelism, such as tp/dp/pp/cp, the global rank and the local ranks satisfy the following equation:

    global_rank = tp_rank + dp_rank * tp_size + pp_rank * tp_size * dp_size    (1)

    tp_rank in [0, tp_size)
    dp_rank in [0, dp_size)
    pp_rank in [0, pp_size)

Suppose we want the dp groups. There are tp_size * pp_size of them, each containing dp_size ranks. For example, with 8 GPUs, order 'tp-pp-dp', and sizes '2-2-2', the dp groups are [[0, 4], [1, 5], [2, 6], [3, 7]]. (This example uses the order tp-pp-dp, whereas equation (1) is written for the order tp-dp-pp.) The tp_rank and pp_rank are combined to form the dp group index:

    dp_group_index = tp_rank + pp_rank * tp_size    (2)

Given tp_rank and pp_rank that satisfy equation (2), and dp_rank in range(0, dp_size), the ranks in dp_group[dp_group_index] are the global ranks given by equation (1).

This function solves that problem for an arbitrary parallel_size and mask.

For example, if parallel_size = [tp_size, dp_size, pp_size] = [2, 3, 4] and mask = [False, True, False], then:

    dp_group_index(0) = tp_rank(0) + pp_rank(0) * 2
    dp_group_index(1) = tp_rank(1) + pp_rank(0) * 2
    ...
    dp_group_index(7) = tp_rank(1) + pp_rank(3) * 2

    dp_group[0] = 0 + range(0, 3) * 2 + 0 = [0, 2, 4]
    dp_group[1] = 1 + range(0, 3) * 2 + 0 = [1, 3, 5]
    ...
    dp_group[7] = 1 + range(0, 3) * 2 + 3 * 2 * 3 = [19, 21, 23]
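
The algorithm above can be sketched in plain Python. This is an illustrative reimplementation written from the description on this page, not the module's source; the names `prefix_product`, `decompose`, and `masked_orthogonal_rank_groups` are hypothetical, and the ValueError on a mismatched world_size is an added assumption.

```python
def prefix_product(sizes: list[int]) -> list[int]:
    """Return [1, s0, s0*s1, ...]; entry i is the stride of dimension i."""
    out = [1]
    for size in sizes:
        out.append(out[-1] * size)
    return out


def decompose(index: int, shape: list[int]) -> list[int]:
    """Split a flat index into per-dimension coordinates (first dim fastest)."""
    strides = prefix_product(shape)
    return [(index // stride) % size for size, stride in zip(shape, strides)]


def masked_orthogonal_rank_groups(
    world_size: int, parallel_size: list[int], mask: list[bool]
) -> list[list[int]]:
    total = prefix_product(parallel_size)[-1]
    if total != world_size:
        # Assumption for this sketch: sizes must multiply to world_size.
        raise ValueError(f"product of parallel_size is {total}, not {world_size}")

    # Strides from equation (1): tp -> 1, next -> tp_size, and so on.
    global_stride = prefix_product(parallel_size)[:-1]
    masked_shape = [s for s, m in zip(parallel_size, mask) if m]
    masked_stride = [d for d, m in zip(global_stride, mask) if m]
    unmasked_shape = [s for s, m in zip(parallel_size, mask) if not m]
    unmasked_stride = [d for d, m in zip(global_stride, mask) if not m]

    group_size = prefix_product(masked_shape)[-1]
    groups = []
    for group_index in range(world_size // group_size):
        # Equation (2): the unmasked coordinates identify the group.
        outer = decompose(group_index, unmasked_shape)
        base = sum(i * d for i, d in zip(outer, unmasked_stride))
        group = []
        for rank_in_group in range(group_size):
            inner = decompose(rank_in_group, masked_shape)
            group.append(base + sum(i * d for i, d in zip(inner, masked_stride)))
        groups.append(group)
    return groups


# 8 GPUs, order tp-pp-dp, sizes 2-2-2, dp groups only.
print(masked_orthogonal_rank_groups(8, [2, 2, 2], [False, False, True]))
# [[0, 4], [1, 5], [2, 6], [3, 7]]
```

Calling the sketch with parallel_size = [2, 3, 4] and mask = [False, True, False] reproduces the second worked example, including dp_group[7] = [19, 21, 23].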

get_cfg_group

get_cfg_group() -> GroupCoordinator

get_classifier_free_guidance_rank

get_classifier_free_guidance_rank()

Return my rank for the classifier_free_guidance parallel group.

get_classifier_free_guidance_world_size

get_classifier_free_guidance_world_size()

Return world size for the classifier_free_guidance parallel group.

get_data_parallel_rank

get_data_parallel_rank()

Return my rank for the data parallel group.

get_data_parallel_world_size

get_data_parallel_world_size()

Return world size for the data parallel group.

get_dit_group

get_dit_group()

get_dit_world_size

get_dit_world_size()

Return world size for the DiT model.

get_dp_group

get_dp_group() -> GroupCoordinator

get_fs_group

get_fs_group() -> GroupCoordinator

get_fully_shard_rank

get_fully_shard_rank()

Return my rank for the fully shard group.

get_fully_shard_world_size

get_fully_shard_world_size()

Return world size for the fully shard group.

get_pipeline_parallel_rank

get_pipeline_parallel_rank()

Return my rank for the pipeline model parallel group.

get_pipeline_parallel_world_size

get_pipeline_parallel_world_size()

Return world size for the pipeline model parallel group.

get_pp_group

get_pp_group() -> PipelineGroupCoordinator

get_ring_parallel_rank

get_ring_parallel_rank()

get_ring_parallel_world_size

get_ring_parallel_world_size()

get_sequence_parallel_rank

get_sequence_parallel_rank()

Return my rank for the sequence parallel group.

get_sequence_parallel_world_size

get_sequence_parallel_world_size()

Return world size for the sequence parallel group.

get_sp_group

get_ulysses_parallel_rank

get_ulysses_parallel_rank()

get_ulysses_parallel_world_size

get_ulysses_parallel_world_size()

get_world_group

get_world_group() -> GroupCoordinator

init_distributed_environment

init_distributed_environment(
    world_size: int = -1,
    rank: int = -1,
    distributed_init_method: str = "env://",
    local_rank: int = -1,
    backend: str | None = None,
)

init_dit_group

init_dit_group(dit_parallel_size: int, backend: str)

init_model_parallel_group

init_model_parallel_group(
    group_ranks: list[list[int]],
    local_rank: int,
    backend: str,
    parallel_mode: str,
    **kwargs,
) -> GroupCoordinator

init_world_group

init_world_group(
    ranks: list[int], local_rank: int, backend: str
) -> GroupCoordinator

initialize_model_parallel

initialize_model_parallel(
    data_parallel_size: int = 1,
    cfg_parallel_size: int = 1,
    sequence_parallel_size: int | None = None,
    ulysses_degree: int = 1,
    ring_degree: int = 1,
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
    fully_shard_degree: int = 1,
    hsdp_replicate_size: int = 1,
    enable_expert_parallel: bool = False,
    backend: str | None = None,
) -> None
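
The size parameters interact. Elsewhere on this page, set_seq_parallel_pg states that the SP group size is the Ulysses degree times the Ring degree, and RankGenerator.world_size is tp * sp * pp * cfg * dp. The sketch below checks a configuration against those two facts. `check_parallel_sizes` is a hypothetical helper; treating sequence_parallel_size=None as ulysses_degree * ring_degree, and raising ValueError on a mismatch, are assumptions rather than documented behavior.

```python
from typing import Optional


def check_parallel_sizes(
    world_size: int,
    data_parallel_size: int = 1,
    cfg_parallel_size: int = 1,
    sequence_parallel_size: Optional[int] = None,
    ulysses_degree: int = 1,
    ring_degree: int = 1,
    tensor_parallel_size: int = 1,
    pipeline_parallel_size: int = 1,
) -> int:
    """Hypothetical check; returns the resolved sequence parallel size."""
    if sequence_parallel_size is None:
        # Assumption: derive the SP size from the two SP degrees.
        sequence_parallel_size = ulysses_degree * ring_degree
    if sequence_parallel_size != ulysses_degree * ring_degree:
        raise ValueError("sequence_parallel_size != ulysses_degree * ring_degree")

    # Mirrors RankGenerator.world_size = tp * sp * pp * cfg * dp.
    expected = (
        tensor_parallel_size
        * sequence_parallel_size
        * pipeline_parallel_size
        * cfg_parallel_size
        * data_parallel_size
    )
    if expected != world_size:
        raise ValueError(f"sizes multiply to {expected}, but world_size is {world_size}")
    return sequence_parallel_size


print(check_parallel_sizes(8, cfg_parallel_size=2, ulysses_degree=2, ring_degree=2))  # 4
```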

is_dp_last_group

is_dp_last_group()

Return True if in the last data parallel group, False otherwise.

is_pipeline_first_stage

is_pipeline_first_stage()

Return True if in the first pipeline model parallel stage, False otherwise.

is_pipeline_last_stage

is_pipeline_last_stage()

Return True if in the last pipeline model parallel stage, False otherwise.
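
As a rough illustration of the two stage predicates, the sketch below assumes the conventional definition: the first stage is pipeline rank 0 and the last stage is pipeline rank world_size - 1. The function names are hypothetical, and they take explicit values instead of reading them from the pipeline group.

```python
def first_stage(pp_rank: int) -> bool:
    """Assumed convention: the first stage has pipeline rank 0."""
    return pp_rank == 0


def last_stage(pp_rank: int, pp_world_size: int) -> bool:
    """Assumed convention: the last stage has rank pp_world_size - 1."""
    return pp_rank == pp_world_size - 1


# With 4 pipeline stages, only rank 0 is first and only rank 3 is last.
print([(first_stage(r), last_stage(r, 4)) for r in range(4)])
# [(True, False), (False, False), (False, False), (False, True)]
```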

model_parallel_is_initialized

model_parallel_is_initialized()

Check if tensor and pipeline parallel groups are initialized.

set_seq_parallel_pg

set_seq_parallel_pg(
    sp_ulysses_degree: int,
    sp_ring_degree: int,
    rank: int,
    world_size: int,
    use_ulysses_low: bool = True,
    sp_group_ranks: list[list[int]] | None = None,
) -> tuple[ProcessGroup, ProcessGroup]

Initialize sequence-parallel Ulysses and Ring process groups.

This builds sequence-parallel (SP) subgroups inside each data-parallel (DP) slice. The SP group size is sp_ulysses_degree * sp_ring_degree, and world_size must be divisible by that size.

Parameters:

Name Type Description Default
sp_ulysses_degree int

Size of each Ulysses subgroup.

required
sp_ring_degree int

Size of each Ring subgroup.

required
rank int

Global rank of the current process.

required
world_size int

Total number of processes.

required
use_ulysses_low bool

If True, Ulysses groups are contiguous chunks and Ring groups are strided within each SP group. If False, the opposite.

True
sp_group_ranks list[list[int]] | None

Optional explicit SP groups. Each entry must be a list of length sp_ulysses_degree * sp_ring_degree. When provided, groups are built from these ranks instead of auto-generated contiguous ranges.

None

Returns:

Name Type Description
ulyssess_pg ProcessGroup

The Ulysses process group for this rank.

ring_pg ProcessGroup

The Ring process group for this rank.

Raises:

Type Description
ValueError

If sp_group_ranks length does not match world_size or any entry has the wrong size.

AssertionError

If world_size is not divisible by sp_size, where sp_size = sp_ulysses_degree * sp_ring_degree.

Behavior
  • If sp_group_ranks is provided, groups are built per entry and each entry is further split into Ulysses/Ring groups according to use_ulysses_low.
  • If sp_group_ranks is None, groups are auto-generated within each DP slice using offsets of size sp_size.
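
The grouping behavior can be sketched with plain rank lists, without creating any process groups. This is an illustration based on the description above, not the module's source: `auto_sp_groups` and `split_sp_group` are hypothetical names, and the exact stride layout is an assumption drawn from the use_ulysses_low description.

```python
def auto_sp_groups(world_size: int, sp_size: int) -> list[list[int]]:
    """Contiguous SP groups at offsets 0, sp_size, 2 * sp_size, ..."""
    assert world_size % sp_size == 0, "world_size must be divisible by sp_size"
    return [list(range(off, off + sp_size)) for off in range(0, world_size, sp_size)]


def split_sp_group(
    sp_ranks: list[int],
    ulysses_degree: int,
    ring_degree: int,
    use_ulysses_low: bool = True,
) -> tuple[list[list[int]], list[list[int]]]:
    """Return (ulysses_groups, ring_groups) for one SP group."""
    if len(sp_ranks) != ulysses_degree * ring_degree:
        raise ValueError("SP group must have ulysses_degree * ring_degree ranks")
    # `low` is the degree whose groups are contiguous chunks.
    low = ulysses_degree if use_ulysses_low else ring_degree
    high = ring_degree if use_ulysses_low else ulysses_degree
    contiguous = [sp_ranks[i * low:(i + 1) * low] for i in range(high)]
    strided = [sp_ranks[i::low] for i in range(low)]
    if use_ulysses_low:
        return contiguous, strided
    return strided, contiguous


sp_groups = auto_sp_groups(world_size=8, sp_size=4)
print(sp_groups)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(split_sp_group(sp_groups[0], ulysses_degree=2, ring_degree=2))
# ([[0, 1], [2, 3]], [[0, 2], [1, 3]])
```

With use_ulysses_low=False the roles swap: in this sketch the Ring groups become the contiguous chunks and the Ulysses groups become strided.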