From f4de69f728434fb470572382fd2e8c2519c4658f Mon Sep 17 00:00:00 2001 From: "Edward Z. Yang" Date: Mon, 5 May 2025 20:50:37 -0400 Subject: [PATCH] Update [ghstack-poisoned] --- codemcp/main.py | 42 +++---- codemcp/tools/__init__.py | 8 +- codemcp/tools/edit_file.py | 10 +- codemcp/tools/glob.py | 211 +++++++++++++----------------------- codemcp/tools/grep.py | 6 +- codemcp/tools/ls.py | 16 +-- codemcp/tools/mv.py | 9 +- codemcp/tools/read_file.py | 14 +-- codemcp/tools/rm.py | 61 ++++++----- codemcp/tools/write_file.py | 36 +++--- 10 files changed, 175 insertions(+), 238 deletions(-) diff --git a/codemcp/main.py b/codemcp/main.py index 556c70c..a508ef0 100644 --- a/codemcp/main.py +++ b/codemcp/main.py @@ -17,17 +17,17 @@ from .common import normalize_file_path from .git_query import get_current_commit_hash from .tools.chmod import chmod -from .tools.edit_file import edit_file_content -from .tools.glob import glob_files -from .tools.grep import grep_files +from .tools.edit_file import edit_file +from .tools.glob import glob +from .tools.grep import grep from .tools.init_project import init_project -from .tools.ls import ls_directory -from .tools.mv import mv_file -from .tools.read_file import read_file_content -from .tools.rm import rm_file +from .tools.ls import ls +from .tools.mv import mv +from .tools.read_file import read_file +from .tools.rm import rm from .tools.run_command import run_command from .tools.think import think -from .tools.write_file import write_file_content +from .tools.write_file import write_file # Initialize FastMCP server mcp = FastMCP("codemcp") @@ -182,7 +182,7 @@ async def codemcp( if path is None: raise ValueError("path is required for ReadFile subtool") - result = await read_file_content(path, offset, limit, chat_id, commit_hash) + result = await read_file(path, offset, limit, chat_id, commit_hash) return result if subtool == "WriteFile": @@ -193,9 +193,7 @@ async def codemcp( if chat_id is None: raise ValueError("chat_id is required for WriteFile subtool") - result = await write_file_content( - path, content, description, chat_id, commit_hash - ) + result = await write_file(path, content, description, chat_id, commit_hash) return result if subtool == "EditFile": @@ -216,16 +214,14 @@ async def codemcp( # Accept either new_string or new_str (prefer new_string if both are provided) new_content = new_string or new_str - result = await edit_file_content( - path, old_content, new_content, None, description, chat_id, commit_hash - ) + result = await edit_file(path, old_content, new_content, None, description, chat_id, commit_hash) return result if subtool == "LS": if path is None: raise ValueError("path is required for LS subtool") - result = await ls_directory(path, chat_id, commit_hash) + result = await ls(path, chat_id, commit_hash) return result if subtool == "InitProject": @@ -267,9 +263,7 @@ async def codemcp( raise ValueError("path is required for Grep subtool") try: - result_string = await grep_files( - pattern, path, include, chat_id, commit_hash - ) + result_string = await grep(pattern, path, include, chat_id, commit_hash) return result_string except Exception as e: logging.error(f"Error in Grep subtool: {e}", exc_info=True) @@ -282,9 +276,7 @@ async def codemcp( raise ValueError("path is required for Glob subtool") try: - result_string = await glob_files( - pattern, path, limit, offset, chat_id, commit_hash - ) + result_string = await glob(pattern, path, limit, offset, chat_id, commit_hash) return result_string except Exception as e: logging.error(f"Error in Glob subtool: {e}", exc_info=True) @@ -298,7 +290,7 @@ async def codemcp( if chat_id is None: raise ValueError("chat_id is required for RM subtool") - result = await rm_file(path, description, chat_id, commit_hash) + result = await rm(path, description, chat_id, commit_hash) return result if subtool == "MV": @@ -315,9 +307,7 @@ async def codemcp( if chat_id is None: raise ValueError("chat_id is required for MV subtool") - result = await mv_file( - source_path, target_path, description, chat_id, commit_hash - ) + result = await mv(source_path, target_path, description, chat_id, commit_hash) return result if subtool == "Think": diff --git a/codemcp/tools/__init__.py b/codemcp/tools/__init__.py index 26eb439..5cc32f3 100644 --- a/codemcp/tools/__init__.py +++ b/codemcp/tools/__init__.py @@ -6,8 +6,8 @@ from .git_diff import git_diff from .git_log import git_log from .git_show import git_show -from .mv import mv_file -from .rm import rm_file +from .mv import mv +from .rm import rm __all__ = [ "chmod", @@ -15,6 +15,6 @@ "git_diff", "git_log", "git_show", - "mv_file", - "rm_file", + "mv", + "rm", ] diff --git a/codemcp/tools/edit_file.py b/codemcp/tools/edit_file.py index 2e101eb..b4c6621 100644 --- a/codemcp/tools/edit_file.py +++ b/codemcp/tools/edit_file.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) __all__ = [ - "edit_file_content", + "edit_file", "find_similar_file", "apply_edit_pure", ] @@ -744,8 +744,8 @@ def debug_string_comparison( return not content_same -async def edit_file_content( - file_path: str, +async def edit_file( + path: str, old_string: str | None = None, new_string: str | None = None, read_file_timestamps: dict[str, float] | None = None, @@ -761,7 +761,7 @@ async def edit_file_content( whitespace on otherwise empty lines. Args: - file_path: The absolute path to the file to edit + path: The absolute path to the file to edit old_string: The text to replace (use empty string for new file creation) new_string: The new text to replace old_string with read_file_timestamps: Dictionary mapping file paths to timestamps when they were last read @@ -785,7 +785,7 @@ async def edit_file_content( chat_id = "" if chat_id is None else chat_id # Normalize the file path - full_file_path = normalize_file_path(file_path) + full_file_path = normalize_file_path(path) # Normalize string inputs to ensure consistent newlines old_string = old_string.replace("\r\n", "\n") diff --git a/codemcp/tools/glob.py b/codemcp/tools/glob.py index 5f8a7d9..c675797 100644 --- a/codemcp/tools/glob.py +++ b/codemcp/tools/glob.py @@ -1,161 +1,57 @@ #!/usr/bin/env python3 -import asyncio +import fnmatch import logging import os -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional from ..common import normalize_file_path +from ..git import is_git_repository from .commit_utils import append_commit_hash __all__ = [ - "glob_files", "glob", "render_result_for_assistant", ] -# Define constants -MAX_RESULTS = 100 - - -async def glob( - pattern: str, - path: str, - options: Optional[Dict[str, Any]] = None, -) -> Dict[str, Any]: - """Find files matching a glob pattern. - - Args: - pattern: The glob pattern to match files against - path: The directory to search in - options: Optional parameters for pagination (limit, offset) - - Returns: - A dictionary with matched files and metadata - """ - if options is None: - options = {} - - limit = options.get("limit", MAX_RESULTS) - offset = options.get("offset", 0) - - # Normalize the directory path - absolute_path = normalize_file_path(path) - - # In non-test environment, verify the path exists - if not os.environ.get("DESKAID_TESTING"): - # Check if path exists - if not os.path.exists(absolute_path): - raise FileNotFoundError(f"Path does not exist: {path}") - - # Check if it's a directory - if not os.path.isdir(absolute_path): - raise ValueError(f"Path is not a directory: {path}") - - # Create Path object for the directory - path_obj = Path(absolute_path) - - try: - # Use pathlib's glob functionality to find matching files - if pattern.startswith("/"): - # Treat as absolute path if it starts with / - matches = list(Path("/").glob(pattern[1:])) - else: - # Use relative path otherwise - matches = list(path_obj.glob(pattern)) - - # Filter out directories if they match the pattern - matches = [match for match in matches if match.is_file()] - - # Sort matches by modification time (newest first) - loop = asyncio.get_event_loop() - - # Get file stats asynchronously - stats: List[Optional[os.stat_result]] = [] - for match in matches: - file_stat = await loop.run_in_executor( - None, lambda m=match: os.stat(m) if os.path.exists(m) else None - ) - stats.append(file_stat) - - matches_with_stats: List[Tuple[Path, Optional[os.stat_result]]] = list( - zip(matches, stats, strict=False) - ) - - # In tests, sort by filename for deterministic results - if os.environ.get("NODE_ENV") == "test": - matches_with_stats.sort(key=lambda x: str(x[0])) - else: - # Sort by modification time (newest first), with filename as tiebreaker - matches_with_stats.sort( - key=lambda x: (-(x[1].st_mtime if x[1] else 0), str(x[0])) - ) - - matches = [match for match, _ in matches_with_stats] - - # Convert Path objects to strings - file_paths = [str(match) for match in matches] - - # Apply pagination - total_files = len(file_paths) - if offset > 0: - file_paths = file_paths[offset:] - - truncated = total_files > (offset + limit) - - # Limit the number of results - file_paths = file_paths[:limit] - - return { - "files": file_paths, - "truncated": truncated, - "total": total_files, - } - except Exception as e: - logging.exception(f"Error executing glob: {e!s}") - raise - def render_result_for_assistant(output: Dict[str, Any]) -> str: - """Render the results in a format suitable for the assistant. + """Render the glob results in a format suitable for the assistant. Args: - output: The glob results dictionary + output: The output from the glob operation Returns: A formatted string representation of the results """ - filenames = output.get("filenames", []) - num_files = output.get("numFiles", 0) + filenames = output.get("files", []) + num_files = output.get("total", 0) if num_files == 0: return "No files found" - result = os.linesep.join(filenames) + result = f"Found {num_files} files:\n\n" - # Only add truncation message if results were actually truncated - if output.get("truncated", False): - result += ( - "\n(Results are truncated. Consider using a more specific path or pattern.)" - ) + # Add each filename to the result + for filename in filenames: + result += f"{filename}\n" return result -async def glob_files( +async def glob( pattern: str, - path: str | None = None, + path: str, limit: int | None = None, offset: int | None = None, chat_id: str | None = None, commit_hash: str | None = None, ) -> str: - """Search for files matching a glob pattern. + """Find files matching a pattern. Args: pattern: The glob pattern to match files against - path: The directory to search in (defaults to current working directory) + path: The directory to search in limit: Maximum number of results to return offset: Number of results to skip (for pagination) chat_id: The unique ID of the current chat session @@ -166,30 +62,73 @@ async def glob_files( """ try: - # Use current directory if path is not provided - directory = path or os.getcwd() - normalized_path = normalize_file_path(directory) + # Set default values + chat_id = "" if chat_id is None else chat_id + limit_val = 100 if limit is None else limit + offset_val = 0 if offset is None else offset - # Set default values for limit and offset - limit = limit or MAX_RESULTS - offset = offset or 0 + # Normalize the directory path + full_directory_path = normalize_file_path(path) - # Execute glob with options for pagination - options = {"limit": limit, "offset": offset} - result = await glob(pattern, normalized_path, options) + # Validate the directory path + if not os.path.exists(full_directory_path): + raise FileNotFoundError(f"Directory does not exist: {path}") - # Add formatted result for assistant - formatted_result = render_result_for_assistant(result) + if not os.path.isdir(full_directory_path): + raise NotADirectoryError(f"Path is not a directory: {path}") - # Append commit hash - formatted_result, _ = await append_commit_hash( - formatted_result, normalized_path, commit_hash - ) + # Safety check: Verify the directory is within a git repository with codemcp.toml + if not await is_git_repository(full_directory_path): + raise ValueError(f"Directory is not in a Git repository: {path}") - return formatted_result + # Find all matching files + matches: List[str] = [] + for root, dirs, files in os.walk(full_directory_path): + # Skip hidden directories + dirs[:] = [d for d in dirs if not d.startswith(".")] + + # Check files against the pattern + for file in files: + if file.startswith("."): + continue + + file_path = os.path.join(root, file) + rel_path = os.path.relpath(file_path, full_directory_path) + + if fnmatch.fnmatch(rel_path, pattern): + matches.append(rel_path) + + # Sort the matches + matches.sort() + + # Apply offset and limit + total_matches = len(matches) + matches = matches[offset_val : offset_val + limit_val] + + # Create the result dictionary + result_dict = { + "files": matches, + "total": total_matches, + } + + # Format the results + if not matches: + output = f"No files matching '{pattern}' found in {path}" + else: + output = f"Found {total_matches} files matching '{pattern}' in {path}" + if offset_val > 0 or total_matches > offset_val + limit_val: + output += f" (showing {offset_val+1}-{min(offset_val+limit_val, total_matches)} of {total_matches})" + output += ":\n\n" + + for match in matches: + output += f"{match}\n" + + # Append commit hash + result, _ = await append_commit_hash(output, full_directory_path, commit_hash) + return result except Exception as e: # Log the error - logging.error(f"Error in glob_files: {e}", exc_info=True) + logging.error(f"Error in glob: {e}", exc_info=True) # Return error message error_message = f"Error searching for files: {e}" diff --git a/codemcp/tools/grep.py b/codemcp/tools/grep.py index a6b73dd..4b0830d 100644 --- a/codemcp/tools/grep.py +++ b/codemcp/tools/grep.py @@ -11,7 +11,7 @@ from .commit_utils import append_commit_hash __all__ = [ - "grep_files", + "grep", "git_grep", "render_result_for_assistant", "TOOL_NAME_FOR_PROMPT", @@ -156,7 +156,7 @@ def render_result_for_assistant(output: Dict[str, Any]) -> str: return result -async def grep_files( +async def grep( pattern: str, path: str | None = None, include: str | None = None, @@ -215,7 +215,7 @@ async def grep_files( return result_for_assistant except Exception as e: # Log the error - logging.error(f"Error in grep_files: {e}", exc_info=True) + logging.error(f"Error in grep: {e}", exc_info=True) # Return error message error_message = f"Error searching for pattern: {e}" diff --git a/codemcp/tools/ls.py b/codemcp/tools/ls.py index a88f151..8fcee3a 100644 --- a/codemcp/tools/ls.py +++ b/codemcp/tools/ls.py @@ -10,7 +10,7 @@ from .commit_utils import append_commit_hash __all__ = [ - "ls_directory", + "ls", "list_directory", "skip", "TreeNode", @@ -23,13 +23,13 @@ TRUNCATED_MESSAGE = f"There are more than {MAX_FILES} files in the directory. Use more specific paths to explore nested directories. The first {MAX_FILES} files and directories are included below:\n\n" -async def ls_directory( - directory_path: str, chat_id: str | None = None, commit_hash: str | None = None +async def ls( + path: str, chat_id: str | None = None, commit_hash: str | None = None ) -> str: """List the contents of a directory. Args: - directory_path: The absolute path to the directory to list + path: The absolute path to the directory to list chat_id: The unique ID of the current chat session commit_hash: Optional Git commit hash for version tracking @@ -41,18 +41,18 @@ async def ls_directory( chat_id = "" if chat_id is None else chat_id # Normalize the directory path - full_directory_path = normalize_file_path(directory_path) + full_directory_path = normalize_file_path(path) # Validate the directory path if not os.path.exists(full_directory_path): - raise FileNotFoundError(f"Directory does not exist: {directory_path}") + raise FileNotFoundError(f"Directory does not exist: {path}") if not os.path.isdir(full_directory_path): - raise NotADirectoryError(f"Path is not a directory: {directory_path}") + raise NotADirectoryError(f"Path is not a directory: {path}") # Safety check: Verify the directory is within a git repository with codemcp.toml if not await is_git_repository(full_directory_path): - raise ValueError(f"Directory is not in a Git repository: {directory_path}") + raise ValueError(f"Directory is not in a Git repository: {path}") # Check edit permission (which verifies codemcp.toml exists) is_permitted, permission_message = await check_edit_permission(full_directory_path) diff --git a/codemcp/tools/mv.py b/codemcp/tools/mv.py index ef915e6..fe5da7d 100644 --- a/codemcp/tools/mv.py +++ b/codemcp/tools/mv.py @@ -3,18 +3,21 @@ import logging import os import pathlib +import shutil +from typing import Optional +from ..access import check_edit_permission from ..common import normalize_file_path -from ..git import commit_changes, get_repository_root +from ..git import commit_changes, get_repository_root, is_git_repository from ..shell import run_command from .commit_utils import append_commit_hash __all__ = [ - "mv_file", + "mv", ] -async def mv_file( +async def mv( source_path: str, target_path: str, description: str | None = None, diff --git a/codemcp/tools/read_file.py b/codemcp/tools/read_file.py index faadfc8..122a63d 100644 --- a/codemcp/tools/read_file.py +++ b/codemcp/tools/read_file.py @@ -14,12 +14,12 @@ from .commit_utils import append_commit_hash __all__ = [ - "read_file_content", + "read_file", ] -async def read_file_content( - file_path: str, +async def read_file( + path: str, offset: int | None = None, limit: int | None = None, chat_id: str | None = None, @@ -28,7 +28,7 @@ async def read_file_content( """Read a file's content with optional offset and limit. Args: - file_path: The absolute path to the file to read + path: The absolute path to the file to read offset: The line number to start reading from (1-indexed) limit: The number of lines to read chat_id: The unique ID of the current chat session @@ -42,15 +42,15 @@ async def read_file_content( chat_id = "" if chat_id is None else chat_id # Normalize the file path - full_file_path = normalize_file_path(file_path) + full_file_path = normalize_file_path(path) # Validate the file path if not os.path.exists(full_file_path): # Try to find a similar file (stub - would need implementation) - raise FileNotFoundError(f"File does not exist: {file_path}") + raise FileNotFoundError(f"File does not exist: {path}") if os.path.isdir(full_file_path): - raise IsADirectoryError(f"Path is a directory, not a file: {file_path}") + raise IsADirectoryError(f"Path is a directory, not a file: {path}") # Check file size before reading file_size = os.path.getsize(full_file_path) diff --git a/codemcp/tools/rm.py b/codemcp/tools/rm.py index 757d6ed..4fb3272 100644 --- a/codemcp/tools/rm.py +++ b/codemcp/tools/rm.py @@ -3,71 +3,76 @@ import logging import os import pathlib +from typing import Optional +from ..access import check_edit_permission from ..common import normalize_file_path -from ..git import commit_changes, get_repository_root +from ..git import commit_changes, get_repository_root, is_git_repository from ..shell import run_command from .commit_utils import append_commit_hash __all__ = [ - "rm_file", + "rm", ] -async def rm_file( - path: str, - description: str | None = None, - chat_id: str | None = None, - commit_hash: str | None = None, +async def rm( + path: str, description: str, chat_id: str, commit_hash: Optional[str] = None ) -> str: - """Remove a file using git rm. + """Remove a file or directory. Args: - path: The path to the file to remove (can be absolute or relative to repository root) - description: Short description of why the file is being removed + path: The absolute path to the file or directory to remove + description: Short description of the change chat_id: The unique ID of the current chat session commit_hash: Optional Git commit hash for version tracking Returns: - A string containing the result of the removal operation - """ - # Set default values - description = "" if description is None else description - chat_id = "" if chat_id is None else chat_id + A success message - # Use the directory from the path as our starting point - file_path = normalize_file_path(path) - dir_path = os.path.dirname(file_path) if os.path.dirname(file_path) else "." + """ + # Normalize the file path + full_path = normalize_file_path(path) - if not os.path.exists(file_path): + # Validate the file path + if not os.path.exists(full_path): raise FileNotFoundError(f"File does not exist: {path}") - if not os.path.isfile(file_path): - raise ValueError(f"Path is not a file: {path}") + # Safety check: Verify the file is within a git repository with codemcp.toml + if not await is_git_repository(os.path.dirname(full_path)): + raise ValueError(f"File is not in a Git repository: {path}") + + # Check edit permission (which verifies codemcp.toml exists) + is_permitted, permission_message = await check_edit_permission(full_path) + if not is_permitted: + raise ValueError(permission_message) + + # Determine if it's a file or directory + is_dir = os.path.isdir(full_path) # Get git repository root - git_root = await get_repository_root(dir_path) + git_root = await get_repository_root(os.path.dirname(full_path)) # Ensure paths are absolute and resolve any symlinks - file_path_resolved = os.path.realpath(file_path) + full_path_resolved = os.path.realpath(full_path) git_root_resolved = os.path.realpath(git_root) # Use pathlib to check if the file is within the git repo # This handles path traversal correctly on all platforms try: # Convert to Path objects - file_path_obj = pathlib.Path(file_path_resolved) + full_path_obj = pathlib.Path(full_path_resolved) git_root_obj = pathlib.Path(git_root_resolved) # Check if file is inside the git repo using Path.relative_to - # This will raise ValueError if file_path is not inside git_root - file_path_obj.relative_to(git_root_obj) + # This will raise ValueError if full_path is not inside git_root + full_path_obj.relative_to(git_root_obj) except ValueError: - msg = f"Path {file_path} is not within the git repository at {git_root}" + msg = f"Path {full_path} is not within the git repository at {git_root}" logging.error(msg) raise ValueError(msg) # Get the relative path using pathlib - rel_path = os.path.relpath(file_path_resolved, git_root_resolved) + rel_path = os.path.relpath(full_path_resolved, git_root_resolved) logging.info(f"Using relative path: {rel_path}") # Check if the file is tracked by git from the git root diff --git a/codemcp/tools/write_file.py b/codemcp/tools/write_file.py index e923d61..cda9e66 100644 --- a/codemcp/tools/write_file.py +++ b/codemcp/tools/write_file.py @@ -16,12 +16,12 @@ from .commit_utils import append_commit_hash __all__ = [ - "write_file_content", + "write_file", ] -async def write_file_content( - file_path: str, +async def write_file( + path: str, content: str | dict | list | None = None, description: str | None = None, chat_id: str | None = None, @@ -30,7 +30,7 @@ async def write_file_content( """Write content to a file. Args: - file_path: The absolute path to the file to write + path: The absolute path to the file to write content: The content to write to the file. Can be a string, dict, or list (will be converted to JSON) description: Short description of the change chat_id: The unique ID of the current chat session @@ -50,7 +50,7 @@ async def write_file_content( chat_id = "" if chat_id is None else chat_id # Normalize the file path - file_path = normalize_file_path(file_path) + path = normalize_file_path(path) # Normalize content - if content is not a string, serialize it to a string using json.dumps if content is not None and not isinstance(content, str): @@ -66,53 +66,53 @@ async def write_file_content( ) # Validate file path and permissions - is_valid, error_message = await check_file_path_and_permissions(file_path) + is_valid, error_message = await check_file_path_and_permissions(path) if not is_valid: raise ValueError(error_message) # Check git tracking for existing files is_tracked, track_error = await check_git_tracking_for_existing_file( - file_path, chat_id + path, chat_id ) if not is_tracked: raise ValueError(track_error) # Determine line endings - old_file_exists = os.path.exists(file_path) + old_file_exists = os.path.exists(path) if old_file_exists: - line_endings = await detect_line_endings(file_path) + line_endings = await detect_line_endings(path) else: - line_endings = detect_repo_line_endings(os.path.dirname(file_path)) + line_endings = detect_repo_line_endings(os.path.dirname(path)) # Ensure directory exists for new files - directory = os.path.dirname(file_path) + directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) # Write the content with UTF-8 encoding and proper line endings - await write_text_content(file_path, content_str, "utf-8", line_endings) + await write_text_content(path, content_str, "utf-8", line_endings) # Try to run the formatter on the file format_message = "" - formatter_success, formatter_output = await run_formatter_without_commit(file_path) + formatter_success, formatter_output = await run_formatter_without_commit(path) if formatter_success: - logging.info(f"Auto-formatted {file_path}") + logging.info(f"Auto-formatted {path}") if formatter_output.strip(): format_message = f"\nAuto-formatted the file" else: # Only log warning if there was actually a format command configured but it failed if not "No format command configured" in formatter_output: - logging.warning(f"Failed to auto-format {file_path}: {formatter_output}") + logging.warning(f"Failed to auto-format {path}: {formatter_output}") # Commit the changes git_message = "" - success, message = await commit_changes(file_path, description, chat_id) + success, message = await commit_changes(path, description, chat_id) if success: git_message = f"\nChanges committed to git: {description}" else: git_message = f"\nFailed to commit changes to git: {message}" - result = f"Successfully wrote to {file_path}{format_message}{git_message}" + result = f"Successfully wrote to {path}{format_message}{git_message}" # Append commit hash - result, _ = await append_commit_hash(result, file_path, commit_hash) + result, _ = await append_commit_hash(result, path, commit_hash) return result