From 66f527c2027af9eb190f26a0691a4e333e9bb59c Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Sat, 17 Jan 2026 11:19:12 -0800 Subject: [PATCH] feat(bundle): Add agent instruction loading from .md files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes a bug where agent instructions from .md files were never loaded when spawning sub-sessions via the task tool. The resolve_agent_path() method existed but was never called in the spawn pipeline. Changes: - Add load_agent_content(name) method to parse agent .md files (frontmatter + body) - Add resolve_agents() method to pre-populate agent configs from files - Add path traversal protection to prevent directory escape attacks - Add comprehensive tests for agent loading and security The load_agent_content method: - Parses frontmatter and body using existing parse_frontmatter() - Extracts meta.name and meta.description from frontmatter - Sets body as system.instruction - Returns None for invalid paths or missing files The resolve_agents method: - Iterates agents that only have name references (from include:) - Loads full content from .md files - Uses deep_merge to preserve existing nested config Security hardening: - Rejects agent names containing '..' or starting with '/' - Validates resolved paths stay within agents/ directory - Sanitizes error messages to avoid path disclosure Fixes: microsoft/amplifier#174 🤖 Generated with [Amplifier](https://github.com/microsoft/amplifier) Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- amplifier_foundation/bundle.py | 142 +++++++++++++++-- tests/test_bundle.py | 268 ++++++++++++++++++++++++++++++++- 2 files changed, 394 insertions(+), 16 deletions(-) diff --git a/amplifier_foundation/bundle.py b/amplifier_foundation/bundle.py index d1327d9..278f545 100644 --- a/amplifier_foundation/bundle.py +++ b/amplifier_foundation/bundle.py @@ -11,6 +11,8 @@ from typing import Any from typing import Callable +import yaml + if TYPE_CHECKING: from collections.abc import Awaitable @@ -353,39 +355,153 @@ def resolve_agent_path(self, name: str) -> Path | None: For namespaced agents from included bundles, uses source_base_paths to find the correct bundle's agents directory. + Security: Validates that resolved paths stay within the expected agents/ + directory to prevent path traversal attacks. + Args: name: Agent name (may include bundle prefix). Returns: - Path to agent file, or None if not found. + Path to agent file, or None if not found or path traversal detected. """ + # Security: Reject obvious path traversal attempts early + if ".." in name or name.startswith("/"): + logger.warning(f"Invalid agent name (potential path traversal): {name!r}") + return None + + def _is_path_contained(base: Path, target: Path) -> bool: + """Check if target path is contained within base directory.""" + try: + base_resolved = base.resolve() + target_resolved = target.resolve() + return target_resolved.is_relative_to(base_resolved) + except (OSError, ValueError): + return False + # Check for namespaced agent (e.g., "foundation:bug-hunter") if ":" in name: namespace, simple_name = name.split(":", 1) # First, try source_base_paths for included bundles if namespace in self.source_base_paths: - agent_path = ( - self.source_base_paths[namespace] / "agents" / f"{simple_name}.md" - ) - if agent_path.exists(): + agents_dir = self.source_base_paths[namespace] / "agents" + agent_path = agents_dir / f"{simple_name}.md" + if agent_path.exists() and _is_path_contained(agents_dir, agent_path): return agent_path # Fall back to self.base_path if namespace matches self.name if namespace == self.name and self.base_path: - agent_path = self.base_path / "agents" / f"{simple_name}.md" - if agent_path.exists(): + agents_dir = self.base_path / "agents" + agent_path = agents_dir / f"{simple_name}.md" + if agent_path.exists() and _is_path_contained(agents_dir, agent_path): return agent_path else: # No namespace - look in self.base_path simple_name = name if self.base_path: - agent_path = self.base_path / "agents" / f"{simple_name}.md" - if agent_path.exists(): + agents_dir = self.base_path / "agents" + agent_path = agents_dir / f"{simple_name}.md" + if agent_path.exists() and _is_path_contained(agents_dir, agent_path): return agent_path return None + def load_agent_content(self, name: str) -> dict[str, Any] | None: + """Load agent content from its .md file. + + Resolves the agent file path and parses its frontmatter and body. + The body becomes the system instruction for the agent. + + Args: + name: Agent name (may include bundle prefix like "foundation:bug-hunter"). + + Returns: + Dict with agent config including: + - name: The agent name + - description: From frontmatter meta.description (if present) + - system.instruction: The markdown body content + - Any other frontmatter fields + + Returns None if agent file not found. + + Example: + >>> bundle.load_agent_content("foundation:bug-hunter") + { + "name": "bug-hunter", + "description": "Debugging expert...", + "system": {"instruction": "# Bug Hunter Agent\\n\\nYou are..."} + } + """ + from amplifier_foundation.io.frontmatter import parse_frontmatter + + agent_path = self.resolve_agent_path(name) + if agent_path is None: + return None + + try: + content = agent_path.read_text() + frontmatter, body = parse_frontmatter(content) + + # Build agent config from frontmatter + agent_config: dict[str, Any] = {} + + # Extract meta section - always set name consistently + meta = frontmatter.get("meta", {}) + agent_config["name"] = meta.get("name", name.split(":")[-1]) + if meta.get("description"): + agent_config["description"] = meta["description"] + + # Copy any other top-level frontmatter fields + for key, value in frontmatter.items(): + if key != "meta" and key not in agent_config: + agent_config[key] = value + + # Set body as system instruction + if body.strip(): + agent_config.setdefault("system", {})["instruction"] = body.strip() + + return agent_config + + except FileNotFoundError: + # File was there at resolve time but gone now (race condition) + logger.debug(f"Agent file disappeared for: {name}") + return None + except (OSError, UnicodeDecodeError) as e: + # Don't log full path - could leak sensitive directory structure + logger.warning( + f"Failed to read agent file for '{name}': {type(e).__name__}" + ) + return None + except yaml.YAMLError as e: + # Invalid YAML in frontmatter + logger.warning( + f"Invalid YAML in agent file for '{name}': {type(e).__name__}" + ) + return None + + def resolve_agents(self) -> None: + """Resolve agent content for all agents that only have names. + + For agents specified via 'include:' that only have a name reference, + this loads their full content from the corresponding .md file. + Agents with inline definitions (already having system.instruction) + are left unchanged. + + Call this after composition when source_base_paths is populated. + Similar to resolve_pending_context() but for agents. + """ + for name, config in list(self.agents.items()): + # Skip agents that already have instruction content + if config.get("system", {}).get("instruction"): + continue + + # Try to load content from .md file + loaded = self.load_agent_content(name) + if loaded: + # Deep merge to preserve nested config (e.g., system.max_tokens) + # Loaded content takes precedence for conflicts + self.agents[name] = deep_merge(config, loaded) + def get_system_instruction(self) -> str | None: """Get the system instruction for this bundle. @@ -568,7 +684,9 @@ def __init__( self._activator = activator self._activation_lock = asyncio.Lock() - def resolve(self, module_id: str, source_hint: Any = None, profile_hint: Any = None) -> BundleModuleSource: + def resolve( + self, module_id: str, source_hint: Any = None, profile_hint: Any = None + ) -> BundleModuleSource: """Resolve module ID to source. Args: @@ -581,7 +699,7 @@ def resolve(self, module_id: str, source_hint: Any = None, profile_hint: Any = N Raises: ModuleNotFoundError: If module not in activated paths and lazy activation fails. - + FIXME: Remove profile_hint parameter after all callers migrate to source_hint (target: v2.0). """ hint = profile_hint if profile_hint is not None else source_hint @@ -608,7 +726,7 @@ async def async_resolve( Raises: ModuleNotFoundError: If module not found and activation fails. - + FIXME: Remove profile_hint parameter after all callers migrate to source_hint (target: v2.0). """ hint = profile_hint if profile_hint is not None else source_hint diff --git a/tests/test_bundle.py b/tests/test_bundle.py index 3e90a46..21e7077 100644 --- a/tests/test_bundle.py +++ b/tests/test_bundle.py @@ -33,7 +33,9 @@ def test_from_dict_full(self) -> None: "description": "A full test bundle", }, "session": {"orchestrator": "loop-basic"}, - "providers": [{"module": "provider-anthropic", "config": {"model": "test"}}], + "providers": [ + {"module": "provider-anthropic", "config": {"model": "test"}} + ], "tools": [{"module": "tool-bash"}], "hooks": [{"module": "hooks-logging"}], "includes": ["base-bundle"], @@ -61,7 +63,9 @@ def test_compose_empty_bundles(self) -> None: def test_compose_session_deep_merge(self) -> None: """Session configs are deep merged.""" - base = Bundle(name="base", session={"orchestrator": "loop-basic", "context": "simple"}) + base = Bundle( + name="base", session={"orchestrator": "loop-basic", "context": "simple"} + ) child = Bundle(name="child", session={"orchestrator": "loop-streaming"}) result = base.compose(child) assert result.session["orchestrator"] == "loop-streaming" @@ -193,7 +197,9 @@ def test_resolve_pending_context_with_source_base_paths(self) -> None: # Should be resolved now assert "myns:context/file.md" in bundle.context - assert bundle.context["myns:context/file.md"] == Path("/namespace/root/context/file.md") + assert bundle.context["myns:context/file.md"] == Path( + "/namespace/root/context/file.md" + ) # Should be removed from pending assert "myns:context/file.md" not in bundle._pending_context @@ -209,7 +215,9 @@ def test_resolve_pending_context_self_reference(self) -> None: # Should be resolved using base_path (self-reference) assert "myns:context/file.md" in bundle.context - assert bundle.context["myns:context/file.md"] == Path("/bundle/root/context/file.md") + assert bundle.context["myns:context/file.md"] == Path( + "/bundle/root/context/file.md" + ) def test_compose_merges_pending_context(self) -> None: """Compose merges pending context from both bundles.""" @@ -256,3 +264,255 @@ def test_pending_context_resolved_after_compose(self) -> None: assert "ns2:context/b.md" in result.context assert result.context["ns1:context/a.md"] == Path("/ns1/root/context/a.md") assert result.context["ns2:context/b.md"] == Path("/ns2/root/context/b.md") + + +class TestBundleAgentLoading: + """Tests for agent content loading from .md files.""" + + def test_load_agent_content_with_frontmatter(self) -> None: + """load_agent_content parses frontmatter and body from .md file.""" + with TemporaryDirectory() as tmpdir: + base = Path(tmpdir) + agents_dir = base / "agents" + agents_dir.mkdir() + + # Create agent file with frontmatter + agent_file = agents_dir / "test-agent.md" + agent_file.write_text("""--- +meta: + name: test-agent + description: A test agent for debugging +--- +# Test Agent + +You are a helpful test agent. + +## Instructions + +Be helpful and thorough. +""") + + bundle = Bundle(name="test", base_path=base) + result = bundle.load_agent_content("test-agent") + + assert result is not None + assert result["name"] == "test-agent" + assert result["description"] == "A test agent for debugging" + assert "system" in result + assert "instruction" in result["system"] + assert "# Test Agent" in result["system"]["instruction"] + assert "Be helpful and thorough" in result["system"]["instruction"] + + def test_load_agent_content_namespaced(self) -> None: + """load_agent_content resolves namespaced agent references.""" + with TemporaryDirectory() as tmpdir: + base = Path(tmpdir) + other_base = Path(tmpdir) / "other" + other_base.mkdir() + other_agents = other_base / "agents" + other_agents.mkdir() + + # Create agent in "other" namespace + agent_file = other_agents / "helper.md" + agent_file.write_text("""--- +meta: + name: helper + description: Helper agent +--- +You are a helper. +""") + + bundle = Bundle( + name="main", + base_path=base, + source_base_paths={"other": other_base}, + ) + + result = bundle.load_agent_content("other:helper") + + assert result is not None + assert result["name"] == "helper" + assert result["description"] == "Helper agent" + assert "You are a helper" in result["system"]["instruction"] + + def test_load_agent_content_not_found(self) -> None: + """load_agent_content returns None for missing agent.""" + with TemporaryDirectory() as tmpdir: + bundle = Bundle(name="test", base_path=Path(tmpdir)) + result = bundle.load_agent_content("nonexistent") + assert result is None + + def test_load_agent_content_empty_body(self) -> None: + """load_agent_content handles files with frontmatter but empty body.""" + with TemporaryDirectory() as tmpdir: + base = Path(tmpdir) + agents_dir = base / "agents" + agents_dir.mkdir() + + agent_file = agents_dir / "metadata-only.md" + agent_file.write_text("""--- +meta: + name: metadata-only + description: Agent with no instructions +--- +""") + + bundle = Bundle(name="test", base_path=base) + result = bundle.load_agent_content("metadata-only") + + assert result is not None + assert result["name"] == "metadata-only" + assert result["description"] == "Agent with no instructions" + assert "system" not in result # No instruction = no system key + + def test_resolve_agents_populates_content(self) -> None: + """resolve_agents loads content for agents with only names.""" + with TemporaryDirectory() as tmpdir: + base = Path(tmpdir) + agents_dir = base / "agents" + agents_dir.mkdir() + + # Create agent file + agent_file = agents_dir / "my-agent.md" + agent_file.write_text("""--- +meta: + name: my-agent + description: My special agent +--- +You are my special agent with custom instructions. +""") + + bundle = Bundle( + name="test", + base_path=base, + agents={"my-agent": {"name": "my-agent"}}, # Only has name, no content + ) + + bundle.resolve_agents() + + # Should now have full content + agent = bundle.agents["my-agent"] + assert agent["description"] == "My special agent" + assert "system" in agent + assert "custom instructions" in agent["system"]["instruction"] + + def test_resolve_agents_preserves_inline_definitions(self) -> None: + """resolve_agents doesn't overwrite agents with existing instruction.""" + with TemporaryDirectory() as tmpdir: + base = Path(tmpdir) + agents_dir = base / "agents" + agents_dir.mkdir() + + # Create agent file (would be different from inline) + agent_file = agents_dir / "inline-agent.md" + agent_file.write_text("""--- +meta: + name: inline-agent +--- +File-based instructions. +""") + + bundle = Bundle( + name="test", + base_path=base, + agents={ + "inline-agent": { + "name": "inline-agent", + "system": { + "instruction": "Inline instructions should be kept." + }, + } + }, + ) + + bundle.resolve_agents() + + # Inline instruction should be preserved + assert ( + bundle.agents["inline-agent"]["system"]["instruction"] + == "Inline instructions should be kept." + ) + + def test_resolve_agents_handles_namespaced_agents(self) -> None: + """resolve_agents works with namespaced agent references.""" + with TemporaryDirectory() as tmpdir: + ns_base = Path(tmpdir) / "namespace" + ns_base.mkdir() + agents_dir = ns_base / "agents" + agents_dir.mkdir() + + agent_file = agents_dir / "ns-agent.md" + agent_file.write_text("""--- +meta: + name: ns-agent + description: Namespaced agent +--- +Namespaced instructions here. +""") + + bundle = Bundle( + name="main", + base_path=Path(tmpdir), + source_base_paths={"myns": ns_base}, + agents={"myns:ns-agent": {"name": "myns:ns-agent"}}, + ) + + bundle.resolve_agents() + + agent = bundle.agents["myns:ns-agent"] + assert agent["description"] == "Namespaced agent" + assert "Namespaced instructions" in agent["system"]["instruction"] + + +class TestBundleAgentSecurity: + """Security tests for agent loading.""" + + def test_path_traversal_with_dotdot_rejected(self) -> None: + """Path traversal attempts with .. should return None.""" + with TemporaryDirectory() as tmpdir: + bundle = Bundle(name="test", base_path=Path(tmpdir)) + + malicious_names = [ + "../../../etc/passwd", + "..%2f..%2fetc/passwd", + "foo/../../../etc/passwd", + ] + + for name in malicious_names: + result = bundle.load_agent_content(name) + assert result is None, f"Should reject: {name}" + + def test_path_traversal_namespaced_rejected(self) -> None: + """Namespaced path traversal attempts should return None.""" + with TemporaryDirectory() as tmpdir: + ns_base = Path(tmpdir) / "namespace" + ns_base.mkdir() + + bundle = Bundle( + name="main", + base_path=Path(tmpdir), + source_base_paths={"ns": ns_base}, + ) + + malicious_names = [ + "ns:../../../etc/passwd", + "ns:foo/../bar", + ] + + for name in malicious_names: + result = bundle.load_agent_content(name) + assert result is None, f"Should reject: {name}" + + def test_absolute_path_rejected(self) -> None: + """Absolute paths in agent names should return None.""" + with TemporaryDirectory() as tmpdir: + bundle = Bundle(name="test", base_path=Path(tmpdir)) + + malicious_names = [ + "/etc/passwd", + "ns:/etc/passwd", + ] + + for name in malicious_names: + result = bundle.load_agent_content(name) + assert result is None, f"Should reject: {name}"