
vllm_omni.distributed.omni_connectors.kv_transfer_manager

Unified OmniConnector and KV cache transfer management.

LayerKV module-attribute

LayerKV = Tensor | tuple[Tensor, Tensor]
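A layer's KV cache is either a single tensor or an explicit (key, value) pair. The sketch below normalizes both forms. It assumes that a single tensor stacks key and value along its first dimension (index 0 is the key, index 1 the value). This page does not state that layout, and `split_layer_kv` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any


def split_layer_kv(layer_kv: Any) -> tuple[Any, Any]:
    """Return (key, value) for one layer's KV cache entry.

    Hypothetical helper. A tuple is treated as an explicit (key, value)
    pair. Anything else is ASSUMED to stack key and value along its first
    dimension, so layer_kv[0] is the key and layer_kv[1] is the value.
    """
    if isinstance(layer_kv, tuple):
        if len(layer_kv) != 2:
            raise ValueError(f"expected a (key, value) pair, got {len(layer_kv)} items")
        return layer_kv[0], layer_kv[1]
    # Stacked tensor (or any indexable stand-in): split along dim 0.
    return layer_kv[0], layer_kv[1]
```

With PyTorch, indexing a tensor whose leading dimension is 2 with `[0]` and `[1]` returns views of the two halves. The same helper therefore works unchanged on real tensors, provided the stacked layout assumption holds.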

logger module-attribute

logger = init_logger(__name__)

KVCacheTransferData dataclass

Container for KV cache transfer data.

block_ids instance-attribute

block_ids: list[int]

layer_blocks instance-attribute

layer_blocks: dict[str, Any]

metadata instance-attribute

metadata: dict[str, Any]

request_id instance-attribute

request_id: str

from_bytes staticmethod

from_bytes(
    raw: bytes | bytearray | memoryview,
) -> dict[str, Any]

Reconstruct KV cache data from the packed bytes format.

from_bytes_device staticmethod

from_bytes_device(tensor: Tensor) -> dict[str, Any]

Reconstruct KV cache data from a packed device tensor.

from_bytes_gpu staticmethod

from_bytes_gpu(tensor: Tensor) -> dict[str, Any]

Compatibility alias for callers using the old GPU-specific name.

to_bytes

to_bytes() -> bytes

Convert to compact binary format for fast transfer.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

to_gpu_tensor

to_gpu_tensor() -> Tensor

Convert to a packed device tensor for raw-data connectors.
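This page does not document the packed byte layout used by to_bytes and from_bytes. The sketch below illustrates the general idea of a compact binary format: a small self-describing header followed by raw payload bytes, with no pickling of large buffers. It uses a hypothetical layout (a 4-byte big-endian header length, a JSON header, then the concatenated blobs). This is not the real wire format, and `pack`/`unpack` are hypothetical names.

```python
from __future__ import annotations

import json
import struct
from typing import Any

# Hypothetical layout: [4-byte big-endian header length][JSON header][blob bytes...]
_HEADER_LEN = struct.Struct(">I")


def pack(request_id: str, metadata: dict[str, Any], blobs: dict[str, bytes]) -> bytes:
    """Pack a JSON header plus named raw byte blobs into one buffer."""
    offsets: dict[str, list[int]] = {}
    cursor = 0
    for name, blob in blobs.items():
        offsets[name] = [cursor, len(blob)]  # [start, length] within the payload
        cursor += len(blob)
    header = json.dumps(
        {"request_id": request_id, "metadata": metadata, "offsets": offsets}
    ).encode("utf-8")
    return _HEADER_LEN.pack(len(header)) + header + b"".join(blobs.values())


def unpack(raw: bytes | bytearray | memoryview) -> dict[str, Any]:
    """Inverse of pack(): recover request_id, metadata and the named blobs."""
    view = memoryview(raw)
    (header_len,) = _HEADER_LEN.unpack_from(view, 0)
    header_end = _HEADER_LEN.size + header_len
    header = json.loads(bytes(view[_HEADER_LEN.size:header_end]).decode("utf-8"))
    payload = view[header_end:]
    blobs = {
        name: bytes(payload[start:start + length])
        for name, (start, length) in header["offsets"].items()
    }
    return {"request_id": header["request_id"], "metadata": header["metadata"], "blobs": blobs}
```

Like from_bytes, `unpack` accepts bytes, bytearray or memoryview. Keeping the payload as a memoryview avoids copying it until each blob is extracted.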

OmniKVCacheConfig dataclass

Configuration for OmniKVTransferManager.

connector_config class-attribute instance-attribute

connector_config: dict[str, Any] | None = None

engine_input_source class-attribute instance-attribute

engine_input_source: list[str | int] | None = None

from_stage class-attribute instance-attribute

from_stage: str | None = None

from_tp class-attribute instance-attribute

from_tp: int = 1

need_recv_cache class-attribute instance-attribute

need_recv_cache: bool = False

need_send_cache class-attribute instance-attribute

need_send_cache: bool = False

recv_timeout class-attribute instance-attribute

recv_timeout: float = 30.0

stage_id class-attribute instance-attribute

stage_id: str | int | None = None

to_stage class-attribute instance-attribute

to_stage: str | None = None

to_tp class-attribute instance-attribute

to_tp: int = 1

OmniKVTransferManager

Unified management for OmniConnector and KV cache transfer.

This class encapsulates all KV cache related operations:

- Connector initialization and lazy creation
- KV cache extraction from GPU blocks
- KV cache transfer with retry logic
- KV cache receiving with timeout
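The retry-on-send and timeout-on-receive behavior listed above can be sketched against a toy in-memory store. `InMemoryConnector`, its `put`/`get` methods and the retry/backoff parameters are hypothetical stand-ins, not the real OmniConnector API. Only the 30-second receive default mirrors OmniKVCacheConfig.recv_timeout.

```python
from __future__ import annotations

import time
from typing import Any


class InMemoryConnector:
    """Hypothetical stand-in for a connector: a dict with put/get."""

    def __init__(self, fail_first: int = 0) -> None:
        self._store: dict[str, Any] = {}
        self._fail_first = fail_first  # number of put() calls that fail first

    def put(self, key: str, value: Any) -> None:
        if self._fail_first > 0:
            self._fail_first -= 1
            raise ConnectionError("simulated transient failure")
        self._store[key] = value

    def get(self, key: str) -> Any | None:
        return self._store.get(key)


def send_with_retry(conn, key, value, max_retries=3, backoff_s=0.01) -> bool:
    """Try put() up to max_retries times with exponential backoff in between."""
    for attempt in range(max_retries):
        try:
            conn.put(key, value)
            return True
        except ConnectionError:
            if attempt + 1 < max_retries:
                time.sleep(backoff_s * (2 ** attempt))
    return False


def recv_with_timeout(conn, key, timeout_s=30.0, poll_s=0.01) -> Any | None:
    """Poll get() until the key appears or timeout_s elapses; None on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        value = conn.get(key)
        if value is not None:
            return value
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_s)
```

The receiver uses `time.monotonic()` for the deadline, so wall-clock adjustments cannot shorten or extend the wait.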

config instance-attribute

config = config

connector property

connector

Lazy initialization of connector.
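The lazy connector property defers creating the connector until first access, so a stage that never transfers KV cache does not pay the setup cost. The sketch below shows that general pattern. `LazyHolder` and its factory argument are hypothetical, and the factory builds a plain dict in place of a real connector. The alias at the end follows the declaration of get_connector as a property that forwards to connector.

```python
from __future__ import annotations

from typing import Any, Callable


class LazyHolder:
    """Hypothetical sketch: build an expensive object on first access only."""

    def __init__(self, factory: Callable[[], Any] | None) -> None:
        self._factory = factory
        self._connector: Any | None = None
        self.create_calls = 0  # counts factory invocations, for illustration

    @property
    def connector(self) -> Any | None:
        # No factory (e.g. no connector configured): stay disabled and return None.
        if self._connector is None and self._factory is not None:
            self._connector = self._factory()
            self.create_calls += 1
        return self._connector

    # Alias property that forwards to `connector`.
    get_connector = property(lambda self: self.connector)
```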

from_model_config class-attribute instance-attribute

from_model_config = from_od_config

get_connector class-attribute instance-attribute

get_connector = property(lambda self: self.connector)

kv_payload_merger instance-attribute

kv_payload_merger: Callable | None = None

kv_payload_slicer instance-attribute

kv_payload_slicer: Callable | None = None

kv_recv_key_builder instance-attribute

kv_recv_key_builder: Callable | None = None

kv_send_key_builder instance-attribute

kv_send_key_builder: Callable | None = None
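The optional hooks let a model override how transfer keys are built and how payloads are sliced or merged. This page does not document their signatures. The sketch below assumes a hypothetical key-builder signature `(request_id, from_stage, to_stage) -> str` and a hypothetical default key format. Its purpose is to illustrate the "use the hook if set, otherwise fall back" pattern.

```python
from __future__ import annotations

from typing import Callable


def default_kv_key(request_id: str, from_stage: str | None, to_stage: str | None) -> str:
    """Hypothetical default key format; not the library's real scheme."""
    return f"kv:{from_stage}->{to_stage}:{request_id}"


def build_send_key(
    request_id: str,
    from_stage: str | None,
    to_stage: str | None,
    key_builder: Callable[[str, str | None, str | None], str] | None = None,
) -> str:
    """Use a model-supplied key builder when set, else the default format."""
    builder = key_builder or default_kv_key
    return builder(request_id, from_stage, to_stage)
```

The sender and receiver must agree on the key for the same request. If a model overrides one side (kv_send_key_builder), it presumably needs a matching kv_recv_key_builder on the other.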

recv_stages instance-attribute

recv_stages = (
    (str(recv_from), str(stage_id))
    if recv_from is not None and stage_id is not None
    else (None, None)
)

send_stages instance-attribute

send_stages = (
    (str(from_stage), str(to_stage))
    if from_stage and to_stage
    else (None, None)
)
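The two stage-pair attributes use different guards. send_stages uses a truthiness check, while recv_stages uses `is not None`. Copying the two expressions into standalone functions (the function names are hypothetical) makes the consequence visible. A receiver whose stage_id or source stage is 0 still gets a valid pair, whereas an empty or zero-valued from_stage or to_stage disables sending. Where recv_from comes from is not shown on this page.

```python
from __future__ import annotations


def send_stages(from_stage, to_stage):
    # Same expression as OmniKVTransferManager.send_stages: truthiness check.
    return (str(from_stage), str(to_stage)) if from_stage and to_stage else (None, None)


def recv_stages(recv_from, stage_id):
    # Same expression as OmniKVTransferManager.recv_stages: explicit None check.
    return (
        (str(recv_from), str(stage_id))
        if recv_from is not None and stage_id is not None
        else (None, None)
    )
```

Because from_stage and to_stage are typed `str | None`, the difference normally matters only for empty strings. If stage IDs ever arrive as integers, however, 0 is accepted on the receive side but disables the send side.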

apply_kv_cache_to_request

apply_kv_cache_to_request(
    req: Any, data: dict[str, Any]
) -> None

Apply received KV cache data to a request object.

Parameters:

- req (Any, required): The request object to apply KV cache to.
- data (dict[str, Any], required): The received KV cache data dictionary.

from_od_config classmethod

from_od_config(config: Any) -> OmniKVTransferManager

Create from model or OmniDiffusion config.

from_vllm_config classmethod

from_vllm_config(
    vllm_config: Any, model_config: Any
) -> OmniKVTransferManager

Create from the vLLM config, falling back to kv_transfer_config.

handle_finished_requests_kv_transfer

handle_finished_requests_kv_transfer(
    finished_reqs: dict[str, dict[str, Any]],
    kv_caches: list[LayerKV],
    block_size: int,
    cache_dtype: str,
    request_id_resolver: Callable[[str], str] | None = None,
) -> list[str]

Handle KV cache transfer for finished requests.

This method extracts the KV cache from GPU blocks and transfers it to the downstream stage via the connector.

Parameters:

- finished_reqs (dict[str, dict[str, Any]], required): Dict mapping request_id to {block_ids, seq_len}.
- kv_caches (list[LayerKV], required): List of KV cache (tensor or tuple) per layer.
- block_size (int, required): Size of each cache block.
- cache_dtype (str, required): Data type of the cache.
- request_id_resolver (Callable[[str], str] | None, default None): Optional function to resolve global request ID.

Returns:

- list[str]: List of request IDs that were processed.
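The extraction step takes each finished request's blocks from every layer's paged cache and trims the result to seq_len tokens. The sketch below models that step, treating a layer's cache as a plain list of blocks, each a list of block_size token entries. That layout, the helper names and the identity default for request_id_resolver are assumptions made for illustration. Real caches are tensors indexed by block ID.

```python
from __future__ import annotations

from typing import Any, Callable


def gather_request_kv(
    layer_cache: list[list[Any]], block_ids: list[int], seq_len: int, block_size: int
) -> list[Any]:
    """Concatenate the request's blocks in order and drop padding past seq_len."""
    needed_blocks = -(-seq_len // block_size)  # ceil(seq_len / block_size)
    if needed_blocks > len(block_ids):
        raise ValueError("block_ids do not cover seq_len tokens")
    tokens: list[Any] = []
    for block_id in block_ids[:needed_blocks]:
        tokens.extend(layer_cache[block_id])
    return tokens[:seq_len]


def extract_finished(
    finished_reqs: dict[str, dict[str, Any]],
    kv_caches: list[list[list[Any]]],
    block_size: int,
    request_id_resolver: Callable[[str], str] | None = None,
) -> dict[str, list[list[Any]]]:
    """Map each (resolved) request ID to its per-layer token lists."""
    resolve = request_id_resolver or (lambda rid: rid)
    out: dict[str, list[list[Any]]] = {}
    for req_id, info in finished_reqs.items():
        out[resolve(req_id)] = [
            gather_request_kv(layer, info["block_ids"], info["seq_len"], block_size)
            for layer in kv_caches
        ]
    return out
```

Block IDs need not be contiguous or ascending. The order in block_ids defines token order, which is why the blocks are concatenated in the listed order rather than sorted.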

receive_kv_cache

receive_kv_cache(
    req: Any, target_device: device | None = None
) -> bool

Receive KV cache and populate request object (legacy interface).

Parameters:

- req (Any, required): Request object with request_id attribute.
- target_device (device | None, default None): Optional device to move tensors to.

Returns:

- bool: True if successful, False otherwise.

receive_kv_cache_for_request

receive_kv_cache_for_request(
    request_id: str, target_device: device | None = None
) -> tuple[dict[str, Any] | None, int]

Receive KV cache for a specific request.

This implements the receiving logic from gpu_diffusion_model_runner.py.

Parameters:

- request_id (str, required): The request ID to receive KV cache for.
- target_device (device | None, default None): Optional device to move tensors to.

Returns:

- tuple[dict[str, Any] | None, int]: Tuple of (data dict, size) if successful, (None, 0) otherwise.

receive_multi_kv_cache

receive_multi_kv_cache(
    req: Any,
    cfg_kv_collect_func: Callable | None = None,
    target_device: device | None = None,
) -> bool

Receive primary KV cache and optional CFG companion KV caches.

First receives the primary KV cache (existing logic). Then, if the request carries cfg_kv_request_ids and a model-specific cfg_kv_collect_func is provided, calls it to fetch and attach the companion KV caches to sampling_params.

Parameters:

- req (Any, required): Request object with request_id and sampling_params.
- cfg_kv_collect_func (Callable | None, default None): Model-specific function for collecting CFG KV caches. Signature: (request_id, cfg_request_ids, kv_transfer_manager, target_device) -> dict[str, Any].
- target_device (device | None, default None): Device to move tensors to.

Returns:

- bool: True if primary KV cache was received successfully.
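A cfg_kv_collect_func only has to honor the documented signature (request_id, cfg_request_ids, kv_transfer_manager, target_device) -> dict[str, Any]. The sketch below assumes cfg_request_ids maps a role name to a companion request ID. It calls receive_kv_cache_for_request for each companion and skips any that do not arrive. The role names, the output key format and the skip-on-failure policy are hypothetical choices. `StubManager` exists only so the example runs without a connector.

```python
from __future__ import annotations

from typing import Any


def collect_cfg_kv(
    request_id: str,
    cfg_request_ids: dict[str, str],
    kv_transfer_manager: Any,
    target_device: Any = None,
) -> dict[str, Any]:
    """Hypothetical collect function: fetch each companion KV cache by ID."""
    # request_id is unused here but is part of the documented signature.
    collected: dict[str, Any] = {}
    for role, cfg_req_id in cfg_request_ids.items():
        # Documented to return (data, size) on success and (None, 0) otherwise.
        data, _size = kv_transfer_manager.receive_kv_cache_for_request(cfg_req_id, target_device)
        if data is None:
            continue  # hypothetical policy: omit companions that did not arrive
        collected[f"{role}_kv"] = data
    return collected


class StubManager:
    """Stand-in exposing only receive_kv_cache_for_request."""

    def __init__(self, store: dict[str, dict[str, Any]]) -> None:
        self.store = store

    def receive_kv_cache_for_request(self, request_id, target_device=None):
        data = self.store.get(request_id)
        return (data, len(data)) if data is not None else (None, 0)
```

According to the description above, receive_multi_kv_cache attaches the returned dict to sampling_params. The key names therefore only need to match what the model's own sampling code expects.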

receive_multi_kv_cache_distributed

receive_multi_kv_cache_distributed(
    req: Any,
    cfg_kv_collect_func: Callable | None = None,
    target_device: device | None = None,
) -> bool

Distributed wrapper around receive_multi_kv_cache.

TP-aware path selection:

- World size 1: direct receive.
- TP active, CFG size 1: each rank receives independently.
- TP active, CFG size > 1: CFG rank 0 receives, then broadcasts to the peers that share the same TP rank.
- TP inactive: legacy path; rank 0 receives, then broadcasts to all ranks.
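The path selection rules can be written as a pure function. The path labels and the `is_receiver` helper are hypothetical. This page does not show how the real method detects whether TP is active or how it obtains world, CFG and TP ranks, so those values are passed in as plain arguments here.

```python
from __future__ import annotations


def select_receive_path(world_size: int, tp_active: bool, cfg_size: int) -> str:
    """Pick a receive strategy following the documented rules (labels are hypothetical)."""
    if world_size == 1:
        return "direct"
    if tp_active:
        if cfg_size == 1:
            return "tp_independent"       # every rank receives its own shard
        return "cfg_rank0_broadcast"      # CFG rank 0 receives, then broadcasts to TP peers
    return "world_rank0_broadcast"        # legacy: rank 0 receives, then broadcasts to all


def is_receiver(path: str, cfg_rank: int, global_rank: int) -> bool:
    """Whether this rank talks to the connector directly under the chosen path."""
    if path in ("direct", "tp_independent"):
        return True
    if path == "cfg_rank0_broadcast":
        return cfg_rank == 0
    return global_rank == 0
```

Under the two broadcast paths, ranks for which `is_receiver` is False wait for the broadcast instead of calling the connector.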

update_sender_info

update_sender_info(
    sender_info: dict[str, Any],
    sender_stage_id: str | int | None = None,
) -> None

Update receiver-side sender info before loading remote KV cache.

The orchestrator always reports rank 0's ZMQ port. When TP > 1, the receiver must offset the port so that each TP rank connects to the port of the corresponding sender rank.

The base host/port are also stored so that the receive path can construct per-rank metadata for heterogeneous TP scenarios.
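The port arithmetic is sketched below under the assumption that sender TP rank r listens on base_port + r. The description above says the port is offset per rank but does not give the formula. The second helper covers the heterogeneous case where from_tp differs from to_tp. It also assumes KV heads are sharded evenly and contiguously, so one TP size must divide the other. Both helpers are hypothetical.

```python
from __future__ import annotations


def sender_port_for_rank(base_port: int, sender_tp_rank: int) -> int:
    """ASSUMED convention: sender TP rank r listens on base_port + r."""
    return base_port + sender_tp_rank


def sender_ranks_for_receiver(receiver_rank: int, from_tp: int, to_tp: int) -> list[int]:
    """Sender TP ranks holding the KV shards one receiver rank needs.

    Assumes even, contiguous head sharding and that one TP size divides the other.
    """
    if from_tp == to_tp:
        return [receiver_rank]
    if from_tp > to_tp:
        # Each receiver covers several consecutive sender shards.
        if from_tp % to_tp:
            raise ValueError("from_tp must be a multiple of to_tp")
        ratio = from_tp // to_tp
        return list(range(receiver_rank * ratio, (receiver_rank + 1) * ratio))
    # Several receivers share one sender shard (each takes a slice of it).
    if to_tp % from_tp:
        raise ValueError("to_tp must be a multiple of from_tp")
    return [receiver_rank // (to_tp // from_tp)]
```

For example, with from_tp=4 and to_tp=2, receiver rank 1 would connect to sender ranks 2 and 3, that is, to ports base_port + 2 and base_port + 3.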