Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 130 additions & 12 deletions amplifier_foundation/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from typing import Any
from typing import Callable

import yaml

if TYPE_CHECKING:
from collections.abc import Awaitable

Expand Down Expand Up @@ -353,39 +355,153 @@ def resolve_agent_path(self, name: str) -> Path | None:
For namespaced agents from included bundles, uses source_base_paths
to find the correct bundle's agents directory.

Security: Validates that resolved paths stay within the expected agents/
directory to prevent path traversal attacks.

Args:
name: Agent name (may include bundle prefix).

Returns:
Path to agent file, or None if not found.
Path to agent file, or None if not found or path traversal detected.
"""
# Security: Reject obvious path traversal attempts early
if ".." in name or name.startswith("/"):
logger.warning(f"Invalid agent name (potential path traversal): {name!r}")
return None

def _is_path_contained(base: Path, target: Path) -> bool:
"""Check if target path is contained within base directory."""
try:
base_resolved = base.resolve()
target_resolved = target.resolve()
return target_resolved.is_relative_to(base_resolved)
except (OSError, ValueError):
return False

# Check for namespaced agent (e.g., "foundation:bug-hunter")
if ":" in name:
namespace, simple_name = name.split(":", 1)

# First, try source_base_paths for included bundles
if namespace in self.source_base_paths:
agent_path = (
self.source_base_paths[namespace] / "agents" / f"{simple_name}.md"
)
if agent_path.exists():
agents_dir = self.source_base_paths[namespace] / "agents"
agent_path = agents_dir / f"{simple_name}.md"
if agent_path.exists() and _is_path_contained(agents_dir, agent_path):
return agent_path

# Fall back to self.base_path if namespace matches self.name
if namespace == self.name and self.base_path:
agent_path = self.base_path / "agents" / f"{simple_name}.md"
if agent_path.exists():
agents_dir = self.base_path / "agents"
agent_path = agents_dir / f"{simple_name}.md"
if agent_path.exists() and _is_path_contained(agents_dir, agent_path):
return agent_path
else:
# No namespace - look in self.base_path
simple_name = name
if self.base_path:
agent_path = self.base_path / "agents" / f"{simple_name}.md"
if agent_path.exists():
agents_dir = self.base_path / "agents"
agent_path = agents_dir / f"{simple_name}.md"
if agent_path.exists() and _is_path_contained(agents_dir, agent_path):
return agent_path

return None

def load_agent_content(self, name: str) -> dict[str, Any] | None:
"""Load agent content from its .md file.

Resolves the agent file path and parses its frontmatter and body.
The body becomes the system instruction for the agent.

Args:
name: Agent name (may include bundle prefix like "foundation:bug-hunter").

Returns:
Dict with agent config including:
- name: The agent name
- description: From frontmatter meta.description (if present)
- system.instruction: The markdown body content
- Any other frontmatter fields

Returns None if agent file not found.

Example:
>>> bundle.load_agent_content("foundation:bug-hunter")
{
"name": "bug-hunter",
"description": "Debugging expert...",
"system": {"instruction": "# Bug Hunter Agent\\n\\nYou are..."}
}
"""
from amplifier_foundation.io.frontmatter import parse_frontmatter

agent_path = self.resolve_agent_path(name)
if agent_path is None:
return None

try:
content = agent_path.read_text()
frontmatter, body = parse_frontmatter(content)

# Build agent config from frontmatter
agent_config: dict[str, Any] = {}

# Extract meta section - always set name consistently
meta = frontmatter.get("meta", {})
agent_config["name"] = meta.get("name", name.split(":")[-1])
if meta.get("description"):
agent_config["description"] = meta["description"]

# Copy any other top-level frontmatter fields
for key, value in frontmatter.items():
if key != "meta" and key not in agent_config:
agent_config[key] = value

# Set body as system instruction
if body.strip():
agent_config.setdefault("system", {})["instruction"] = body.strip()

return agent_config

except FileNotFoundError:
# File was there at resolve time but gone now (race condition)
logger.debug(f"Agent file disappeared for: {name}")
return None
except (OSError, UnicodeDecodeError) as e:
# Don't log full path - could leak sensitive directory structure
logger.warning(
f"Failed to read agent file for '{name}': {type(e).__name__}"
)
return None
except yaml.YAMLError as e:
# Invalid YAML in frontmatter
logger.warning(
f"Invalid YAML in agent file for '{name}': {type(e).__name__}"
)
return None

def resolve_agents(self) -> None:
"""Resolve agent content for all agents that only have names.

For agents specified via 'include:' that only have a name reference,
this loads their full content from the corresponding .md file.
Agents with inline definitions (already having system.instruction)
are left unchanged.

Call this after composition when source_base_paths is populated.
Similar to resolve_pending_context() but for agents.
"""
for name, config in list(self.agents.items()):
# Skip agents that already have instruction content
if config.get("system", {}).get("instruction"):
continue

# Try to load content from .md file
loaded = self.load_agent_content(name)
if loaded:
# Deep merge to preserve nested config (e.g., system.max_tokens)
# Loaded content takes precedence for conflicts
self.agents[name] = deep_merge(config, loaded)

def get_system_instruction(self) -> str | None:
"""Get the system instruction for this bundle.

Expand Down Expand Up @@ -568,7 +684,9 @@ def __init__(
self._activator = activator
self._activation_lock = asyncio.Lock()

def resolve(self, module_id: str, source_hint: Any = None, profile_hint: Any = None) -> BundleModuleSource:
def resolve(
self, module_id: str, source_hint: Any = None, profile_hint: Any = None
) -> BundleModuleSource:
"""Resolve module ID to source.

Args:
Expand All @@ -581,7 +699,7 @@ def resolve(self, module_id: str, source_hint: Any = None, profile_hint: Any = N

Raises:
ModuleNotFoundError: If module not in activated paths and lazy activation fails.

FIXME: Remove profile_hint parameter after all callers migrate to source_hint (target: v2.0).
"""
hint = profile_hint if profile_hint is not None else source_hint
Expand All @@ -608,7 +726,7 @@ async def async_resolve(

Raises:
ModuleNotFoundError: If module not found and activation fails.

FIXME: Remove profile_hint parameter after all callers migrate to source_hint (target: v2.0).
"""
hint = profile_hint if profile_hint is not None else source_hint
Expand Down
Loading