Skip to content

Commit

Permalink
api server, thread ws, api factory
Browse files Browse the repository at this point in the history
  • Loading branch information
markokraemer committed Feb 3, 2025
1 parent a455980 commit e72b15c
Show file tree
Hide file tree
Showing 32 changed files with 3,072 additions and 1,489 deletions.
1 change: 1 addition & 0 deletions agentpress/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Empty file to mark as package
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@
- Use either XML or Standard tool calling patterns
"""

import os
import asyncio
import json
from agentpress.thread_manager import ThreadManager
from tools.files_tool import FilesTool
from example.tools.files_tool import FilesTool
from agentpress.state_manager import StateManager
from tools.terminal_tool import TerminalTool
from agentpress.api_factory import register_api_endpoint
from example.tools.terminal_tool import TerminalTool
import logging
from typing import AsyncGenerator, Optional, Dict, Any
import sys

from agentpress.api.api_factory import register_thread_task_api

BASE_SYSTEM_MESSAGE = """
You are a world-class web developer who can create, edit, and delete files, and execute terminal commands.
You write clean, well-structured code. Keep iterating on existing files, continue working on this existing
codebase - do not omit previous progress; instead, keep iterating.
Available tools:
- create_file: Create new files with specified content
- delete_file: Remove existing files
Expand Down Expand Up @@ -69,7 +68,6 @@
}
}
Think deeply and step by step.
"""

XML_FORMAT = """
Expand All @@ -88,87 +86,77 @@
<delete-file file_path="path/to/file">
</delete-file>
<stop_session></stop_session>
"""

def get_anthropic_api_key():
"""Get Anthropic API key from environment or prompt user."""
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
api_key = input("\n🔑 Please enter your Anthropic API key: ").strip()
if not api_key:
print("❌ No API key provided. Please set ANTHROPIC_API_KEY environment variable or enter a key.")
sys.exit(1)
os.environ["ANTHROPIC_API_KEY"] = api_key
return api_key

@register_api_endpoint("/main_agent")
@register_thread_task_api("/agent")
async def run_agent(
thread_id: str,
use_xml: bool = True,
max_iterations: int = 5,
project_description: Optional[str] = None
user_input: Optional[str] = None,
) -> Dict[str, Any]:
"""Run the development agent with specified configuration."""
# Initialize managers
thread_manager = ThreadManager()
await thread_manager.initialize()
"""Run the development agent with specified configuration.
state_manager = StateManager(thread_id)
await state_manager.initialize()

# Register tools
thread_manager.add_tool(FilesTool, thread_id=thread_id)
thread_manager.add_tool(TerminalTool, thread_id=thread_id)
Args:
thread_id (str): The ID of the thread.
max_iterations (int, optional): The maximum number of iterations. Defaults to 5.
user_input (Optional[str], optional): The user input. Defaults to None.
"""
thread_manager = ThreadManager()
state_manager = StateManager(thread_id)

# Add initial project description if provided
if project_description:
if user_input:
await thread_manager.add_message(
thread_id,
{
"role": "user",
"content": project_description
"content": user_input
}
)

# Set up system message with appropriate format
thread_manager.add_tool(FilesTool, thread_id=thread_id)
thread_manager.add_tool(TerminalTool, thread_id=thread_id)

system_message = {
"role": "system",
"content": BASE_SYSTEM_MESSAGE + (XML_FORMAT if use_xml else "")
"content": BASE_SYSTEM_MESSAGE + XML_FORMAT
}

# Create initial event to track agent loop
await thread_manager.create_event(
thread_id=thread_id,
event_type="agent_loop_started",
content={
"max_iterations": max_iterations,
"use_xml": use_xml,
"project_description": project_description
},
include_in_llm_message_history=False
)

results = []
iteration = 0
while iteration < max_iterations:
iteration += 1

files_tool = FilesTool(thread_id)
await files_tool._init_workspace_state()
files_tool = FilesTool(thread_id=thread_id)

state = await state_manager.get_latest_state()

state_message = {
"role": "user",
"content": f"""
Current development environment workspace state:
<current_workspace_state>
{json.dumps(state, indent=2)}
</current_workspace_state>
"""
state = await state_manager.export_store()

temporary_message_content = f"""
You are tasked to complete the LATEST USER REQUEST!
<latest_user_request>
{user_input}
</latest_user_request>
Current development environment workspace state:
<current_workspace_state>
{json.dumps(state, indent=2) if state else "{}"}
</current_workspace_state>
CONTINUE WITH THE TASK! USE THE SESSION TOOL TO STOP THE SESSION IF THE TASK IS COMPLETE.
"""

await thread_manager.add_message(
thread_id=thread_id,
message_data=temporary_message_content,
message_type="temporary_message",
include_in_llm_message_history=False
)

temporary_message = {
"role": "user",
"content": temporary_message_content
}

model_name = "anthropic/claude-3-5-sonnet-latest"
model_name = "anthropic/claude-3-5-sonnet-latest"

response = await thread_manager.run_thread(
thread_id=thread_id,
Expand All @@ -177,51 +165,57 @@ async def run_agent(
temperature=0.1,
max_tokens=8096,
tool_choice="auto",
temporary_message=state_message,
native_tool_calling=not use_xml,
xml_tool_calling=use_xml,
temporary_message=temporary_message,
native_tool_calling=False,
xml_tool_calling=True,
stream=True,
execute_tools_on_stream=False,
parallel_tool_execution=True
execute_tools_on_stream=True,
parallel_tool_execution=True,
)

# Handle both streaming and regular responses
if hasattr(response, '__aiter__'):
chunks = []
if isinstance(response, AsyncGenerator):
print("\n🤖 Assistant is responding:")
try:
async for chunk in response:
chunks.append(chunk)
if hasattr(chunk.choices[0], 'delta'):
delta = chunk.choices[0].delta

if hasattr(delta, 'content') and delta.content is not None:
content = delta.content
print(content, end='', flush=True)

# Check for open_files_in_editor tag and continue if found
if '</open_files_in_editor>' in content:
print("\n📂 Opening files in editor, continuing to next iteration...")
continue

if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tool_call in delta.tool_calls:
if tool_call.function:
if tool_call.function.name:
print(f"\n🛠️ Tool Call: {tool_call.function.name}", flush=True)
if tool_call.function.arguments:
print(f" {tool_call.function.arguments}", end='', flush=True)

print("\n✨ Response completed\n")

except Exception as e:
print(f"\n❌ Error processing stream: {e}", file=sys.stderr)
logging.error(f"Error processing stream: {e}")
raise
response = chunks

results.append({
"iteration": iteration,
"response": response
})
else:
print("\nNon-streaming response received:", response)

# Create iteration completion event
await thread_manager.create_event(
thread_id=thread_id,
event_type="iteration_complete",
content={
"iteration_number": iteration,
"max_iterations": max_iterations,
# "state": state
},
include_in_llm_message_history=False
)
# # Get latest assistant message and check for stop_session
# latest_msg = await thread_manager.get_llm_history_messages(
# thread_id=thread_id,
# only_latest_assistant=True
# )
# if latest_msg and '</stop_session>' in latest_msg:
# break

return {
"thread_id": thread_id,
"iterations": results,
}

if __name__ == "__main__":
print("\n🚀 Welcome to AgentPress Web Developer Example!")

get_anthropic_api_key()
print("\n🚀 Welcome to AgentPress!")

project_description = input("What would you like to build? (default: Create a modern, responsive landing page)\n> ")
if not project_description.strip():
Expand All @@ -241,10 +235,27 @@ async def run_agent(
print(f"\n{'XML-based' if use_xml else 'Standard'} agent will help you build: {project_description}")
print("Use Ctrl+C to stop the agent at any time.")

async def async_main():
async def test_agent():
thread_manager = ThreadManager()
thread_id = await thread_manager.create_thread()
logging.info(f"Created new thread: {thread_id}")
await run_agent(thread_id, use_xml, project_description=project_description)

asyncio.run(async_main())

try:
result = await run_agent(
thread_id=thread_id,
max_iterations=5,
user_input=project_description,
)
print("\n✅ Test completed successfully!")

except Exception as e:
print(f"\n❌ Test failed: {str(e)}")
raise

try:
asyncio.run(test_agent())
except KeyboardInterrupt:
print("\n⚠️ Test interrupted by user")
except Exception as e:
print(f"\n❌ Test failed with error: {str(e)}")
raise
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,8 @@ def __init__(self, thread_id: Optional[str] = None):
os.makedirs(self.workspace, exist_ok=True)
if thread_id:
self.state_manager = StateManager(thread_id)
asyncio.create_task(self._init_state())
self.SNIPPET_LINES = 4 # Number of context lines to show around edits

async def _init_state(self):
"""Initialize state manager and workspace state."""
await self.state_manager.initialize()
await self._init_workspace_state()
asyncio.create_task(self._init_workspace_state())
self.SNIPPET_LINES = 4

def _should_exclude_file(self, rel_path: str) -> bool:
"""Check if a file should be excluded based on path, name, or extension"""
Expand Down Expand Up @@ -264,6 +259,9 @@ async def str_replace(self, file_path: str, old_str: str, new_str: str) -> ToolR
new_content = content.replace(old_str, new_str)
full_path.write_text(new_content)

# Update state after file modification
await self._update_workspace_state()

# Show snippet around the edit
replacement_line = content.split(old_str)[0].count('\n')
start_line = max(0, replacement_line - self.SNIPPET_LINES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import asyncio
import subprocess
from agentpress.tool import Tool, ToolResult, openapi_schema, xml_schema
from agentpress.state_manager import StateManager
from typing import Optional

class TerminalTool(Tool):
Expand All @@ -12,24 +11,6 @@ def __init__(self, thread_id: Optional[str] = None):
super().__init__()
self.workspace = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'workspace')
os.makedirs(self.workspace, exist_ok=True)
if thread_id:
self.state_manager = StateManager(thread_id)
asyncio.create_task(self._init_state())

async def _init_state(self):
"""Initialize state manager."""
await self.state_manager.initialize()

async def _update_command_history(self, command: str, output: str, success: bool):
"""Update command history in state"""
history = await self.state_manager.get("terminal_history") or []
history.append({
"command": command,
"output": output,
"success": success,
"cwd": os.path.relpath(os.getcwd(), self.workspace)
})
await self.state_manager.set("terminal_history", history)

@openapi_schema({
"type": "function",
Expand Down Expand Up @@ -76,12 +57,6 @@ async def execute_command(self, command: str) -> ToolResult:
error = stderr.decode() if stderr else ""
success = process.returncode == 0

await self._update_command_history(
command=command,
output=output + error,
success=success
)

if success:
return self.success_response({
"output": output,
Expand All @@ -93,11 +68,6 @@ async def execute_command(self, command: str) -> ToolResult:
return self.fail_response(f"Command failed with exit code {process.returncode}: {error}")

except Exception as e:
await self._update_command_history(
command=command,
output=str(e),
success=False
)
return self.fail_response(f"Error executing command: {str(e)}")
finally:
os.chdir(original_dir)
Loading

0 comments on commit e72b15c

Please sign in to comment.