
vllm_omni.engine.membership_controller

MembershipController: distributed replica lifecycle management.

Extracted from Orchestrator so that request-flow code stays free of distributed concerns. The controller owns the OmniCoordClientForHub, watches for replicas that disappear, and handles register/unregister messages by building head-side clients through an injected factory and updating StagePool membership.

RemoteReplicaFactory module-attribute

RemoteReplicaFactory = Callable[[int, int], Any]
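The alias only fixes the call shape: two integers in, any object out. As a minimal sketch, a conforming factory could look like the following. The argument order (stage_id, replica_id) is an assumption borrowed from the handle_register signature, and FakeReplicaClient is a hypothetical stand-in, not a vllm_omni class.

```python
from typing import Any, Callable

# Same alias shape as documented; what the two ints mean is an assumption.
RemoteReplicaFactory = Callable[[int, int], Any]


class FakeReplicaClient:
    """Hypothetical stand-in for a head-side client (not part of vllm_omni)."""

    def __init__(self, stage_id: int, replica_id: int) -> None:
        self.stage_id = stage_id
        self.replica_id = replica_id


def make_fake_client(stage_id: int, replica_id: int) -> FakeReplicaClient:
    # Assumed argument order: (stage_id, replica_id), mirroring handle_register.
    return FakeReplicaClient(stage_id, replica_id)


# Any callable with this shape satisfies the alias.
factory: RemoteReplicaFactory = make_fake_client
client = factory(1, 0)
```

Injecting the factory this way lets tests substitute a fake client without touching any transport code.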

logger module-attribute

logger = init_logger(__name__)

MembershipController

Manages dynamic replica attach/detach for distributed mode.

Constructed by DistStageRuntime and passed to Orchestrator. The Orchestrator delegates register/unregister messages here.

WATCH_INTERVAL_S class-attribute instance-attribute

WATCH_INTERVAL_S: float = 0.5

drain_tasks async

drain_tasks(timeout: float = 10.0) -> None

Wait for in-flight membership tasks to complete.

handle_register async

handle_register(stage_id: int, replica_id: int) -> None

Handle a register_remote_replica message (fire-and-forget).
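One common way to get fire-and-forget behaviour that still cooperates with drain_tasks is to spawn a task, keep a reference to it in a set, and wait on that set later. The sketch below illustrates this pattern only; FireAndForgetSketch, _attach, and the registered list are hypothetical names, and the real controller may track its tasks differently.

```python
from __future__ import annotations

import asyncio


class FireAndForgetSketch:
    """Hypothetical illustration; not the MembershipController implementation."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task[None]] = set()
        self.registered: list[tuple[int, int]] = []

    async def _attach(self, stage_id: int, replica_id: int) -> None:
        await asyncio.sleep(0)  # stands in for building a head-side client
        self.registered.append((stage_id, replica_id))

    async def handle_register(self, stage_id: int, replica_id: int) -> None:
        # Spawn the work and return immediately; keep a strong reference so
        # the task is not garbage-collected before it finishes.
        task = asyncio.create_task(self._attach(stage_id, replica_id))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def drain_tasks(self, timeout: float = 10.0) -> None:
        # Wait for whatever is still in flight, up to the timeout.
        if self._tasks:
            await asyncio.wait(set(self._tasks), timeout=timeout)


async def demo() -> list[tuple[int, int]]:
    ctl = FireAndForgetSketch()
    await ctl.handle_register(0, 1)
    await ctl.handle_register(0, 2)
    await ctl.drain_tasks(timeout=1.0)
    return ctl.registered
```

Running `asyncio.run(demo())` returns both registrations once the drain completes.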

handle_unregister async

handle_unregister(
    stage_id: int,
    input_addr: str,
    output_queue: Queue[EngineQueueMessage] | None = None,
    cleanup_callback: Callable[[list[str]], Awaitable[None]] | None = None,
) -> None

Handle an unregister_remote_replica message.

install_unregister_handlers

install_unregister_handlers(
    *,
    output_queue: Queue[EngineQueueMessage],
    cleanup_callback: Callable[
        [list[str]], Awaitable[None]
    ],
) -> None

Install shared cleanup sinks for watcher-driven unregister events.
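The cleanup_callback parameter is typed as an async callable that takes a list of strings and returns nothing. A minimal sketch of a callback with that shape follows. The documentation does not say what the strings are, so treating them as request identifiers is an assumption, and make_recording_callback is a hypothetical helper.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

# Same shape as the documented cleanup_callback parameter.
CleanupCallback = Callable[[list[str]], Awaitable[None]]


def make_recording_callback(sink: list[str]) -> CleanupCallback:
    """Build a callback that records the ids it is asked to clean up."""

    async def _cleanup(ids: list[str]) -> None:
        # Assumption: the strings are request ids; the docs do not specify.
        sink.extend(ids)

    return _cleanup


async def demo_cleanup() -> list[str]:
    sink: list[str] = []
    callback = make_recording_callback(sink)
    await callback(["req-1", "req-2"])
    return sink
```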

shutdown

shutdown() -> None

Signal stop and close the hub.

start

start() -> Task[None]

Start the replica watcher as a background task. Returns the task.
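A background watcher of this kind is often written as a polling loop that sleeps for a fixed interval and exits when a stop event is set. The sketch below shows that pattern under stated assumptions: WatcherSketch, the live set, and the lost list are hypothetical, the interval is shortened for the demo, and the real shutdown also closes the hub, which is omitted here.

```python
from __future__ import annotations

import asyncio


class WatcherSketch:
    """Hypothetical polling watcher; not the MembershipController implementation."""

    WATCH_INTERVAL_S: float = 0.01  # shortened for the demo; documented value is 0.5

    def __init__(self, live: set[str]) -> None:
        self._live = live          # hypothetical view of currently live replicas
        self._known = set(live)    # replicas seen on the previous poll
        self._stop = asyncio.Event()
        self.lost: list[str] = []

    async def _watch(self) -> None:
        while not self._stop.is_set():
            # Anything known before but no longer live has disappeared.
            gone = self._known - self._live
            self.lost.extend(sorted(gone))
            self._known -= gone
            try:
                # Sleep for one interval, waking early if shutdown is signalled.
                await asyncio.wait_for(
                    self._stop.wait(), timeout=self.WATCH_INTERVAL_S
                )
            except asyncio.TimeoutError:
                pass

    def start(self) -> asyncio.Task[None]:
        return asyncio.create_task(self._watch())

    def shutdown(self) -> None:
        self._stop.set()


async def demo_watch() -> list[str]:
    live = {"tcp://a", "tcp://b"}
    watcher = WatcherSketch(live)
    task = watcher.start()
    await asyncio.sleep(0.03)
    live.discard("tcp://b")  # simulate a replica disappearing
    await asyncio.sleep(0.05)
    watcher.shutdown()
    await task
    return watcher.lost
```

Returning the task from start lets the caller await it after shutdown, so the loop is known to have exited before the process tears down.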