-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathagent.py
More file actions
126 lines (105 loc) · 4.3 KB
/
agent.py
File metadata and controls
126 lines (105 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
import time
import os
import datetime
from perception import extract_perception
from memory import MemoryManager, MemoryRecord
from decision import generate_plan
from action import execute_tool
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from log_utils import log
import json
import shutil
import sys
# def log(stage: str, msg: str):
# now = datetime.datetime.now().strftime("%H:%M:%S")
# print(f"[{now}] [{stage}] {msg}")
# Upper bound on the number of perceive -> plan -> act iterations that main()
# runs for a single query before it stops looping.
max_steps = 5
def _describe_tools(tools) -> str:
    """Render one ``- name: description`` line per tool, joined by newlines."""
    # A tool may carry an explicit ``description = None``; a plain getattr
    # default would not apply in that case, so fall back with ``or``.
    return "\n".join(
        f"- {tool.name}: {getattr(tool, 'description', None) or 'No description'}"
        for tool in tools
    )


async def _run_agent_loop(session, tools, tool_descriptions: str, user_input: str) -> None:
    """Run at most ``max_steps`` perceive -> retrieve -> plan -> act iterations.

    Args:
        session: Initialized MCP client session, handed to ``execute_tool``.
        tools: Tool objects returned by the server's tool listing.
        tool_descriptions: Text summary of ``tools`` passed to ``generate_plan``.
        user_input: The user's initial request.
    """
    memory = MemoryManager()
    session_id = f"session-{int(time.time())}"
    query = user_input  # keep the original task for the follow-up prompts

    for step in range(max_steps):
        log("loop", f"Step {step + 1} started")

        perception = extract_perception(user_input)
        perception_result_str = json.dumps(perception.model_dump(), indent=4)
        log("perception", f"Perception result: {perception_result_str}")

        retrieved = memory.retrieve(query=user_input, top_k=3, session_filter=session_id)
        log("memory", f"Retrieved {len(retrieved)} relevant memories")

        plan = generate_plan(perception, retrieved, tool_descriptions=tool_descriptions)
        log("plan", f"Plan generated: {plan}")

        if plan.startswith("FINAL_ANSWER:"):
            log("agent", f"✅ FINAL RESULT: {plan}")
            break

        try:
            # Ensure stdout and stderr are flushed before any tool execution.
            sys.stdout.flush()
            sys.stderr.flush()

            result = await execute_tool(session, tools, plan)
            log("tool", f"{result.tool_name} returned: {result.result}")

            memory.add(MemoryRecord(
                text=f"Tool call: {result.tool_name} with {result.arguments}, got: {result.result}",
                type="tool_output",
                session_id=session_id,
                user_query=user_input,
                tags=[result.tool_name],
                tool_name=result.tool_name,
            ))

            # The clarification tool's output is not folded into the next
            # prompt; every other tool's output drives the follow-up question.
            if result.tool_name != "ask_user_for_clarification_feedback":
                user_input = (
                    f"Original task: {query}\n"
                    f"Previous output: {result.result}\n"
                    "What should I do next?"
                )
        except Exception as e:
            log("error", f"Tool execution failed: {e}")
            break
    else:
        # Only reached when the loop ran out of steps without a ``break``.
        log("agent", f"Stopped after {max_steps} steps without a final answer")


async def main(user_input: str):
    """Start the MCP server subprocess and run the agent loop for one query.

    Launches ``mcp_server.py`` over stdio, opens and initializes a client
    session, loads the server's tool list, and then runs the
    perceive -> plan -> act loop until a plan starting with ``FINAL_ANSWER:``
    is produced, a tool call fails, or ``max_steps`` iterations have run.

    Args:
        user_input: The user's request, used as the first query.

    Raises:
        SystemExit: With status 1 when an exception escapes the
            ``stdio_client`` context. Other errors are printed and swallowed.
    """
    try:
        print("[agent] Starting agent...")
        print("[agent] current working directory: ", os.getcwd())
        server_params = StdioServerParameters(
            command="python",
            args=["mcp_server.py"],
            cwd=".",
        )
        try:
            async with stdio_client(server_params) as (read, write):
                print("Connection established, creating session...")
                try:
                    async with ClientSession(read, write) as session:
                        print("[agent] Session created, initializing...")
                        try:
                            await session.initialize()
                            print("[agent] MCP session initialized")

                            # One listing call is enough; its result feeds both
                            # the console summary and the planner description.
                            print("Requesting tool list...")
                            tools = (await session.list_tools()).tools
                            print("Available tools:", [t.name for t in tools])

                            tool_descriptions = _describe_tools(tools)
                            log("agent", f"{len(tools)} tools loaded")

                            await _run_agent_loop(session, tools, tool_descriptions, user_input)
                        except Exception as e:
                            print(f"[agent] Session initialization error: {e}")
                except Exception as e:
                    print(f"[agent] Session creation error: {e}")
        except Exception as e:
            print(f"[agent] Error: {e}")
            sys.exit(1)
    except Exception as e:
        print(f"[agent] Overall error: {e}")
    log("agent", "Agent session complete.")
if __name__ == "__main__":
    # Script entry point: prompt for a product query on stdin, then drive the
    # async agent to completion on a fresh event loop.
    asyncio.run(main(input("🧑 What Product do you want to find Today? → ")))