vllm_omni.distributed.omni_connectors.adapter ¶
compute_talker_prompt_ids_length ¶
construct_next_stage_streaming_input_prompt ¶
Update a downstream streaming request prompt from connector payload ids.
Async-chunk downstream stages are prewarmed before the real Talker prompt is known. When a Thinker payload carries ids.prompt, this helper:
- Preserves num_computed_tokens (the scheduler token watermark).
- Moves already-computed output tokens into prompt_token_ids.
- Appends a new placeholder prompt slice sized from the upstream ids.
- Refreshes block hashes so the scheduler allocates KV slots for the extended prompt without discarding prior computed state.
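The prompt update described in the list can be illustrated with a toy model. This is a minimal sketch, not the adapter's code: ToyRequest, extend_prompt, and PLACEHOLDER_ID are hypothetical names, the placeholder slice is assumed to be exactly as long as the upstream ids, and the block-hash refresh is left out because it depends on scheduler internals.

```python
from dataclasses import dataclass, field

PLACEHOLDER_ID = 0  # hypothetical placeholder token id, chosen for illustration


@dataclass
class ToyRequest:
    """Hypothetical stand-in for a scheduler request; not the real class."""
    prompt_token_ids: list[int]
    output_token_ids: list[int] = field(default_factory=list)
    num_computed_tokens: int = 0


def extend_prompt(req: ToyRequest, upstream_prompt_ids: list[int]) -> None:
    """Mimic the first three documented steps on the toy request."""
    # Remember the watermark so it is left untouched by the update.
    watermark = req.num_computed_tokens
    # Already-computed output tokens become part of the prompt.
    req.prompt_token_ids.extend(req.output_token_ids)
    req.output_token_ids.clear()
    # Assumption: one placeholder per upstream id.
    req.prompt_token_ids.extend([PLACEHOLDER_ID] * len(upstream_prompt_ids))
    req.num_computed_tokens = watermark


req = ToyRequest(prompt_token_ids=[1, 2], output_token_ids=[7, 8],
                 num_computed_tokens=4)
extend_prompt(req, upstream_prompt_ids=[101, 102, 103])
print(req.prompt_token_ids)     # [1, 2, 7, 8, 0, 0, 0]
print(req.num_computed_tokens)  # 4
```

In this toy run the watermark stays at 4 while the prompt grows by three placeholder slots, which is the property the helper is documented to preserve.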
try_recv_via_connector ¶
try_recv_via_connector(
task: dict[str, Any],
connectors: dict[Any, Any],
stage_id: int,
) -> tuple[Any, dict[str, Any] | None]
Attempts to resolve the stage's input data from either a connector or IPC. Returns (engine_inputs, rx_metrics) on success, or (None, None) if the receive failed or was skipped.
try_send_via_connector ¶
try_send_via_connector(
connector: Any,
stage_id: int,
next_stage_id: int,
req_id: str,
next_inputs: Any,
sampling_params: Any,
original_prompt: Any,
next_stage_queue_submit_fn: Callable[
[dict[str, Any]], None
],
metrics: OrchestratorAggregator,
) -> bool
Attempts to send data via OmniConnector. Returns True on success, False otherwise. Encapsulates preparing the payload, sending it via the connector, sending the notification, and recording metrics.
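The four encapsulated steps can be sketched with toy stand-ins. Everything here is hypothetical: ToyConnector and its put method, the key format, the payload fields, and the list used as a metrics log are illustration only and are not the OmniConnector or OrchestratorAggregator APIs.

```python
from typing import Any, Callable


class ToyConnector:
    """Hypothetical in-memory connector; not the OmniConnector interface."""

    def __init__(self, healthy: bool = True) -> None:
        self.healthy = healthy
        self.store: dict[str, Any] = {}

    def put(self, key: str, payload: Any) -> bool:
        if not self.healthy:
            return False
        self.store[key] = payload
        return True


def sketch_send(
    connector: ToyConnector,
    stage_id: int,
    next_stage_id: int,
    req_id: str,
    next_inputs: Any,
    submit_fn: Callable[[dict[str, Any]], None],
    metrics_log: list[tuple[str, int, int]],
) -> bool:
    # 1. Prepare the payload (field names are assumptions).
    payload = {"request_id": req_id, "engine_inputs": next_inputs}
    key = f"{req_id}:{stage_id}->{next_stage_id}"
    # 2. Send via the connector; report failure to the caller.
    if not connector.put(key, payload):
        return False
    # 3. Notify the next stage that data is waiting under this key.
    submit_fn({"request_id": req_id, "from_stage": stage_id, "key": key})
    # 4. Record a metrics entry for the transfer.
    metrics_log.append((req_id, stage_id, next_stage_id))
    return True


queue: list[dict[str, Any]] = []
log: list[tuple[str, int, int]] = []
ok = sketch_send(ToyConnector(), 0, 1, "req-1", [1, 2, 3], queue.append, log)
print(ok, len(queue), len(log))  # True 1 1
```

Returning a bool rather than raising lets the caller choose a fallback path when the send does not go through, which matches the documented True/False contract.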