Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions servers/mcp-neo4j-cypher/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

### Fixed
* Fix bug in Dockerfile where build would fail due to `LABEL` statement coming before `FROM` statement
* Fix n8n HTTP MCP compatibility issue by filtering extra parameters (sessionId, action, chatInput, toolCallId) sent by n8n before Pydantic validation
* Fix conftest.py port type issue for testcontainers compatibility

### Changed
* Upgrade FastMCP from 2.10.5 to 2.13.1 to get latest bug fixes and improvements

### Added
* Add comprehensive n8n compatibility integration tests to ensure tool calls work with extra metadata parameters

## v0.5.1

Expand Down
2 changes: 1 addition & 1 deletion servers/mcp-neo4j-cypher/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "A simple Neo4j MCP server"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"fastmcp>=2.10.5",
"fastmcp>=2.13.1",
"neo4j>=5.26.0",
"pydantic>=2.10.1",
"tiktoken>=0.11.0",
Expand Down
104 changes: 103 additions & 1 deletion servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/server.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import inspect
import json
import logging
import re
from typing import Any, Literal, Optional

from fastmcp.exceptions import ToolError
from fastmcp.server import FastMCP
from fastmcp.tools.tool import TextContent, ToolResult
from fastmcp.tools.tool import FunctionTool, TextContent, ToolResult
from fastmcp.utilities.types import find_kwarg_by_type, get_cached_typeadapter
from mcp.types import ToolAnnotations
from neo4j import AsyncDriver, AsyncGraphDatabase, Query, RoutingControl
from neo4j.exceptions import ClientError, Neo4jError
Expand All @@ -19,6 +21,106 @@
logger = logging.getLogger("mcp_neo4j_cypher")


# Monkey-patch FunctionTool.run to filter out extra arguments before validation
# This fixes compatibility with n8n and other MCP clients that send extra metadata fields
_original_function_tool_run = FunctionTool.run


async def _patched_function_tool_run(self, arguments: dict[str, Any]) -> ToolResult:
"""
Patched version of FunctionTool.run that filters out unknown arguments
before validation to support clients that send extra metadata fields.
"""
from fastmcp.server.context import Context

arguments = arguments.copy()

# Get the function signature to determine valid parameters
sig = inspect.signature(self.fn)
valid_params = set(sig.parameters.keys())

# Filter out arguments that are not in the function signature
filtered_arguments = {k: v for k, v in arguments.items() if k in valid_params}

# Add context if needed
context_kwarg = find_kwarg_by_type(self.fn, kwarg_type=Context)
if context_kwarg and context_kwarg not in filtered_arguments:
from fastmcp.server.context import get_context
filtered_arguments[context_kwarg] = get_context()

# Validate with filtered arguments
type_adapter = get_cached_typeadapter(self.fn)
result = type_adapter.validate_python(filtered_arguments)

if inspect.isawaitable(result):
result = await result

if isinstance(result, ToolResult):
return result

# The rest is handled by the original method's logic
# We'll call the original method from here with filtered arguments
# But since we can't easily call the rest of the original method,
# we need to duplicate the remaining logic

from fastmcp.tools.tool import _convert_to_content
from mcp.types import Audio, ContentBlock, File, Image

unstructured_result = _convert_to_content(result, serializer=self.serializer)

if self.output_schema is None:
# Do not produce a structured output for MCP Content Types
if isinstance(result, ContentBlock | Audio | Image | File) or (
isinstance(result, list | tuple)
and any(isinstance(item, ContentBlock) for item in result)
):
return ToolResult(content=unstructured_result)

# Otherwise, try to serialize the result as a dict
try:
structured_result = self.serializer(result)
return ToolResult(
content=unstructured_result, structured_content=structured_result
)
except Exception as e:
logger.debug(f"Could not serialize result as structured content: {e}")
return ToolResult(content=unstructured_result)

# If we have an output schema, validate structured content
from pydantic import TypeAdapter, ValidationError

if isinstance(result, dict):
structured_result = result
else:
try:
structured_result = self.serializer(result)
except Exception as e:
raise ToolError(
f"Tool {self.name!r} has an output schema but could not serialize result: {e}"
)

if structured_result is None:
raise ToolError(
f"Tool {self.name!r} has an output schema but returned no structured content"
)

try:
output_adapter = TypeAdapter(dict[str, Any])
output_adapter.validate_python(
structured_result, context={"schema": self.output_schema}
)
except ValidationError as e:
raise ToolError(
f"Tool {self.name!r} returned structured content that does not match its output schema: {e}"
)

return ToolResult(content=unstructured_result, structured_content=structured_result)


# Apply the monkey-patch
FunctionTool.run = _patched_function_tool_run


def _format_namespace(namespace: str) -> str:
if namespace:
if namespace.endswith("-"):
Expand Down
2 changes: 1 addition & 1 deletion servers/mcp-neo4j-cypher/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def remove_container():
request.addfinalizer(remove_container)
os.environ["NEO4J_URI"] = neo4j.get_connection_url()
os.environ["NEO4J_HOST"] = neo4j.get_container_host_ip()
os.environ["NEO4J_PORT"] = neo4j.get_exposed_port(7687)
os.environ["NEO4J_PORT"] = str(neo4j.get_exposed_port(7687))

yield neo4j

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
"""Integration tests for n8n compatibility with extra parameters."""

import json
import uuid

import aiohttp
import pytest


async def parse_sse_response(response: aiohttp.ClientResponse) -> dict:
"""Parse Server-Sent Events response from FastMCP 2.0."""
content = await response.text()
lines = content.strip().split("\n")

# Find the data line that contains the JSON
for line in lines:
if line.startswith("data: "):
json_str = line[6:] # Remove 'data: ' prefix
return json.loads(json_str)

raise ValueError("No data line found in SSE response")


@pytest.mark.asyncio
async def test_http_get_schema_with_extra_parameters(http_server):
"""Test that get_neo4j_schema works with extra parameters like n8n sends."""
session_id = str(uuid.uuid4())
async with aiohttp.ClientSession() as session:
# Simulate n8n sending extra parameters
async with session.post(
"http://127.0.0.1:8001/mcp/",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_neo4j_schema",
"arguments": {
"sample_size": 100,
# Extra parameters that n8n might send
"sessionId": "197cdce49d30476c8aa3b74bb65aec4a",
"action": "sendMessage",
"chatInput": "what cloudflare dns records are available",
"toolCallId": "276a9908-a157-4eb3-bdf9-044c506220d4",
},
},
},
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
) as response:
print(f"Response status: {response.status}")
response_text = await response.text()
print(f"Response text: {response_text}")

assert response.status == 200
result = await parse_sse_response(response)

# Should succeed despite extra parameters
assert "result" in result
assert "content" in result["result"]
assert len(result["result"]["content"]) > 0

# Check that the response is valid JSON schema
content_text = result["result"]["content"][0]["text"]
schema = json.loads(content_text)
assert isinstance(schema, dict)


@pytest.mark.asyncio
async def test_http_read_cypher_with_extra_parameters(http_server):
"""Test that read_neo4j_cypher works with extra parameters."""
session_id = str(uuid.uuid4())
async with aiohttp.ClientSession() as session:
# Simulate n8n sending extra parameters
async with session.post(
"http://127.0.0.1:8001/mcp/",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "read_neo4j_cypher",
"arguments": {
"query": "MATCH (n) RETURN count(n) as total",
# Extra parameters that n8n might send
"sessionId": "test-session-id",
"action": "executeTool",
"chatInput": "show me the count",
"toolCallId": "tool-call-123",
},
},
},
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
) as response:
print(f"Response status: {response.status}")
response_text = await response.text()
print(f"Response text: {response_text}")

assert response.status == 200
result = await parse_sse_response(response)

# Should succeed despite extra parameters
assert "result" in result
assert "content" in result["result"]
assert len(result["result"]["content"]) > 0

# Check that the response is valid JSON
content_text = result["result"]["content"][0]["text"]
data = json.loads(content_text)
assert isinstance(data, list)


@pytest.mark.asyncio
async def test_http_write_cypher_with_extra_parameters(http_server):
"""Test that write_neo4j_cypher works with extra parameters."""
session_id = str(uuid.uuid4())
async with aiohttp.ClientSession() as session:
# Simulate n8n sending extra parameters
async with session.post(
"http://127.0.0.1:8001/mcp/",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "write_neo4j_cypher",
"arguments": {
"query": "CREATE (n:N8NTest {name: 'test'}) RETURN n",
# Extra parameters that n8n might send
"sessionId": "write-session-id",
"action": "writeData",
"chatInput": "create a test node",
"toolCallId": "write-call-456",
"timestamp": "2024-01-01T00:00:00Z",
},
},
},
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
) as response:
print(f"Response status: {response.status}")
response_text = await response.text()
print(f"Response text: {response_text}")

assert response.status == 200
result = await parse_sse_response(response)

# Should succeed despite extra parameters
assert "result" in result
assert "content" in result["result"]
assert len(result["result"]["content"]) > 0

# Check that the response contains counter information
content_text = result["result"]["content"][0]["text"]
counters = json.loads(content_text)
assert isinstance(counters, dict)


@pytest.mark.asyncio
async def test_http_query_with_params_and_extra_parameters(http_server):
"""Test that tools work with both valid params and extra parameters."""
session_id = str(uuid.uuid4())
async with aiohttp.ClientSession() as session:
# First create a node
await session.post(
"http://127.0.0.1:8001/mcp/",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "write_neo4j_cypher",
"arguments": {
"query": "CREATE (n:ParamTest {name: $name, value: $value})",
"params": {"name": "test", "value": 42},
},
},
},
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
)

# Now query with params and extra parameters
async with session.post(
"http://127.0.0.1:8001/mcp/",
json={
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "read_neo4j_cypher",
"arguments": {
"query": "MATCH (n:ParamTest {name: $name}) RETURN n.value as value",
"params": {"name": "test"},
# Extra parameters from n8n
"sessionId": "param-session",
"action": "queryData",
"toolCallId": "param-call-789",
},
},
},
headers={
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": session_id,
},
) as response:
assert response.status == 200
result = await parse_sse_response(response)

# Should succeed and return the correct value
assert "result" in result
content_text = result["result"]["content"][0]["text"]
data = json.loads(content_text)
assert isinstance(data, list)
assert len(data) == 1
assert data[0]["value"] == 42
Loading