Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/postgres_mcp/database_health/sequence_health_calc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from dataclasses import dataclass

from psycopg.sql import Identifier
Expand Down Expand Up @@ -135,14 +136,26 @@ async def _get_sequence_metrics(self) -> list[SequenceMetrics]:
return sequence_metrics

def _parse_sequence_name(self, default_value: str) -> tuple[str, str]:
"""Parse schema and sequence name from default value expression."""
# Handle both formats:
# nextval('id_seq'::regclass)
# nextval(('id_seq'::text)::regclass)

# Remove nextval and cast parts
clean_value = default_value.replace("nextval('", "").replace("'::regclass)", "")
clean_value = clean_value.replace("('", "").replace("'::text)", "")
"""Parse schema and sequence name from default value expression.

Handles formats like:
- nextval('id_seq'::regclass)
- nextval(('id_seq'::text)::regclass)
- nextval('"UpperCaseSeq"'::regclass)
- nextval('"Schema"."Seq"'::regclass)

Note: Sequence names containing literal dots (e.g., "my.seq") are not
supported and will be incorrectly parsed as schema.name.
"""
# Extract the sequence reference from inside the single quotes
# Handles both nextval('...') and nextval(('...'::text)::regclass)
match = re.search(r"nextval\(\(?'([^']+)'", default_value)
if not match:
return "public", ""

clean_value = match.group(1)
# Remove quotes so sql.Identifier can add them correctly
clean_value = clean_value.replace('"', "")

# Split into schema and sequence
parts = clean_value.split(".")
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/database_health/test_database_health_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async def setup_test_tables(sql_driver):
async with conn_pool.connection() as conn:
# Drop existing tables if they exist
await conn.execute("DROP TABLE IF EXISTS test_orders")
await conn.execute('DROP TABLE IF EXISTS "UpperCaseOrders"')
await conn.execute("DROP TABLE IF EXISTS test_customers")
await conn.execute("DROP SEQUENCE IF EXISTS test_seq")

Expand All @@ -40,6 +41,7 @@ async def setup_test_tables(sql_driver):
"""
)

# Lowercase table name (original test case)
await conn.execute(
"""
CREATE TABLE test_orders (
Expand All @@ -52,6 +54,18 @@ async def setup_test_tables(sql_driver):
"""
)

# Uppercase table name to test quoted identifier handling (PR #94 fix)
await conn.execute(
"""
CREATE TABLE "UpperCaseOrders" (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES test_customers(id),
amount DECIMAL NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
)
"""
)

# Create some indexes to test index health
await conn.execute(
"""
Expand Down Expand Up @@ -101,9 +115,21 @@ async def setup_test_tables(sql_driver):
"""
)

# Insert data into uppercase table to test sequence handling
await conn.execute(
"""
INSERT INTO "UpperCaseOrders" (customer_id, amount)
SELECT
(random() * 99)::int + 1,
(random() * 500)::decimal
FROM generate_series(1, 100) i
"""
)

# Run ANALYZE to update statistics
await conn.execute("ANALYZE test_customers")
await conn.execute("ANALYZE test_orders")
await conn.execute('ANALYZE "UpperCaseOrders"')


async def cleanup_test_tables(sql_driver):
Expand All @@ -112,6 +138,7 @@ async def cleanup_test_tables(sql_driver):
try:
async with conn_pool.connection() as conn:
await conn.execute("DROP TABLE IF EXISTS test_orders")
await conn.execute('DROP TABLE IF EXISTS "UpperCaseOrders"')
await conn.execute("DROP TABLE IF EXISTS test_customers")
await conn.execute("DROP SEQUENCE IF EXISTS test_seq")
finally:
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/database_health/test_sequence_health_calc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Unit tests for SequenceHealthCalc, specifically _parse_sequence_name parsing."""

import pytest

from postgres_mcp.database_health.sequence_health_calc import SequenceHealthCalc


class TestParseSequenceName:
"""Tests for _parse_sequence_name method."""

@pytest.fixture
def calc(self):
"""Create a SequenceHealthCalc instance with a mock driver."""
# We only need the instance for calling _parse_sequence_name
# which doesn't use the sql_driver
return SequenceHealthCalc(sql_driver=None) # type: ignore[arg-type]

def test_simple_sequence(self, calc):
"""Parse simple unquoted sequence name."""
schema, name = calc._parse_sequence_name("nextval('my_seq'::regclass)")
assert schema == "public"
assert name == "my_seq"

def test_sequence_with_schema(self, calc):
"""Parse sequence with explicit schema."""
schema, name = calc._parse_sequence_name("nextval('myschema.my_seq'::regclass)")
assert schema == "myschema"
assert name == "my_seq"

def test_uppercase_sequence(self, calc):
"""Parse uppercase sequence name (quoted in PostgreSQL)."""
schema, name = calc._parse_sequence_name("nextval('\"UpperCaseSeq\"'::regclass)")
assert schema == "public"
assert name == "UpperCaseSeq"

def test_uppercase_with_schema(self, calc):
"""Parse uppercase sequence with schema."""
schema, name = calc._parse_sequence_name('nextval(\'"MySchema"."MySeq"\'::regclass)')
assert schema == "MySchema"
assert name == "MySeq"

def test_text_cast_format(self, calc):
"""Parse sequence with text cast format."""
schema, name = calc._parse_sequence_name("nextval(('my_seq'::text)::regclass)")
assert schema == "public"
assert name == "my_seq"

def test_serial_sequence_naming(self, calc):
"""Parse auto-generated SERIAL sequence name."""
schema, name = calc._parse_sequence_name("nextval('users_id_seq'::regclass)")
assert schema == "public"
assert name == "users_id_seq"

def test_uppercase_table_serial(self, calc):
"""Parse sequence from uppercase table with SERIAL column."""
# PostgreSQL generates: "TableName_column_seq"
schema, name = calc._parse_sequence_name("nextval('\"UpperCaseOrders_id_seq\"'::regclass)")
assert schema == "public"
assert name == "UpperCaseOrders_id_seq"

# Known limitation: sequence names containing dots
# This test documents the current behavior (which is incorrect for this edge case)
@pytest.mark.xfail(reason="Known limitation: dots in sequence names not supported")
def test_sequence_name_with_dot(self, calc):
"""Sequence names containing dots are not correctly parsed.

This is a known limitation. In PostgreSQL, you can have a sequence
named "my.seq" (with a literal dot), which would be stored as:
nextval('"my.seq"'::regclass)

The current parser incorrectly splits this as schema="my", name="seq".
"""
schema, name = calc._parse_sequence_name("nextval('\"my.seq\"'::regclass)")
assert schema == "public"
assert name == "my.seq"