vllm.config.parallel

Classes:

  • EPLBConfig

    Configuration for Expert Parallel Load Balancing (EPLB).

  • ParallelConfig

    Configuration for the distributed execution.

EPLBConfig

Configuration for Expert Parallel Load Balancing (EPLB).

Source code in vllm/config/parallel.py
@config
class EPLBConfig:
    """Configuration for Expert Parallel Load Balancing (EPLB)."""

    window_size: int = Field(default=1000, gt=0)
    """Window size for expert load recording."""
    step_interval: int = Field(default=3000, gt=0)
    """
    Interval for rearranging experts in expert parallelism.

    Note that if this is greater than the EPLB window size, only the metrics
    of the last `window_size` steps will be used for rearranging experts.
    """

    num_redundant_experts: int = Field(default=0, ge=0)
    """Number of redundant experts to use for expert parallelism."""

    log_balancedness: bool = False
    """
    Log the balancedness at each step of expert parallelism.
    This is turned off by default because it adds communication overhead.
    """
    log_balancedness_interval: int = Field(default=1, gt=0)
    """
    Interval for logging the balancedness.
    """
    use_async: bool = True
    """
    Whether to use non-blocking EPLB.
    """

    policy: EPLBPolicyOption = "default"
    """The policy type for expert parallel load balancing (EPLB)."""

    communicator: EPLBCommunicatorBackend | None = None
    """
    Backend for EPLB expert weight communication:
    - "torch_nccl": Use torch.distributed on the device process group
    - "torch_gloo": Use torch.distributed gloo with CPU staging
    - "nixl": Use NIXL/RIXL with staged send/recv buffers
    - "pynccl": Use PyNccl send/recv
    - None: Auto-select backend (prefers "nixl", falls back to "torch_gloo")
    """

    @model_validator(mode="after")
    def _validate_eplb_config(self) -> Self:
        if self.use_async and self.policy != "default":
            raise ValueError("Async EPLB is only supported with the default policy.")
        if self.use_async and self.communicator in ("torch_nccl", "pynccl"):
            raise ValueError(
                f"{self.communicator} communicator is incompatible with "
                "async EPLB due to NCCL multi-stream conflicts. Use "
                "'torch_gloo' or 'nixl' instead, or leave communicator "
                "unset for automatic selection."
            )
        if self.log_balancedness and self.log_balancedness_interval <= 0:
            raise ValueError("log_balancedness_interval must be greater than 0.")
        return self

communicator = None class-attribute instance-attribute

Backend for EPLB expert weight communication:

  • "torch_nccl": Use torch.distributed on the device process group
  • "torch_gloo": Use torch.distributed gloo with CPU staging
  • "nixl": Use NIXL/RIXL with staged send/recv buffers
  • "pynccl": Use PyNccl send/recv
  • None: Auto-select backend (prefers "nixl", falls back to "torch_gloo")
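The `_validate_eplb_config` validator in the source listing constrains which communicator backends can be combined with `use_async`. The following is a minimal standalone sketch of those rules, not vLLM code: `eplb_config_error` and `NCCL_BACKENDS` are hypothetical names introduced for illustration, and the messages are shortened.

```python
from __future__ import annotations

# Backends the validator rejects when use_async is True (taken from the source listing).
NCCL_BACKENDS = ("torch_nccl", "pynccl")


def eplb_config_error(
    use_async: bool, policy: str, communicator: str | None
) -> str | None:
    """Return a message for the first violated rule, or None if the combination is valid.

    Hypothetical helper: mirrors the checks in `_validate_eplb_config`
    but is not part of vLLM.
    """
    if use_async and policy != "default":
        return "Async EPLB is only supported with the default policy."
    if use_async and communicator in NCCL_BACKENDS:
        return f"{communicator} communicator is incompatible with async EPLB."
    return None


# Auto-selection (None) and "torch_gloo" pass with async EPLB.
print(eplb_config_error(True, "default", None))
print(eplb_config_error(True, "default", "torch_gloo"))
# NCCL-based backends are rejected only when use_async is True.
print(eplb_config_error(True, "default", "pynccl"))
print(eplb_config_error(False, "default", "pynccl"))
```

Because `use_async` defaults to `True`, selecting "torch_nccl" or "pynccl" appears to require setting `use_async=False` explicitly.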

log_balancedness = False class-attribute instance-attribute

Log the balancedness at each step of expert parallelism. This is turned off by default because it adds communication overhead.

log_balancedness_interval = Field(default=1, gt=0) class-attribute instance-attribute

Interval for logging the balancedness.

num_redundant_experts = Field(default=0, ge=0) class-attribute instance-attribute

Number of redundant experts to use for expert parallelism.

policy = 'default' class-attribute instance-attribute

The policy type for expert parallel load balancing (EPLB).

step_interval = Field(default=3000, gt=0) class-attribute instance-attribute

Interval for rearranging experts in expert parallelism.

Note that if this is greater than the EPLB window size, only the metrics of the last `window_size` steps will be used for rearranging experts.
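To illustrate the note above with the default values (`window_size=1000`, `step_interval=3000`), here is a small sketch. It assumes the load-recording window behaves like a bounded sliding window and uses a `collections.deque` as a stand-in. The real recording mechanism is not shown on this page, so treat this as an illustration of the arithmetic only.

```python
from collections import deque

window_size = 1000    # EPLBConfig default
step_interval = 3000  # EPLBConfig default

# Assumption: a bounded deque stands in for the expert-load recording window.
recorded_steps = deque(maxlen=window_size)
for step in range(step_interval):
    recorded_steps.append(step)

# When experts are rearranged, only the most recent `window_size` steps remain.
oldest, newest = recorded_steps[0], recorded_steps[-1]
print(len(recorded_steps), oldest, newest)  # 1000 2000 2999
```

With these defaults, the first 2000 steps of each interval would not contribute to the rearrangement decision.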

use_async = True class-attribute instance-attribute

Whether to use non-blocking EPLB.

window_size = Field(default=1000, gt=0) class-attribute instance-attribute

Window size for expert load recording.

ParallelConfig

Configuration for the distributed execution.
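One attribute in the source listing, `expert_placement_strategy`, describes two ways of assigning experts to ranks. The sketch below reproduces the docstring's example (4 experts, 2 ranks). `place_experts` is a hypothetical helper written for illustration, not a vLLM function, and it assumes the expert count divides evenly across ranks.

```python
from __future__ import annotations


def place_experts(
    num_experts: int, num_ranks: int, strategy: str = "linear"
) -> list[list[int]]:
    """Return the expert IDs assigned to each rank (hypothetical helper).

    Assumes num_experts is divisible by num_ranks.
    """
    if num_experts % num_ranks != 0:
        raise ValueError("num_experts must be divisible by num_ranks in this sketch")
    per_rank = num_experts // num_ranks
    if strategy == "linear":
        # Contiguous blocks: rank r owns experts [r * per_rank, (r + 1) * per_rank).
        return [list(range(r * per_rank, (r + 1) * per_rank)) for r in range(num_ranks)]
    if strategy == "round_robin":
        # Strided assignment: rank r owns experts r, r + num_ranks, r + 2 * num_ranks, ...
        return [list(range(r, num_experts, num_ranks)) for r in range(num_ranks)]
    raise ValueError(f"unknown strategy: {strategy}")


print(place_experts(4, 2, "linear"))       # [[0, 1], [2, 3]]
print(place_experts(4, 2, "round_robin"))  # [[0, 2], [1, 3]]
```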

Source code in vllm/config/parallel.py
@config
class ParallelConfig:
    """Configuration for the distributed execution."""

    pipeline_parallel_size: int = Field(default=1, ge=1)
    """Number of pipeline parallel groups."""
    tensor_parallel_size: int = Field(default=1, ge=1)
    """Number of tensor parallel groups."""
    prefill_context_parallel_size: int = Field(default=1, ge=1)
    """Number of prefill context parallel groups."""
    data_parallel_size: int = Field(default=1, ge=1)
    """Number of data parallel groups. MoE layers will be sharded according to
    the product of the tensor parallel size and data parallel size."""
    data_parallel_size_local: int = Field(default=1, ge=0)
    """Number of local data parallel groups. A value of 0 is a sentinel used by
    the engine-args layer to signal that data parallelism was specified
    externally (see `ParallelConfig.__post_init__`)."""
    data_parallel_rank: int = Field(default=0, ge=0)
    """Rank of the data parallel group. The runtime check at
    ``__post_init__`` further bounds this by ``data_parallel_size``."""
    data_parallel_rank_local: int | None = None
    """Local rank of the data parallel group, set only in SPMD mode."""
    data_parallel_master_ip: str = "127.0.0.1"
    """IP of the data parallel master."""
    data_parallel_rpc_port: int = 29550
    """Port for data parallel messaging."""
    data_parallel_master_port: int = 29500
    """Port of the data parallel master."""
    data_parallel_backend: DataParallelBackend = "mp"
    """Backend to use for data parallel, either "mp" or "ray"."""
    data_parallel_external_lb: bool = False
    """Whether to use "external" DP LB mode. Applies only to online serving
    and when data_parallel_size > 1. This is useful for a "one-pod-per-rank"
    wide-EP setup in Kubernetes. Supported only for MoE deployments; non-MoE
    models should use independent vLLM instances without --data-parallel-*
    arguments. Set implicitly when --data-parallel-rank is provided explicitly
    to vllm serve."""
    data_parallel_hybrid_lb: bool = False
    """Whether to use "hybrid" DP LB mode. Applies only to online serving
    and when data_parallel_size > 0. Enables running an AsyncLLM
    and API server on a "per-node" basis where vLLM load balances
    between local data parallel ranks, but an external LB balances
    between vLLM nodes/replicas. Set explicitly in conjunction with
    --data-parallel-start-rank."""
    is_moe_model: bool | None = None
    """Whether the deployed model is MoE (if known)."""
    enable_expert_parallel: bool = False
    """Use expert parallelism instead of tensor parallelism for MoE layers."""
    enable_ep_weight_filter: bool = False
    """Skip non-local expert weights during model loading when expert
    parallelism is active.  Each rank only reads its own expert shard from
    disk, which can drastically reduce storage I/O for MoE models with
    per-expert weight tensors (e.g. DeepSeek, Mixtral, Kimi-K2.5).  Has no
    effect on 3D fused-expert checkpoints (e.g. GPT-OSS) or non-MoE
    models."""
    enable_eplb: bool = False
    """Enable expert parallelism load balancing for MoE layers."""
    eplb_config: EPLBConfig = Field(default_factory=EPLBConfig)
    """Expert parallelism load balancing (EPLB) configuration."""
    expert_placement_strategy: ExpertPlacementStrategy = "linear"
    """The expert placement strategy for MoE layers:

    - "linear": Experts are placed in a contiguous manner. For example, with 4
      experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have
      experts [2, 3].
    - "round_robin": Experts are placed in a round-robin manner. For example,
      with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1
      will have experts [1, 3]. This strategy can help improve load balancing
      for grouped expert models with no redundant experts."""
    all2all_backend: All2AllBackend = "allgather_reducescatter"
    """All2All backend for MoE expert parallel communication. Available options:

    - "allgather_reducescatter": All2all based on allgather and reducescatter
    - "deepep_high_throughput": Use deepep high-throughput kernels
    - "deepep_low_latency": Use deepep low-latency kernels
    - "mori_high_throughput": MoRI EP with InterNodeV1 for multi-node
    - "mori_low_latency": MoRI EP with InterNodeV1LL for multi-node
    - "nixl_ep": Use nixl-ep kernels
    - "flashinfer_nvlink_two_sided": Use flashinfer two-sided kernels for mnnvl
    - "flashinfer_nvlink_one_sided": Use flashinfer high-throughput a2a kernels"""

    max_parallel_loading_workers: int | None = Field(default=None, ge=1)
    """Maximum number of parallel loading workers when loading model
    sequentially in multiple batches. To avoid RAM OOM when using tensor
    parallel and large models."""

    disable_custom_all_reduce: bool = False
    """Disable the custom all-reduce kernel and fall back to NCCL."""

    enable_elastic_ep: bool = False
    """Enable elastic expert parallelism with stateless NCCL groups for DP/EP."""

    enable_dbo: bool = False
    """Enable dual batch overlap for the model executor."""
    ubatch_size: int = Field(default=0, ge=0)
    """Number of microbatches (ubatches) to split a batch into."""

    dbo_decode_token_threshold: int = Field(default=32, ge=0)
    """The threshold for dual batch overlap for batches only containing decodes.
    If the number of tokens in the request is greater than this threshold,
    microbatching will be used. Otherwise, the request will be processed in a
    single batch."""
    dbo_prefill_token_threshold: int = Field(default=512, ge=0)  # TODO(lucas): tune
    """The threshold for dual batch overlap for batches that contain one or more
    prefills. If the number of tokens in the request is greater than this
    threshold, microbatching will be used. Otherwise, the request will be
    processed in a single batch."""

    disable_nccl_for_dp_synchronization: bool | None = None
    """Forces the dp synchronization logic in vllm/v1/worker/dp_utils.py 
    to use Gloo instead of NCCL for its all reduce.

    Defaults to True when async scheduling is enabled, False otherwise.
    """

    ray_workers_use_nsight: bool = False
    """Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler."""

    ray_runtime_env: RuntimeEnv | None = None
    """Ray runtime environment to pass to distributed workers."""

    placement_group: PlacementGroup | None = None
    """Ray placement group for distributed model workers."""

    distributed_executor_backend: (
        str | DistributedExecutorBackend | type[Executor] | None
    ) = None
    """
    Backend to use for distributed model workers, either "ray" or "mp"
    (multiprocessing). If the product of pipeline_parallel_size and tensor_parallel_size
    is less than or equal to the number of GPUs available, "mp" will be used to
    keep processing on a single host. Otherwise, an error will be raised. To use "mp"
    you must also set nnodes, and to use "ray" you must manually set
    distributed_executor_backend to "ray".

    Note:
        [TPU](https://docs.vllm.ai/projects/tpu/en/latest/) platform only supports Ray
        for distributed inference.
    """

    worker_cls: str = "auto"
    """The full name of the worker class to use. If "auto", the worker class
    will be determined based on the platform."""
    sd_worker_cls: str = "auto"
    """The full name of the worker class to use for speculative decoding.
    If "auto", the worker class will be determined based on the platform."""
    worker_extension_cls: str = ""
    """The full name of the worker extension class to use. The worker extension
    class is dynamically inherited by the worker class. This is used to inject
    new attributes and methods to the worker class for use in collective_rpc
    calls."""
    master_addr: str = "127.0.0.1"
    """Distributed master address for multi-node distributed
    inference when distributed_executor_backend is "mp"."""
    master_port: int = 29501
    """Distributed master port for multi-node distributed
    inference when distributed_executor_backend is "mp"."""
    node_rank: int = Field(default=0, ge=0)
    """Distributed node rank for multi-node distributed
    inference when distributed_executor_backend is "mp"."""
    nnodes: int = Field(default=1, ge=1)
    """Number of nodes for multi-node distributed
    inference when distributed_executor_backend is "mp"."""
    numa_bind: bool = False
    """Enable NUMA binding for GPU worker subprocesses.

    By default, workers are pinned to their GPU's NUMA-local CPUs and
    memory; on PCT-capable Xeons they also auto-bind to the SKU's
    PCT priority cores.
    """
    numa_bind_nodes: list[int] | None = None
    """NUMA node to bind each GPU worker to.

    Specify one NUMA node per visible GPU, for example `[0, 0, 1, 1]`
    for a 4-GPU system with GPUs 0-1 on NUMA node 0 and GPUs 2-3 on
    NUMA node 1. If unset and `numa_bind=True`, vLLM auto-detects the
    GPU-to-NUMA topology. The values are passed to `numactl --membind`
    and `--cpunodebind`, so they must be valid `numactl` NUMA node indices.
    """
    numa_bind_cpus: list[str] | None = None
    """Optional CPU lists to bind each GPU worker to.

    Specify one CPU list per visible GPU, for example
    `["0-3", "4-7", "8-11", "12-15"]`. When set, vLLM uses
    `numactl --physcpubind` instead of `--cpunodebind`. This is useful
    for custom policies such as binding to PCT or other high-frequency cores.
    Each entry must use `numactl --physcpubind` CPU-list syntax, for example
    `"0-3"` or `"0,2,4-7"`.
    """
    assigned_physical_gpu_ids: list[int] | None = None
    """Mapping from vLLM-local logical GPU IDs to physical GPU IDs.

    For example, ``[2, 3]`` means logical GPU 0 maps to physical GPU 2,
    and logical GPU 1 maps to physical GPU 3. Physical IDs are used only
    at platform/topology boundaries such as NVML, NIC affinity, P2P
    checks, and final CUDA device selection when needed. When None,
    logical IDs map to visible device IDs in order."""

    distributed_timeout_seconds: int | None = None
    """Timeout in seconds for distributed operations (e.g., init_process_group).
    If set, this value is passed to torch.distributed.init_process_group as the
    timeout parameter. If None, PyTorch's default timeout is used (600s for NCCL).
    Increase this for multi-node setups where model downloads may be slow."""

    cpu_distributed_timeout_seconds: int | None = None
    """Timeout (in seconds) for cpu communication groups. If None, PyTorch's
    default timeout is used (1800s for gloo)."""

    world_size: int = Field(init=False)
    """world_size is TPxPP; it affects the number of workers we create."""

    rank: int = 0
    """Global rank in distributed setup."""

    _data_parallel_master_port_list: list[int] = Field(default_factory=list)
    """List of open ports auto-queried for data parallel messaging.
    Set to be private as it's not intended to be configured by users.
    """

    _coord_store_port: int = 0
    """Port of the coordination TCPStore. Can be set by the API server; workers
    connect as clients to exchange self-picked group ports at runtime."""

    decode_context_parallel_size: int = Field(default=1, ge=1)
    """Number of decode context parallel groups. DCP does not change the
    world size; it simply reuses the GPUs of the TP group, so tp_size
    needs to be divisible by dcp_size."""

    dcp_kv_cache_interleave_size: int = 1
    """
    Interleave size of kv_cache storage while using DCP.
    dcp_kv_cache_interleave_size has been replaced by cp_kv_cache_interleave_size,
    and will be deprecated when PCP is fully supported.

    """
    dcp_comm_backend: DCPCommBackend = "ag_rs"
    """Communication backend for Decode Context Parallel (DCP).
    - "ag_rs": AllGather + ReduceScatter (default, existing behavior)
    - "a2a": All-to-All exchange of partial outputs + LSE, then
      combine with Triton kernel. Reduces NCCL calls from 3 to 2
      per layer for MLA models.
    """

    cp_kv_cache_interleave_size: int = 1
    """Interleave size of kv_cache storage while using DCP or PCP.
    With `total_cp_rank = pcp_rank * dcp_world_size + dcp_rank` and
    `total_cp_world_size = pcp_world_size * dcp_world_size`, store
    interleave_size tokens on total_cp_rank i, then store the next
    interleave_size tokens on total_cp_rank i+1.
    Interleave_size=1: token-level alignment, where token `i` is stored on
        total_cp_rank `i % total_cp_world_size`.
    Interleave_size=block_size: block-level alignment, where tokens are
        first populated to the preceding ranks. Tokens are then stored
        in (rank i+1, block j) only after (rank i, block j) is fully occupied.
    Block_size should be greater than or equal to cp_kv_cache_interleave_size.
    Block_size should be divisible by cp_kv_cache_interleave_size.
    """

    data_parallel_index: int = Field(init=False)
    """Equal to the data parallel rank but not used for torch process groups
    and not overridden for dense models."""

    _api_process_count: int = Field(default=1, gt=0)
    """
    The number of API processes initialized.

    Note:
        This is an internal config that is only valid for and
        should only be set by API server scale-out.
    """

    _api_process_rank: int = Field(default=0, ge=-1)
    """
    The rank of this API process, or `-1` for engine core processes
    under API server scale-out.

    Note:
        This is an internal config that is only valid for and
        should only be set by API server scale-out.
    """

    @field_validator("disable_nccl_for_dp_synchronization", mode="wrap")
    @classmethod
    def _skip_none_validation(cls, value: Any, handler: Callable) -> Any:
        """Skip validation if the value is `None` when initialisation is delayed."""
        return None if value is None else handler(value)

    @field_validator("numa_bind_nodes")
    @classmethod
    def _validate_numa_bind_nodes(cls, value: list[int] | None) -> list[int] | None:
        if value is None:
            return None
        if not value:
            raise ValueError("numa_bind_nodes must not be empty.")
        if any(node < 0 for node in value):
            raise ValueError("numa_bind_nodes must contain non-negative integers.")
        return value

    @field_validator("numa_bind_cpus")
    @classmethod
    def _validate_numa_bind_cpus(cls, value: list[str] | None) -> list[str] | None:
        if value is None:
            return None
        if not value:
            raise ValueError("numa_bind_cpus must not be empty.")

        for cpuset in value:
            if not cpuset:
                raise ValueError("numa_bind_cpus entries must not be empty.")
            if not _NUMACTL_CPUSET_PATTERN.fullmatch(cpuset):
                raise ValueError(
                    "numa_bind_cpus entries must use numactl CPU list syntax, "
                    "for example '0-3' or '0,2,4-7'."
                )
            for part in cpuset.split(","):
                if "-" not in part:
                    continue
                start_str, end_str = part.split("-", 1)
                if int(start_str) > int(end_str):
                    raise ValueError(
                        f"numa_bind_cpus ranges must be ascending, but got '{cpuset}'."
                    )
        return value

    @model_validator(mode="after")
    def _validate_parallel_config(self) -> Self:
        if self._api_process_rank >= self._api_process_count:
            raise ValueError(
                "Invalid value of `_api_process_rank`. "
                f"Expected to be `-1` or `[0, {self._api_process_count})`, "
                f"but found: {self._api_process_rank}"
            )

        if self.all2all_backend in ["pplx", "naive"]:
            logger.warning(
                "The '%s' all2all backend has been removed. "
                "Falling back to 'allgather_reducescatter'.",
                self.all2all_backend,
            )
            self.all2all_backend = "allgather_reducescatter"

        if self.data_parallel_size_local > self.data_parallel_size:
            raise ValueError(
                f"data_parallel_size_local ({self.data_parallel_size_local}) "
                f"must be <= data_parallel_size ({self.data_parallel_size})"
            )

        if self.data_parallel_size <= 1 and self.data_parallel_external_lb:
            raise ValueError(
                "data_parallel_external_lb can only be set when data_parallel_size > 1"
            )

        if not self.numa_bind and (
            self.numa_bind_nodes is not None or self.numa_bind_cpus is not None
        ):
            raise ValueError(
                "numa_bind_nodes and numa_bind_cpus require numa_bind=True."
            )

        if self.enable_eplb:
            if not current_platform.is_cuda_alike():
                raise ValueError(
                    "Expert parallelism load balancing is only supported on "
                    "CUDA devices or ROCm devices now."
                )
            if not self.enable_expert_parallel:
                raise ValueError("enable_expert_parallel must be True to use EPLB.")
            if self.tensor_parallel_size * self.data_parallel_size <= 1:
                raise ValueError(
                    "EPLB requires tensor_parallel_size or data_parallel_size "
                    f"to be greater than 1, but got "
                    f"TP={self.tensor_parallel_size},DP={self.data_parallel_size}."
                )
        else:
            if self.eplb_config.num_redundant_experts != 0:
                raise ValueError(
                    "num_redundant_experts is set to "
                    f"{self.eplb_config.num_redundant_experts} but EPLB is not "
                    "enabled. Either enable EPLB or unset "
                    "num_redundant_experts."
                )

        # Note(hc): In the current implementation of decode context
        # parallel(DCP), tp_size needs to be divisible by dcp_size,
        # because the world size does not change by dcp, it simply
        # reuses the GPUs of TP group, and splits one TP group into
        # tp_size//dcp_size DCP groups.
        if self.tensor_parallel_size % self.decode_context_parallel_size != 0:
            raise ValueError(
                f"tp_size={self.tensor_parallel_size} must be divisible by "
                f"dcp_size={self.decode_context_parallel_size}."
            )

        if self.dcp_comm_backend == "a2a" and self.decode_context_parallel_size <= 1:
            raise ValueError(
                "dcp_comm_backend='a2a' requires decode_context_parallel_size > 1."
            )

        return self

    @property
    def world_size_across_dp(self) -> int:
        """world_size_across_dp is TPxPPxDP, it is the size of the world
        including data parallelism."""
        return self.world_size * self.data_parallel_size

    @property
    def use_ubatching(self) -> bool:
        return self.enable_dbo or self.ubatch_size > 1

    @property
    def num_ubatches(self) -> int:
        return 2 if self.enable_dbo else self.ubatch_size

    @property
    def local_engines_only(self) -> bool:
        """
        Client manages local+remote EngineCores in pure internal LB case.
        Client manages local EngineCores in hybrid and external LB case.
        """
        return self.data_parallel_external_lb or self.data_parallel_hybrid_lb

    def get_next_dp_init_port(self) -> int:
        """
        We might need to initialize process groups related to data
        parallelism in multiple processes, e.g. both in the worker and in
        the engine, which can live in different processes. To avoid port
        conflicts, we pop a new port from the prepared port list each time
        we need to initialize a new process group related to data parallelism.
        """
        if self._data_parallel_master_port_list:
            answer = self._data_parallel_master_port_list.pop()
        else:
            answer = self.data_parallel_master_port
            self.data_parallel_master_port += 1

        return answer

    def _pick_stateless_dp_port(self) -> tuple[int, socket.socket | None]:
        """Return ``(port, listen_socket)`` for DP group init.

        With a coord store, rank 0 binds a socket and publishes the port;
        others read it.  Without one, pops a pre-allocated port and
        returns ``listen_socket=None``.
        """
        if not self._coord_store_port:
            return self.get_next_dp_init_port(), None

        from vllm.distributed.utils import get_cached_tcp_store_client

        store = get_cached_tcp_store_client(
            self.data_parallel_master_ip, self._coord_store_port
        )

        key = "dp_master_port"
        if self.data_parallel_rank == 0:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind((self.data_parallel_master_ip, 0))
            s.listen()
            port = s.getsockname()[1]
            store.set(key, str(port).encode())
            return port, s
        else:
            return int(store.get(key).decode()), None

    @overload
    def stateless_init_dp_group(
        self, return_store: Literal[False] = ...
    ) -> ProcessGroup: ...
    @overload
    def stateless_init_dp_group(
        self, return_store: Literal[True] = ...
    ) -> tuple[ProcessGroup, Store]: ...
    def stateless_init_dp_group(
        self, return_store: bool = False
    ) -> ProcessGroup | tuple[ProcessGroup, Store]:
        # NOTE: In high-concurrency scenarios multiple processes
        # can pick the same (currently free) port through a race
        # condition when calling `get_open_port()`. When the first
        # process binds the port the others will subsequently fail
        # with `torch.distributed.DistNetworkError: EADDRINUSE`.
        # To make the initialization more robust we retry a few times
        # with a fresh port whenever this specific error is observed.
        from torch.distributed import DistNetworkError

        from vllm.distributed.utils import (
            stateless_init_torch_distributed_process_group,
        )

        max_retries = 5
        last_exc: Exception | None = None
        for _ in range(max_retries):
            try:
                port, listen_socket = self._pick_stateless_dp_port()
                # use gloo since the engine process might not have cuda device
                return stateless_init_torch_distributed_process_group(
                    self.data_parallel_master_ip,
                    port,
                    self.data_parallel_rank,
                    self.data_parallel_size,
                    backend="gloo",
                    return_store=return_store,
                    listen_socket=listen_socket,
                )
            except DistNetworkError as e:
                # We only want to retry when the root cause is EADDRINUSE.
                if "EADDRINUSE" in str(e):
                    logger.warning("Address already in use. Retrying with a new port.")
                    last_exc = e
                    continue  # try again with a new port
                raise e

        # If we get here all retries have failed.
        assert last_exc is not None
        raise last_exc

    # The all_reduce at the end of attention (during o_proj) means that
    # inputs are replicated across each rank of the tensor parallel group.
    # If using expert-parallelism with DeepEP All2All ops, replicated
    # tokens results in useless duplicate computation and communication.
    #
    # In this case, ensure the input to the experts is sequence parallel
    # to avoid the excess work.
    #
    @property
    def use_sequence_parallel_moe(self) -> bool:
        return (
            self.all2all_backend
            in (
                "allgather_reducescatter",
                "deepep_high_throughput",
                "deepep_low_latency",
                "mori_high_throughput",
                "mori_low_latency",
                "nixl_ep",
            )
            and self.enable_expert_parallel
            and self.tensor_parallel_size > 1
            and self.data_parallel_size > 1
        )

    @property
    def use_batched_dp_moe(self) -> bool:
        return (
            self.all2all_backend
            in (
                "deepep_low_latency",
                "nixl_ep",
            )
            and self.enable_expert_parallel
            and self.data_parallel_size > 1
        )

    @property
    def node_rank_within_dp(self) -> int:
        return self.node_rank % self.nnodes_within_dp

    @property
    def nnodes_within_dp(self) -> int:
        if self.nnodes == 1:
            return 1
        data_parallel_node_size = (
            self.data_parallel_size // self.data_parallel_size_local
        )
        return self.nnodes // data_parallel_node_size

    @property
    def local_world_size(self) -> int:
        return self.world_size // self.nnodes_within_dp

    @staticmethod
    def has_unfinished_dp(dp_group: ProcessGroup, has_unfinished: bool) -> bool:
        tensor = torch.tensor([has_unfinished], dtype=torch.int32, device="cpu")
        # dp rank 0: has_unfinished_seqs=True
        # dp rank 1: has_unfinished_seqs=False
        # aggregated: has_unfinished_seqs=True
        # so this is an OR operation, i.e. MAX in integers
        torch.distributed.all_reduce(tensor, op=ReduceOp.MAX, group=dp_group)
        aggregated_has_unfinished = bool(tensor.item())
        return aggregated_has_unfinished

    @staticmethod
    def sync_dp_state(
        dp_group: ProcessGroup, has_unfinished: bool, pending_pause: bool
    ) -> tuple[bool, bool]:
        """Combined all-reduce for DP state synchronization.

        Uses a single SUM all-reduce on a 2-element tensor:
          [0] = 1 if this rank has unfinished work, else 0.
                SUM > 0 ≡ logical OR across ranks → any rank has work.
          [1] = 1 if this rank has a pending pause request, else 0.
                SUM == dp_size ≡ all ranks reached pause consensus.

        has_unfinished_global is true if any rank has unfinished work,
        or if some ranks are waiting for a pause consensus.

        Returns:
            (has_unfinished_global, pause_consensus)
        """
        tensor = torch.tensor(
            [int(has_unfinished), int(pending_pause)], dtype=torch.int32, device="cpu"
        )
        torch.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=dp_group)
        dp_size = dp_group.size()
        pause_count = tensor[1].item()
        has_unfinished_global = tensor[0].item() > 0 or pause_count % dp_size != 0
        return has_unfinished_global, pause_count == dp_size

    @staticmethod
    def sync_kv_cache_memory_size(dp_group: ProcessGroup, kv_cache_memory: int) -> int:
        if kv_cache_memory == -1:
            kv_cache_memory = torch.iinfo(torch.int64).max
        tensor = torch.tensor([kv_cache_memory], dtype=torch.int64, device="cpu")
        # we cannot use broadcast for stateless dp group since it depends
        # on global rank
        torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
        return tensor.item()

    def compute_hash(self):
        """
        Provide a hash that uniquely identifies all the configs
        that affect the structure of the computation
        graph from input ids/embeddings to the final hidden states,
        excluding anything before input ids/embeddings and after
        the final hidden states.

        This hash is also used for DP worker configuration validation
        to prevent hangs from mismatched collective communication patterns.
        """
        ignored_factors = {
            # Derived/runtime topology, networking, or launch details
            "data_parallel_rank",
            "data_parallel_rank_local",
            "data_parallel_size_local",
            "data_parallel_index",
            "data_parallel_backend",
            "data_parallel_external_lb",
            "data_parallel_hybrid_lb",
            "data_parallel_master_ip",
            "data_parallel_master_port",
            "_data_parallel_master_port_list",
            "data_parallel_rpc_port",
            "rank",
            "master_addr",
            "master_port",
            "node_rank",
            "nnodes",
            "max_parallel_loading_workers",
            "disable_custom_all_reduce",
            "ray_workers_use_nsight",
            "ray_runtime_env",
            "placement_group",
            "distributed_executor_backend",
            "worker_cls",
            "sd_worker_cls",
            "worker_extension_cls",
            "_api_process_count",
            "_api_process_rank",
            # NUMA binding is per-rank host-side memory locality; it does
            # not affect collective-communication semantics. When numa_bind
            # is enabled with auto-detection, each DP rank stores its own
            # NUMA node in numa_bind_nodes (see vllm/utils/numa_utils.py
            # `_get_numa_node`), which would otherwise diverge the DP hash.
            "numa_bind",
            "numa_bind_nodes",
            "numa_bind_cpus",
            "assigned_physical_gpu_ids",
        }

        from vllm.config.utils import get_hash_factors, hash_factors

        factors = get_hash_factors(self, ignored_factors)
        return hash_factors(factors)

    def __post_init__(self) -> None:
        # Continue with the rest of the initialization
        self.world_size = (
            self.pipeline_parallel_size
            * self.tensor_parallel_size
            * self.prefill_context_parallel_size
        )

        if self.distributed_executor_backend == "external_launcher":
            logger.info("Using external launcher for distributed inference.")
            self.world_size *= self.data_parallel_size

        if self.enable_elastic_ep:
            if not self.enable_eplb:
                raise ValueError("Elastic EP is only supported with enable_eplb=True.")
            if self.pipeline_parallel_size > 1:
                raise ValueError(
                    "Elastic EP is not supported with pipeline parallelism "
                    f"(pipeline_parallel_size={self.pipeline_parallel_size})."
                )
            if self.data_parallel_external_lb or self.data_parallel_hybrid_lb:
                raise NotImplementedError(
                    "Elastic EP is not compatible with data_parallel_external_lb "
                    "or data_parallel_hybrid_lb. Elastic EP relies on a single API "
                    "server and core client to coordinate scale up/down."
                )
            if self.eplb_config.use_async:
                from vllm.distributed.nixl_utils import is_nixl_available

                if not is_nixl_available():
                    raise ValueError(
                        "Elastic EP with async EPLB requires the NIXL "
                        "package. Either install NIXL or set "
                        "--eplb-config.use_async=false."
                    )

        if self.data_parallel_size > 1 or self.data_parallel_size_local == 0:
            # Data parallel was specified in the engine args.
            if self.distributed_executor_backend == "external_launcher":
                # For external launcher,
                # we need to set the data parallel rank automatically
                self.data_parallel_rank = int(os.environ["RANK"]) // (
                    self.world_size // self.data_parallel_size
                )
                logger.info(
                    "Set data_parallel_rank to %d automatically.",
                    self.data_parallel_rank,
                )
            if not self.enable_elastic_ep:
                if not self._data_parallel_master_port_list:
                    self._data_parallel_master_port_list = get_open_ports_list(5)
                self.data_parallel_master_port = (
                    self._data_parallel_master_port_list.pop()
                )

            if not (0 <= self.data_parallel_rank < self.data_parallel_size):
                raise ValueError(
                    f"data_parallel_rank ({self.data_parallel_rank})"
                    f" must be in the range [0, {self.data_parallel_size})"
                )
        else:
            # Otherwise fall back to env vars (e.g. for offline SPMD case).
            self.data_parallel_size = envs.VLLM_DP_SIZE
            self.data_parallel_rank = envs.VLLM_DP_RANK
            self.data_parallel_rank_local = envs.VLLM_DP_RANK_LOCAL
            self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
            self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

            if self.data_parallel_size > 1 and self.is_moe_model is False:
                raise ValueError(
                    "Offline data parallel mode is not supported/useful"
                    " for dense models."
                )

        self.data_parallel_index = self.data_parallel_rank

        if self.distributed_executor_backend == "external_launcher":
            os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
            logger.info("Disabling V1 multiprocessing for external launcher.")

        if self.distributed_executor_backend is None and self.world_size_across_dp > 1:
            # We use multiprocessing by default if world_size fits on the
            # current node and we aren't in a ray placement group.

            from vllm.v1.executor import ray_utils

            backend: DistributedExecutorBackend = "mp"
            ray_found = ray_utils.ray_is_available()
            if current_platform.is_tpu() and envs.VLLM_XLA_USE_SPMD:
                backend = "uni"
            elif current_platform.is_cuda() and self.nnodes > 1:
                backend = "mp"
            elif (
                current_platform.is_cuda()
                and current_platform.device_count() < self.world_size
            ):
                gpu_count = current_platform.device_count()
                raise ValueError(
                    f"World size ({self.world_size}) is larger than the number of "
                    f"available GPUs ({gpu_count}) in this node. If this is "
                    "intentional and you are using:\n"
                    "- ray, set '--distributed-executor-backend ray'.\n"
                    "- multiprocessing, set '--nnodes' appropriately."
                )
            elif self.data_parallel_backend == "ray":
                logger.info(
                    "Using ray distributed inference because "
                    "data_parallel_backend is ray"
                )
                backend = "ray"
            elif ray_found:
                if self.placement_group:
                    backend = "ray"
                else:
                    from ray import is_initialized as ray_is_initialized

                    if ray_is_initialized():
                        from ray.util import get_current_placement_group

                        if get_current_placement_group():
                            backend = "ray"
            self.distributed_executor_backend = backend
            logger.debug("Defaulting to use %s for distributed inference", backend)

        if self.distributed_executor_backend is None and self.world_size == 1:
            self.distributed_executor_backend = "uni"

        if self.max_parallel_loading_workers is not None:
            logger.warning(
                "max_parallel_loading_workers is currently "
                "not supported and will be ignored."
            )
        allowed_backends = ("mp", "uni", "external_launcher")
        if (
            self.distributed_executor_backend not in allowed_backends
            and self.nnodes > 1
        ):
            raise ValueError(
                "nnodes > 1 can only be set when distributed executor "
                "backend is mp, uni or external_launcher."
            )

        if self.enable_eplb and self.eplb_config.communicator is None:
            # Prefer NIXL when available: zero-copy RDMA reads, compatible
            # with both async EPLB and elastic EP (deferred remote setup).
            # Fallbacks: pynccl for elastic EP (stateless groups need it),
            # torch_gloo for static EP.  torch_nccl is avoided because NCCL
            # is incompatible with async EPLB (multi-stream conflicts) and
            # batched isend/irecv hangs under high load.
            # See https://github.com/pytorch/pytorch/issues/174288
            from vllm.distributed.nixl_utils import is_nixl_available

            if is_nixl_available():
                self.eplb_config.communicator = "nixl"
            elif self.enable_elastic_ep:
                self.eplb_config.communicator = "pynccl"
            else:
                self.eplb_config.communicator = "torch_gloo"

    @property
    def use_ray(self) -> bool:
        return self.distributed_executor_backend == "ray" or (
            isinstance(self.distributed_executor_backend, type)
            and getattr(self.distributed_executor_backend, "uses_ray", False)
        )

    @model_validator(mode="after")
    def _verify_args(self) -> Self:
        # Lazy import to avoid circular import
        from vllm.v1.executor import Executor

        # Enable batch invariance settings if requested
        if envs.VLLM_BATCH_INVARIANT:
            self.disable_custom_all_reduce = True

        if (
            self.distributed_executor_backend is not None
            and not isinstance(self.distributed_executor_backend, str)
            and not (
                isinstance(self.distributed_executor_backend, type)
                and issubclass(self.distributed_executor_backend, Executor)
            )
        ):
            raise ValueError(
                "Unrecognized distributed executor backend "
                f"{self.distributed_executor_backend}. Supported "
                "values are 'ray', 'mp', 'uni', 'external_launcher', "
                "custom Executor subclass or its import path."
            )
        if self.use_ray:
            from vllm.v1.executor import ray_utils

            ray_utils.assert_ray_available()

        if not current_platform.use_custom_allreduce():
            self.disable_custom_all_reduce = True
            logger.debug(
                "Disabled the custom all-reduce kernel because it is not "
                "supported on current platform."
            )
        if self.nnodes > 1:
            self.disable_custom_all_reduce = True
            logger.debug(
                "Disabled the custom all-reduce since we are running on multi-node."
            )
        if self.ray_workers_use_nsight and not self.use_ray:
            raise ValueError(
                "Unable to use nsight profiling unless workers run with Ray."
            )

        return self
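
The reduction performed by `sync_dp_state` in the listing above can be simulated without a process group. The following is a minimal sketch, assuming each rank's contribution is supplied as a `(has_unfinished, pending_pause)` pair in a plain list; `simulate_sync_dp_state` is a hypothetical helper for illustration and is not part of vLLM.

```python
def simulate_sync_dp_state(
    rank_states: list[tuple[bool, bool]],
) -> tuple[bool, bool]:
    """Mimic a SUM all-reduce over [has_unfinished, pending_pause] per rank."""
    dp_size = len(rank_states)
    # Element [0]: how many ranks still have unfinished work.
    unfinished_sum = sum(int(unfinished) for unfinished, _ in rank_states)
    # Element [1]: how many ranks have a pending pause request.
    pause_count = sum(int(pause) for _, pause in rank_states)
    # Work remains if any rank is busy, or if only some ranks want to pause.
    has_unfinished_global = unfinished_sum > 0 or pause_count % dp_size != 0
    return has_unfinished_global, pause_count == dp_size
```

With two ranks, a single pending pause keeps the group stepping (`(True, False)`), while a pause on both ranks yields consensus (`(False, True)`).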

_api_process_count = Field(default=1, gt=0) class-attribute instance-attribute

The number of API processes initialized.

Note

This is an internal config that is only valid for and should only be set by API server scale-out.

_api_process_rank = Field(default=0, ge=(-1)) class-attribute instance-attribute

The rank of this API process, or -1 for engine core processes under API server scale-out.

Note

This is an internal config that is only valid for and should only be set by API server scale-out.

_coord_store_port = 0 class-attribute instance-attribute

Port of the coordination TCPStore. Can be set by the API server; workers connect as clients to exchange self-picked group ports at runtime.

_data_parallel_master_port_list = Field(default_factory=list) class-attribute instance-attribute

List of open ports automatically queried for data parallel messaging. Kept private because it is not intended to be configured by users.

all2all_backend = 'allgather_reducescatter' class-attribute instance-attribute

All2All backend for MoE expert parallel communication. Available options:

  • "allgather_reducescatter": All2all based on allgather and reducescatter
  • "deepep_high_throughput": Use deepep high-throughput kernels
  • "deepep_low_latency": Use deepep low-latency kernels
  • "mori_high_throughput": MoRI EP with InterNodeV1 for multi-node
  • "mori_low_latency": MoRI EP with InterNodeV1LL for multi-node
  • "nixl_ep": Use nixl-ep kernels
  • "flashinfer_nvlink_two_sided": Use flashinfer two-sided kernels for mnnvl
  • "flashinfer_nvlink_one_sided": Use flashinfer high-throughput a2a kernels

assigned_physical_gpu_ids = None class-attribute instance-attribute

Mapping from vLLM-local logical GPU IDs to physical GPU IDs.

For example, [2, 3] means logical GPU 0 maps to physical GPU 2, and logical GPU 1 maps to physical GPU 3. Physical IDs are used only at platform/topology boundaries such as NVML, NIC affinity, P2P checks, and final CUDA device selection when needed. When None, logical IDs map to visible device IDs in order.

cp_kv_cache_interleave_size = 1 class-attribute instance-attribute

Interleave size of KV cache storage when using DCP or PCP. Define total_cp_rank = pcp_rank * dcp_world_size + dcp_rank and total_cp_world_size = pcp_world_size * dcp_world_size. The first interleave_size tokens are stored on total_cp_rank i, the next interleave_size tokens on total_cp_rank i+1, and so on.

  • interleave_size=1: token-level alignment, where token i is stored on total_cp_rank i % total_cp_world_size.
  • interleave_size=block_size: block-level alignment, where tokens first fill the preceding ranks. Tokens are stored in (rank i+1, block j) only after (rank i, block j) is fully occupied.

block_size should be greater than or equal to cp_kv_cache_interleave_size, and block_size should be divisible by cp_kv_cache_interleave_size.
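
The token-to-rank mapping described here can be sketched as follows. This is an illustration only: both function names are hypothetical, and the sketch assumes that chunks wrap around to rank 0 after the last rank, which the text states for interleave_size=1 but not for the general case.

```python
def total_cp_rank(pcp_rank: int, dcp_rank: int, dcp_world_size: int) -> int:
    """Flatten (pcp_rank, dcp_rank) into a single context-parallel rank."""
    return pcp_rank * dcp_world_size + dcp_rank


def total_cp_rank_for_token(
    token_idx: int, interleave_size: int, total_cp_world_size: int
) -> int:
    """Rank storing `token_idx` when chunks of `interleave_size` tokens
    are dealt out to ranks in order (assumed to wrap around)."""
    return (token_idx // interleave_size) % total_cp_world_size
```

For example, with interleave_size=2 and two ranks, tokens 0 and 1 land on rank 0, tokens 2 and 3 on rank 1, and tokens 4 and 5 on rank 0 again.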

cpu_distributed_timeout_seconds = None class-attribute instance-attribute

Timeout (in seconds) for cpu communication groups. If None, PyTorch's default timeout is used (1800s for gloo).

data_parallel_backend = 'mp' class-attribute instance-attribute

Backend to use for data parallel, either "mp" or "ray".

data_parallel_external_lb = False class-attribute instance-attribute

Whether to use "external" DP LB mode. Applies only to online serving and when data_parallel_size > 0. This is useful for a "one-pod-per-rank" wide-EP setup in Kubernetes. Supported only for MoE deployments; non-MoE models should use independent vLLM instances without --data-parallel-* arguments. Set implicitly when --data-parallel-rank is provided explicitly to vllm serve.

data_parallel_hybrid_lb = False class-attribute instance-attribute

Whether to use "hybrid" DP LB mode. Applies only to online serving and when data_parallel_size > 0. Enables running an AsyncLLM and API server on a "per-node" basis where vLLM load balances between local data parallel ranks, but an external LB balances between vLLM nodes/replicas. Set explicitly in conjunction with --data-parallel-start-rank.

data_parallel_index = Field(init=False) class-attribute instance-attribute

Equal to the data parallel rank but not used for torch process groups and not overridden for dense models.

data_parallel_master_ip = '127.0.0.1' class-attribute instance-attribute

IP of the data parallel master.

data_parallel_master_port = 29500 class-attribute instance-attribute

Port of the data parallel master.

data_parallel_rank = Field(default=0, ge=0) class-attribute instance-attribute

Rank of the data parallel group. The runtime check at __post_init__ further bounds this by data_parallel_size.
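
With the "external_launcher" backend, `__post_init__` derives this rank from the `RANK` environment variable as `RANK // (world_size // data_parallel_size)`, where `world_size` has already been multiplied by the data parallel size. A minimal sketch of that arithmetic, using a hypothetical standalone function rather than the config class:

```python
def dp_rank_from_global_rank(global_rank: int, world_size: int, dp_size: int) -> int:
    """Map a global rank to its data parallel rank.

    `world_size` is assumed to already include the data parallel dimension,
    as it does under the external launcher.
    """
    ranks_per_replica = world_size // dp_size
    return global_rank // ranks_per_replica
```

For TP=2, PP=1, DP=4 the world size is 8, so each replica owns two consecutive global ranks and global rank 5 belongs to data parallel rank 2.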

data_parallel_rank_local = None class-attribute instance-attribute

Local rank of the data parallel group, set only in SPMD mode.

data_parallel_rpc_port = 29550 class-attribute instance-attribute

Port for data parallel messaging.

data_parallel_size = Field(default=1, ge=1) class-attribute instance-attribute

Number of data parallel groups. MoE layers will be sharded according to the product of the tensor parallel size and data parallel size.

data_parallel_size_local = Field(default=1, ge=0) class-attribute instance-attribute

Number of local data parallel groups. A value of 0 is a sentinel used by the engine-args layer to signal that data parallelism was specified externally (see ParallelConfig.__post_init__).

dbo_decode_token_threshold = Field(default=32, ge=0) class-attribute instance-attribute

The threshold for dual batch overlap for batches only containing decodes. If the number of tokens in the request is greater than this threshold, microbatching will be used. Otherwise, the request will be processed in a single batch.

dbo_prefill_token_threshold = Field(default=512, ge=0) class-attribute instance-attribute

The threshold for dual batch overlap for batches that contain one or more prefills. If the number of tokens in the request is greater than this threshold, microbatching will be used. Otherwise, the request will be processed in a single batch.
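
The two thresholds can be read as a single decision rule. The sketch below only restates the descriptions above; `should_microbatch` is a hypothetical name, and the real scheduler may apply further conditions (for example, whether `enable_dbo` is set) that are not modeled here.

```python
def should_microbatch(
    num_tokens: int,
    has_prefill: bool,
    decode_threshold: int = 32,
    prefill_threshold: int = 512,
) -> bool:
    """Return True when the token count exceeds the applicable threshold."""
    # Batches with at least one prefill use the prefill threshold;
    # decode-only batches use the decode threshold.
    threshold = prefill_threshold if has_prefill else decode_threshold
    return num_tokens > threshold
```

Note that the comparison is strict: a decode-only batch of exactly 32 tokens stays in a single batch under the default threshold.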

dcp_comm_backend = 'ag_rs' class-attribute instance-attribute

Communication backend for Decode Context Parallel (DCP):

  • "ag_rs": AllGather + ReduceScatter (default, existing behavior)
  • "a2a": All-to-All exchange of partial outputs + LSE, then combine with a Triton kernel. Reduces NCCL calls from 3 to 2 per layer for MLA models.

dcp_kv_cache_interleave_size = 1 class-attribute instance-attribute

Interleave size of kv_cache storage while using DCP. dcp_kv_cache_interleave_size has been replaced by cp_kv_cache_interleave_size, and will be deprecated when PCP is fully supported.

decode_context_parallel_size = Field(default=1, ge=1) class-attribute instance-attribute

Number of decode context parallel groups. DCP does not change the world size; it reuses the GPUs of the TP group, so tp_size must be divisible by dcp_size.

disable_custom_all_reduce = False class-attribute instance-attribute

Disable the custom all-reduce kernel and fall back to NCCL.

disable_nccl_for_dp_synchronization = None class-attribute instance-attribute

Forces the dp synchronization logic in vllm/v1/worker/dp_utils.py to use Gloo instead of NCCL for its all reduce.

Defaults to True when async scheduling is enabled, False otherwise.

distributed_executor_backend = None class-attribute instance-attribute

Backend to use for distributed model workers, either "ray" or "mp" (multiprocessing). If the product of pipeline_parallel_size and tensor_parallel_size is less than or equal to the number of GPUs available, "mp" will be used to keep processing on a single host. Otherwise, an error will be raised: to use "mp" across nodes you must also set nnodes, and to use "ray" you must explicitly set distributed_executor_backend to "ray".

Note

TPU platform only supports Ray for distributed inference.

distributed_timeout_seconds = None class-attribute instance-attribute

Timeout in seconds for distributed operations (e.g., init_process_group). If set, this value is passed to torch.distributed.init_process_group as the timeout parameter. If None, PyTorch's default timeout is used (600s for NCCL). Increase this for multi-node setups where model downloads may be slow.

enable_dbo = False class-attribute instance-attribute

Enable dual batch overlap for the model executor.

enable_elastic_ep = False class-attribute instance-attribute

Enable elastic expert parallelism with stateless NCCL groups for DP/EP.

enable_ep_weight_filter = False class-attribute instance-attribute

Skip non-local expert weights during model loading when expert parallelism is active. Each rank only reads its own expert shard from disk, which can drastically reduce storage I/O for MoE models with per-expert weight tensors (e.g. DeepSeek, Mixtral, Kimi-K2.5). Has no effect on 3D fused-expert checkpoints (e.g. GPT-OSS) or non-MoE models.

enable_eplb = False class-attribute instance-attribute

Enable expert parallelism load balancing for MoE layers.

enable_expert_parallel = False class-attribute instance-attribute

Use expert parallelism instead of tensor parallelism for MoE layers.

eplb_config = Field(default_factory=EPLBConfig) class-attribute instance-attribute

Expert parallelism load balancing (EPLB) configuration.
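
When `enable_eplb` is set and `eplb_config.communicator` is left as None, `__post_init__` picks a backend automatically. The selection order can be sketched as a pure function; `pick_eplb_communicator` is a hypothetical name, and the NIXL availability check is passed in as a boolean instead of being probed.

```python
def pick_eplb_communicator(nixl_available: bool, enable_elastic_ep: bool) -> str:
    """Mirror the fallback order used when no communicator is configured."""
    if nixl_available:
        # Preferred: works with both async EPLB and elastic EP.
        return "nixl"
    if enable_elastic_ep:
        # Elastic EP uses stateless groups, which need pynccl.
        return "pynccl"
    # Static EP falls back to gloo with CPU staging.
    return "torch_gloo"
```

"torch_nccl" is never chosen automatically; it has to be requested explicitly.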

expert_placement_strategy = 'linear' class-attribute instance-attribute

The expert placement strategy for MoE layers:

  • "linear": Experts are placed in a contiguous manner. For example, with 4 experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have experts [2, 3].
  • "round_robin": Experts are placed in a round-robin manner. For example, with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1 will have experts [1, 3]. This strategy can help improve load balancing for grouped expert models with no redundant experts.
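
The two strategies can be reproduced with a few lines of Python. This is a sketch for illustration, assuming the number of experts divides evenly across ranks; `place_experts` is a hypothetical helper and not the function vLLM uses.

```python
def place_experts(
    num_experts: int, num_ranks: int, strategy: str = "linear"
) -> list[list[int]]:
    """Return the expert IDs assigned to each rank."""
    if num_experts % num_ranks != 0:
        raise ValueError("this sketch assumes an even split of experts")
    per_rank = num_experts // num_ranks
    if strategy == "linear":
        # Contiguous slices: rank r owns [r * per_rank, (r + 1) * per_rank).
        return [
            list(range(r * per_rank, (r + 1) * per_rank)) for r in range(num_ranks)
        ]
    if strategy == "round_robin":
        # Strided assignment: rank r owns r, r + num_ranks, r + 2 * num_ranks, ...
        return [list(range(r, num_experts, num_ranks)) for r in range(num_ranks)]
    raise ValueError(f"unknown strategy: {strategy}")
```

Calling `place_experts(4, 2, "linear")` and `place_experts(4, 2, "round_robin")` reproduces the two examples given in the list above.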

is_moe_model = None class-attribute instance-attribute

Whether the deployed model is MoE (if known).

local_engines_only property

Client manages local+remote EngineCores in pure internal LB case. Client manages local EngineCores in hybrid and external LB case.

master_addr = '127.0.0.1' class-attribute instance-attribute

Distributed master address for multi-node distributed inference when distributed_executor_backend is mp.

master_port = 29501 class-attribute instance-attribute

Distributed master port for multi-node distributed inference when distributed_executor_backend is mp.

max_parallel_loading_workers = Field(default=None, ge=1) class-attribute instance-attribute

Maximum number of parallel loading workers when loading the model sequentially in multiple batches, to avoid RAM OOM when using tensor parallelism with large models. This option is currently not supported: __post_init__ logs a warning and ignores it.

nnodes = Field(default=1, ge=1) class-attribute instance-attribute

Number of nodes for multi-node distributed inference when distributed_executor_backend is mp.
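
The `nnodes_within_dp` and `node_rank_within_dp` properties derive per-replica node counts from this value. A minimal sketch of that arithmetic as standalone functions (the function names mirror the properties, but these free functions are illustrative and assume `dp_size_local` is at least 1):

```python
def nnodes_within_dp(nnodes: int, dp_size: int, dp_size_local: int) -> int:
    """Number of nodes spanned by a single data parallel replica."""
    if nnodes == 1:
        return 1
    # Number of node groups that the data parallel ranks are spread over.
    data_parallel_node_size = dp_size // dp_size_local
    return nnodes // data_parallel_node_size


def node_rank_within_dp(node_rank: int, nodes_per_replica: int) -> int:
    """Position of a node inside its data parallel replica."""
    return node_rank % nodes_per_replica
```

For example, 4 nodes with data_parallel_size=2 and data_parallel_size_local=1 give 2 nodes per replica, so node 3 is the second node (index 1) of its replica.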

node_rank = Field(default=0, ge=0) class-attribute instance-attribute

Distributed node rank for multi-node distributed inference when distributed_executor_backend is mp.

numa_bind = False class-attribute instance-attribute

Enable NUMA binding for GPU worker subprocesses.

By default, workers are pinned to their GPU's NUMA-local CPUs and memory; on PCT-capable Xeons they also auto-bind to the SKU's PCT priority cores.

numa_bind_cpus = None class-attribute instance-attribute

Optional CPU lists to bind each GPU worker to.

Specify one CPU list per visible GPU, for example ["0-3", "4-7", "8-11", "12-15"]. When set, vLLM uses numactl --physcpubind instead of --cpunodebind. This is useful for custom policies such as binding to PCT or other high-frequency cores. Each entry must use numactl --physcpubind CPU-list syntax, for example "0-3" or "0,2,4-7".
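
To check which CPUs an entry covers before passing it to vLLM, the two forms shown above can be expanded with a short parser. This is a sketch only: `parse_cpu_list` is a hypothetical helper that handles comma-separated IDs and inclusive ranges, and numactl may accept additional forms that it does not cover.

```python
def parse_cpu_list(spec: str) -> list[int]:
    """Expand a CPU list such as "0,2,4-7" into explicit CPU IDs."""
    cpus: list[int] = []
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            # Inclusive range, e.g. "4-7" -> 4, 5, 6, 7.
            start_str, end_str = part.split("-", 1)
            start, end = int(start_str), int(end_str)
            if start > end:
                raise ValueError(f"invalid range: {part}")
            cpus.extend(range(start, end + 1))
        else:
            cpus.append(int(part))
    return cpus
```

Expanding every entry of `numa_bind_cpus` this way makes it easy to spot overlapping CPU sets between GPU workers.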

numa_bind_nodes = None class-attribute instance-attribute

NUMA node to bind each GPU worker to.

Specify one NUMA node per visible GPU, for example [0, 0, 1, 1] for a 4-GPU system with GPUs 0-1 on NUMA node 0 and GPUs 2-3 on NUMA node 1. If unset and numa_bind=True, vLLM auto-detects the GPU-to-NUMA topology. The values are passed to numactl --membind and --cpunodebind, so they must be valid numactl NUMA node indices.

pipeline_parallel_size = Field(default=1, ge=1) class-attribute instance-attribute

Number of pipeline parallel groups.

placement_group = None class-attribute instance-attribute

Ray placement group for distributed model workers.

prefill_context_parallel_size = Field(default=1, ge=1) class-attribute instance-attribute

Number of prefill context parallel groups.

rank = 0 class-attribute instance-attribute

Global rank in distributed setup.

ray_runtime_env = None class-attribute instance-attribute

Ray runtime environment to pass to distributed workers.

ray_workers_use_nsight = False class-attribute instance-attribute

Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler.

sd_worker_cls = 'auto' class-attribute instance-attribute

The full name of the worker class to use for speculative decoding. If "auto", the worker class will be determined based on the platform.

tensor_parallel_size = Field(default=1, ge=1) class-attribute instance-attribute

Number of tensor parallel groups.

ubatch_size = Field(default=0, ge=0) class-attribute instance-attribute

Number of ubatch size.

worker_cls = 'auto' class-attribute instance-attribute

The full name of the worker class to use. If "auto", the worker class will be determined based on the platform.

worker_extension_cls = '' class-attribute instance-attribute

The full name of the worker extension class to use. The worker extension class is dynamically inherited by the worker class. This is used to inject new attributes and methods to the worker class for use in collective_rpc calls.

world_size = Field(init=False) class-attribute instance-attribute

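world_size is the product of the tensor, pipeline, and prefill context parallel sizes (TP x PP x PCP), as computed in __post_init__; with the "external_launcher" backend it is further multiplied by the data parallel size. It determines the number of workers we create.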

world_size_across_dp property

world_size_across_dp is TPxPPxDP; it is the size of the world including data parallelism.
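
The relationship between the two sizes can be sketched as below. This is a simplified illustration under two assumptions: the external launcher is not in use (so `world_size` excludes data parallelism), and `world_size_across_dp` is simply `world_size` times the data parallel size. `world_sizes` is a hypothetical helper.

```python
def world_sizes(tp: int, pp: int, pcp: int = 1, dp: int = 1) -> tuple[int, int]:
    """Return (world_size, world_size_across_dp) for the given parallel sizes."""
    # Per-replica world size, following the product computed in __post_init__.
    world_size = pp * tp * pcp
    # Assumed: the cross-DP size scales the per-replica size by dp.
    return world_size, world_size * dp
```

For TP=2, PP=2, DP=4 this gives 4 workers per replica and 16 ranks overall.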

_pick_stateless_dp_port()

Return (port, listen_socket) for DP group init.

With a coord store, rank 0 binds a socket and publishes the port; others read it. Without one, pops a pre-allocated port and returns listen_socket=None.

Source code in vllm/config/parallel.py
def _pick_stateless_dp_port(self) -> tuple[int, socket.socket | None]:
    """Return ``(port, listen_socket)`` for DP group init.

    With a coord store, rank 0 binds a socket and publishes the port;
    others read it.  Without one, pops a pre-allocated port and
    returns ``listen_socket=None``.
    """
    if not self._coord_store_port:
        return self.get_next_dp_init_port(), None

    from vllm.distributed.utils import get_cached_tcp_store_client

    store = get_cached_tcp_store_client(
        self.data_parallel_master_ip, self._coord_store_port
    )

    key = "dp_master_port"
    if self.data_parallel_rank == 0:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind((self.data_parallel_master_ip, 0))
        s.listen()
        port = s.getsockname()[1]
        store.set(key, str(port).encode())
        return port, s
    else:
        return int(store.get(key).decode()), None

_skip_none_validation(value, handler) classmethod

Skip validation if the value is None when initialisation is delayed.

Source code in vllm/config/parallel.py
@field_validator("disable_nccl_for_dp_synchronization", mode="wrap")
@classmethod
def _skip_none_validation(cls, value: Any, handler: Callable) -> Any:
    """Skip validation if the value is `None` when initialisation is delayed."""
    return None if value is None else handler(value)

compute_hash()

Provide a hash that uniquely identifies all the configs that affect the structure of the computation graph from input ids/embeddings to the final hidden states, excluding anything before input ids/embeddings and after the final hidden states.

This hash is also used for DP worker configuration validation to prevent hangs from mismatched collective communication patterns.

Source code in vllm/config/parallel.py
def compute_hash(self):
    """
    Provide a hash that uniquely identifies all the configs
    that affect the structure of the computation
    graph from input ids/embeddings to the final hidden states,
    excluding anything before input ids/embeddings and after
    the final hidden states.

    This hash is also used for DP worker configuration validation
    to prevent hangs from mismatched collective communication patterns.
    """
    ignored_factors = {
        # Derived/runtime topology, networking, or launch details
        "data_parallel_rank",
        "data_parallel_rank_local",
        "data_parallel_size_local",
        "data_parallel_index",
        "data_parallel_backend",
        "data_parallel_external_lb",
        "data_parallel_hybrid_lb",
        "data_parallel_master_ip",
        "data_parallel_master_port",
        "_data_parallel_master_port_list",
        "data_parallel_rpc_port",
        "rank",
        "master_addr",
        "master_port",
        "node_rank",
        "nnodes",
        "max_parallel_loading_workers",
        "disable_custom_all_reduce",
        "ray_workers_use_nsight",
        "ray_runtime_env",
        "placement_group",
        "distributed_executor_backend",
        "worker_cls",
        "sd_worker_cls",
        "worker_extension_cls",
        "_api_process_count",
        "_api_process_rank",
        # NUMA binding is per-rank host-side memory locality; it does
        # not affect collective-communication semantics. When numa_bind
        # is enabled with auto-detection, each DP rank stores its own
        # NUMA node in numa_bind_nodes (see vllm/utils/numa_utils.py
        # `_get_numa_node`), which would otherwise diverge the DP hash.
        "numa_bind",
        "numa_bind_nodes",
        "numa_bind_cpus",
        "assigned_physical_gpu_ids",
    }

    from vllm.config.utils import get_hash_factors, hash_factors

    factors = get_hash_factors(self, ignored_factors)
    return hash_factors(factors)
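
To illustrate why per-rank fields are excluded, here is a toy sketch of hashing a config while ignoring selected fields. It is not vLLM's implementation: `ToyParallelConfig`, `IGNORED`, and `toy_hash` are hypothetical names, and the choice of JSON plus SHA-256 is an assumption made for the example, since the bodies of `get_hash_factors` and `hash_factors` are not shown here.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass
class ToyParallelConfig:
    tensor_parallel_size: int = 1
    data_parallel_size: int = 1
    data_parallel_rank: int = 0  # differs per rank, must not affect the hash


# Fields that vary per rank or per launch and are excluded from the hash.
IGNORED = {"data_parallel_rank"}


def toy_hash(cfg: ToyParallelConfig) -> str:
    """Hash every field except the ignored ones, in a stable key order."""
    factors = {k: v for k, v in asdict(cfg).items() if k not in IGNORED}
    payload = json.dumps(factors, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


rank0 = ToyParallelConfig(tensor_parallel_size=2, data_parallel_size=4, data_parallel_rank=0)
rank3 = ToyParallelConfig(tensor_parallel_size=2, data_parallel_size=4, data_parallel_rank=3)
mismatched = ToyParallelConfig(tensor_parallel_size=4, data_parallel_size=4, data_parallel_rank=1)
```

In this sketch `rank0` and `rank3` hash identically because they differ only in an ignored field, while `mismatched` produces a different hash and would be flagged before any collective is attempted.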

get_next_dp_init_port()

We might need to initialize process groups related to data parallelism in multiple processes, e.g. in both the worker and the engine, which can live in different processes. To avoid port conflicts, we pop a new port from the prepared port list each time we need to initialize a new process group related to data parallelism.

Source code in vllm/config/parallel.py
def get_next_dp_init_port(self) -> int:
    """
    We might need to initialize process groups in multiple
    processes that are related to data parallelism,
    e.g. both in the worker and in the engine, which
    can live in different processes. To avoid port conflicts, we
    pop a new port from the prepared port list each time we need to
    initialize a new process group related to data parallelism.
    """
    if self._data_parallel_master_port_list:
        answer = self._data_parallel_master_port_list.pop()
    else:
        answer = self.data_parallel_master_port
        self.data_parallel_master_port += 1

    return answer

sync_dp_state(dp_group, has_unfinished, pending_pause) staticmethod

Combined all-reduce for DP state synchronization.

Uses a single SUM all-reduce on a 2-element tensor:

[0] = 1 if this rank has unfinished work, else 0. SUM > 0 ≡ logical OR across ranks → any rank has work.

[1] = 1 if this rank has a pending pause request, else 0. SUM == dp_size ≡ all ranks reached pause consensus.

has_unfinished_global is true if any rank has unfinished work, or if some ranks are waiting for a pause consensus.

Returns:

(has_unfinished_global, pause_consensus)

Source code in vllm/config/parallel.py
@staticmethod
def sync_dp_state(
    dp_group: ProcessGroup, has_unfinished: bool, pending_pause: bool
) -> tuple[bool, bool]:
    """Combined all-reduce for DP state synchronization.

    Uses a single SUM all-reduce on a 2-element tensor:
      [0] = 1 if this rank has unfinished work, else 0.
            SUM > 0 ≡ logical OR across ranks → any rank has work.
      [1] = 1 if this rank has a pending pause request, else 0.
            SUM == dp_size ≡ all ranks reached pause consensus.

    has_unfinished_global is true if any rank has unfinished work,
    or if some ranks are waiting for a pause consensus.

    Returns:
        (has_unfinished_global, pause_consensus)
    """
    tensor = torch.tensor(
        [int(has_unfinished), int(pending_pause)], dtype=torch.int32, device="cpu"
    )
    torch.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=dp_group)
    dp_size = dp_group.size()
    pause_count = tensor[1].item()
    has_unfinished_global = tensor[0].item() > 0 or pause_count % dp_size != 0
    return has_unfinished_global, pause_count == dp_size
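
The decision logic can be checked without a process group by summing the per-rank flags directly. The sketch below assumes a list of `(has_unfinished, pending_pause)` pairs, one per rank, in place of the real all-reduce. `toy_sync_dp_state` is a hypothetical name.

```python
def toy_sync_dp_state(flags: list[tuple[bool, bool]]) -> tuple[bool, bool]:
    """Simulate the SUM all-reduce over per-rank (unfinished, pause) flags."""
    dp_size = len(flags)
    unfinished_sum = sum(int(unfinished) for unfinished, _ in flags)
    pause_count = sum(int(pause) for _, pause in flags)
    # Work remains if any rank is busy, or if only some ranks want to pause.
    has_unfinished_global = unfinished_sum > 0 or pause_count % dp_size != 0
    return has_unfinished_global, pause_count == dp_size


all_idle = toy_sync_dp_state([(False, False), (False, False)])
one_busy = toy_sync_dp_state([(True, False), (False, False)])
partial_pause = toy_sync_dp_state([(False, True), (False, False)])
all_paused = toy_sync_dp_state([(False, True), (False, True)])
```

The `partial_pause` case shows the purpose of the modulo test: with one of two ranks requesting a pause, the pause count is neither 0 nor `dp_size`, so the ranks keep stepping until they all agree.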