
vllm_omni.distributed.omni_connectors.adapter

logger (module attribute)

logger = get_connector_logger(__name__)

compute_talker_prompt_ids_length

compute_talker_prompt_ids_length(
    prompt_ids: list[int],
) -> int

Compute the length of the talker prompt ids.

Parameters:

  Name        Type       Description            Default
  prompt_ids  list[int]  The prompt token ids.  required

Returns:

  Type  Description
  int   The length of the talker prompt ids.

construct_next_stage_streaming_input_prompt

construct_next_stage_streaming_input_prompt(
    payload_data: dict[str, Any], request: Any
) -> None

Update a downstream streaming request prompt from connector payload ids.

Async-chunk downstream stages are prewarmed before the real Talker prompt is known. When a Thinker payload carries ids.prompt, this helper:

  • Preserves num_computed_tokens (the scheduler token watermark).
  • Moves already-computed output tokens into prompt_token_ids.
  • Appends a new placeholder prompt slice sized from the upstream ids.
  • Refreshes block hashes so the scheduler allocates KV slots for the extended prompt without discarding prior computed state.
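The four steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the vllm_omni implementation: `StreamingRequest`, `PLACEHOLDER_TOKEN_ID`, `BLOCK_SIZE`, `hash_full_blocks`, and `extend_streaming_prompt` are all hypothetical names. It also assumes the payload shape `{"ids": {"prompt": [...]}}`, one placeholder token per upstream id, and chained SHA-256 hashing over full blocks as a stand-in for the scheduler's real block-hash scheme.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from typing import Any

PLACEHOLDER_TOKEN_ID = 0  # hypothetical placeholder token id
BLOCK_SIZE = 4  # hypothetical KV-cache block size, in tokens


@dataclass
class StreamingRequest:
    """Hypothetical stand-in for the scheduler's request object."""

    prompt_token_ids: list[int]
    output_token_ids: list[int] = field(default_factory=list)
    num_computed_tokens: int = 0
    block_hashes: list[str] = field(default_factory=list)


def hash_full_blocks(token_ids: list[int], block_size: int = BLOCK_SIZE) -> list[str]:
    """Chain-hash each full block so every hash covers all preceding tokens."""
    hashes: list[str] = []
    parent = ""
    # Only full blocks are hashed; a trailing partial block is skipped.
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = token_ids[start:start + block_size]
        data = (parent + "|" + ",".join(map(str, block))).encode()
        digest = hashlib.sha256(data).hexdigest()
        hashes.append(digest)
        parent = digest
    return hashes


def extend_streaming_prompt(payload_data: dict[str, Any], request: StreamingRequest) -> None:
    """Sketch only; assumes payload_data looks like {"ids": {"prompt": [...]}}."""
    upstream_ids = (payload_data.get("ids") or {}).get("prompt")
    if not upstream_ids:
        return  # no upstream prompt ids yet: leave the prewarmed request alone

    # 1. Preserve the scheduler's token watermark.
    watermark = request.num_computed_tokens
    # 2. Fold already-computed output tokens into the prompt.
    request.prompt_token_ids = request.prompt_token_ids + request.output_token_ids
    request.output_token_ids = []
    # 3. Append a placeholder slice sized from the upstream ids
    #    (assumed: one placeholder per upstream id).
    request.prompt_token_ids += [PLACEHOLDER_TOKEN_ID] * len(upstream_ids)
    # 4. Refresh block hashes over the extended prompt.
    request.block_hashes = hash_full_blocks(request.prompt_token_ids)
    request.num_computed_tokens = watermark
```

For example, a request with prompt `[1, 2, 3]`, outputs `[4, 5]`, and `num_computed_tokens=5` that receives three upstream ids ends up with prompt `[1, 2, 3, 4, 5, 0, 0, 0]`, no pending outputs, two full-block hashes, and the watermark still at 5. Keeping the watermark untouched is what lets the scheduler treat the first five tokens as already computed.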

try_recv_via_connector

try_recv_via_connector(
    task: dict[str, Any],
    connectors: dict[Any, Any],
    stage_id: int,
) -> tuple[Any, dict[str, Any] | None]

Attempts to resolve the task's input data from either the connector or IPC. Returns (engine_inputs, rx_metrics) on success, or (None, None) if the receive fails or is skipped.
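The connector-or-IPC resolution can be sketched as below. This is a hypothetical illustration of the control flow only: the task keys (`engine_inputs`, `from_stage`, `connector_ref`), the `(from_stage, stage_id)` keying of `connectors`, the `get` method, and the `DictConnector` class are assumptions, not the real OmniConnector API. What it mirrors from the description is the return contract: `(engine_inputs, rx_metrics)` on success and `(None, None)` on failure or skip.

```python
from __future__ import annotations

import time
from typing import Any


class DictConnector:
    """Hypothetical in-memory connector with put/get, for illustration only."""

    def __init__(self) -> None:
        self.store: dict[str, Any] = {}

    def put(self, key: str, value: Any) -> None:
        self.store[key] = value

    def get(self, key: str) -> Any:
        return self.store.get(key)


def try_recv_sketch(
    task: dict[str, Any],
    connectors: dict[Any, Any],
    stage_id: int,
) -> tuple[Any, dict[str, Any] | None]:
    # IPC path: inputs already travelled inline with the task (hypothetical key).
    if task.get("engine_inputs") is not None:
        return task["engine_inputs"], {"source": "ipc"}

    # Connector path: pick the connector for the (from_stage, stage_id) edge.
    connector = connectors.get((task.get("from_stage"), stage_id))
    ref = task.get("connector_ref")  # hypothetical handle to the stored payload
    if connector is None or ref is None:
        return None, None  # skipped: nothing to receive on this path

    start = time.perf_counter()
    try:
        payload = connector.get(ref)
    except Exception:
        return None, None  # failed: connector raised
    if payload is None:
        return None, None  # failed: payload missing
    rx_ms = (time.perf_counter() - start) * 1000.0
    return payload, {"source": "connector", "rx_ms": rx_ms}
```

Returning `(None, None)` in both the failure and the skip case keeps the caller's check to a single `if engine_inputs is None`.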

try_send_via_connector

try_send_via_connector(
    connector: Any,
    stage_id: int,
    next_stage_id: int,
    req_id: str,
    next_inputs: Any,
    sampling_params: Any,
    original_prompt: Any,
    next_stage_queue_submit_fn: Callable[
        [dict[str, Any]], None
    ],
    metrics: OrchestratorAggregator,
) -> bool

Attempts to send data via OmniConnector. Returns True on success and False otherwise. Encapsulates the full send path: preparing the payload, sending it via the connector, sending a notification, and recording metrics.
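The four parts of the send path can be sketched as follows. This is a hypothetical illustration, not the real implementation: the payload fields, the key format `"{req_id}:{stage_id}->{next_stage_id}"`, the notification dict, the `put` method, and `DictConnector` are assumptions. A plain list stands in for `OrchestratorAggregator`, whose recording API is not described here.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class DictConnector:
    """Hypothetical in-memory connector with put/get, for illustration only."""

    def __init__(self) -> None:
        self.store: dict[str, Any] = {}

    def put(self, key: str, value: Any) -> None:
        self.store[key] = value

    def get(self, key: str) -> Any:
        return self.store.get(key)


def try_send_sketch(
    connector: Any,
    stage_id: int,
    next_stage_id: int,
    req_id: str,
    next_inputs: Any,
    sampling_params: Any,
    original_prompt: Any,
    next_stage_queue_submit_fn: Callable[[dict[str, Any]], None],
    metrics: list[dict[str, Any]],
) -> bool:
    key = f"{req_id}:{stage_id}->{next_stage_id}"  # hypothetical key format
    try:
        # 1. Prepare the payload.
        payload = {
            "engine_inputs": next_inputs,
            "sampling_params": sampling_params,
            "original_prompt": original_prompt,
        }
        # 2. Send it via the connector and time the transfer.
        start = time.perf_counter()
        connector.put(key, payload)
        tx_ms = (time.perf_counter() - start) * 1000.0
        # 3. Notify the next stage with a lightweight reference, not the data.
        next_stage_queue_submit_fn(
            {"request_id": req_id, "from_stage": stage_id, "connector_ref": key}
        )
    except Exception:
        return False  # any failure is reported to the caller as False
    # 4. Record metrics (a plain list stands in for OrchestratorAggregator).
    metrics.append(
        {"request_id": req_id, "from_stage": stage_id, "to_stage": next_stage_id, "tx_ms": tx_ms}
    )
    return True
```

In this sketch, metrics are recorded only after both the transfer and the notification succeed. A failed send therefore leaves no partial metrics entry behind.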