diff --git a/src/postgres_mcp/database_health/sequence_health_calc.py b/src/postgres_mcp/database_health/sequence_health_calc.py index 81582fa6..2fb4ebf4 100644 --- a/src/postgres_mcp/database_health/sequence_health_calc.py +++ b/src/postgres_mcp/database_health/sequence_health_calc.py @@ -1,3 +1,4 @@ +import re from dataclasses import dataclass from psycopg.sql import Identifier @@ -135,14 +136,26 @@ async def _get_sequence_metrics(self) -> list[SequenceMetrics]: return sequence_metrics def _parse_sequence_name(self, default_value: str) -> tuple[str, str]: - """Parse schema and sequence name from default value expression.""" - # Handle both formats: - # nextval('id_seq'::regclass) - # nextval(('id_seq'::text)::regclass) - - # Remove nextval and cast parts - clean_value = default_value.replace("nextval('", "").replace("'::regclass)", "") - clean_value = clean_value.replace("('", "").replace("'::text)", "") + """Parse schema and sequence name from default value expression. + + Handles formats like: + - nextval('id_seq'::regclass) + - nextval(('id_seq'::text)::regclass) + - nextval('"UpperCaseSeq"'::regclass) + - nextval('"Schema"."Seq"'::regclass) + + Note: Sequence names containing literal dots (e.g., "my.seq") are not + supported and will be incorrectly parsed as schema.name. + """ + # Extract the sequence reference from inside the single quotes + # Handles both nextval('...') and nextval(('...'::text)::regclass) + match = re.search(r"nextval\(\(?'([^']+)'", default_value) + if not match: + return "public", "" + + clean_value = match.group(1) + # Remove quotes so sql.Identifier can add them correctly + clean_value = clean_value.replace('"', "") # Split into schema and sequence parts = clean_value.split(".") diff --git a/tests/unit/database_health/test_database_health_tool.py b/tests/unit/database_health/test_database_health_tool.py index 0ac79bf3..54b1ee1b 100644 --- a/tests/unit/database_health/test_database_health_tool.py +++ b/tests/unit/database_health/test_database_health_tool.py @@ -22,6 +22,7 @@ async def setup_test_tables(sql_driver): async with conn_pool.connection() as conn: # Drop existing tables if they exist await conn.execute("DROP TABLE IF EXISTS test_orders") + await conn.execute('DROP TABLE IF EXISTS "UpperCaseOrders"') await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") @@ -40,6 +41,7 @@ async def setup_test_tables(sql_driver): """ ) + # Lowercase table name (original test case) await conn.execute( """ CREATE TABLE test_orders ( @@ -52,6 +54,18 @@ async def setup_test_tables(sql_driver): """ ) + # Uppercase table name to test quoted identifier handling (PR #94 fix) + await conn.execute( + """ + CREATE TABLE "UpperCaseOrders" ( + id SERIAL PRIMARY KEY, + customer_id INTEGER REFERENCES test_customers(id), + amount DECIMAL NOT NULL, + created_at TIMESTAMP DEFAULT NOW() + ) + """ + ) + # Create some indexes to test index health await conn.execute( """ @@ -101,9 +115,21 @@ async def setup_test_tables(sql_driver): """ ) + # Insert data into uppercase table to test sequence handling + await conn.execute( + """ + INSERT INTO "UpperCaseOrders" (customer_id, amount) + SELECT + (random() * 99)::int + 1, + (random() * 500)::decimal + FROM generate_series(1, 100) i + """ + ) + # Run ANALYZE to update statistics await conn.execute("ANALYZE test_customers") await conn.execute("ANALYZE test_orders") + await conn.execute('ANALYZE "UpperCaseOrders"') async def cleanup_test_tables(sql_driver): @@ -112,6 +138,7 @@ async def cleanup_test_tables(sql_driver): try: async with conn_pool.connection() as conn: await conn.execute("DROP TABLE IF EXISTS test_orders") + await conn.execute('DROP TABLE IF EXISTS "UpperCaseOrders"') await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") finally: diff --git a/tests/unit/database_health/test_sequence_health_calc.py b/tests/unit/database_health/test_sequence_health_calc.py new file mode 100644 index 00000000..a108a8c0 --- /dev/null +++ b/tests/unit/database_health/test_sequence_health_calc.py @@ -0,0 +1,75 @@ +"""Unit tests for SequenceHealthCalc, specifically _parse_sequence_name parsing.""" + +import pytest + +from postgres_mcp.database_health.sequence_health_calc import SequenceHealthCalc + + +class TestParseSequenceName: + """Tests for _parse_sequence_name method.""" + + @pytest.fixture + def calc(self): + """Create a SequenceHealthCalc instance with a mock driver.""" + # We only need the instance for calling _parse_sequence_name + # which doesn't use the sql_driver + return SequenceHealthCalc(sql_driver=None) # type: ignore[arg-type] + + def test_simple_sequence(self, calc): + """Parse simple unquoted sequence name.""" + schema, name = calc._parse_sequence_name("nextval('my_seq'::regclass)") + assert schema == "public" + assert name == "my_seq" + + def test_sequence_with_schema(self, calc): + """Parse sequence with explicit schema.""" + schema, name = calc._parse_sequence_name("nextval('myschema.my_seq'::regclass)") + assert schema == "myschema" + assert name == "my_seq" + + def test_uppercase_sequence(self, calc): + """Parse uppercase sequence name (quoted in PostgreSQL).""" + schema, name = calc._parse_sequence_name("nextval('\"UpperCaseSeq\"'::regclass)") + assert schema == "public" + assert name == "UpperCaseSeq" + + def test_uppercase_with_schema(self, calc): + """Parse uppercase sequence with schema.""" + schema, name = calc._parse_sequence_name('nextval(\'"MySchema"."MySeq"\'::regclass)') + assert schema == "MySchema" + assert name == "MySeq" + + def test_text_cast_format(self, calc): + """Parse sequence with text cast format.""" + schema, name = calc._parse_sequence_name("nextval(('my_seq'::text)::regclass)") + assert schema == "public" + assert name == "my_seq" + + def test_serial_sequence_naming(self, calc): + """Parse auto-generated SERIAL sequence name.""" + schema, name = calc._parse_sequence_name("nextval('users_id_seq'::regclass)") + assert schema == "public" + assert name == "users_id_seq" + + def test_uppercase_table_serial(self, calc): + """Parse sequence from uppercase table with SERIAL column.""" + # PostgreSQL generates: "TableName_column_seq" + schema, name = calc._parse_sequence_name("nextval('\"UpperCaseOrders_id_seq\"'::regclass)") + assert schema == "public" + assert name == "UpperCaseOrders_id_seq" + + # Known limitation: sequence names containing dots + # This test documents the current behavior (which is incorrect for this edge case) + @pytest.mark.xfail(reason="Known limitation: dots in sequence names not supported") + def test_sequence_name_with_dot(self, calc): + """Sequence names containing dots are not correctly parsed. + + This is a known limitation. In PostgreSQL, you can have a sequence + named "my.seq" (with a literal dot), which would be stored as: + nextval('"my.seq"'::regclass) + + The current parser incorrectly splits this as schema="my", name="seq". + """ + schema, name = calc._parse_sequence_name("nextval('\"my.seq\"'::regclass)") + assert schema == "public" + assert name == "my.seq"