
vllm.v1.kv_offload.worker.worker

Classes:

  • OffloadingHandler

    OffloadingHandler class for managing asynchronous KV data transfers

  • OffloadingWorker

    OffloadingWorker class for managing asynchronous KV data transfers

OffloadingHandler

Bases: ABC

OffloadingHandler class for managing asynchronous KV data transfers

This class runs in the worker. It kicks off async KV data transfer requests, and allows collecting back completion statuses.

The class provides the following primitives:

  • transfer_async() - kicks off a new transfer job.

  • get_finished() - returns the (job_id, success) results of transfers that finished since the last call.
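
To make the contract concrete, here is a minimal sketch of a handler that implements these primitives. It is illustrative only and does not import vLLM: the `OffloadingHandler` base is re-declared locally, `TransferSpec` is assumed to be a plain `(src, dst)` pair, `TransferResult` is assumed to be a `(job_id, success)` tuple, and `InMemoryHandler` is a hypothetical class that "transfers" data by copying one dict into another.

```python
from abc import ABC, abstractmethod
from typing import Any

# Stand-ins for the vLLM types (assumptions made for this sketch only).
TransferSpec = tuple[Any, Any]      # (src, dst)
TransferResult = tuple[int, bool]   # (job_id, success)


class OffloadingHandler(ABC):
    """Local copy of the interface so the sketch runs without vLLM."""

    @abstractmethod
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool: ...

    @abstractmethod
    def get_finished(self) -> list[TransferResult]: ...

    @abstractmethod
    def wait(self, job_ids: set[int]) -> None: ...

    def shutdown(self) -> None:
        return


class InMemoryHandler(OffloadingHandler):
    """Hypothetical handler: a 'transfer' copies the src dict into the dst dict."""

    def __init__(self) -> None:
        self._pending: dict[int, TransferSpec] = {}
        self._finished: list[TransferResult] = []

    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        if job_id in self._pending:
            return False  # refuse to reuse an in-flight job ID
        self._pending[job_id] = spec
        return True

    def _complete(self, job_id: int) -> None:
        # Do the copy and record the outcome for the next get_finished() call.
        src, dst = self._pending.pop(job_id)
        dst.update(src)
        self._finished.append((job_id, True))

    def get_finished(self) -> list[TransferResult]:
        # Finish whatever is still pending, then report each result exactly once.
        for job_id in list(self._pending):
            self._complete(job_id)
        finished, self._finished = self._finished, []
        return finished

    def wait(self, job_ids: set[int]) -> None:
        # Complete the requested jobs; IDs this handler does not own are ignored.
        for job_id in job_ids.intersection(self._pending):
            self._complete(job_id)


handler = InMemoryHandler()
gpu, cpu = {"block0": b"kv"}, {}
submitted = handler.transfer_async(job_id=1, spec=(gpu, cpu))
handler.wait({1})
results = handler.get_finished()
```

A real handler would start the copy in `transfer_async()` and let it run in the background; this sketch defers the work to `wait()` and `get_finished()` only to stay deterministic.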

Methods:

  • get_finished

    Get transfers finished since last call.

  • shutdown

    Shutdown the handler and release any resources.

  • transfer_async

    Initiates an asynchronous transfer of KV data.

  • wait

    Wait for jobs to finish (blocking).

Source code in vllm/v1/kv_offload/worker/worker.py
class OffloadingHandler(ABC):
    """
    OffloadingHandler class for managing asynchronous KV data transfers

    This class runs in the worker.
    It kicks off async KV data transfer requests, and allows
    collecting back completion statuses.

    The class provides the following primitives:
        transfer_async() - kicks off a new transfer job
        get_finished() - returns a list of newly finished job IDs.
    """

    @abstractmethod
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        """
        Initiates an asynchronous transfer of KV data.

        Args:
            job_id: a unique ID that will be used when notifying back on
                transfer completion.
            spec: the (src, dst) spec of the KV data transfer.

        Returns:
            True if transfer was submitted successfully.
        """
        pass

    @abstractmethod
    def get_finished(self) -> list[TransferResult]:
        """
        Get transfers finished since last call.

        Returns:
            A list of (job_id, success) of transfers.
        """
        pass

    @abstractmethod
    def wait(self, job_ids: set[int]) -> None:
        """
        Wait for jobs to finish (blocking).
        Args:
            job_ids: The set of job IDs to wait for.
        """

    def shutdown(self) -> None:
        """Shutdown the handler and release any resources."""
        return

get_finished() abstractmethod

Get transfers finished since last call.

Returns:

  • list[TransferResult]

    A list of (job_id, success) of transfers.

Source code in vllm/v1/kv_offload/worker/worker.py
@abstractmethod
def get_finished(self) -> list[TransferResult]:
    """
    Get transfers finished since last call.

    Returns:
        A list of (job_id, success) of transfers.
    """
    pass

shutdown()

Shutdown the handler and release any resources.

Source code in vllm/v1/kv_offload/worker/worker.py
def shutdown(self) -> None:
    """Shutdown the handler and release any resources."""
    return

transfer_async(job_id, spec) abstractmethod

Initiates an asynchronous transfer of KV data.

Parameters:

  • job_id

    (int) –

    a unique ID that will be used when notifying back on transfer completion.

  • spec

    (TransferSpec) –

    the (src, dst) spec of the KV data transfer.

Returns:

  • bool

    True if transfer was submitted successfully.

Source code in vllm/v1/kv_offload/worker/worker.py
@abstractmethod
def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
    """
    Initiates an asynchronous transfer of KV data.

    Args:
        job_id: a unique ID that will be used when notifying back on
            transfer completion.
        spec: the (src, dst) spec of the KV data transfer.

    Returns:
        True if transfer was submitted successfully.
    """
    pass

wait(job_ids) abstractmethod

Wait for jobs to finish (blocking).

Parameters:

  • job_ids

    (set[int]) –

    The set of job IDs to wait for.

Source code in vllm/v1/kv_offload/worker/worker.py
@abstractmethod
def wait(self, job_ids: set[int]) -> None:
    """
    Wait for jobs to finish (blocking).
    Args:
        job_ids: The set of job IDs to wait for.
    """

OffloadingWorker

OffloadingWorker class for managing asynchronous KV data transfers using multiple OffloadingHandlers

This class runs in the worker. It kicks off async KV data transfer requests, by delegating to one of its registered OffloadingHandlers, based on the transfer type.

The class provides the following primitives:

  • register_handler() - registers a new handler to handle a specific transfer type.

  • transfer_async() - kicks off a new transfer job using one of the registered handlers.

  • get_finished() - returns the (job_id, success) results of newly finished transfers from all handlers.
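
The routing described above can be sketched as follows. This is a condensed stand-in, not the vLLM class: `MiniWorker`, `GPUSpec`, `CPUSpec`, and `RecordingHandler` are hypothetical names, and the only behavior modeled is the lookup keyed by the `(src.medium(), dst.medium())` pair.

```python
from typing import Any


class GPUSpec:
    """Hypothetical spec type; only medium() matters for routing."""

    @staticmethod
    def medium() -> str:
        return "GPU"


class CPUSpec:
    """Hypothetical spec type for the offload target."""

    @staticmethod
    def medium() -> str:
        return "CPU"


class RecordingHandler:
    """Hypothetical handler that just remembers the job IDs it was given."""

    def __init__(self) -> None:
        self.jobs: list[int] = []

    def transfer_async(self, job_id: int, spec: Any) -> bool:
        self.jobs.append(job_id)
        return True


class MiniWorker:
    """Condensed stand-in for OffloadingWorker: routing only."""

    def __init__(self) -> None:
        self.transfer_type_to_handler: dict[tuple[str, str], RecordingHandler] = {}

    def register_handler(self, src_cls, dst_cls, handler) -> None:
        # The key is derived from the spec classes, not from instances.
        key = (src_cls.medium(), dst_cls.medium())
        self.transfer_type_to_handler[key] = handler

    def transfer_async(self, job_id: int, spec) -> bool:
        # At submission time the same key is derived from the spec instances.
        src, dst = spec
        handler = self.transfer_type_to_handler[(src.medium(), dst.medium())]
        return handler.transfer_async(job_id, spec)


worker = MiniWorker()
store, load = RecordingHandler(), RecordingHandler()
worker.register_handler(GPUSpec, CPUSpec, store)  # offload: GPU -> CPU
worker.register_handler(CPUSpec, GPUSpec, load)   # reload:  CPU -> GPU

worker.transfer_async(1, (GPUSpec(), CPUSpec()))
worker.transfer_async(2, (CPUSpec(), GPUSpec()))
```

After these calls, `store.jobs` holds job 1 and `load.jobs` holds job 2: direction alone decides which handler receives a job.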

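Methods:

  • get_finished

    Get transfers finished since last call.

  • register_handler

    Registers a new handler.

  • transfer_async

    Initiates an asynchronous transfer of KV data.

  • wait

    Wait for jobs to finish (blocking).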

Source code in vllm/v1/kv_offload/worker/worker.py
class OffloadingWorker:
    """
    OffloadingWorker class for managing asynchronous KV data transfers
    using multiple OffloadingHandlers

    This class runs in the worker.
    It kicks off async KV data transfer requests, by delegating
    to one of its registered OffloadingHandlers, based on the transfer type.

    The class provides the following primitives:
        register_handler() - registers a new handler to handle
            a specific transfer type
        transfer_async() - kicks off a new transfer job
            using one of the registered handlers.
        get_finished() - returns a list of newly finished job IDs
            from all handlers.
    """

    def __init__(self):
        self.handlers: set[OffloadingHandler] = set()
        self.transfer_type_to_handler: dict[TransferType, OffloadingHandler] = {}

    def register_handler(
        self,
        src_cls: type[LoadStoreSpec],
        dst_cls: type[LoadStoreSpec],
        handler: OffloadingHandler,
    ) -> None:
        """
        Registers a new handler.

        Args:
            src_cls: the source type of transfers handled by this handler.
            dst_cls: the destination type of transfers handled by this handler.
            handler: the handler that will handle transfers.
        """
        transfer_type = (src_cls.medium(), dst_cls.medium())
        assert transfer_type not in self.transfer_type_to_handler
        self.handlers.add(handler)
        self.transfer_type_to_handler[transfer_type] = handler

    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        """
        Initiates an asynchronous transfer of KV data.

        Args:
            job_id: a unique ID that will be used when notifying back on
                transfer completion.
            spec: the (src, dst) spec of the KV data transfer.

        Returns:
            True if transfer was submitted successfully.
        """
        src, dst = spec
        transfer_type = (src.medium(), dst.medium())
        handler = self.transfer_type_to_handler.get(transfer_type)
        assert handler is not None
        try:
            success = handler.transfer_async(job_id, spec)
        except Exception as e:
            logger.warning(
                "Exception in %r transfer %d: %r",
                transfer_type,
                job_id,
                e,
                exc_info=True,
            )
            return False

        if not success:
            logger.warning("Failed to submit %r transfer %d", transfer_type, job_id)
        else:
            logger.debug("Submitted %r transfer %d: %r", transfer_type, job_id, spec)
        return success

    def get_finished(self) -> list[TransferResult]:
        """
        Get transfers finished since last call.

        Returns:
            A list of TransferResults
        """
        finished = []
        for handler in self.handlers:
            finished.extend(handler.get_finished())
        return finished

    def wait(self, job_ids: set[int]) -> None:
        """
        Wait for jobs to finish (blocking).

        Args:
            job_ids: The set of job IDs to wait for.
        """
        for handler in self.handlers:
            handler.wait(job_ids)

    def shutdown(self) -> None:
        for handler in self.handlers:
            handler.shutdown()

get_finished()

Get transfers finished since last call.

Returns:

  • list[TransferResult]

    A list of TransferResults
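
Because each call reports only what finished since the previous call, the caller has to keep its own record of outstanding jobs. The following is one possible bookkeeping sketch, assuming each `TransferResult` unpacks as `(job_id, success)`. `FakeWorker` is a hypothetical stand-in that replays canned results, and the three loop iterations stand in for three scheduler steps.

```python
class FakeWorker:
    """Hypothetical stand-in that replays canned get_finished() results."""

    def __init__(self, batches: list[list[tuple[int, bool]]]) -> None:
        self._batches = list(batches)

    def get_finished(self) -> list[tuple[int, bool]]:
        # Each call hands out the next batch once, then nothing.
        return self._batches.pop(0) if self._batches else []


worker = FakeWorker([[(1, True)], [], [(3, False), (2, True)]])

pending = {1, 2, 3}               # job IDs submitted earlier
succeeded: set[int] = set()
failed: set[int] = set()

for _ in range(3):                # one poll per step
    for job_id, success in worker.get_finished():
        pending.discard(job_id)
        (succeeded if success else failed).add(job_id)
```

Results from different handlers arrive in a single list with no ordering guarantee in this sketch, so the bookkeeping keys everything by job ID rather than by position.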

Source code in vllm/v1/kv_offload/worker/worker.py
def get_finished(self) -> list[TransferResult]:
    """
    Get transfers finished since last call.

    Returns:
        A list of TransferResults
    """
    finished = []
    for handler in self.handlers:
        finished.extend(handler.get_finished())
    return finished

register_handler(src_cls, dst_cls, handler)

Registers a new handler.

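Parameters:

  • src_cls

    (type[LoadStoreSpec]) –

    the source type of transfers handled by this handler.

  • dst_cls

    (type[LoadStoreSpec]) –

    the destination type of transfers handled by this handler.

  • handler

    (OffloadingHandler) –

    the handler that will handle transfers.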

Source code in vllm/v1/kv_offload/worker/worker.py
def register_handler(
    self,
    src_cls: type[LoadStoreSpec],
    dst_cls: type[LoadStoreSpec],
    handler: OffloadingHandler,
) -> None:
    """
    Registers a new handler.

    Args:
        src_cls: the source type of transfers handled by this handler.
        dst_cls: the destination type of transfers handled by this handler.
        handler: the handler that will handle transfers.
    """
    transfer_type = (src_cls.medium(), dst_cls.medium())
    assert transfer_type not in self.transfer_type_to_handler
    self.handlers.add(handler)
    self.transfer_type_to_handler[transfer_type] = handler
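
Two consequences of the registration logic above are easy to miss: one handler may serve several transfer types while being stored only once, and registering the same transfer type twice trips the assertion. The sketch below mirrors that logic with module-level containers and plain medium names instead of spec classes, a simplification made here for brevity; `register` and `bidirectional` are hypothetical names.

```python
handlers: set[object] = set()
transfer_type_to_handler: dict[tuple[str, str], object] = {}


def register(src_medium: str, dst_medium: str, handler: object) -> None:
    """Mirror of register_handler, taking medium names directly."""
    transfer_type = (src_medium, dst_medium)
    assert transfer_type not in transfer_type_to_handler
    handlers.add(handler)
    transfer_type_to_handler[transfer_type] = handler


bidirectional = object()  # hypothetical handler serving both directions
register("GPU", "CPU", bidirectional)
register("CPU", "GPU", bidirectional)

# Two routes exist, but the set holds the handler once, so loops over
# `handlers` (polling, waiting, shutdown) visit it a single time.
route_count = len(transfer_type_to_handler)
handler_count = len(handlers)

try:
    register("GPU", "CPU", object())
    duplicate_rejected = False
except AssertionError:
    duplicate_rejected = True
```

Note that `assert` statements are skipped when Python runs with `-O`, in which case a duplicate registration would silently replace the earlier handler.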

transfer_async(job_id, spec)

Initiates an asynchronous transfer of KV data.

Parameters:

  • job_id

    (int) –

    a unique ID that will be used when notifying back on transfer completion.

  • spec

    (TransferSpec) –

    the (src, dst) spec of the KV data transfer.

Returns:

  • bool

    True if transfer was submitted successfully.
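
In the source listed for this method, an exception raised by the handler is logged and converted into a `False` return value, so callers only need to check the boolean. An unregistered transfer type is not covered by that path: it fails the `assert handler is not None` check before the `try` block. The sketch below reproduces only the exception policy; `FlakyHandler`, `submit`, and the logger name are hypothetical.

```python
import logging

logger = logging.getLogger("offload_sketch")  # hypothetical logger name


class FlakyHandler:
    """Hypothetical handler whose submission always raises."""

    def transfer_async(self, job_id: int, spec) -> bool:
        raise RuntimeError("device queue is full")


def submit(handler, job_id: int, spec) -> bool:
    """Same error policy as the worker: log the exception, report False."""
    try:
        success = handler.transfer_async(job_id, spec)
    except Exception as e:
        logger.warning("Exception in transfer %d: %r", job_id, e, exc_info=True)
        return False
    return success


retry_later: list[int] = []
submitted = submit(FlakyHandler(), job_id=7, spec=("src", "dst"))
if not submitted:
    # What to do with a failed submission is the caller's decision;
    # queuing it for a later attempt is just one option.
    retry_later.append(7)
```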

Source code in vllm/v1/kv_offload/worker/worker.py
def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
    """
    Initiates an asynchronous transfer of KV data.

    Args:
        job_id: a unique ID that will be used when notifying back on
            transfer completion.
        spec: the (src, dst) spec of the KV data transfer.

    Returns:
        True if transfer was submitted successfully.
    """
    src, dst = spec
    transfer_type = (src.medium(), dst.medium())
    handler = self.transfer_type_to_handler.get(transfer_type)
    assert handler is not None
    try:
        success = handler.transfer_async(job_id, spec)
    except Exception as e:
        logger.warning(
            "Exception in %r transfer %d: %r",
            transfer_type,
            job_id,
            e,
            exc_info=True,
        )
        return False

    if not success:
        logger.warning("Failed to submit %r transfer %d", transfer_type, job_id)
    else:
        logger.debug("Submitted %r transfer %d: %r", transfer_type, job_id, spec)
    return success

wait(job_ids)

Wait for jobs to finish (blocking).

Parameters:

  • job_ids

    (set[int]) –

    The set of job IDs to wait for.
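
According to the source for this method, the worker forwards the whole `job_ids` set to every registered handler. A handler should therefore expect IDs it never accepted. The sketch below shows one way a handler could cope with that, under the assumption that it tracks its own in-flight jobs; `TrackingHandler` is a hypothetical class and its `transfer_async` signature is shortened for brevity.

```python
class TrackingHandler:
    """Hypothetical handler that only knows about the jobs it accepted."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.in_flight: set[int] = set()
        self.waited_on: set[int] = set()

    def transfer_async(self, job_id: int) -> bool:
        self.in_flight.add(job_id)
        return True

    def wait(self, job_ids: set[int]) -> None:
        # Block only on jobs this handler owns; skip foreign IDs silently.
        mine = job_ids & self.in_flight
        self.waited_on |= mine
        self.in_flight -= mine


store, load = TrackingHandler("store"), TrackingHandler("load")
store.transfer_async(1)
load.transfer_async(2)

# Same fan-out as the worker's wait(): every handler sees the full set.
for handler in (store, load):
    handler.wait({1, 2})
```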

Source code in vllm/v1/kv_offload/worker/worker.py
def wait(self, job_ids: set[int]) -> None:
    """
    Wait for jobs to finish (blocking).

    Args:
        job_ids: The set of job IDs to wait for.
    """
    for handler in self.handlers:
        handler.wait(job_ids)