Skip to content

vllm_omni.distributed.ray_utils.utils

RAY_AVAILABLE module-attribute

RAY_AVAILABLE = True

logger module-attribute

logger = getLogger(__name__)

calculate_total_bytes

calculate_total_bytes(size_args, dtype)

Calculate total bytes for a tensor allocation, handling nested tuples in size args.

create_placement_group

create_placement_group(
    number_of_stages: int,
    address: str | None = None,
    strategy: str = "PACK",
) -> PlacementGroup

Create a placement group for the given number of stages. Args: number_of_stages: The number of stages to create the placement group for. strategy: The strategy to use for the placement group. Returns: The placement group.

get_ray_queue_class

get_ray_queue_class()

get_ray_task_error

get_ray_task_error(
    task_ref: Any, **kwargs
) -> Exception | None

Gets ray task. Returns RayTaskError if ray instance exited with any error, else None.

initialize_ray_cluster

initialize_ray_cluster(address: str | None = None)

is_ray_initialized

is_ray_initialized()

Check if Ray is initialized without hard dependency on Ray.

is_ray_task_alive

is_ray_task_alive(task_ref: Any, **kwargs)

Checks ray task status. Returns FALSE if ray task has exited for any reason.

kill_ray_actor

kill_ray_actor(actor)

maybe_disable_pin_memory_for_ray

maybe_disable_pin_memory_for_ray(
    obj, size_bytes, threshold=32 * 1024 * 1024
)

Context manager to temporarily disable pin_memory if running in Ray and the allocation size exceeds the threshold.

This is a workaround for Ray workers often having low ulimit -l (locked memory), causing OS call failed errors when allocating large pinned buffers.

remove_placement_group

remove_placement_group(pg)

start_ray_actor

start_ray_actor(
    worker_entry_fn,
    placement_group,
    placement_group_bundle_index: int,
    *args,
    **kwargs,
)

try_close_ray

try_close_ray(pg=None)

Try to clean up Ray resources including placement group and shutdown.