Skip to content

vllm.utils.async_utils

Contains helpers related to asynchronous code.

This is similar in concept to the asyncio module.

Functions:

collect_from_async_generator(iterator) async

Collect all items from an async generator into a list.

Source code in vllm/utils/async_utils.py
async def collect_from_async_generator(iterator: AsyncGenerator[T, None]) -> list[T]:
    """Drain *iterator* completely and return everything it produced, in order."""
    return [element async for element in iterator]

make_async(func, executor=None)

Take a blocking function and run it in an executor thread.

This prevents the blocking function from blocking the asyncio event loop. The wrapped function runs in the executor rather than on the event loop thread, so its code must be thread-safe.

Source code in vllm/utils/async_utils.py
def make_async(
    func: Callable[P, T],
    executor: Executor | None = None,
) -> Callable[P, Awaitable[T]]:
    """
    Adapt a blocking callable so that calling it returns an awaitable.

    Each call to the returned wrapper submits ``func`` (with the given
    arguments bound) to ``executor``, or to the event loop's default
    executor when ``executor`` is None, and returns the resulting future.
    The asyncio event loop is therefore not blocked while ``func`` runs.
    Because ``func`` runs outside the event loop thread, it must be
    thread safe.
    """

    def _async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Future[T]:
        event_loop = asyncio.get_event_loop()
        bound_call = partial(func, *args, **kwargs)
        # run_in_executor(executor, func, *args): arguments are pre-bound
        # via partial so keyword arguments survive the hand-off.
        return event_loop.run_in_executor(executor, bound_call)

    return _async_wrapper

make_async_with_semaphore(func, executor)

Take a blocking function and run it in an executor thread.

This prevents the blocking function from blocking the asyncio event loop. The wrapped function runs in the executor rather than on the event loop thread, so its code must be thread-safe.

The function is wrapped in a semaphore that limits the number of concurrent executions, which makes it easier to cancel tasks before they start.

Source code in vllm/utils/async_utils.py
def make_async_with_semaphore(
    func: Callable[P, T],
    executor: ThreadPoolExecutor,
) -> Callable[P, Awaitable[T]]:
    """
    Take a blocking function, and run it on in an executor thread.

    This function prevents the blocking function from blocking the
    asyncio event loop.
    The code in this function needs to be thread safe.

    The function is wrapped in a semaphore to limit the number of
    concurrent executions making it easier to cancel tasks before they start.
    """

    semaphore = asyncio.Semaphore(executor._max_workers)

    async def _async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        loop = asyncio.get_event_loop()
        p_func = partial(func, *args, **kwargs)
        async with semaphore:
            return await loop.run_in_executor(executor, p_func)

    return _async_wrapper

merge_async_iterators(*iterators) async

Merge multiple asynchronous iterators into a single iterator.

This method handles the case where some iterators finish before others. Each yielded value is a tuple (i, item), where i is the index of the iterator that produced the item.

Source code in vllm/utils/async_utils.py
async def merge_async_iterators(
    *iterators: AsyncGenerator[T, None],
) -> AsyncGenerator[tuple[int, T], None]:
    """Merge multiple asynchronous iterators into a single iterator.

    This method handles the case where some iterators finish before others.
    When it yields, it yields a tuple (i, item) where i is the index of the
    iterator that yields the item.

    Items from any single iterator are yielded in that iterator's order,
    because at most one ``anext`` call is outstanding per iterator. Across
    iterators, items are yielded in roughly completion order. When several
    ``anext`` calls finish in the same ``asyncio.wait`` round, their relative
    order follows iteration over the ``done`` set and is not specified.

    An exception other than ``StopAsyncIteration`` raised by any iterator
    propagates to the consumer. Whenever the merged generator exits
    (normally, by exception, or by being closed), any tasks still tracked
    are cancelled and their iterators are ``aclose()``-d on a best-effort
    basis.

    Args:
        *iterators: The async generators to merge. With zero iterators the
            merged generator yields nothing.

    Yields:
        ``(index, item)`` pairs, where ``index`` is the position of the
        source iterator in ``iterators``.
    """
    if len(iterators) == 1:
        # Fast-path single iterator case.
        async for item in iterators[0]:
            yield 0, item
        return

    loop = asyncio.get_running_loop()

    # One pending ``anext`` task per iterator, mapped to (index, iterator).
    awaits = {loop.create_task(anext(it)): (i, it) for i, it in enumerate(iterators)}
    try:
        while awaits:
            done, _ = await asyncio.wait(awaits.keys(), return_when=FIRST_COMPLETED)
            for d in done:
                pair = awaits.pop(d)
                try:
                    # ``d`` is already finished; this returns its item or
                    # re-raises whatever the iterator raised.
                    item = await d
                    i, it = pair
                    # Request the next item before yielding, so this
                    # iterator's next fetch can run while the consumer
                    # handles the current item.
                    awaits[loop.create_task(anext(it))] = pair
                    yield i, item
                except StopAsyncIteration:
                    # Iterator exhausted: it is not re-scheduled.
                    pass
    finally:
        # Cancel any remaining iterators
        # Tasks left in ``awaits`` are still pending. If the generator was
        # closed or an iterator raised partway through a ``done`` batch, the
        # dict may also hold completed-but-unconsumed tasks whose items are
        # dropped here.
        # NOTE(review): cancel() only requests cancellation. If a task is
        # currently suspended inside its iterator's __anext__, aclose() on
        # that iterator likely raises RuntimeError ("already running"), which
        # suppress() hides. Cleanup then depends on the CancelledError
        # reaching the generator later. Suppressing BaseException also
        # swallows a CancelledError delivered while awaiting aclose().
        # Confirm both are intended.
        for f, (_, it) in awaits.items():
            with contextlib.suppress(BaseException):
                f.cancel()
                await it.aclose()