From 4bc2fff675a894061629e0a6935787a3db3b08a3 Mon Sep 17 00:00:00 2001 From: Edwin <9cb14c1ec0@sendmeemail.xyz> Date: Thu, 19 Feb 2026 09:46:49 -0500 Subject: [PATCH 1/2] per-team + monthly log partitioning Two-level PostgreSQL declarative partitioning for the logs table: - Level 1: LIST by team_id (isolates each team's data) - Level 2: RANGE by created_at monthly (instant retention via partition drops) Co-Authored-By: Claude Opus 4.6 --- backend/app/api/admin.py | 4 + backend/app/main.py | 9 + backend/app/services/partitions.py | 204 ++++++++++++++++++ backend/app/services/retention.py | 20 +- .../migrations/002_partition_logs_by_team.sql | 120 +++++++++++ 5 files changed, 340 insertions(+), 17 deletions(-) create mode 100644 backend/app/services/partitions.py create mode 100644 backend/migrations/002_partition_logs_by_team.sql diff --git a/backend/app/api/admin.py b/backend/app/api/admin.py index c4270bb..8336ce9 100644 --- a/backend/app/api/admin.py +++ b/backend/app/api/admin.py @@ -7,6 +7,7 @@ MembershipCreate, MembershipResponse, ) from app.api.deps import AdminUser +from app.services.partitions import create_team_partition, drop_team_partition router = APIRouter() @@ -116,6 +117,8 @@ async def create_team(admin: AdminUser, data: TeamCreate): retention_days=data.retention_days, ) + await create_team_partition(team.id) + return TeamWithKey( id=team.id, name=team.name, @@ -163,6 +166,7 @@ async def delete_team(admin: AdminUser, team_id: UUID): if team is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found") + await drop_team_partition(team.id) await team.delete() return {"message": "Team deleted"} diff --git a/backend/app/main.py b/backend/app/main.py index 2d9d205..1fb91ea 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,6 +9,7 @@ from app.api import api_router from app.models import User from app.services.retention import RetentionService +from app.services.partitions import ensure_upcoming_partitions from app.migrations import run_migrations settings = get_settings() @@ -35,6 +36,7 @@ async def lifespan(app: FastAPI): await init_db() await run_migrations() await create_admin_user() + await ensure_upcoming_partitions() # Start retention cleanup scheduler (runs every hour) scheduler.add_job( @@ -43,6 +45,13 @@ async def lifespan(app: FastAPI): id="retention_cleanup", replace_existing=True, ) + # Ensure next month's partitions exist (runs daily) + scheduler.add_job( + ensure_upcoming_partitions, + trigger=IntervalTrigger(hours=24), + id="partition_maintenance", + replace_existing=True, + ) scheduler.start() yield diff --git a/backend/app/services/partitions.py b/backend/app/services/partitions.py new file mode 100644 index 0000000..803fc8b --- /dev/null +++ b/backend/app/services/partitions.py @@ -0,0 +1,204 @@ +from datetime import datetime, timedelta, timezone +from uuid import UUID + +from tortoise import Tortoise +from tortoise.backends.base.client import BaseDBAsyncClient + +from app.models import Team + + +def _team_hex(team_id: UUID) -> str: + """Return the hex representation of a team UUID (no dashes) for use in partition names.""" + return team_id.hex + + +def _partition_name(team_id: UUID) -> str: + """Top-level team partition name: logs_.""" + return f"logs_{_team_hex(team_id)}" + + +def _monthly_partition_name(team_id: UUID, year: int, month: int) -> str: + """Monthly sub-partition name: logs__YYYY_MM.""" + return f"logs_{_team_hex(team_id)}_{year:04d}_{month:02d}" + + +def _default_subpartition_name(team_id: UUID) -> str: + """Default sub-partition for a team: logs__default.""" + return f"logs_{_team_hex(team_id)}_default" + + +async def _get_conn() -> BaseDBAsyncClient: + return Tortoise.get_connection("default") + + +async def _table_exists(conn: BaseDBAsyncClient, table_name: str) -> bool: + rows = await conn.execute_query_dict( + "SELECT 1 FROM pg_class WHERE relname = $1 AND relkind IN ('r', 'p')", + [table_name], + ) + return len(rows) > 0 + + +async def create_team_partition(team_id: UUID) -> None: + """Create team-level LIST partition + default sub-partition + current & next month.""" + conn = await _get_conn() + part_name = _partition_name(team_id) + + if await _table_exists(conn, part_name): + return + + team_id_str = str(team_id) + + # Create team partition (sub-partitioned by range on created_at) + await conn.execute_script( + f"CREATE TABLE {part_name} PARTITION OF logs " + f"FOR VALUES IN ('{team_id_str}') " + f"PARTITION BY RANGE (created_at);" + ) + + # Create default sub-partition (safety net) + default_name = _default_subpartition_name(team_id) + await conn.execute_script( + f"CREATE TABLE {default_name} PARTITION OF {part_name} DEFAULT;" + ) + + # Create current and next month sub-partitions + now = datetime.now(timezone.utc) + await create_monthly_partition(team_id, now.year, now.month) + + # Next month + next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1) + await create_monthly_partition(team_id, next_month.year, next_month.month) + + +async def create_monthly_partition(team_id: UUID, year: int, month: int) -> None: + """Create a single monthly RANGE sub-partition for a team.""" + conn = await _get_conn() + part_name = _monthly_partition_name(team_id, year, month) + + if await _table_exists(conn, part_name): + return + + team_part = _partition_name(team_id) + + # Calculate range boundaries: [start_of_month, start_of_next_month) + start = datetime(year, month, 1, tzinfo=timezone.utc) + if month == 12: + end = datetime(year + 1, 1, 1, tzinfo=timezone.utc) + else: + end = datetime(year, month + 1, 1, tzinfo=timezone.utc) + + start_str = start.strftime("%Y-%m-%d") + end_str = end.strftime("%Y-%m-%d") + + await conn.execute_script( + f"CREATE TABLE {part_name} PARTITION OF {team_part} " + f"FOR VALUES FROM ('{start_str}') TO ('{end_str}');" + ) + + +async def drop_team_partition(team_id: UUID) -> None: + """Drop team partition (cascades to all monthly sub-partitions). Instant.""" + conn = await _get_conn() + part_name = _partition_name(team_id) + + if not await _table_exists(conn, part_name): + return + + await conn.execute_script(f"DROP TABLE {part_name};") + + +async def drop_monthly_partition(team_id: UUID, year: int, month: int) -> None: + """Drop a specific monthly sub-partition. Used for retention.""" + conn = await _get_conn() + part_name = _monthly_partition_name(team_id, year, month) + + if not await _table_exists(conn, part_name): + return + + await conn.execute_script(f"DROP TABLE {part_name};") + + +async def ensure_upcoming_partitions() -> None: + """For all teams, ensure current + next month sub-partitions exist. + Called by scheduler and at startup.""" + now = datetime.now(timezone.utc) + next_month = (now.replace(day=1) + timedelta(days=32)).replace(day=1) + + teams = await Team.all() + conn = await _get_conn() + + for team in teams: + team_part = _partition_name(team.id) + + # Only create monthly partitions if the team partition exists + if not await _table_exists(conn, team_part): + # Team partition missing — create the full structure + await create_team_partition(team.id) + else: + # Just ensure current + next month exist + await create_monthly_partition(team.id, now.year, now.month) + await create_monthly_partition(team.id, next_month.year, next_month.month) + + +async def cleanup_expired_partitions() -> None: + """For teams with retention_days, drop monthly partitions older than cutoff. + For partial-month boundaries, use DELETE on the boundary month.""" + conn = await _get_conn() + teams = await Team.filter(retention_days__isnull=False) + + for team in teams: + if team.retention_days is None: + continue + + cutoff = datetime.now(timezone.utc) - timedelta(days=team.retention_days) + team_part = _partition_name(team.id) + + if not await _table_exists(conn, team_part): + continue + + # Find all monthly sub-partitions for this team + rows = await conn.execute_query_dict( + "SELECT c.relname FROM pg_class c " + "JOIN pg_inherits i ON c.oid = i.inhrelid " + "JOIN pg_class p ON p.oid = i.inhparent " + "WHERE p.relname = $1 AND c.relname != $2 " + "ORDER BY c.relname", + [team_part, _default_subpartition_name(team.id)], + ) + + for row in rows: + relname = row["relname"] + + # Parse partition name: logs__YYYY_MM + parts = relname.rsplit("_", 2) + if len(parts) < 3: + continue + try: + p_year = int(parts[-2]) + p_month = int(parts[-1]) + except ValueError: + continue + + # End of this partition's month + if p_month == 12: + partition_end = datetime(p_year + 1, 1, 1, tzinfo=timezone.utc) + else: + partition_end = datetime(p_year, p_month + 1, 1, tzinfo=timezone.utc) + + partition_start = datetime(p_year, p_month, 1, tzinfo=timezone.utc) + + if partition_end <= cutoff: + # Entire partition is expired — drop it (instant) + await conn.execute_script(f"DROP TABLE {relname};") + print(f"Dropped expired partition {relname}") + elif partition_start < cutoff < partition_end: + # Partial month — delete rows before cutoff + cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S+00") + result = await conn.execute_query( + f"DELETE FROM {relname} WHERE created_at < $1", + [cutoff_str], + ) + deleted = result[0] + if deleted: + print(f"Deleted {deleted} rows from boundary partition {relname}") diff --git a/backend/app/services/retention.py b/backend/app/services/retention.py index e1d67b3..4534972 100644 --- a/backend/app/services/retention.py +++ b/backend/app/services/retention.py @@ -1,22 +1,8 @@ -from datetime import datetime, timedelta, timezone -from app.models import Team, Log +from app.services.partitions import cleanup_expired_partitions class RetentionService: @staticmethod async def cleanup_expired_logs(): - """Delete logs older than their team's retention period.""" - teams = await Team.filter(retention_days__isnull=False) - - for team in teams: - if team.retention_days is None: - continue - - cutoff = datetime.now(timezone.utc) - timedelta(days=team.retention_days) - deleted = await Log.filter( - team=team, - created_at__lt=cutoff - ).delete() - - if deleted: - print(f"Deleted {deleted} expired logs for team {team.name}") + """Drop expired monthly partitions and delete partial-month rows.""" + await cleanup_expired_partitions() diff --git a/backend/migrations/002_partition_logs_by_team.sql b/backend/migrations/002_partition_logs_by_team.sql new file mode 100644 index 0000000..043116f --- /dev/null +++ b/backend/migrations/002_partition_logs_by_team.sql @@ -0,0 +1,120 @@ +-- Migration: Convert logs table to two-level partitioned structure +-- Level 1: LIST by team_id (isolates each team's data) +-- Level 2: RANGE by created_at monthly (enables instant retention via partition drops) +-- +-- Idempotency: Checks if logs is already partitioned before running. + +DO $$ +DECLARE + rec RECORD; + month_rec RECORD; + team_hex TEXT; + team_part TEXT; + default_sub TEXT; + monthly_part TEXT; + start_date DATE; + end_date DATE; + max_id BIGINT; +BEGIN + -- Check if logs is already partitioned — if so, skip entire migration + IF EXISTS ( + SELECT 1 FROM pg_partitioned_table pt + JOIN pg_class c ON c.oid = pt.partrelid + WHERE c.relname = 'logs' + ) THEN + RAISE NOTICE 'logs table is already partitioned, skipping migration'; + RETURN; + END IF; + + -- 1. Rename existing table + ALTER TABLE logs RENAME TO logs_old; + + -- Rename the sequence if it exists so we can reuse the name + IF EXISTS (SELECT 1 FROM pg_sequences WHERE sequencename = 'logs_id_seq') THEN + ALTER SEQUENCE logs_id_seq RENAME TO logs_old_id_seq; + END IF; + + -- 2. Create partitioned parent table + -- PK includes team_id and created_at since they are partition keys. + -- id uses a shared sequence for global uniqueness. + CREATE SEQUENCE logs_id_seq; + + CREATE TABLE logs ( + id BIGINT NOT NULL DEFAULT nextval('logs_id_seq'), + team_id UUID NOT NULL, + "timestamp" TIMESTAMPTZ NOT NULL, + level VARCHAR(5) NOT NULL, + message TEXT NOT NULL, + metadata JSONB, + source VARCHAR(255), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (id, team_id, created_at) + ) PARTITION BY LIST (team_id); + + ALTER SEQUENCE logs_id_seq OWNED BY logs.id; + + -- Create indexes on the parent (inherited by all partitions) + CREATE INDEX idx_logs_team_id_id ON logs (team_id, id DESC); + CREATE INDEX idx_logs_timestamp ON logs ("timestamp"); + CREATE INDEX idx_logs_level ON logs (level); + CREATE INDEX idx_logs_source ON logs (source); + CREATE INDEX idx_logs_created_at ON logs (created_at); + + -- 3. Create top-level default partition (safety net, itself sub-partitioned) + CREATE TABLE logs_default PARTITION OF logs DEFAULT + PARTITION BY RANGE (created_at); + CREATE TABLE logs_default_default PARTITION OF logs_default DEFAULT; + + -- 4. For each existing team, create partitions + FOR rec IN SELECT id FROM teams LOOP + team_hex := REPLACE(rec.id::TEXT, '-', ''); + team_part := 'logs_' || team_hex; + default_sub := team_part || '_default'; + + -- Create team partition (sub-partitioned by range on created_at) + EXECUTE format( + 'CREATE TABLE %I PARTITION OF logs FOR VALUES IN (%L) PARTITION BY RANGE (created_at)', + team_part, rec.id::TEXT + ); + + -- Create default sub-partition (safety net) + EXECUTE format( + 'CREATE TABLE %I PARTITION OF %I DEFAULT', + default_sub, team_part + ); + + -- Create monthly sub-partitions for each month that has data + FOR month_rec IN + SELECT DISTINCT date_trunc('month', created_at)::DATE AS month_start + FROM logs_old + WHERE team_id = rec.id + ORDER BY month_start + LOOP + start_date := month_rec.month_start; + end_date := (month_rec.month_start + INTERVAL '1 month')::DATE; + monthly_part := team_part || '_' || to_char(start_date, 'YYYY_MM'); + + EXECUTE format( + 'CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)', + monthly_part, team_part, start_date, end_date + ); + END LOOP; + END LOOP; + + -- 5. Copy all data from old table + INSERT INTO logs (id, team_id, "timestamp", level, message, metadata, source, created_at) + SELECT id, team_id, "timestamp", level, message, metadata, source, created_at + FROM logs_old; + + -- 6. Reset the sequence to continue after the max ID + SELECT COALESCE(MAX(id), 0) INTO max_id FROM logs; + PERFORM setval('logs_id_seq', max_id + 1, false); + + -- 7. Drop the old table and its sequence + DROP TABLE logs_old; + IF EXISTS (SELECT 1 FROM pg_sequences WHERE sequencename = 'logs_old_id_seq') THEN + DROP SEQUENCE logs_old_id_seq; + END IF; + + RAISE NOTICE 'Successfully partitioned logs table by team_id and created_at'; +END $$; From 426655fd07571ccbfb114676ca3eb8d35548a3fe Mon Sep 17 00:00:00 2001 From: Edwin <9cb14c1ec0@sendmeemail.xyz> Date: Thu, 19 Feb 2026 09:55:38 -0500 Subject: [PATCH 2/2] add identifier validation guards for partition DDL Validate UUID hex and partition names against strict regexes before string-interpolating into SQL, preventing injection from unexpected inputs. Co-Authored-By: Claude Opus 4.6 --- backend/app/services/partitions.py | 38 +++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/backend/app/services/partitions.py b/backend/app/services/partitions.py index 803fc8b..2e22963 100644 --- a/backend/app/services/partitions.py +++ b/backend/app/services/partitions.py @@ -1,3 +1,4 @@ +import re from datetime import datetime, timedelta, timezone from uuid import UUID @@ -6,25 +7,39 @@ from app.models import Team +_HEX_RE = re.compile(r"[a-f0-9]{32}\Z") +_IDENT_RE = re.compile(r"^logs_[a-f0-9]{32}(?:_(?:\d{4}_\d{2}|default))?$") +ALLOWED_PARTITION_RE = re.compile(r"^logs_[0-9a-f]+_\d{4}_\d{2}$") + def _team_hex(team_id: UUID) -> str: - """Return the hex representation of a team UUID (no dashes) for use in partition names.""" - return team_id.hex + """Return the validated hex representation of a team UUID (no dashes).""" + hex_str = team_id.hex + if not _HEX_RE.fullmatch(hex_str): + raise ValueError(f"Invalid UUID hex: {hex_str!r}") + return hex_str + + +def _safe_ident(name: str) -> str: + """Validate that a partition identifier matches the expected naming pattern.""" + if not _IDENT_RE.match(name): + raise ValueError(f"Invalid partition identifier: {name!r}") + return name def _partition_name(team_id: UUID) -> str: """Top-level team partition name: logs_.""" - return f"logs_{_team_hex(team_id)}" + return _safe_ident(f"logs_{_team_hex(team_id)}") def _monthly_partition_name(team_id: UUID, year: int, month: int) -> str: """Monthly sub-partition name: logs__YYYY_MM.""" - return f"logs_{_team_hex(team_id)}_{year:04d}_{month:02d}" + return _safe_ident(f"logs_{_team_hex(team_id)}_{year:04d}_{month:02d}") def _default_subpartition_name(team_id: UUID) -> str: """Default sub-partition for a team: logs__default.""" - return f"logs_{_team_hex(team_id)}_default" + return _safe_ident(f"logs_{_team_hex(team_id)}_default") async def _get_conn() -> BaseDBAsyncClient: @@ -47,11 +62,14 @@ async def create_team_partition(team_id: UUID) -> None: if await _table_exists(conn, part_name): return + # Validate hex before using in DDL; _partition_name already called _team_hex + # but we also need the canonical UUID string for the LIST value. + _team_hex(team_id) # raises on invalid hex team_id_str = str(team_id) # Create team partition (sub-partitioned by range on created_at) await conn.execute_script( - f"CREATE TABLE {part_name} PARTITION OF logs " + f"CREATE TABLE {part_name} PARTITION OF logs " # noqa: S608 f"FOR VALUES IN ('{team_id_str}') " f"PARTITION BY RANGE (created_at);" ) @@ -170,6 +188,10 @@ async def cleanup_expired_partitions() -> None: for row in rows: relname = row["relname"] + # Defensive guard: only operate on names matching expected pattern + if not ALLOWED_PARTITION_RE.match(relname): + continue + # Parse partition name: logs__YYYY_MM parts = relname.rsplit("_", 2) if len(parts) < 3: @@ -190,13 +212,13 @@ async def cleanup_expired_partitions() -> None: if partition_end <= cutoff: # Entire partition is expired — drop it (instant) - await conn.execute_script(f"DROP TABLE {relname};") + await conn.execute_script(f"DROP TABLE {relname};") # noqa: S608 print(f"Dropped expired partition {relname}") elif partition_start < cutoff < partition_end: # Partial month — delete rows before cutoff cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S+00") result = await conn.execute_query( - f"DELETE FROM {relname} WHERE created_at < $1", + f"DELETE FROM {relname} WHERE created_at < $1", # noqa: S608 [cutoff_str], ) deleted = result[0]