Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codemcp/hot_reload_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ async def _run_manager_task(
break

if command == "call":
result = await session.call_tool("codemcp", arguments=args)
result = await session.call_tool(
name="codemcp", arguments=args
)
# This is the only error case FastMCP can
# faithfully re-propagate, see
# https://github.com/modelcontextprotocol/python-sdk/issues/348
Expand Down
71 changes: 50 additions & 21 deletions codemcp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def codemcp(
pattern: str | None = None,
include: str | None = None,
command: str | None = None,
arguments: list[str] | str | None = None,
arguments: str | None = None,
old_str: str | None = None, # Added for backward compatibility
new_str: str | None = None, # Added for backward compatibility
chat_id: str | None = None, # Added for chat identification
Expand Down Expand Up @@ -109,28 +109,28 @@ async def codemcp(
# We no longer need to convert string arguments to list since run_command now only accepts strings

# Normalize string inputs to ensure consistent newlines
def normalize_newlines(s):
def normalize_newlines(s: object) -> object:
"""Normalize string to use \n for all newlines."""
return s.replace("\r\n", "\n") if isinstance(s, str) else s

# Normalize content, old_string, and new_string to use consistent \n newlines
content = normalize_newlines(content)
old_string = normalize_newlines(old_string)
new_string = normalize_newlines(new_string)
content_norm = normalize_newlines(content)
old_string_norm = normalize_newlines(old_string)
new_string_norm = normalize_newlines(new_string)
# Also normalize backward compatibility parameters
old_str = normalize_newlines(old_str)
new_str = normalize_newlines(new_str)
old_str_norm = normalize_newlines(old_str)
new_str_norm = normalize_newlines(new_str)
# And user prompt which might contain code blocks
user_prompt = normalize_newlines(user_prompt)
user_prompt_norm = normalize_newlines(user_prompt)

# Get all provided non-None parameters
provided_params = {
param: value
for param, value in {
"path": path,
"content": content,
"old_string": old_string,
"new_string": new_string,
"content": content_norm,
"old_string": old_string_norm,
"new_string": new_string_norm,
"offset": offset,
"limit": limit,
"description": description,
Expand All @@ -139,12 +139,12 @@ def normalize_newlines(s):
"command": command,
"arguments": arguments,
# Include backward compatibility parameters
"old_str": old_str,
"new_str": new_str,
"old_str": old_str_norm,
"new_str": new_str_norm,
# Chat ID for session identification
"chat_id": chat_id,
# InitProject commit message parameters
"user_prompt": user_prompt,
"user_prompt": user_prompt_norm,
"subject_line": subject_line,
# Whether to reuse the chat ID from the HEAD commit
"reuse_head_chat_id": reuse_head_chat_id,
Expand Down Expand Up @@ -188,6 +188,8 @@ def normalize_newlines(s):
else:
content_str = content or ""

if chat_id is None:
raise ValueError("chat_id is required for WriteFile subtool")
return await write_file_content(path, content_str, description, chat_id)

if subtool == "EditFile":
Expand All @@ -205,6 +207,8 @@ def normalize_newlines(s):
old_content = old_string or old_str or ""
# Accept either new_string or new_str (prefer new_string if both are provided)
new_content = new_string or new_str or ""
if chat_id is None:
raise ValueError("chat_id is required for EditFile subtool")
return await edit_file_content(
path, old_content, new_content, None, description, chat_id
)
Expand Down Expand Up @@ -242,10 +246,20 @@ def normalize_newlines(s):
if command is None:
raise ValueError("command is required for RunCommand subtool")

# Ensure chat_id is provided
if chat_id is None:
raise ValueError("chat_id is required for RunCommand subtool")

# Ensure arguments is a string for run_command
args_str = (
arguments
if isinstance(arguments, str) or arguments is None
else " ".join(arguments)
)
return await run_command(
path,
command,
arguments,
args_str,
chat_id,
)

Expand Down Expand Up @@ -303,6 +317,8 @@ def normalize_newlines(s):
if description is None:
raise ValueError("description is required for RM subtool")

if chat_id is None:
raise ValueError("chat_id is required for RM subtool")
return await rm_file(path, description, chat_id)

if subtool == "Think":
Expand All @@ -317,14 +333,27 @@ def normalize_newlines(s):
if mode is None:
raise ValueError("mode is required for Chmod subtool")

result = await chmod(path, mode, chat_id)
if chat_id is None:
raise ValueError("chat_id is required for Chmod subtool")

# Ensure mode is one of the valid literals
if mode not in ["a+x", "a-x"]:
raise ValueError("mode must be either 'a+x' or 'a-x' for Chmod subtool")

from typing import Literal, cast

chmod_mode = cast(Literal["a+x", "a-x"], mode)
result = await chmod(path, chmod_mode, chat_id)
return result.get("resultForAssistant", "Chmod operation completed")
except Exception:
logging.error("Exception", exc_info=True)
raise

# This should never be reached, but adding for type safety
return "Unknown subtool or operation"


def configure_logging(log_file="codemcp.log"):
def configure_logging(log_file: str = "codemcp.log") -> None:
"""Configure logging to write to both a file and the console.

The log level is determined from the configuration file.
Expand Down Expand Up @@ -388,7 +417,7 @@ def configure_logging(log_file="codemcp.log"):

# Set up filter to exclude logs from 'mcp' module unless in debug mode
class ModuleFilter(logging.Filter):
def filter(self, record):
def filter(self, record: logging.LogRecord) -> bool:
# Allow all logs in debug mode, otherwise filter 'mcp' module
if debug_mode or not record.name.startswith("mcp"):
return True
Expand Down Expand Up @@ -478,7 +507,7 @@ def init_codemcp_project(path: str) -> str:

@click.group(invoke_without_command=True)
@click.pass_context
def cli(ctx):
def cli(ctx: click.Context) -> None:
"""CodeMCP: Command-line interface for MCP server and project management."""
# If no subcommand is provided, run the MCP server (for backwards compatibility)
if ctx.invoked_subcommand is None:
Expand All @@ -487,13 +516,13 @@ def cli(ctx):

@cli.command()
@click.argument("path", type=click.Path(), default=".")
def init(path):
def init(path: str) -> None:
"""Initialize a new codemcp project with an empty codemcp.toml file and git repository."""
result = init_codemcp_project(path)
click.echo(result)


def run():
def run() -> None:
"""Run the MCP server."""
configure_logging()
mcp.run()
37 changes: 29 additions & 8 deletions codemcp/multi_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,57 @@
mcp = FastMCP("codemcp_multi")


# Helper function to get a chat_id from a Context
def get_chat_id_from_context(ctx: Context) -> str:
# Generate a chat_id from the context
ctx_id = getattr(ctx, "id", None)
return f"multi-{ctx_id}" if ctx_id else "multi-default"


@mcp.tool()
async def read_file(
ctx: Context, file_path: str, offset: int = None, limit: int = None
ctx: Context, file_path: str, offset: int | None = None, limit: int | None = None
) -> str:
return await read_file_content(file_path, offset, limit)
# Get chat ID from context
chat_id = get_chat_id_from_context(ctx)
return await read_file_content(file_path, offset, limit, chat_id)


@mcp.tool()
async def write_file(
ctx: Context, file_path: str, content: str, description: str
) -> str:
return await write_file_content(file_path, content, description)
# Get chat ID from context
chat_id = get_chat_id_from_context(ctx)
return await write_file_content(file_path, content, description, chat_id)


@mcp.tool()
async def edit_file(
ctx: Context, file_path: str, old_string: str, new_string: str, description: str
) -> str:
return await edit_file_content(file_path, old_string, new_string, None, description)
# Get chat ID from context
chat_id = get_chat_id_from_context(ctx)
return await edit_file_content(
file_path, old_string, new_string, None, description, chat_id
)


@mcp.tool()
async def ls(ctx: Context, file_path: str) -> str:
return await ls_directory(file_path)
# Get chat ID from context
chat_id = get_chat_id_from_context(ctx)
return await ls_directory(file_path, chat_id)


@mcp.tool()
async def grep(
ctx: Context, pattern: str, path: str = None, include: str = None
ctx: Context, pattern: str, path: str | None = None, include: str | None = None
) -> str:
result = grep_files(pattern, path, include)
return await result.get(
# Get chat ID from context
chat_id = get_chat_id_from_context(ctx)
result = await grep_files(pattern, path, include, chat_id)
return result.get(
"resultForAssistant", f"Found {result.get('numFiles', 0)} file(s)"
)

Expand All @@ -55,6 +74,8 @@ async def init_project_tool(
subject_line: str,
reuse_head_chat_id: bool = False,
) -> str:
# The init_project function actually doesn't accept a chat_id parameter
# It generates its own chat_id, so we don't pass it as an argument
return await init_project(file_path, user_prompt, subject_line, reuse_head_chat_id)


Expand Down
27 changes: 16 additions & 11 deletions codemcp/tools/edit_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,20 +413,20 @@ def find_similar_lines(
String containing the most similar lines, or empty string if none found

"""
search_lines = search_lines.splitlines()
content_lines = content_lines.splitlines()
search_lines_list = search_lines.splitlines()
content_lines_list = content_lines.splitlines()

# Handle empty input cases
if not search_lines or not content_lines:
if not search_lines_list or not content_lines_list:
return ""

best_ratio = 0
best_match = [] # Initialize with empty list to avoid None checks
best_match: list[str] = [] # Initialize with empty list to avoid None checks
best_match_i = 0 # Initialize to avoid unbound variable errors

for i in range(len(content_lines) - len(search_lines) + 1):
chunk = content_lines[i : i + len(search_lines)]
ratio = SequenceMatcher(None, search_lines, chunk).ratio()
for i in range(len(content_lines_list) - len(search_lines_list) + 1):
chunk = content_lines_list[i : i + len(search_lines_list)]
ratio = SequenceMatcher(None, search_lines_list, chunk).ratio()
if ratio > best_ratio:
best_ratio = ratio
best_match = chunk
Expand All @@ -435,14 +435,19 @@ def find_similar_lines(
if best_ratio < threshold:
return ""

if best_match[0] == search_lines[0] and best_match[-1] == search_lines[-1]:
if (
best_match[0] == search_lines_list[0]
and best_match[-1] == search_lines_list[-1]
):
return "\n".join(best_match)

N = 5
best_match_end = min(len(content_lines), best_match_i + len(search_lines) + N)
best_match_end = min(
len(content_lines_list), best_match_i + len(search_lines_list) + N
)
best_match_i = max(0, best_match_i - N)

best = content_lines[best_match_i:best_match_end]
best = content_lines_list[best_match_i:best_match_end]
return "\n".join(best)


Expand Down Expand Up @@ -584,7 +589,7 @@ async def edit_file_content(
new_string: str,
read_file_timestamps: dict[str, float] | None = None,
description: str = "",
chat_id: str = None,
chat_id: str = "",
) -> str:
"""Edit a file by replacing old_string with new_string.

Expand Down
7 changes: 6 additions & 1 deletion e2e/test_chmod.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ async def test_chmod_error_handling(self):
"chat_id": chat_id,
},
)
self.assertIn("unsupported chmod mode", error_text.lower())
# Check for either error message (from main.py or chmod.py)
self.assertTrue(
"unsupported chmod mode" in error_text.lower() or
"mode must be either 'a+x' or 'a-x'" in error_text.lower(),
f"Expected an error about invalid mode, but got: {error_text}"
)


if __name__ == "__main__":
Expand Down
Loading