class HarmonyParser(DelegatingParser):
def __init__(self, tokenizer, tools=None, *args, **kwargs):
super().__init__(tokenizer, tools, *args, **kwargs)
if self.reasoning_parser and not isinstance(
self.reasoning_parser, GptOssReasoningParser
):
raise ValueError(
"Harmony requires GptOssReasoningParser, "
f"got {self.reasoning_parser.__class__.__name__}."
)
if self.tool_parser and not isinstance(self.tool_parser, GptOssToolParser):
raise ValueError(
"Harmony requires GptOssToolParser, "
f"got {self.tool_parser.__class__.__name__}."
)
self._harmony_parser = get_streamable_parser_for_assistant()
self._next_tool_call_index = 0
self._num_processed_messages = 0
@property
def state(self) -> HarmonyStreamState:
return self._harmony_parser.state
@property
def current_role(self) -> Role | None:
return self._harmony_parser.current_role
@property
def current_channel(self) -> str | None:
return self._harmony_parser.current_channel
@property
def current_recipient(self) -> str | None:
return self._harmony_parser.current_recipient
@property
def current_content(self) -> str:
return self._harmony_parser.current_content
@property
def current_content_type(self) -> str | None:
return self._harmony_parser.current_content_type
def parse(
self,
model_output: str,
request: ChatCompletionRequest | ResponsesRequest,
enable_auto_tools: bool = False,
model_output_token_ids: Sequence[int] = (),
) -> tuple[str | None, str | None, list[FunctionCall] | None]:
"""Parse Harmony output from token IDs.
Tool calls are always extracted regardless of ``enable_auto_tools``.
Callers must decide whether to surface them.
"""
result = self.process_chunk(model_output_token_ids)
reasoning_parts: list[str] = []
content_parts: list[str] = []
tool_calls: list[FunctionCall] = []
def _append_parsed_message(
channel: str | None,
recipient: str | None,
text: str,
content_type: str | None = None,
) -> None:
segment_type = _SegmentType.from_channel_and_recipient(channel, recipient)
match segment_type:
case _SegmentType.REASONING if self.reasoning_parser and text:
reasoning_parts.append(text)
case _SegmentType.CONTENT if text:
content_parts.append(text)
case _SegmentType.TOOL if self.tool_parser:
assert recipient is not None
if content_type is not None and "json" not in content_type:
arguments = text
else:
try:
arguments = json.dumps(json.loads(text))
except json.JSONDecodeError:
arguments = text
tool_calls.append(
FunctionCall(
name=extract_function_from_recipient(recipient),
arguments=arguments,
)
)
for segment in result.segments:
msg = segment.completed_message
if msg is None:
continue
if msg.author.role != "assistant" or not msg.content:
continue
_append_parsed_message(
channel=msg.channel,
recipient=msg.recipient,
text=msg.content[0].text,
content_type=msg.content_type,
)
if (
self.current_channel is not None
or self.current_recipient is not None
or self.current_content
):
_append_parsed_message(
channel=self.current_channel,
recipient=self.current_recipient,
text=self.current_content,
content_type=self.current_content_type,
)
reasoning = "\n".join(reasoning_parts) or None
content = "\n".join(content_parts) or None
return reasoning, content, tool_calls or None
def parse_delta(
self,
delta_text: str,
delta_token_ids: list[int],
request: ChatCompletionRequest | ResponsesRequest,
prompt_token_ids: list[int] | None = None,
*,
finished: bool,
) -> DeltaMessage | None:
prev_recipient = self.current_recipient
result = self.process_chunk(delta_token_ids)
combined_content = ""
combined_reasoning = ""
tool_messages: list[DeltaToolCall] = []
for segment in result.segments:
if segment.completed_message is not None:
prev_recipient = None
continue
segment_type = _SegmentType.from_channel_and_recipient(
segment.channel, segment.recipient
)
match segment_type:
case _SegmentType.REASONING if self.reasoning_parser:
combined_reasoning += segment.delta
case _SegmentType.CONTENT:
combined_content += segment.delta
case _SegmentType.TOOL if self.tool_parser:
assert segment.recipient is not None
if prev_recipient != segment.recipient:
tool_name = extract_function_from_recipient(segment.recipient)
tool_messages.append(
DeltaToolCall(
# HarmonyParser does not use _stream_state;
# "random" tool_call_id_type is always used
id=make_tool_call_id(),
type="function",
function=DeltaFunctionCall(
name=tool_name,
arguments=segment.delta,
),
index=self._next_tool_call_index,
)
)
self._next_tool_call_index += 1
prev_recipient = segment.recipient
elif segment.delta:
idx = self._next_tool_call_index - 1
if tool_messages:
tool_msg = tool_messages[-1]
assert tool_msg.index == idx
fn = tool_msg.function
assert fn is not None and fn.arguments is not None
fn.arguments += segment.delta
else:
tool_messages.append(
DeltaToolCall(
index=idx,
function=DeltaFunctionCall(arguments=segment.delta),
)
)
if not combined_content and not combined_reasoning and not tool_messages:
return None
delta_message = DeltaMessage()
if combined_content:
delta_message.content = combined_content
if combined_reasoning:
delta_message.reasoning = combined_reasoning
if tool_messages:
delta_message.tool_calls = tool_messages
return delta_message
def process_chunk(self, token_ids: Sequence[int]) -> ChunkResult:
if not token_ids:
return ChunkResult(segments=[], reasoning_token_count=0)
segments: list[Segment] = []
reasoning_token_count = 0
for token_id in token_ids:
self._harmony_parser.process(token_id)
channel = self.current_channel
recipient = self.current_recipient
delta = self._harmony_parser.last_content_delta or ""
completed_message = None
_messages = self._harmony_parser.messages
if len(_messages) > self._num_processed_messages:
completed_message = _messages[self._num_processed_messages]
self._num_processed_messages += 1
if channel == "analysis" or (
channel == "commentary" and recipient is not None
):
reasoning_token_count += 1
segments.append(
Segment(
channel=channel,
recipient=recipient,
delta=delta,
completed_message=completed_message,
)
)
# TODO: Optionally merge and suppress empty Segments
return ChunkResult(
segments=segments,
reasoning_token_count=reasoning_token_count,
)