From eef936d9c0594ff2bd5c4329e0ddded5a30a10ae Mon Sep 17 00:00:00 2001 From: Dongfeng Yu Date: Sat, 7 Mar 2026 03:22:17 +0000 Subject: [PATCH 1/4] Fix parser channel issue Signed-off-by: Dongfeng Yu --- tensorrt_llm/serve/harmony_adapter.py | 75 ++++++++++++++++++++------- 1 file changed, 57 insertions(+), 18 deletions(-) diff --git a/tensorrt_llm/serve/harmony_adapter.py b/tensorrt_llm/serve/harmony_adapter.py index 883483897ce..f5168b786ee 100644 --- a/tensorrt_llm/serve/harmony_adapter.py +++ b/tensorrt_llm/serve/harmony_adapter.py @@ -202,42 +202,68 @@ def _create_delta_from_parser_state(self) -> dict[str, Any] | None: self.parser.current_channel): return {"should_stop": "Repeated message"} + # Check for tool calls first, regardless of channel. + # The model may emit tool calls on either "commentary" or "analysis" channel. + if (self.parser.current_channel in ("commentary", "analysis") + and self.parser.current_recipient + and "functions." in str(self.parser.current_recipient)): + func_name = str( + self.parser.current_recipient).split("functions.")[-1] + self.current_channel_state = "commentary_tool" + + # Check if tool is allowed + if self.should_filter_tools and func_name not in self.available_tools: + logger.debug( + f"Request {self.request_id}: tool {func_name} not in available tools" + ) + return None + + # Get or create tool call + tool_id = self._get_or_create_tool_call(func_name) + + # Accumulate arguments + self.tool_calls[tool_id][ + "arguments"] += self.parser.last_content_delta + + # Create tool call delta - return only the new content delta, not accumulated + return { + "tool_calls": [{ + "id": tool_id, + "type": "function", + "function": { + "name": func_name, + "arguments": self.parser. + last_content_delta # Only the new content delta + }, + "index": self.tool_calls[tool_id]["index"] + }] + } + if self.parser.current_channel == "analysis": # Analysis channel -> reasoning (no token wrapping needed) self.current_channel_state = "analysis" return {"reasoning": self.parser.last_content_delta} elif self.parser.current_channel == "commentary": - if self.parser.current_recipient and "functions." in str( - self.parser.current_recipient): - # Tool call in commentary channel - func_name = str( - self.parser.current_recipient).split("functions.")[-1] + if self.parser.current_recipient and self.parser.current_recipient != 'assistant': + # Non-functions tool call (e.g., browser, python) + func_name = str(self.parser.current_recipient) self.current_channel_state = "commentary_tool" - # Check if tool is allowed if self.should_filter_tools and func_name not in self.available_tools: - logger.debug( - f"Request {self.request_id}: tool {func_name} not in available tools" - ) return None - # Get or create tool call tool_id = self._get_or_create_tool_call(func_name) - - # Accumulate arguments self.tool_calls[tool_id][ "arguments"] += self.parser.last_content_delta - # Create tool call delta - return only the new content delta, not accumulated return { "tool_calls": [{ "id": tool_id, "type": "function", "function": { "name": func_name, - "arguments": self.parser. - last_content_delta # Only the new content delta + "arguments": self.parser.last_content_delta }, "index": self.tool_calls[tool_id]["index"] }] @@ -903,7 +929,7 @@ def _parse_tool_call_from_harmony_message( ) return None elif msg_content_type and "code" in msg_content_type: - function_name = str(msg_recipient) + function_name = str(msg_recipient).split("functions.")[-1] return { "id": f"call_{uuid.uuid4().hex[:8]}", "type": "function", @@ -1050,7 +1076,20 @@ def harmony_output_to_openai( if not _check_channel_valid(generated_channels, msg_channel): continue - if msg_channel == "analysis": + # Check for tool calls first, regardless of channel. + # The model may emit tool calls on either "commentary" or "analysis" channel + # (other frameworks handle both channels when recipient starts with "functions.") + if (msg_channel in ("commentary", "analysis") and msg_recipient + and msg_recipient != 'assistant' + and str(msg_recipient).startswith("functions.")): + # Tool call + tool_call = self._parse_tool_call_from_harmony_message(msg) + if tool_call and self._is_tool_call_allowed( + tool_call, external_tools, + should_filter_external_tools): + tool_calls.append(tool_call) + + elif msg_channel == "analysis": for content in msg_content: if isinstance(content, TextContent): analysis_content += content.text @@ -1061,7 +1100,7 @@ def harmony_output_to_openai( elif msg_channel == "commentary": if msg_recipient and msg_recipient != 'assistant': - # Tool call + # Non-functions tool call (e.g., browser, python) tool_call = self._parse_tool_call_from_harmony_message( msg) if tool_call and self._is_tool_call_allowed( From 92a65bbf805449dc2007d44a2ab9f18249b37243 Mon Sep 17 00:00:00 2001 From: Dongfeng Yu Date: Sat, 7 Mar 2026 05:08:15 +0000 Subject: [PATCH 2/4] Fix reasoning content missing Signed-off-by: Dongfeng Yu --- tensorrt_llm/serve/harmony_adapter.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tensorrt_llm/serve/harmony_adapter.py b/tensorrt_llm/serve/harmony_adapter.py index f5168b786ee..d27482c0daa 100644 --- a/tensorrt_llm/serve/harmony_adapter.py +++ b/tensorrt_llm/serve/harmony_adapter.py @@ -1405,6 +1405,7 @@ def create_openai_streaming_response( # Handle reasoning content if "reasoning" in harmony_delta: delta_message.reasoning = harmony_delta["reasoning"] + delta_message.reasoning_content = harmony_delta["reasoning"] # tool_calls will use default factory (empty list) # Handle regular content @@ -1747,6 +1748,7 @@ def _create_response_message(parsed_output: dict[str, Any]) -> dict[str, Any]: # Add reasoning_content if present if "reasoning" in parsed_output: message["reasoning"] = parsed_output["reasoning"] + message["reasoning_content"] = parsed_output["reasoning"] return message From 81f33181cdac7c5754ad5224d1011ff98fd8e930 Mon Sep 17 00:00:00 2001 From: Dongfeng Yu Date: Sat, 7 Mar 2026 05:29:39 +0000 Subject: [PATCH 3/4] Harmony streaming drop Signed-off-by: Dongfeng Yu --- tensorrt_llm/serve/harmony_adapter.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tensorrt_llm/serve/harmony_adapter.py b/tensorrt_llm/serve/harmony_adapter.py index d27482c0daa..9d3f8a7603e 100644 --- a/tensorrt_llm/serve/harmony_adapter.py +++ b/tensorrt_llm/serve/harmony_adapter.py @@ -1610,6 +1610,27 @@ def end_streaming(res): try: res = [] if done: + # Process any remaining tokens before sending final message + if output.token_ids_diff: + remaining_responses, _ = harmony_adapter.create_openai_streaming_response( + request_id=request_id, + tokens=output.token_ids_diff, + available_tools=tools_for_parser, + model_name=model, + tool_choice=tool_choice) + if first_iteration and remaining_responses: + first_delta = DeltaMessage(role="assistant") + choice = ChatCompletionResponseStreamChoice( + index=0, delta=first_delta) + first_response = ChatCompletionStreamResponse( + model=model, + choices=[choice], + ) + response_json = first_response.model_dump_json( + exclude_none=True) + res.append(f"data: {response_json}\n\n") + res.extend(remaining_responses) + # Send final message with finish_reason final_response = ChatCompletionStreamResponse( model=model, From 246e773ceb5833fe4d2757a26d336be53e302167 Mon Sep 17 00:00:00 2001 From: Dongfeng Yu Date: Mon, 9 Mar 2026 22:59:51 +0000 Subject: [PATCH 4/4] Fix state key for post process worker Signed-off-by: Dongfeng Yu --- tensorrt_llm/serve/openai_server.py | 2 -- tensorrt_llm/serve/postprocess_handlers.py | 7 ++++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tensorrt_llm/serve/openai_server.py b/tensorrt_llm/serve/openai_server.py index 4ab06057210..98886a7b3cc 100644 --- a/tensorrt_llm/serve/openai_server.py +++ b/tensorrt_llm/serve/openai_server.py @@ -1268,8 +1268,6 @@ async def create_streaming_generator(promise: RequestOutput, disaggregated_params=disaggregated_params, trace_headers=trace_headers, ) - postproc_args.request_id = promise.request_id - if not self.postproc_worker_enabled: postproc_args.num_prompt_tokens = len(promise.prompt_token_ids) diff --git a/tensorrt_llm/serve/postprocess_handlers.py b/tensorrt_llm/serve/postprocess_handlers.py index bacce813b6b..ff3e6c81d2d 100644 --- a/tensorrt_llm/serve/postprocess_handlers.py +++ b/tensorrt_llm/serve/postprocess_handlers.py @@ -585,12 +585,17 @@ def chat_harmony_post_processor( @nvtx_range_debug("chat_harmony_streaming_post_processor") def chat_harmony_streaming_post_processor( rsp: GenerationResult, args: ChatCompletionPostprocArgs) -> List[str]: + # Read the request ID directly from rsp.id instead of args.request_id. + # Both are the same executor-assigned ID, but args.request_id is set too + # late (after generate_async returns) for the postprocess worker path: + # the worker receives a copy of args before the ID is assigned, so + # args.request_id is always None with num_postprocess_workers > 0. response = handle_streaming_response( tools=args.tools, tool_choice=args.tool_choice, result=rsp, model=args.model, - request_id=args.request_id, + request_id=str(rsp.id), done=rsp._done, num_prompt_tokens=args.num_prompt_tokens, first_iteration=args.first_iteration,