vllm_omni.diffusion.distributed.parallel_state ¶
vLLM-Omni distributed state.
It takes over the control of the distributed environment from PyTorch. The typical workflow is:
- call init_distributed_environment to initialize the distributed environment.
- call initialize_model_parallel or ensure_model_parallel_initialized to initialize the model parallel groups.
- run any code that uses the distributed environment.
- call destroy_model_parallel to destroy the model parallel groups.
- call destroy_distributed_environment to destroy the distributed environment.
If you only need to use the distributed environment without model parallelism, you can skip the model parallel initialization and destruction steps.
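The workflow above can be sketched as a small wrapper. This is an illustration only: the helper name run_distributed_workflow_sketch is hypothetical, the two destroy functions are assumed to take no arguments (their signatures are not shown on this page), and the default arguments of init_distributed_environment are assumed to resolve from the launcher's environment (for example under torchrun).

```python
from typing import Callable


def run_distributed_workflow_sketch(work: Callable[[], None]) -> None:
    """Run `work` between distributed setup and teardown (hypothetical helper)."""
    # Imported lazily so this sketch can be loaded without vllm_omni installed.
    from vllm_omni.diffusion.distributed import parallel_state as ps

    # Step 1: distributed environment (defaults assumed to come from env://).
    ps.init_distributed_environment()
    try:
        # Step 2: model parallel groups, using the documented defaults.
        ps.initialize_model_parallel()
        try:
            # Step 3: any code that uses the distributed environment.
            work()
        finally:
            # Step 4: assumed to take no arguments.
            ps.destroy_model_parallel()
    finally:
        # Step 5: assumed to take no arguments.
        ps.destroy_distributed_environment()
```

The nested try/finally blocks are a design choice of this sketch so that teardown runs in reverse order even if the work raises. If model parallelism is not needed, the inner block can be dropped.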
RankGenerator ¶
name_to_size instance-attribute ¶
get_ranks ¶
get_ranks(token, independent_ranks: bool = False)
Get rank group by input token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | The parallel type whose rank groups should be returned. To obtain a combination of parallel types, separate them with a hyphen '-'. For example, to obtain the TP_DP group, the token should be 'tp-dp'. | required |
| independent_ranks | bool | If True, generate independent rank groups that divide the world into groups of the specified size. Used for FS (fully shard) groups, which operate independently from the main parallelism hierarchy. | False |
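To illustrate how a hyphen-separated token relates to the boolean mask used by generate_masked_orthogonal_rank_groups, here is a minimal sketch. The helper token_to_mask_sketch and its order argument are hypothetical and not part of this module; the sketch assumes the parallel order is given as a hyphen-separated string such as 'tp-pp-dp'.

```python
def token_to_mask_sketch(order: str, token: str) -> list[bool]:
    """Map a token such as 'tp-dp' to a mask over the given parallel order."""
    ordered_names = order.split("-")
    wanted_names = token.split("-")
    # Reject names that do not appear in the order at all.
    unknown = [name for name in wanted_names if name not in ordered_names]
    if unknown:
        raise ValueError(f"unknown parallel types in token: {unknown}")
    # mask[i] is True when the i-th parallel type is part of the requested group.
    return [name in wanted_names for name in ordered_names]
```

With order 'tp-pp-dp', the token 'tp-dp' selects the first and third entries of the order, which corresponds to the mask [True, False, True].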
generate_masked_orthogonal_rank_groups ¶
generate_masked_orthogonal_rank_groups(
world_size: int,
parallel_size: list[int],
mask: list[bool],
) -> list[list[int]]
Generate orthogonal parallel groups based on the parallel size and mask.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| world_size | int | Total number of ranks (world size). | required |
| parallel_size | list[int] | The parallel size of each orthogonal parallel type. For example, if tensor_parallel_size = 2, pipeline_model_parallel_size = 3, data_parallel_size = 4, and the parallel mapping order is tp-pp-dp, then parallel_size = [2, 3, 4]. | required |
| mask | list[bool] | The mask controls which parallel methods the generated groups represent. If mask[i] is True, the generated group contains the i-th parallelism method. For example, if parallel_size = [tp_size, pp_size, dp_size], and mask = [True, False, True], then the generated group is the | required |
Algorithm
For orthogonal parallelism, such as tp/dp/pp/cp, the global_rank and the local ranks satisfy the following equation (written here for the order tp-dp-pp):

    global_rank = tp_rank + dp_rank * tp_size + pp_rank * tp_size * dp_size    (1)
    tp_rank \in [0, tp_size)
    dp_rank \in [0, dp_size)
    pp_rank \in [0, pp_size)

Suppose we want the dp_group, which consists of tp_size * pp_size groups of dp_size ranks each. For example, with 8 GPUs, order 'tp-pp-dp', and sizes '2-2-2', the dp_group is [[0, 4], [1, 5], [2, 6], [3, 7]]; in that order dp is the outermost dimension, so its stride is tp_size * pp_size = 4. The tp_rank and pp_rank are combined to form the dp_group_index:

    dp_group_index = tp_rank + pp_rank * tp_size    (2)

So, given that tp_rank and pp_rank satisfy equation (2), and dp_rank is in range(0, dp_size), the ranks in dp_group[dp_group_index] satisfy equation (1).

This function solves this math problem.

For example, if parallel_size = [tp_size, dp_size, pp_size] = [2, 3, 4] and mask = [False, True, False], then:

    dp_group_index(0) = tp_rank(0) + pp_rank(0) * 2
    dp_group_index(1) = tp_rank(1) + pp_rank(0) * 2
    ...
    dp_group_index(7) = tp_rank(1) + pp_rank(3) * 2

    dp_group[0] = 0 + range(0, 3) * 2 + 0 = [0, 2, 4]
    dp_group[1] = 1 + range(0, 3) * 2 + 0 = [1, 3, 5]
    ...
    dp_group[7] = 1 + range(0, 3) * 2 + 3 * 2 * 3 = [19, 21, 23]
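The grouping described in this algorithm can be reproduced with a short standalone sketch. This is not the module's implementation: the name masked_orthogonal_groups_sketch is hypothetical, and the sketch additionally assumes that the product of parallel_size equals world_size. Each parallel dimension gets a stride equal to the product of the sizes before it; masked dimensions enumerate the ranks inside a group, and unmasked dimensions enumerate the groups.

```python
from itertools import product


def masked_orthogonal_groups_sketch(
    world_size: int, parallel_size: list[int], mask: list[bool]
) -> list[list[int]]:
    """Illustrative re-derivation of the masked orthogonal rank groups."""
    # Stride of each dimension in the global rank (first dimension varies fastest).
    strides = []
    stride = 1
    for size in parallel_size:
        strides.append(stride)
        stride *= size
    # Assumption of this sketch: the sizes exactly tile the world.
    assert stride == world_size, "product of parallel_size must equal world_size"

    masked = [(s, d) for s, d, m in zip(parallel_size, strides, mask) if m]
    unmasked = [(s, d) for s, d, m in zip(parallel_size, strides, mask) if not m]

    def offsets(dims: list[tuple[int, int]]) -> list[int]:
        # Enumerate every coordinate combination, first dimension fastest,
        # and turn it into a rank offset using the global strides.
        result = []
        for coords in product(*(range(s) for s, _ in reversed(dims))):
            coords = coords[::-1]
            result.append(sum(c * d for c, (_, d) in zip(coords, dims)))
        return result

    # One group per unmasked offset; its members are the masked offsets.
    return [[base + inner for inner in offsets(masked)] for base in offsets(unmasked)]
```

Calling the sketch with world_size = 24, parallel_size = [2, 3, 4], and mask = [False, True, False] yields eight groups whose first, second, and last entries match dp_group[0], dp_group[1], and dp_group[7] in the worked example.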
get_classifier_free_guidance_rank ¶
Return my rank for the classifier_free_guidance parallel group.
get_classifier_free_guidance_world_size ¶
Return world size for the classifier_free_guidance parallel group.
get_data_parallel_world_size ¶
Return world size for the data parallel group.
get_fully_shard_world_size ¶
Return world size for the fully shard group.
get_pipeline_parallel_rank ¶
Return my rank for the pipeline model parallel group.
get_pipeline_parallel_world_size ¶
Return world size for the pipeline model parallel group.
get_sequence_parallel_rank ¶
Return my rank for the sequence parallel group.
get_sequence_parallel_world_size ¶
Return world size for the sequence parallel group.
init_distributed_environment ¶
init_distributed_environment(
world_size: int = -1,
rank: int = -1,
distributed_init_method: str = "env://",
local_rank: int = -1,
backend: str | None = None,
)
init_model_parallel_group ¶
init_model_parallel_group(
group_ranks: list[list[int]],
local_rank: int,
backend: str,
parallel_mode: str,
**kwargs,
) -> GroupCoordinator
init_world_group ¶
init_world_group(
ranks: list[int], local_rank: int, backend: str
) -> GroupCoordinator
initialize_model_parallel ¶
initialize_model_parallel(
data_parallel_size: int = 1,
cfg_parallel_size: int = 1,
sequence_parallel_size: int | None = None,
ulysses_degree: int = 1,
ring_degree: int = 1,
tensor_parallel_size: int = 1,
pipeline_parallel_size: int = 1,
fully_shard_degree: int = 1,
hsdp_replicate_size: int = 1,
enable_expert_parallel: bool = False,
backend: str | None = None,
) -> None
is_dp_last_group ¶
Return True if in the last data parallel group, False otherwise.
is_pipeline_first_stage ¶
Return True if in the first pipeline model parallel stage, False otherwise.
is_pipeline_last_stage ¶
Return True if in the last pipeline model parallel stage, False otherwise.
model_parallel_is_initialized ¶
Check if tensor and pipeline parallel groups are initialized.
set_seq_parallel_pg ¶
set_seq_parallel_pg(
sp_ulysses_degree: int,
sp_ring_degree: int,
rank: int,
world_size: int,
use_ulysses_low: bool = True,
sp_group_ranks: list[list[int]] | None = None,
) -> tuple[ProcessGroup, ProcessGroup]
Initialize sequence-parallel Ulysses and Ring process groups.
This builds sequence-parallel (SP) subgroups inside each data-parallel (DP) slice. The SP group size is sp_ulysses_degree * sp_ring_degree, and world_size must be divisible by that size.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sp_ulysses_degree | int | Size of each Ulysses subgroup. | required |
| sp_ring_degree | int | Size of each Ring subgroup. | required |
| rank | int | Global rank of the current process. | required |
| world_size | int | Total number of processes. | required |
| use_ulysses_low | bool | If True, Ulysses groups are contiguous chunks and Ring groups are strided within each SP group. If False, the opposite. | True |
| sp_group_ranks | list[list[int]] \| None | Optional explicit SP groups. Each entry must be a list of length sp_ulysses_degree * sp_ring_degree. When provided, groups are built from these ranks instead of auto-generated contiguous ranges. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| ulyssess_pg | ProcessGroup | The Ulysses process group for this rank. |
| ring_pg | ProcessGroup | The Ring process group for this rank. |
Raises:
| Type | Description |
|---|---|
| ValueError | If sp_group_ranks length does not match world_size or any entry has the wrong size. |
| AssertionError | If world_size is not divisible by sp_size, where sp_size = sp_ulysses_degree * sp_ring_degree. |
Behavior
- If sp_group_ranks is provided, groups are built per entry and each entry is further split into Ulysses/Ring groups according to use_ulysses_low.
- If sp_group_ranks is None, groups are auto-generated within each DP slice using offsets of size sp_size.
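The auto-generated layout described above can be illustrated with plain rank lists. The following is a sketch under stated assumptions, not the function's implementation: sp_rank_layout_sketch is a hypothetical helper, it only computes which ranks would share a group, and it creates no process groups. It assumes each DP slice is a contiguous range of sp_size ranks, as the offset-based description suggests.

```python
def sp_rank_layout_sketch(
    sp_ulysses_degree: int,
    sp_ring_degree: int,
    world_size: int,
    use_ulysses_low: bool = True,
) -> tuple[list[list[int]], list[list[int]]]:
    """Return (ulysses_groups, ring_groups) as lists of global ranks."""
    sp_size = sp_ulysses_degree * sp_ring_degree
    assert world_size % sp_size == 0, "world_size must be divisible by sp_size"

    ulysses_groups: list[list[int]] = []
    ring_groups: list[list[int]] = []
    # One SP group per DP slice, located at offsets of size sp_size.
    for offset in range(0, world_size, sp_size):
        sp_ranks = list(range(offset, offset + sp_size))
        if use_ulysses_low:
            low_size, low_out, high_out = sp_ulysses_degree, ulysses_groups, ring_groups
        else:
            low_size, low_out, high_out = sp_ring_degree, ring_groups, ulysses_groups
        # The "low" kind takes contiguous chunks of the SP group.
        for start in range(0, sp_size, low_size):
            low_out.append(sp_ranks[start:start + low_size])
        # The other kind takes strided ranks, one from each chunk.
        for start in range(low_size):
            high_out.append(sp_ranks[start::low_size])
    return ulysses_groups, ring_groups
```

For instance, with sp_ulysses_degree = 2, sp_ring_degree = 2, and world_size = 8, the sketch produces two SP groups of four ranks each. Within the first, the Ulysses groups are the contiguous pairs [0, 1] and [2, 3], and the Ring groups are the strided pairs [0, 2] and [1, 3]. Setting use_ulysses_low = False swaps the two roles.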