Skip to content

vllm.parser.harmony

Classes:

HarmonyParser

Bases: DelegatingParser

Methods:

  • parse

    Parse Harmony output from token IDs.

Source code in vllm/parser/harmony.py
class HarmonyParser(DelegatingParser):
    """Token-ID driven parser for Harmony-formatted assistant output.

    Token IDs are fed into a streamable Harmony parser and the resulting text
    is sorted into reasoning, content and tool calls, either in one shot
    (``parse``) or incrementally (``parse_delta``).
    """

    def __init__(self, tokenizer, tools=None, *args, **kwargs):
        """Initialise the delegates and the per-stream bookkeeping.

        Raises:
            ValueError: If a reasoning or tool parser is configured but is
                not the GPT-OSS implementation.
        """
        super().__init__(tokenizer, tools, *args, **kwargs)

        reasoning_parser = self.reasoning_parser
        if reasoning_parser and not isinstance(
            reasoning_parser, GptOssReasoningParser
        ):
            found = reasoning_parser.__class__.__name__
            raise ValueError(f"Harmony requires GptOssReasoningParser, got {found}.")

        tool_parser = self.tool_parser
        if tool_parser and not isinstance(tool_parser, GptOssToolParser):
            found = tool_parser.__class__.__name__
            raise ValueError(f"Harmony requires GptOssToolParser, got {found}.")

        self._harmony_parser = get_streamable_parser_for_assistant()
        # Index handed to the next tool call announced by ``parse_delta``.
        self._next_tool_call_index = 0
        # How many entries of ``_harmony_parser.messages`` were already
        # reported as completed by ``process_chunk``.
        self._num_processed_messages = 0

    @property
    def state(self) -> HarmonyStreamState:
        """State reported by the underlying Harmony parser."""
        return self._harmony_parser.state

    @property
    def current_role(self) -> Role | None:
        """Role reported by the underlying Harmony parser."""
        return self._harmony_parser.current_role

    @property
    def current_channel(self) -> str | None:
        """Channel reported by the underlying Harmony parser."""
        return self._harmony_parser.current_channel

    @property
    def current_recipient(self) -> str | None:
        """Recipient reported by the underlying Harmony parser."""
        return self._harmony_parser.current_recipient

    @property
    def current_content(self) -> str:
        """Content reported by the underlying Harmony parser."""
        return self._harmony_parser.current_content

    @property
    def current_content_type(self) -> str | None:
        """Content type reported by the underlying Harmony parser."""
        return self._harmony_parser.current_content_type

    def parse(
        self,
        model_output: str,
        request: ChatCompletionRequest | ResponsesRequest,
        enable_auto_tools: bool = False,
        model_output_token_ids: Sequence[int] = (),
    ) -> tuple[str | None, str | None, list[FunctionCall] | None]:
        """Parse Harmony output from token IDs.

        Tool calls are always extracted regardless of ``enable_auto_tools``.
        Callers must decide whether to surface them.

        Only ``model_output_token_ids`` is consumed here; the remaining
        parameters are not read by this method.

        Returns:
            ``(reasoning, content, tool_calls)``. Reasoning and content are
            newline-joined strings; each of the three is ``None`` when
            nothing was collected for it.
        """
        chunk = self.process_chunk(model_output_token_ids)

        reasoning_texts: list[str] = []
        content_texts: list[str] = []
        function_calls: list[FunctionCall] = []

        def _route(
            channel: str | None,
            recipient: str | None,
            text: str,
            content_type: str | None = None,
        ) -> None:
            # File one message under reasoning, content or tool calls.
            kind = _SegmentType.from_channel_and_recipient(channel, recipient)
            if kind == _SegmentType.REASONING and self.reasoning_parser and text:
                reasoning_texts.append(text)
            elif kind == _SegmentType.CONTENT and text:
                content_texts.append(text)
            elif kind == _SegmentType.TOOL and self.tool_parser:
                assert recipient is not None
                # Arguments are re-serialised only when the content type is
                # absent or mentions JSON; anything else is passed verbatim.
                arguments = text
                if content_type is None or "json" in content_type:
                    try:
                        arguments = json.dumps(json.loads(text))
                    except json.JSONDecodeError:
                        # Not valid JSON after all: keep the raw text.
                        arguments = text
                function_calls.append(
                    FunctionCall(
                        name=extract_function_from_recipient(recipient),
                        arguments=arguments,
                    )
                )

        # Messages that were completed while processing this chunk.
        for segment in chunk.segments:
            message = segment.completed_message
            if (
                message is None
                or message.author.role != "assistant"
                or not message.content
            ):
                continue
            _route(
                message.channel,
                message.recipient,
                message.content[0].text,
                message.content_type,
            )

        # Whatever the Harmony parser still holds as the current message.
        open_channel = self.current_channel
        open_recipient = self.current_recipient
        open_text = self.current_content
        if open_channel is not None or open_recipient is not None or open_text:
            _route(
                open_channel,
                open_recipient,
                open_text,
                self.current_content_type,
            )

        return (
            "\n".join(reasoning_texts) or None,
            "\n".join(content_texts) or None,
            function_calls or None,
        )

    def parse_delta(
        self,
        delta_text: str,
        delta_token_ids: list[int],
        request: ChatCompletionRequest | ResponsesRequest,
        prompt_token_ids: list[int] | None = None,
        *,
        finished: bool,
    ) -> DeltaMessage | None:
        """Turn one chunk of streamed token IDs into a ``DeltaMessage``.

        Only ``delta_token_ids`` is consumed here; the remaining parameters
        are not read by this method.

        Returns:
            A ``DeltaMessage`` carrying whichever of content, reasoning and
            tool-call deltas the chunk produced, or ``None`` when it
            produced none of them.
        """
        # Recipient seen before this chunk; a differing recipient on a tool
        # segment marks the start of a new tool call.
        last_recipient = self.current_recipient
        chunk = self.process_chunk(delta_token_ids)

        content_pieces: list[str] = []
        reasoning_pieces: list[str] = []
        tool_deltas: list[DeltaToolCall] = []

        for segment in chunk.segments:
            if segment.completed_message is not None:
                # A message just closed, so the next tool segment always
                # opens a new call, even for the same recipient.
                last_recipient = None
                continue

            kind = _SegmentType.from_channel_and_recipient(
                segment.channel, segment.recipient
            )
            if kind == _SegmentType.REASONING and self.reasoning_parser:
                reasoning_pieces.append(segment.delta)
                continue
            if kind == _SegmentType.CONTENT:
                content_pieces.append(segment.delta)
                continue
            if not (kind == _SegmentType.TOOL and self.tool_parser):
                continue

            recipient = segment.recipient
            assert recipient is not None

            if last_recipient != recipient:
                # First segment of a new tool call: announce id and name.
                tool_name = extract_function_from_recipient(recipient)
                tool_deltas.append(
                    DeltaToolCall(
                        # _stream_state is not used by HarmonyParser, so
                        # the "random" tool_call_id_type always applies.
                        id=make_tool_call_id(),
                        type="function",
                        function=DeltaFunctionCall(
                            name=tool_name,
                            arguments=segment.delta,
                        ),
                        index=self._next_tool_call_index,
                    )
                )
                self._next_tool_call_index += 1
                last_recipient = recipient
                continue

            if not segment.delta:
                continue

            # More argument text for the most recently announced call.
            open_index = self._next_tool_call_index - 1
            if not tool_deltas:
                tool_deltas.append(
                    DeltaToolCall(
                        index=open_index,
                        function=DeltaFunctionCall(arguments=segment.delta),
                    )
                )
                continue

            pending = tool_deltas[-1]
            assert pending.index == open_index
            pending_fn = pending.function
            assert pending_fn is not None and pending_fn.arguments is not None
            pending_fn.arguments += segment.delta

        content_text = "".join(content_pieces)
        reasoning_text = "".join(reasoning_pieces)
        if not (content_text or reasoning_text or tool_deltas):
            return None

        message = DeltaMessage()
        if content_text:
            message.content = content_text
        if reasoning_text:
            message.reasoning = reasoning_text
        if tool_deltas:
            message.tool_calls = tool_deltas
        return message

    def process_chunk(self, token_ids: Sequence[int]) -> ChunkResult:
        """Feed token IDs to the Harmony parser, one ``Segment`` per token.

        Each segment records the channel, recipient and content delta seen
        right after its token, plus at most one message that had not yet
        been reported as completed. A token counts towards
        ``reasoning_token_count`` when the channel is ``"analysis"``, or
        ``"commentary"`` with a recipient set.
        """
        if not token_ids:
            return ChunkResult(segments=[], reasoning_token_count=0)

        collected: list[Segment] = []
        reasoning_tokens = 0
        for token_id in token_ids:
            self._harmony_parser.process(token_id)
            channel = self.current_channel
            recipient = self.current_recipient
            delta = self._harmony_parser.last_content_delta or ""

            # Report at most one not-yet-seen completed message per token.
            finished_messages = self._harmony_parser.messages
            newly_completed = None
            if self._num_processed_messages < len(finished_messages):
                newly_completed = finished_messages[self._num_processed_messages]
                self._num_processed_messages += 1

            is_addressed_commentary = (
                channel == "commentary" and recipient is not None
            )
            if channel == "analysis" or is_addressed_commentary:
                reasoning_tokens += 1

            collected.append(
                Segment(
                    channel=channel,
                    recipient=recipient,
                    delta=delta,
                    completed_message=newly_completed,
                )
            )

            # TODO: Optionally merge and suppress empty Segments

        return ChunkResult(
            segments=collected,
            reasoning_token_count=reasoning_tokens,
        )

parse(model_output, request, enable_auto_tools=False, model_output_token_ids=())

Parse Harmony output from token IDs.

Tool calls are always extracted regardless of enable_auto_tools. Callers must decide whether to surface them.

Source code in vllm/parser/harmony.py
def parse(
    self,
    model_output: str,
    request: ChatCompletionRequest | ResponsesRequest,
    enable_auto_tools: bool = False,
    model_output_token_ids: Sequence[int] = (),
) -> tuple[str | None, str | None, list[FunctionCall] | None]:
    """Parse Harmony output from token IDs.

    Tool calls are always extracted regardless of ``enable_auto_tools``.
    Callers must decide whether to surface them.

    Only ``model_output_token_ids`` is consumed here; the remaining
    parameters are not read by this method.

    Returns:
        ``(reasoning, content, tool_calls)``. Reasoning and content are
        newline-joined strings; each of the three is ``None`` when nothing
        was collected for it.
    """
    chunk = self.process_chunk(model_output_token_ids)

    reasoning_texts: list[str] = []
    content_texts: list[str] = []
    function_calls: list[FunctionCall] = []

    def _route(
        channel: str | None,
        recipient: str | None,
        text: str,
        content_type: str | None = None,
    ) -> None:
        # File one message under reasoning, content or tool calls.
        kind = _SegmentType.from_channel_and_recipient(channel, recipient)
        if kind == _SegmentType.REASONING and self.reasoning_parser and text:
            reasoning_texts.append(text)
        elif kind == _SegmentType.CONTENT and text:
            content_texts.append(text)
        elif kind == _SegmentType.TOOL and self.tool_parser:
            assert recipient is not None
            # Arguments are re-serialised only when the content type is
            # absent or mentions JSON; anything else is passed verbatim.
            arguments = text
            if content_type is None or "json" in content_type:
                try:
                    arguments = json.dumps(json.loads(text))
                except json.JSONDecodeError:
                    # Not valid JSON after all: keep the raw text.
                    arguments = text
            function_calls.append(
                FunctionCall(
                    name=extract_function_from_recipient(recipient),
                    arguments=arguments,
                )
            )

    # Messages that were completed while processing this chunk.
    for segment in chunk.segments:
        message = segment.completed_message
        if (
            message is None
            or message.author.role != "assistant"
            or not message.content
        ):
            continue
        _route(
            message.channel,
            message.recipient,
            message.content[0].text,
            message.content_type,
        )

    # Whatever the Harmony parser still holds as the current message.
    open_channel = self.current_channel
    open_recipient = self.current_recipient
    open_text = self.current_content
    if open_channel is not None or open_recipient is not None or open_text:
        _route(
            open_channel,
            open_recipient,
            open_text,
            self.current_content_type,
        )

    return (
        "\n".join(reasoning_texts) or None,
        "\n".join(content_texts) or None,
        function_calls or None,
    )