diff --git a/agentpress/base_processors.py b/agentpress/base_processors.py new file mode 100644 index 000000000..7b017c125 --- /dev/null +++ b/agentpress/base_processors.py @@ -0,0 +1,137 @@ +import logging +from typing import Dict, Any, Callable, List, Optional, Set +from abc import ABC, abstractmethod + +# --- Tool Parser Base --- + +class ToolParserBase(ABC): + """Abstract base class defining the interface for parsing tool calls from LLM responses. + + This class provides the foundational interface for parsing both complete and streaming + responses from Language Models, specifically focusing on tool call extraction and processing. + + Attributes: + None + + Methods: + parse_response: Processes complete LLM responses + parse_stream: Handles streaming response chunks + """ + + @abstractmethod + async def parse_response(self, response: Any) -> Dict[str, Any]: + """Parse a complete LLM response and extract tool calls. + + Args: + response (Any): The complete response from the LLM + + Returns: + Dict[str, Any]: A dictionary containing: + - role: The message role (usually 'assistant') + - content: The text content of the response + - tool_calls: List of extracted tool calls (if present) + """ + pass + + @abstractmethod + async def parse_stream(self, response_chunk: Any, tool_calls_buffer: Dict[int, Dict]) -> tuple[Optional[Dict[str, Any]], bool]: + """Parse a streaming response chunk and manage tool call accumulation. + + Args: + response_chunk (Any): A single chunk from the streaming response + tool_calls_buffer (Dict[int, Dict]): Buffer storing incomplete tool calls + + Returns: + tuple[Optional[Dict[str, Any]], bool]: A tuple containing: + - The parsed message if complete tool calls are found (or None) + - Boolean indicating if the stream is complete + """ + pass + +# --- Tool Executor Base --- + +class ToolExecutorBase(ABC): + """Abstract base class defining the interface for tool execution strategies. + + This class provides the foundation for implementing different tool execution + approaches, supporting both parallel and sequential execution patterns. + + Attributes: + None + + Methods: + execute_tool_calls: Main entry point for tool execution + _execute_parallel: Handles parallel tool execution + _execute_sequential: Handles sequential tool execution + """ + + @abstractmethod + async def execute_tool_calls( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Optional[Set[str]] = None + ) -> List[Dict[str, Any]]: + """Execute a list of tool calls and return their results. + + Args: + tool_calls: List of tool calls to execute + available_functions: Dictionary of available tool functions + thread_id: ID of the current conversation thread + executed_tool_calls: Set of already executed tool call IDs + + Returns: + List[Dict[str, Any]]: List of tool execution results + """ + pass + + @abstractmethod + async def _execute_parallel( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Set[str] + ) -> List[Dict[str, Any]]: + """Execute tool calls in parallel.""" + pass + + @abstractmethod + async def _execute_sequential( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Set[str] + ) -> List[Dict[str, Any]]: + """Execute tool calls sequentially.""" + pass + +# --- Results Adder Base --- + +class ResultsAdderBase(ABC): + """Abstract base class for handling tool results and message processing.""" + + def __init__(self, thread_manager): + """Initialize with a ThreadManager instance. + + Args: + thread_manager: The ThreadManager instance to use for message operations + """ + self.add_message = thread_manager.add_message + self.update_message = thread_manager._update_message + self.list_messages = thread_manager.list_messages + self.message_added = False + + @abstractmethod + async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + pass + + @abstractmethod + async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + pass + + @abstractmethod + async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): + pass diff --git a/agentpress/examples/example_agent/agent.py b/agentpress/examples/example_agent/agent.py index 5a41deca8..87fc07aa4 100644 --- a/agentpress/examples/example_agent/agent.py +++ b/agentpress/examples/example_agent/agent.py @@ -16,7 +16,7 @@ async def run_agent(thread_id: str, max_iterations: int = 5): thread_manager = ThreadManager() state_manager = StateManager() - # Initialize tools with XML schema support + # Initialize tools thread_manager.add_tool(FilesTool) thread_manager.add_tool(TerminalTool) @@ -60,7 +60,9 @@ async def finalizer(): # file contents here # -# +# +# text to replace +# replacement text # # @@ -71,7 +73,7 @@ async def finalizer(): "content": """ You are a world-class web developer who can create, edit, and delete files, and execute terminal commands. You write clean, well-structured code. Keep iterating on existing files, continue working on this existing codebase - do not omit previous progress; instead, keep iterating. -FORMAT: +RESPONSE FORMAT: Use XML tags to specify file operations: @@ -141,11 +143,6 @@ async def finalizer(): model_name = "anthropic/claude-3-5-sonnet-latest" - registry = thread_manager.tool_registry - tool_parser = XMLToolParser(tool_registry=registry) - tool_executor = XMLToolExecutor(parallel=True, tool_registry=registry) - results_adder = XMLResultsAdder(thread_manager) - response = await thread_manager.run_thread( thread_id=thread_id, system_message=system_message, @@ -154,15 +151,11 @@ async def finalizer(): max_tokens=8096, tool_choice="auto", temporary_message=state_message, - use_tools=True, - native_tool_calling=False, - execute_tools=True, + native_tool_calling=True, + xml_tool_calling=False, stream=True, - immediate_tool_execution=True, - parallel_tool_execution=True, - tool_parser=tool_parser, - tool_executor=tool_executor, - results_adder=results_adder + execute_tools_on_stream=True, + parallel_tool_execution=True ) if isinstance(response, AsyncGenerator): diff --git a/agentpress/examples/example_agent/workspace/index.html b/agentpress/examples/example_agent/workspace/index.html new file mode 100644 index 000000000..abf96d64f --- /dev/null +++ b/agentpress/examples/example_agent/workspace/index.html @@ -0,0 +1,85 @@ + + + + + + Modern Landing Page + + + +
+ +
+ +
+
+
+

Welcome to the Future

+

Transform your digital presence with our innovative solutions

+
+ + +
+
+
+
+ +
+

Our Features

+
+
+
🚀
+

Fast Performance

+

Lightning-quick load times and smooth interactions

+
+
+
🎨
+

Beautiful Design

+

Stunning visuals that capture attention

+
+
+
📱
+

Responsive

+

Perfect display on all devices

+
+
+
+ +
+
+

About Us

+

We're dedicated to creating exceptional digital experiences that drive results.

+
+
+ +
+

Get in Touch

+
+ + + + +
+
+
+ +
+

© 2024 Brand. All rights reserved.

+
+ + + + \ No newline at end of file diff --git a/agentpress/examples/example_agent/workspace/script.js b/agentpress/examples/example_agent/workspace/script.js new file mode 100644 index 000000000..fdc5510a4 --- /dev/null +++ b/agentpress/examples/example_agent/workspace/script.js @@ -0,0 +1,61 @@ +document.addEventListener('DOMContentLoaded', () => { + const mobileNavToggle = document.querySelector('.mobile-nav-toggle'); + const navLinks = document.querySelector('.nav-links'); + const header = document.querySelector('.header'); + let lastScroll = 0; + + mobileNavToggle.addEventListener('click', () => { + navLinks.classList.toggle('active'); + const spans = mobileNavToggle.querySelectorAll('span'); + spans[0].style.transform = navLinks.classList.contains('active') ? 'rotate(45deg) translate(8px, 8px)' : ''; + spans[1].style.opacity = navLinks.classList.contains('active') ? '0' : '1'; + spans[2].style.transform = navLinks.classList.contains('active') ? 'rotate(-45deg) translate(7px, -7px)' : ''; + }); + + window.addEventListener('scroll', () => { + const currentScroll = window.pageYOffset; + + if (currentScroll <= 0) { + header.style.transform = 'translateY(0)'; + return; + } + + if (currentScroll > lastScroll && !header.classList.contains('scroll-down')) { + header.style.transform = 'translateY(-100%)'; + } else if (currentScroll < lastScroll && header.classList.contains('scroll-down')) { + header.style.transform = 'translateY(0)'; + } + + lastScroll = currentScroll; + }); + + const observerOptions = { + threshold: 0.1, + rootMargin: '0px 0px -50px 0px' + }; + + const observer = new IntersectionObserver((entries) => { + entries.forEach(entry => { + if (entry.isIntersecting) { + entry.target.style.opacity = '1'; + entry.target.style.transform = 'translateY(0)'; + } + }); + }, observerOptions); + + document.querySelectorAll('section').forEach(section => { + section.style.opacity = '0'; + section.style.transform = 'translateY(20px)'; + section.style.transition = 'opacity 0.6s ease-out, transform 0.6s ease-out'; + observer.observe(section); + }); + + const form = document.querySelector('.contact-form'); + form.addEventListener('submit', (e) => { + e.preventDefault(); + const formData = new FormData(form); + const data = Object.fromEntries(formData); + console.log('Form submitted:', data); + form.reset(); + }); +}); \ No newline at end of file diff --git a/agentpress/examples/example_agent/workspace/styles.css b/agentpress/examples/example_agent/workspace/styles.css new file mode 100644 index 000000000..9e05405cc --- /dev/null +++ b/agentpress/examples/example_agent/workspace/styles.css @@ -0,0 +1,343 @@ +:root { + --primary-color: #6366f1; + --secondary-color: #4f46e5; + --text-color: #18181b; + --light-text: #71717a; + --background: #ffffff; + --glass-bg: rgba(255, 255, 255, 0.7); + --glass-border: rgba(255, 255, 255, 0.3); + --section-padding: 5rem 2rem; + --gradient-1: linear-gradient(135deg, #6366f1 0%, #4f46e5 100%); + --gradient-2: linear-gradient(135deg, #c084fc 0%, #a855f7 100%); + --shadow-1: 0 10px 30px -10px rgba(99, 102, 241, 0.2); + --shadow-2: 0 20px 40px -15px rgba(99, 102, 241, 0.1); +} + +* { + margin: 0; + padding: 0; + box-sizing: border-box; +} + +html { + scroll-behavior: smooth; +} + +body { + font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; + line-height: 1.6; + color: var(--text-color); +} + +.header { + position: fixed; + width: 100%; + background: var(--glass-bg); + backdrop-filter: blur(10px); + border-bottom: 1px solid var(--glass-border); + box-shadow: var(--shadow-1); + z-index: 1000; +} + +.nav { + display: flex; + justify-content: space-between; + align-items: center; + padding: 1rem 2rem; + max-width: 1200px; + margin: 0 auto; +} + +.logo { + font-size: 1.5rem; + font-weight: bold; + color: var(--primary-color); +} + +.nav-links { + display: flex; + gap: 2rem; + list-style: none; +} + +.nav-links a { + text-decoration: none; + color: var(--text-color); + font-weight: 500; + transition: color 0.3s ease; +} + +.nav-links a:hover { + color: var(--primary-color); +} + +.mobile-nav-toggle { + display: none; +} + +.hero { + height: 100vh; + display: flex; + align-items: center; + justify-content: center; + text-align: center; + background: var(--gradient-1); + position: relative; + overflow: hidden; + padding: var(--section-padding); +} + +.hero::before { + content: ''; + position: absolute; + width: 150%; + height: 150%; + background: radial-gradient(circle, rgba(255,255,255,0.1) 0%, transparent 60%); + animation: rotate 20s linear infinite; +} + +@keyframes rotate { + from { transform: rotate(0deg); } + to { transform: rotate(360deg); } +} + +.hero-content { + max-width: 800px; + position: relative; + z-index: 1; + background: var(--glass-bg); + backdrop-filter: blur(10px); + padding: 3rem; + border-radius: 1rem; + border: 1px solid var(--glass-border); + box-shadow: var(--shadow-2); +} + +.hero h1 { + font-size: 3.5rem; + margin-bottom: 1rem; + line-height: 1.2; +} + +.hero p { + font-size: 1.25rem; + color: var(--light-text); + margin-bottom: 2rem; +} + +.cta-group { + display: flex; + gap: 1rem; + justify-content: center; + margin-top: 2rem; +} + +.cta-button { + padding: 1rem 2rem; + font-size: 1.1rem; + border: none; + border-radius: 0.5rem; + cursor: pointer; + transition: all 0.3s ease; + position: relative; + overflow: hidden; +} + +.cta-button::before { + content: ''; + position: absolute; + top: 50%; + left: 50%; + width: 0; + height: 0; + background: rgba(255, 255, 255, 0.2); + border-radius: 50%; + transform: translate(-50%, -50%); + transition: width 0.6s ease, height 0.6s ease; +} + +.cta-button:hover::before { + width: 300%; + height: 300%; +} + +.cta-button.primary { + background: var(--gradient-1); + color: white; + box-shadow: var(--shadow-1); +} + +.cta-button.secondary { + background: var(--glass-bg); + color: var(--primary-color); + border: 1px solid var(--primary-color); +} + +.cta-button:hover { + background: var(--secondary-color); +} + +.features { + padding: var(--section-padding); + background: var(--background); +} + +.features h2 { + text-align: center; + margin-bottom: 3rem; + font-size: 2.5rem; +} + +.features-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); + gap: 2rem; + max-width: 1200px; + margin: 0 auto; +} + +.feature-card { + padding: 2rem; + text-align: center; + background: var(--glass-bg); + backdrop-filter: blur(10px); + border: 1px solid var(--glass-border); + border-radius: 1rem; + transition: all 0.3s ease; + box-shadow: var(--shadow-1); +} + +.feature-card:hover { + transform: translateY(-5px) scale(1.02); + box-shadow: var(--shadow-2); + background: var(--gradient-2); + color: white; +} + +.feature-icon { + font-size: 2.5rem; + margin-bottom: 1rem; +} + +.about { + padding: var(--section-padding); + background: #f3f4f6; +} + +.about-content { + max-width: 800px; + margin: 0 auto; + text-align: center; +} + +.about h2 { + font-size: 2.5rem; + margin-bottom: 1.5rem; +} + +.contact { + padding: var(--section-padding); + background: var(--background); +} + +.contact h2 { + text-align: center; + margin-bottom: 3rem; + font-size: 2.5rem; +} + +.contact-form { + display: flex; + flex-direction: column; + gap: 1rem; + max-width: 600px; + margin: 0 auto; +} + +.contact-form input, +.contact-form textarea { + padding: 1rem; + background: var(--glass-bg); + backdrop-filter: blur(10px); + border: 1px solid var(--glass-border); + border-radius: 0.5rem; + font-size: 1rem; + transition: all 0.3s ease; +} + +.contact-form input:focus, +.contact-form textarea:focus { + outline: none; + border-color: var(--primary-color); + box-shadow: 0 0 0 3px rgba(99, 102, 241, 0.2); + transform: translateY(-2px); +} + +.contact-form textarea { + height: 150px; + resize: vertical; +} + +.contact-form button { + padding: 1rem; + background: var(--primary-color); + color: white; + border: none; + border-radius: 0.5rem; + cursor: pointer; + transition: background-color 0.3s ease; +} + +.contact-form button:hover { + background: var(--secondary-color); +} + +.footer { + padding: 2rem; + text-align: center; + background: #f3f4f6; +} + +@media (max-width: 768px) { + .nav-links { + display: none; + position: absolute; + top: 100%; + left: 0; + right: 0; + background: var(--background); + padding: 1rem; + flex-direction: column; + align-items: center; + gap: 1rem; + } + + .nav-links.active { + display: flex; + } + + .mobile-nav-toggle { + display: block; + background: none; + border: none; + cursor: pointer; + padding: 0.5rem; + } + + .mobile-nav-toggle span { + display: block; + width: 25px; + height: 3px; + background: var(--text-color); + margin: 5px 0; + transition: 0.3s; + } + + .hero h1 { + font-size: 2.5rem; + } + + .features-grid { + grid-template-columns: 1fr; + } +} \ No newline at end of file diff --git a/agentpress/llm_response_processor.py b/agentpress/llm_response_processor.py new file mode 100644 index 000000000..6a622cebb --- /dev/null +++ b/agentpress/llm_response_processor.py @@ -0,0 +1,179 @@ +import asyncio +from typing import Callable, Dict, Any, AsyncGenerator, Optional +import logging +from agentpress.base_processors import ToolParserBase, ToolExecutorBase, ResultsAdderBase +from agentpress.standard_tool_parser import StandardToolParser +from agentpress.standard_tool_executor import StandardToolExecutor +from agentpress.standard_results_adder import StandardResultsAdder + +# --- Response Processor --- + +class LLMResponseProcessor: + """Handles LLM response processing and tool execution management.""" + + def __init__( + self, + thread_id: str, + available_functions: Dict = None, + add_message_callback: Callable = None, + update_message_callback: Callable = None, + list_messages_callback: Callable = None, + parallel_tool_execution: bool = True, + threads_dir: str = "threads", + tool_parser: Optional[ToolParserBase] = None, + tool_executor: Optional[ToolExecutorBase] = None, + results_adder: Optional[ResultsAdderBase] = None, + thread_manager = None # Add thread_manager parameter + ): + self.thread_id = thread_id + self.tool_executor = tool_executor or StandardToolExecutor(parallel=parallel_tool_execution) + self.tool_parser = tool_parser or StandardToolParser() + self.available_functions = available_functions or {} + self.threads_dir = threads_dir + + # Create a minimal thread manager if none provided + if thread_manager is None and (add_message_callback and update_message_callback and list_messages_callback): + class MinimalThreadManager: + def __init__(self, add_msg, update_msg, list_msg): + self.add_message = add_msg + self._update_message = update_msg + self.list_messages = list_msg + thread_manager = MinimalThreadManager(add_message_callback, update_message_callback, list_messages_callback) + + # Initialize results adder + self.results_adder = results_adder or StandardResultsAdder(thread_manager) + + # State tracking for streaming responses + self.tool_calls_buffer = {} + self.processed_tool_calls = set() + self.content_buffer = "" + self.tool_calls_accumulated = [] + + async def process_stream( + self, + response_stream: AsyncGenerator, + execute_tools: bool = True, + execute_tools_on_stream: bool = True + ) -> AsyncGenerator: + """Process streaming LLM response and handle tool execution.""" + pending_tool_calls = [] + background_tasks = set() + + async def handle_message_management(chunk): + try: + # Accumulate content + if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: + self.content_buffer += chunk.choices[0].delta.content + + # Parse tool calls if present + if hasattr(chunk.choices[0].delta, 'tool_calls'): + parsed_message, is_complete = await self.tool_parser.parse_stream( + chunk, + self.tool_calls_buffer + ) + if parsed_message and 'tool_calls' in parsed_message: + self.tool_calls_accumulated = parsed_message['tool_calls'] + + # Handle tool execution and results + if execute_tools and self.tool_calls_accumulated: + new_tool_calls = [ + tool_call for tool_call in self.tool_calls_accumulated + if tool_call['id'] not in self.processed_tool_calls + ] + + if new_tool_calls: + if execute_tools_on_stream: + results = await self.tool_executor.execute_tool_calls( + tool_calls=new_tool_calls, + available_functions=self.available_functions, + thread_id=self.thread_id, + executed_tool_calls=self.processed_tool_calls + ) + for result in results: + await self.results_adder.add_tool_result(self.thread_id, result) + self.processed_tool_calls.add(result['tool_call_id']) + else: + pending_tool_calls.extend(new_tool_calls) + + # Add/update assistant message + message = { + "role": "assistant", + "content": self.content_buffer + } + if self.tool_calls_accumulated: + message["tool_calls"] = self.tool_calls_accumulated + + if not hasattr(self, '_message_added'): + await self.results_adder.add_initial_response( + self.thread_id, + self.content_buffer, + self.tool_calls_accumulated + ) + self._message_added = True + else: + await self.results_adder.update_response( + self.thread_id, + self.content_buffer, + self.tool_calls_accumulated + ) + + # Handle stream completion + if chunk.choices[0].finish_reason: + if not execute_tools_on_stream and pending_tool_calls: + results = await self.tool_executor.execute_tool_calls( + tool_calls=pending_tool_calls, + available_functions=self.available_functions, + thread_id=self.thread_id, + executed_tool_calls=self.processed_tool_calls + ) + for result in results: + await self.results_adder.add_tool_result(self.thread_id, result) + self.processed_tool_calls.add(result['tool_call_id']) + pending_tool_calls.clear() + + except Exception as e: + logging.error(f"Error in background task: {e}") + + try: + async for chunk in response_stream: + task = asyncio.create_task(handle_message_management(chunk)) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) + yield chunk + + if background_tasks: + await asyncio.gather(*background_tasks, return_exceptions=True) + + except Exception as e: + logging.error(f"Error in stream processing: {e}") + for task in background_tasks: + if not task.done(): + task.cancel() + raise + + async def process_response(self, response: Any, execute_tools: bool = True) -> None: + """Process complete LLM response and execute tools.""" + try: + assistant_message = await self.tool_parser.parse_response(response) + await self.results_adder.add_initial_response( + self.thread_id, + assistant_message['content'], + assistant_message.get('tool_calls') + ) + + if execute_tools and 'tool_calls' in assistant_message and assistant_message['tool_calls']: + results = await self.tool_executor.execute_tool_calls( + tool_calls=assistant_message['tool_calls'], + available_functions=self.available_functions, + thread_id=self.thread_id, + executed_tool_calls=self.processed_tool_calls + ) + + for result in results: + await self.results_adder.add_tool_result(self.thread_id, result) + logging.info(f"Tool execution result: {result}") + + except Exception as e: + logging.error(f"Error processing response: {e}") + response_content = response.choices[0].message.get('content', '') + await self.results_adder.add_initial_response(self.thread_id, response_content) diff --git a/agentpress/standard_results_adder.py b/agentpress/standard_results_adder.py new file mode 100644 index 000000000..94605c1d2 --- /dev/null +++ b/agentpress/standard_results_adder.py @@ -0,0 +1,41 @@ +from typing import Dict, Any, List, Optional +from agentpress.base_processors import ResultsAdderBase + +# --- Standard Results Adder Implementation --- + +class StandardResultsAdder(ResultsAdderBase): + """Standard implementation for handling tool results and message processing.""" + + def __init__(self, thread_manager): + """Initialize with ThreadManager instance.""" + super().__init__(thread_manager) # Use base class initialization + + async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + message = { + "role": "assistant", + "content": content + } + if tool_calls: + message["tool_calls"] = tool_calls + + await self.add_message(thread_id, message) + self.message_added = True + + async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + if not self.message_added: + await self.add_initial_response(thread_id, content, tool_calls) + return + + message = { + "role": "assistant", + "content": content + } + if tool_calls: + message["tool_calls"] = tool_calls + + await self.update_message(thread_id, message) + + async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): + messages = await self.list_messages(thread_id) + if not any(msg.get('tool_call_id') == result['tool_call_id'] for msg in messages): + await self.add_message(thread_id, result) diff --git a/agentpress/standard_tool_executor.py b/agentpress/standard_tool_executor.py new file mode 100644 index 000000000..e61460f88 --- /dev/null +++ b/agentpress/standard_tool_executor.py @@ -0,0 +1,151 @@ +import asyncio +import json +import logging +from typing import Dict, Any, List, Set, Callable, Optional +from agentpress.base_processors import ToolExecutorBase +from agentpress.tool import ToolResult + +# --- Standard Tool Executor Implementation --- + +class StandardToolExecutor(ToolExecutorBase): + """Standard implementation of tool execution with configurable strategies. + + Provides a flexible tool execution implementation that supports both parallel + and sequential execution patterns, with built-in error handling and result + formatting. + + Attributes: + parallel (bool): Whether to execute tools in parallel + + Methods: + execute_tool_calls: Main execution entry point + _execute_parallel: Parallel execution implementation + _execute_sequential: Sequential execution implementation + """ + + def __init__(self, parallel: bool = True): + self.parallel = parallel + + async def execute_tool_calls( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Optional[Set[str]] = None + ) -> List[Dict[str, Any]]: + if executed_tool_calls is None: + executed_tool_calls = set() + + if self.parallel: + return await self._execute_parallel( + tool_calls, + available_functions, + thread_id, + executed_tool_calls + ) + else: + return await self._execute_sequential( + tool_calls, + available_functions, + thread_id, + executed_tool_calls + ) + + async def _execute_parallel( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Set[str] + ) -> List[Dict[str, Any]]: + async def execute_single_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]: + if tool_call['id'] in executed_tool_calls: + return None + + try: + function_name = tool_call['function']['name'] + function_args = tool_call['function']['arguments'] + if isinstance(function_args, str): + function_args = json.loads(function_args) + + function_to_call = available_functions.get(function_name) + if not function_to_call: + error_msg = f"Function {function_name} not found" + logging.error(error_msg) + return { + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(ToolResult(success=False, output=error_msg)) + } + + result = await function_to_call(**function_args) + logging.info(f"Tool execution result for {function_name}: {result}") + executed_tool_calls.add(tool_call['id']) + + return { + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(result) + } + except Exception as e: + error_msg = f"Error executing {function_name}: {str(e)}" + logging.error(error_msg) + return { + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(ToolResult(success=False, output=error_msg)) + } + + tasks = [execute_single_tool(tool_call) for tool_call in tool_calls] + results = await asyncio.gather(*tasks) + return [r for r in results if r is not None] + + async def _execute_sequential( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Set[str] + ) -> List[Dict[str, Any]]: + results = [] + for tool_call in tool_calls: + if tool_call['id'] in executed_tool_calls: + continue + + try: + function_name = tool_call['function']['name'] + function_args = tool_call['function']['arguments'] + if isinstance(function_args, str): + function_args = json.loads(function_args) + + function_to_call = available_functions.get(function_name) + if not function_to_call: + error_msg = f"Function {function_name} not found" + logging.error(error_msg) + result = ToolResult(success=False, output=error_msg) + else: + result = await function_to_call(**function_args) + logging.info(f"Tool execution result for {function_name}: {result}") + executed_tool_calls.add(tool_call['id']) + + results.append({ + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(result) + }) + except Exception as e: + error_msg = f"Error executing {function_name}: {str(e)}" + logging.error(error_msg) + results.append({ + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(ToolResult(success=False, output=error_msg)) + }) + + return results + diff --git a/agentpress/standard_tool_parser.py b/agentpress/standard_tool_parser.py new file mode 100644 index 000000000..12a1a026b --- /dev/null +++ b/agentpress/standard_tool_parser.py @@ -0,0 +1,103 @@ +import json +from typing import Dict, Any, Optional +from agentpress.base_processors import ToolParserBase + +# --- Standard Tool Parser Implementation --- + +class StandardToolParser(ToolParserBase): + """Standard implementation of tool parsing for OpenAI-compatible API responses. + + This implementation handles the parsing of tool calls from responses that follow + the OpenAI API format, supporting both complete and streaming responses. + + Methods: + parse_response: Process complete LLM responses + parse_stream: Handle streaming response chunks + """ + + async def parse_response(self, response: Any) -> Dict[str, Any]: + response_message = response.choices[0].message + message = { + "role": "assistant", + "content": response_message.get('content') or "", + } + + tool_calls = response_message.get('tool_calls') + if tool_calls: + message["tool_calls"] = [ + { + "id": tool_call.id, + "type": "function", + "function": { + "name": tool_call.function.name, + "arguments": tool_call.function.arguments + } + } for tool_call in tool_calls + ] + + return message + + async def parse_stream(self, chunk: Any, tool_calls_buffer: Dict[int, Dict]) -> tuple[Optional[Dict[str, Any]], bool]: + content_chunk = "" + is_complete = False + has_complete_tool_call = False + + if hasattr(chunk.choices[0], 'delta'): + delta = chunk.choices[0].delta + + if hasattr(delta, 'content') and delta.content: + content_chunk = delta.content + + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + idx = tool_call.index + if idx not in tool_calls_buffer: + tool_calls_buffer[idx] = { + 'id': tool_call.id if hasattr(tool_call, 'id') and tool_call.id else None, + 'type': 'function', + 'function': { + 'name': tool_call.function.name if hasattr(tool_call.function, 'name') and tool_call.function.name else None, + 'arguments': '' + } + } + + current_tool = tool_calls_buffer[idx] + if hasattr(tool_call, 'id') and tool_call.id: + current_tool['id'] = tool_call.id + if hasattr(tool_call.function, 'name') and tool_call.function.name: + current_tool['function']['name'] = tool_call.function.name + if hasattr(tool_call.function, 'arguments') and tool_call.function.arguments: + current_tool['function']['arguments'] += tool_call.function.arguments + + if (current_tool['id'] and + current_tool['function']['name'] and + current_tool['function']['arguments']): + try: + json.loads(current_tool['function']['arguments']) + has_complete_tool_call = True + except json.JSONDecodeError: + pass + + if hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason: + is_complete = True + + if has_complete_tool_call or is_complete: + complete_tool_calls = [] + for idx, tool_call in tool_calls_buffer.items(): + try: + if (tool_call['id'] and + tool_call['function']['name'] and + tool_call['function']['arguments']): + json.loads(tool_call['function']['arguments']) + complete_tool_calls.append(tool_call) + except json.JSONDecodeError: + continue + + if complete_tool_calls: + return { + "role": "assistant", + "content": content_chunk, + "tool_calls": complete_tool_calls + }, is_complete + + return None, is_complete diff --git a/agentpress/thread_llm_response_processor.py b/agentpress/thread_llm_response_processor.py deleted file mode 100644 index 6e3db91a5..000000000 --- a/agentpress/thread_llm_response_processor.py +++ /dev/null @@ -1,596 +0,0 @@ -import logging -from typing import Dict, Any, AsyncGenerator, Callable, List, Optional, Set -from abc import ABC, abstractmethod -import asyncio -import json -from dataclasses import dataclass -from agentpress.tool import ToolResult - -# --- Tool Parser Base --- - -class ToolParserBase(ABC): - """Abstract base class defining the interface for parsing tool calls from LLM responses. - - This class provides the foundational interface for parsing both complete and streaming - responses from Language Models, specifically focusing on tool call extraction and processing. - - Attributes: - None - - Methods: - parse_response: Processes complete LLM responses - parse_stream: Handles streaming response chunks - """ - - @abstractmethod - async def parse_response(self, response: Any) -> Dict[str, Any]: - """Parse a complete LLM response and extract tool calls. - - Args: - response (Any): The complete response from the LLM - - Returns: - Dict[str, Any]: A dictionary containing: - - role: The message role (usually 'assistant') - - content: The text content of the response - - tool_calls: List of extracted tool calls (if present) - """ - pass - - @abstractmethod - async def parse_stream(self, response_chunk: Any, tool_calls_buffer: Dict[int, Dict]) -> tuple[Optional[Dict[str, Any]], bool]: - """Parse a streaming response chunk and manage tool call accumulation. - - Args: - response_chunk (Any): A single chunk from the streaming response - tool_calls_buffer (Dict[int, Dict]): Buffer storing incomplete tool calls - - Returns: - tuple[Optional[Dict[str, Any]], bool]: A tuple containing: - - The parsed message if complete tool calls are found (or None) - - Boolean indicating if the stream is complete - """ - pass - -# --- Tool Executor Base --- - -class ToolExecutorBase(ABC): - """Abstract base class defining the interface for tool execution strategies. - - This class provides the foundation for implementing different tool execution - approaches, supporting both parallel and sequential execution patterns. - - Attributes: - None - - Methods: - execute_tool_calls: Main entry point for tool execution - _execute_parallel: Handles parallel tool execution - _execute_sequential: Handles sequential tool execution - """ - - @abstractmethod - async def execute_tool_calls( - self, - tool_calls: List[Dict[str, Any]], - available_functions: Dict[str, Callable], - thread_id: str, - executed_tool_calls: Optional[Set[str]] = None - ) -> List[Dict[str, Any]]: - """Execute a list of tool calls and return their results. - - Args: - tool_calls: List of tool calls to execute - available_functions: Dictionary of available tool functions - thread_id: ID of the current conversation thread - executed_tool_calls: Set of already executed tool call IDs - - Returns: - List[Dict[str, Any]]: List of tool execution results - """ - pass - - @abstractmethod - async def _execute_parallel( - self, - tool_calls: List[Dict[str, Any]], - available_functions: Dict[str, Callable], - thread_id: str, - executed_tool_calls: Set[str] - ) -> List[Dict[str, Any]]: - """Execute tool calls in parallel.""" - pass - - @abstractmethod - async def _execute_sequential( - self, - tool_calls: List[Dict[str, Any]], - available_functions: Dict[str, Callable], - thread_id: str, - executed_tool_calls: Set[str] - ) -> List[Dict[str, Any]]: - """Execute tool calls sequentially.""" - pass - -# --- Standard Tool Parser Implementation --- - -class StandardToolParser(ToolParserBase): - """Standard implementation of tool parsing for OpenAI-compatible API responses. - - This implementation handles the parsing of tool calls from responses that follow - the OpenAI API format, supporting both complete and streaming responses. - - Methods: - parse_response: Process complete LLM responses - parse_stream: Handle streaming response chunks - """ - - async def parse_response(self, response: Any) -> Dict[str, Any]: - response_message = response.choices[0].message - message = { - "role": "assistant", - "content": response_message.get('content') or "", - } - - tool_calls = response_message.get('tool_calls') - if tool_calls: - message["tool_calls"] = [ - { - "id": tool_call.id, - "type": "function", - "function": { - "name": tool_call.function.name, - "arguments": tool_call.function.arguments - } - } for tool_call in tool_calls - ] - - return message - - async def parse_stream(self, chunk: Any, tool_calls_buffer: Dict[int, Dict]) -> tuple[Optional[Dict[str, Any]], bool]: - content_chunk = "" - is_complete = False - has_complete_tool_call = False - - if hasattr(chunk.choices[0], 'delta'): - delta = chunk.choices[0].delta - - if hasattr(delta, 'content') and delta.content: - content_chunk = delta.content - - if hasattr(delta, 'tool_calls') and delta.tool_calls: - for tool_call in delta.tool_calls: - idx = tool_call.index - if idx not in tool_calls_buffer: - tool_calls_buffer[idx] = { - 'id': tool_call.id if hasattr(tool_call, 'id') and tool_call.id else None, - 'type': 'function', - 'function': { - 'name': tool_call.function.name if hasattr(tool_call.function, 'name') and tool_call.function.name else None, - 'arguments': '' - } - } - - current_tool = tool_calls_buffer[idx] - if hasattr(tool_call, 'id') and tool_call.id: - current_tool['id'] = tool_call.id - if hasattr(tool_call.function, 'name') and tool_call.function.name: - current_tool['function']['name'] = tool_call.function.name - if hasattr(tool_call.function, 'arguments') and tool_call.function.arguments: - current_tool['function']['arguments'] += tool_call.function.arguments - - if (current_tool['id'] and - current_tool['function']['name'] and - current_tool['function']['arguments']): - try: - json.loads(current_tool['function']['arguments']) - has_complete_tool_call = True - except json.JSONDecodeError: - pass - - if hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason: - is_complete = True - - if has_complete_tool_call or is_complete: - complete_tool_calls = [] - for idx, tool_call in tool_calls_buffer.items(): - try: - if (tool_call['id'] and - tool_call['function']['name'] and - tool_call['function']['arguments']): - json.loads(tool_call['function']['arguments']) - complete_tool_calls.append(tool_call) - except json.JSONDecodeError: - continue - - if complete_tool_calls: - return { - "role": "assistant", - "content": content_chunk, - "tool_calls": complete_tool_calls - }, is_complete - - return None, is_complete - -# --- Standard Tool Executor Implementation --- - -class StandardToolExecutor(ToolExecutorBase): - """Standard implementation of tool execution with configurable strategies. - - Provides a flexible tool execution implementation that supports both parallel - and sequential execution patterns, with built-in error handling and result - formatting. - - Attributes: - parallel (bool): Whether to execute tools in parallel - - Methods: - execute_tool_calls: Main execution entry point - _execute_parallel: Parallel execution implementation - _execute_sequential: Sequential execution implementation - """ - - def __init__(self, parallel: bool = True): - self.parallel = parallel - - async def execute_tool_calls( - self, - tool_calls: List[Dict[str, Any]], - available_functions: Dict[str, Callable], - thread_id: str, - executed_tool_calls: Optional[Set[str]] = None - ) -> List[Dict[str, Any]]: - if executed_tool_calls is None: - executed_tool_calls = set() - - if self.parallel: - return await self._execute_parallel( - tool_calls, - available_functions, - thread_id, - executed_tool_calls - ) - else: - return await self._execute_sequential( - tool_calls, - available_functions, - thread_id, - executed_tool_calls - ) - - async def _execute_parallel( - self, - tool_calls: List[Dict[str, Any]], - available_functions: Dict[str, Callable], - thread_id: str, - executed_tool_calls: Set[str] - ) -> List[Dict[str, Any]]: - async def execute_single_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]: - if tool_call['id'] in executed_tool_calls: - return None - - try: - function_name = tool_call['function']['name'] - function_args = tool_call['function']['arguments'] - if isinstance(function_args, str): - function_args = json.loads(function_args) - - function_to_call = available_functions.get(function_name) - if not function_to_call: - error_msg = f"Function {function_name} not found" - logging.error(error_msg) - return { - "role": "tool", - "tool_call_id": tool_call['id'], - "name": function_name, - "content": str(ToolResult(success=False, output=error_msg)) - } - - result = await function_to_call(**function_args) - logging.info(f"Tool execution result for {function_name}: {result}") - executed_tool_calls.add(tool_call['id']) - - return { - "role": "tool", - "tool_call_id": tool_call['id'], - "name": function_name, - "content": str(result) - } - except Exception as e: - error_msg = f"Error executing {function_name}: {str(e)}" - logging.error(error_msg) - return { - "role": "tool", - "tool_call_id": tool_call['id'], - "name": function_name, - "content": str(ToolResult(success=False, output=error_msg)) - } - - tasks = [execute_single_tool(tool_call) for tool_call in tool_calls] - results = await asyncio.gather(*tasks) - return [r for r in results if r is not None] - - async def _execute_sequential( - self, - tool_calls: List[Dict[str, Any]], - available_functions: Dict[str, Callable], - thread_id: str, - executed_tool_calls: Set[str] - ) -> List[Dict[str, Any]]: - results = [] - for tool_call in tool_calls: - if tool_call['id'] in executed_tool_calls: - continue - - try: - function_name = tool_call['function']['name'] - function_args = tool_call['function']['arguments'] - if isinstance(function_args, str): - function_args = json.loads(function_args) - - function_to_call = available_functions.get(function_name) - if not function_to_call: - error_msg = f"Function {function_name} not found" - logging.error(error_msg) - result = ToolResult(success=False, output=error_msg) - else: - result = await function_to_call(**function_args) - logging.info(f"Tool execution result for {function_name}: {result}") - executed_tool_calls.add(tool_call['id']) - - results.append({ - "role": "tool", - "tool_call_id": tool_call['id'], - "name": function_name, - "content": str(result) - }) - except Exception as e: - error_msg = f"Error executing {function_name}: {str(e)}" - logging.error(error_msg) - results.append({ - "role": "tool", - "tool_call_id": tool_call['id'], - "name": function_name, - "content": str(ToolResult(success=False, output=error_msg)) - }) - - return results - -# --- Results Adder Base --- - -class ResultsAdderBase(ABC): - """Abstract base class for handling tool results and message processing.""" - - def __init__(self, thread_manager): - """Initialize with a ThreadManager instance. - - Args: - thread_manager: The ThreadManager instance to use for message operations - """ - self.add_message = thread_manager.add_message - self.update_message = thread_manager._update_message - self.list_messages = thread_manager.list_messages - self.message_added = False - - @abstractmethod - async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): - pass - - @abstractmethod - async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): - pass - - @abstractmethod - async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): - pass - -# --- Standard Results Adder Implementation --- - -class StandardResultsAdder(ResultsAdderBase): - """Standard implementation for handling tool results and message processing.""" - - def __init__(self, thread_manager): - """Initialize with ThreadManager instance.""" - super().__init__(thread_manager) # Use base class initialization - - async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): - message = { - "role": "assistant", - "content": content - } - if tool_calls: - message["tool_calls"] = tool_calls - - await self.add_message(thread_id, message) - self.message_added = True - - async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): - if not self.message_added: - await self.add_initial_response(thread_id, content, tool_calls) - return - - message = { - "role": "assistant", - "content": content - } - if tool_calls: - message["tool_calls"] = tool_calls - - await self.update_message(thread_id, message) - - async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): - messages = await self.list_messages(thread_id) - if not any(msg.get('tool_call_id') == result['tool_call_id'] for msg in messages): - await self.add_message(thread_id, result) - -# --- Response Processor --- - -class StandardLLMResponseProcessor: - """Handles LLM response processing and tool execution management.""" - - def __init__( - self, - thread_id: str, - available_functions: Dict = None, - add_message_callback: Callable = None, - update_message_callback: Callable = None, - list_messages_callback: Callable = None, - parallel_tool_execution: bool = True, - threads_dir: str = "threads", - tool_parser: Optional[ToolParserBase] = None, - tool_executor: Optional[ToolExecutorBase] = None, - results_adder: Optional[ResultsAdderBase] = None, - thread_manager = None # Add thread_manager parameter - ): - self.thread_id = thread_id - self.tool_executor = tool_executor or StandardToolExecutor(parallel=parallel_tool_execution) - self.tool_parser = tool_parser or StandardToolParser() - self.available_functions = available_functions or {} - self.threads_dir = threads_dir - - # Create a minimal thread manager if none provided - if thread_manager is None and (add_message_callback and update_message_callback and list_messages_callback): - class MinimalThreadManager: - def __init__(self, add_msg, update_msg, list_msg): - self.add_message = add_msg - self._update_message = update_msg - self.list_messages = list_msg - thread_manager = MinimalThreadManager(add_message_callback, update_message_callback, list_messages_callback) - - # Initialize results adder - self.results_adder = results_adder or StandardResultsAdder(thread_manager) - - # State tracking for streaming responses - self.tool_calls_buffer = {} - self.processed_tool_calls = set() - self.content_buffer = "" - self.tool_calls_accumulated = [] - - async def process_stream( - self, - response_stream: AsyncGenerator, - execute_tools: bool = True, - immediate_execution: bool = True - ) -> AsyncGenerator: - """Process streaming LLM response and handle tool execution.""" - pending_tool_calls = [] - background_tasks = set() - - async def handle_message_management(chunk): - try: - # Accumulate content - if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: - self.content_buffer += chunk.choices[0].delta.content - - # Parse tool calls if present - if hasattr(chunk.choices[0].delta, 'tool_calls'): - parsed_message, is_complete = await self.tool_parser.parse_stream( - chunk, - self.tool_calls_buffer - ) - if parsed_message and 'tool_calls' in parsed_message: - self.tool_calls_accumulated = parsed_message['tool_calls'] - - # Handle tool execution and results - if execute_tools and self.tool_calls_accumulated: - new_tool_calls = [ - tool_call for tool_call in self.tool_calls_accumulated - if tool_call['id'] not in self.processed_tool_calls - ] - - if new_tool_calls: - if immediate_execution: - results = await self.tool_executor.execute_tool_calls( - tool_calls=new_tool_calls, - available_functions=self.available_functions, - thread_id=self.thread_id, - executed_tool_calls=self.processed_tool_calls - ) - for result in results: - await self.results_adder.add_tool_result(self.thread_id, result) - self.processed_tool_calls.add(result['tool_call_id']) - else: - pending_tool_calls.extend(new_tool_calls) - - # Add/update assistant message - message = { - "role": "assistant", - "content": self.content_buffer - } - if self.tool_calls_accumulated: - message["tool_calls"] = self.tool_calls_accumulated - - if not hasattr(self, '_message_added'): - await self.results_adder.add_initial_response( - self.thread_id, - self.content_buffer, - self.tool_calls_accumulated - ) - self._message_added = True - else: - await self.results_adder.update_response( - self.thread_id, - self.content_buffer, - self.tool_calls_accumulated - ) - - # Handle stream completion - if chunk.choices[0].finish_reason: - if not immediate_execution and pending_tool_calls: - results = await self.tool_executor.execute_tool_calls( - tool_calls=pending_tool_calls, - available_functions=self.available_functions, - thread_id=self.thread_id, - executed_tool_calls=self.processed_tool_calls - ) - for result in results: - await self.results_adder.add_tool_result(self.thread_id, result) - self.processed_tool_calls.add(result['tool_call_id']) - pending_tool_calls.clear() - - except Exception as e: - logging.error(f"Error in background task: {e}") - - try: - async for chunk in response_stream: - task = asyncio.create_task(handle_message_management(chunk)) - background_tasks.add(task) - task.add_done_callback(background_tasks.discard) - yield chunk - - if background_tasks: - await asyncio.gather(*background_tasks, return_exceptions=True) - - except Exception as e: - logging.error(f"Error in stream processing: {e}") - for task in background_tasks: - if not task.done(): - task.cancel() - raise - - async def process_response(self, response: Any, execute_tools: bool = True) -> None: - """Process complete LLM response and execute tools.""" - try: - assistant_message = await self.tool_parser.parse_response(response) - await self.results_adder.add_initial_response( - self.thread_id, - assistant_message['content'], - assistant_message.get('tool_calls') - ) - - if execute_tools and 'tool_calls' in assistant_message and assistant_message['tool_calls']: - results = await self.tool_executor.execute_tool_calls( - tool_calls=assistant_message['tool_calls'], - available_functions=self.available_functions, - thread_id=self.thread_id, - executed_tool_calls=self.processed_tool_calls - ) - - for result in results: - await self.results_adder.add_tool_result(self.thread_id, result) - logging.info(f"Tool execution result: {result}") - - except Exception as e: - logging.error(f"Error processing response: {e}") - response_content = response.choices[0].message.get('content', '') - await self.results_adder.add_initial_response(self.thread_id, response_content) diff --git a/agentpress/thread_manager.py b/agentpress/thread_manager.py index 44ff6d547..51c82ce1b 100644 --- a/agentpress/thread_manager.py +++ b/agentpress/thread_manager.py @@ -5,12 +5,17 @@ from agentpress.llm import make_llm_api_call from agentpress.tool import Tool, ToolResult from agentpress.tool_registry import ToolRegistry -from agentpress.thread_llm_response_processor import StandardLLMResponseProcessor -from agentpress.thread_llm_response_processor import ToolParserBase -from agentpress.thread_llm_response_processor import ToolExecutorBase -from agentpress.thread_llm_response_processor import ResultsAdderBase +from agentpress.llm_response_processor import LLMResponseProcessor +from agentpress.base_processors import ToolParserBase, ToolExecutorBase, ResultsAdderBase import uuid +from agentpress.xml_tool_parser import XMLToolParser +from agentpress.xml_tool_executor import XMLToolExecutor +from agentpress.xml_results_adder import XMLResultsAdder +from agentpress.standard_tool_parser import StandardToolParser +from agentpress.standard_tool_executor import StandardToolExecutor +from agentpress.standard_results_adder import StandardResultsAdder + class ThreadManager: """Manages conversation threads with LLM models and tool execution. @@ -217,32 +222,49 @@ async def run_thread( max_tokens: Optional[int] = None, tool_choice: str = "auto", temporary_message: Optional[Dict[str, Any]] = None, - use_tools: bool = False, native_tool_calling: bool = False, + xml_tool_calling: bool = False, execute_tools: bool = True, stream: bool = False, - immediate_tool_execution: bool = True, - parallel_tool_execution: bool = True, + execute_tools_on_stream: bool = False, + parallel_tool_execution: bool = False, tool_parser: Optional[ToolParserBase] = None, tool_executor: Optional[ToolExecutorBase] = None, results_adder: Optional[ResultsAdderBase] = None ) -> Union[Dict[str, Any], AsyncGenerator]: """Run a conversation thread with specified parameters.""" + + # Validate tool calling configuration + if native_tool_calling and xml_tool_calling: + raise ValueError("Cannot use both native LLM tool calling and XML tool calling simultaneously") + + # Initialize tool components if any tool calling is enabled + if native_tool_calling or xml_tool_calling: + if tool_parser is None: + tool_parser = XMLToolParser(tool_registry=self.tool_registry) if xml_tool_calling else StandardToolParser() + + if tool_executor is None: + tool_executor = XMLToolExecutor(parallel=parallel_tool_execution, tool_registry=self.tool_registry) if xml_tool_calling else StandardToolExecutor(parallel=parallel_tool_execution) + + if results_adder is None: + results_adder = XMLResultsAdder(self) if xml_tool_calling else StandardResultsAdder(self) + try: messages = await self.list_messages(thread_id) prepared_messages = [system_message] + messages if temporary_message: prepared_messages.append(temporary_message) - tool_schemas = None - if use_tools: - if native_tool_calling: - tool_schemas = self.tool_registry.get_openapi_schemas() + openapi_tool_schemas = None + if native_tool_calling: + openapi_tool_schemas = self.tool_registry.get_openapi_schemas() + available_functions = self.tool_registry.get_available_functions() + elif xml_tool_calling: available_functions = self.tool_registry.get_available_functions() else: available_functions = {} - response_processor = StandardLLMResponseProcessor( + response_processor = LLMResponseProcessor( thread_id=thread_id, available_functions=available_functions, add_message_callback=self.add_message, @@ -260,7 +282,7 @@ async def run_thread( model_name=model_name, temperature=temperature, max_tokens=max_tokens, - tools=tool_schemas, + tools=openapi_tool_schemas, tool_choice=tool_choice if native_tool_calling else None, stream=stream ) @@ -269,7 +291,7 @@ async def run_thread( return response_processor.process_stream( response_stream=llm_response, execute_tools=execute_tools, - immediate_execution=immediate_tool_execution + execute_tools_on_stream=execute_tools_on_stream ) await response_processor.process_response( @@ -361,9 +383,9 @@ async def main(): temperature=0.7, max_tokens=4096, stream=True, - use_tools=True, + native_tool_calling=True, execute_tools=True, - immediate_tool_execution=True, + execute_tools_on_stream=True, parallel_tool_execution=True ) diff --git a/agentpress/xml_results_adder.py b/agentpress/xml_results_adder.py index 7dd28342b..47aa519f1 100644 --- a/agentpress/xml_results_adder.py +++ b/agentpress/xml_results_adder.py @@ -1,6 +1,6 @@ import logging from typing import Dict, Any, List, Optional -from agentpress.thread_llm_response_processor import ResultsAdderBase +from agentpress.base_processors import ResultsAdderBase class XMLResultsAdder(ResultsAdderBase): """XML-specific implementation for handling tool results and message processing. diff --git a/agentpress/xml_tool_executor.py b/agentpress/xml_tool_executor.py index 8b6a0348f..c2462d590 100644 --- a/agentpress/xml_tool_executor.py +++ b/agentpress/xml_tool_executor.py @@ -2,7 +2,7 @@ import asyncio import json import logging -from agentpress.thread_llm_response_processor import ToolExecutorBase +from agentpress.base_processors import ToolExecutorBase from agentpress.tool import ToolResult from agentpress.tool_registry import ToolRegistry diff --git a/agentpress/xml_tool_parser.py b/agentpress/xml_tool_parser.py index 20dcd22ea..10fae387b 100644 --- a/agentpress/xml_tool_parser.py +++ b/agentpress/xml_tool_parser.py @@ -1,6 +1,6 @@ import logging from typing import Dict, Any, Optional, List, Tuple -from agentpress.thread_llm_response_processor import ToolParserBase +from agentpress.base_processors import ToolParserBase import json import re from agentpress.tool_registry import ToolRegistry