Skip to content

vllm.v1.engine.utils

Classes:

Functions:

CoreEngine

One per data parallel rank, used to track state during handshaking.

Source code in vllm/v1/engine/utils.py
class CoreEngine:
    """Handshake-time bookkeeping for a single data parallel rank.

    Attributes are plain instance fields:
      * ``local``    - the ``local`` flag passed by the caller.
      * ``identity`` - ``index`` encoded as 2 little-endian bytes.
      * ``state``    - starts out as ``CoreEngineState.NEW``.
    """

    def __init__(self, index: int = 0, local: bool = True):
        # Two bytes, little-endian: the rank index in its wire form.
        rank_identity = index.to_bytes(length=2, byteorder="little")

        self.local, self.identity = local, rank_identity

        # Every engine begins its life in the NEW state.
        self.state = CoreEngineState.NEW

CoreEngineActorManager

Utility class to handle creation, readiness, and shutdown of core engine Ray actors used by the AsyncLLM and LLMEngine.

Different from CoreEngineProcManager, this class manages core engines for both local and remote nodes.

Methods:

Source code in vllm/v1/engine/utils.py
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
class CoreEngineActorManager:
    """
    Utility class to handle creation, readiness, and shutdown
    of core engine Ray actors used by the AsyncLLM and LLMEngine.

    Different from CoreEngineProcManager, this class manages
    core engines for both local and remote nodes.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        addresses: EngineZmqAddresses,
        executor_class: type[Executor],
        log_stats: bool,
        placement_groups: list["PlacementGroup"] | None = None,
        local_dp_ranks: list[int] | None = None,
    ):
        """
        Launch one engine core Ray actor per data parallel rank, wait for
        all of them to finish initializing, then start their run loops.

        Args:
            vllm_config: Base config. A deep copy is made for every DP rank
                and handed to that rank's actor.
            addresses: Passed through unchanged to every actor.
            executor_class: Passed through unchanged to every actor.
            log_stats: Passed through unchanged to every actor.
            placement_groups: Optional pre-built placement groups, one per
                DP rank. When given, ``local_dp_ranks`` must be given too
                and have the same length. When omitted, the groups are
                built by ``create_dp_placement_groups``.
            local_dp_ranks: Local DP rank for each entry of
                ``placement_groups``.

        Raises:
            AssertionError: If ``placement_groups`` is given without a
                matching ``local_dp_ranks``, or if the number of placement
                groups differs from the data parallel size.
        """
        import copy

        import ray
        from ray.runtime_env import RuntimeEnv
        from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

        from vllm.v1.engine.core import DPMoEEngineCoreActor, EngineCoreActor

        dp_size = vllm_config.parallel_config.data_parallel_size
        # The DP-MoE actor is only used for MoE models running with more
        # than one DP rank; everything else uses the plain engine actor.
        actor_class = (
            DPMoEEngineCoreActor
            if dp_size > 1 and vllm_config.model_config.is_moe
            else EngineCoreActor
        )

        # Actor handles are kept in two lists, split by the `local_client`
        # flag computed for each rank below.
        self.local_engine_actors: list[ray.ActorHandle] = []
        self.remote_engine_actors: list[ray.ActorHandle] = []

        # Snapshot the environment variables to forward to the actors; only
        # names that are actually set in this process are included.
        env_vars_list = get_env_vars_to_copy(
            destination=actor_class.__name__,
            exclude_vars=WORKER_SPECIFIC_ENV_VARS,
        )
        self.env_vars_dict = {
            name: os.environ[name] for name in env_vars_list if name in os.environ
        }
        runtime_env = RuntimeEnv(env_vars=self.env_vars_dict)

        self.addresses = addresses
        self.executor_class = executor_class
        self.log_stats = log_stats
        local_engine_count = vllm_config.parallel_config.data_parallel_size_local
        world_size = vllm_config.parallel_config.world_size
        # Set by shutdown(); polled by monitor_engine_liveness().
        self.manager_stopped = threading.Event()
        # Filled in by monitor_engine_liveness() when an actor dies.
        self.failed_proc_name: str | None = None

        if ray.is_initialized():
            logger.info("Ray is already initialized. Skipping Ray initialization.")
        else:
            ray.init()

        parallel_config = vllm_config.parallel_config
        if parallel_config.enable_elastic_ep:
            from vllm.distributed.utils import create_tcp_store

            ip = parallel_config.data_parallel_master_ip
            # NOTE(review): the second positional argument (0) is presumably
            # the port, with 0 meaning "pick a free one" - confirm against
            # create_tcp_store. The port actually used is read back below.
            store = create_tcp_store(
                ip,
                0,
                is_master=True,
                world_size=-1,
                wait_for_workers=False,
            )
            parallel_config._coord_store_port = store.port
            # Keep a reference on the manager so the store object stays alive.
            self._coord_store = store

        if placement_groups is not None:
            assert local_dp_ranks is not None, (
                "local_dp_ranks must be provided if placement_groups is provided"
            )
            assert len(placement_groups) == len(local_dp_ranks), (
                "placement_groups and local_dp_ranks must have the same length"
            )
            logger.info("Using provided placement groups")
            # TODO(rui): validate passed-in placement groups
            # Caller-provided groups are not recorded here, so shutdown()
            # (which only removes `created_placement_groups`) leaves them alone.
            self.created_placement_groups = []
        else:
            placement_groups, local_dp_ranks = (
                CoreEngineActorManager.create_dp_placement_groups(vllm_config)
            )
            self.created_placement_groups = placement_groups
        assert len(placement_groups) == dp_size, (
            "Number of placement groups must match data parallel size"
        )

        # One flag per DP rank, in rank order: True when the rank's actor
        # was stored in `local_engine_actors`.
        self.placement_group_is_local = []
        # Object refs of the pending `wait_for_init` calls.
        refs = []
        for index, local_index, pg in zip(
            range(dp_size), local_dp_ranks, placement_groups
        ):
            # Each actor gets its own config copy so per-rank edits below do
            # not leak into other ranks or into the caller's config.
            dp_vllm_config = copy.deepcopy(vllm_config)
            if dp_size > 1:
                _apply_dp_identity_suffix(dp_vllm_config, index)
            dp_vllm_config.parallel_config.placement_group = pg
            # The first `data_parallel_size_local` ranks are treated as local.
            local_client = index < local_engine_count

            # Ray XPU known issue: dpctl initializes the GPU runtime early, so
            # setting device env vars in Ray actor's initialization method
            # will not affect device selection. See:
            # https://github.com/ray-project/ray/blob/master/python/ray/_private/accelerators/intel_gpu.py#L56 # noqa: E501
            if current_platform.is_xpu():
                device_evar = current_platform.device_control_env_var
                physical_gpu_ids = get_physical_gpu_ids_for_local_dp_rank(
                    device_evar, local_index, world_size
                )
                # Build a per-actor runtime env that carries the device list
                # for this rank on top of the shared env vars.
                actor_env_vars = self.env_vars_dict.copy()
                actor_env_vars[device_evar] = ",".join(str(d) for d in physical_gpu_ids)
                runtime_env = RuntimeEnv(env_vars=actor_env_vars)

            # The actor is pinned to bundle index `world_size`. For groups
            # built by create_dp_placement_groups that is the trailing
            # control bundle appended after the `world_size` device bundles.
            actor = (
                ray.remote(actor_class)
                .options(
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=pg,
                        placement_group_bundle_index=world_size,
                    ),
                    runtime_env=runtime_env,
                )
                .remote(
                    vllm_config=dp_vllm_config,
                    executor_class=executor_class,
                    log_stats=log_stats,
                    local_client=local_client,
                    addresses=addresses,
                    dp_rank=index,
                    local_dp_rank=local_index,
                )
            )
            if local_client:
                self.local_engine_actors.append(actor)
            else:
                self.remote_engine_actors.append(actor)
            self.placement_group_is_local.append(local_client)
            refs.append(actor.wait_for_init.remote())

        # Block until every actor reports that initialization finished.
        ray.get(refs)
        # Start the run loop of each actor and remember the resulting refs,
        # both as a flat list and keyed by actor handle.
        self.run_refs = []
        self.actor_run_ref_dict = dict()
        for actor in self.local_engine_actors + self.remote_engine_actors:
            ref = actor.run.remote()
            self.run_refs.append(ref)
            self.actor_run_ref_dict[actor] = ref

    @staticmethod
    def create_dp_placement_groups(
        vllm_config: VllmConfig,
    ) -> tuple[list["PlacementGroup"], list[int]]:
        """
        Create one Ray placement group per data parallel rank.

        Nodes are visited with the DP master node first. How many ranks a
        node receives depends on ``envs.VLLM_RAY_DP_PACK_STRATEGY``:

        * ``strict`` - ``STRICT_PACK`` groups; every node that is used
          hosts exactly ``data_parallel_size_local`` ranks, nodes that
          cannot fit that many are skipped.
        * ``fill``   - ``STRICT_PACK`` groups; the master node hosts
          ``data_parallel_size_local`` ranks, every other node hosts as
          many ranks as its devices allow.
        * ``span``   - ``PACK`` groups; a single rank is assembled from the
          devices of several (homogeneous) nodes.

        Every group consists of ``world_size`` device bundles followed by
        one control bundle.

        Args:
            vllm_config: Source of the parallel configuration (DP size,
                local DP size, world size, master IP, all2all backend).

        Returns:
            ``(placement_groups, local_dp_ranks)``, both of length
            ``data_parallel_size``. ``local_dp_ranks[k]`` is the index of
            group ``k`` within the allocation loop of the node it was
            created for.

        Raises:
            ValueError: Unsupported pack strategy, ``fill`` combined with a
                DeepEP all2all backend, too few devices on the master node,
                or too few resources in the cluster overall. In the last
                case the groups created so far are removed again before
                raising.
            AssertionError: Cluster or configuration preconditions are
                violated (missing master node, no devices, ``span``
                requirements).
        """

        import ray
        from ray._private.state import available_resources_per_node

        logger.info("Creating placement groups for data parallel")
        dp_master_ip = vllm_config.parallel_config.data_parallel_master_ip
        dp_size = vllm_config.parallel_config.data_parallel_size
        dp_size_local = vllm_config.parallel_config.data_parallel_size_local

        available_resources = available_resources_per_node()
        world_size = vllm_config.parallel_config.world_size
        placement_groups: list[PlacementGroup] = []
        local_dp_ranks: list[int] = []

        # Stable sort on a boolean key: the master node (key False) moves to
        # the front, all other nodes keep their relative order.
        dp_master_ip_key = f"node:{dp_master_ip}"
        nodes = sorted(
            available_resources.values(), key=lambda x: dp_master_ip_key not in x
        )
        assert len(nodes) > 0, "No nodes with resources found in Ray cluster."
        assert dp_master_ip_key in nodes[0], (
            f"The DP master node (ip: {dp_master_ip}) is missing or dead"
        )

        # optionally restrict DP placement to a caller-provided node set.
        requested_node_ips = {
            ip.strip()
            for ip in envs.VLLM_RAY_DP_PLACEMENT_NODE_IPS.split(",")
            if ip.strip()
        }
        if requested_node_ips:
            allowed_node_ips = set(requested_node_ips)
            # The master node must host the local ranks, so it has to be allowed.
            allowed_node_ips.add(dp_master_ip)
            filtered_nodes = [
                node_resources
                for node_resources in nodes
                if _node_ip_from_resources(node_resources) in allowed_node_ips
            ]
            logger.info(
                "VLLM_RAY_DP_PLACEMENT_NODE_IPS set; restricting DP placement "
                "from %d to %d node(s): %s",
                len(nodes),
                len(filtered_nodes),
                sorted(allowed_node_ips),
            )
            nodes = filtered_nodes

        device_str = current_platform.ray_device_key
        n_node_devices: list[int] = [
            int(node_resources[device_str])
            for node_resources in nodes
            if device_str in node_resources
        ]
        assert n_node_devices, f"No {device_str} found in Ray cluster."
        max_device_per_node = max(n_node_devices)

        pack_strategy = envs.VLLM_RAY_DP_PACK_STRATEGY
        _supported_pack_strategies = ("strict", "fill", "span")
        if pack_strategy not in _supported_pack_strategies:
            raise ValueError(
                f"{envs.VLLM_RAY_DP_PACK_STRATEGY} is not supported. "
                "Make sure to set `VLLM_RAY_DP_PACK_STRATEGY` "
                f"to one of {_supported_pack_strategies}"
            )

        all2all_backend = vllm_config.parallel_config.all2all_backend
        if pack_strategy == "fill" and all2all_backend in (
            "deepep_high_throughput",
            "deepep_low_latency",
        ):
            raise ValueError(
                "DeepEP kernels require EP ranks [0,7] (same for [8,15], ...) "
                "to be on the same node, but VLLM_RAY_DP_PACK_STRATEGY=fill "
                "does not guarantee that. "
                "Please use VLLM_RAY_DP_PACK_STRATEGY=strict instead."
            )

        if pack_strategy in ("strict", "fill"):
            placement_strategy = "STRICT_PACK"
        else:
            placement_strategy = "PACK"
            assert world_size > max_device_per_node, (
                f"World size {world_size} is smaller than the "
                "maximum number of devices per node "
                f"{max_device_per_node}. Make sure to set "
                "`VLLM_RAY_DP_PACK_STRATEGY` to `strict` or `fill`"
            )

            # if we need multiple nodes per dp group, we require for now that
            # available nodes are homogeneous
            assert set(n_node_devices) == {max_device_per_node}, (
                f"Nodes are not homogeneous, {nodes}"
            )
            assert world_size % max_device_per_node == 0, (
                f"For multi-node data parallel groups, world_size ({world_size}) must "
                f"be a multiple of number of devices per node ({max_device_per_node})."
            )
            assert len(n_node_devices) * max_device_per_node >= world_size * dp_size, (
                f"Not enough total available nodes ({len(n_node_devices)}) "
                f"and devices per node ({max_device_per_node}) "
                f"to satisfy required world size {world_size} and data parallel size "
                f"{dp_size}"
            )
            assert dp_size_local == 1, (
                f"data-parallel-size-local {dp_size_local} should be set as the "
                "default (1) for VLLM_RAY_DP_PACK_STRATEGY=span. "
                "The actual data-parallel-size-local will be auto determined."
            )

        # bundles collected for a single DP rank from multiple nodes,
        # for "span" pack strategy
        collected_bundles = []
        for node_resources in nodes:
            node_ip = _node_ip_from_resources(node_resources)
            assert node_ip is not None, (
                f"No node IP key found in node resources: {node_resources}"
            )

            n_device_on_node = int(node_resources.get(device_str, 0))
            if pack_strategy == "span" and n_device_on_node != 0:
                # Strictly speaking,
                # dp_size_available = n_device_on_node / world_size
                # and is a fraction, but we use 1 for easier processing
                dp_size_available = 1
            else:
                dp_size_available = n_device_on_node // world_size

            if node_ip == dp_master_ip:
                if dp_size_available < dp_size_local:
                    raise ValueError(
                        f"Not enough resources to allocate {dp_size_local} DP ranks "
                        f"on DP master node {dp_master_ip}, possible to fit "
                        f"{dp_size_available} DP ranks."
                    )
                dp_size_to_allocate = dp_size_local
            elif pack_strategy == "strict":
                if dp_size_available < dp_size_local:
                    logger.info(
                        "Skipping node %s as %s DP ranks could not fit, "
                        "possible to fit %s DP ranks",
                        node_ip,
                        dp_size_local,
                        dp_size_available,
                    )
                    continue
                dp_size_to_allocate = dp_size_local
            else:
                # for "pack_strategy" in "fill" and "span"
                # we always take everything that's available
                dp_size_to_allocate = dp_size_available

            for i in range(dp_size_to_allocate):
                device_bundle = [{device_str: 1.0, "node:" + node_ip: 0.001}]
                if pack_strategy == "span":
                    collected_bundles += device_bundle * n_device_on_node
                    assert len(collected_bundles) <= world_size, (
                        "collected_bundles should be <= world_size, "
                        f"but got {len(collected_bundles)=} and {world_size=}"
                    )

                    # we only create a placement group if we collected enough devices
                    if len(collected_bundles) < world_size:
                        continue

                    # The control bundle is placed on the node that
                    # contributed the first device bundle of this rank.
                    control_node_ip = _get_bundle_node_ip(collected_bundles[0])
                    bundles = collected_bundles + [
                        _make_control_bundle(control_node_ip)
                    ]
                    collected_bundles = []
                else:
                    # STRICT_PACK already keeps every bundle in the placement
                    # group on one node, so the explicit node affinity on the
                    # control bundle is redundant for correctness here. Keep it
                    # anyway for consistency with the span path and to preserve
                    # intent if this scheduling strategy changes later.
                    bundles = device_bundle * world_size + [
                        _make_control_bundle(node_ip)
                    ]

                pg = ray.util.placement_group(
                    name=f"dp_rank_{len(placement_groups)}",
                    strategy=placement_strategy,
                    bundles=bundles,
                )
                placement_groups.append(pg)
                local_dp_ranks.append(i)
                if len(placement_groups) == dp_size:
                    break

            if len(placement_groups) == dp_size:
                break

        if len(placement_groups) < dp_size:
            # Release whatever was reserved so far; otherwise a failed
            # attempt keeps holding cluster resources that nothing owns.
            for created_pg in placement_groups:
                ray.util.remove_placement_group(created_pg)
            raise ValueError(
                f"Not enough resources to allocate {dp_size} "
                "placement groups, only created "
                f"{len(placement_groups)} placement groups. "
                "Available resources: "
                f"{available_resources}"
            )
        assert len(placement_groups) == dp_size, (
            f"Created {len(placement_groups)} DP placement groups, expected {dp_size}"
        )
        assert len(local_dp_ranks) == dp_size, (
            f"local_dp_ranks length {len(local_dp_ranks)} does not match "
            f"expected {dp_size}"
        )
        return placement_groups, local_dp_ranks

    @staticmethod
    def add_dp_placement_groups(
        old_vllm_config: VllmConfig, new_data_parallel_size: int
    ) -> tuple[list["PlacementGroup"], list[int]]:
        """
        Add placement groups for new data parallel size.

        Creates up to ``new_data_parallel_size - data_parallel_size``
        additional ``STRICT_PACK`` placement groups, visiting the DP master
        node first. Each group holds ``world_size`` device bundles plus one
        ``{"CPU": 1.0}`` bundle; groups on the master node additionally
        carry a ``node:<master ip>`` resource on their device bundles.

        Args:
            old_vllm_config: Config describing the current deployment; its
                ``data_parallel_size`` is the baseline to grow from.
            new_data_parallel_size: Target data parallel size.

        Returns:
            ``(placement_groups, local_dp_ranks)`` for the newly created
            groups only. Both lists are empty when the target size is not
            larger than the current one. Fewer groups than requested are
            returned (and a warning is logged) when the cluster does not
            have enough free devices.

        Raises:
            AssertionError: If the master node is not among the listed
                nodes, or more than one listed node has the master IP.
        """
        import ray
        from ray._private.state import (
            available_resources_per_node,
            total_resources_per_node,
        )
        from ray.util.state import list_nodes

        old_dp_size = old_vllm_config.parallel_config.data_parallel_size
        num_pg_to_create = new_data_parallel_size - old_dp_size

        if num_pg_to_create <= 0:
            return [], []

        dp_master_ip = old_vllm_config.parallel_config.data_parallel_master_ip
        world_size = old_vllm_config.parallel_config.world_size

        # Stable sort on a boolean key puts the master node first.
        nodes = list_nodes()
        nodes = sorted(nodes, key=lambda node: node.node_ip != dp_master_ip)
        assert nodes[0].node_ip == dp_master_ip, "The first node must be the head node"
        assert len(nodes) == 1 or nodes[1].node_ip != dp_master_ip, (
            "There can only be one head node"
        )

        available_resources = available_resources_per_node()
        total_resources = total_resources_per_node()

        placement_groups = []
        local_dp_ranks = []
        num_pg_created = 0

        device_str = current_platform.ray_device_key
        for node in nodes:
            if num_pg_created >= num_pg_to_create:
                break

            node_ip = node.node_ip
            node_id = node.node_id
            # A node returned by list_nodes() may have no entry in the
            # resource maps (they come from a different query); treat such
            # a node as having no devices instead of failing on KeyError.
            node_available = available_resources.get(node_id, {})
            if device_str not in node_available:
                continue
            available_gpus = int(node_available[device_str])

            # Total devices on this node; fall back to the available count
            # (i.e. "nothing in use") if the node is missing from the totals.
            total_gpus = int(
                total_resources.get(node_id, {}).get(device_str, available_gpus)
            )

            # Calculate used GPUs and used engines on this node
            used_gpus = max(0, total_gpus - available_gpus)
            used_engines_on_node = used_gpus // world_size

            # Calculate how many new engines this node can accommodate
            available_engine_count = available_gpus // world_size

            # Create placement groups for new engines on this node
            for i in range(available_engine_count):
                if num_pg_created >= num_pg_to_create:
                    break

                rank = old_dp_size + num_pg_created

                # Create bundles with node constraint for master node
                if node_ip == dp_master_ip:
                    bundles = [
                        {device_str: 1.0, "node:" + dp_master_ip: 0.001}
                    ] * world_size + [{"CPU": 1.0}]
                else:
                    bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]

                pg = ray.util.placement_group(
                    name=f"dp_rank_{rank}",
                    strategy="STRICT_PACK",
                    bundles=bundles,
                )
                placement_groups.append(pg)

                # Local rank starts from the number of engines already used
                # on this node
                local_rank = used_engines_on_node + i
                local_dp_ranks.append(local_rank)
                num_pg_created += 1

        if num_pg_created < num_pg_to_create:
            # Surface the shortfall; callers receive fewer groups than the
            # requested size implies.
            logger.warning(
                "Only created %d of %d requested DP placement groups; "
                "not enough free %s devices in the Ray cluster.",
                num_pg_created,
                num_pg_to_create,
                device_str,
            )

        return placement_groups, local_dp_ranks

    def scale_up_elastic_ep(
        self, cur_vllm_config: VllmConfig, new_data_parallel_size: int
    ) -> None:
        """
        Grow the deployment to ``new_data_parallel_size`` engine actors.

        New placement groups are obtained from ``add_dp_placement_groups``;
        one actor is launched per new group, all new actors are awaited via
        ``wait_for_init`` and then started via ``run``. The new handles,
        placement groups, locality flags and run refs are appended to the
        manager's bookkeeping lists.

        Args:
            cur_vllm_config: Current config. It is deep-copied per new
                actor and also updated in place at the end:
                ``data_parallel_size`` is set to the new size and
                ``data_parallel_size_local`` is increased by the number of
                new engines placed on the master node.
            new_data_parallel_size: Target data parallel size; must exceed
                the number of actors currently tracked.

        Raises:
            AssertionError: If ``new_data_parallel_size`` is not greater
                than the current number of actors.
        """
        import copy

        import ray
        from ray.runtime_env import RuntimeEnv
        from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

        from vllm.v1.engine.core import DPMoEEngineCoreActor, EngineCoreActor

        actor_class = (
            DPMoEEngineCoreActor
            if cur_vllm_config.model_config.is_moe
            else EngineCoreActor
        )

        cur_data_parallel_size = len(self.local_engine_actors) + len(
            self.remote_engine_actors
        )

        assert new_data_parallel_size > cur_data_parallel_size, (
            f"New data parallel size {new_data_parallel_size} must be greater "
            f"than current data parallel size {cur_data_parallel_size} "
            "for scale up"
        )

        placement_groups, local_dp_ranks = self.add_dp_placement_groups(
            cur_vllm_config, new_data_parallel_size
        )

        world_size = cur_vllm_config.parallel_config.world_size
        dp_master_ip = cur_vllm_config.parallel_config.data_parallel_master_ip
        new_local_engines = 0

        # Actors created by THIS call, tracked explicitly. Recovering them
        # later by slicing the tail of the actor lists is unsafe: when no
        # new remote engine is added the slice becomes `lst[-0:]`, which is
        # the whole list, and every existing remote actor would be awaited
        # and have `run` invoked a second time.
        new_local_actors = []
        new_remote_actors = []

        # Same env vars as the initial actors, plus a marker telling the
        # new actor that it is being launched as part of a scale-up.
        runtime_env = RuntimeEnv(
            env_vars=self.env_vars_dict | {"VLLM_ELASTIC_EP_SCALE_UP_LAUNCH": "1"}
        )
        for i, (pg, local_rank) in enumerate(zip(placement_groups, local_dp_ranks)):
            rank = cur_data_parallel_size + i
            dp_vllm_config = copy.deepcopy(cur_vllm_config)
            if new_data_parallel_size > 1:
                _apply_dp_identity_suffix(dp_vllm_config, rank)
            dp_vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
            dp_vllm_config.parallel_config.placement_group = pg

            # Check if this placement group is on the head node
            local_client = any(
                bundle.get("node:" + dp_master_ip, 0) > 0 for bundle in pg.bundle_specs
            )

            if local_client:
                new_local_engines += 1
                # Update data_parallel_size_local
                dp_vllm_config.parallel_config.data_parallel_size_local = (
                    cur_vllm_config.parallel_config.data_parallel_size_local
                    + new_local_engines
                )

            # Pin the actor to bundle index `world_size`, i.e. the bundle
            # that follows the `world_size` device bundles of the group.
            actor = (
                ray.remote(actor_class)
                .options(
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=pg,
                        placement_group_bundle_index=world_size,
                    ),
                    runtime_env=runtime_env,
                )
                .remote(
                    vllm_config=dp_vllm_config,
                    executor_class=self.executor_class,
                    log_stats=self.log_stats,
                    local_client=local_client,
                    addresses=self.addresses,
                    dp_rank=rank,
                    local_dp_rank=local_rank,
                )
            )

            if local_client:
                self.local_engine_actors.append(actor)
                new_local_actors.append(actor)
            else:
                self.remote_engine_actors.append(actor)
                new_remote_actors.append(actor)
            self.created_placement_groups.append(pg)
            self.placement_group_is_local.append(local_client)

        # New local actors first, then new remote ones.
        new_actors = new_local_actors + new_remote_actors

        # Block until every new actor has finished initializing.
        ray.get([actor.wait_for_init.remote() for actor in new_actors])

        for actor in new_actors:
            ref = actor.run.remote()
            self.run_refs.append(ref)
            self.actor_run_ref_dict[actor] = ref

        cur_vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
        # Update old_vllm_config with new data_parallel_size_local if any new
        # local engines were added
        if new_local_engines > 0:
            cur_vllm_config.parallel_config.data_parallel_size_local += (
                new_local_engines
            )

    def scale_down_elastic_ep(
        self, cur_data_parallel_size: int, new_data_parallel_size: int
    ) -> None:
        """
        Drop the most recently added engines until the deployment has
        ``new_data_parallel_size`` ranks.

        For every rank removed, the newest entry is popped from
        ``created_placement_groups`` and ``placement_group_is_local``, the
        newest handle is popped from the matching actor list, and the
        placement group is removed through Ray. No explicit ``ray.kill`` is
        issued here.

        Raises:
            AssertionError: If the new size is not smaller than the
                current size.
        """
        import ray

        assert cur_data_parallel_size > new_data_parallel_size, (
            f"cur_data_parallel_size {cur_data_parallel_size} must be greater "
            f"than new_data_parallel_size {new_data_parallel_size} "
            "for scale down"
        )

        remaining = cur_data_parallel_size - new_data_parallel_size
        while remaining > 0:
            remaining -= 1
            doomed_group = self.created_placement_groups.pop()
            # The locality flag tells which handle list owns the newest actor.
            owning_actors = (
                self.local_engine_actors
                if self.placement_group_is_local.pop()
                else self.remote_engine_actors
            )
            owning_actors.pop()
            ray.util.remove_placement_group(doomed_group)

    def remove_run_refs_for_scale_down(self, removed_dp_size: int) -> None:
        if removed_dp_size <= 0:
            return
        flags = self.placement_group_is_local[-removed_dp_size:]
        li = len(self.local_engine_actors) - 1
        ri = len(self.remote_engine_actors) - 1
        for is_local in reversed(flags):
            if is_local:
                actor = self.local_engine_actors[li]
                li -= 1
            else:
                actor = self.remote_engine_actors[ri]
                ri -= 1
            ref = self.actor_run_ref_dict.pop(actor)
            self.run_refs.remove(ref)

    def get_run_refs(self):
        return self.run_refs

    def monitor_engine_liveness(self) -> None:
        """
        Poll the actors' run refs until the manager is stopped, there is
        nothing left to watch, or an actor fails; then call ``shutdown()``.

        On a ``RayActorError`` the failing ref is recorded in
        ``self.failed_proc_name`` before the loop exits. ``shutdown()`` is
        invoked on every exit path of the loop.
        """
        import ray

        while not self.manager_stopped.is_set():
            # Work on a snapshot: the underlying list can change while we
            # wait (see the scale-down check below).
            actor_run_refs = list(self.get_run_refs())
            if not actor_run_refs:
                logger.info(
                    "There are no actors to monitor currently. "
                    "The monitoring function is about to terminate."
                )
                break
            # Wait with a timeout so the stop flag is re-checked regularly.
            actor_done_refs, _ = ray.wait(actor_run_refs, timeout=5)
            unexpected_failure = False
            for actor_ref in actor_done_refs:
                if self.manager_stopped.is_set():
                    break
                if actor_ref not in self.get_run_refs():
                    # The run refs may have been updated by elastic scale-down.
                    continue
                try:
                    # Resolving a finished ref re-raises the actor's failure.
                    ray.get(actor_ref)
                except ray.exceptions.RayActorError:
                    self.failed_proc_name = f"Actor {actor_ref}"
                    unexpected_failure = True
                # NOTE(review): a ref that resolves without error is not
                # removed from run_refs here, so it will presumably be
                # reported as done again on the next iteration - confirm
                # whether a normal return of `run` is expected at all.

            if unexpected_failure:
                break

        self.shutdown()

    def shutdown(self, timeout: float | None = None) -> None:
        """
        Stop the manager: set the stop flag, kill every tracked engine
        actor, and remove the placement groups this manager created.

        Args:
            timeout: Accepted but not used by this implementation.
        """
        import ray

        self.manager_stopped.set()

        # Local actors first, then remote ones.
        tracked_actors = self.local_engine_actors + self.remote_engine_actors
        for handle in tracked_actors:
            ray.kill(handle)

        # Only groups recorded in `created_placement_groups` are removed.
        for group in self.created_placement_groups:
            ray.util.remove_placement_group(group)

add_dp_placement_groups(old_vllm_config, new_data_parallel_size) staticmethod

Add placement groups for new data parallel size.

Source code in vllm/v1/engine/utils.py
@staticmethod
def add_dp_placement_groups(
    old_vllm_config: VllmConfig, new_data_parallel_size: int
) -> tuple[list["PlacementGroup"], list[int]]:
    """
    Add placement groups for new data parallel size.

    Creates one placement group per additional DP rank
    (``new_data_parallel_size - old data_parallel_size``), visiting the DP
    master node first and then the remaining nodes, and placing as many new
    engines on each node as its currently available devices allow.

    Args:
        old_vllm_config: Config describing the current deployment; supplies
            the current DP size, the DP master IP and the world size.
        new_data_parallel_size: Target data parallel size.

    Returns:
        A tuple ``(placement_groups, local_dp_ranks)`` with one entry per
        newly created placement group. Both lists are empty when the new
        size does not exceed the old one. No error is raised here if the
        cluster cannot fit every requested group; the lists are then
        shorter than requested.
    """
    import ray
    from ray._private.state import (
        available_resources_per_node,
        total_resources_per_node,
    )
    from ray.util.state import list_nodes

    old_dp_size = old_vllm_config.parallel_config.data_parallel_size
    num_pg_to_create = new_data_parallel_size - old_dp_size

    # Nothing to add when scaling down or staying at the same size.
    if num_pg_to_create <= 0:
        return [], []

    dp_master_ip = old_vllm_config.parallel_config.data_parallel_master_ip
    world_size = old_vllm_config.parallel_config.world_size

    nodes = list_nodes()
    # False sorts before True, so the node matching the master IP comes first.
    nodes = sorted(nodes, key=lambda node: node.node_ip != dp_master_ip)
    assert nodes[0].node_ip == dp_master_ip, "The first node must be the head node"
    assert len(nodes) == 1 or nodes[1].node_ip != dp_master_ip, (
        "There can only be one head node"
    )

    available_resources = available_resources_per_node()
    total_resources = total_resources_per_node()

    placement_groups = []
    local_dp_ranks = []
    num_pg_created = 0

    device_str = current_platform.ray_device_key
    for node in nodes:
        if num_pg_created >= num_pg_to_create:
            break

        node_ip = node.node_ip
        node_id = node.node_id
        # Skip nodes that currently advertise no free devices of this kind.
        if device_str not in available_resources[node_id]:
            continue
        available_gpus = int(available_resources[node_id][device_str])

        # Get total GPUs on this node from the node's resources
        # Ray stores node resources with node ID as key
        total_gpus = int(total_resources[node_id][device_str])

        # Calculate used GPUs and used engines on this node
        used_gpus = max(0, total_gpus - available_gpus)
        used_engines_on_node = used_gpus // world_size

        # Calculate how many new engines this node can accommodate
        available_engine_count = available_gpus // world_size

        # Create placement groups for new engines on this node
        for i in range(available_engine_count):
            if num_pg_created >= num_pg_to_create:
                break

            # New global DP ranks continue after the existing ones.
            rank = old_dp_size + num_pg_created

            # Create bundles with node constraint for master node
            if node_ip == dp_master_ip:
                bundles = [
                    {device_str: 1.0, "node:" + dp_master_ip: 0.001}
                ] * world_size + [{"CPU": 1.0}]
            else:
                bundles = [{device_str: 1.0}] * world_size + [{"CPU": 1.0}]

            pg = ray.util.placement_group(
                name=f"dp_rank_{rank}",
                strategy="STRICT_PACK",
                bundles=bundles,
            )
            placement_groups.append(pg)

            # Local rank starts from the number of engines already used
            # on this node
            local_rank = used_engines_on_node + i
            local_dp_ranks.append(local_rank)
            num_pg_created += 1

    return placement_groups, local_dp_ranks

create_dp_placement_groups(vllm_config) staticmethod

Create placement groups for data parallel.

Source code in vllm/v1/engine/utils.py
@staticmethod
def create_dp_placement_groups(
    vllm_config: VllmConfig,
) -> tuple[list["PlacementGroup"], list[int]]:
    """
    Create placement groups for data parallel.

    One placement group is created per DP rank. Nodes are visited with the
    DP master node first; the candidate set can be narrowed with
    ``VLLM_RAY_DP_PLACEMENT_NODE_IPS`` (the master node is always kept).
    ``VLLM_RAY_DP_PACK_STRATEGY`` selects how ranks are packed onto nodes:

    - ``strict``: every node that is used hosts exactly
      ``data_parallel_size_local`` ranks; other nodes that cannot fit that
      many are skipped.
    - ``fill``: the master node hosts ``data_parallel_size_local`` ranks and
      every other node hosts as many ranks as fit.
    - ``span``: a single rank spans several nodes; device bundles are
      accumulated across nodes until ``world_size`` bundles are collected.

    Returns:
        A tuple ``(placement_groups, local_dp_ranks)``, each of length
        ``data_parallel_size``.

    Raises:
        ValueError: If the pack strategy is unsupported, if ``fill`` is
            combined with a DeepEP all2all backend, if the master node
            cannot fit ``data_parallel_size_local`` ranks, or if fewer than
            ``data_parallel_size`` placement groups could be created.
    """

    import ray
    from ray._private.state import available_resources_per_node

    logger.info("Creating placement groups for data parallel")
    dp_master_ip = vllm_config.parallel_config.data_parallel_master_ip
    dp_size = vllm_config.parallel_config.data_parallel_size
    dp_size_local = vllm_config.parallel_config.data_parallel_size_local

    available_resources = available_resources_per_node()
    world_size = vllm_config.parallel_config.world_size
    placement_groups: list[PlacementGroup] = []
    local_dp_ranks: list[int] = []

    dp_master_ip_key = f"node:{dp_master_ip}"
    # False sorts before True, so the node carrying the master IP key is first.
    nodes = sorted(
        available_resources.values(), key=lambda x: dp_master_ip_key not in x
    )
    assert len(nodes) > 0, "No nodes with resources found in Ray cluster."
    assert dp_master_ip_key in nodes[0], (
        f"The DP master node (ip: {dp_master_ip}) is missing or dead"
    )

    # Optionally restrict DP placement to a caller-provided node set.
    requested_node_ips = {
        ip.strip()
        for ip in envs.VLLM_RAY_DP_PLACEMENT_NODE_IPS.split(",")
        if ip.strip()
    }
    if requested_node_ips:
        allowed_node_ips = set(requested_node_ips)
        # The master node must host the local ranks, so it has to be allowed.
        if dp_master_ip not in allowed_node_ips:
            allowed_node_ips.add(dp_master_ip)
        filtered_nodes = [
            node_resources
            for node_resources in nodes
            if _node_ip_from_resources(node_resources) in allowed_node_ips
        ]
        logger.info(
            "VLLM_RAY_DP_PLACEMENT_NODE_IPS set; restricting DP placement "
            "from %d to %d node(s): %s",
            len(nodes),
            len(filtered_nodes),
            sorted(allowed_node_ips),
        )
        nodes = filtered_nodes

    device_str = current_platform.ray_device_key
    # Device counts of the candidate nodes that advertise this device type.
    n_node_devices: list[int] = [
        int(node_resources[device_str])
        for node_resources in nodes
        if device_str in node_resources
    ]
    assert n_node_devices, f"No {device_str} found in Ray cluster."
    max_device_per_node = max(n_node_devices)

    pack_strategy = envs.VLLM_RAY_DP_PACK_STRATEGY
    _supported_pack_strategies = ("strict", "fill", "span")
    if pack_strategy not in _supported_pack_strategies:
        raise ValueError(
            f"{envs.VLLM_RAY_DP_PACK_STRATEGY} is not supported. "
            "Make sure to set `VLLM_RAY_DP_PACK_STRATEGY` "
            f"to one of {_supported_pack_strategies}"
        )

    all2all_backend = vllm_config.parallel_config.all2all_backend
    if pack_strategy == "fill" and (
        all2all_backend == "deepep_high_throughput"
        or all2all_backend == "deepep_low_latency"
    ):
        raise ValueError(
            "DeepEP kernels require EP ranks [0,7] (same for [8,15], ...) "
            "to be on the same node, but VLLM_RAY_DP_PACK_STRATEGY=fill "
            "does not guarantee that. "
            "Please use VLLM_RAY_DP_PACK_STRATEGY=strict instead."
        )

    if pack_strategy in ("strict", "fill"):
        placement_strategy = "STRICT_PACK"
    else:
        # "span": a single DP rank is spread over several nodes.
        placement_strategy = "PACK"
        assert world_size > max_device_per_node, (
            f"World size {world_size} is smaller than the "
            "maximum number of devices per node "
            f"{max_device_per_node}. Make sure to set "
            "`VLLM_RAY_DP_PACK_STRATEGY` to `strict` or `fill`"
        )

        # if we need multiple nodes per dp group, we require for now that
        # available nodes are homogeneous
        assert set(n_node_devices) == {max_device_per_node}, (
            f"Nodes are not homogeneous, {nodes}"
        )
        assert world_size % max_device_per_node == 0, (
            f"For multi-node data parallel groups, world_size ({world_size}) must "
            f"be a multiple of number of devices per node ({max_device_per_node})."
        )
        assert len(n_node_devices) * max_device_per_node >= world_size * dp_size, (
            f"Not enough total available nodes ({len(n_node_devices)}) "
            f"and devices per node ({max_device_per_node}) "
            f"to satisfy required world size {world_size} and data parallel size "
            f"{dp_size}"
        )
        assert dp_size_local == 1, (
            f"data-parallel-size-local {dp_size_local} should be set as the "
            "default (1) for VLLM_RAY_DP_PACK_STRATEGY=span. "
            "The actual data-parallel-size-local will be auto determined."
        )

    # bundles collected for a single DP rank from multiple nodes,
    # for "span" pack strategy
    collected_bundles = []
    for node_resources in nodes:
        node_ip = _node_ip_from_resources(node_resources)
        assert node_ip is not None, (
            f"No node IP key found in node resources: {node_resources}"
        )

        n_device_on_node = int(node_resources.get(device_str, 0))
        if pack_strategy == "span" and n_device_on_node != 0:
            # Strictly speaking,
            # dp_size_available = n_device_on_node / world_size
            # and is a fraction, but we use 1 for easier processing
            dp_size_available = 1
        else:
            dp_size_available = n_device_on_node // world_size

        if node_ip == dp_master_ip:
            # The master node must be able to host all of its local ranks.
            if dp_size_available < dp_size_local:
                raise ValueError(
                    f"Not enough resources to allocate {dp_size_local} DP ranks "
                    f"on DP master node {dp_master_ip}, possible to fit "
                    f"{dp_size_available} DP ranks."
                )
            dp_size_to_allocate = dp_size_local
        elif pack_strategy == "strict":
            if dp_size_available < dp_size_local:
                logger.info(
                    "Skipping node %s as %s DP ranks could not fit, "
                    "possible to fit %s DP ranks",
                    node_ip,
                    dp_size_local,
                    dp_size_available,
                )
                continue
            dp_size_to_allocate = dp_size_local
        else:
            # for "pack_strategy" in "fill" and "span"
            # we always take everything that's available
            dp_size_to_allocate = dp_size_available

        for i in range(dp_size_to_allocate):
            # Each device bundle is pinned to this node via its node resource.
            device_bundle = [{device_str: 1.0, "node:" + node_ip: 0.001}]
            if pack_strategy == "span":
                collected_bundles += device_bundle * n_device_on_node
                assert len(collected_bundles) <= world_size, (
                    "collected_bundles should be <= world_size, "
                    f"but got {len(collected_bundles)=} and {world_size=}"
                )

                # we only create a placement group if we collected enough devices
                if len(collected_bundles) < world_size:
                    continue

                # The control bundle goes on the node of the first collected
                # device bundle.
                control_node_ip = _get_bundle_node_ip(collected_bundles[0])
                bundles = collected_bundles + [
                    _make_control_bundle(control_node_ip)
                ]
                collected_bundles = []
            else:
                # STRICT_PACK already keeps every bundle in the placement
                # group on one node, so the explicit node affinity on the
                # control bundle is redundant for correctness here. Keep it
                # anyway for consistency with the span path and to preserve
                # intent if this scheduling strategy changes later.
                bundles = device_bundle * world_size + [
                    _make_control_bundle(node_ip)
                ]

            pg = ray.util.placement_group(
                name=f"dp_rank_{len(placement_groups)}",
                strategy=placement_strategy,
                bundles=bundles,
            )
            placement_groups.append(pg)
            # The local DP rank is the index of this group on its node.
            local_dp_ranks.append(i)
            if len(placement_groups) == dp_size:
                break

        # Stop visiting nodes once every DP rank has a placement group.
        if len(placement_groups) == dp_size:
            break

    if len(placement_groups) < dp_size:
        raise ValueError(
            f"Not enough resources to allocate {dp_size} "
            "placement groups, only created "
            f"{len(placement_groups)} placement groups. "
            "Available resources: "
            f"{available_resources}"
        )
    assert len(placement_groups) == dp_size, (
        f"Created {len(placement_groups)} DP placement groups, expected {dp_size}"
    )
    assert len(local_dp_ranks) == dp_size, (
        f"local_dp_ranks length {len(local_dp_ranks)} does not match "
        f"expected {dp_size}"
    )
    return placement_groups, local_dp_ranks

CoreEngineProcManager

Utility class to handle creation, readiness, and shutdown of background processes used by the AsyncLLM and LLMEngine.

Methods:

Source code in vllm/v1/engine/utils.py
class CoreEngineProcManager:
    """
    Utility class to handle creation, readiness, and shutdown
    of background processes used by the AsyncLLM and LLMEngine.
    """

    def __init__(
        self,
        local_engine_count: int,
        start_index: int,
        local_start_index: int,
        vllm_config: VllmConfig,
        local_client: bool,
        handshake_address: str,
        executor_class: type[Executor],
        log_stats: bool,
        client_handshake_address: str | None = None,
        tensor_queue: Queue | None = None,
    ):
        """Create and start ``local_engine_count`` engine core processes.

        Process ``index`` is given ``dp_rank = start_index + index`` and
        ``local_dp_rank = local_start_index + index``. Processes are started
        one after another; if any of them has already exited by the time
        startup finishes (or fails), all of them are shut down.
        """
        context = get_mp_context()
        # Keyword arguments shared by every engine core process.
        common_kwargs = {
            "vllm_config": vllm_config,
            "local_client": local_client,
            "handshake_address": handshake_address,
            "executor_class": executor_class,
            "log_stats": log_stats,
            "tensor_queue": tensor_queue,
        }

        # Only forwarded when set (non-empty).
        if client_handshake_address:
            common_kwargs["client_handshake_address"] = client_handshake_address

        is_dp = vllm_config.parallel_config.data_parallel_size > 1

        from vllm.v1.engine.core import EngineCoreProc

        self.processes: list[BaseProcess] = []
        local_dp_ranks = []
        for index in range(local_engine_count):
            local_index = local_start_index + index
            global_index = start_index + index

            # Start EngineCore in background process.
            local_dp_ranks.append(local_index)
            self.processes.append(
                context.Process(
                    target=EngineCoreProc.run_engine_core,
                    name=f"EngineCore_DP{global_index}" if is_dp else "EngineCore",
                    kwargs=common_kwargs
                    | {"dp_rank": global_index, "local_dp_rank": local_index},
                )
            )

        # Registers the module-level shutdown() to run on self.processes when
        # this manager is finalized, unless detached first (see shutdown()).
        self._finalizer = weakref.finalize(self, shutdown, self.processes)
        self.manager_stopped = threading.Event()
        self.failed_proc_name: str | None = None

        # All ranks share this config object: capture the user-provided
        # --device-ids list before the per-rank shard overwrites it. Mutating
        # the config before each proc.start() works because the spawn method
        # pickles process args at start() time, sequentially per rank.
        user_assigned_gpu_ids = vllm_config.parallel_config.assigned_physical_gpu_ids
        try:
            for proc, local_dp_rank in zip(self.processes, local_dp_ranks):
                # Populate the logical-to-physical GPU mapping in DP for
                # platforms that cannot rely on
                # torch.accelerator.set_device_index(), and for Ray.
                needs_device_env_isolation = not (
                    current_platform.is_cuda_alike() or current_platform.is_xpu()
                )
                if is_dp and (
                    needs_device_env_isolation or vllm_config.parallel_config.use_ray
                ):
                    set_assigned_physical_gpu_ids_for_dp_rank(
                        vllm_config, local_dp_rank, user_assigned_gpu_ids
                    )

                with numa_utils.configure_subprocess(
                    # EngineCore itself does not have a TP/PP-local rank.
                    # When DP is enabled, set_assigned_physical_gpu_ids_for_dp_rank()
                    # populates the logical-to-physical mapping for this DP
                    # shard, so local_rank=0 means "the first local GPU in
                    # this shard". The actual TP/PP worker processes spawned
                    # by the executor are bound separately with their own
                    # local_rank values.
                    vllm_config,
                    local_rank=0,
                    dp_local_rank=local_dp_rank,
                    process_kind="EngineCore",
                ):
                    proc.start()
        finally:
            # Kill other procs if not all are running.
            if self.finished_procs():
                self.shutdown()

    def shutdown(self, timeout: float | None = None) -> None:
        """Shutdown engine core processes with configurable timeout."""
        self.manager_stopped.set()
        # detach() returns None once the finalizer has already run or been
        # detached, so the processes are only shut down once through here.
        if self._finalizer.detach() is not None:
            shutdown(self.processes, timeout=timeout)

    def monitor_engine_liveness(self) -> None:
        """Monitor engine core process liveness."""

        sentinel_to_proc = {proc.sentinel: proc for proc in self.processes}
        sentinels = set(sentinel_to_proc.keys())

        while sentinels and not self.manager_stopped.is_set():
            # Poll with a timeout so a stop request is noticed promptly.
            died_sentinels = connection.wait(sentinels, timeout=1)

            for sentinel in died_sentinels:
                proc = sentinel_to_proc.pop(cast(int, sentinel))
                exitcode = proc.exitcode
                # Only record a failure for a non-zero exit that happened
                # before the manager was asked to stop.
                if exitcode != 0 and not self.manager_stopped.is_set():
                    self.failed_proc_name = proc.name
            if died_sentinels:
                # Any engine exit currently triggers a shutdown. Future
                # work (e.g., Elastic and fault-tolerant EP) will add finer-grained
                # handling for different exit scenarios.
                break

        self.shutdown()

    def sentinels(self) -> list:
        """Return the ``sentinel`` of every managed process."""
        return [proc.sentinel for proc in self.processes]

    def finished_procs(self) -> dict[str, int]:
        """Returns dict of proc name -> exit code for any finished procs."""
        return {
            proc.name: proc.exitcode
            for proc in self.processes
            if proc.exitcode is not None
        }

finished_procs()

Returns dict of proc name -> exit code for any finished procs.

Source code in vllm/v1/engine/utils.py
def finished_procs(self) -> dict[str, int]:
    """Map process name to exit code for every process that has exited.

    Processes whose ``exitcode`` is still ``None`` are left out.
    """
    exit_codes: dict[str, int] = {}
    for proc in self.processes:
        code = proc.exitcode
        if code is None:
            continue
        exit_codes[proc.name] = code
    return exit_codes

monitor_engine_liveness()

Monitor engine core process liveness.

Source code in vllm/v1/engine/utils.py
def monitor_engine_liveness(self) -> None:
    """Block until an engine core process exits, then shut everything down.

    Waits on the process sentinels in one-second slices so that a stop
    request is noticed. When a process exits with a non-zero code before the
    manager was stopped, its name is recorded in ``failed_proc_name``.
    ``shutdown()`` is always called before returning.
    """
    proc_by_sentinel = {}
    for proc in self.processes:
        proc_by_sentinel[proc.sentinel] = proc
    watched = set(proc_by_sentinel)

    while watched and not self.manager_stopped.is_set():
        exited = connection.wait(watched, timeout=1)
        if not exited:
            # Timed out with every process still alive; poll again.
            continue

        for sentinel in exited:
            dead_proc = proc_by_sentinel.pop(cast(int, sentinel))
            if dead_proc.exitcode != 0 and not self.manager_stopped.is_set():
                self.failed_proc_name = dead_proc.name

        # Any engine exit currently triggers a shutdown. Future
        # work (e.g., Elastic and fault-tolerant EP) will add finer-grained
        # handling for different exit scenarios.
        break

    self.shutdown()

shutdown(timeout=None)

Shutdown engine core processes with configurable timeout.

Source code in vllm/v1/engine/utils.py
def shutdown(self, timeout: float | None = None) -> None:
    """Shutdown engine core processes with configurable timeout."""
    self.manager_stopped.set()
    if self._finalizer.detach() is not None:
        shutdown(self.processes, timeout=timeout)

EngineHandshakeMetadata dataclass

Metadata sent to each engine process during startup handshake, including addresses of the front-end ZMQ queues that they should connect to.

Source code in vllm/v1/engine/utils.py
@dataclass
class EngineHandshakeMetadata:
    """Metadata sent to each engine process during startup handshake,
    including addresses of the front-end ZMQ queues that they should
    connect to.
    """

    # ZMQ addresses handed to the engine during the handshake.
    addresses: EngineZmqAddresses
    # NOTE(review): presumably parallel-config values the engine should adopt,
    # keyed by field name -- confirm against the code that builds this object.
    parallel_config: dict[str, int | str | list[int]]

SignalCallback

Safely trigger a callback from signal handler context via a dedicated thread.

Source code in vllm/v1/engine/utils.py
class SignalCallback:
    """Run a callback on a dedicated thread so a signal handler only sets an event.

    ``trigger()`` merely sets an event; the waiting thread then invokes the
    callback. The thread waits for the event a single time, so the callback
    runs at most once. ``stop()`` releases the thread without running it.
    """

    def __init__(self, callback: Callable[[], None]):
        self._stopped = False
        self._event = threading.Event()
        self._callback = callback
        # Daemon thread: it must not keep the interpreter alive on exit.
        worker = threading.Thread(
            name="signal-callback",
            target=self._run,
            daemon=True,
        )
        self._thread = worker
        worker.start()

    def _run(self):
        # Sleep until either trigger() or stop() sets the event.
        self._event.wait()
        if self._stopped:
            return
        self._callback()

    def trigger(self):
        self._event.set()

    def stop(self):
        # Set the flag before waking the thread so it skips the callback.
        self._stopped = True
        self._event.set()

_node_ip_from_resources(node_resources)

Return the node IP encoded in a Ray per-node resource dict, or None.

Ray advertises each node's IP as a node:<ip> resource key. The head node also carries node:__internal_head__, and placement groups add ..._group_... keys; both are ignored.

Source code in vllm/v1/engine/utils.py
def _node_ip_from_resources(node_resources: dict) -> str | None:
    """Return the node IP encoded in a Ray per-node resource dict, or None.

    Ray advertises each node's IP as a ``node:<ip>`` resource key. The head node
    also carries ``node:__internal_head__``, and placement groups add
    ``..._group_...`` keys; both are ignored.
    """
    for key in node_resources:
        if (
            key.startswith("node:")
            and key != "node:__internal_head__"
            and "_group_" not in key
        ):
            return key.split(":", 1)[1]
    return None

get_engine_zmq_addresses(vllm_config, num_api_servers=1, *, defer_api_server_ports=True)

Allocate ZMQ addresses for engine-client communication.

By default each TCP address is a tcp://host:0 placeholder; the consumer (API-server child or single-process MPClient) binds, then recovers the kernel-assigned port via getsockopt(zmq.LAST_ENDPOINT) and writes it back into addresses before the engine handshake.

Set defer_api_server_ports=False only when the consumer cannot report a bound port back (e.g. the Rust front-end). IPC paths are unaffected.

Source code in vllm/v1/engine/utils.py
def get_engine_zmq_addresses(
    vllm_config: VllmConfig,
    num_api_servers: int = 1,
    *,
    defer_api_server_ports: bool = True,
) -> EngineZmqAddresses:
    """Build the input and output ZMQ addresses shared by front-ends and engines.

    One input and one output address is produced per API server. When the
    front-end only talks to colocated engines the addresses are IPC paths;
    otherwise they are TCP URIs on the DP master IP.

    With ``defer_api_server_ports=True`` (the default) each TCP address is a
    ``tcp://host:0`` placeholder: the consumer (API-server child or
    single-process ``MPClient``) binds, recovers the kernel-assigned port via
    ``getsockopt(zmq.LAST_ENDPOINT)`` and writes it back into the addresses
    before the engine handshake. Pass ``False`` only when the consumer cannot
    report a bound port back (e.g. the Rust front-end); a free port is then
    picked up front. IPC paths are unaffected by this flag.
    """
    parallel_config = vllm_config.parallel_config
    host = parallel_config.data_parallel_master_ip

    # In offline mode there is an LLM instance per DP rank and
    # one core engine per LLM, see
    # examples/features/data_parallel/data_parallel_offline.py.
    offline_mode = parallel_config.data_parallel_rank_local is not None

    if parallel_config.enable_elastic_ep:
        # NOTE(yongji): handling scaling from intra-node to inter-node
        client_local_only = False
    else:
        # True for cases where this front-end sends requests only to
        # colocated engines.
        all_engines_local = (
            parallel_config.data_parallel_size_local
            == parallel_config.data_parallel_size
        )
        client_local_only = (
            offline_mode or parallel_config.local_engines_only or all_engines_local
        )

    def _new_address() -> str:
        if not client_local_only:
            port = 0 if defer_api_server_ports else get_open_port()
            return get_tcp_uri(host, port)
        return get_open_zmq_ipc_path()

    # Inputs are allocated before outputs, one pair per API server.
    input_addresses = [_new_address() for _ in range(num_api_servers)]
    output_addresses = [_new_address() for _ in range(num_api_servers)]
    return EngineZmqAddresses(inputs=input_addresses, outputs=output_addresses)

get_physical_gpu_ids_for_local_dp_rank(device_control_env_var, local_dp_rank, world_size, local_world_size=None, user_assigned_gpu_ids=None)

Returns list of physical GPU IDs for the specified data parallel rank.

For example, with world_size=2 and 4 devices, local_dp_rank=1 maps to devices [2, 3].

If user_assigned_gpu_ids is provided (e.g. from --device-ids), this DP rank's shard is sliced from it instead of being derived from the device-control env var.

Source code in vllm/v1/engine/utils.py
def get_physical_gpu_ids_for_local_dp_rank(
    device_control_env_var: str,
    local_dp_rank: int,
    world_size: int,
    local_world_size: int | None = None,
    user_assigned_gpu_ids: list[int] | None = None,
) -> list[int]:
    """
    Returns list of physical GPU IDs for the specified
    data parallel rank.

    For example, if world_size=2 and local_dp_rank=1, and there are 4 devices,
    this will return [2, 3] for local_dp_rank=1.

    If user_assigned_gpu_ids is provided (e.g. from --device-ids), this DP
    rank's shard is sliced from it instead of being derived from the
    device-control env var.
    """
    if local_world_size is None:
        local_world_size = world_size
    if user_assigned_gpu_ids is not None:
        start = local_dp_rank * world_size
        stop = start + local_world_size
        if stop > len(user_assigned_gpu_ids):
            raise ValueError(
                f"--device-ids provides {len(user_assigned_gpu_ids)} devices, "
                f"but DP rank {local_dp_rank} needs devices [{start}, {stop})"
            )
        return user_assigned_gpu_ids[start:stop]
    try:
        return [
            current_platform.device_id_to_physical_device_id(i)
            for i in range(
                local_dp_rank * world_size,
                local_dp_rank * world_size + local_world_size,
            )
        ]
    except IndexError as e:
        raise Exception(
            f"Error computing device indices for "
            f"{device_control_env_var}: "
            f"local range: [{local_dp_rank * world_size}, "
            f"{(local_dp_rank + 1) * world_size}) "
            "base value: "
            f'"{os.getenv(device_control_env_var)}"'
        ) from e

launch_core_engines(vllm_config, executor_class, log_stats, addresses, num_api_servers=1)

Launch engine and DP coordinator processes as needed.

Source code in vllm/v1/engine/utils.py
@contextlib.contextmanager
def launch_core_engines(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    addresses: EngineZmqAddresses,
    num_api_servers: int = 1,
) -> Iterator[
    tuple[
        CoreEngineProcManager | CoreEngineActorManager | None,
        DPCoordinator | None,
        EngineZmqAddresses,
        Queue | None,
    ]
]:
    """Launch engine and DP coordinator processes as needed.

    Used as a context manager. It yields a tuple
    ``(engine_manager, coordinator, addresses, tensor_queue)``:

    - ``engine_manager`` is a ``CoreEngineActorManager`` for the Ray DP
      backend, a ``CoreEngineProcManager`` when local engines are started,
      or ``None`` when there are no local engines.
    - ``coordinator`` is the ``DPCoordinator`` if one was started here,
      otherwise ``None``.
    - ``addresses`` is the object passed in; its coordinator fields are
      filled in when a coordinator is started.
    - ``tensor_queue`` is a multiprocessing queue when multimodal tensor IPC
      is configured as ``"torch_shm"``, otherwise ``None``.

    For the non-Ray path, the handshake socket is bound before the engines
    are started, and once the caller's ``with`` body completes this function
    waits for engine startup before the socket is closed. For the Ray path
    the function yields and returns without performing that wait.
    """

    parallel_config = vllm_config.parallel_config
    dp_size = parallel_config.data_parallel_size
    local_engine_count = parallel_config.data_parallel_size_local
    local_start_index = parallel_config.data_parallel_rank_local
    dp_rank = parallel_config.data_parallel_rank
    host = parallel_config.data_parallel_master_ip
    local_engines_only = parallel_config.local_engines_only

    # A local DP rank is only set in offline mode.
    offline_mode = local_start_index is not None

    # Create a single tensor IPC queue for sharing multimodal tensors between
    # API servers and engine core. Returns a single queue since we only support
    # DP=1 for this data flow.
    tensor_queue: Queue | None = None
    multimodal_config = vllm_config.model_config.multimodal_config
    if multimodal_config is not None and multimodal_config.mm_tensor_ipc == "torch_shm":
        tensor_queue = get_mp_context().Queue()

    # Run the DP Coordinator process with rank 0 when in online DP mode.
    # The coordinator is needed for:
    # 1. Internal/hybrid LB: collecting and publishing queue stats for load balancing
    # 2. MoE models: wave coordination in addition to stats
    run_coordinator = (
        vllm_config.needs_dp_coordinator and not offline_mode and dp_rank == 0
    )

    if run_coordinator:
        coordinator = DPCoordinator(
            parallel_config,
            enable_wave_coordination=vllm_config.model_config.is_moe,
        )

        # Record the coordinator's socket addresses on the shared addresses
        # object.
        addresses.coordinator_input, addresses.coordinator_output = (
            coordinator.get_engine_socket_addresses()
        )
        addresses.frontend_stats_publish_address = (
            coordinator.get_stats_publish_address()
        )

        logger.info("Started DP Coordinator process (PID: %d)", coordinator.proc.pid)
    else:
        coordinator = None

    if parallel_config.data_parallel_backend == "ray":
        logger.info("Starting ray-based data parallel backend")

        engine_actor_manager = CoreEngineActorManager(
            vllm_config=vllm_config,
            addresses=addresses,
            executor_class=executor_class,
            log_stats=log_stats,
        )

        # The Ray path skips the ZMQ handshake performed below.
        yield engine_actor_manager, coordinator, addresses, tensor_queue
        return

    if offline_mode:
        assert local_engine_count == 1
        engines_to_handshake = [CoreEngine(index=dp_rank, local=True)]
    elif dp_rank == 0:
        # Rank 0 holds Coordinator, so it handshakes with all Cores
        # in both external dplb and internal dplb mode.
        # Note this also covers the case where we have zero local engines
        # and rank 0 is headless.
        engines_to_handshake = [
            CoreEngine(index=i, local=(i < local_engine_count)) for i in range(dp_size)
        ]
    else:
        # Rank > 0 handshakes with just the local cores it is managing.
        assert local_engines_only, (
            "Attempting to launch core_engines from dp_rank > 0, but "
            "found internal DPLB, which is incompatible."
        )
        engines_to_handshake = [
            CoreEngine(index=i, local=True)
            for i in range(dp_rank, dp_rank + local_engine_count)
        ]

    # Whether the started engines will handshake only with co-located
    # front-end processes. In external_dp_lb mode, ranks > 0 handshake with
    # their co-located frontend and also the rank 0 front-end, and hence this
    # will be False.
    handshake_local_only = offline_mode or local_engine_count == dp_size

    # NOTE(yongji): handling scaling from intra-node to inter-node
    if parallel_config.enable_elastic_ep:
        handshake_local_only = False

    # Preserve "port=0 means auto-pick" for the handshake address, which
    # is consumed by engines spawned in this process and so cannot defer
    # port resolution to bind time.
    rpc_port = parallel_config.data_parallel_rpc_port or get_open_port()
    handshake_address = get_engine_client_zmq_addr(handshake_local_only, host, rpc_port)

    if local_engines_only and dp_rank > 0:
        # This process binds a separate local IPC address, which is also
        # passed to the engines as client_handshake_address.
        assert not handshake_local_only
        local_handshake_address = get_open_zmq_ipc_path()
        client_handshake_address = local_handshake_address
    else:
        local_handshake_address = handshake_address
        client_handshake_address = None

    with zmq_socket_ctx(
        local_handshake_address, zmq.ROUTER, bind=True
    ) as handshake_socket:
        # Start local engines.
        if local_engine_count:
            local_engine_manager = CoreEngineProcManager(
                vllm_config=vllm_config,
                executor_class=executor_class,
                log_stats=log_stats,
                handshake_address=handshake_address,
                client_handshake_address=client_handshake_address,
                local_client=True,
                local_engine_count=local_engine_count,
                start_index=dp_rank,
                local_start_index=local_start_index or 0,
                tensor_queue=tensor_queue,
            )
        else:
            local_engine_manager = None

        # Hand control back to the caller while the engines start up.
        yield local_engine_manager, coordinator, addresses, tensor_queue

        # Now wait for engines to start.
        wait_for_engine_startup(
            handshake_socket,
            addresses,
            engines_to_handshake,
            parallel_config,
            dp_size > 1 and vllm_config.model_config.is_moe,
            vllm_config.cache_config,
            local_engine_manager,
            coordinator.proc if coordinator else None,
        )

set_assigned_physical_gpu_ids_for_dp_rank(vllm_config, local_dp_rank, user_assigned_gpu_ids=None)

Populate assigned_physical_gpu_ids on the config for the given DP rank.

user_assigned_gpu_ids is the full (un-sharded) --device-ids list, if the user provided one; this DP rank's shard is sliced from it. It is passed explicitly rather than read from the config because callers may reuse one config object across DP ranks, overwriting the field each time.

Source code in vllm/v1/engine/utils.py
def set_assigned_physical_gpu_ids_for_dp_rank(
    vllm_config: VllmConfig,
    local_dp_rank: int,
    user_assigned_gpu_ids: list[int] | None = None,
) -> None:
    """
    Store this DP rank's physical GPU IDs on the config.

    The IDs are computed by get_physical_gpu_ids_for_local_dp_rank() and
    written to ``parallel_config.assigned_physical_gpu_ids``.

    user_assigned_gpu_ids is the complete, un-sharded --device-ids list when
    the user supplied one; the shard for this DP rank is sliced out of it.
    It is a separate argument instead of being read back from the config
    because a caller may reuse a single config object for several DP ranks,
    and this function overwrites the config field on every call.
    """
    parallel_config = vllm_config.parallel_config
    parallel_config.assigned_physical_gpu_ids = get_physical_gpu_ids_for_local_dp_rank(
        current_platform.device_control_env_var,
        local_dp_rank,
        parallel_config.world_size,
        parallel_config.local_world_size,
        user_assigned_gpu_ids=user_assigned_gpu_ids,
    )