Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 76 additions & 76 deletions src/clis/agent/episodic_memory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""
情景记忆模块 - 当前任务的持久化 Markdown 文档
Episodic memory module - Persistent Markdown document for current task

特点:
- 持久化到 .clis_memory/ 目录
- 人类可读可编辑
- 结构化 Markdown (checklist, findings, next steps)
- 跨会话保留
Features:
- Persist to .clis_memory/ directory
- Human-readable and editable
- Structured Markdown (checklist, findings, next steps)
- Cross-session retention
"""

from pathlib import Path
Expand All @@ -16,15 +16,15 @@

class EpisodicMemory:
"""
情景记忆 - 当前任务的持久化 Markdown 文档
Episodic memory - Persistent Markdown document for current task

每个任务对应一个 Markdown 文件,包含:
- 任务目标
- 任务分解 (checklist)
- 关键发现
- 当前进度
- 下一步行动
- 执行日志
Each task corresponds to a Markdown file containing:
- Task objective
- Task breakdown (checklist)
- Key findings
- Current progress
- Next actions
- Execution log
"""

def __init__(self, task_id: str, memory_dir: str = ".clis_memory"):
Expand All @@ -33,97 +33,97 @@ def __init__(self, task_id: str, memory_dir: str = ".clis_memory"):
self.tasks_dir = self.memory_dir / "tasks" / "active"
self.task_file = self.tasks_dir / f"task_{task_id}.md"

# 确保目录存在
# Ensure directory exists
self.tasks_dir.mkdir(parents=True, exist_ok=True)

def load_or_create(self, task_description: str) -> str:
"""
加载现有任务文档或创建新文档
Load existing task document or create new one

Args:
task_description: 用户的任务描述
task_description: User's task description

Returns:
任务文档内容
Task document content
"""
if self.task_file.exists():
return self.task_file.read_text(encoding='utf-8')
else:
return self._create_initial_doc(task_description)

def _create_initial_doc(self, task_description: str) -> str:
"""创建初始任务文档"""
"""Create initial task document"""
doc = f"""# Task: {task_description}

**任务ID**: {self.task_id}
**创建时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**状态**: 🔄 进行中
**Task ID**: {self.task_id}
**Created**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Status**: 🔄 In Progress

---

## 📋 任务目标
## 📋 Task Objective

{task_description}

---

## ✅ 任务分解
## ✅ Task Breakdown

<!-- 这里会自动更新任务步骤 -->
- [ ] 步骤将在执行中自动识别
<!-- Task steps will be automatically updated here -->
- [ ] Steps will be automatically identified during execution

---

## 🔍 关键发现
## 🔍 Key Findings

*(执行中会自动记录)*
*(Will be automatically recorded during execution)*

---

## 📊 当前进度
## 📊 Current Progress

**阶段**: 初始化
**进度**: 0/0
**Phase**: Initialization
**Progress**: 0/0

---

## 🎯 下一步行动
## 🎯 Next Actions

开始执行任务...
Starting task execution...

---

## 📝 执行日志
## 📝 Execution Log

"""
self.task_file.write_text(doc, encoding='utf-8')
return doc

def update_step(self, step_description: str, status: str = "done"):
"""
更新任务步骤状态
Update task step status

Args:
step_description: 步骤描述
step_description: Step description
status: "done" | "in_progress" | "pending"
"""
if not self.task_file.exists():
return

content = self.task_file.read_text(encoding='utf-8')

# 查找任务分解区域
checklist_pattern = r'(## ✅ 任务分解.*?)(##|\Z)'
# Find task breakdown area
checklist_pattern = r'(## ✅ Task Breakdown.*?)(##|\Z)'
match = re.search(checklist_pattern, content, re.DOTALL)

if match:
checklist_section = match.group(1)

# 检查是否已存在此步骤
# Check if this step already exists
step_exists = step_description in checklist_section

if not step_exists:
# 添加新步骤
# Add new step
checkbox = {
"done": "- [x]",
"in_progress": "- [ ] 🔄",
Expand All @@ -132,43 +132,43 @@ def update_step(self, step_description: str, status: str = "done"):

new_step = f"{checkbox} {step_description}\n"

# 在下一个 ## 前插入
# Insert before next ##
next_section_pos = content.find('##', match.end(1))
if next_section_pos != -1:
# 在找到的位置前插入
# Insert before found position
insert_pos = content.rfind('\n', match.start(1), next_section_pos)
if insert_pos == -1:
insert_pos = match.end(1)
content = content[:insert_pos] + '\n' + new_step + content[insert_pos:]
else:
# 在结尾插入
# Insert at end
content = content.rstrip() + '\n' + new_step + '\n'

self.task_file.write_text(content, encoding='utf-8')

def add_finding(self, finding: str, category: str = "general"):
"""
添加关键发现
Add key finding

Args:
finding: 发现内容
category: 分类标签
finding: Finding content
category: Category label
"""
if not self.task_file.exists():
return

content = self.task_file.read_text(encoding='utf-8')

# 查找关键发现区域
findings_pattern = r'(## 🔍 关键发现.*?)(##|\Z)'
# Find key findings area
findings_pattern = r'(## 🔍 Key Findings.*?)(##|\Z)'
match = re.search(findings_pattern, content, re.DOTALL)

if match:
# 添加新发现
# Add new finding
timestamp = datetime.now().strftime('%H:%M:%S')
new_finding = f"- **[{category}]** ({timestamp}): {finding}\n"

# 在下一个 ## 前插入
# Insert before next ##
next_section_pos = content.find('##', match.end(1))
if next_section_pos != -1:
insert_pos = content.rfind('\n', match.start(1), next_section_pos)
Expand All @@ -181,52 +181,52 @@ def add_finding(self, finding: str, category: str = "general"):
self.task_file.write_text(content, encoding='utf-8')

def update_progress(self, phase: str, progress: str):
"""更新当前进度"""
"""Update current progress"""
if not self.task_file.exists():
return

content = self.task_file.read_text(encoding='utf-8')

# 更新阶段
# Update phase
content = re.sub(
r'\*\*阶段\*\*:.*',
f'**阶段**: {phase}',
r'\*\*Phase\*\*:.*',
f'**Phase**: {phase}',
content
)

# 更新进度
# Update progress
content = re.sub(
r'\*\*进度\*\*:.*',
f'**进度**: {progress}',
r'\*\*Progress\*\*:.*',
f'**Progress**: {progress}',
content
)

self.task_file.write_text(content, encoding='utf-8')

def update_next_action(self, action: str):
"""更新下一步行动建议"""
"""Update next action suggestion"""
if not self.task_file.exists():
return

content = self.task_file.read_text(encoding='utf-8')

# 查找下一步行动区域
next_action_pattern = r'(## 🎯 下一步行动.*?)(##|\Z)'
# Find next actions area
next_action_pattern = r'(## 🎯 Next Actions.*?)(##|\Z)'
match = re.search(next_action_pattern, content, re.DOTALL)

if match:
new_section = f"## 🎯 下一步行动\n\n{action}\n\n"
new_section = f"## 🎯 Next Actions\n\n{action}\n\n"
content = content[:match.start(1)] + new_section + content[match.end(1):]
self.task_file.write_text(content, encoding='utf-8')

def append_log(self, log_entry: str):
"""添加执行日志"""
"""Add execution log"""
if not self.task_file.exists():
return

content = self.task_file.read_text(encoding='utf-8')

# 在执行日志区域追加
# Append to execution log area
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_line = f"\n[{timestamp}] {log_entry}\n"

Expand All @@ -235,41 +235,41 @@ def append_log(self, log_entry: str):

def inject_to_prompt(self, include_log: bool = False) -> str:
"""
将任务文档注入到 prompt
Inject task document to prompt

Args:
include_log: 是否包含执行日志 (较长)
include_log: Whether to include execution log (lengthy)

Returns:
格式化的 prompt 文本
Formatted prompt text
"""
if not self.task_file.exists():
return ""

content = self.task_file.read_text(encoding='utf-8')

if not include_log:
# 移除执行日志部分 (节省 tokens)
content = re.sub(r'## 📝 执行日志.*', '', content, flags=re.DOTALL)
# Remove execution log section (save tokens)
content = re.sub(r'## 📝 Execution Log.*', '', content, flags=re.DOTALL)

return f"""
╭──────────────────────────────────────────────────────────────╮
│ 📖 任务记忆 (TASK MEMORY / MEMORY BANK)
│ 📖 TASK MEMORY / MEMORY BANK
╰──────────────────────────────────────────────────────────────╯

{content}

⚠️ 重要提醒:
检查上方的任务分解,看哪些步骤已完成
查看 🔍 关键发现,已收集的信息就在这里
参考 🎯 下一步行动的建议
如果任务完成,调用 {{"type": "done", "summary": "..."}}
⚠️ Important Reminders:
CheckTask Breakdown above to see which steps are completed
Review 🔍 Key Findings - collected information is here
Refer to 🎯 Next Actions suggestions
If task is complete, call {{"type": "done", "summary": "..."}}
"""

def get_file_path(self) -> Path:
"""获取任务文件路径"""
"""Get task file path"""
return self.task_file

def exists(self) -> bool:
"""检查任务文件是否存在"""
"""Check if task file exists"""
return self.task_file.exists()
Loading