vllm_omni.distributed.omni_connectors.kv_transfer_manager ¶
Unified OmniConnector and KV cache transfer management.
KVCacheTransferData dataclass ¶
Container for KV cache transfer data.
from_bytes staticmethod ¶
Reconstruct KV cache data from the packed bytes format.
from_bytes_device staticmethod ¶
Reconstruct KV cache data from a packed device tensor.
from_bytes_gpu staticmethod ¶
Compatibility alias for callers using the old GPU-specific name.
to_gpu_tensor ¶
Convert to a packed device tensor for raw-data connectors.
OmniKVCacheConfig dataclass ¶
OmniKVTransferManager ¶
Unified management for OmniConnector and KV cache transfer.
This class encapsulates all KV cache related operations:

- Connector initialization and lazy creation
- KV cache extraction from GPU blocks
- KV cache transfer with retry logic
- KV cache receiving with timeout
recv_stages instance-attribute ¶
recv_stages = (
(str(recv_from), str(stage_id))
if recv_from is not None and stage_id is not None
else (None, None)
)
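The expression above normalizes both stage identifiers to strings, or yields a pair of `None` values when either one is missing. A minimal standalone sketch of that behavior follows; the helper name `build_recv_stages` is hypothetical and not part of the module.

```python
from typing import Optional, Tuple, Union

StageId = Union[str, int]


def build_recv_stages(
    recv_from: Optional[StageId], stage_id: Optional[StageId]
) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical helper mirroring the recv_stages expression."""
    # Both identifiers must be present; otherwise no receive pair is configured.
    if recv_from is not None and stage_id is not None:
        return (str(recv_from), str(stage_id))
    return (None, None)


pair = build_recv_stages(0, 1)
unset = build_recv_stages(None, 1)
```

Note that a stage ID of `0` is still treated as present, because the check is `is not None` rather than a truthiness test.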
send_stages instance-attribute ¶
apply_kv_cache_to_request ¶
from_od_config classmethod ¶
from_od_config(config: Any) -> OmniKVTransferManager
Create from model or OmniDiffusion config.
from_vllm_config classmethod ¶
from_vllm_config(
vllm_config: Any, model_config: Any
) -> OmniKVTransferManager
Create from vllm config with fallback to kv_transfer_config.
handle_finished_requests_kv_transfer ¶
handle_finished_requests_kv_transfer(
finished_reqs: dict[str, dict[str, Any]],
kv_caches: list[LayerKV],
block_size: int,
cache_dtype: str,
request_id_resolver: Callable[[str], str] | None = None,
) -> list[str]
Handle KV cache transfer for finished requests.
This method extracts the KV cache from GPU blocks and transfers it to the downstream stage via the connector.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| finished_reqs | dict[str, dict[str, Any]] | Dict mapping request_id to {block_ids, seq_len} | required |
| kv_caches | list[LayerKV] | List of KV cache (tensor or tuple) per layer | required |
| block_size | int | Size of each cache block | required |
| cache_dtype | str | Data type of the cache | required |
| request_id_resolver | Callable[[str], str] \| None | Optional function to resolve global request ID | None |
Returns:
| Type | Description |
|---|---|
| list[str] | List of request IDs that were processed |
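To make the shape of `finished_reqs` and the role of `request_id_resolver` concrete, here is a minimal sketch that does not call the real manager. The function `plan_transfers`, the block-count check, and the sample IDs are illustrative assumptions, not the library's implementation; the sketch only assumes a paged layout in which `seq_len` tokens occupy `ceil(seq_len / block_size)` blocks.

```python
import math
from typing import Any, Callable, Optional


def plan_transfers(
    finished_reqs: dict[str, dict[str, Any]],
    block_size: int,
    request_id_resolver: Optional[Callable[[str], str]] = None,
) -> list[tuple[str, list[int], int]]:
    """Hypothetical planner: which blocks to send, and under which request ID."""
    plans = []
    for req_id, info in finished_reqs.items():
        block_ids = info["block_ids"]
        seq_len = info["seq_len"]
        # Assumption: a sequence of seq_len tokens needs this many blocks.
        needed = math.ceil(seq_len / block_size)
        if needed > len(block_ids):
            raise ValueError(f"{req_id}: {len(block_ids)} blocks cannot hold {seq_len} tokens")
        # Map the local ID to a global one when a resolver is supplied.
        global_id = request_id_resolver(req_id) if request_id_resolver else req_id
        plans.append((global_id, block_ids[:needed], seq_len))
    return plans


finished_reqs = {
    "req-0": {"block_ids": [3, 7], "seq_len": 24},
    "req-1": {"block_ids": [1], "seq_len": 9},
}
plans = plan_transfers(finished_reqs, block_size=16, request_id_resolver=lambda r: "global-" + r)
```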
receive_kv_cache ¶
receive_kv_cache_for_request ¶
receive_kv_cache_for_request(
request_id: str, target_device: device | None = None
) -> tuple[dict[str, Any] | None, int]
Receive KV cache for a specific request.
This implements the receiving logic from gpu_diffusion_model_runner.py.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request_id | str | The request ID to receive KV cache for | required |
| target_device | device \| None | Optional device to move tensors to | None |
Returns:
| Type | Description |
|---|---|
| tuple[dict[str, Any] \| None, int] | Tuple of (data dict, size) if successful, (None, 0) otherwise |
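Callers need to distinguish the success tuple from the `(None, 0)` failure tuple. The sketch below shows one way to branch on that contract; `StubKVManager` and `try_load_kv` are hypothetical stand-ins that mimic only the documented return values, not the real class.

```python
from typing import Any, Optional


class StubKVManager:
    """Hypothetical stand-in that mimics only the documented return contract."""

    def __init__(self, store: dict[str, tuple[dict[str, Any], int]]):
        self._store = store

    def receive_kv_cache_for_request(
        self, request_id: str, target_device: Optional[Any] = None
    ) -> tuple[Optional[dict[str, Any]], int]:
        # (data dict, size) on success, (None, 0) otherwise.
        return self._store.get(request_id, (None, 0))


def try_load_kv(manager: StubKVManager, request_id: str) -> bool:
    """Return True when a KV cache was received for request_id."""
    data, size = manager.receive_kv_cache_for_request(request_id)
    if data is None:
        # Nothing arrived; the caller can fall back to recomputing the cache.
        return False
    # size is available here for logging or accounting.
    return True


manager = StubKVManager({"req-0": ({"layers": []}, 128)})
hit = try_load_kv(manager, "req-0")
miss = try_load_kv(manager, "req-missing")
```

Testing `data is None` rather than `size == 0` keeps the check tied to the documented failure value.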
receive_multi_kv_cache ¶
receive_multi_kv_cache(
req: Any,
cfg_kv_collect_func: Callable | None = None,
target_device: device | None = None,
) -> bool
Receive primary KV cache and optional CFG companion KV caches.
First receives the primary KV cache (existing logic). Then, if the request carries cfg_kv_request_ids and a model-specific cfg_kv_collect_func is provided, calls it to fetch and attach the companion KV caches to sampling_params.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| req | Any | Request object with request_id and sampling_params. | required |
| cfg_kv_collect_func | Callable \| None | Model-specific function for collecting CFG KV caches. Signature: (request_id, cfg_request_ids, kv_transfer_manager, target_device) -> dict[str, Any] | None |
| target_device | device \| None | Device to move tensors to. | None |
Returns:
| Type | Description |
|---|---|
| bool | True if primary KV cache was received successfully. |
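A collector matching the documented signature might look like the sketch below. It assumes `cfg_request_ids` is an iterable of request ID strings and that results are keyed by those IDs; both are assumptions, since the actual structure is model-specific. `FakeManager` is a hypothetical stub used only to make the example runnable.

```python
from typing import Any, Iterable, Optional


class FakeManager:
    """Hypothetical stub exposing only receive_kv_cache_for_request."""

    def __init__(self, store: dict[str, dict[str, Any]]):
        self._store = store

    def receive_kv_cache_for_request(self, request_id: str, target_device: Optional[Any] = None):
        data = self._store.get(request_id)
        return (data, 1) if data is not None else (None, 0)


def collect_cfg_kv(
    request_id: str,
    cfg_request_ids: Iterable[str],
    kv_transfer_manager: Any,
    target_device: Optional[Any] = None,
) -> dict[str, Any]:
    """Sketch of a cfg_kv_collect_func: fetch each companion cache by ID."""
    collected: dict[str, Any] = {}
    for cfg_id in cfg_request_ids:
        data, _size = kv_transfer_manager.receive_kv_cache_for_request(cfg_id, target_device)
        if data is not None:
            # Assumption: companions are keyed by their own request ID.
            collected[cfg_id] = data
    return collected


fake = FakeManager({"req-0-neg": {"kv": "negative-prompt cache"}})
companions = collect_cfg_kv("req-0", ["req-0-neg", "req-0-missing"], fake)
```

Companion caches that fail to arrive are skipped here rather than raising; whether that is acceptable depends on the model.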
receive_multi_kv_cache_distributed ¶
receive_multi_kv_cache_distributed(
req: Any,
cfg_kv_collect_func: Callable | None = None,
target_device: device | None = None,
) -> bool
Distributed wrapper around receive_multi_kv_cache.
TP-aware path selection:

- world size 1: direct receive
- TP active, cfg size 1: each rank independently receives
- TP active, cfg size > 1: cfg-rank 0 receives, then broadcasts to peers that share the same TP rank
- TP inactive: legacy rank-0 receive then world broadcast
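The path selection can be read as a small decision function. The sketch below encodes the four documented cases; the function name, the string labels, and the interpretation of "TP active" as `tp_size > 1` are assumptions made for illustration.

```python
def select_receive_path(world_size: int, tp_size: int, cfg_size: int) -> str:
    """Hypothetical decision function mirroring the documented cases."""
    if world_size == 1:
        # Single process: receive directly, no collective needed.
        return "direct"
    if tp_size > 1:  # Assumption: "TP active" means more than one TP rank.
        if cfg_size == 1:
            # Every rank pulls its own copy.
            return "per_rank_receive"
        # cfg-rank 0 receives, then shares with peers on the same TP rank.
        return "cfg_rank0_receive_then_broadcast"
    # TP inactive: legacy path, rank 0 receives and broadcasts to the world.
    return "rank0_receive_then_world_broadcast"


paths = [
    select_receive_path(1, 1, 1),
    select_receive_path(4, 4, 1),
    select_receive_path(8, 4, 2),
    select_receive_path(2, 1, 2),
]
```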
update_sender_info ¶
update_sender_info(
sender_info: dict[str, Any],
sender_stage_id: str | int | None = None,
) -> None
Update receiver-side sender info before loading remote KV cache.
The orchestrator always reports rank-0's ZMQ port. When TP > 1 the receiver must offset the port so that each TP rank connects to the corresponding sender rank's port.
The base host/port are also stored so that the receive path can construct per-rank metadata for heterogeneous TP scenarios.
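As an illustration of the port offset, the sketch below assumes that sender ranks listen on consecutive ports starting at the rank-0 port, so the offset is simply the TP rank. That additive scheme, the function names, and the example host and port are assumptions; the source only states that an offset is applied.

```python
def sender_port_for_rank(base_port: int, tp_rank: int) -> int:
    """Assumed scheme: rank r listens on the rank-0 port plus r."""
    if tp_rank < 0:
        raise ValueError("tp_rank must be non-negative")
    return base_port + tp_rank


def sender_endpoint(base_host: str, base_port: int, tp_rank: int) -> str:
    """Build a ZMQ-style TCP endpoint string for the matching sender rank."""
    return f"tcp://{base_host}:{sender_port_for_rank(base_port, tp_rank)}"


# Hypothetical values: the orchestrator reported rank 0 at 10.0.0.5:5555.
endpoints = [sender_endpoint("10.0.0.5", 5555, rank) for rank in range(4)]
```

Keeping the base host and port separate from the derived per-rank value lets the receiver recompute endpoints for any rank later.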