Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 80 additions & 18 deletions tensorrt_llm/serve/harmony_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,42 +202,68 @@ def _create_delta_from_parser_state(self) -> dict[str, Any] | None:
self.parser.current_channel):
return {"should_stop": "Repeated message"}

# Check for tool calls first, regardless of channel.
# The model may emit tool calls on either "commentary" or "analysis" channel.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior somewhat violates the harmony format definition. https://github.com/openai/harmony/blob/main/docs/format.md#receiving-tool-calls

But if the model is actually behaving like this, we should add a workaround (WAR) to unblock our work.

if (self.parser.current_channel in ("commentary", "analysis")
and self.parser.current_recipient
and "functions." in str(self.parser.current_recipient)):
func_name = str(
self.parser.current_recipient).split("functions.")[-1]
self.current_channel_state = "commentary_tool"

# Check if tool is allowed
if self.should_filter_tools and func_name not in self.available_tools:
logger.debug(
f"Request {self.request_id}: tool {func_name} not in available tools"
)
return None

# Get or create tool call
tool_id = self._get_or_create_tool_call(func_name)

# Accumulate arguments
self.tool_calls[tool_id][
"arguments"] += self.parser.last_content_delta

# Create tool call delta - return only the new content delta, not accumulated
return {
"tool_calls": [{
"id": tool_id,
"type": "function",
"function": {
"name": func_name,
"arguments": self.parser.
last_content_delta # Only the new content delta
},
"index": self.tool_calls[tool_id]["index"]
}]
}

if self.parser.current_channel == "analysis":
# Analysis channel -> reasoning (no token wrapping needed)
self.current_channel_state = "analysis"
return {"reasoning": self.parser.last_content_delta}

elif self.parser.current_channel == "commentary":
if self.parser.current_recipient and "functions." in str(
self.parser.current_recipient):
# Tool call in commentary channel
func_name = str(
self.parser.current_recipient).split("functions.")[-1]
if self.parser.current_recipient and self.parser.current_recipient != 'assistant':
# Non-functions tool call (e.g., browser, python)
func_name = str(self.parser.current_recipient)
self.current_channel_state = "commentary_tool"

# Check if tool is allowed
if self.should_filter_tools and func_name not in self.available_tools:
logger.debug(
f"Request {self.request_id}: tool {func_name} not in available tools"
)
return None

# Get or create tool call
tool_id = self._get_or_create_tool_call(func_name)

# Accumulate arguments
self.tool_calls[tool_id][
"arguments"] += self.parser.last_content_delta

# Create tool call delta - return only the new content delta, not accumulated
return {
"tool_calls": [{
"id": tool_id,
"type": "function",
"function": {
"name": func_name,
"arguments": self.parser.
last_content_delta # Only the new content delta
"arguments": self.parser.last_content_delta
},
"index": self.tool_calls[tool_id]["index"]
}]
Expand Down Expand Up @@ -903,7 +929,7 @@ def _parse_tool_call_from_harmony_message(
)
return None
elif msg_content_type and "code" in msg_content_type:
function_name = str(msg_recipient)
function_name = str(msg_recipient).split("functions.")[-1]
return {
"id": f"call_{uuid.uuid4().hex[:8]}",
"type": "function",
Expand Down Expand Up @@ -1050,7 +1076,20 @@ def harmony_output_to_openai(
if not _check_channel_valid(generated_channels, msg_channel):
continue

if msg_channel == "analysis":
# Check for tool calls first, regardless of channel.
# The model may emit tool calls on either "commentary" or "analysis" channel
# (other frameworks handle both channels when recipient starts with "functions.")
if (msg_channel in ("commentary", "analysis") and msg_recipient
and msg_recipient != 'assistant'
and str(msg_recipient).startswith("functions.")):
# Tool call
tool_call = self._parse_tool_call_from_harmony_message(msg)
if tool_call and self._is_tool_call_allowed(
tool_call, external_tools,
should_filter_external_tools):
tool_calls.append(tool_call)

elif msg_channel == "analysis":
for content in msg_content:
if isinstance(content, TextContent):
analysis_content += content.text
Expand All @@ -1061,7 +1100,7 @@ def harmony_output_to_openai(

elif msg_channel == "commentary":
if msg_recipient and msg_recipient != 'assistant':
# Tool call
# Non-functions tool call (e.g., browser, python)
tool_call = self._parse_tool_call_from_harmony_message(
msg)
if tool_call and self._is_tool_call_allowed(
Expand Down Expand Up @@ -1366,6 +1405,7 @@ def create_openai_streaming_response(
# Handle reasoning content
if "reasoning" in harmony_delta:
delta_message.reasoning = harmony_delta["reasoning"]
delta_message.reasoning_content = harmony_delta["reasoning"]
# tool_calls will use default factory (empty list)

# Handle regular content
Expand Down Expand Up @@ -1570,6 +1610,27 @@ def end_streaming(res):
try:
res = []
if done:
# Process any remaining tokens before sending final message
if output.token_ids_diff:
remaining_responses, _ = harmony_adapter.create_openai_streaming_response(
request_id=request_id,
tokens=output.token_ids_diff,
available_tools=tools_for_parser,
model_name=model,
tool_choice=tool_choice)
if first_iteration and remaining_responses:
first_delta = DeltaMessage(role="assistant")
choice = ChatCompletionResponseStreamChoice(
index=0, delta=first_delta)
first_response = ChatCompletionStreamResponse(
model=model,
choices=[choice],
)
response_json = first_response.model_dump_json(
exclude_none=True)
res.append(f"data: {response_json}\n\n")
res.extend(remaining_responses)

# Send final message with finish_reason
final_response = ChatCompletionStreamResponse(
model=model,
Expand Down Expand Up @@ -1708,6 +1769,7 @@ def _create_response_message(parsed_output: dict[str, Any]) -> dict[str, Any]:
# Add reasoning_content if present
if "reasoning" in parsed_output:
message["reasoning"] = parsed_output["reasoning"]
message["reasoning_content"] = parsed_output["reasoning"]

return message

Expand Down
2 changes: 0 additions & 2 deletions tensorrt_llm/serve/openai_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1268,8 +1268,6 @@ async def create_streaming_generator(promise: RequestOutput,
disaggregated_params=disaggregated_params,
trace_headers=trace_headers,
)
postproc_args.request_id = promise.request_id

if not self.postproc_worker_enabled:
postproc_args.num_prompt_tokens = len(promise.prompt_token_ids)

Expand Down
7 changes: 6 additions & 1 deletion tensorrt_llm/serve/postprocess_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,17 @@ def chat_harmony_post_processor(
@nvtx_range_debug("chat_harmony_streaming_post_processor")
def chat_harmony_streaming_post_processor(
rsp: GenerationResult, args: ChatCompletionPostprocArgs) -> List[str]:
# Read the request ID directly from rsp.id instead of args.request_id.
# Both are the same executor-assigned ID, but args.request_id is set too
# late (after generate_async returns) for the postprocess worker path:
# the worker receives a copy of args before the ID is assigned, so
# args.request_id is always None with num_postprocess_workers > 0.
response = handle_streaming_response(
tools=args.tools,
tool_choice=args.tool_choice,
result=rsp,
model=args.model,
request_id=args.request_id,
request_id=str(rsp.id),
done=rsp._done,
num_prompt_tokens=args.num_prompt_tokens,
first_iteration=args.first_iteration,
Expand Down