Skip to content

vllm.mocks.mock_nixl_connector

FakeNixlWrapper

Mock implementation of NixlWrapper for testing.

We don't inherit from nixl._api.nixl_agent because nixl may not be installed.

Source code in vllm/mocks/mock_nixl_connector.py
class FakeNixlWrapper:
    """Mock implementation of NixlWrapper for testing.

    We don't inherit from nixl._api.nixl_agent because nixl may not be
    installed.

    Descriptors and handles returned by this mock are freshly generated
    random UUIDs. Transfer completion is simulated by counting how many
    times each handle has been polled with ``check_xfer_state``.
    """

    AGENT_METADATA = b"fake_agent_metadata"
    REMOTE_AGENT_NAME = "remote_agent"

    def __init__(self, agent_name: str, *args, **kwargs):
        """Create the mock; all constructor arguments are ignored."""
        # How many times a handle reports "PROC" before it reports "DONE".
        self._cycles_before_xfer_done = 0
        # Number of "PROC" answers already given, keyed by transfer handle.
        self._check_xfer_state_cycles: defaultdict[int, int] = defaultdict(
            int)

    def get_reg_descs(self, caches_data, memory_type: str) -> list:
        """Return one random UUID string per entry of ``caches_data``."""
        return [str(uuid.uuid4()) for _ in caches_data]

    def register_memory(self, descs) -> None:
        """Accept ``descs`` and do nothing."""

    def get_xfer_descs(self, blocks_data, memory_type: str) -> list:
        """Return one random UUID string per entry of ``blocks_data``."""
        return [str(uuid.uuid4()) for _ in blocks_data]

    def prep_xfer_dlist(self, agent_name: str, descs: list) -> int:
        """Return a random integer handle; the arguments are ignored."""
        return uuid.uuid4().int

    def get_agent_metadata(self) -> bytes:
        """Return the fixed ``AGENT_METADATA`` bytes."""
        return self.AGENT_METADATA

    def add_remote_agent(self, agent_metadata: bytes) -> str:
        """Return the fixed ``REMOTE_AGENT_NAME``; metadata is ignored."""
        return self.REMOTE_AGENT_NAME

    def get_new_notifs(self) -> dict[str, list[bytes]]:
        """Return an empty notification mapping."""
        # Used to collect done_sending, which we don't test yet.
        return {}

    def check_xfer_state(self, handle: int) -> str:
        """Report the simulated state of the transfer ``handle``.

        Returns "PROC" for the first ``_cycles_before_xfer_done`` polls of
        a handle and "DONE" for every poll after that.
        """
        polls_so_far = self._check_xfer_state_cycles[handle]
        if polls_so_far >= self._cycles_before_xfer_done:
            return "DONE"
        self._check_xfer_state_cycles[handle] = polls_so_far + 1
        return "PROC"

    def release_xfer_handle(self, handle: int) -> None:
        """Forget the poll counter kept for ``handle``.

        Unknown handles are accepted silently.
        """
        # Without this the per-handle counters would accumulate for the
        # whole lifetime of the mock.
        self._check_xfer_state_cycles.pop(handle, None)

    def send_notif(self, agent_name: str, notif_msg: bytes) -> None:
        """Accept a notification and do nothing."""

    def make_prepped_xfer(self,
                          xfer_type: str,
                          local_xfer_side_handle: int,
                          local_block_descs_ids: list[int],
                          remote_xfer_side_handle: int,
                          remote_block_descs_ids: list[int],
                          notif_msg: Optional[bytes] = None) -> int:
        """Return a random integer transfer handle; arguments are ignored."""
        return uuid.uuid4().int

    def transfer(self, handle: int) -> str:
        """Always return "PROC"; completion is seen via check_xfer_state."""
        return "PROC"

    ############################################################
    # The following are for changing the behavior during testing.
    ############################################################

    def set_cycles_before_xfer_done(self, cycles: int):
        """Set the number of cycles before a transfer is considered done."""
        self._cycles_before_xfer_done = cycles

AGENT_METADATA class-attribute instance-attribute

AGENT_METADATA = b'fake_agent_metadata'

REMOTE_AGENT_NAME class-attribute instance-attribute

REMOTE_AGENT_NAME = 'remote_agent'

_check_xfer_state_cycles instance-attribute

_check_xfer_state_cycles: defaultdict[int, int] = (
    defaultdict(lambda: 0)
)

_cycles_before_xfer_done instance-attribute

_cycles_before_xfer_done = 0

__init__

__init__(agent_name: str, *args, **kwargs)
Source code in vllm/mocks/mock_nixl_connector.py
def __init__(self, agent_name: str, *args, **kwargs):
    """Initialise the mock's polling state; all arguments are ignored."""
    # How many times a handle reports "PROC" before it reports "DONE".
    self._cycles_before_xfer_done = 0
    # Number of "PROC" answers already given, keyed by transfer handle.
    # `int` is the idiomatic zero-producing default factory.
    self._check_xfer_state_cycles: defaultdict[int, int] = defaultdict(int)

add_remote_agent

add_remote_agent(agent_metadata: bytes) -> str
Source code in vllm/mocks/mock_nixl_connector.py
def add_remote_agent(self, agent_metadata: bytes) -> str:
    """Return the fixed ``REMOTE_AGENT_NAME``; the metadata is ignored."""
    remote_name = self.REMOTE_AGENT_NAME
    return remote_name

check_xfer_state

check_xfer_state(handle: int) -> str
Source code in vllm/mocks/mock_nixl_connector.py
def check_xfer_state(self, handle: int) -> str:
    """Report the simulated state of the transfer ``handle``.

    Returns "PROC" while the handle has been polled fewer than
    ``_cycles_before_xfer_done`` times, counting each such poll, and
    "DONE" afterwards.
    """
    polls_so_far = self._check_xfer_state_cycles[handle]
    if polls_so_far < self._cycles_before_xfer_done:
        # Still in flight: record this poll against the handle.
        self._check_xfer_state_cycles[handle] = polls_so_far + 1
        return "PROC"
    return "DONE"

get_agent_metadata

get_agent_metadata() -> bytes
Source code in vllm/mocks/mock_nixl_connector.py
def get_agent_metadata(self) -> bytes:
    """Return the fixed ``AGENT_METADATA`` bytes."""
    metadata = self.AGENT_METADATA
    return metadata

get_new_notifs

get_new_notifs() -> dict[str, list[bytes]]
Source code in vllm/mocks/mock_nixl_connector.py
def get_new_notifs(self) -> dict[str, list[bytes]]:
    """Return a new, empty notification mapping."""
    # Used to collect done_sending, which we don't test yet.
    no_notifs: dict[str, list[bytes]] = {}
    return no_notifs

get_reg_descs

get_reg_descs(caches_data, memory_type: str) -> list
Source code in vllm/mocks/mock_nixl_connector.py
def get_reg_descs(self, caches_data, memory_type: str) -> list:
    """Return one fresh random UUID string per entry of ``caches_data``.

    ``memory_type`` is accepted but not used.
    """
    fake_descs = []
    for _entry in caches_data:
        fake_descs.append(str(uuid.uuid4()))
    return fake_descs

get_xfer_descs

get_xfer_descs(blocks_data, memory_type: str) -> list
Source code in vllm/mocks/mock_nixl_connector.py
def get_xfer_descs(self, blocks_data, memory_type: str) -> list:
    """Return one fresh random UUID string per entry of ``blocks_data``.

    ``memory_type`` is accepted but not used.
    """
    fake_descs = []
    for _entry in blocks_data:
        fake_descs.append(str(uuid.uuid4()))
    return fake_descs

make_prepped_xfer

make_prepped_xfer(
    xfer_type: str,
    local_xfer_side_handle: int,
    local_block_descs_ids: list[int],
    remote_xfer_side_handle: int,
    remote_block_descs_ids: list[int],
    notif_msg: Optional[bytes] = None,
) -> int
Source code in vllm/mocks/mock_nixl_connector.py
def make_prepped_xfer(
    self,
    xfer_type: str,
    local_xfer_side_handle: int,
    local_block_descs_ids: list[int],
    remote_xfer_side_handle: int,
    remote_block_descs_ids: list[int],
    notif_msg: Optional[bytes] = None,
) -> int:
    """Return a random integer transfer handle.

    None of the arguments influence the result.
    """
    fresh_id = uuid.uuid4()
    return fresh_id.int

prep_xfer_dlist

prep_xfer_dlist(agent_name: str, descs: list) -> int
Source code in vllm/mocks/mock_nixl_connector.py
def prep_xfer_dlist(self, agent_name: str, descs: list) -> int:
    """Return a random integer handle; both arguments are ignored."""
    fresh_id = uuid.uuid4()
    return fresh_id.int

register_memory

register_memory(descs) -> None
Source code in vllm/mocks/mock_nixl_connector.py
def register_memory(self, descs) -> None:
    """Accept ``descs`` and do nothing."""
    return None

release_xfer_handle

release_xfer_handle(handle: int) -> None
Source code in vllm/mocks/mock_nixl_connector.py
def release_xfer_handle(self, handle: int) -> None:
    """Forget the poll counter kept for ``handle``.

    Unknown handles are accepted silently.
    """
    # Drop the counter that check_xfer_state keeps for this handle so the
    # per-handle state does not accumulate for the lifetime of the mock.
    self._check_xfer_state_cycles.pop(handle, None)

send_notif

send_notif(agent_name: str, notif_msg: bytes) -> None
Source code in vllm/mocks/mock_nixl_connector.py
def send_notif(self, agent_name: str, notif_msg: bytes) -> None:
    """Accept a notification for ``agent_name`` and do nothing."""
    return None

set_cycles_before_xfer_done

set_cycles_before_xfer_done(cycles: int)

Set the number of cycles before a transfer is considered done.

Source code in vllm/mocks/mock_nixl_connector.py
def set_cycles_before_xfer_done(self, cycles: int):
    """Store ``cycles`` as the number of polls before a transfer is done.

    The value is written to ``_cycles_before_xfer_done`` unchanged.
    """
    setattr(self, "_cycles_before_xfer_done", cycles)

transfer

transfer(handle: int) -> str
Source code in vllm/mocks/mock_nixl_connector.py
def transfer(self, handle: int) -> str:
    """Always return "PROC", whatever ``handle`` is."""
    in_progress = "PROC"
    return in_progress