vllm_omni.distributed.omni_coordinator

Modules:

Name Description
load_balancer
messages
omni_coord_client_for_hub
omni_coord_client_for_stage
omni_coordinator
runtime Lifecycle wrapper around :class:OmniCoordinator.

LeastQueueLengthBalancer

Bases: LoadBalancer

Select the replica with the smallest queue_length.

If multiple replicas share the same minimum queue length, one of them is chosen uniformly at random.

Raises:

Type Description
ValueError

If any replica has a negative queue_length.

select

select(task: Task, replicas: list[ReplicaInfo]) -> int
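The selection rule described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `ReplicaInfo` is a stand-in that carries only `queue_length`, `select_least_queue_length` is a hypothetical name, and raising on an empty list is an assumption borrowed from the base-class contract.

```python
import random
from dataclasses import dataclass


@dataclass
class ReplicaInfo:
    # Stand-in with only the field this policy reads; the real class has more.
    queue_length: int


def select_least_queue_length(replicas: list[ReplicaInfo]) -> int:
    """Return the index of a replica with the smallest queue_length."""
    if not replicas:
        # Assumption: an empty list is rejected, as in the base-class contract.
        raise ValueError("replicas is empty")
    if any(r.queue_length < 0 for r in replicas):
        raise ValueError("queue_length must be non-negative")
    shortest = min(r.queue_length for r in replicas)
    candidates = [i for i, r in enumerate(replicas) if r.queue_length == shortest]
    # Ties are broken uniformly at random among the minimal replicas.
    return random.choice(candidates)


replicas = [ReplicaInfo(3), ReplicaInfo(1), ReplicaInfo(1)]
picked = select_least_queue_length(replicas)  # index 1 or 2
```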

LoadBalancer

Bases: ABC

Abstract base class for load balancers.

Subclasses implement :meth:select to choose a replica for a given task.

select abstractmethod

select(task: Task, replicas: list[ReplicaInfo]) -> int

Route a task to one of the available replicas.

Parameters:

Name Type Description Default
task Task

The task to route. Policies such as random selection ignore it; it is reserved for strategies that inspect task metadata.

required
replicas list[ReplicaInfo]

List of available replicas to choose from.

required

Returns:

Type Description
int

Index of the selected replica in replicas.

Raises:

Type Description
ValueError

If replicas is empty.
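A custom policy would subclass the abstract base and implement `select`. The sketch below re-declares a stand-in `LoadBalancer` so it runs on its own; `FirstReplicaBalancer` is a hypothetical policy invented for illustration, and the task and replica types are simplified to plain Python objects.

```python
from abc import ABC, abstractmethod
from typing import Any


class LoadBalancer(ABC):
    """Stand-in for the abstract base documented above."""

    @abstractmethod
    def select(self, task: dict[str, Any], replicas: list[Any]) -> int:
        """Return the index of the chosen replica."""


class FirstReplicaBalancer(LoadBalancer):
    """Hypothetical policy that always picks the first replica."""

    def select(self, task: dict[str, Any], replicas: list[Any]) -> int:
        if not replicas:
            raise ValueError("replicas is empty")
        return 0


balancer = FirstReplicaBalancer()
chosen = balancer.select({"request_id": "req-1"}, ["replica-a", "replica-b"])
```

Because `select` is abstract, instantiating `LoadBalancer` directly raises `TypeError`.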

LoadBalancingPolicy

Bases: str, Enum

Enumeration for load balancing policies.

These policies are used by :class:LoadBalancer implementations to route each task to one of the available replicas.

LEAST_QUEUE_LENGTH class-attribute instance-attribute

LEAST_QUEUE_LENGTH = 'least-queue-length'

RANDOM class-attribute instance-attribute

RANDOM = 'random'

ROUND_ROBIN class-attribute instance-attribute

ROUND_ROBIN = 'round-robin'
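Since the enum derives from both `str` and `Enum`, a policy string can be converted to a member by value lookup, and members compare equal to plain strings. The enum is re-declared here only so the snippet runs standalone; the values are copied from the listing above.

```python
from enum import Enum


class LoadBalancingPolicy(str, Enum):
    # Re-declared for a self-contained example; values copied from above.
    LEAST_QUEUE_LENGTH = "least-queue-length"
    RANDOM = "random"
    ROUND_ROBIN = "round-robin"


# Lookup by value turns a configuration string into a member.
policy = LoadBalancingPolicy("round-robin")

# The str mixin makes members compare equal to plain strings.
is_plain_string_equal = policy == "round-robin"

# Unknown values raise ValueError.
try:
    LoadBalancingPolicy("fastest")
    unknown_rejected = False
except ValueError:
    unknown_rejected = True
```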

OmniCoordClientForHub

Client for AsyncOmni side to receive replica list updates.

This client maintains a SUB socket connected to OmniCoordinator's PUB endpoint and caches the latest :class:ReplicaList in memory for use by the load balancer and routing logic.

close

close() -> None

Close the SUB socket and stop the background thread.

get_replica_list

get_replica_list() -> ReplicaList

Return the latest cached :class:ReplicaList.

If no update has been received yet, returns an empty list with timestamp=0.0.

get_replicas_for_stage

get_replicas_for_stage(stage_id: int) -> ReplicaList

Return replicas filtered by stage_id.
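The per-stage filter can be pictured as a list comprehension over the cached update. This is a sketch with stand-in dataclasses that carry only a subset of the documented fields; `replicas_for_stage` is a hypothetical free function, and carrying the cached timestamp into the result is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class ReplicaInfo:
    # Stand-in with a subset of the documented fields.
    stage_id: int
    input_addr: str


@dataclass
class ReplicaList:
    replicas: list[ReplicaInfo] = field(default_factory=list)
    timestamp: float = 0.0  # the "no update received yet" value


def replicas_for_stage(cached: ReplicaList, stage_id: int) -> ReplicaList:
    """Keep only the replicas whose stage_id matches."""
    matching = [r for r in cached.replicas if r.stage_id == stage_id]
    # Assumption: the filtered view keeps the timestamp of the cached update.
    return ReplicaList(replicas=matching, timestamp=cached.timestamp)


cached = ReplicaList(
    replicas=[
        ReplicaInfo(stage_id=0, input_addr="tcp://10.0.0.1:5000"),
        ReplicaInfo(stage_id=1, input_addr="tcp://10.0.0.2:5000"),
        ReplicaInfo(stage_id=1, input_addr="tcp://10.0.0.3:5000"),
    ],
    timestamp=12.5,
)
stage_one = replicas_for_stage(cached, 1)
```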

OmniCoordClientForStage

Client used by stage replicas to send events to OmniCoordinator.

This client maintains a DEALER socket connected to OmniCoordinator's ROUTER endpoint and sends JSON-encoded events describing replica status.

close

close() -> None

Send a final down event and close the underlying socket.

update_info

update_info(
    status: ReplicaStatus | None = None,
    queue_length: int | None = None,
) -> None

Update replica information and notify OmniCoordinator.

At least one of status or queue_length must be provided.
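The "at least one argument" rule could be enforced along these lines. This is a sketch, not the client's code: `collect_changes` is a hypothetical helper, and raising `ValueError` when both arguments are omitted is an assumption, since the source does not name the exception.

```python
from __future__ import annotations

from enum import Enum


class ReplicaStatus(str, Enum):
    # Re-declared for a self-contained example; values as documented.
    DOWN = "down"
    ERROR = "error"
    UP = "up"


def collect_changes(
    status: ReplicaStatus | None = None,
    queue_length: int | None = None,
) -> dict[str, object]:
    """Return only the fields the caller actually supplied."""
    if status is None and queue_length is None:
        # Assumption: the real client may signal this differently.
        raise ValueError("at least one of status or queue_length must be provided")
    changes: dict[str, object] = {}
    if status is not None:
        changes["status"] = status
    if queue_length is not None:
        # Compare against None, not truthiness, so queue_length=0 is kept.
        changes["queue_length"] = queue_length
    return changes


only_queue = collect_changes(queue_length=0)
both = collect_changes(status=ReplicaStatus.UP, queue_length=4)
```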

OmniCoordinator

Coordinator for stage replicas and hub clients.

This service receives replica events from :class:OmniCoordClientForStage via a ZMQ ROUTER socket and publishes active replica lists to :class:OmniCoordClientForHub via a PUB socket.

The coordinator maintains an in-memory registry of all known replicas, including their status, queue length, and heartbeat timestamps. A background thread periodically checks for heartbeat timeouts and marks unhealthy replicas as ReplicaStatus.ERROR.
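One pass of the heartbeat check might look like the sketch below. The registry is assumed to be a dict keyed by `input_addr`, and `mark_timed_out` and `timeout_s` are hypothetical names; the source does not say how the registry is keyed or what the timeout value is.

```python
from dataclasses import dataclass
from enum import Enum


class ReplicaStatus(str, Enum):
    DOWN = "down"
    ERROR = "error"
    UP = "up"


@dataclass
class ReplicaInfo:
    # Stand-in with only the fields the timeout check reads or writes.
    input_addr: str
    status: ReplicaStatus
    last_heartbeat: float


def mark_timed_out(
    registry: dict[str, ReplicaInfo], now: float, timeout_s: float
) -> list[str]:
    """Mark UP replicas whose last heartbeat is older than timeout_s as ERROR."""
    expired = []
    for addr, info in registry.items():
        if info.status is ReplicaStatus.UP and now - info.last_heartbeat > timeout_s:
            info.status = ReplicaStatus.ERROR
            expired.append(addr)
    return expired


registry = {
    "tcp://a:1": ReplicaInfo("tcp://a:1", ReplicaStatus.UP, last_heartbeat=95.0),
    "tcp://b:1": ReplicaInfo("tcp://b:1", ReplicaStatus.UP, last_heartbeat=80.0),
}
expired = mark_timed_out(registry, now=100.0, timeout_s=10.0)
```

In a real service a background thread would call such a check on a timer and publish an updated replica list when anything expired.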

pub_zmq_addr instance-attribute

pub_zmq_addr = getsockopt_string(LAST_ENDPOINT)

router_zmq_addr instance-attribute

router_zmq_addr = getsockopt_string(LAST_ENDPOINT)

add_new_replica

add_new_replica(event: ReplicaEvent) -> None

Add a new replica based on an incoming event.

close

close() -> None

Shut down background threads and close all ZMQ sockets.

get_active_replicas

get_active_replicas() -> ReplicaList

Return a :class:ReplicaList of active (UP) replicas only.

publish_replica_list_update

publish_replica_list_update() -> bool

Publish the current active replica list to all subscribers.

Returns:

Type Description
bool

True if the PUB send succeeded, False if it was dropped (e.g. socket not ready when using zmq.NOBLOCK).

remove_replica

remove_replica(event: ReplicaEvent) -> None

Mark a replica as removed / down based on an incoming event.

This marks the replica's status as DOWN or ERROR (depending on the event) but keeps it in the internal registry. It is removed from the active replica list published to hubs.
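The difference between the internal registry and the published list can be illustrated with a small sketch. The dict-based registry and the helper names `mark_removed` and `active_replicas` are assumptions made for the example.

```python
from dataclasses import dataclass
from enum import Enum


class ReplicaStatus(str, Enum):
    DOWN = "down"
    ERROR = "error"
    UP = "up"


@dataclass
class ReplicaInfo:
    input_addr: str
    status: ReplicaStatus


def mark_removed(
    registry: dict[str, ReplicaInfo], input_addr: str, status: ReplicaStatus
) -> None:
    """Change the status but keep the entry in the registry."""
    registry[input_addr].status = status


def active_replicas(registry: dict[str, ReplicaInfo]) -> list[ReplicaInfo]:
    """Only UP replicas would be published to hubs."""
    return [r for r in registry.values() if r.status is ReplicaStatus.UP]


registry = {
    "tcp://a:1": ReplicaInfo("tcp://a:1", ReplicaStatus.UP),
    "tcp://b:1": ReplicaInfo("tcp://b:1", ReplicaStatus.UP),
}
mark_removed(registry, "tcp://b:1", ReplicaStatus.DOWN)
published = active_replicas(registry)
```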

update_replica_info

update_replica_info(event: ReplicaEvent) -> None

Update an existing replica based on an incoming event.

OmniCoordinatorRuntime

Own one :class:OmniCoordinator and the two ports it binds.

The constructor binds both ports; :meth:close tears them down. The class deliberately does not expose the coordinator instance: callers should consume the coordinator only via its wire protocol through :class:OmniCoordClientForStage and :class:OmniCoordClientForHub.

pub_address instance-attribute

pub_address: str = f'tcp://{host}:{pub_port}'

router_address instance-attribute

router_address: str = f'tcp://{host}:{router_port}'

close

close() -> None

Tear down the underlying coordinator. Idempotent.
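An idempotent `close` is commonly implemented with a guard flag. The sketch below shows that pattern only; `RuntimeSketch`, its constructor signature, and the `teardown_count` counter are illustrative assumptions, and no sockets are actually bound.

```python
class RuntimeSketch:
    """Illustrates idempotent teardown; binds nothing."""

    def __init__(self, host: str, router_port: int, pub_port: int) -> None:
        self.router_address = f"tcp://{host}:{router_port}"
        self.pub_address = f"tcp://{host}:{pub_port}"
        self._closed = False
        self.teardown_count = 0  # counts how often real teardown work ran

    def close(self) -> None:
        if self._closed:
            return  # second and later calls are no-ops
        self._closed = True
        self.teardown_count += 1  # stands in for closing the coordinator


runtime = RuntimeSketch("127.0.0.1", 5555, 5556)
runtime.close()
runtime.close()
```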

RandomBalancer

Bases: LoadBalancer

Load balancer that selects a replica uniformly at random.

select

select(task: Task, replicas: list[ReplicaInfo]) -> int

ReplicaEvent dataclass

Wire payload from OmniCoordClientForStage to OmniCoordinator.

Schema for Stage → Coordinator events over ZMQ: input_addr, output_addr, stage_id, status, queue_length, event_type.
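A JSON round trip for an event with these six fields could look like the sketch below. The dataclass is re-declared so the example runs standalone; `encode_event`, `decode_event`, and the `event_type` value `"update"` are assumptions, since the source does not list the wire-level helpers or the allowed event types.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum


class ReplicaStatus(str, Enum):
    DOWN = "down"
    ERROR = "error"
    UP = "up"


@dataclass
class ReplicaEvent:
    input_addr: str
    output_addr: str
    stage_id: int
    status: ReplicaStatus
    queue_length: int
    event_type: str


def encode_event(event: ReplicaEvent) -> bytes:
    payload = asdict(event)
    payload["status"] = event.status.value  # send the plain string value
    return json.dumps(payload).encode("utf-8")


def decode_event(raw: bytes) -> ReplicaEvent:
    data = json.loads(raw.decode("utf-8"))
    data["status"] = ReplicaStatus(data["status"])  # restore the enum member
    return ReplicaEvent(**data)


event = ReplicaEvent(
    input_addr="tcp://10.0.0.2:6000",
    output_addr="tcp://10.0.0.2:6001",
    stage_id=1,
    status=ReplicaStatus.UP,
    queue_length=3,
    event_type="update",  # hypothetical value
)
round_tripped = decode_event(encode_event(event))
```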

event_type instance-attribute

event_type: str

input_addr instance-attribute

input_addr: str

output_addr instance-attribute

output_addr: str

queue_length instance-attribute

queue_length: int

stage_id instance-attribute

stage_id: int

status instance-attribute

status: ReplicaStatus

ReplicaInfo dataclass

Metadata for a single stage replica.

This type is stored in OmniCoordinator's internal registry and is also published to hubs via :class:ReplicaList.

input_addr instance-attribute

input_addr: str

last_heartbeat instance-attribute

last_heartbeat: float

output_addr instance-attribute

output_addr: str

queue_length instance-attribute

queue_length: int

registered_at instance-attribute

registered_at: float

stage_id instance-attribute

stage_id: int

status instance-attribute

status: ReplicaStatus

ReplicaList dataclass

Container for replica list updates.

OmniCoordinator publishes a :class:ReplicaList whenever its view of active replicas changes. OmniCoordClientForHub caches the latest value and exposes it to AsyncOmni and the load balancer.

replicas instance-attribute

replicas: list[ReplicaInfo]

timestamp instance-attribute

timestamp: float

ReplicaStatus

Bases: str, Enum

Enumeration for stage replica status.

DOWN class-attribute instance-attribute

DOWN = 'down'

ERROR class-attribute instance-attribute

ERROR = 'error'

UP class-attribute instance-attribute

UP = 'up'

RoundRobinBalancer

Bases: LoadBalancer

Load balancer that selects replicas in a round-robin fashion.

This implementation keeps a running index modulo len(replicas). It therefore depends on the order and stable meaning of the replicas list between calls. If the list length or ordering changes, the sequence of picks may skip or repeat entries relative to a fixed set of backends.

Concurrency: a threading.Lock serializes updates to _next_index for callers that invoke select from multiple threads or alongside threaded infrastructure (e.g. ZMQ receive threads).
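The running-index behaviour, including the skip that occurs when the list shrinks, can be reproduced with a short sketch. `RoundRobinSketch` is a stand-in written for this example; only the `_next_index` name and the use of a `threading.Lock` come from the description above.

```python
import threading


class RoundRobinSketch:
    def __init__(self) -> None:
        self._next_index = 0
        self._lock = threading.Lock()

    def select(self, replicas: list) -> int:
        if not replicas:
            raise ValueError("replicas is empty")
        # The lock makes the read-modify-write of _next_index atomic.
        with self._lock:
            index = self._next_index % len(replicas)
            self._next_index += 1
        return index


balancer = RoundRobinSketch()
three = ["a", "b", "c"]
first_cycle = [balancer.select(three) for _ in range(3)]  # [0, 1, 2]

# The list shrinks between calls: the counter is now 3, and 3 % 2 == 1,
# so index 0 is skipped on this pick.
after_shrink = balancer.select(["a", "b"])
```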

select

select(task: Task, replicas: list[ReplicaInfo]) -> int

Task

Bases: TypedDict

Task structure passed to StagePool.pick / LoadBalancer.select.

Mirrors the dict built around a stage submission with request_id and any payload-related fields a future load-balancing policy might inspect.

engine_inputs instance-attribute

engine_inputs: Any

request_id instance-attribute

request_id: str

sampling_params instance-attribute

sampling_params: Any

build_load_balancer_factory

build_load_balancer_factory(
    policy: str,
) -> Callable[[], LoadBalancer]

Translate --omni-lb-policy (string) into a per-pool LB factory.
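Returning a factory rather than an instance lets each pool construct its own balancer, so stateful policies such as round-robin do not share a counter. The following is a sketch under assumptions: the balancer classes are empty stand-ins, `build_factory_sketch` is a hypothetical name, and raising `ValueError` for an unknown policy is a guess at the behaviour.

```python
from typing import Callable


class RandomBalancer:
    """Empty stand-in."""


class RoundRobinBalancer:
    """Empty stand-in."""


class LeastQueueLengthBalancer:
    """Empty stand-in."""


# Policy strings copied from the LoadBalancingPolicy values.
_POLICY_TO_CLASS: dict[str, type] = {
    "random": RandomBalancer,
    "round-robin": RoundRobinBalancer,
    "least-queue-length": LeastQueueLengthBalancer,
}


def build_factory_sketch(policy: str) -> Callable[[], object]:
    try:
        cls = _POLICY_TO_CLASS[policy]
    except KeyError:
        # Assumption: the real function may report unknown policies differently.
        raise ValueError(f"unknown load balancing policy: {policy!r}") from None
    # The class itself is a zero-argument callable producing a fresh instance.
    return cls


factory = build_factory_sketch("round-robin")
pool_a_balancer = factory()
pool_b_balancer = factory()
```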