diff --git a/.aic/graph.db b/.aic/graph.db
new file mode 100644
index 0000000..cc58f81
Binary files /dev/null and b/.aic/graph.db differ
diff --git a/README.md b/README.md
index e6a57dc..fa3228c 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,7 @@ The philosophy behind Conductor is simple: control your code. By treating contex
 - **Iterate safely**: Review plans before code is written, keeping you firmly in the loop.
 - **Work as a team**: Set project-level context for your product, tech stack, and workflow preferences that become a shared foundation for your team.
 - **Build on existing projects**: Intelligent initialization for both new (Greenfield) and existing (Brownfield) projects.
+- **Semantic awareness (AIC)**: Automatically indexes your codebase into "Rich Skeletons" using the AI Compiler (AIC). This feature is powered by a local **Model Context Protocol (MCP)** server that exposes semantic indexing and context-retrieval tools (`aic_index`, `aic_get_file_context`) directly to the Gemini agent.
 - **Smart revert**: A git-aware revert command that understands logical units of work (tracks, phases, tasks) rather than just commit hashes.
 
 ## Installation
@@ -112,8 +113,34 @@ During implementation, you can also:
 | `/conductor:status` | Displays the current progress of the tracks file and active tracks. | Reads `conductor/tracks.md` |
 | `/conductor:revert` | Reverts a track, phase, or task by analyzing git history. | Reverts git history |
 
+## Architecture
+
+Conductor leverages the **Model Context Protocol (MCP)** to provide deep, local integration with your codebase.
+
+- **Client**: The Gemini CLI acts as the MCP client.
+- **Server**: The `aic` package runs as a local MCP server (`python3 -m aic.server`).
+- **Tools**: The server exposes the following tools to the agent:
+  - `aic_index`: Builds or updates the semantic dependency graph.
+  - `aic_get_file_context`: Retrieves token-optimized skeletons for files and their dependencies.
+  - `aic_list_directory`: Provides filesystem visibility.
+  - `aic_run_shell_command`: Executes setup and maintenance commands in the project workspace.
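+
+If your MCP client does not register the server automatically, a minimal manual entry in the Gemini CLI `settings.json` might look like this (illustrative; assumes `python3` can import the `aic` package):
+
+```json
+{
+  "mcpServers": {
+    "aic": {
+      "command": "python3",
+      "args": ["-m", "aic.server"]
+    }
+  }
+}
+```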
+
 ## Resources
 
+- [AI Compiler Patent](https://www.tdcommons.org/dpubs_series/8241/): Semantic Dependency Graph for AI Agents
 - [Gemini CLI extensions](https://geminicli.com/docs/extensions/): Documentation about using extensions in Gemini CLI
 - [GitHub issues](https://github.com/gemini-cli-extensions/conductor/issues): Report bugs or request features
diff --git a/aic/__init__.py b/aic/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/aic/cli.py b/aic/cli.py
new file mode 100644
index 0000000..55061b2
--- /dev/null
+++ b/aic/cli.py
@@ -0,0 +1,107 @@
+import argparse
+import os
+from aic.db import init_db, upsert_node, get_node, get_dependencies, update_edges, mark_dirty
+from aic.skeleton import UniversalSkeletonizer
+from aic.utils import calculate_hash, resolve_dep_to_path, get_ignore_patterns, should_ignore
+
+def index_repo(root_dir="."):
+    init_db()
+    skeletonizer = UniversalSkeletonizer()
+    ignore_patterns = get_ignore_patterns(root_dir)
+
+    indexed_count = 0
+
+    for root, dirs, files in os.walk(root_dir):
+        # Exclusions
+        dirs[:] = [d for d in dirs if not should_ignore(d, ignore_patterns)]
+
+        for file in files:
+            if should_ignore(file, ignore_patterns):
+                continue
+
+            file_path = os.path.join(root, file)
+            rel_path = os.path.relpath(file_path, root_dir)
+
+            # Skip non-text files to avoid reading binaries.
+            # Simple heuristic: attempt a strict UTF-8 read and skip on failure.
+            try:
+                with open(file_path, 'r', encoding='utf-8', errors='strict') as f:
+                    content = f.read()
+            except UnicodeDecodeError:
+                # print(f"Skipping binary file: {rel_path}")
+                continue
+            except Exception as e:
+                print(f"Skipping {rel_path}: {e}")
+                continue
+
+            current_hash = calculate_hash(content)
+            existing = get_node(rel_path)
+
+            if existing and existing['hash'] == current_hash:
+                continue
+
+            print(f"Indexing: {rel_path}")
+            skeleton, dependencies = skeletonizer.skeletonize(content, rel_path)
+            upsert_node(rel_path, current_hash, skeleton)
+            mark_dirty(rel_path)
+
+            # Resolve dependencies to file paths
+            resolved_deps = []
+            for dep in dependencies:
+                resolved = resolve_dep_to_path(dep, rel_path, root_dir)
+                if resolved:
+                    resolved_deps.append(resolved)
+
+            update_edges(rel_path, resolved_deps)
+            indexed_count += 1
+
+    print(f"Finished indexing. Processed {indexed_count} files.")
+
+def get_context(file_path):
+    node = get_node(file_path)
+    if not node:
+        return f"# Error: {file_path} not indexed."
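+    # Illustrative output shape (hypothetical file names):
+    #   # Context for app/models.py
+    #   class User:
+    #       def save(self):
+    #           # CALLS: session.add | session.commit
+    #           ...
+    #
+    #   ## Dependencies
+    #   ### app/db.py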
+ + output = [f"# Context for {file_path}", node['skeleton'], ""] + + deps = get_dependencies(file_path) + if deps: + output.append("## Dependencies") + for dep in deps: + dep_node = get_node(dep) + if dep_node: + output.append(f"### {dep}") + output.append(dep_node['skeleton']) + output.append("") + + return "\n".join(output) + +def main(): + parser = argparse.ArgumentParser(description="AIC: AI Compiler") + subparsers = parser.add_subparsers(dest="command") + + subparsers.add_parser("index") + + context_parser = subparsers.add_parser("context") + context_parser.add_argument("file") + + args = parser.parse_args() + + if args.command == "index": + index_repo() + print("Finished indexing.") + elif args.command == "context": + print(get_context(args.file)) + else: + parser.print_help() + +if __name__ == "__main__": + main() diff --git a/aic/db.py b/aic/db.py new file mode 100644 index 0000000..ccd8058 --- /dev/null +++ b/aic/db.py @@ -0,0 +1,65 @@ +import sqlite3 +import os + +DB_PATH = ".aic/graph.db" + +def get_connection(): + os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + +def init_db(): + with get_connection() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS nodes ( + path TEXT PRIMARY KEY, + hash TEXT, + skeleton TEXT, + status TEXT DEFAULT 'CLEAN' + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS edges ( + source TEXT, + target TEXT, + PRIMARY KEY (source, target), + FOREIGN KEY(source) REFERENCES nodes(path) + ) + """) + +def upsert_node(path, hash_val, skeleton): + with get_connection() as conn: + conn.execute(""" + INSERT INTO nodes (path, hash, skeleton, status) + VALUES (?, ?, ?, 'CLEAN') + ON CONFLICT(path) DO UPDATE SET + hash = excluded.hash, + skeleton = excluded.skeleton, + status = 'CLEAN' + """, (path, hash_val, skeleton)) + +def mark_dirty(path): + """Mark all nodes that depend on this path as DIRTY.""" + with get_connection() as conn: + conn.execute(""" + UPDATE nodes + SET status = 'DIRTY' + WHERE path IN ( + SELECT source FROM edges WHERE target = ? 
+ ) + """, (path,)) + +def update_edges(source_path, target_paths): + with get_connection() as conn: + conn.execute("DELETE FROM edges WHERE source = ?", (source_path,)) + for target in target_paths: + conn.execute("INSERT OR IGNORE INTO edges (source, target) VALUES (?, ?)", (source_path, target)) + +def get_node(path): + with get_connection() as conn: + return conn.execute("SELECT * FROM nodes WHERE path = ?", (path,)).fetchone() + +def get_dependencies(path): + with get_connection() as conn: + return [row['target'] for row in conn.execute("SELECT target FROM edges WHERE source = ?", (path,)).fetchall()] diff --git a/aic/server.py b/aic/server.py new file mode 100644 index 0000000..5801e66 --- /dev/null +++ b/aic/server.py @@ -0,0 +1,288 @@ +import asyncio +import sys +import json +import logging +import inspect +import os +import traceback +from typing import Any, Callable, Dict, List, Optional + +from aic.db import init_db, upsert_node, update_edges, mark_dirty +from aic.skeleton import UniversalSkeletonizer +from aic.utils import calculate_hash, resolve_dep_to_path, get_ignore_patterns, should_ignore +from aic.cli import get_context + +# Configure logging to stderr so it doesn't interfere with JSON-RPC on stdout +logging.basicConfig(stream=sys.stderr, level=logging.INFO) +logger = logging.getLogger("aic-server") + +class MCPServer: + def __init__(self, name: str): + self.name = name + self.tools: Dict[str, Callable] = {} + self.tool_schemas: List[Dict[str, Any]] = [] + + def tool(self): + """Decorator to register a function as a tool.""" + def decorator(func: Callable): + self.register_tool(func) + return func + return decorator + + def register_tool(self, func: Callable): + name = func.__name__ + doc = inspect.getdoc(func) or "" + sig = inspect.signature(func) + + properties = {} + required = [] + + for param_name, param in sig.parameters.items(): + param_type = "string" # Default to string for simplicity in this minimal implementation + if param.annotation == int: + param_type = "integer" + elif param.annotation == bool: + param_type = "boolean" + + properties[param_name] = { + "type": param_type, + "description": f"Parameter {param_name}" + } + # Simple heuristic: parameters without defaults are required + if param.default == inspect.Parameter.empty: + required.append(param_name) + + schema = { + "name": name, + "description": doc, + "inputSchema": { + "type": "object", + "properties": properties, + "required": required + } + } + + self.tools[name] = func + self.tool_schemas.append(schema) + logger.info(f"Registered tool: {name}") + + async def handle_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]: + method = request.get("method") + msg_id = request.get("id") + + if method == "tools/list": + return { + "jsonrpc": "2.0", + "id": msg_id, + "result": { + "tools": self.tool_schemas + } + } + + elif method == "tools/call": + params = request.get("params", {}) + tool_name = params.get("name") + tool_args = params.get("arguments", {}) + + if tool_name not in self.tools: + return { + "jsonrpc": "2.0", + "id": msg_id, + "error": { + "code": -32601, + "message": f"Tool not found: {tool_name}" + } + } + + try: + func = self.tools[tool_name] + # Check if async + if inspect.iscoroutinefunction(func): + result = await func(**tool_args) + else: + result = func(**tool_args) + + return { + "jsonrpc": "2.0", + "id": msg_id, + "result": { + "content": [ + { + "type": "text", + "text": str(result) + } + ] + } + } + except Exception as e: + logger.error(f"Error executing 
{tool_name}: {traceback.format_exc()}") + return { + "jsonrpc": "2.0", + "id": msg_id, + "error": { + "code": -32603, + "message": f"Internal error: {str(e)}" + } + } + + # Handle other MCP lifecycle methods strictly to avoid errors + elif method == "initialize": + return { + "jsonrpc": "2.0", + "id": msg_id, + "result": { + "protocolVersion": "0.1.0", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": self.name, + "version": "0.1.0" + } + } + } + elif method == "notifications/initialized": + # No response needed for notifications + return None + + return None + + async def run(self): + logger.info(f"Starting {self.name} server on stdio...") + + # We need to read from stdin line by line (JSON-RPC) + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, sys.stdin) + + while True: + try: + line = await reader.readline() + if not line: + break + + message = json.loads(line) + response = await self.handle_request(message) + + if response: + sys.stdout.write(json.dumps(response) + "\n") + sys.stdout.flush() + + except json.JSONDecodeError: + logger.error("Failed to decode JSON from stdin") + except Exception as e: + logger.error(f"Unexpected error: {e}") + break + +server = MCPServer("aic") + +@server.tool() +async def aic_index(root_dir: str) -> str: + """ + Indexes the repository to build a semantic dependency graph. + Scans for Python files, generates skeletons, and updates the SQLite database. + """ + init_db() + skeletonizer = UniversalSkeletonizer() + indexed_count = 0 + + # Ensure we use absolute path for walking + abs_root_dir = os.path.abspath(root_dir) + ignore_patterns = get_ignore_patterns(abs_root_dir) + + for root, dirs, files in os.walk(abs_root_dir): + # Exclusions + dirs[:] = [d for d in dirs if not should_ignore(d, ignore_patterns)] + + for file in files: + if should_ignore(file, ignore_patterns): + continue + + file_path = os.path.join(root, file) + rel_path = os.path.relpath(file_path, abs_root_dir) + + # Skip non-text files to avoid reading binaries + try: + with open(file_path, 'r', encoding='utf-8', errors='strict') as f: + content = f.read() + except UnicodeDecodeError: + continue + except Exception as e: + print(f"Skipping {rel_path}: {e}") + continue + + current_hash = calculate_hash(content) + + skeleton, dependencies = skeletonizer.skeletonize(content, rel_path) + upsert_node(rel_path, current_hash, skeleton) + mark_dirty(rel_path) + + # Resolve dependencies to file paths + resolved_deps = [] + for dep in dependencies: + resolved = resolve_dep_to_path(dep, rel_path, abs_root_dir) + if resolved: + resolved_deps.append(resolved) + + update_edges(rel_path, resolved_deps) + indexed_count += 1 + + return f"Successfully indexed {indexed_count} files in {abs_root_dir}" + +@server.tool() +async def aic_get_file_context(file_path: str) -> str: + """ + Retrieves the extensive context for a file, including its skeleton and its direct dependencies' skeletons. + """ + try: + return get_context(file_path) + except Exception as e: + return f"Error retrieving context for {file_path}: {str(e)}" + +@server.tool() +async def aic_list_directory(path: str) -> str: + """ + Lists the files and directories in the specified path. + """ + try: + abs_path = os.path.abspath(path) + if not os.path.exists(abs_path): + return f"Error: Path '{path}' not found." 
+
+        items = []
+        for name in os.listdir(abs_path):
+            full_path = os.path.join(abs_path, name)
+            is_dir = os.path.isdir(full_path)
+            items.append(f"{name}{'/' if is_dir else ''}")
+
+        return "\n".join(sorted(items))
+    except Exception as e:
+        return f"Error listing directory '{path}': {str(e)}"
+
+@server.tool()
+async def aic_run_shell_command(command: str, cwd: str) -> str:
+    """
+    Executes a shell command in the given working directory.
+    """
+    try:
+        process = await asyncio.create_subprocess_shell(
+            command,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            cwd=cwd
+        )
+        stdout, stderr = await process.communicate()
+
+        output = f"Exit Code: {process.returncode}\n"
+        if stdout:
+            output += f"\nStandard Output:\n{stdout.decode().strip()}"
+        if stderr:
+            output += f"\nStandard Error:\n{stderr.decode().strip()}"
+
+        return output
+    except Exception as e:
+        return f"Error executing command: {str(e)}"
+
+if __name__ == "__main__":
+    asyncio.run(server.run())
\ No newline at end of file
diff --git a/aic/skeleton.py b/aic/skeleton.py
new file mode 100644
index 0000000..42d5401
--- /dev/null
+++ b/aic/skeleton.py
@@ -0,0 +1,128 @@
+import ast
+import os
+
+class PythonSkeletonizer(ast.NodeVisitor):
+    def __init__(self):
+        self.reset()
+
+    def reset(self):
+        self.skeleton = []
+        self.dependencies = set()
+        self.imports = []
+
+    def skeletonize(self, source_code, path):
+        self.reset()
+        try:
+            tree = ast.parse(source_code)
+        except Exception as e:
+            return f"# ERROR: Failed to parse {path}: {str(e)}", set()
+
+        self.visit(tree)
+        return "\n".join(self.skeleton), self.dependencies
+
+    def visit_Import(self, node):
+        for alias in node.names:
+            self.dependencies.add(alias.name)
+            self.imports.append(f"import {alias.name}")
+
+    def visit_ImportFrom(self, node):
+        module = node.module or ""
+        level = node.level
+        # Prefix relative imports with one dot per level
+        prefix = "." * level if level > 0 else ""
+        full_module = prefix + module
+
+        for alias in node.names:
+            self.dependencies.add(full_module)
+            self.imports.append(f"from {full_module} import {alias.name}")
+
+    def visit_ClassDef(self, node):
+        # Extract class signature
+        self.skeleton.append(f"class {node.name}:")
+        docstring = ast.get_docstring(node)
+        if docstring:
+            self.skeleton.append(f'    """{docstring}"""')
+
+        # Visit children into a temporary buffer so we can
+        # re-emit them indented one level under the class
+        old_skeleton = self.skeleton
+        self.skeleton = []
+        self.generic_visit(node)
+        inner = self.skeleton
+        self.skeleton = old_skeleton
+        for line in inner:
+            self.skeleton.append(f"    {line}")
+        self.skeleton.append("")  # Spacer
+
+    def visit_FunctionDef(self, node):
+        self._skeletonize_func(node)
+
+    def visit_AsyncFunctionDef(self, node):
+        self._skeletonize_func(node, is_async=True)
+
+    def _skeletonize_func(self, node, is_async=False):
+        prefix = "async " if is_async else ""
+        args = ast.unparse(node.args) if hasattr(ast, 'unparse') else "..."
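+        # NOTE: ast.unparse only exists on Python 3.9+, hence the hasattr guards
+        # on the surrounding lines; older interpreters degrade to "..." placeholders.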
+ returns = f" -> {ast.unparse(node.returns)}" if hasattr(ast, 'unparse') and node.returns else "" + + signature = f"{prefix}def {node.name}({args}){returns}:" + self.skeleton.append(signature) + + docstring = ast.get_docstring(node) + if docstring: + self.skeleton.append(f' """{docstring}"""') + + # Effects analysis + effects = self._analyze_effects(node) + if effects: + self.skeleton.append(f" # {effects}") + + self.skeleton.append(" ...") + self.skeleton.append("") # Spacer + + def _analyze_effects(self, node): + returns = [] + raises = [] + calls = [] + + for child in ast.walk(node): + if isinstance(child, ast.Return): + if child.value: + try: + returns.append(ast.unparse(child.value)) + except: + returns.append("some_value") + elif isinstance(child, ast.Raise): + if child.exc: + try: + raises.append(ast.unparse(child.exc)) + except: + raises.append("Exception") + elif isinstance(child, ast.Call): + try: + calls.append(ast.unparse(child.func)) + except: + pass + + res = [] + if returns: res.append(f"RETURNS: {' | '.join(list(set(returns))[:3])}") + if raises: res.append(f"RAISES: {' | '.join(list(set(raises))[:3])}") + if calls: res.append(f"CALLS: {' | '.join(list(set(calls))[:5])}") + + return " | ".join(res) + +class UniversalSkeletonizer: + def __init__(self): + self.py_skeletonizer = PythonSkeletonizer() + + def skeletonize(self, source_code, path): + if path.endswith('.py'): + return self.py_skeletonizer.skeletonize(source_code, path) + else: + # For non-Python files, treat content as the skeleton + # Limit size to avoid DB bloat (e.g. 100KB) + if len(source_code) > 100 * 1024: + return f"# Content truncated (size: {len(source_code)} bytes)\n" + source_code[:100*1024] + "...", set() + return source_code, set() diff --git a/aic/utils.py b/aic/utils.py new file mode 100644 index 0000000..04bb5df --- /dev/null +++ b/aic/utils.py @@ -0,0 +1,70 @@ +import hashlib +import os +import fnmatch + +def calculate_hash(content): + if isinstance(content, str): + content = content.encode('utf-8') + return hashlib.sha256(content).hexdigest() + +def get_ignore_patterns(root_dir): + """ + Loads ignore patterns from .geminiignore and .gitignore, plus defaults. + """ + # Defaults + patterns = {'.git', '.aic', '__pycache__', 'node_modules', '.DS_Store', 'venv', '.venv', 'env', '.env', 'dist', 'build'} + + for filename in ['.geminiignore', '.gitignore']: + path = os.path.join(root_dir, filename) + if os.path.exists(path): + try: + with open(path, 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + # Normalize pattern: remove leading/trailing slashes for simple matching + # This is a naive implementation; proper gitignore handling is complex + clean_line = line.rstrip('/') + if clean_line: + patterns.add(clean_line) + except Exception: + pass # Fail silently on read errors + + return list(patterns) + +def should_ignore(name, patterns): + """ + Checks if a name matches any of the ignore patterns. 
+ """ + for pattern in patterns: + if fnmatch.fnmatch(name, pattern): + return True + return False + +def resolve_dep_to_path(dep_name, current_file, root_dir): + """Simple heuristic to resolve module name to file path.""" + # Handle relative imports (e.g., '.module' or '..module') + if dep_name.startswith('.'): + levels = 0 + while dep_name.startswith('.'): + levels += 1 + dep_name = dep_name[1:] + + curr_dir = os.path.dirname(current_file) + for _ in range(levels - 1): + curr_dir = os.path.dirname(curr_dir) + + base_path = os.path.join(curr_dir, dep_name.replace('.', os.sep)) + else: + base_path = os.path.join(root_dir, dep_name.replace('.', os.sep)) + + candidates = [ + base_path + ".py", + os.path.join(base_path, "__init__.py") + ] + + for cand in candidates: + if os.path.exists(cand): + return os.path.relpath(cand, root_dir) + return None diff --git a/commands/conductor/implement.toml b/commands/conductor/implement.toml index 9988a6c..3603f26 100644 --- a/commands/conductor/implement.toml +++ b/commands/conductor/implement.toml @@ -66,6 +66,9 @@ CRITICAL: You must validate the success of every tool call. If any tool call fai - **Workflow:** Resolve **Workflow** (via the **Universal File Resolution Protocol** using the project's index file). c. **Error Handling:** If you fail to read any of these files, you MUST stop and inform the user of the error. +4. **Semantic Dependency Awareness (AIC):** + - **Protocol:** During the implementation of any task, if you encounter imported internal modules or functions whose names are not fully self-describing or whose implementation is not directly in your current context, you SHOULD call the `aic_get_file_context` tool for those modules. This will provide you with the "Rich Skeleton" (signatures, docstrings, and IO/side-effects) of the dependency without consuming excessive tokens. + 4. **Execute Tasks and Update Track Plan:** a. **Announce:** State that you will now execute the tasks from the track's **Implementation Plan** by following the procedures in the **Workflow**. b. **Iterate Through Tasks:** You MUST now loop through each task in the track's **Implementation Plan** one by one. diff --git a/commands/conductor/newTrack.toml b/commands/conductor/newTrack.toml index aab88e8..c391979 100644 --- a/commands/conductor/newTrack.toml +++ b/commands/conductor/newTrack.toml @@ -72,6 +72,9 @@ CRITICAL: You must validate the success of every tool call. If any tool call fai * Examples include reproduction steps for bugs, specific scope for chores, or success criteria. * Tailor the questions to the specific request. +4. **Semantic Context (Optional):** + - **Recommendation:** If the track involves complex interactions with existing code, you SHOULD call the `aic_get_file_context` tool for relevant files to better understand the API contracts and dependencies. This will help you generate a more accurate specification and plan. + 3. **Draft `spec.md`:** Once sufficient information is gathered, draft the content for the track's `spec.md` file, including sections like Overview, Functional Requirements, Non-Functional Requirements (if any), Acceptance Criteria, and Out of Scope. 4. **User Confirmation:** Present the drafted `spec.md` content to the user for review and approval. diff --git a/commands/conductor/setup.toml b/commands/conductor/setup.toml index 2f6850c..e49995f 100644 --- a/commands/conductor/setup.toml +++ b/commands/conductor/setup.toml @@ -343,6 +343,9 @@ CRITICAL: When determining model complexity, ALWAYS select the "flash" model, re 2. 
**Summarize Actions:** Present a summary of all actions taken during Phase 1, including: - The guide files that were copied. - The workflow file that was copied. +2. **Semantic Indexing (AIC):** + - **Announce Action:** Inform the user: "Finally, I will perform a semantic indexing of your project using AIC (AI Compiler). This creates a lightweight dependency graph to help me understand your code better." + - **Action:** Call the `aic_index` tool to index the repository. 3. **Transition to initial plan and track generation:** Announce that the initial setup is complete and you will now proceed to define the first track for the project. --- diff --git a/gemini-extension.json b/gemini-extension.json index fe6539e..a5c4c9d 100644 --- a/gemini-extension.json +++ b/gemini-extension.json @@ -2,4 +2,4 @@ "name": "conductor", "version": "0.2.0", "contextFileName": "GEMINI.md" -} +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..74613ca --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "conductor-aic" +version = "0.1.3" +description = "Conductor Extension for Gemini CLI with AI Compiler (AIC)" +readme = "README.md" +requires-python = ">=3.8" +license = "Apache-2.0" +authors = [ + { name = "Google DeepMind" }, +] +dependencies = [] + +[project.scripts] +aic = "aic.cli:main" + +[tool.hatch.build.targets.wheel] +packages = ["aic"]