vllm_omni.distributed.omni_coordinator ¶
Modules:
| Name | Description |
|---|---|
| load_balancer | |
| messages | |
| omni_coord_client_for_hub | |
| omni_coord_client_for_stage | |
| omni_coordinator | |
| runtime | Lifecycle wrapper around :class: |
LeastQueueLengthBalancer ¶
Bases: LoadBalancer
Select the replica with the smallest queue_length.
If multiple replicas share the same minimum queue length, one of them is chosen uniformly at random.
Raises:
| Type | Description |
|---|---|
| ValueError | If any replica has a negative |
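The selection rule described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `ReplicaInfo` here is a hypothetical stand-in with a single `queue_length` field, the function name `select_least_queue_length` is invented for the example, and both the empty-list check and the assumption that the negative value in question is `queue_length` are guesses.

```python
import random
from dataclasses import dataclass


@dataclass
class ReplicaInfo:
    # Hypothetical minimal stand-in; the real ReplicaInfo carries more fields.
    queue_length: int


def select_least_queue_length(replicas: list[ReplicaInfo]) -> int:
    """Return the index of a replica with the smallest queue_length."""
    if not replicas:
        # Assumed behaviour for an empty list.
        raise ValueError("replicas must not be empty")
    if any(r.queue_length < 0 for r in replicas):
        # Assumed: the negative value being rejected is queue_length.
        raise ValueError("queue_length must be non-negative")
    shortest = min(r.queue_length for r in replicas)
    # Collect every index tied for the minimum, then pick one uniformly.
    candidates = [i for i, r in enumerate(replicas) if r.queue_length == shortest]
    return random.choice(candidates)
```

Collecting all tied indices before calling `random.choice` is what makes the tie-break uniform; taking the first minimum would always favour the earliest replica.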
LoadBalancer ¶
Bases: ABC
Abstract base class for load balancers.
Subclasses implement :meth:select to choose a replica for a given task.
select abstractmethod ¶
select(task: Task, replicas: list[ReplicaInfo]) -> int
Route a task to one of the available replicas.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | The task to route. Not used by the random policy but reserved for future strategies that may inspect task metadata. | required |
| replicas | list[ReplicaInfo] | List of available replicas to choose from. | required |
Returns:
| Type | Description |
|---|---|
| int | Index of the selected replica in |
Raises:
| Type | Description |
|---|---|
| ValueError | If |
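As an illustration of the contract above, a subclass only needs to implement `select` and return an index into `replicas`. The sketch below redefines a simplified `LoadBalancer` so it runs on its own; `UniformRandomBalancer` is a hypothetical name, the loose `Any` types stand in for `Task` and `ReplicaInfo`, and raising `ValueError` on an empty list is an assumption.

```python
import random
from abc import ABC, abstractmethod
from typing import Any


class LoadBalancer(ABC):
    """Simplified stand-in for the abstract base class documented above."""

    @abstractmethod
    def select(self, task: dict[str, Any], replicas: list[Any]) -> int:
        """Return the index of the chosen replica."""


class UniformRandomBalancer(LoadBalancer):  # hypothetical example subclass
    def select(self, task: dict[str, Any], replicas: list[Any]) -> int:
        if not replicas:
            # Assumed error condition; the source does not spell it out.
            raise ValueError("replicas must not be empty")
        # The task is ignored here; other policies may inspect it.
        return random.randrange(len(replicas))
```

Returning an index rather than the replica object lets the caller keep ownership of the list and map the choice back to whatever connection state it holds.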
LoadBalancingPolicy ¶
Enumeration for load balancing policies.
These policies are used by :class:LoadBalancer implementations to route tasks to a subset of available replicas.
OmniCoordClientForHub ¶
Client for AsyncOmni side to receive replica list updates.
This client maintains a SUB socket connected to OmniCoordinator's PUB endpoint and caches the latest :class:ReplicaList in memory for use by the load balancer and routing logic.
get_replica_list ¶
get_replica_list() -> ReplicaList
Return the latest cached :class:ReplicaList.
If no update has been received yet, returns an empty list with timestamp=0.0.
get_replicas_for_stage ¶
get_replicas_for_stage(stage_id: int) -> ReplicaList
Return replicas filtered by stage_id.
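The caching and filtering behaviour can be pictured with a small sketch. The dataclasses below are hypothetical stand-ins: only `stage_id` and `timestamp` are named in the text above, while the `replicas` field name, the `input_addr` field on this simplified `ReplicaInfo`, and the choice to carry the timestamp over to the filtered list are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class ReplicaInfo:
    # Hypothetical subset of the real fields.
    stage_id: int
    input_addr: str


@dataclass
class ReplicaList:
    # Defaults model the "no update received yet" state: empty, timestamp=0.0.
    replicas: list[ReplicaInfo] = field(default_factory=list)
    timestamp: float = 0.0


def replicas_for_stage(cached: ReplicaList, stage_id: int) -> ReplicaList:
    """Return a new ReplicaList holding only replicas of the given stage."""
    matching = [r for r in cached.replicas if r.stage_id == stage_id]
    # Assumed: the filtered view keeps the timestamp of the cached update.
    return ReplicaList(replicas=matching, timestamp=cached.timestamp)
```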
OmniCoordClientForStage ¶
Client used by stage replicas to send events to OmniCoordinator.
This client maintains a DEALER socket connected to OmniCoordinator's ROUTER endpoint and sends JSON-encoded events describing replica status.
update_info ¶
update_info(
status: ReplicaStatus | None = None,
queue_length: int | None = None,
) -> None
Update replica information and notify OmniCoordinator.
At least one of status or queue_length must be provided.
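A rough sketch of the "at least one argument" rule and the JSON encoding step is shown below. It does not open a socket. The helper name `build_update_payload`, the `"update"` event type, the string values of the enum members, and the use of `ValueError` for the missing-argument case are all assumptions made for illustration.

```python
import json
from enum import Enum
from typing import Optional


class ReplicaStatus(str, Enum):
    # Member names appear in this page; the string values are assumed.
    UP = "up"
    DOWN = "down"
    ERROR = "error"


def build_update_payload(
    status: Optional[ReplicaStatus] = None,
    queue_length: Optional[int] = None,
) -> bytes:
    """Encode an update event as JSON bytes, including only provided fields."""
    if status is None and queue_length is None:
        raise ValueError("provide at least one of status or queue_length")
    event: dict = {"event_type": "update"}  # assumed event_type value
    if status is not None:
        event["status"] = status.value
    if queue_length is not None:
        event["queue_length"] = queue_length
    return json.dumps(event).encode("utf-8")
```

Checking `is None` rather than truthiness matters here, because `queue_length=0` is a legitimate update.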
OmniCoordinator ¶
Coordinator for stage replicas and hub clients.
This service receives replica events from :class:OmniCoordClientForStage via a ZMQ ROUTER socket and publishes active replica lists to :class:OmniCoordClientForHub via a PUB socket.
The coordinator maintains an in-memory registry of all known replicas, including their status, queue length, and heartbeat timestamps. A background thread periodically checks for heartbeat timeouts and marks unhealthy replicas as ReplicaStatus.ERROR.
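The periodic heartbeat check can be sketched as a pure function over the registry, leaving out the background thread itself. `ReplicaRecord`, `mark_timed_out`, the plain string statuses, and the `timeout_s` parameter are hypothetical names chosen for this example; the real registry layout and timeout configuration are not described here.

```python
from dataclasses import dataclass


@dataclass
class ReplicaRecord:
    # Hypothetical registry entry.
    status: str            # "UP", "DOWN" or "ERROR"
    last_heartbeat: float  # seconds, same clock as `now`


def mark_timed_out(
    registry: dict[str, ReplicaRecord], now: float, timeout_s: float
) -> list[str]:
    """Mark UP replicas whose heartbeat is too old as ERROR; return their keys."""
    expired = []
    for key, record in registry.items():
        if record.status == "UP" and now - record.last_heartbeat > timeout_s:
            record.status = "ERROR"
            expired.append(key)
    return expired
```

A background loop would call such a function at a fixed interval with a monotonic clock reading and republish the active list whenever the returned list is non-empty.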
add_new_replica ¶
add_new_replica(event: ReplicaEvent) -> None
Add a new replica based on an incoming event.
get_active_replicas ¶
get_active_replicas() -> ReplicaList
Return a :class:ReplicaList of active (UP) replicas only.
remove_replica ¶
remove_replica(event: ReplicaEvent) -> None
Mark a replica as removed / down based on an incoming event.
This marks the replica's status as DOWN or ERROR (depending on the event) but keeps it in the internal registry. It is removed from the active replica list published to hubs.
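The distinction between the internal registry and the published active list can be illustrated as follows. This is a sketch under assumptions: the registry is modelled as a dict keyed by `input_addr`, statuses are plain strings, and `ReplicaRecord`, `mark_removed`, and `active_replicas` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class ReplicaRecord:
    # Hypothetical registry entry; the real ReplicaInfo has more fields.
    input_addr: str
    status: str  # "UP", "DOWN" or "ERROR"


def mark_removed(
    registry: dict[str, ReplicaRecord], input_addr: str, status: str = "DOWN"
) -> None:
    """Flag a replica as DOWN or ERROR without deleting its registry entry."""
    if status not in ("DOWN", "ERROR"):
        raise ValueError("status must be DOWN or ERROR")
    registry[input_addr].status = status


def active_replicas(registry: dict[str, ReplicaRecord]) -> list[ReplicaRecord]:
    """Return only the replicas that are currently UP."""
    return [r for r in registry.values() if r.status == "UP"]
```

Keeping the entry in the registry means the coordinator can still recognise the replica if it reports in again later.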
update_replica_info ¶
update_replica_info(event: ReplicaEvent) -> None
Update an existing replica based on an incoming event.
OmniCoordinatorRuntime ¶
Own one :class:OmniCoordinator and the two ports it binds.
The constructor binds the ports; :meth:close tears them down. The class deliberately does not expose the coordinator instance: callers should interact with the coordinator only via its wire protocol, through :class:OmniCoordClientForStage and :class:OmniCoordClientForHub.
RandomBalancer ¶
ReplicaEvent dataclass ¶
Wire payload from OmniCoordClientForStage to OmniCoordinator.
Schema for Stage → Coordinator events over ZMQ: input_addr, output_addr, stage_id, status, queue_length, event_type.
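To make the schema concrete, the sketch below models the six listed fields as a dataclass and round-trips it through JSON. The field names come from the schema above; the field types, the example values, and the `encode_event` / `decode_event` helpers are assumptions, and the real class may serialize differently.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ReplicaEvent:
    # Field names follow the schema above; the types are assumed.
    input_addr: str
    output_addr: str
    stage_id: int
    status: str
    queue_length: int
    event_type: str


def encode_event(event: ReplicaEvent) -> bytes:
    """Serialize an event to UTF-8 JSON, suitable for a single ZMQ frame."""
    return json.dumps(asdict(event)).encode("utf-8")


def decode_event(raw: bytes) -> ReplicaEvent:
    """Rebuild an event from its JSON form; unknown keys raise TypeError."""
    return ReplicaEvent(**json.loads(raw.decode("utf-8")))
```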
ReplicaInfo dataclass ¶
Metadata for a single stage replica.
This type is stored in OmniCoordinator's internal registry and is also published to hubs via :class:ReplicaList.
ReplicaList dataclass ¶
Container for replica list updates.
OmniCoordinator publishes a :class:ReplicaList whenever its view of active replicas changes. OmniCoordClientForHub caches the latest value and exposes it to AsyncOmni and the load balancer.
ReplicaStatus ¶
RoundRobinBalancer ¶
Bases: LoadBalancer
Load balancer that selects replicas in a round-robin fashion.
This implementation keeps a running index modulo len(replicas). It therefore depends on the order and stable meaning of the replicas list between calls. If the list length or ordering changes, the sequence of picks may skip or repeat entries relative to a fixed set of backends.
Concurrency: a threading.Lock serializes updates to _next_index for callers that invoke select from multiple threads or alongside threaded infrastructure (e.g. ZMQ receive threads).
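The behaviour described above, including the effect of a changing list length, can be reproduced with a short sketch. `RoundRobinSketch` is a hypothetical class written for this example; only the `_next_index` attribute and the use of `threading.Lock` come from the description, and the empty-list `ValueError` is an assumption.

```python
import threading
from typing import Any


class RoundRobinSketch:
    """Illustrative round-robin picker with a lock-protected running index."""

    def __init__(self) -> None:
        self._next_index = 0
        self._lock = threading.Lock()

    def select(self, task: dict[str, Any], replicas: list[Any]) -> int:
        if not replicas:
            # Assumed error condition for an empty list.
            raise ValueError("replicas must not be empty")
        with self._lock:
            # Read and advance the index atomically so concurrent callers
            # never observe or write a stale value.
            index = self._next_index % len(replicas)
            self._next_index = index + 1
        return index
```

With three replicas, successive calls yield 0, 1, 2, 0. If the list then shrinks to two entries, the next pick is 1, which shows how a length change can skip an entry relative to a fixed set of backends.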
Task ¶
Bases: TypedDict
Task structure passed to StagePool.pick / LoadBalancer.select.
Mirrors the dict built around a stage submission with request_id and any payload-related fields a future load-balancing policy might inspect.
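As a rough picture of such a structure, a `TypedDict` with a `request_id` key might look like the sketch below. Only `request_id` is named in the description; its `str` type is an assumption, and the real `Task` may declare additional payload-related keys.

```python
from typing import TypedDict


class Task(TypedDict):
    # Only request_id is mentioned above; its type is assumed to be str.
    request_id: str


# A TypedDict is a plain dict at runtime; the annotations only guide
# static type checkers.
task: Task = {"request_id": "req-001"}
```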
build_load_balancer_factory ¶
build_load_balancer_factory(
policy: str,
) -> Callable[[], LoadBalancer]
Translate --omni-lb-policy (string) into a per-pool LB factory.
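One way such a translation could work is a lookup from policy string to balancer class, returning the class itself as the zero-argument factory. Everything in the sketch below is hypothetical: the policy strings, the placeholder classes, the `build_factory` name, and the `ValueError` for unknown policies are assumptions, not the accepted values of `--omni-lb-policy`.

```python
from typing import Callable


class RandomPolicy:  # placeholder balancer classes for the sketch
    pass


class RoundRobinPolicy:
    def __init__(self) -> None:
        self.next_index = 0  # per-instance state, hence one instance per pool


class LeastQueuePolicy:
    pass


# Assumed policy names; the real flag values are not listed in this page.
_POLICIES: dict[str, Callable[[], object]] = {
    "random": RandomPolicy,
    "round_robin": RoundRobinPolicy,
    "least_queue_length": LeastQueuePolicy,
}


def build_factory(policy: str) -> Callable[[], object]:
    """Map a policy string to a factory that creates a fresh balancer."""
    try:
        return _POLICIES[policy]
    except KeyError:
        raise ValueError(f"unknown load-balancing policy: {policy!r}") from None
```

Returning a factory instead of an instance means each pool can construct its own balancer, so stateful policies such as round-robin do not share an index across pools.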