-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
123 lines (102 loc) · 4.09 KB
/
run.py
File metadata and controls
123 lines (102 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# run.py
import asyncio
import logging
import json
from datetime import datetime
from zoneinfo import ZoneInfo
import os
import argparse
from langchain_openai.chat_models import ChatOpenAI
from langchain_core.messages import HumanMessage
from knowledge_agent import get_mcp_tools, create_knowledge_agent_graph
from dotenv import load_dotenv
from db_utils import create_tables
from terminal_utils import print_colorful_break
# Load environment variables from .env file.
# This runs at import time, before logging is configured and before main()
# reads OPENAI_MODEL_NAME / OPENAI_BASE_URL from the environment.
load_dotenv()
# Create the tables if they don't exist.
# NOTE(review): presumably create_tables() depends on settings loaded by
# load_dotenv() above -- confirm before reordering these two calls.
create_tables()
# Create a custom JSON formatter
class JsonFormatter(logging.Formatter):
    """Format a log record as a single-line JSON object.

    The object always carries the keys ``timestamp``, ``level``, ``name``,
    ``message``, ``module``, ``funcName`` and ``lineno``. When the record
    carries exception or stack information, ``exc_info`` and ``stack_info``
    keys are added so tracebacks are not silently dropped.
    """

    def format(self, record):
        """Return ``record`` serialized as a JSON string.

        Args:
            record: The ``logging.LogRecord`` to serialize.

        Returns:
            A JSON document (``str``) describing the record.
        """
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "funcName": record.funcName,
            "lineno": record.lineno,
        }
        # logger.exception(...) / exc_info=True attach a traceback to the
        # record; include it, otherwise the failure cause is lost in JSON logs.
        if record.exc_info:
            log_record["exc_info"] = self.formatException(record.exc_info)
        if record.stack_info:
            log_record["stack_info"] = self.formatStack(record.stack_info)
        return json.dumps(log_record)
# Make sure the log directory exists. exist_ok=True avoids the
# check-then-create race of os.path.exists() followed by os.makedirs().
os.makedirs('logs', exist_ok=True)

# One log file per run, named after the local start time (second resolution).
log_file = f"logs/knowledge_agent_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# Send INFO-and-above records to both the per-run file and the console
# (StreamHandler() writes to stderr by default).
# NOTE(review): the JsonFormatter class in this file is not attached to either
# handler here, so both use the plain-text format below -- confirm intent.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        # Explicit encoding so non-ASCII log text does not depend on the
        # platform's locale default.
        logging.FileHandler(log_file, mode="w", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

# Get a specific logger for our application's own messages
logger = logging.getLogger('KnowledgeAgent')
async def main():
    """Parse the CLI flags, build the agent graph, and run one workflow.

    Exactly one workflow is run per invocation. When several workflow flags
    are given, the first match in the order analyze, research, curate, audit,
    fix, advise wins; with no flag (or only ``--maintenance``) the
    ``maintenance`` workflow runs.

    The chat model is configured from the ``OPENAI_MODEL_NAME`` and
    ``OPENAI_BASE_URL`` environment variables, falling back to ``"chat"`` and
    ``"http://localhost:8001/v1"``.

    Raises:
        Exception: Any error raised while loading tools, building the graph,
            or invoking it is logged with its traceback and then re-raised.
    """
    print_colorful_break("KNOWLEDGE AGENT INITIALIZING")

    # (flag name, help text) for every workflow switch. Selection precedence
    # is defined separately below.
    workflow_flags = (
        ("maintenance", "Run the full maintenance workflow."),
        ("analyze", "Run the analysis workflow."),
        ("research", "Run the research workflow."),
        ("curate", "Run the curation workflow."),
        ("audit", "Run the audit workflow."),
        ("fix", "Run the fix workflow."),
        ("advise", "Run the advise workflow."),
    )
    parser = argparse.ArgumentParser(description="Run the Knowledge Agent with a specific workflow.")
    for flag_name, help_text in workflow_flags:
        parser.add_argument(f"--{flag_name}", action="store_true", help=help_text)
    args = parser.parse_args()

    # First set flag wins, in this order; "maintenance" is the default task.
    precedence = ("analyze", "research", "curate", "audit", "fix", "advise")
    task = next((name for name in precedence if getattr(args, name)), "maintenance")

    logger.info(f"Initializing Knowledge Agent for task: {task}...")
    try:
        # 1. Load tools asynchronously
        mcp_tools = await get_mcp_tools()
        model = ChatOpenAI(
            model=os.environ.get("OPENAI_MODEL_NAME", "chat"),
            base_url=os.environ.get("OPENAI_BASE_URL", "http://localhost:8001/v1"),
            temperature=0.6,
            top_p=0.6,
        )

        # 2. Pass tools into the graph creation function
        app = create_knowledge_agent_graph(task, mcp_tools)
        run_timestamp = datetime.now(ZoneInfo("America/Los_Angeles")).isoformat()

        # 3. Initialize the state with the messages list and the first message
        initial_state = {
            "messages": [HumanMessage(content="Your task is to identify knowledge gaps in the LightRAG knowledge base. Begin now.")],
            "task": task,
            "status": f"Starting '{task}' workflow.",
            "timestamp": run_timestamp,
            "mcp_tools": mcp_tools,
            "model": model,
            "logger": logger,
        }

        logger.info(f"--- Invoking graph for task: {task} ---")
        final_state = await app.ainvoke(initial_state)

        logger.info("--- Workflow Complete ---")
        logger.info(f"Final Status: {final_state['status']}")
        print_colorful_break("KNOWLEDGE AGENT RUN COMPLETE")
    except Exception:
        # Record the failure (with traceback) in the run's log file before the
        # handlers are shut down; the exception still propagates to the caller.
        logger.exception("Knowledge Agent run failed for task: %s", task)
        raise
    finally:
        # Flush and close all logging handlers whether the run succeeded or not.
        logging.shutdown()


# Script entry point: run the async main() on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())