Skip to content

vllm.v1.core.kv_cache_coordinator

Classes:

HybridKVCacheCoordinator

Bases: KVCacheCoordinator

KV cache coordinator for hybrid models with multiple KV cache types, and thus multiple kv cache groups.

Methods:

Source code in vllm/v1/core/kv_cache_coordinator.py
class HybridKVCacheCoordinator(KVCacheCoordinator):
    """
    KV cache coordinator for hybrid models with multiple KV cache types, and
    thus multiple kv cache groups.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        max_num_batched_tokens: int,
        use_eagle: bool,
        enable_caching: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        scheduler_block_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        super().__init__(
            kv_cache_config,
            max_model_len,
            max_num_batched_tokens,
            use_eagle,
            enable_caching,
            enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            scheduler_block_size=scheduler_block_size,
            hash_block_size=hash_block_size,
            metrics_collector=metrics_collector,
        )
        # hash_block_size: the block size used to compute block hashes.
        # The actual block size usually equals hash_block_size, but in cases where
        # different KV cache groups have different block sizes, the actual block size
        # can be a multiple of hash_block_size.
        self.hash_block_size = hash_block_size
        assert all(
            g.kv_cache_spec.block_size % hash_block_size == 0
            for g in kv_cache_config.kv_cache_groups
        ), "block_size must be divisible by hash_block_size"
        assert dcp_world_size == 1, "DCP not support hybrid attn now."
        assert pcp_world_size == 1, "PCP not support hybrid attn now."
        self.verify_and_split_kv_cache_groups()

    def verify_and_split_kv_cache_groups(self) -> None:
        """
        Groups KV cache groups by their spec type for efficient batch processing
        during cache hit lookup.
        """
        self.attention_groups: list[SpecGroup] = []
        for i, g in enumerate(self.kv_cache_config.kv_cache_groups):
            manager_cls = self.single_type_managers[i].__class__
            spec = g.kv_cache_spec
            use_eagle = i in self.eagle_group_ids

            # Try to find an existing group with the same spec
            for idx, group in enumerate(self.attention_groups):
                if group.spec == spec:
                    assert manager_cls is group.manager_cls, (
                        "Expected same manager class for identical KV cache specs."
                    )
                    group.group_ids.append(i)
                    if use_eagle and not group.use_eagle:
                        self.attention_groups[idx] = group._replace(use_eagle=True)
                    break
            else:
                self.attention_groups.append(
                    SpecGroup(spec, [i], manager_cls, use_eagle)
                )

        assert len(self.attention_groups) > 1, (
            "HybridKVCacheCoordinator requires at least two attention groups."
        )

        # Put full attention first: its efficient left-to-right scan provides
        # a tighter initial bound, reducing work for subsequent groups.
        self.attention_groups.sort(
            key=lambda g: not isinstance(g.spec, FullAttentionSpec)
        )

        # Propagate the eagle bit to each manager (default to ``use_eagle=False``).
        for group in self.attention_groups:
            if group.use_eagle:
                for gid in group.group_ids:
                    self.single_type_managers[gid].use_eagle = True

    def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
        # Cache hits in this coordinator are always a multiple of
        # ``scheduler_block_size`` tokens (see ``find_longest_cache_hit``).
        # Within an aligned region, SWA groups may only consult a subset of blocks
        # per ``scheduler_block_size``-segment so the unused blocks also stay
        # out of the prefix-cache hash map.
        aligned_num_computed_tokens = (
            num_computed_tokens // self.scheduler_block_size * self.scheduler_block_size
        )
        for manager in self.single_type_managers:
            num_tokens_to_cache = aligned_num_computed_tokens
            # EAGLE groups match one block past each aligned boundary and drop
            # it, so make that lookahead block eligible to be cached.
            if manager.use_eagle and aligned_num_computed_tokens > 0:
                num_tokens_to_cache = min(
                    num_computed_tokens,
                    aligned_num_computed_tokens + manager.block_size,
                )
            # The manager already knows the fine hit granularity
            # (``scheduler_block_size``); retention is passed separately so it
            # can keep both the coarse segment tails and the fine replay
            # boundary (which needs the fine value).
            manager.cache_blocks(
                request,
                num_tokens_to_cache,
                retention_interval=self.retention_interval,
            )

    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        """
        Find the longest cache hit using an iterative fixed-point algorithm.

        Each attention type either accepts the current candidate length or
        reduces it. If any type reduces the length, restart checks over all
        types. This converges because length monotonically decreases and is
        bounded below by 0.

        Args:
            block_hashes: The block hashes of the request.
            max_cache_hit_length: The maximum length of the cache hit.

        Returns:
            A tuple containing:
                - A tuple of the cache hit blocks for each single type manager.
                - The number of tokens of the longest cache hit.
        """

        def _get_block_hashes(kv_cache_spec: KVCacheSpec) -> BlockHashList:
            if kv_cache_spec.block_size == self.hash_block_size:
                return block_hashes
            return BlockHashListWithBlockSize(
                block_hashes, self.hash_block_size, kv_cache_spec.block_size
            )

        num_groups = len(self.kv_cache_config.kv_cache_groups)
        hit_length = max_cache_hit_length
        longest_hit_length = 0
        hit_blocks_by_group: list[list[KVCacheBlock] | None] = [None] * num_groups

        # Simple hybrid (1 full attn + 1 other): one iteration suffices.
        # Full attn is always first if it exists.
        is_simple_hybrid = len(self.attention_groups) == 2 and isinstance(
            self.attention_groups[0].spec, FullAttentionSpec
        )

        # Attention-group indices whose EAGLE drop is verified at the current
        # ``curr_hit_length``. Each eagle group applies the drop at most once
        # per candidate length (see issue #32802).
        eagle_verified: set[int] = set()

        while True:
            curr_hit_length = hit_length

            for idx, (spec, group_ids, manager_cls, use_eagle) in enumerate(
                self.attention_groups
            ):
                cached_blocks = hit_blocks_by_group[group_ids[0]]
                if isinstance(spec, FullAttentionSpec) and cached_blocks is not None:
                    # Full attention is downward-closed: we only need to look
                    # up cached blocks once; on subsequent iterations just trim
                    # to the (reduced) current hit length.
                    curr_hit_length = (
                        curr_hit_length // spec.block_size * spec.block_size
                    )
                    continue

                drop_eagle_block = use_eagle and idx not in eagle_verified

                _max_length = curr_hit_length
                if drop_eagle_block:
                    # Eagle needs to match one more block and then pop the last.
                    _max_length = min(
                        curr_hit_length + spec.block_size, max_cache_hit_length
                    )
                hit_blocks = manager_cls.find_longest_cache_hit(
                    block_hashes=_get_block_hashes(spec),
                    max_length=_max_length,
                    kv_cache_group_ids=group_ids,
                    block_pool=self.block_pool,
                    kv_cache_spec=spec,
                    drop_eagle_block=drop_eagle_block,
                    alignment_tokens=self.scheduler_block_size,
                )
                _new_hit_length = len(hit_blocks[0]) * spec.block_size
                if drop_eagle_block:
                    eagle_verified.add(idx)
                elif _new_hit_length < curr_hit_length:
                    # length shrunk; invalidate previous eagle verifications
                    eagle_verified.clear()
                curr_hit_length = _new_hit_length
                for group_id, blocks in zip(group_ids, hit_blocks):
                    hit_blocks_by_group[group_id] = blocks

                longest_hit_length = max(longest_hit_length, curr_hit_length)

            if curr_hit_length >= hit_length:
                break
            hit_length = curr_hit_length
            if is_simple_hybrid:
                break

        # Truncate full attention blocks to final hit_length (if present)
        first_group = self.attention_groups[0]
        if isinstance(first_group.spec, FullAttentionSpec):
            num_blocks = hit_length // first_group.spec.block_size
            for group_id in first_group.group_ids:
                if (blks := hit_blocks_by_group[group_id]) is not None:
                    del blks[num_blocks:]

        # Uncached shared prefix detection: If any attn. group cached a longer prefix
        # than the current prefix, it is an uncached common prefix across requests:
        self.num_uncached_common_prefix_tokens = longest_hit_length - hit_length
        return tuple(
            blocks if blocks is not None else [] for blocks in hit_blocks_by_group
        ), hit_length

    def find_longest_cache_hit_per_group(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], tuple[int, ...]]:
        """Like find_longest_cache_hit but evaluates each group independently.

        Returns:
            (blocks_per_group, hit_lengths_per_group)
        """

        def _get_block_hashes(kv_cache_spec: KVCacheSpec) -> BlockHashList:
            if kv_cache_spec.block_size == self.hash_block_size:
                return block_hashes
            return BlockHashListWithBlockSize(
                block_hashes, self.hash_block_size, kv_cache_spec.block_size
            )

        num_groups = len(self.kv_cache_config.kv_cache_groups)
        hit_blocks: list[list[KVCacheBlock]] = [[] for _ in range(num_groups)]
        hit_lengths: list[int] = [0] * num_groups

        for spec, group_ids, manager_cls, use_eagle in self.attention_groups:
            blocks = manager_cls.find_longest_cache_hit(
                block_hashes=_get_block_hashes(spec),
                max_length=max_cache_hit_length,
                kv_cache_group_ids=group_ids,
                block_pool=self.block_pool,
                kv_cache_spec=spec,
                drop_eagle_block=use_eagle,
                alignment_tokens=self.scheduler_block_size,
            )
            group_hit = len(blocks[0]) * spec.block_size
            for gid, blks in zip(group_ids, blocks):
                hit_blocks[gid] = blks
                hit_lengths[gid] = group_hit

        return tuple(hit_blocks), tuple(hit_lengths)

find_longest_cache_hit(block_hashes, max_cache_hit_length)

Find the longest cache hit using an iterative fixed-point algorithm.

Each attention type either accepts the current candidate length or reduces it. If any type reduces the length, restart checks over all types. This converges because length monotonically decreases and is bounded below by 0.

Parameters:

  • block_hashes

    (list[BlockHash]) –

    The block hashes of the request.

  • max_cache_hit_length

    (int) –

    The maximum length of the cache hit.

Returns:

  • tuple[tuple[list[KVCacheBlock], ...], int]

    A tuple containing: - A tuple of the cache hit blocks for each single type manager. - The number of tokens of the longest cache hit.

Source code in vllm/v1/core/kv_cache_coordinator.py
def find_longest_cache_hit(
    self,
    block_hashes: list[BlockHash],
    max_cache_hit_length: int,
) -> tuple[tuple[list[KVCacheBlock], ...], int]:
    """
    Find the longest cache hit using an iterative fixed-point algorithm.

    Each attention type either accepts the current candidate length or
    reduces it. If any type reduces the length, restart checks over all
    types. This converges because length monotonically decreases and is
    bounded below by 0.

    Args:
        block_hashes: The block hashes of the request.
        max_cache_hit_length: The maximum length of the cache hit.

    Returns:
        A tuple containing:
            - A tuple of the cache hit blocks for each single type manager.
            - The number of tokens of the longest cache hit.
    """

    def _get_block_hashes(kv_cache_spec: KVCacheSpec) -> BlockHashList:
        if kv_cache_spec.block_size == self.hash_block_size:
            return block_hashes
        return BlockHashListWithBlockSize(
            block_hashes, self.hash_block_size, kv_cache_spec.block_size
        )

    num_groups = len(self.kv_cache_config.kv_cache_groups)
    hit_length = max_cache_hit_length
    longest_hit_length = 0
    hit_blocks_by_group: list[list[KVCacheBlock] | None] = [None] * num_groups

    # Simple hybrid (1 full attn + 1 other): one iteration suffices.
    # Full attn is always first if it exists.
    is_simple_hybrid = len(self.attention_groups) == 2 and isinstance(
        self.attention_groups[0].spec, FullAttentionSpec
    )

    # Attention-group indices whose EAGLE drop is verified at the current
    # ``curr_hit_length``. Each eagle group applies the drop at most once
    # per candidate length (see issue #32802).
    eagle_verified: set[int] = set()

    while True:
        curr_hit_length = hit_length

        for idx, (spec, group_ids, manager_cls, use_eagle) in enumerate(
            self.attention_groups
        ):
            cached_blocks = hit_blocks_by_group[group_ids[0]]
            if isinstance(spec, FullAttentionSpec) and cached_blocks is not None:
                # Full attention is downward-closed: we only need to look
                # up cached blocks once; on subsequent iterations just trim
                # to the (reduced) current hit length.
                curr_hit_length = (
                    curr_hit_length // spec.block_size * spec.block_size
                )
                continue

            drop_eagle_block = use_eagle and idx not in eagle_verified

            _max_length = curr_hit_length
            if drop_eagle_block:
                # Eagle needs to match one more block and then pop the last.
                _max_length = min(
                    curr_hit_length + spec.block_size, max_cache_hit_length
                )
            hit_blocks = manager_cls.find_longest_cache_hit(
                block_hashes=_get_block_hashes(spec),
                max_length=_max_length,
                kv_cache_group_ids=group_ids,
                block_pool=self.block_pool,
                kv_cache_spec=spec,
                drop_eagle_block=drop_eagle_block,
                alignment_tokens=self.scheduler_block_size,
            )
            _new_hit_length = len(hit_blocks[0]) * spec.block_size
            if drop_eagle_block:
                eagle_verified.add(idx)
            elif _new_hit_length < curr_hit_length:
                # length shrunk; invalidate previous eagle verifications
                eagle_verified.clear()
            curr_hit_length = _new_hit_length
            for group_id, blocks in zip(group_ids, hit_blocks):
                hit_blocks_by_group[group_id] = blocks

            longest_hit_length = max(longest_hit_length, curr_hit_length)

        if curr_hit_length >= hit_length:
            break
        hit_length = curr_hit_length
        if is_simple_hybrid:
            break

    # Truncate full attention blocks to final hit_length (if present)
    first_group = self.attention_groups[0]
    if isinstance(first_group.spec, FullAttentionSpec):
        num_blocks = hit_length // first_group.spec.block_size
        for group_id in first_group.group_ids:
            if (blks := hit_blocks_by_group[group_id]) is not None:
                del blks[num_blocks:]

    # Uncached shared prefix detection: If any attn. group cached a longer prefix
    # than the current prefix, it is an uncached common prefix across requests:
    self.num_uncached_common_prefix_tokens = longest_hit_length - hit_length
    return tuple(
        blocks if blocks is not None else [] for blocks in hit_blocks_by_group
    ), hit_length

find_longest_cache_hit_per_group(block_hashes, max_cache_hit_length)

Like find_longest_cache_hit but evaluates each group independently.

Returns:

Source code in vllm/v1/core/kv_cache_coordinator.py
def find_longest_cache_hit_per_group(
    self,
    block_hashes: list[BlockHash],
    max_cache_hit_length: int,
) -> tuple[tuple[list[KVCacheBlock], ...], tuple[int, ...]]:
    """Like find_longest_cache_hit but evaluates each group independently.

    Returns:
        (blocks_per_group, hit_lengths_per_group)
    """

    def _get_block_hashes(kv_cache_spec: KVCacheSpec) -> BlockHashList:
        if kv_cache_spec.block_size == self.hash_block_size:
            return block_hashes
        return BlockHashListWithBlockSize(
            block_hashes, self.hash_block_size, kv_cache_spec.block_size
        )

    num_groups = len(self.kv_cache_config.kv_cache_groups)
    hit_blocks: list[list[KVCacheBlock]] = [[] for _ in range(num_groups)]
    hit_lengths: list[int] = [0] * num_groups

    for spec, group_ids, manager_cls, use_eagle in self.attention_groups:
        blocks = manager_cls.find_longest_cache_hit(
            block_hashes=_get_block_hashes(spec),
            max_length=max_cache_hit_length,
            kv_cache_group_ids=group_ids,
            block_pool=self.block_pool,
            kv_cache_spec=spec,
            drop_eagle_block=use_eagle,
            alignment_tokens=self.scheduler_block_size,
        )
        group_hit = len(blocks[0]) * spec.block_size
        for gid, blks in zip(group_ids, blocks):
            hit_blocks[gid] = blks
            hit_lengths[gid] = group_hit

    return tuple(hit_blocks), tuple(hit_lengths)

verify_and_split_kv_cache_groups()

Groups KV cache groups by their spec type for efficient batch processing during cache hit lookup.

Source code in vllm/v1/core/kv_cache_coordinator.py
def verify_and_split_kv_cache_groups(self) -> None:
    """
    Groups KV cache groups by their spec type for efficient batch processing
    during cache hit lookup.
    """
    self.attention_groups: list[SpecGroup] = []
    for i, g in enumerate(self.kv_cache_config.kv_cache_groups):
        manager_cls = self.single_type_managers[i].__class__
        spec = g.kv_cache_spec
        use_eagle = i in self.eagle_group_ids

        # Try to find an existing group with the same spec
        for idx, group in enumerate(self.attention_groups):
            if group.spec == spec:
                assert manager_cls is group.manager_cls, (
                    "Expected same manager class for identical KV cache specs."
                )
                group.group_ids.append(i)
                if use_eagle and not group.use_eagle:
                    self.attention_groups[idx] = group._replace(use_eagle=True)
                break
        else:
            self.attention_groups.append(
                SpecGroup(spec, [i], manager_cls, use_eagle)
            )

    assert len(self.attention_groups) > 1, (
        "HybridKVCacheCoordinator requires at least two attention groups."
    )

    # Put full attention first: its efficient left-to-right scan provides
    # a tighter initial bound, reducing work for subsequent groups.
    self.attention_groups.sort(
        key=lambda g: not isinstance(g.spec, FullAttentionSpec)
    )

    # Propagate the eagle bit to each manager (default to ``use_eagle=False``).
    for group in self.attention_groups:
        if group.use_eagle:
            for gid in group.group_ids:
                self.single_type_managers[gid].use_eagle = True

KVCacheCoordinator

Bases: ABC

Coordinate the KV cache of different KV cache groups.

Methods:

Source code in vllm/v1/core/kv_cache_coordinator.py
class KVCacheCoordinator(ABC):
    """
    Coordinate the KV cache of different KV cache groups.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        max_num_batched_tokens: int,
        use_eagle: bool,
        enable_caching: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        scheduler_block_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        self.kv_cache_config = kv_cache_config
        self.max_model_len = max_model_len
        self.enable_caching = enable_caching
        # The scheduling granularity (LCM of all group block sizes), must be a multiple
        # of the hash_block_size and the block size of each group.
        assert scheduler_block_size % hash_block_size == 0 and all(
            scheduler_block_size % g.kv_cache_spec.block_size == 0
            for g in kv_cache_config.kv_cache_groups
        )
        self.scheduler_block_size = scheduler_block_size

        self.block_pool = BlockPool(
            num_gpu_blocks=kv_cache_config.num_blocks,
            enable_caching=enable_caching,
            hash_block_size=hash_block_size,
            enable_kv_cache_events=enable_kv_cache_events,
            metrics_collector=metrics_collector,
        )

        # KV cache group indices that get the EAGLE last-block drop.
        self.eagle_group_ids: set[int] = {
            i for i, g in enumerate(kv_cache_config.kv_cache_groups) if g.is_eagle_group
        }
        # Conservatively fall back to flag all groups when no group is flagged.
        if use_eagle and not self.eagle_group_ids:
            self.eagle_group_ids = set(range(len(kv_cache_config.kv_cache_groups)))

        self.single_type_managers = tuple(
            get_manager_for_kv_cache_spec(
                kv_cache_spec=kv_cache_group.kv_cache_spec,
                max_num_batched_tokens=max_num_batched_tokens,
                max_model_len=max_model_len,
                block_pool=self.block_pool,
                enable_caching=enable_caching,
                kv_cache_group_id=i,
                dcp_world_size=dcp_world_size,
                pcp_world_size=pcp_world_size,
                scheduler_block_size=self.scheduler_block_size,
            )
            for i, kv_cache_group in enumerate(self.kv_cache_config.kv_cache_groups)
        )

        # A positive retention interval must be a multiple of the base hit granularity
        # (``scheduler_block_size``) to land on real cache-hit boundaries.
        # 0 = keep only the latest replay boundary; None = dense;
        self.retention_interval = envs.VLLM_PREFIX_CACHE_RETENTION_INTERVAL
        _validate_prefix_cache_retention_interval(
            self.retention_interval, self.scheduler_block_size, kv_cache_config
        )

    def get_num_blocks_to_allocate(
        self,
        request_id: str,
        num_tokens: int,
        new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
        num_encoder_tokens: int,
        total_computed_tokens: int,
        num_tokens_main_model: int,
        apply_admission_cap: bool = False,
    ) -> int:
        """
        Get the number of blocks needed to be allocated for the request.

        Args:
            request_id: The request ID.
            num_tokens: The total number of tokens that need a slot (including
                tokens that are already allocated).
            new_computed_blocks: The new computed blocks just hitting the
                prefix caching.
            num_encoder_tokens: The number of encoder tokens for allocating
                blocks for cross-attention.
            total_computed_tokens: Include both local and external tokens.
            num_tokens_main_model: The number of tokens for the main model (aka target
                model in spec decode). w/o spec decode, it is num_tokens;
                with spec decode, it is num_tokens - num_lookahead_tokens.
            apply_admission_cap: If True, apply the recycling-aware
                per-request admission cap (SWA / chunked-local). Set only by
                the full-sequence admission gate; per-step allocation must
                leave it False so the predictor matches `allocate_new_blocks`.

        Returns:
            The number of blocks to allocate.
        """
        num_blocks_to_allocate = 0
        for i, manager in enumerate(self.single_type_managers):
            if isinstance(manager, CrossAttentionManager):
                # For cross-attention, we issue a single static allocation
                # of blocks based on the number of encoder input tokens.
                num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                    request_id,
                    num_encoder_tokens,
                    [],
                    0,
                    num_encoder_tokens,
                    apply_admission_cap=apply_admission_cap,
                )
            else:
                num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                    request_id,
                    num_tokens,
                    new_computed_blocks[i],
                    total_computed_tokens,
                    num_tokens_main_model,
                    apply_admission_cap=apply_admission_cap,
                )
        return num_blocks_to_allocate

    def allocate_new_computed_blocks(
        self,
        request_id: str,
        new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
        num_local_computed_tokens: int,
        num_external_computed_tokens: int,
    ) -> None:
        """
        Add the new computed blocks to the request. Optionally allocate new
            blocks for external computed tokens (if any).

        Args:
            request_id: The request ID.
            new_computed_blocks: The new computed blocks just hitting the
                prefix cache.
            num_local_computed_tokens: The number of local computed tokens.
            num_external_computed_tokens: The number of external computed tokens.
        """
        for i, manager in enumerate(self.single_type_managers):
            manager.allocate_new_computed_blocks(
                request_id,
                new_computed_blocks[i],
                num_local_computed_tokens,
                num_external_computed_tokens,
            )

    def allocate_new_blocks(
        self,
        request_id: str,
        num_tokens: int,
        num_tokens_main_model: int,
        num_encoder_tokens: int = 0,
    ) -> tuple[list[KVCacheBlock], ...]:
        """
        Allocate new blocks for the request to give it at least `num_tokens`
        token slots.

        Args:
            request_id: The request ID.
            num_tokens: The total number of tokens that need a slot (including
                tokens that are already allocated).
            num_tokens_main_model: The number of tokens for the main model (aka target
                model in spec decode). w/o spec decode, it is num_tokens;
                with spec decode, it is num_tokens - num_lookahead_tokens.
            num_encoder_tokens: The number of encoder tokens for allocating
                blocks for cross-attention.

        Returns:
            The new allocated blocks.
        """
        return tuple(
            manager.allocate_new_blocks(
                request_id,
                num_encoder_tokens
                if isinstance(manager, CrossAttentionManager)
                else num_tokens,
                num_tokens_main_model,
            )
            for manager in self.single_type_managers
        )

    def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
        """
        Cache the blocks for the request.

        Args:
            request: The request.
            num_computed_tokens: The total number of tokens
                that need to be cached
                (including tokens that are already cached).
        """
        for manager in self.single_type_managers:
            manager.cache_blocks(
                request,
                num_computed_tokens,
                retention_interval=self.retention_interval,
            )

    def free(self, request_id: str) -> None:
        """
        Free the blocks for the request.

        Args:
            request_id: The request ID.
        """
        for manager in self.single_type_managers:
            manager.free(request_id)

    def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
        """
        Get the number of common prefix blocks for all requests with allocated
        KV cache for each kv cache group.

        Args:
            running_request_id: The request ID of any running request, used to
                identify the common prefix blocks.

        Returns:
            list[int]: The number of common prefix blocks for each kv cache group.
        """
        return [
            manager.get_num_common_prefix_blocks(running_request_id)
            for manager in self.single_type_managers
        ]

    def remove_skipped_blocks(
        self, request_id: str, total_computed_tokens: int
    ) -> None:
        """
        Remove the blocks that are no longer needed from `blocks` and replace
        the removed blocks with null_block.

        Args:
            request_id: The request ID.
            total_computed_tokens: The total number of computed tokens, including
                local computed tokens and external computed tokens.
        """
        for manager in self.single_type_managers:
            manager.remove_skipped_blocks(request_id, total_computed_tokens)

    def get_blocks(self, request_id: str) -> tuple[list[KVCacheBlock], ...]:
        """
        Get the blocks for the request.
        """
        return tuple(
            manager.req_to_blocks.get(request_id) or []
            for manager in self.single_type_managers
        )

    @abstractmethod
    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        pass

    def new_step_starts(self) -> None:
        """Called when a new step is started."""
        for manager in self.single_type_managers:
            manager.new_step_starts()

allocate_new_blocks(request_id, num_tokens, num_tokens_main_model, num_encoder_tokens=0)

Allocate new blocks for the request to give it at least num_tokens token slots.

Parameters:

  • request_id

    (str) –

    The request ID.

  • num_tokens

    (int) –

    The total number of tokens that need a slot (including tokens that are already allocated).

  • num_tokens_main_model

    (int) –

    The number of tokens for the main model (aka target model in spec decode). w/o spec decode, it is num_tokens; with spec decode, it is num_tokens - num_lookahead_tokens.

  • num_encoder_tokens

    (int, default: 0 ) –

    The number of encoder tokens for allocating blocks for cross-attention.

Returns:

Source code in vllm/v1/core/kv_cache_coordinator.py
def allocate_new_blocks(
    self,
    request_id: str,
    num_tokens: int,
    num_tokens_main_model: int,
    num_encoder_tokens: int = 0,
) -> tuple[list[KVCacheBlock], ...]:
    """
    Allocate new blocks for the request to give it at least `num_tokens`
    token slots.

    Args:
        request_id: The request ID.
        num_tokens: The total number of tokens that need a slot (including
            tokens that are already allocated).
        num_tokens_main_model: The number of tokens for the main model (aka target
            model in spec decode). w/o spec decode, it is num_tokens;
            with spec decode, it is num_tokens - num_lookahead_tokens.
        num_encoder_tokens: The number of encoder tokens for allocating
            blocks for cross-attention.

    Returns:
        The new allocated blocks.
    """
    return tuple(
        manager.allocate_new_blocks(
            request_id,
            num_encoder_tokens
            if isinstance(manager, CrossAttentionManager)
            else num_tokens,
            num_tokens_main_model,
        )
        for manager in self.single_type_managers
    )

allocate_new_computed_blocks(request_id, new_computed_blocks, num_local_computed_tokens, num_external_computed_tokens)

Add the new computed blocks to the request. Optionally allocate new blocks for external computed tokens (if any).

Parameters:

  • request_id

    (str) –

    The request ID.

  • new_computed_blocks

    (tuple[Sequence[KVCacheBlock], ...]) –

    The new computed blocks just hitting the prefix cache.

  • num_local_computed_tokens

    (int) –

    The number of local computed tokens.

  • num_external_computed_tokens

    (int) –

    The number of external computed tokens.

Source code in vllm/v1/core/kv_cache_coordinator.py
def allocate_new_computed_blocks(
    self,
    request_id: str,
    new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
    num_local_computed_tokens: int,
    num_external_computed_tokens: int,
) -> None:
    """
    Add the new computed blocks to the request. Optionally allocate new
        blocks for external computed tokens (if any).

    Args:
        request_id: The request ID.
        new_computed_blocks: The new computed blocks just hitting the
            prefix cache.
        num_local_computed_tokens: The number of local computed tokens.
        num_external_computed_tokens: The number of external computed tokens.
    """
    for i, manager in enumerate(self.single_type_managers):
        manager.allocate_new_computed_blocks(
            request_id,
            new_computed_blocks[i],
            num_local_computed_tokens,
            num_external_computed_tokens,
        )

cache_blocks(request, num_computed_tokens)

Cache the blocks for the request.

Parameters:

  • request

    (Request) –

    The request.

  • num_computed_tokens

    (int) –

    The total number of tokens that need to be cached (including tokens that are already cached).

Source code in vllm/v1/core/kv_cache_coordinator.py
def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
    """
    Cache the blocks for the request.

    Args:
        request: The request.
        num_computed_tokens: The total number of tokens
            that need to be cached
            (including tokens that are already cached).
    """
    for manager in self.single_type_managers:
        manager.cache_blocks(
            request,
            num_computed_tokens,
            retention_interval=self.retention_interval,
        )

free(request_id)

Free the blocks for the request.

Parameters:

  • request_id

    (str) –

    The request ID.

Source code in vllm/v1/core/kv_cache_coordinator.py
def free(self, request_id: str) -> None:
    """
    Free the blocks for the request.

    Args:
        request_id: The request ID.
    """
    for manager in self.single_type_managers:
        manager.free(request_id)

get_blocks(request_id)

Get the blocks for the request.

Source code in vllm/v1/core/kv_cache_coordinator.py
def get_blocks(self, request_id: str) -> tuple[list[KVCacheBlock], ...]:
    """
    Get the blocks for the request.
    """
    return tuple(
        manager.req_to_blocks.get(request_id) or []
        for manager in self.single_type_managers
    )

get_num_blocks_to_allocate(request_id, num_tokens, new_computed_blocks, num_encoder_tokens, total_computed_tokens, num_tokens_main_model, apply_admission_cap=False)

Get the number of blocks needed to be allocated for the request.

Parameters:

  • request_id

    (str) –

    The request ID.

  • num_tokens

    (int) –

    The total number of tokens that need a slot (including tokens that are already allocated).

  • new_computed_blocks

    (tuple[Sequence[KVCacheBlock], ...]) –

    The new computed blocks just hitting the prefix caching.

  • num_encoder_tokens

    (int) –

    The number of encoder tokens for allocating blocks for cross-attention.

  • total_computed_tokens

    (int) –

    Include both local and external tokens.

  • num_tokens_main_model

    (int) –

    The number of tokens for the main model (aka target model in spec decode). w/o spec decode, it is num_tokens; with spec decode, it is num_tokens - num_lookahead_tokens.

  • apply_admission_cap

    (bool, default: False ) –

    If True, apply the recycling-aware per-request admission cap (SWA / chunked-local). Set only by the full-sequence admission gate; per-step allocation must leave it False so the predictor matches allocate_new_blocks.

Returns:

  • int

    The number of blocks to allocate.

Source code in vllm/v1/core/kv_cache_coordinator.py
def get_num_blocks_to_allocate(
    self,
    request_id: str,
    num_tokens: int,
    new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
    num_encoder_tokens: int,
    total_computed_tokens: int,
    num_tokens_main_model: int,
    apply_admission_cap: bool = False,
) -> int:
    """
    Get the number of blocks needed to be allocated for the request.

    Args:
        request_id: The request ID.
        num_tokens: The total number of tokens that need a slot (including
            tokens that are already allocated).
        new_computed_blocks: The new computed blocks just hitting the
            prefix caching.
        num_encoder_tokens: The number of encoder tokens for allocating
            blocks for cross-attention.
        total_computed_tokens: Include both local and external tokens.
        num_tokens_main_model: The number of tokens for the main model (aka target
            model in spec decode). w/o spec decode, it is num_tokens;
            with spec decode, it is num_tokens - num_lookahead_tokens.
        apply_admission_cap: If True, apply the recycling-aware
            per-request admission cap (SWA / chunked-local). Set only by
            the full-sequence admission gate; per-step allocation must
            leave it False so the predictor matches `allocate_new_blocks`.

    Returns:
        The number of blocks to allocate.
    """
    num_blocks_to_allocate = 0
    for i, manager in enumerate(self.single_type_managers):
        if isinstance(manager, CrossAttentionManager):
            # For cross-attention, we issue a single static allocation
            # of blocks based on the number of encoder input tokens.
            num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                request_id,
                num_encoder_tokens,
                [],
                0,
                num_encoder_tokens,
                apply_admission_cap=apply_admission_cap,
            )
        else:
            num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                request_id,
                num_tokens,
                new_computed_blocks[i],
                total_computed_tokens,
                num_tokens_main_model,
                apply_admission_cap=apply_admission_cap,
            )
    return num_blocks_to_allocate

get_num_common_prefix_blocks(running_request_id)

Get the number of common prefix blocks for all requests with allocated KV cache for each kv cache group.

Parameters:

  • running_request_id

    (str) –

    The request ID of any running request, used to identify the common prefix blocks.

Returns:

  • list[int]

    list[int]: The number of common prefix blocks for each kv cache group.

Source code in vllm/v1/core/kv_cache_coordinator.py
def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
    """
    Get the number of common prefix blocks for all requests with allocated
    KV cache for each kv cache group.

    Args:
        running_request_id: The request ID of any running request, used to
            identify the common prefix blocks.

    Returns:
        list[int]: The number of common prefix blocks for each kv cache group.
    """
    return [
        manager.get_num_common_prefix_blocks(running_request_id)
        for manager in self.single_type_managers
    ]

new_step_starts()

Called when a new step is started.

Source code in vllm/v1/core/kv_cache_coordinator.py
def new_step_starts(self) -> None:
    """Called when a new step is started."""
    for manager in self.single_type_managers:
        manager.new_step_starts()

remove_skipped_blocks(request_id, total_computed_tokens)

Remove the blocks that are no longer needed from blocks and replace the removed blocks with null_block.

Parameters:

  • request_id

    (str) –

    The request ID.

  • total_computed_tokens

    (int) –

    The total number of computed tokens, including local computed tokens and external computed tokens.

Source code in vllm/v1/core/kv_cache_coordinator.py
def remove_skipped_blocks(
    self, request_id: str, total_computed_tokens: int
) -> None:
    """
    Remove the blocks that are no longer needed from `blocks` and replace
    the removed blocks with null_block.

    Args:
        request_id: The request ID.
        total_computed_tokens: The total number of computed tokens, including
            local computed tokens and external computed tokens.
    """
    for manager in self.single_type_managers:
        manager.remove_skipped_blocks(request_id, total_computed_tokens)

KVCacheCoordinatorNoPrefixCache

Bases: KVCacheCoordinator

KV cache coordinator to use if prefix caching is disabled or unsupported. In contrast to UnitaryKVCacheCoordinator and HybridKVCacheCoordinator, supports arbitrary numbers of KV cache groups (including 0 groups). Does not implement any features related to prefix caching.

Source code in vllm/v1/core/kv_cache_coordinator.py
class KVCacheCoordinatorNoPrefixCache(KVCacheCoordinator):
    """
    KV cache coordinator to use if prefix caching is disabled or unsupported.
    In contrast to UnitaryKVCacheCoordinator and HybridKVCacheCoordinator,
    supports arbitrary numbers of KV cache groups (including 0 groups).
    Does not implement any features related to prefix caching.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        max_num_batched_tokens: int,
        use_eagle: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        scheduler_block_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        super().__init__(
            kv_cache_config,
            max_model_len,
            max_num_batched_tokens,
            use_eagle,
            False,
            enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            scheduler_block_size=scheduler_block_size,
            hash_block_size=hash_block_size,
            metrics_collector=metrics_collector,
        )
        self.num_single_type_manager = len(self.single_type_managers)

    def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
        return [0] * self.num_single_type_manager

    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        blocks: tuple[list[KVCacheBlock], ...] = tuple(
            [] for _ in range(self.num_single_type_manager)
        )
        return blocks, 0

SpecGroup

Bases: NamedTuple

KV cache groups that share one spec, batched together for a single cache-hit lookup.

use_eagle is True iff any member group is an EAGLE/MTP group. Members sharing a spec are cached and looked up jointly, so the EAGLE last-block drop is necessarily decided for the whole spec group.

Source code in vllm/v1/core/kv_cache_coordinator.py
class SpecGroup(NamedTuple):
    """KV cache groups that share one spec, batched together for a single
    cache-hit lookup.

    ``use_eagle`` is True iff any member group is an EAGLE/MTP group. Members
    sharing a spec are cached and looked up jointly, so the EAGLE last-block drop
    is necessarily decided for the whole spec group.
    """

    spec: KVCacheSpec
    group_ids: list[int]
    manager_cls: type[SingleTypeKVCacheManager]
    use_eagle: bool

UnitaryKVCacheCoordinator

Bases: KVCacheCoordinator

KV cache coordinator for models with only one KV cache group. This is the case for models with only one KV cache type, e.g., all attention layers use full attention or all attention layers use sliding window attention.

Source code in vllm/v1/core/kv_cache_coordinator.py
class UnitaryKVCacheCoordinator(KVCacheCoordinator):
    """
    KV cache coordinator for models with only one KV cache group. This is the
    case for models with only one KV cache type, e.g., all attention layers use
    full attention or all attention layers use sliding window attention.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        max_num_batched_tokens: int,
        use_eagle: bool,
        enable_caching: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        scheduler_block_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        super().__init__(
            kv_cache_config,
            max_model_len,
            max_num_batched_tokens,
            use_eagle,
            enable_caching,
            enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            scheduler_block_size=scheduler_block_size,
            hash_block_size=hash_block_size,
            metrics_collector=metrics_collector,
        )
        self.kv_cache_spec = self.kv_cache_config.kv_cache_groups[0].kv_cache_spec
        self.block_size = self.kv_cache_spec.block_size
        self.dcp_world_size = dcp_world_size
        self.pcp_world_size = pcp_world_size
        if dcp_world_size > 1:
            self.block_size *= dcp_world_size
        if pcp_world_size > 1:
            self.block_size *= pcp_world_size
        # For models using only Mamba, block_size is set to max_model_len when
        # prefix caching is disabled, and hash_block_size validation is skipped.
        assert not enable_caching or (hash_block_size == self.block_size), (
            "UnitaryKVCacheCoordinator assumes hash_block_size == block_size"
        )
        assert len(self.kv_cache_config.kv_cache_groups) == 1, (
            "UnitaryKVCacheCoordinator assumes only one kv cache group"
        )
        # Single group; useless but just set ``use_eagle`` for consistency regardless.
        self.single_type_managers[0].use_eagle = 0 in self.eagle_group_ids

    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        hit_blocks = self.single_type_managers[0].find_longest_cache_hit(
            block_hashes=block_hashes,
            max_length=max_cache_hit_length,
            kv_cache_group_ids=[0],
            block_pool=self.block_pool,
            kv_cache_spec=self.kv_cache_spec,
            drop_eagle_block=0 in self.eagle_group_ids,
            alignment_tokens=self.block_size,
            dcp_world_size=self.dcp_world_size,
            pcp_world_size=self.pcp_world_size,
        )
        return hit_blocks, len(hit_blocks[0]) * self.block_size