feat: Add research-skill (start/check two-stage mode) to prevent timeout#126
Conversation
…etch/WebSearch timeout

## Summary
Implement research-skill with a background job mechanism to handle long-running external research tasks (search + fetch + synthesize) without blocking on the 480s session timeout.

## Key Changes
- **New skill**: research-skill/start-research (async launch) & check-research (progress/result)
- **Bot routing**: Update agents.py to prioritize run_skill_script for research tasks
- **LineBot integration**:
  - _extract_research_tool_feedback() handles start/check responses
  - Start always returns job_id to user
  - Check returns progress/completion/failure with summaries
- **Main specs sync**: Updated bot-platform/line-bot, added research-skill capability
- **Tests**: Verify skill loading, routing, and linebot AI response handling

## Motivation
- Prevents timeout when multiple WebFetch calls exceed the 480s session limit
- Enables the user to query progress and retrieve partial results later
- Maintains fallback paths (script failure → MCP tool)

## Implementation Details
- start-research: Fork a background process, return job_id immediately
- check-research: Query status.json, handle stale jobs (>20 min → failed)
- JSON status files per job: status/progress/sources/final_summary
- No sleep loops in the Claude session (avoid nested timeouts)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
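The stale-job rule described above (a check-research query marks jobs with no status update for over 20 minutes as failed) could be sketched roughly as follows. This is an illustrative sketch, not the PR's actual code: `load_job_status` and the exact field names are assumptions based on the description of the per-job `status.json`.

```python
import json
import time
from pathlib import Path

STALE_AFTER_SEC = 20 * 60  # jobs silent for >20 min are treated as failed


def load_job_status(status_path: Path) -> dict:
    """Read a job's status.json and mark it failed if it has gone stale."""
    data = json.loads(status_path.read_text(encoding="utf-8"))
    # Use the file's mtime as "last status update"; terminal states are never stale.
    age = time.time() - status_path.stat().st_mtime
    if data.get("status") not in ("completed", "failed") and age > STALE_AFTER_SEC:
        data["status"] = "failed"
        data["error"] = "job stale: no status update for over 20 minutes"
    return data
```

Because the background process rewrites `status.json` at every stage, the file's mtime doubles as a heartbeat, so no extra timestamp bookkeeping is needed for the staleness check.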
Code Review
This PR introduces a new research-skill that handles long-running research tasks asynchronously, using a start/check two-stage mechanism with background jobs to avoid session timeouts. While this is a solid feature enhancement, the review found two high-severity security vulnerabilities: a Server-Side Request Forgery (SSRF) in the research script, and a Malicious Message Injection vulnerability in the bot's response-handling logic, both stemming from insufficient validation and sanitization of data fetched from external websites. In addition, the review found a bug in a regular expression and recommends refactoring some overly long functions to improve maintainability and clarity.
```python
try:
    response = client.get(url)
```
The start-research script is vulnerable to Server-Side Request Forgery (SSRF). The urls parameter is used to fetch content from external websites, but the validation in _normalize_url only checks for the protocol scheme and the presence of a network location. It does not prevent access to internal IP addresses or hostnames (e.g., 127.0.0.1, 169.254.169.254, localhost). An attacker could exploit this to scan the internal network or access sensitive metadata services on the server.
Recommendation: Implement a strict validation check to ensure that the provided URLs do not resolve to private or reserved IP ranges. You can use a library like ipaddress to check the resolved IP of the hostname before making the request.
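As a sketch of that recommendation, a validator built on the stdlib `ipaddress` and `socket` modules might look like the following. The function name and exact policy are illustrative; a production version would also need to pin the resolved IP for the actual request to defend against DNS rebinding.

```python
import ipaddress
import socket
from urllib.parse import urlparse


def is_public_url(url: str) -> bool:
    """Accept only http(s) URLs whose host resolves exclusively to public addresses."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    try:
        infos = socket.getaddrinfo(parsed.hostname, None)
    except socket.gaierror:
        return False
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        # Reject private ranges, loopback, link-local (incl. 169.254.169.254), and reserved space.
        if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
            return False
    return True
```

The check rejects every resolved address, not just the first, since a hostname can resolve to a mix of public and internal IPs.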
```python
if research_feedback:
    script_name = research_feedback.get("script")
    feedback_text = str(research_feedback.get("message") or "").strip()
    job_id = str(research_feedback.get("job_id") or "").strip()

    # start-research: make sure the reply always contains the job_id
    if script_name == "start-research" and feedback_text:
        if not ai_response:
            ai_response = feedback_text
        elif job_id and job_id not in ai_response:
            ai_response = ai_response.rstrip() + "\n\n" + feedback_text
    # check-research: if the AI produced no text, use the tool's summary directly
    elif feedback_text and not str(ai_response or "").strip():
        ai_response = feedback_text
```
The bot is vulnerable to Malicious Message Injection. The tool output from the research skill (specifically the final_summary and partial_results which contain content fetched from external websites) is directly incorporated into the ai_response without sanitization. Since the bot's message parser (parse_ai_response) extracts and executes special tags like [FILE_MESSAGE:...] to send files or images, an attacker can host a malicious website containing these tags. When the research skill fetches the site, the tags will be included in the bot's response and interpreted as internal instructions, allowing the attacker to spoof bot messages or trick the bot into sending arbitrary links/images.
Recommendation: Sanitize the tool output before including it in the ai_response. You should escape or remove any patterns that match the bot's internal command tags (like [FILE_MESSAGE:...]) from the text fetched from external sources.
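A minimal sketch of that sanitization step is shown below. The `[FILE_MESSAGE:...]` tag comes from the review comment above; any additional tag names in the pattern are hypothetical and would need to match the bot's actual command-tag set in `parse_ai_response`.

```python
import re

# Hypothetical pattern: covers [FILE_MESSAGE:...] plus any similarly shaped command tags.
_COMMAND_TAG_RE = re.compile(r"\[(?:FILE_MESSAGE|IMAGE_MESSAGE)\s*:[^\]]*\]", re.IGNORECASE)


def sanitize_external_text(text: str) -> str:
    """Strip bot-internal command tags from text fetched from external sources."""
    return _COMMAND_TAG_RE.sub("", text)
```

Applying this to `final_summary` and each `partial_results` snippet before they are merged into `ai_response` keeps externally fetched content from being parsed as bot instructions.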
```python
_SCRIPT_STYLE_RE = re.compile(r"(?is)<(script|style|noscript).*?>.*?</\\1>")
_TAG_RE = re.compile(r"(?is)<[^>]+>")
_WHITESPACE_RE = re.compile(r"\\s+")
```
```python
def _extract_research_tool_feedback(tool_calls: list) -> dict | None:
    """Build a reply message from run_skill_script(research-skill) tool output."""
    for tool_call in reversed(tool_calls or []):
        tool_name = getattr(tool_call, "name", "")
        if tool_name != "mcp__ching-tech-os__run_skill_script":
            continue

        tool_input = getattr(tool_call, "input", {}) or {}
        if tool_input.get("skill") != "research-skill":
            continue

        script_name = str(tool_input.get("script") or "")
        raw_output = str(getattr(tool_call, "output", "") or "")
        wrapper = _parse_json_object(raw_output)
        if not wrapper:
            continue

        payload = wrapper
        nested_output = wrapper.get("output")
        if isinstance(nested_output, str):
            parsed_nested = _parse_json_object(nested_output)
            if parsed_nested:
                payload = parsed_nested

        # start-research: make sure the job_id always reaches the user
        if script_name == "start-research":
            if payload.get("success") is True:
                job_id = str(payload.get("job_id") or "").strip()
                if job_id:
                    message = (
                        f"✅ 研究任務已受理(job_id: {job_id})。\n"
                        f"請稍後提供 job_id 或輸入「查詢研究進度 {job_id}」,我會幫你查最新狀態。"
                    )
                else:
                    message = "✅ 研究任務已受理,請稍後再查詢進度。"
                return {
                    "script": script_name,
                    "job_id": job_id,
                    "message": message,
                }

            error = payload.get("error") or wrapper.get("error") or "未知錯誤"
            return {
                "script": script_name,
                "job_id": "",
                "message": f"⚠️ 研究任務啟動失敗:{error}",
            }

        # check-research: reply with progress / completion / failure by status
        if script_name == "check-research":
            if payload.get("success") is False:
                error = payload.get("error") or wrapper.get("error") or "未知錯誤"
                return {
                    "script": script_name,
                    "job_id": str(payload.get("job_id") or ""),
                    "message": f"⚠️ 查詢研究進度失敗:{error}",
                }

            status = str(payload.get("status") or "").strip()
            job_id = str(payload.get("job_id") or "").strip()

            if status == "completed":
                summary = str(payload.get("final_summary") or "").strip() or "✅ 研究任務已完成。"
                sources = payload.get("sources") or []
                source_lines = []
                for source in sources[:5]:
                    if not isinstance(source, dict):
                        continue
                    title = str(source.get("title") or source.get("url") or "來源")
                    url = str(source.get("url") or "").strip()
                    source_lines.append(f"- {title}" + (f"({url})" if url else ""))

                message = summary
                if source_lines:
                    message += "\n\n參考來源:\n" + "\n".join(source_lines)
                return {
                    "script": script_name,
                    "job_id": job_id,
                    "message": message,
                }

            if status == "failed":
                error = payload.get("error") or "研究任務失敗"
                return {
                    "script": script_name,
                    "job_id": job_id,
                    "message": f"⚠️ 研究任務失敗:{error}",
                }

            status_label = str(payload.get("status_label") or status or "進行中")
            progress = payload.get("progress")
            progress_text = ""
            if isinstance(progress, (int, float)):
                progress_text = f" {int(progress)}%"

            partial_lines = []
            partial_results = payload.get("partial_results") or []
            for item in partial_results[:2]:
                if not isinstance(item, dict):
                    continue
                snippet = str(item.get("snippet") or "").strip()
                if not snippet:
                    continue
                title = str(item.get("title") or item.get("url") or "來源")
                partial_lines.append(f"- {title}:{snippet[:120]}" + ("..." if len(snippet) > 120 else ""))

            message = f"⏳ 研究任務進行中({status_label}{progress_text})。"
            if job_id:
                message += f"\njob_id: {job_id}"
            if partial_lines:
                message += "\n\n目前已取得資料:\n" + "\n".join(partial_lines)
            return {
                "script": script_name,
                "job_id": job_id,
                "message": message,
            }

    return None
```
The _extract_research_tool_feedback function is quite long and complex: it handles both start-research and check-research logic, and within check-research it also handles several states (completed, failed, in progress). This hurts readability and makes the code harder to maintain.

Consider refactoring it into smaller, more focused helper functions, for example:

- _handle_start_research_feedback(...)
- _handle_check_research_feedback(...)

_handle_check_research_feedback could then be split further, with one helper per state (completed, failed, in progress). This would make the code more modular and easier to follow.
```python
def _build_final_summary(query: str, fetched_results: list[dict]) -> str:
    """Produce the final synthesis from the fetched content."""
    ok_results = [item for item in fetched_results if item.get("fetch_status") == "ok" and item.get("content")]
    failed_results = [item for item in fetched_results if item.get("fetch_status") != "ok"]

    if not ok_results:
        failed_count = len(failed_results)
        if failed_count:
            return f"針對「{query}」目前未取得可用內容,共有 {failed_count} 個來源擷取失敗。"
        return f"針對「{query}」目前未取得可用內容。"

    lines = [f"研究主題:{query}", "", "重點整理:"]
    for idx, item in enumerate(ok_results[:4], start=1):
        lines.append(f"{idx}. {item['title']}")
        lines.append(f"   {_truncate(item['content'], 320)}")

    if failed_results:
        lines.append("")
        lines.append(f"備註:另有 {len(failed_results)} 個來源擷取失敗。")

    return "\n".join(lines).strip()
```
```python
def _do_research(
    job_dir: Path,
    status_path: Path,
    job_id: str,
    query: str,
    seed_urls: list[str],
    max_results: int,
    max_fetch: int,
) -> None:
    """Background process: run the research pipeline."""
    status_data = {
        "job_id": job_id,
        "status": "starting",
        "status_label": "啟動中",
        "progress": 0,
        "query": query,
        "sources": [],
        "partial_results": [],
        "final_summary": "",
        "error": None,
        "created_at": datetime.now().isoformat(),
    }
    _write_status(status_path, status_data)

    try:
        headers = {"User-Agent": USER_AGENT}
        with httpx.Client(timeout=HTTP_TIMEOUT_SEC, follow_redirects=True, headers=headers) as client:
            # 1) Search for sources
            status_data["status"] = "searching"
            status_data["status_label"] = "搜尋中"
            status_data["progress"] = 15
            _write_status(status_path, status_data)

            candidate_sources: list[dict] = []
            seen: set[str] = set()

            for idx, url in enumerate(seed_urls[:MAX_SEED_URLS], start=1):
                if url in seen:
                    continue
                seen.add(url)
                candidate_sources.append(
                    {
                        "title": f"指定來源 {idx}",
                        "url": url,
                        "snippet": "",
                    }
                )

            if len(candidate_sources) < max_results:
                search_results = _search_duckduckgo(client, query, max_results=max_results)
                for item in search_results:
                    normalized_url = _normalize_url(str(item.get("url", "")))
                    if not normalized_url or normalized_url in seen:
                        continue
                    seen.add(normalized_url)
                    candidate_sources.append(
                        {
                            "title": str(item.get("title", "來源")),
                            "url": normalized_url,
                            "snippet": str(item.get("snippet", "")),
                        }
                    )
                    if len(candidate_sources) >= max_results:
                        break

            if not candidate_sources:
                raise RuntimeError("找不到可用的研究來源")

            status_data["sources"] = candidate_sources
            status_data["progress"] = 30
            _write_status(status_path, status_data)

            # 2) Fetch content
            status_data["status"] = "fetching"
            status_data["status_label"] = "擷取中"
            status_data["progress"] = 35
            _write_status(status_path, status_data)

            to_fetch = candidate_sources[: max(1, max_fetch)]
            total_fetch = len(to_fetch)
            fetched_results: list[dict] = []

            for idx, source in enumerate(to_fetch, start=1):
                fetched = _fetch_source(client, source)
                fetched_results.append(fetched)
                status_data["partial_results"] = [
                    {
                        "title": item.get("title"),
                        "url": item.get("url"),
                        "fetch_status": item.get("fetch_status"),
                        "snippet": item.get("snippet", ""),
                        "error": item.get("error"),
                    }
                    for item in fetched_results
                ]
                status_data["progress"] = min(85, 35 + int(idx / total_fetch * 50))
                _write_status(status_path, status_data)

            # 3) Synthesize results
            status_data["status"] = "synthesizing"
            status_data["status_label"] = "統整中"
            status_data["progress"] = 90
            _write_status(status_path, status_data)

            final_summary = _build_final_summary(query, fetched_results)
            result_path = job_dir / "result.md"
            _write_result_markdown(
                result_path=result_path,
                query=query,
                final_summary=final_summary,
                fetched_results=fetched_results,
            )

            date_str = job_dir.parent.name
            status_data["status"] = "completed"
            status_data["status_label"] = "完成"
            status_data["progress"] = 100
            status_data["final_summary"] = final_summary
            status_data["error"] = None
            status_data["result_file_path"] = str(result_path)
            status_data["result_ctos_path"] = f"ctos://linebot/research/{date_str}/{job_id}/result.md"
            _write_status(status_path, status_data)
    except (httpx.HTTPError, OSError, RuntimeError, ValueError) as exc:
        status_data["status"] = "failed"
        status_data["status_label"] = "失敗"
        status_data["error"] = str(exc)
        status_data["progress"] = status_data.get("progress", 0)
        _write_status(status_path, status_data)
```
Merged as part of PR #127; this PR is no longer needed.
Summary
Implement research-skill with a background job mechanism to handle long-running external research tasks without blocking on the 480s session timeout.
Problem
Solution
research-skill with two scripts:
- start-research: Async launch, returns job_id immediately
- check-research: Query progress, partial results, or final summary

Changes
Testing
Notes