Skip to content

vllm.v1.executor.ray_executor_v2

Classes:

  • RayExecutorV2

    Ray-based distributed executor using MessageQueue communication.

  • RayWorkerHandle

    Handle for a Ray worker actor, compatible with MultiprocExecutor.

  • RayWorkerProc

    Worker process that runs inside a Ray actor.

RayExecutorV2

Bases: MultiprocExecutor

Ray-based distributed executor using MessageQueue communication.

Inherits from MultiprocExecutor to reuse the MQ-based control plane and NCCL data plane. Workers are Ray actors.

Async scheduling is enabled, inherited from MultiprocExecutor. This is critical for RayExecutorV2 to be performant.

Methods:

  • shutdown

    Properly shut down the executor and its workers.

  • start_worker_monitor

    Monitor worker liveness via ray.wait() on run() ObjectRefs.

Source code in vllm/v1/executor/ray_executor_v2.py
class RayExecutorV2(MultiprocExecutor):
    """Ray-based distributed executor using MessageQueue communication.

    Inherits from MultiprocExecutor to reuse the MQ-based control plane
    and NCCL data plane. Workers are Ray actors.

    Async scheduling is enabled, inherited from MultiprocExecutor.
    This is critical for RayExecutorV2 to be performant.
    """

    uses_ray: bool = True
    supports_pp: bool = True

    def __init__(self, vllm_config: VllmConfig):
        super().__init__(vllm_config)

    def _build_runtime_env(self) -> dict:
        """Build a runtime_env dict for RayWorkerProc actors.

        Starts from a deep copy of ``parallel_config.ray_runtime_env`` (so the
        configured dict is never mutated), sets every platform "no-set device"
        env var to "1", and attaches nsight options when
        ``ray_workers_use_nsight`` is enabled.

        Driver env vars are applied separately via initialize_worker
        with setdefault semantics.
        """
        base = self.parallel_config.ray_runtime_env
        runtime_env: dict = copy.deepcopy(dict(base)) if base else {}

        env_vars = runtime_env.setdefault("env_vars", {})
        env_vars.update({v: "1" for v in current_platform.ray_noset_device_env_vars})
        if self.parallel_config.ray_workers_use_nsight:
            runtime_env["nsight"] = {
                "t": "cuda,cudnn,cublas",
                "o": "'worker_process_%p'",
                "cuda-graph-trace": "node",
            }
        return runtime_env

    @staticmethod
    def _get_actor_resource_kwargs() -> dict[str, Any]:
        """Return Ray actor resource kwargs for the current platform.

        Each actor requests ``VLLM_RAY_PER_WORKER_GPUS`` devices. It uses
        ``num_gpus`` when the platform's Ray device key is "GPU". Otherwise it
        requests a custom Ray resource named after the device key, with
        ``num_gpus`` pinned to 0.
        """
        num_devices = envs.VLLM_RAY_PER_WORKER_GPUS
        device_key = current_platform.ray_device_key
        if device_key == "GPU":
            return {"num_gpus": num_devices}
        return {"num_gpus": 0, "resources": {device_key: num_devices}}

    def _init_executor(self) -> None:
        """Initialize the RayExecutorV2 executor.

        Spawns one RayWorkerProc actor per rank into the placement group,
        connects the broadcast and per-worker response MessageQueues, and
        starts the worker liveness monitor.

        If any step fails, ``shutdown()`` is called before the exception
        propagates. That kills the actors and closes the queues created so
        far, so a failed init does not leave named Ray actors behind.

        Raises:
            ImportError: if ray is not installed.
            ValueError: if the placement group offers fewer bundles than
                ``world_size``.
            RuntimeError: if a worker reports a non-ready init status.
        """
        self._finalizer = weakref.finalize(self, self.shutdown)
        self.is_failed = False
        self.failure_callback = None
        self.shutting_down = False
        self.shutdown_lock = threading.Lock()

        success = False
        try:
            # Step 1: Initialize Ray cluster and retrieve placement group
            if ray is None:
                raise ImportError("Using Ray backend requires installation of ray.")
            initialize_ray_cluster(self.parallel_config, require_gpu_on_driver=False)
            placement_group = self.parallel_config.placement_group

            tp_size, pp_size, pcp_size = self._get_parallel_sizes()
            assert self.world_size == tp_size * pp_size * pcp_size, (
                f"world_size ({self.world_size}) must be equal to the "
                f"tensor_parallel_size ({tp_size}) x pipeline"
                f"_parallel_size ({pp_size}) x prefill_context"
                f"_parallel_size ({pcp_size}). "
            )

            # Step 2: Build bundle assignments for worker rank placement
            # while respecting VLLM_RAY_BUNDLE_INDICES.
            if envs.VLLM_RAY_BUNDLE_INDICES:
                bundle_to_node_id = get_bundles_for_indices(
                    placement_group,
                    list(map(int, envs.VLLM_RAY_BUNDLE_INDICES.split(","))),
                    self.world_size,
                )
            else:
                bundle_to_node_id = get_bundles_sorted_by_node(placement_group)
            bundle_to_node_id = list(bundle_to_node_id)
            if len(bundle_to_node_id) < self.world_size:
                raise ValueError(
                    f"Placement group provides {len(bundle_to_node_id)} bundles "
                    f"for worker placement, but world_size is {self.world_size}."
                )
            driver_node = ray.get_runtime_context().get_node_id()

            # Only the first world_size bundles receive a worker (Step 5), so
            # only those may be counted when sizing the MessageQueue (Step 4).
            bundle_assignments: list[dict[str, Any]] = [
                {
                    "rank": rank,
                    "bundle_id_idx": bundle_id_idx,
                    "node_id": node_id,
                    "node_ip": node_ip,
                }
                for rank, (bundle_id_idx, node_id, node_ip) in enumerate(
                    bundle_to_node_id[: self.world_size]
                )
            ]

            # Step 3: Resolve the IP for torch.distributed TCPStore.
            # The TCPStore server runs on rank 0's node, so all workers
            # must be able to reach this address.
            dist_ip = bundle_assignments[0]["node_ip"]
            distributed_init_method = get_distributed_init_method(
                dist_ip, get_open_port()
            )

            # Step 4: Create broadcast MessageQueue.
            # Workers on the driver node use shared memory; the rest use TCP.
            max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
            n_local = sum(
                1 for a in bundle_assignments if a["node_id"] == driver_node
            )
            self.rpc_broadcast_mq = MessageQueue(
                self.world_size,
                n_local,
                max_chunk_bytes=max_chunk_bytes,
                connect_ip=ray.util.get_node_ip_address(),
            )
            scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

            # Step 5: Spawn RayWorkerProc actors into PG bundles (deferred init).
            # Workers are created lightweight here; full initialization happens
            # in Step 7 after GPU IDs are discovered.
            self.ray_worker_handles: list[RayWorkerHandle] = []
            instance_id = self.vllm_config.instance_id

            # Collect driver env vars and apply but don't overwrite node-local
            # values.
            self.driver_env_vars = get_driver_env_vars(
                worker_specific_vars=WORKER_SPECIFIC_ENV_VARS,
            )

            runtime_env = self._build_runtime_env()
            resource_kwargs = self._get_actor_resource_kwargs()

            for bundle in bundle_assignments:
                rank = bundle["rank"]
                is_driver_worker = self._is_driver_worker(rank)
                is_driver_node = bundle["node_id"] == driver_node

                scheduling_strategy = PlacementGroupSchedulingStrategy(
                    placement_group=placement_group,
                    placement_group_bundle_index=bundle["bundle_id_idx"],
                )

                actor_name = build_actor_name(
                    instance_id, rank, tp_size, pp_size, pcp_size
                )

                actor = (
                    ray.remote(RayWorkerProc)
                    .options(
                        name=actor_name,
                        num_cpus=0,
                        **resource_kwargs,
                        scheduling_strategy=scheduling_strategy,
                        runtime_env=runtime_env,
                    )
                    .remote(
                        vllm_config=self.vllm_config,
                        rank=rank,
                        distributed_init_method=distributed_init_method,
                        input_shm_handle=scheduler_output_handle,
                        is_driver_worker=is_driver_worker,
                        is_driver_node=is_driver_node,
                    )
                )

                # Record the handle immediately so a later failure can still
                # kill this actor during cleanup.
                self.ray_worker_handles.append(
                    RayWorkerHandle(
                        actor=actor,
                        rank=rank,
                        local_rank=-1,  # Set in Step 7 after GPU ID discovery
                        node_id=bundle["node_id"],
                        bundle_id_idx=bundle["bundle_id_idx"],
                    )
                )

            # Step 6: Discover physical GPU IDs assigned to each worker via Ray
            # runtime context.
            worker_node_and_physical_gpu_ids = ray.get(
                [
                    h.actor.get_node_and_physical_gpu_ids.remote()
                    for h in self.ray_worker_handles
                ]
            )

            # Group workers by node. A worker's local rank is the number of
            # workers with a lower global index on the same node.
            local_ranks: list[int] = []
            node_worker_count: dict[str, int] = defaultdict(int)
            node_physical_gpu_ids: dict[str, list[int]] = defaultdict(list)
            for node_id, physical_gpu_ids in worker_node_and_physical_gpu_ids:
                local_ranks.append(node_worker_count[node_id])
                node_worker_count[node_id] += 1
                node_physical_gpu_ids[node_id].extend(physical_gpu_ids)
            for gpu_ids in node_physical_gpu_ids.values():
                gpu_ids.sort()

            # Step 7: Initialize workers with local logical ranks and the
            # logical-to-physical GPU mapping discovered from Ray placement.
            init_worker_refs = []
            for handle, (node_id, _), local_rank in zip(
                self.ray_worker_handles, worker_node_and_physical_gpu_ids, local_ranks
            ):
                handle.local_rank = local_rank
                # No per-worker env overrides are passed at the moment.
                worker_env_vars: dict[str, str] = {}
                # Sorted above; each worker gets its own copy of its node's list.
                assigned_physical_gpu_ids = list(node_physical_gpu_ids[node_id])
                init_worker_refs.append(
                    handle.actor.initialize_worker.remote(
                        local_rank,
                        worker_env_vars,
                        self.driver_env_vars,
                        assigned_physical_gpu_ids=assigned_physical_gpu_ids,
                    )
                )
            # Also set on the executor-side config for consistency. The mapping
            # is per-node, so only do this when all workers share one node.
            if len(node_physical_gpu_ids) == 1:
                (single_node_gpu_ids,) = node_physical_gpu_ids.values()
                self.vllm_config.parallel_config.assigned_physical_gpu_ids = list(
                    single_node_gpu_ids
                )
            ray.get(init_worker_refs)

            # Step 8: Collect response MQ handles
            init_results = ray.get(
                [h.actor.wait_for_init.remote() for h in self.ray_worker_handles]
            )

            self.response_mqs: list[MessageQueue] = []
            for i, result in enumerate(init_results):
                if result["status"] != RayWorkerProc.READY_STR:
                    raise RuntimeError(f"Worker {i} failed to initialize: {result}")
                self.response_mqs.append(
                    MessageQueue.create_from_handle(result["handle"], 0)
                )

            # Step 9: Start run() before wait_until_ready() to avoid
            # deadlock — workers send subscriptions inside run().
            for handle in self.ray_worker_handles:
                handle.run()

            # Step 10: wait_until_ready() barrier
            self.rpc_broadcast_mq.wait_until_ready()
            for response_mq in self.response_mqs:
                response_mq.wait_until_ready()

            self.futures_queue = deque[FutureWrapper]()
            self._post_init_executor()

            self.start_worker_monitor()
            self.output_rank = self._get_output_rank()
            success = True
        finally:
            if not success:
                # Tear down whatever was started (actors, MessageQueues,
                # monitor thread) so a failed init does not leave named Ray
                # actors holding devices. Best effort: never mask the
                # original exception.
                try:
                    self.shutdown()
                except Exception:
                    logger.exception(
                        "RayExecutorV2: cleanup after failed initialization failed"
                    )

    def start_worker_monitor(self, inline=False) -> None:
        """Monitor worker liveness via ray.wait() on run() ObjectRefs.

        Starts a daemon thread named "RayWorkerMonitor" that polls the
        workers' run() ObjectRefs. A completed ref counts as an unexpected
        worker death. In that case the executor is marked failed and shut
        down, and the registered failure callback (if any) is cleared and
        invoked. The thread exits once the executor is shutting down or has
        been garbage collected, or once Ray is no longer initialized.

        Args:
            inline: Unused by this implementation; presumably kept to match
                the base-class signature.

        Raises:
            RuntimeError: if no worker has a run() ObjectRef yet.
        """
        run_refs = [h.run_ref for h in self.ray_worker_handles if h.run_ref is not None]
        if not run_refs:
            raise RuntimeError("Ray workers have not started successfully.")

        # A weak reference, so the monitor thread does not keep the executor
        # alive.
        self_ref = weakref.ref(self)
        ref_to_rank = {
            h.run_ref: h.rank for h in self.ray_worker_handles if h.run_ref is not None
        }

        def _should_stop() -> bool:
            executor = self_ref()
            return not executor or executor.shutting_down

        def monitor_workers():
            # Poll with a timeout rather than blocking on ray.wait()
            # because a blocking call would segfault if Ray is torn down
            # while this thread is inside it.
            while not _should_stop() and ray.is_initialized():
                try:
                    done, _ = ray.wait(run_refs, num_returns=1, timeout=5.0)
                except Exception:
                    logger.exception(
                        "RayWorkerMonitor: unexpected error, exiting monitor thread"
                    )
                    return
                if not done or _should_stop():
                    continue

                dead_ranks = [ref_to_rank[r] for r in done]
                executor = self_ref()
                if not executor:
                    return
                executor.is_failed = True
                logger.error(
                    "RayWorkerProc rank=%s died unexpectedly, shutting down executor.",
                    dead_ranks,
                )
                executor.shutdown()
                # Clear the callback before calling it so it runs only once.
                if executor.failure_callback is not None:
                    callback = executor.failure_callback
                    executor.failure_callback = None
                    callback()
                return

        t = threading.Thread(
            target=monitor_workers, daemon=True, name="RayWorkerMonitor"
        )
        t.start()
        self._monitor_thread = t

    def _join_monitor_thread(self) -> None:
        """Wait for the monitor thread to exit.

        Must be called before tearing down Ray resources — the monitor
        may be inside ray.wait() which would segfault if Ray is shut
        down underneath it. When the monitor itself calls shutdown()
        on worker death, we skip the join because the thread is about
        to return anyway. The join is bounded to 10 seconds.
        """
        monitor = getattr(self, "_monitor_thread", None)
        if (
            monitor is not None
            and monitor.is_alive()
            and threading.current_thread() is not monitor
        ):
            monitor.join(timeout=10)

    def shutdown(self) -> None:
        """Properly shut down the executor and its workers.

        Only the first caller does any work: it flips ``shutting_down``
        under ``shutdown_lock``, and later callers return immediately. The
        method is also a no-op if ``_init_executor`` never created the lock.
        Otherwise it joins the monitor thread, kills every worker actor
        (logging rather than raising on failure), and shuts down the
        broadcast and response MessageQueues.
        """
        lock = getattr(self, "shutdown_lock", None)
        if lock is None:
            return

        with lock:
            if getattr(self, "shutting_down", False):
                return
            self.shutting_down = True

        self._join_monitor_thread()

        for handle in getattr(self, "ray_worker_handles", []):
            try:
                ray.kill(handle.actor)
                logger.debug("Killed actor rank=%d", handle.rank)
            except Exception:
                logger.exception("Failed to kill actor rank=%d", handle.rank)

        if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None):
            rpc_broadcast_mq.shutdown()
            self.rpc_broadcast_mq = None

        for mq in getattr(self, "response_mqs", []):
            mq.shutdown()
        self.response_mqs = []

_build_runtime_env()

Build a runtime_env dict for RayWorkerProc actors.

Driver env vars are applied separately via initialize_worker with setdefault semantics.

Source code in vllm/v1/executor/ray_executor_v2.py
def _build_runtime_env(self) -> dict:
    """Assemble the Ray ``runtime_env`` used to create RayWorkerProc actors.

    Works on a deep copy of ``parallel_config.ray_runtime_env`` (or an empty
    dict when none is configured), forces every platform "no-set device" env
    var to "1", and adds nsight profiling options when
    ``ray_workers_use_nsight`` is set.

    Driver env vars are not added here; they are applied separately via
    initialize_worker with setdefault semantics.
    """
    configured = self.parallel_config.ray_runtime_env
    env: dict = {} if not configured else copy.deepcopy(dict(configured))

    noset_vars = env.setdefault("env_vars", {})
    noset_vars.update(dict.fromkeys(current_platform.ray_noset_device_env_vars, "1"))

    if not self.parallel_config.ray_workers_use_nsight:
        return env

    env["nsight"] = {
        "t": "cuda,cudnn,cublas",
        "o": "'worker_process_%p'",
        "cuda-graph-trace": "node",
    }
    return env

_get_actor_resource_kwargs() staticmethod

Return Ray actor resource kwargs for the current platform.

Source code in vllm/v1/executor/ray_executor_v2.py
@staticmethod
def _get_actor_resource_kwargs() -> dict[str, Any]:
    """Build the per-actor Ray resource request for the current platform.

    Every actor asks for ``VLLM_RAY_PER_WORKER_GPUS`` devices. When the
    platform's Ray device key is "GPU", Ray's native ``num_gpus`` is used.
    Any other key is requested as a custom resource, with ``num_gpus`` set
    to 0.
    """
    per_worker = envs.VLLM_RAY_PER_WORKER_GPUS
    key = current_platform.ray_device_key
    if key != "GPU":
        return {"num_gpus": 0, "resources": {key: per_worker}}
    return {"num_gpus": per_worker}

_init_executor()

Initialize the RayExecutorV2 executor.

Source code in vllm/v1/executor/ray_executor_v2.py
def _init_executor(self) -> None:
    """Initialize the RayExecutorV2 executor.

    Spawns one RayWorkerProc actor per rank into the placement group,
    connects the broadcast and per-worker response MessageQueues, and
    starts the worker liveness monitor.

    If any step fails, ``shutdown()`` is called before the exception
    propagates. That kills the actors and closes the queues created so far,
    so a failed init does not leave named Ray actors behind.

    Raises:
        ImportError: if ray is not installed.
        ValueError: if the placement group offers fewer bundles than
            ``world_size``.
        RuntimeError: if a worker reports a non-ready init status.
    """
    self._finalizer = weakref.finalize(self, self.shutdown)
    self.is_failed = False
    self.failure_callback = None
    self.shutting_down = False
    self.shutdown_lock = threading.Lock()

    success = False
    try:
        # Step 1: Initialize Ray cluster and retrieve placement group
        if ray is None:
            raise ImportError("Using Ray backend requires installation of ray.")
        initialize_ray_cluster(self.parallel_config, require_gpu_on_driver=False)
        placement_group = self.parallel_config.placement_group

        tp_size, pp_size, pcp_size = self._get_parallel_sizes()
        assert self.world_size == tp_size * pp_size * pcp_size, (
            f"world_size ({self.world_size}) must be equal to the "
            f"tensor_parallel_size ({tp_size}) x pipeline"
            f"_parallel_size ({pp_size}) x prefill_context"
            f"_parallel_size ({pcp_size}). "
        )

        # Step 2: Build bundle assignments for worker rank placement
        # while respecting VLLM_RAY_BUNDLE_INDICES.
        if envs.VLLM_RAY_BUNDLE_INDICES:
            bundle_to_node_id = get_bundles_for_indices(
                placement_group,
                list(map(int, envs.VLLM_RAY_BUNDLE_INDICES.split(","))),
                self.world_size,
            )
        else:
            bundle_to_node_id = get_bundles_sorted_by_node(placement_group)
        bundle_to_node_id = list(bundle_to_node_id)
        if len(bundle_to_node_id) < self.world_size:
            raise ValueError(
                f"Placement group provides {len(bundle_to_node_id)} bundles "
                f"for worker placement, but world_size is {self.world_size}."
            )
        driver_node = ray.get_runtime_context().get_node_id()

        # Only the first world_size bundles receive a worker (Step 5), so
        # only those may be counted when sizing the MessageQueue (Step 4).
        bundle_assignments: list[dict[str, Any]] = [
            {
                "rank": rank,
                "bundle_id_idx": bundle_id_idx,
                "node_id": node_id,
                "node_ip": node_ip,
            }
            for rank, (bundle_id_idx, node_id, node_ip) in enumerate(
                bundle_to_node_id[: self.world_size]
            )
        ]

        # Step 3: Resolve the IP for torch.distributed TCPStore.
        # The TCPStore server runs on rank 0's node, so all workers
        # must be able to reach this address.
        dist_ip = bundle_assignments[0]["node_ip"]
        distributed_init_method = get_distributed_init_method(
            dist_ip, get_open_port()
        )

        # Step 4: Create broadcast MessageQueue.
        # Workers on the driver node use shared memory; the rest use TCP.
        max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
        n_local = sum(1 for a in bundle_assignments if a["node_id"] == driver_node)
        self.rpc_broadcast_mq = MessageQueue(
            self.world_size,
            n_local,
            max_chunk_bytes=max_chunk_bytes,
            connect_ip=ray.util.get_node_ip_address(),
        )
        scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

        # Step 5: Spawn RayWorkerProc actors into PG bundles (deferred init).
        # Workers are created lightweight here; full initialization happens
        # in Step 7 after GPU IDs are discovered.
        self.ray_worker_handles: list[RayWorkerHandle] = []
        instance_id = self.vllm_config.instance_id

        # Collect driver env vars and apply but don't overwrite node-local values.
        self.driver_env_vars = get_driver_env_vars(
            worker_specific_vars=WORKER_SPECIFIC_ENV_VARS,
        )

        runtime_env = self._build_runtime_env()
        resource_kwargs = self._get_actor_resource_kwargs()

        for bundle in bundle_assignments:
            rank = bundle["rank"]
            is_driver_worker = self._is_driver_worker(rank)
            is_driver_node = bundle["node_id"] == driver_node

            scheduling_strategy = PlacementGroupSchedulingStrategy(
                placement_group=placement_group,
                placement_group_bundle_index=bundle["bundle_id_idx"],
            )

            actor_name = build_actor_name(
                instance_id, rank, tp_size, pp_size, pcp_size
            )

            actor = (
                ray.remote(RayWorkerProc)
                .options(
                    name=actor_name,
                    num_cpus=0,
                    **resource_kwargs,
                    scheduling_strategy=scheduling_strategy,
                    runtime_env=runtime_env,
                )
                .remote(
                    vllm_config=self.vllm_config,
                    rank=rank,
                    distributed_init_method=distributed_init_method,
                    input_shm_handle=scheduler_output_handle,
                    is_driver_worker=is_driver_worker,
                    is_driver_node=is_driver_node,
                )
            )

            # Record the handle immediately so a later failure can still
            # kill this actor during cleanup.
            self.ray_worker_handles.append(
                RayWorkerHandle(
                    actor=actor,
                    rank=rank,
                    local_rank=-1,  # Set in Step 7 after GPU ID discovery
                    node_id=bundle["node_id"],
                    bundle_id_idx=bundle["bundle_id_idx"],
                )
            )

        # Step 6: Discover physical GPU IDs assigned to each worker via Ray
        # runtime context.
        worker_node_and_physical_gpu_ids = ray.get(
            [
                h.actor.get_node_and_physical_gpu_ids.remote()
                for h in self.ray_worker_handles
            ]
        )

        # Group workers by node. A worker's local rank is the number of
        # workers with a lower global index on the same node.
        local_ranks: list[int] = []
        node_worker_count: dict[str, int] = defaultdict(int)
        node_physical_gpu_ids: dict[str, list[int]] = defaultdict(list)
        for node_id, physical_gpu_ids in worker_node_and_physical_gpu_ids:
            local_ranks.append(node_worker_count[node_id])
            node_worker_count[node_id] += 1
            node_physical_gpu_ids[node_id].extend(physical_gpu_ids)
        for gpu_ids in node_physical_gpu_ids.values():
            gpu_ids.sort()

        # Step 7: Initialize workers with local logical ranks and the
        # logical-to-physical GPU mapping discovered from Ray placement.
        init_worker_refs = []
        for handle, (node_id, _), local_rank in zip(
            self.ray_worker_handles, worker_node_and_physical_gpu_ids, local_ranks
        ):
            handle.local_rank = local_rank
            # No per-worker env overrides are passed at the moment.
            worker_env_vars: dict[str, str] = {}
            # Sorted above; each worker gets its own copy of its node's list.
            assigned_physical_gpu_ids = list(node_physical_gpu_ids[node_id])
            init_worker_refs.append(
                handle.actor.initialize_worker.remote(
                    local_rank,
                    worker_env_vars,
                    self.driver_env_vars,
                    assigned_physical_gpu_ids=assigned_physical_gpu_ids,
                )
            )
        # Also set on the executor-side config for consistency. The mapping
        # is per-node, so only do this when all workers share one node.
        if len(node_physical_gpu_ids) == 1:
            (single_node_gpu_ids,) = node_physical_gpu_ids.values()
            self.vllm_config.parallel_config.assigned_physical_gpu_ids = list(
                single_node_gpu_ids
            )
        ray.get(init_worker_refs)

        # Step 8: Collect response MQ handles
        init_results = ray.get(
            [h.actor.wait_for_init.remote() for h in self.ray_worker_handles]
        )

        self.response_mqs: list[MessageQueue] = []
        for i, result in enumerate(init_results):
            if result["status"] != RayWorkerProc.READY_STR:
                raise RuntimeError(f"Worker {i} failed to initialize: {result}")
            self.response_mqs.append(
                MessageQueue.create_from_handle(result["handle"], 0)
            )

        # Step 9: Start run() before wait_until_ready() to avoid
        # deadlock — workers send subscriptions inside run().
        for handle in self.ray_worker_handles:
            handle.run()

        # Step 10: wait_until_ready() barrier
        self.rpc_broadcast_mq.wait_until_ready()
        for response_mq in self.response_mqs:
            response_mq.wait_until_ready()

        self.futures_queue = deque[FutureWrapper]()
        self._post_init_executor()

        self.start_worker_monitor()
        self.output_rank = self._get_output_rank()
        success = True
    finally:
        if not success:
            # Tear down whatever was started (actors, MessageQueues, monitor
            # thread) so a failed init does not leave named Ray actors
            # holding devices. Best effort: never mask the original exception.
            try:
                self.shutdown()
            except Exception:
                logger.exception(
                    "RayExecutorV2: cleanup after failed initialization failed"
                )

_join_monitor_thread()

Wait for the monitor thread to exit.

Must be called before tearing down Ray resources — the monitor may be inside ray.wait() which would segfault if Ray is shut down underneath it. When the monitor itself calls shutdown() on worker death, we skip the join because the thread is about to return anyway.

Source code in vllm/v1/executor/ray_executor_v2.py
def _join_monitor_thread(self) -> None:
    """Wait for the monitor thread to exit.

    Must be called before tearing down Ray resources — the monitor
    may be inside ray.wait() which would segfault if Ray is shut
    down underneath it. When the monitor itself calls shutdown()
    on worker death, we skip the join because the thread is about
    to return anyway.
    """
    monitor = getattr(self, "_monitor_thread", None)
    if (
        monitor is not None
        and monitor.is_alive()
        and threading.current_thread() is not monitor
    ):
        monitor.join(timeout=10)

shutdown()

Properly shut down the executor and its workers.

Source code in vllm/v1/executor/ray_executor_v2.py
def shutdown(self) -> None:
    """Tear down the executor: stop monitoring, kill actors, close queues.

    Only the first caller does any work: ``shutting_down`` is flipped under
    ``shutdown_lock``, and every later caller returns immediately. If the
    lock was never created (initialization did not get that far), this is a
    no-op. Actor kill failures are logged rather than raised.
    """
    guard = getattr(self, "shutdown_lock", None)
    if guard is None:
        return

    with guard:
        already_stopping = getattr(self, "shutting_down", False)
        if not already_stopping:
            self.shutting_down = True
    if already_stopping:
        return

    # The monitor thread must be gone before Ray resources are released.
    self._join_monitor_thread()

    for worker in getattr(self, "ray_worker_handles", []):
        try:
            ray.kill(worker.actor)
            logger.debug("Killed actor rank=%d", worker.rank)
        except Exception:
            logger.exception("Failed to kill actor rank=%d", worker.rank)

    broadcast_mq = getattr(self, "rpc_broadcast_mq", None)
    if broadcast_mq:
        broadcast_mq.shutdown()
        self.rpc_broadcast_mq = None

    for response_mq in getattr(self, "response_mqs", []):
        response_mq.shutdown()
    self.response_mqs = []

start_worker_monitor(inline=False)

Monitor worker liveness via ray.wait() on run() ObjectRefs.

Source code in vllm/v1/executor/ray_executor_v2.py
def start_worker_monitor(self, inline=False) -> None:
    """Spawn a daemon thread that watches worker liveness via ray.wait().

    The thread is named "RayWorkerMonitor" and polls the workers' run()
    ObjectRefs. A completed ref is treated as an unexpected worker death:
    the executor is marked failed and shut down, and its failure callback
    (if any) is cleared and then invoked. The thread exits once the executor
    is shutting down or has been garbage collected, or once Ray is no longer
    initialized. ``inline`` is accepted but not used here.

    Raises:
        RuntimeError: if no worker has a run() ObjectRef yet.
    """
    started = [h for h in self.ray_worker_handles if h.run_ref is not None]
    run_refs = [h.run_ref for h in started]
    if not run_refs:
        raise RuntimeError("Ray workers have not started successfully.")

    rank_by_ref = {h.run_ref: h.rank for h in started}
    # A weak reference, so the monitor thread never keeps the executor alive.
    executor_ref = weakref.ref(self)

    def _stopping() -> bool:
        executor = executor_ref()
        return not executor or executor.shutting_down

    def _watch() -> None:
        # Poll with a timeout instead of blocking indefinitely: a thread
        # stuck inside ray.wait() would segfault if Ray were torn down.
        while not _stopping() and ray.is_initialized():
            try:
                finished, _pending = ray.wait(run_refs, num_returns=1, timeout=5.0)
            except Exception:
                logger.exception(
                    "RayWorkerMonitor: unexpected error, exiting monitor thread"
                )
                return
            if finished and not _stopping():
                break
        else:
            # Loop condition went false: shutdown requested or Ray is gone.
            return

        dead = [rank_by_ref[ref] for ref in finished]
        executor = executor_ref()
        if not executor:
            return
        executor.is_failed = True
        logger.error(
            "RayWorkerProc rank=%s died unexpectedly, shutting down executor.",
            dead,
        )
        executor.shutdown()
        callback = executor.failure_callback
        if callback is not None:
            executor.failure_callback = None
            callback()

    monitor = threading.Thread(target=_watch, daemon=True, name="RayWorkerMonitor")
    monitor.start()
    self._monitor_thread = monitor

RayWorkerHandle dataclass

Handle for a Ray worker actor, compatible with MultiprocExecutor.

Methods:

  • run

    Start the worker's busy loop

Attributes:

  • actor (ActorHandle) –

    Ray worker actor

  • bundle_id_idx (int) –

    Placement group bundle index for the worker

  • local_rank (int) –

    Local rank of the worker

  • node_id (str) –

    Node ID of the worker

  • rank (int) –

    Rank of the worker

  • run_ref (ObjectRef | None) –

    run() ObjectRef used as a sentinel for health monitoring

Source code in vllm/v1/executor/ray_executor_v2.py
@dataclass
class RayWorkerHandle:
    """Driver-side record of one RayWorkerProc actor.

    Shaped to be compatible with MultiprocExecutor's worker bookkeeping so
    the inherited control-plane code can operate on Ray actors.
    """

    actor: ActorHandle
    """Ray actor running the worker"""

    rank: int
    """Global rank of the worker"""

    local_rank: int
    """Rank of the worker within its node"""

    node_id: str
    """Ray node ID hosting the worker"""

    bundle_id_idx: int = -1
    """Index of the placement group bundle the worker is pinned to"""

    run_ref: ObjectRef | None = None
    """ObjectRef returned by run(); used as a liveness sentinel"""

    def run(self):
        """Launch the actor's busy loop and keep its ObjectRef for monitoring."""
        pending = self.actor.run.remote()
        self.run_ref = pending

actor instance-attribute

Ray worker actor

bundle_id_idx = -1 class-attribute instance-attribute

Placement group bundle index for the worker

local_rank instance-attribute

Local rank of the worker

node_id instance-attribute

Node ID of the worker

rank instance-attribute

Rank of the worker

run_ref = None class-attribute instance-attribute

run() ObjectRef used as a sentinel for health monitoring

run()

Start the worker's busy loop

Source code in vllm/v1/executor/ray_executor_v2.py
def run(self):
    """Kick off the worker's busy loop on the Ray actor.

    The returned ObjectRef is stored in ``run_ref`` so the executor's
    monitor can wait on it to detect when the loop ends.
    """
    loop_ref = self.actor.run.remote()
    self.run_ref = loop_ref

RayWorkerProc

Bases: WorkerProc

Worker process that runs inside a Ray actor.

Initialization is split into two phases: (1) `__init__`: lightweight setup that only stores the init args (no device/model init); (2) `initialize_worker`: called after GPU IDs are discovered, it completes the full WorkerProc initialization with the correct local_rank and logical-to-physical GPU mapping.

GPU assignment flow:

  1. RayExecutorV2 enables RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES so Ray does not set CUDA_VISIBLE_DEVICES on RayWorkerProc actors at creation time.
  2. Each actor is scheduled with a placement group and bundle index; Ray resolves the physical GPU ID for that bundle at placement time.
  3. After placement, the executor discovers each worker's GPU ID and passes the node's logical-to-physical mapping (assigned_physical_gpu_ids) to initialize_worker(); CUDA_VISIBLE_DEVICES is never modified.

When the placement group is externally managed, the mapping is only known after scheduling completes: only then is the GPU tied to the worker's bundle resolved.

This sequence allows multiple vLLM instances to coexist on the same node: each instance is unaware of which physical devices the others hold, and the externally managed placement group avoids device assignment conflicts by binding workers to specific placement group bundles.

Methods:

Source code in vllm/v1/executor/ray_executor_v2.py
class RayWorkerProc(WorkerProc):
    """Worker process hosted inside a Ray actor.

    Construction happens in two steps:

    1. ``__init__`` only records the arguments needed later; it performs no
       device or model initialization.
    2. ``initialize_worker`` runs once the GPU IDs are known and finishes the
       ``WorkerProc`` initialization with the right ``local_rank`` and the
       logical-to-physical GPU mapping.

    GPU assignment flow:

    1. RayExecutorV2 enables RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES, so Ray
       does not set CUDA_VISIBLE_DEVICES on RayWorkerProc actors when they are
       created.
    2. Every actor is scheduled with a placement group and bundle index; Ray
       resolves the physical GPU ID for that bundle at placement time.
    3. After placement, the executor discovers each worker's GPU ID and hands
       the node's logical-to-physical mapping (``assigned_physical_gpu_ids``)
       to ``initialize_worker()``. CUDA_VISIBLE_DEVICES is never modified.

    When the placement group is externally managed, the mapping can only be
    known after scheduling completes, because only then is the GPU tied to
    the worker's bundle resolved.

    This ordering lets several vLLM instances share one node: no instance
    needs to know which physical devices the others hold, and binding workers
    to specific bundles of an externally managed placement group avoids
    device assignment conflicts.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        rank: int,
        distributed_init_method: str,
        input_shm_handle: Handle,
        is_driver_worker: bool,
        is_driver_node: bool = False,
    ):
        # WorkerProc.__init__ is intentionally postponed until
        # initialize_worker(), once the GPU assignment is known; here we only
        # capture the arguments it will need.
        self._is_driver_node = is_driver_node
        self._init_kwargs = {
            "vllm_config": vllm_config,
            "rank": rank,
            "distributed_init_method": distributed_init_method,
            "input_shm_handle": input_shm_handle,
            "shared_worker_lock": None,
            "is_driver_worker": is_driver_worker,
        }

    def get_node_and_physical_gpu_ids(self) -> tuple[str, list[int]]:
        """Report where Ray placed this actor.

        Returns:
            ``(node_id, physical_gpu_ids)``: the Ray node ID hosting this actor
            and the accelerator IDs Ray assigned to it, each passed through
            ``current_platform.device_control_id_to_physical_device_id``.

        Raises:
            RuntimeError: If the current platform has no Ray device key.
        """
        node_id = ray.get_runtime_context().get_node_id()
        device_key = current_platform.ray_device_key
        if not device_key:
            raise RuntimeError(
                f"current platform {current_platform.device_name} does not support ray."
            )
        assigned = ray.get_runtime_context().get_accelerator_ids()[device_key]
        to_physical = current_platform.device_control_id_to_physical_device_id
        physical_ids = [to_physical(str(dev_id)) for dev_id in assigned]
        return node_id, physical_ids

    def initialize_worker(
        self,
        local_rank: int,
        env_vars: dict[str, str],
        driver_env_vars: dict[str, str] | None = None,
        assigned_physical_gpu_ids: list[int] | None = None,
    ) -> None:
        """Finish the deferred initialization once the GPU assignment is known.

        Args:
            local_rank: Local rank of this worker.
            env_vars: Written into ``os.environ`` unconditionally, overriding
                any existing value.
            driver_env_vars: Applied first, with ``setdefault`` semantics: they
                only fill in names not already set on this node.
            assigned_physical_gpu_ids: Maps local_rank to physical device ID;
                when given, stored on
                ``vllm_config.parallel_config.assigned_physical_gpu_ids``.
        """
        # Driver values must never clobber node-local settings.
        for name, val in (driver_env_vars or {}).items():
            os.environ.setdefault(name, val)
        # Explicit per-worker values always win.
        os.environ.update(env_vars)

        if assigned_physical_gpu_ids is not None:
            config = self._init_kwargs["vllm_config"]
            assert isinstance(config, VllmConfig)
            config.parallel_config.assigned_physical_gpu_ids = (
                assigned_physical_gpu_ids
            )

        self.local_rank = local_rank
        super().__init__(local_rank=local_rank, **self._init_kwargs)

    def _init_message_queues(
        self, input_shm_handle: Handle, vllm_config: VllmConfig
    ) -> None:
        """Create the broadcast (input) and response message queues.

        Workers co-located with the executor use shared memory for both the
        broadcast (input) MQ and the response MQ; workers on other nodes use
        TCP (``n_local_reader=0``). *vllm_config* is unused here (presumably
        kept for signature compatibility with WorkerProc).
        """
        self.rpc_broadcast_mq = MessageQueue.create_from_handle(
            input_shm_handle, self.worker.rank
        )

        local_readers = 1 if self._is_driver_node else 0
        # Bind to Ray's internal node IP instead of get_ip(): the latter gives
        # the host's external IP, which is typically not routable between
        # nodes within the cluster.
        self.worker_response_mq = MessageQueue(
            n_reader=1,
            n_local_reader=local_readers,
            connect_ip=ray.util.get_node_ip_address(),
        )
        self.peer_response_handles: list[dict] = []

    def wait_for_init(self) -> dict:
        """Answer the driver's ``wait_until_ready()`` barrier.

        Returns:
            A dict whose ``"status"`` is ``READY_STR`` and whose ``"handle"``
            is the exported handle of this worker's response queue.
        """
        response_mq = self.worker_response_mq
        assert response_mq is not None
        ready_msg = {"status": self.READY_STR}
        ready_msg["handle"] = response_mq.export_handle()
        return ready_msg

    def run(self) -> None:
        """Serve requests; this is the body invoked by ``actor.run.remote()``.

        Waits until both message queues are ready, then enters the worker
        busy loop. Any exception is logged and re-raised; ``shutdown()``
        runs on every exit path.
        """
        try:
            broadcast_mq = self.rpc_broadcast_mq
            assert broadcast_mq is not None
            broadcast_mq.wait_until_ready()
            response_mq = self.worker_response_mq
            assert response_mq is not None
            response_mq.wait_until_ready()

            self.worker_busy_loop()
        except Exception as err:
            logger.exception("RayWorkerProc failed: %s", err)
            raise
        finally:
            self.shutdown()

_init_message_queues(input_shm_handle, vllm_config)

Workers on the same node as the executor use shared memory for both the broadcast (input) MQ and the response MQ. Workers on different nodes use TCP (n_local_reader=0).

Source code in vllm/v1/executor/ray_executor_v2.py
def _init_message_queues(
    self, input_shm_handle: Handle, vllm_config: VllmConfig
) -> None:
    """Create the broadcast (input) and response message queues.

    Workers co-located with the executor use shared memory for both the
    broadcast (input) MQ and the response MQ; workers on other nodes use
    TCP (``n_local_reader=0``). *vllm_config* is unused here (presumably
    kept for signature compatibility with WorkerProc).
    """
    self.rpc_broadcast_mq = MessageQueue.create_from_handle(
        input_shm_handle, self.worker.rank
    )

    local_readers = 1 if self._is_driver_node else 0
    # Bind to Ray's internal node IP instead of get_ip(): the latter gives
    # the host's external IP, which is typically not routable between
    # nodes within the cluster.
    self.worker_response_mq = MessageQueue(
        n_reader=1,
        n_local_reader=local_readers,
        connect_ip=ray.util.get_node_ip_address(),
    )
    self.peer_response_handles: list[dict] = []

get_node_and_physical_gpu_ids()

Return (node_id, physical_gpu_ids) assigned to this actor by Ray.

Source code in vllm/v1/executor/ray_executor_v2.py
def get_node_and_physical_gpu_ids(self) -> tuple[str, list[int]]:
    """Report where Ray placed this actor.

    Returns:
        ``(node_id, physical_gpu_ids)``: the Ray node ID hosting this actor
        and the accelerator IDs Ray assigned to it, each passed through
        ``current_platform.device_control_id_to_physical_device_id``.

    Raises:
        RuntimeError: If the current platform has no Ray device key.
    """
    node_id = ray.get_runtime_context().get_node_id()
    device_key = current_platform.ray_device_key
    if not device_key:
        raise RuntimeError(
            f"current platform {current_platform.device_name} does not support ray."
        )
    assigned = ray.get_runtime_context().get_accelerator_ids()[device_key]
    to_physical = current_platform.device_control_id_to_physical_device_id
    physical_ids = [to_physical(str(dev_id)) for dev_id in assigned]
    return node_id, physical_ids

initialize_worker(local_rank, env_vars, driver_env_vars=None, assigned_physical_gpu_ids=None)

Complete initialization after GPU assignment is known.

driver_env_vars are applied with setdefault — they fill in missing vars but never overwrite node-local values. env_vars always overwrite. assigned_physical_gpu_ids maps local_rank to physical CUDA device ID.

Source code in vllm/v1/executor/ray_executor_v2.py
def initialize_worker(
    self,
    local_rank: int,
    env_vars: dict[str, str],
    driver_env_vars: dict[str, str] | None = None,
    assigned_physical_gpu_ids: list[int] | None = None,
) -> None:
    """Finish the deferred initialization once the GPU assignment is known.

    Args:
        local_rank: Local rank of this worker.
        env_vars: Written into ``os.environ`` unconditionally, overriding
            any existing value.
        driver_env_vars: Applied first, with ``setdefault`` semantics: they
            only fill in names not already set on this node.
        assigned_physical_gpu_ids: Maps local_rank to physical device ID;
            when given, stored on
            ``vllm_config.parallel_config.assigned_physical_gpu_ids``.
    """
    # Driver values must never clobber node-local settings.
    for name, val in (driver_env_vars or {}).items():
        os.environ.setdefault(name, val)
    # Explicit per-worker values always win.
    os.environ.update(env_vars)

    if assigned_physical_gpu_ids is not None:
        config = self._init_kwargs["vllm_config"]
        assert isinstance(config, VllmConfig)
        config.parallel_config.assigned_physical_gpu_ids = (
            assigned_physical_gpu_ids
        )

    self.local_rank = local_rank
    super().__init__(local_rank=local_rank, **self._init_kwargs)

run()

Main entry point called via actor.run.remote().

Source code in vllm/v1/executor/ray_executor_v2.py
def run(self) -> None:
    """Serve requests; this is the body invoked by ``actor.run.remote()``.

    Waits until both message queues are ready, then enters the worker busy
    loop. Any exception is logged and re-raised; ``shutdown()`` runs on
    every exit path.
    """
    try:
        broadcast_mq = self.rpc_broadcast_mq
        assert broadcast_mq is not None
        broadcast_mq.wait_until_ready()
        response_mq = self.worker_response_mq
        assert response_mq is not None
        response_mq.wait_until_ready()

        self.worker_busy_loop()
    except Exception as err:
        logger.exception("RayWorkerProc failed: %s", err)
        raise
    finally:
        self.shutdown()

wait_for_init()

Respond to the driver's wait_until_ready() barrier.

Source code in vllm/v1/executor/ray_executor_v2.py
def wait_for_init(self) -> dict:
    """Answer the driver's ``wait_until_ready()`` barrier.

    Returns:
        A dict whose ``"status"`` is ``READY_STR`` and whose ``"handle"``
        is the exported handle of this worker's response queue.
    """
    response_mq = self.worker_response_mq
    assert response_mq is not None
    ready_msg = {"status": self.READY_STR}
    ready_msg["handle"] = response_mq.export_handle()
    return ready_msg