Skip to content

vllm.entrypoints.openai.engine.serving

Classes:

Functions:

OpenAIServing

Bases: BaseServing, BeamSearchOnlineMixin

Source code in vllm/entrypoints/openai/engine/serving.py
class OpenAIServing(BaseServing, BeamSearchOnlineMixin):
    """Base request handler for the OpenAI-compatible serving endpoints.

    Wraps an ``EngineClient`` and provides shared helpers for error
    responses, trace-header extraction, data-parallel rank selection,
    KV-transfer rejection cleanup and logprob token decoding.
    """

    # NOTE(review): the default value below is the *description* of the
    # attribute rather than an actual prefix (it is a multi-line string).
    # Presumably every concrete subclass overrides it; confirm before relying
    # on the inherited value.
    request_id_prefix: ClassVar[str] = """
    A short string prepended to every request’s ID.
    """

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        *,
        request_logger: RequestLogger | None,
        return_tokens_as_token_ids: bool = False,
    ):
        """Initialise the handler from an engine client.

        Args:
            engine_client: Client for the engine. Its ``model_config`` is
                forwarded to ``BaseServing``, and its ``renderer`` and
                ``input_processor`` are cached on ``self``.
            models: Served-model registry, forwarded to ``BaseServing``.
            request_logger: Optional request logger, forwarded to
                ``BaseServing``.
            return_tokens_as_token_ids: Stored on ``self``. Presumably
                selects the token-ID placeholder form in
                ``_get_decoded_token``; verify against callers.
        """
        super().__init__(
            models=models,
            model_config=engine_client.model_config,
            request_logger=request_logger,
        )

        self.engine_client = engine_client
        self.return_tokens_as_token_ids = return_tokens_as_token_ids
        self.renderer = engine_client.renderer
        self.input_processor = engine_client.input_processor
        # ``vllm_config`` is looked up defensively: some engine clients may
        # not expose it.
        vllm_config = getattr(engine_client, "vllm_config", None)
        kv_transfer_config = getattr(vllm_config, "kv_transfer_config", None)
        # Gates the remote-prefill cleanup in
        # ``_with_kv_transfer_rejection_cleanup``.
        self.has_kv_connector = kv_transfer_config is not None

        # Computed once at startup (cached by ``vllm_config`` identity) and
        # stamped on non-streaming responses. Streaming chunks deliberately
        # omit it to avoid per-chunk overhead.
        self.system_fingerprint: str | None = None
        if vllm_config is not None:
            try:
                # Imported inside the ``try`` so that an import failure is
                # handled the same way as a computation failure.
                from vllm.entrypoints.serve.utils.fingerprint import (
                    get_system_fingerprint,
                )

                self.system_fingerprint = get_system_fingerprint(vllm_config)
            except Exception:
                # Never fail server startup over the fingerprint.
                logger.warning(
                    "Failed to compute the system fingerprint; it will be "
                    "omitted from responses",
                    exc_info=True,
                )

    def create_streaming_error_response(
        self,
        message: str | Exception,
        err_type: str = "BadRequestError",
        status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
        param: str | None = None,
    ) -> str:
        """Serialize an error response to a JSON string for streaming.

        Builds the response with ``create_error_response`` and dumps its
        ``model_dump()`` with ``json.dumps``.
        """
        json_str = json.dumps(
            self.create_error_response(
                message=message,
                err_type=err_type,
                status_code=status_code,
                param=param,
            ).model_dump()
        )
        return json_str

    def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
        """Raise GenerationError if finish_reason indicates an error.

        Raises:
            GenerationError: when ``finish_reason == "error"``; the failure
                is logged with ``request_id`` first.
        """
        if finish_reason == "error":
            logger.error(
                "Request %s failed with an internal error during generation",
                request_id,
            )
            raise GenerationError("Internal server error")

    def _convert_generation_error_to_streaming_response(
        self, e: GenerationError
    ) -> str:
        """Convert GenerationError to streaming error response.

        The error type is always ``"InternalServerError"``; the HTTP status
        comes from ``e.status_code``.
        """
        return self.create_streaming_error_response(
            str(e),
            err_type="InternalServerError",
            status_code=e.status_code,
        )

    async def _get_trace_headers(
        self,
        headers: Headers,
    ) -> Mapping[str, str] | None:
        """Extract trace headers from ``headers`` when tracing is enabled.

        Returns ``None`` when tracing is disabled. In that case a warning is
        emitted if the request nevertheless carries trace headers.
        """
        is_tracing_enabled = await self.engine_client.is_tracing_enabled()

        if is_tracing_enabled:
            return extract_trace_headers(headers)

        if contains_trace_headers(headers):
            log_tracing_disabled_warning()

        return None

    @staticmethod
    def _get_data_parallel_rank(raw_request: Request | None) -> int | None:
        """Pulls the data parallel rank from a header, if provided.

        Reads ``X-data-parallel-rank``. Returns ``None`` when there is no
        request, no header, or a value ``int()`` cannot parse.
        """
        if raw_request is None:
            return None

        rank_str = raw_request.headers.get("X-data-parallel-rank")
        if rank_str is None:
            return None

        try:
            return int(rank_str)
        except ValueError:
            return None

    async def _with_kv_transfer_rejection_cleanup(
        self,
        awaitable: Awaitable[_T],
        request: ChatCompletionRequest | CompletionRequest | ResponsesRequest,
        raw_request: Request | None,
    ) -> _T:
        """Wrap a `create_*` coroutine so that, if it raises or returns an
        ErrorResponse (i.e. the request never reached the engine), the KV
        connector is notified to free any pinned remote-prefill blocks."""
        kv_transfer_params = self.has_kv_connector and request.kv_transfer_params
        if not kv_transfer_params or not kv_transfer_params.get("do_remote_prefill"):
            return await awaitable

        # Assume rejection until the awaitable returns a non-error result;
        # this also covers the case where it raises.
        notify = True
        try:
            result = await awaitable
            if not isinstance(result, ErrorResponse):
                notify = False
            return result
        finally:
            if notify:
                try:
                    await self.engine_client.notify_kv_transfer_request_rejected(
                        request.request_id,
                        kv_transfer_params,
                        data_parallel_rank=self._get_data_parallel_rank(raw_request),
                    )
                except Exception:
                    # Best effort: never mask the original result/exception.
                    logger.warning(
                        "Failed to notify KV connector about rejected request %s",
                        request.request_id,
                        exc_info=True,
                    )

    @staticmethod
    def _get_decoded_token(
        logprob: Logprob,
        token_id: int,
        tokenizer: TokenizerLike | None,
        return_as_token_id: bool = False,
    ) -> str:
        """Return the string to report for ``token_id``.

        Args:
            logprob: Logprob record; its ``decoded_token`` is used when set.
            token_id: ID of the token to render.
            tokenizer: Used to decode ``token_id`` when ``decoded_token`` is
                ``None``.
            return_as_token_id: If true, return the
                ``format_token_id_placeholder`` form instead of text.

        Raises:
            ValueError: if decoding is required but ``tokenizer`` is ``None``.
        """
        if return_as_token_id:
            return format_token_id_placeholder(token_id)

        if logprob.decoded_token is not None:
            return logprob.decoded_token

        if tokenizer is None:
            raise ValueError(
                "Unable to get tokenizer because `skip_tokenizer_init=True`"
            )

        return tokenizer.decode([token_id])

_convert_generation_error_to_streaming_response(e)

Convert GenerationError to streaming error response.

Source code in vllm/entrypoints/openai/engine/serving.py
def _convert_generation_error_to_streaming_response(
    self, e: GenerationError
) -> str:
    """Convert GenerationError to streaming error response."""
    return self.create_streaming_error_response(
        str(e),
        err_type="InternalServerError",
        status_code=e.status_code,
    )

_get_data_parallel_rank(raw_request) (staticmethod)

Pulls the data-parallel rank from the X-data-parallel-rank request header, if provided.

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _get_data_parallel_rank(raw_request: Request | None) -> int | None:
    """Pulls the data parallel rank from a header, if provided"""
    if raw_request is None:
        return None

    rank_str = raw_request.headers.get("X-data-parallel-rank")
    if rank_str is None:
        return None

    try:
        return int(rank_str)
    except ValueError:
        return None

_raise_if_error(finish_reason, request_id)

Raise GenerationError if finish_reason indicates an error.

Source code in vllm/entrypoints/openai/engine/serving.py
def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
    """Raise GenerationError if finish_reason indicates an error."""
    if finish_reason == "error":
        logger.error(
            "Request %s failed with an internal error during generation",
            request_id,
        )
        raise GenerationError("Internal server error")

_with_kv_transfer_rejection_cleanup(awaitable, request, raw_request) (async)

Wrap a create_* coroutine so that, if it raises or returns an ErrorResponse (i.e. the request never reached the engine), the KV connector is notified to free any pinned remote-prefill blocks.

Source code in vllm/entrypoints/openai/engine/serving.py
async def _with_kv_transfer_rejection_cleanup(
    self,
    awaitable: Awaitable[_T],
    request: ChatCompletionRequest | CompletionRequest | ResponsesRequest,
    raw_request: Request | None,
) -> _T:
    """Wrap a `create_*` coroutine so that, if it raises or returns an
    ErrorResponse (i.e. the request never reached the engine), the KV
    connector is notified to free any pinned remote-prefill blocks."""
    kv_transfer_params = self.has_kv_connector and request.kv_transfer_params
    if not kv_transfer_params or not kv_transfer_params.get("do_remote_prefill"):
        return await awaitable

    notify = True
    try:
        result = await awaitable
        if not isinstance(result, ErrorResponse):
            notify = False
        return result
    finally:
        if notify:
            try:
                await self.engine_client.notify_kv_transfer_request_rejected(
                    request.request_id,
                    kv_transfer_params,
                    data_parallel_rank=self._get_data_parallel_rank(raw_request),
                )
            except Exception:
                logger.warning(
                    "Failed to notify KV connector about rejected request %s",
                    request.request_id,
                    exc_info=True,
                )

resolve_token_id_placeholder(token, tokenizer)

Decode a 'token_id:N' placeholder back to a token string and UTF-8 bytes.

Returns (token, None) unchanged if token is not a placeholder. This is the inverse of format_token_id_placeholder / _get_decoded_token when return_as_token_id=True.

Source code in vllm/entrypoints/openai/engine/serving.py
def resolve_token_id_placeholder(
    token: str, tokenizer: TokenizerLike
) -> tuple[str, list[int] | None]:
    """Decode a 'token_id:N' placeholder back to a token string and UTF-8 bytes.

    Returns (token, None) unchanged if token is not a placeholder.
    This is the inverse of format_token_id_placeholder / _get_decoded_token
    when return_as_token_id=True.

    Only a canonical non-negative decimal ``N`` (ASCII digits only) counts
    as a placeholder. ``int()`` alone would also accept ``"-1"``, ``" 7"``,
    ``"+3"`` or ``"1_0"``. Those are not valid token IDs, so they are
    returned unchanged instead of being handed to the tokenizer.

    Args:
        token: Candidate token text, possibly of the form ``token_id:N``.
        tokenizer: Maps the ID to its vocabulary entry and then to text.

    Returns:
        ``(token_str, utf8_bytes)`` for a resolved placeholder, where
        ``utf8_bytes`` is ``token_str`` encoded as UTF-8 (unencodable
        characters replaced) as a list of ints; ``("", None)`` if the ID has
        no vocabulary entry; otherwise ``(token, None)``.
    """
    suffix = token.removeprefix("token_id:")
    if suffix == token:
        return token, None
    # ``str.isdigit`` alone accepts non-ASCII digits (e.g. "²") that ``int``
    # rejects, so require ASCII too. This also rejects signs, whitespace and
    # underscores, which ``int`` would otherwise tolerate.
    if not (suffix.isascii() and suffix.isdigit()):
        return token, None
    token_id = int(suffix)
    token_repr = tokenizer.convert_ids_to_tokens([token_id])[0]
    if token_repr is None:
        logger.warning_once(
            "resolve_token_id_placeholder: token_id %d has no vocab entry; "
            "substituting empty string",
            token_id,
        )
        return "", None
    token_str = tokenizer.convert_tokens_to_string([token_repr])
    return token_str, list(token_str.encode("utf-8", errors="replace"))