From c07b0e3d85b1fb4d0ec150f0b65036dafa76169f Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Tue, 15 Jul 2025 11:32:45 -0700 Subject: [PATCH 1/5] Fix escaping sequence names in health check --- .../database_health/sequence_health_calc.py | 3 +++ .../test_database_health_tool.py | 18 +++++++++--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/postgres_mcp/database_health/sequence_health_calc.py b/src/postgres_mcp/database_health/sequence_health_calc.py index 81582fa6..365b7d1a 100644 --- a/src/postgres_mcp/database_health/sequence_health_calc.py +++ b/src/postgres_mcp/database_health/sequence_health_calc.py @@ -143,6 +143,9 @@ def _parse_sequence_name(self, default_value: str) -> tuple[str, str]: # Remove nextval and cast parts clean_value = default_value.replace("nextval('", "").replace("'::regclass)", "") clean_value = clean_value.replace("('", "").replace("'::text)", "") + # We're going to feed these to sql.Identifier, so we need to remove quotes, or else + # they will be double quoted. + clean_value = clean_value.replace('"', "") # Split into schema and sequence parts = clean_value.split(".") diff --git a/tests/unit/database_health/test_database_health_tool.py b/tests/unit/database_health/test_database_health_tool.py index 0ac79bf3..a5205bdf 100644 --- a/tests/unit/database_health/test_database_health_tool.py +++ b/tests/unit/database_health/test_database_health_tool.py @@ -21,7 +21,7 @@ async def setup_test_tables(sql_driver): conn_pool = await pool_wrapper.pool_connect() async with conn_pool.connection() as conn: # Drop existing tables if they exist - await conn.execute("DROP TABLE IF EXISTS test_orders") + await conn.execute("DROP TABLE IF EXISTS \"TestOrders\"") await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") @@ -42,7 +42,7 @@ async def setup_test_tables(sql_driver): await conn.execute( """ - CREATE TABLE test_orders ( + CREATE TABLE "TestOrders" ( id SERIAL PRIMARY KEY, customer_id INTEGER REFERENCES test_customers(id), total DECIMAL NOT NULL CHECK (total >= 0), @@ -55,23 +55,23 @@ async def setup_test_tables(sql_driver): # Create some indexes to test index health await conn.execute( """ - CREATE INDEX idx_orders_customer ON test_orders(customer_id) + CREATE INDEX idx_orders_customer ON "TestOrders"(customer_id) """ ) await conn.execute( """ - CREATE INDEX idx_orders_status ON test_orders(status) + CREATE INDEX idx_orders_status ON "TestOrders"(status) """ ) await conn.execute( """ - CREATE INDEX idx_orders_created ON test_orders(created_at) + CREATE INDEX idx_orders_created ON "TestOrders"(created_at) """ ) # Create a duplicate index to test duplicate index detection await conn.execute( """ - CREATE INDEX idx_orders_customer_dup ON test_orders(customer_id) + CREATE INDEX idx_orders_customer_dup ON "TestOrders"(customer_id) """ ) @@ -88,7 +88,7 @@ async def setup_test_tables(sql_driver): await conn.execute( """ - INSERT INTO test_orders (customer_id, total, status) + INSERT INTO "TestOrders" (customer_id, total, status) SELECT (random() * 99)::int + 1, -- Changed to ensure IDs are between 1 and 100 (random() * 1000)::decimal, @@ -103,7 +103,7 @@ async def setup_test_tables(sql_driver): # Run ANALYZE to update statistics await conn.execute("ANALYZE test_customers") - await conn.execute("ANALYZE test_orders") + await conn.execute("ANALYZE \"TestOrders\"") async def cleanup_test_tables(sql_driver): @@ -111,7 +111,7 @@ async def cleanup_test_tables(sql_driver): conn_pool = await pool_wrapper.pool_connect() try: async with conn_pool.connection() as conn: - await conn.execute("DROP TABLE IF EXISTS test_orders") + await conn.execute("DROP TABLE IF EXISTS \"TestOrders\"") await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") finally: From c837881f48425cfc8f538d2ed1be69d115a17f4f Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Mon, 19 Jan 2026 18:14:03 +0000 Subject: [PATCH 2/5] Improve test coverage for quoted sequence name fix - Restore original lowercase test_orders table - Add separate UpperCaseOrders table to test quoted identifier handling - Add unit tests for _parse_sequence_name covering various formats - Document known limitation: sequence names with dots not supported Co-Authored-By: Claude Opus 4.5 --- .../database_health/sequence_health_calc.py | 17 +++-- .../test_database_health_tool.py | 45 ++++++++--- .../test_sequence_health_calc.py | 75 +++++++++++++++++++ 3 files changed, 122 insertions(+), 15 deletions(-) create mode 100644 tests/unit/database_health/test_sequence_health_calc.py diff --git a/src/postgres_mcp/database_health/sequence_health_calc.py b/src/postgres_mcp/database_health/sequence_health_calc.py index 365b7d1a..74a38dda 100644 --- a/src/postgres_mcp/database_health/sequence_health_calc.py +++ b/src/postgres_mcp/database_health/sequence_health_calc.py @@ -135,16 +135,21 @@ async def _get_sequence_metrics(self) -> list[SequenceMetrics]: return sequence_metrics def _parse_sequence_name(self, default_value: str) -> tuple[str, str]: - """Parse schema and sequence name from default value expression.""" - # Handle both formats: - # nextval('id_seq'::regclass) - # nextval(('id_seq'::text)::regclass) + """Parse schema and sequence name from default value expression. + Handles formats like: + - nextval('id_seq'::regclass) + - nextval(('id_seq'::text)::regclass) + - nextval('"UpperCaseSeq"'::regclass) + - nextval('"Schema"."Seq"'::regclass) + + Note: Sequence names containing literal dots (e.g., "my.seq") are not + supported and will be incorrectly parsed as schema.name. + """ # Remove nextval and cast parts clean_value = default_value.replace("nextval('", "").replace("'::regclass)", "") clean_value = clean_value.replace("('", "").replace("'::text)", "") - # We're going to feed these to sql.Identifier, so we need to remove quotes, or else - # they will be double quoted. + # Remove quotes so sql.Identifier can add them correctly clean_value = clean_value.replace('"', "") # Split into schema and sequence diff --git a/tests/unit/database_health/test_database_health_tool.py b/tests/unit/database_health/test_database_health_tool.py index a5205bdf..0a8454c7 100644 --- a/tests/unit/database_health/test_database_health_tool.py +++ b/tests/unit/database_health/test_database_health_tool.py @@ -21,7 +21,8 @@ async def setup_test_tables(sql_driver): conn_pool = await pool_wrapper.pool_connect() async with conn_pool.connection() as conn: # Drop existing tables if they exist - await conn.execute("DROP TABLE IF EXISTS \"TestOrders\"") + await conn.execute("DROP TABLE IF EXISTS test_orders") + await conn.execute("DROP TABLE IF EXISTS \"UpperCaseOrders\"") await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") @@ -40,9 +41,10 @@ async def setup_test_tables(sql_driver): """ ) + # Lowercase table name (original test case) await conn.execute( """ - CREATE TABLE "TestOrders" ( + CREATE TABLE test_orders ( id SERIAL PRIMARY KEY, customer_id INTEGER REFERENCES test_customers(id), total DECIMAL NOT NULL CHECK (total >= 0), @@ -52,26 +54,38 @@ async def setup_test_tables(sql_driver): """ ) + # Uppercase table name to test quoted identifier handling (PR #94 fix) + await conn.execute( + """ + CREATE TABLE "UpperCaseOrders" ( + id SERIAL PRIMARY KEY, + customer_id INTEGER REFERENCES test_customers(id), + amount DECIMAL NOT NULL, + created_at TIMESTAMP DEFAULT NOW() + ) + """ + ) + # Create some indexes to test index health await conn.execute( """ - CREATE INDEX idx_orders_customer ON "TestOrders"(customer_id) + CREATE INDEX idx_orders_customer ON test_orders(customer_id) """ ) await conn.execute( """ - CREATE INDEX idx_orders_status ON "TestOrders"(status) + CREATE INDEX idx_orders_status ON test_orders(status) """ ) await conn.execute( """ - CREATE INDEX idx_orders_created ON "TestOrders"(created_at) + CREATE INDEX idx_orders_created ON test_orders(created_at) """ ) # Create a duplicate index to test duplicate index detection await conn.execute( """ - CREATE INDEX idx_orders_customer_dup ON "TestOrders"(customer_id) + CREATE INDEX idx_orders_customer_dup ON test_orders(customer_id) """ ) @@ -88,7 +102,7 @@ async def setup_test_tables(sql_driver): await conn.execute( """ - INSERT INTO "TestOrders" (customer_id, total, status) + INSERT INTO test_orders (customer_id, total, status) SELECT (random() * 99)::int + 1, -- Changed to ensure IDs are between 1 and 100 (random() * 1000)::decimal, @@ -101,9 +115,21 @@ async def setup_test_tables(sql_driver): """ ) + # Insert data into uppercase table to test sequence handling + await conn.execute( + """ + INSERT INTO "UpperCaseOrders" (customer_id, amount) + SELECT + (random() * 99)::int + 1, + (random() * 500)::decimal + FROM generate_series(1, 100) i + """ + ) + # Run ANALYZE to update statistics await conn.execute("ANALYZE test_customers") - await conn.execute("ANALYZE \"TestOrders\"") + await conn.execute("ANALYZE test_orders") + await conn.execute("ANALYZE \"UpperCaseOrders\"") async def cleanup_test_tables(sql_driver): @@ -111,7 +137,8 @@ async def cleanup_test_tables(sql_driver): conn_pool = await pool_wrapper.pool_connect() try: async with conn_pool.connection() as conn: - await conn.execute("DROP TABLE IF EXISTS \"TestOrders\"") + await conn.execute("DROP TABLE IF EXISTS test_orders") + await conn.execute("DROP TABLE IF EXISTS \"UpperCaseOrders\"") await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") finally: diff --git a/tests/unit/database_health/test_sequence_health_calc.py b/tests/unit/database_health/test_sequence_health_calc.py new file mode 100644 index 00000000..4db4e5e3 --- /dev/null +++ b/tests/unit/database_health/test_sequence_health_calc.py @@ -0,0 +1,75 @@ +"""Unit tests for SequenceHealthCalc, specifically _parse_sequence_name parsing.""" + +import pytest + +from postgres_mcp.database_health.sequence_health_calc import SequenceHealthCalc + + +class TestParseSequenceName: + """Tests for _parse_sequence_name method.""" + + @pytest.fixture + def calc(self): + """Create a SequenceHealthCalc instance with a mock driver.""" + # We only need the instance for calling _parse_sequence_name + # which doesn't use the sql_driver + return SequenceHealthCalc(sql_driver=None) + + def test_simple_sequence(self, calc): + """Parse simple unquoted sequence name.""" + schema, name = calc._parse_sequence_name("nextval('my_seq'::regclass)") + assert schema == "public" + assert name == "my_seq" + + def test_sequence_with_schema(self, calc): + """Parse sequence with explicit schema.""" + schema, name = calc._parse_sequence_name("nextval('myschema.my_seq'::regclass)") + assert schema == "myschema" + assert name == "my_seq" + + def test_uppercase_sequence(self, calc): + """Parse uppercase sequence name (quoted in PostgreSQL).""" + schema, name = calc._parse_sequence_name("nextval('\"UpperCaseSeq\"'::regclass)") + assert schema == "public" + assert name == "UpperCaseSeq" + + def test_uppercase_with_schema(self, calc): + """Parse uppercase sequence with schema.""" + schema, name = calc._parse_sequence_name("nextval('\"MySchema\".\"MySeq\"'::regclass)") + assert schema == "MySchema" + assert name == "MySeq" + + def test_text_cast_format(self, calc): + """Parse sequence with text cast format.""" + schema, name = calc._parse_sequence_name("nextval(('my_seq'::text)::regclass)") + assert schema == "public" + assert name == "my_seq" + + def test_serial_sequence_naming(self, calc): + """Parse auto-generated SERIAL sequence name.""" + schema, name = calc._parse_sequence_name("nextval('users_id_seq'::regclass)") + assert schema == "public" + assert name == "users_id_seq" + + def test_uppercase_table_serial(self, calc): + """Parse sequence from uppercase table with SERIAL column.""" + # PostgreSQL generates: "TableName_column_seq" + schema, name = calc._parse_sequence_name("nextval('\"UpperCaseOrders_id_seq\"'::regclass)") + assert schema == "public" + assert name == "UpperCaseOrders_id_seq" + + # Known limitation: sequence names containing dots + # This test documents the current behavior (which is incorrect for this edge case) + @pytest.mark.xfail(reason="Known limitation: dots in sequence names not supported") + def test_sequence_name_with_dot(self, calc): + """Sequence names containing dots are not correctly parsed. + + This is a known limitation. In PostgreSQL, you can have a sequence + named "my.seq" (with a literal dot), which would be stored as: + nextval('"my.seq"'::regclass) + + The current parser incorrectly splits this as schema="my", name="seq". + """ + schema, name = calc._parse_sequence_name("nextval('\"my.seq\"'::regclass)") + assert schema == "public" + assert name == "my.seq" From 994c87ea6da7999a96a9803dee6d5c562df2f67e Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Mon, 19 Jan 2026 18:41:18 +0000 Subject: [PATCH 3/5] Fix nextval parsing for text-cast format Use regex to extract sequence name from nextval() expressions. The previous string replacement approach failed for the nextval(('id_seq'::text)::regclass) format. Thanks to Codex for catching this issue. Co-Authored-By: Claude Opus 4.5 --- .../database_health/sequence_health_calc.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/postgres_mcp/database_health/sequence_health_calc.py b/src/postgres_mcp/database_health/sequence_health_calc.py index 74a38dda..2fb4ebf4 100644 --- a/src/postgres_mcp/database_health/sequence_health_calc.py +++ b/src/postgres_mcp/database_health/sequence_health_calc.py @@ -1,3 +1,4 @@ +import re from dataclasses import dataclass from psycopg.sql import Identifier @@ -146,9 +147,13 @@ def _parse_sequence_name(self, default_value: str) -> tuple[str, str]: Note: Sequence names containing literal dots (e.g., "my.seq") are not supported and will be incorrectly parsed as schema.name. """ - # Remove nextval and cast parts - clean_value = default_value.replace("nextval('", "").replace("'::regclass)", "") - clean_value = clean_value.replace("('", "").replace("'::text)", "") + # Extract the sequence reference from inside the single quotes + # Handles both nextval('...') and nextval(('...'::text)::regclass) + match = re.search(r"nextval\(\(?'([^']+)'", default_value) + if not match: + return "public", "" + + clean_value = match.group(1) # Remove quotes so sql.Identifier can add them correctly clean_value = clean_value.replace('"', "") From 99edbdd45d0be3fb27a4bf13c2032a5cb6cb0196 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Mon, 19 Jan 2026 18:51:00 +0000 Subject: [PATCH 4/5] Format test files with ruff Co-Authored-By: Claude Opus 4.5 --- tests/unit/database_health/test_database_health_tool.py | 6 +++--- tests/unit/database_health/test_sequence_health_calc.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/database_health/test_database_health_tool.py b/tests/unit/database_health/test_database_health_tool.py index 0a8454c7..54b1ee1b 100644 --- a/tests/unit/database_health/test_database_health_tool.py +++ b/tests/unit/database_health/test_database_health_tool.py @@ -22,7 +22,7 @@ async def setup_test_tables(sql_driver): async with conn_pool.connection() as conn: # Drop existing tables if they exist await conn.execute("DROP TABLE IF EXISTS test_orders") - await conn.execute("DROP TABLE IF EXISTS \"UpperCaseOrders\"") + await conn.execute('DROP TABLE IF EXISTS "UpperCaseOrders"') await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") @@ -129,7 +129,7 @@ async def setup_test_tables(sql_driver): # Run ANALYZE to update statistics await conn.execute("ANALYZE test_customers") await conn.execute("ANALYZE test_orders") - await conn.execute("ANALYZE \"UpperCaseOrders\"") + await conn.execute('ANALYZE "UpperCaseOrders"') async def cleanup_test_tables(sql_driver): @@ -138,7 +138,7 @@ async def cleanup_test_tables(sql_driver): try: async with conn_pool.connection() as conn: await conn.execute("DROP TABLE IF EXISTS test_orders") - await conn.execute("DROP TABLE IF EXISTS \"UpperCaseOrders\"") + await conn.execute('DROP TABLE IF EXISTS "UpperCaseOrders"') await conn.execute("DROP TABLE IF EXISTS test_customers") await conn.execute("DROP SEQUENCE IF EXISTS test_seq") finally: diff --git a/tests/unit/database_health/test_sequence_health_calc.py b/tests/unit/database_health/test_sequence_health_calc.py index 4db4e5e3..70cc7961 100644 --- a/tests/unit/database_health/test_sequence_health_calc.py +++ b/tests/unit/database_health/test_sequence_health_calc.py @@ -35,7 +35,7 @@ def test_uppercase_sequence(self, calc): def test_uppercase_with_schema(self, calc): """Parse uppercase sequence with schema.""" - schema, name = calc._parse_sequence_name("nextval('\"MySchema\".\"MySeq\"'::regclass)") + schema, name = calc._parse_sequence_name('nextval(\'"MySchema"."MySeq"\'::regclass)') assert schema == "MySchema" assert name == "MySeq" From 7077b3a18534b61e8975dab263e16752b11dbcf8 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Mon, 19 Jan 2026 18:58:58 +0000 Subject: [PATCH 5/5] Fix pyright error for None sql_driver in test Add type: ignore comment since _parse_sequence_name doesn't use sql_driver. Co-Authored-By: Claude Opus 4.5 --- tests/unit/database_health/test_sequence_health_calc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/database_health/test_sequence_health_calc.py b/tests/unit/database_health/test_sequence_health_calc.py index 70cc7961..a108a8c0 100644 --- a/tests/unit/database_health/test_sequence_health_calc.py +++ b/tests/unit/database_health/test_sequence_health_calc.py @@ -13,7 +13,7 @@ def calc(self): """Create a SequenceHealthCalc instance with a mock driver.""" # We only need the instance for calling _parse_sequence_name # which doesn't use the sql_driver - return SequenceHealthCalc(sql_driver=None) + return SequenceHealthCalc(sql_driver=None) # type: ignore[arg-type] def test_simple_sequence(self, calc): """Parse simple unquoted sequence name."""