diff --git a/CHANGELOG.md b/CHANGELOG.md
index 41a8e04..dbc0b6d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,39 @@
 ## Unreleased
 
+**Phase 2a: Fix Commands**
+
+Complete the diagnose→fix→verify loop with safe remediation commands.
+
+### New Commands
+
+- **`pgcrate vacuum`**: Table bloat and vacuum health diagnostic
+- **`pgcrate fix sequence`**: Upgrade sequence types (smallint→integer→bigint) to prevent exhaustion
+- **`pgcrate fix index --drop`**: Safely drop unused/duplicate indexes with `DROP INDEX CONCURRENTLY`
+- **`pgcrate fix vacuum`**: Run VACUUM on tables (regular, freeze, full, analyze)
+
+### Fix Command Features
+
+- **Gate system**: `--read-write`, `--primary`, and `--yes` flags are required based on operation risk
+- **Dry-run mode**: All fix commands support `--dry-run` to preview SQL
+- **SQL preview**: Fix actions include the exact SQL that will be executed
+- **Evidence collection**: Detailed context about the issue being fixed
+- **Safety checks**: Block dangerous operations (e.g., cannot drop a primary key index)
+- **Verification**: `--verify` flag runs post-fix validation
+
+### Triage Enhancements
+
+- **`--include-fixes` flag**: Returns structured fix actions for detected issues
+- **StructuredAction format**: Each action includes command, args, risk level, gates, evidence, and verify steps
+
+### Index Evidence Improvements
+
+- Added `stats_since` and `stats_age_days` for confidence in usage statistics
+- Added `backing_constraint` field for indexes backing constraints
+- Added `is_replica_identity` field for logical replication safety
+
+---
+
 **Phase 1: JSON Contracts Foundation**
 
 ### Breaking Changes
diff --git a/Cargo.lock b/Cargo.lock
index 15712e5..41f14fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -856,6 +856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ef4605b7c057056dd35baeb6ac0c0338e4975b1f2bef0f65da953285eb007095"
 dependencies = [
  "bytes",
+ "chrono",
  "fallible-iterator",
  "postgres-protocol",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index b0987cf..67367e3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,7 +15,7 @@ exclude = ["target/", "website/"]
 bytes = "1"
 clap = { version = "4", features = ["derive"] }
 dialoguer = "0.11"
-tokio-postgres = "0.7"
+tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "time", "signal"] }
 colored = "2"
 chrono = { version = "0.4", features = ["serde"] }
diff --git a/README.md b/README.md
index 4499702..11d40ff 100644
--- a/README.md
+++ b/README.md
@@ -135,13 +135,14 @@ Agent-friendly health checks with JSON output for automation:
 
 ```bash
 pgcrate triage                # Quick health overview (locks, xid, sequences)
-pgcrate triage --json         # Machine-readable JSON output
+pgcrate triage --include-fixes --json   # Include recommended fix actions
 pgcrate context --json        # Connection context, server info, privileges
 pgcrate capabilities --json   # What can this connection do?
 pgcrate locks                 # Blocking locks and long transactions
 pgcrate xid                   # Transaction ID wraparound analysis
 pgcrate sequences             # Sequence exhaustion check
 pgcrate indexes               # Missing, unused, duplicate indexes
+pgcrate vacuum                # Table bloat and vacuum health
 ```
 
 All diagnostic commands support timeout flags for production safety:
@@ -149,6 +150,41 @@ All diagnostic commands support timeout flags for production safety:
 - `--statement-timeout <ms>` - Query timeout (default: 30000ms)
 - `--lock-timeout <ms>` - Lock wait timeout (default: 500ms)
 
+### Fix Commands
+
+Safe remediation for issues found by diagnostics:
+
+```bash
+# Sequence fixes (prevent exhaustion)
+pgcrate fix sequence public.order_seq --upgrade-to bigint --dry-run
+pgcrate fix sequence public.order_seq --upgrade-to bigint --yes
+
+# Index fixes (remove unused indexes)
+pgcrate fix index --drop public.idx_unused --dry-run
+pgcrate fix index --drop public.idx_unused --yes
+
+# Vacuum fixes (reclaim space)
+pgcrate fix vacuum public.orders --dry-run
+pgcrate fix vacuum public.orders --yes
+pgcrate fix vacuum public.orders --full --yes      # ACCESS EXCLUSIVE lock
+pgcrate fix vacuum public.orders --analyze --yes   # Update statistics
+```
+
+**Gate flags required for fix commands:**
+- `--read-write` - Required for all fix operations
+- `--primary` - Required for database-modifying operations
+- `--yes` - Required for medium/high-risk operations
+
+**Risk levels:**
+- **Low:** `ALTER SEQUENCE`, regular `VACUUM`
+- **Medium:** `DROP INDEX CONCURRENTLY` (requires `--yes`)
+- **High:** `VACUUM FULL` (requires `--yes`, takes an exclusive lock)
+
+Fix commands include evidence collection, safety checks, and optional verification:
+```bash
+pgcrate --read-write --primary fix sequence public.order_seq --upgrade-to bigint --yes --verify
+```
+
 ### Data Operations
 
 ```bash
@@ -272,6 +308,10 @@ DROP TABLE users;
 | `pgcrate xid` | Transaction ID wraparound analysis |
 | `pgcrate sequences` | Sequence exhaustion check |
 | `pgcrate indexes` | Missing, unused, duplicate indexes |
+| `pgcrate vacuum` | Table bloat and vacuum health |
+| `pgcrate fix sequence` | Upgrade sequence type to prevent exhaustion |
+| `pgcrate fix index` | Drop unused/duplicate indexes |
+| `pgcrate fix vacuum` | Run VACUUM on tables |
 | `pgcrate doctor` | Run health checks |
 | `pgcrate bootstrap` | Set up environment with anonymized data from source |
 | `pgcrate snapshot <subcommand>` | Save (with profiles), restore, list, or delete snapshots |
diff --git a/llms.txt b/llms.txt
index 5db0eb2..0a35f45 100644
--- a/llms.txt
+++ b/llms.txt
@@ -736,6 +736,75 @@ CREATE TABLE IF NOT EXISTS pgcrate.schema_migrations (
 - **Schema versioning**: Table is created with IF NOT EXISTS for compatibility
 - **Timestamps**: Uses TIMESTAMPTZ with UTC for consistency
 
+## DIAGNOSTICS
+
+### Health Check Commands
+
+```bash
+# Quick triage (locks, xid, sequences)
+pgcrate triage
+pgcrate triage --json
+pgcrate triage --include-fixes --json   # Include recommended fix actions
+
+# Individual diagnostics
+pgcrate locks       # Blocking locks and long transactions
+pgcrate xid         # Transaction ID wraparound analysis
+pgcrate sequences   # Sequence exhaustion check
+pgcrate indexes     # Missing, unused, duplicate indexes
+pgcrate vacuum      # Table bloat and vacuum health
+
+# Connection context
+pgcrate context --json        # Connection info, server version, privileges
+pgcrate capabilities --json   # What can this connection do?
+```
+
+**Timeout Flags (for production safety):**
+- `--connect-timeout <ms>` - Connection timeout (default: 5000ms)
+- `--statement-timeout <ms>` - Query timeout (default: 30000ms)
+- `--lock-timeout <ms>` - Lock wait timeout (default: 500ms)
+
+**Exit Codes:**
+- `0` = healthy
+- `1` = warning
+- `2` = critical
+- `10+` = operational failure
+
+### Fix Commands
+
+Safe remediation for issues found by diagnostics:
+
+```bash
+# Sequence fixes (prevent exhaustion)
+pgcrate --read-write --primary fix sequence public.order_seq --upgrade-to bigint --dry-run
+pgcrate --read-write --primary fix sequence public.order_seq --upgrade-to bigint --yes
+
+# Index fixes (remove unused indexes)
+pgcrate --read-write --primary fix index --drop public.idx_unused --dry-run
+pgcrate --read-write --primary fix index --drop public.idx_unused --yes
+
+# Vacuum fixes (reclaim space)
+pgcrate --read-write --primary fix vacuum public.orders --dry-run
+pgcrate --read-write --primary fix vacuum public.orders --yes
+pgcrate --read-write --primary fix vacuum public.orders --full --yes      # ACCESS EXCLUSIVE lock
+pgcrate --read-write --primary fix vacuum public.orders --analyze --yes   # Update statistics
+```
+
+**Gate Flags (required for fix operations):**
+- `--read-write` - Required for all fix operations
+- `--primary` - Required for database-modifying operations
+- `--yes` - Required for medium/high-risk operations
+
+**Risk Levels:**
+- **Low:** `ALTER SEQUENCE`, regular `VACUUM`
+- **Medium:** `DROP INDEX CONCURRENTLY` (requires `--yes`)
+- **High:** `VACUUM FULL` (requires `--yes`, takes an exclusive lock)
+
+**Verification:**
+```bash
+# Run post-fix verification
+pgcrate --read-write --primary fix sequence public.order_seq --upgrade-to bigint --yes --verify
+```
+
 ## GLOBAL FLAGS
 
 - `-d, --database-url <url>`: Override database connection
@@ -760,6 +829,17 @@ Currently, `--json` is supported for these commands:
 - `snapshot info` - Snapshot details
 - `sql` - SQL query results
 - `status` - Migration status (alias for `migrate status`)
+- `triage` - Health overview with actions
+- `context` - Connection context and server info
+- `capabilities` - Permission discovery
+- `locks` - Blocking locks and transactions
+- `xid` - Transaction ID wraparound
+- `sequences` - Sequence exhaustion check
+- `indexes` - Index health analysis
+- `vacuum` - Table bloat analysis
+- `fix sequence` - Sequence upgrade result
+- `fix index` - Index drop result
+- `fix vacuum` - Vacuum result
 
 For unsupported commands, `--json` returns a JSON error: `"--json not supported for '<command>' yet"`.
diff --git a/src/commands/fix/common.rs b/src/commands/fix/common.rs
new file mode 100644
index 0000000..643967f
--- /dev/null
+++ b/src/commands/fix/common.rs
@@ -0,0 +1,392 @@
+//! Common types for fix commands.
+//!
+//! These types match the action.schema.json contract for structured actions.
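For orientation, a rough sketch of the wire shape these types produce, hand-built with `serde_json`. Field names mirror the structs defined below; the values are illustrative and the authoritative shape is `action.schema.json`:

```rust
// Illustrative only: roughly what a serialized StructuredAction looks like.
// Gates serialize only the flags that are true; optional fields (evidence,
// verify, blocked_reason, ...) are skipped entirely when None.
fn example_action_json() -> serde_json::Value {
    serde_json::json!({
        "action_id": "fix.sequence.upgrade-bigint.public.order_seq",
        "action_type": "fix",
        "command": "pgcrate",
        "args": ["fix", "sequence", "public.order_seq", "--upgrade-to", "bigint"],
        "description": "Upgrade sequence public.order_seq from integer to bigint",
        "available": true,
        "mutates": true,
        "risk": "low",
        "gates": { "requires_write": true, "requires_primary": true },
        "sql_preview": ["ALTER SEQUENCE \"public\".\"order_seq\" AS bigint;"]
    })
}
```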
+
+use serde::Serialize;
+
+/// Action type classification
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum ActionType {
+    /// Remediation fix
+    Fix,
+}
+
+/// Risk level for an action
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum Risk {
+    /// Low risk (non-blocking, reversible)
+    Low,
+    /// Medium risk (may affect performance, requires confirmation)
+    Medium,
+    /// High risk (may block operations, requires explicit confirmation)
+    High,
+}
+
+/// Gates that must be satisfied before an action can execute
+#[derive(Debug, Clone, Default, Serialize)]
+pub struct ActionGates {
+    /// Requires --read-write flag (not in read-only mode)
+    #[serde(skip_serializing_if = "std::ops::Not::not")]
+    pub requires_write: bool,
+    /// Requires --primary flag (confirmed connection to primary)
+    #[serde(skip_serializing_if = "std::ops::Not::not")]
+    pub requires_primary: bool,
+    /// Requires --yes flag (explicit confirmation)
+    #[serde(skip_serializing_if = "std::ops::Not::not")]
+    pub requires_confirmation: bool,
+}
+
+impl ActionGates {
+    pub fn write_primary() -> Self {
+        Self {
+            requires_write: true,
+            requires_primary: true,
+            ..Default::default()
+        }
+    }
+
+    pub fn write_primary_confirm() -> Self {
+        Self {
+            requires_write: true,
+            requires_primary: true,
+            requires_confirmation: true,
+        }
+    }
+}
+
+/// A verification step to run after a fix
+#[derive(Debug, Clone, Serialize)]
+pub struct VerifyStep {
+    /// Human-readable description
+    pub description: String,
+    /// Command to run (e.g., "pgcrate sequences --json")
+    pub command: String,
+    /// JSONPath expression for the expected result
+    pub expected: String,
+}
+
+/// A structured action that can be executed
+#[derive(Debug, Clone, Serialize)]
+pub struct StructuredAction {
+    /// Unique identifier (e.g., "fix.sequence.upgrade-bigint.public.order_seq")
+    pub action_id: String,
+    /// Type of action
+    pub action_type: ActionType,
+    /// Command to run
+    pub command: &'static str,
+    /// Command arguments
+    pub args: Vec<String>,
+    /// Human-readable description
+    pub description: String,
+    /// Whether this action is available (gates are satisfied)
+    pub available: bool,
+    /// Whether this action mutates state
+    pub mutates: bool,
+    /// Risk level
+    pub risk: Risk,
+    /// Gate requirements
+    pub gates: ActionGates,
+    /// SQL that will be executed (for preview)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub sql_preview: Option<Vec<String>>,
+    /// Evidence supporting this action
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub evidence: Option<serde_json::Value>,
+    /// Verification steps to run after the fix
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub verify: Option<Vec<VerifyStep>>,
+    /// Reason the action is blocked (if available=false)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub blocked_reason: Option<String>,
+}
+
+impl StructuredAction {
+    /// Create a new action builder
+    pub fn builder(action_id: impl Into<String>, action_type: ActionType) -> ActionBuilder {
+        ActionBuilder::new(action_id.into(), action_type)
+    }
+}
+
+/// Builder for StructuredAction
+pub struct ActionBuilder {
+    action_id: String,
+    action_type: ActionType,
+    command: &'static str,
+    args: Vec<String>,
+    description: String,
+    mutates: bool,
+    risk: Risk,
+    gates: ActionGates,
+    sql_preview: Option<Vec<String>>,
+    evidence: Option<serde_json::Value>,
+    verify: Option<Vec<VerifyStep>>,
+}
+
+impl ActionBuilder {
+    pub fn new(action_id: String, action_type: ActionType) -> Self {
+        Self {
+            action_id,
+            action_type,
+            command: "pgcrate",
+            args: Vec::new(),
+            description: String::new(),
+            mutates: false,
+            risk: Risk::Low,
+            gates: ActionGates::default(),
+            sql_preview: None,
+            evidence: None,
+            verify: None,
+        }
+    }
+
+    pub fn command(mut self, command: &'static str) -> Self {
+        self.command = command;
+        self
+    }
+
+    pub fn args(mut self, args: Vec<String>) -> Self {
+        self.args = args;
+        self
+    }
+
+    pub fn description(mut self, desc: impl Into<String>) -> Self {
+        self.description = desc.into();
+        self
+    }
+
+    pub fn mutates(mut self, mutates: bool) -> Self {
+        self.mutates = mutates;
+        self
+    }
+
+    pub fn risk(mut self, risk: Risk) -> Self {
+        self.risk = risk;
+        self
+    }
+
+    pub fn gates(mut self, gates: ActionGates) -> Self {
+        self.gates = gates;
+        self
+    }
+
+    pub fn sql_preview(mut self, sql: Vec<String>) -> Self {
+        self.sql_preview = Some(sql);
+        self
+    }
+
+    pub fn evidence(mut self, evidence: serde_json::Value) -> Self {
+        self.evidence = Some(evidence);
+        self
+    }
+
+    pub fn verify(mut self, steps: Vec<VerifyStep>) -> Self {
+        self.verify = Some(steps);
+        self
+    }
+
+    /// Build the action, checking gates against current state
+    pub fn build(self, read_write: bool, is_primary: bool, confirmed: bool) -> StructuredAction {
+        let gate_result = check_gates(&self.gates, read_write, is_primary, confirmed);
+
+        StructuredAction {
+            action_id: self.action_id,
+            action_type: self.action_type,
+            command: self.command,
+            args: self.args,
+            description: self.description,
+            available: gate_result.passed,
+            mutates: self.mutates,
+            risk: self.risk,
+            gates: self.gates,
+            sql_preview: self.sql_preview,
+            evidence: self.evidence,
+            verify: self.verify,
+            blocked_reason: gate_result.blocked_reason,
+        }
+    }
+}
+
+/// Result of checking action gates
+#[derive(Debug, Clone)]
+pub struct GateCheckResult {
+    pub passed: bool,
+    pub blocked_reason: Option<String>,
+}
+
+/// Check whether gates are satisfied
+fn check_gates(
+    gates: &ActionGates,
+    read_write: bool,
+    is_primary: bool,
+    confirmed: bool,
+) -> GateCheckResult {
+    let mut missing_flags = Vec::new();
+
+    if gates.requires_write && !read_write {
+        missing_flags.push("--read-write");
+    }
+    if gates.requires_primary && !is_primary {
+        missing_flags.push("--primary");
+    }
+    if gates.requires_confirmation && !confirmed {
+        missing_flags.push("--yes");
+    }
+
+    if missing_flags.is_empty() {
+        GateCheckResult {
+            passed: true,
+            blocked_reason: None,
+        }
+    } else {
+        GateCheckResult {
+            passed: false,
+            blocked_reason: Some(format!(
+                "Missing required flags: {}",
+                missing_flags.join(", ")
+            )),
+        }
+    }
+}
+
+/// Result of executing a fix command
+#[derive(Debug, Clone, Serialize)]
+pub struct FixResult {
+    /// Whether the fix was executed (vs dry-run)
+    pub executed: bool,
+    /// Whether the fix succeeded
+    pub success: bool,
+    /// SQL that was (or would be) executed
+    pub sql: Vec<String>,
+    /// Human-readable summary
+    pub summary: String,
+    /// Optional error message
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    /// Verification results (if --verify was used)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub verification: Option<VerificationResult>,
+}
+
+/// Result of verification steps
+#[derive(Debug, Clone, Serialize)]
+pub struct VerificationResult {
+    /// Whether all verification steps passed
+    pub passed: bool,
+    /// Individual step results
+    pub steps: Vec<VerifyStepResult>,
+}
+
+/// Result of a single verification step
+#[derive(Debug, Clone, Serialize)]
+pub struct VerifyStepResult {
+    pub description: String,
+    pub passed: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub actual: Option<String>,
"Option::is_none")] + pub error: Option, +} + +/// Print fix result in human-readable format. +/// +/// The optional `dry_run_note` is printed after the SQL preview for dry runs. +pub fn print_fix_result(result: &FixResult, quiet: bool, dry_run_note: Option<&str>) { + if quiet { + if !result.success { + if let Some(err) = &result.error { + eprintln!("Error: {}", err); + } + } + return; + } + + if result.executed { + if result.success { + println!("SUCCESS: {}", result.summary); + } else { + println!("FAILED: {}", result.summary); + if let Some(err) = &result.error { + println!("Error: {}", err); + } + } + } else { + println!("DRY RUN: {}", result.summary); + println!(); + println!("SQL to execute:"); + for sql in &result.sql { + println!(" {}", sql); + } + println!(); + if let Some(note) = dry_run_note { + println!("{}", note); + } + println!("To execute, add --yes flag."); + } + + // Print verification results if present + if let Some(verification) = &result.verification { + println!(); + if verification.passed { + println!("VERIFICATION: PASSED"); + } else { + println!("VERIFICATION: FAILED"); + } + for step in &verification.steps { + let status = if step.passed { "✓" } else { "✗" }; + println!(" {} {}", status, step.description); + if let Some(err) = &step.error { + println!(" Error: {}", err); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gates_all_pass() { + let gates = ActionGates::write_primary_confirm(); + let result = check_gates(&gates, true, true, true); + assert!(result.passed); + assert!(result.blocked_reason.is_none()); + } + + #[test] + fn test_gates_missing_write() { + let gates = ActionGates::write_primary(); + let result = check_gates(&gates, false, true, true); + assert!(!result.passed); + assert!(result.blocked_reason.unwrap().contains("--read-write")); + } + + #[test] + fn test_gates_missing_multiple() { + let gates = ActionGates::write_primary_confirm(); + let result = check_gates(&gates, false, false, false); + assert!(!result.passed); + let reason = result.blocked_reason.unwrap(); + assert!(reason.contains("--read-write")); + assert!(reason.contains("--primary")); + assert!(reason.contains("--yes")); + } + + #[test] + fn test_action_builder() { + let action = StructuredAction::builder("test.action", ActionType::Fix) + .description("Test action") + .mutates(true) + .risk(Risk::Low) + .gates(ActionGates::write_primary()) + .sql_preview(vec!["SELECT 1".to_string()]) + .build(true, true, true); + + assert_eq!(action.action_id, "test.action"); + assert!(action.available); + assert!(action.mutates); + assert_eq!(action.risk, Risk::Low); + } +} diff --git a/src/commands/fix/index.rs b/src/commands/fix/index.rs new file mode 100644 index 0000000..d28f9f9 --- /dev/null +++ b/src/commands/fix/index.rs @@ -0,0 +1,381 @@ +//! Fix index command: Safely drop unused/duplicate indexes. +//! +//! Indexes that are never used waste disk space and slow down writes. +//! This command provides safe index dropping with comprehensive evidence +//! and safety checks. 
+
+use anyhow::{bail, Context, Result};
+use serde::Serialize;
+use tokio_postgres::Client;
+
+use super::common::{
+    print_fix_result, ActionGates, ActionType, FixResult, Risk, StructuredAction, VerifyStep,
+};
+use crate::sql::quote_ident;
+
+/// Evidence for index drop action
+#[derive(Debug, Clone, Serialize)]
+pub struct IndexDropEvidence {
+    pub schema: String,
+    pub index_name: String,
+    pub table_name: String,
+    pub idx_scan: i64,
+    pub idx_tup_read: i64,
+    pub size_bytes: i64,
+    pub size_pretty: String,
+    pub is_unique: bool,
+    pub is_primary_key: bool,
+    pub is_replica_identity: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub backing_constraint: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stats_since: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stats_age_days: Option<i32>,
+}
+
+/// Safety check result
+#[derive(Debug, Clone)]
+pub struct SafetyCheck {
+    pub can_drop: bool,
+    pub reason: Option<String>,
+}
+
+/// Get detailed information about an index for dropping
+pub async fn get_index_info(
+    client: &Client,
+    schema: &str,
+    name: &str,
+) -> Result<IndexDropEvidence> {
+    // Get index details including safety information
+    let query = r#"
+        SELECT
+            n.nspname as schema_name,
+            i.relname as index_name,
+            t.relname as table_name,
+            COALESCE(s.idx_scan, 0) as idx_scan,
+            COALESCE(s.idx_tup_read, 0) as idx_tup_read,
+            pg_relation_size(i.oid) as size_bytes,
+            pg_size_pretty(pg_relation_size(i.oid)) as size_pretty,
+            ix.indisunique as is_unique,
+            ix.indisprimary as is_primary_key,
+            ix.indisreplident as is_replica_identity,
+            c.conname as constraint_name
+        FROM pg_class i
+        JOIN pg_index ix ON i.oid = ix.indexrelid
+        JOIN pg_class t ON t.oid = ix.indrelid
+        JOIN pg_namespace n ON n.oid = i.relnamespace
+        LEFT JOIN pg_stat_user_indexes s ON s.indexrelid = i.oid
+        LEFT JOIN pg_constraint c ON c.conindid = i.oid
+        WHERE n.nspname = $1 AND i.relname = $2
+    "#;
+
+    let row = client
+        .query_opt(query, &[&schema, &name])
+        .await
+        .context("Failed to query index")?
+        .ok_or_else(|| anyhow::anyhow!("Index {}.{} not found", schema, name))?;
+
+    // Get stats reset time
+    let stats_query = r#"
+        SELECT
+            stats_reset,
+            (EXTRACT(EPOCH FROM (now() - stats_reset)) / 86400)::int as days_old
+        FROM pg_stat_database
+        WHERE datname = current_database()
+    "#;
+
+    let (stats_since, stats_age_days) = match client.query_opt(stats_query, &[]).await? {
+        Some(stats_row) => {
+            let reset: Option<chrono::DateTime<chrono::Utc>> = stats_row.get("stats_reset");
+            let days: Option<i32> = stats_row.get("days_old");
+            (reset.map(|r| r.to_rfc3339()), days)
+        }
+        None => (None, None),
+    };
+
+    Ok(IndexDropEvidence {
+        schema: row.get("schema_name"),
+        index_name: row.get("index_name"),
+        table_name: row.get("table_name"),
+        idx_scan: row.get("idx_scan"),
+        idx_tup_read: row.get("idx_tup_read"),
+        size_bytes: row.get("size_bytes"),
+        size_pretty: row.get("size_pretty"),
+        is_unique: row.get("is_unique"),
+        is_primary_key: row.get("is_primary_key"),
+        is_replica_identity: row.get("is_replica_identity"),
+        backing_constraint: row.get("constraint_name"),
+        stats_since,
+        stats_age_days,
+    })
+}
+
+/// Check if an index is safe to drop
+pub fn check_safety(evidence: &IndexDropEvidence) -> SafetyCheck {
+    if evidence.is_primary_key {
+        return SafetyCheck {
+            can_drop: false,
+            reason: Some(
+                "Cannot drop primary key index. Drop the primary key constraint instead."
+                    .to_string(),
+            ),
+        };
+    }
+
+    if evidence.is_replica_identity {
+        return SafetyCheck {
+            can_drop: false,
+            reason: Some(
+                "Cannot drop replica identity index. It is used for logical replication."
+                    .to_string(),
+            ),
+        };
+    }
+
+    if let Some(constraint) = &evidence.backing_constraint {
+        return SafetyCheck {
+            can_drop: false,
+            reason: Some(format!(
+                "Cannot drop index directly. It backs constraint '{}'. Drop the constraint instead.",
+                constraint
+            )),
+        };
+    }
+
+    SafetyCheck {
+        can_drop: true,
+        reason: None,
+    }
+}
+
+/// Generate SQL for dropping an index
+pub fn generate_drop_sql(schema: &str, name: &str, concurrent: bool) -> String {
+    if concurrent {
+        format!(
+            "DROP INDEX CONCURRENTLY {}.{};",
+            quote_ident(schema),
+            quote_ident(name)
+        )
+    } else {
+        format!("DROP INDEX {}.{};", quote_ident(schema), quote_ident(name))
+    }
+}
+
+/// Execute index drop
+pub async fn execute_drop(
+    client: &Client,
+    schema: &str,
+    name: &str,
+    dry_run: bool,
+) -> Result<FixResult> {
+    // Get current state and check safety
+    let evidence = get_index_info(client, schema, name).await?;
+    let safety = check_safety(&evidence);
+
+    if !safety.can_drop {
+        bail!(
+            "{}",
+            safety
+                .reason
+                .unwrap_or_else(|| "Cannot drop index".to_string())
+        );
+    }
+
+    // Use CONCURRENTLY to avoid blocking
+    let sql = generate_drop_sql(schema, name, true);
+
+    if dry_run {
+        return Ok(FixResult {
+            executed: false,
+            success: true,
+            sql: vec![sql],
+            summary: format!(
+                "Would drop index {}.{} ({}, {} scans)",
+                schema, name, evidence.size_pretty, evidence.idx_scan
+            ),
+            error: None,
+            verification: None,
+        });
+    }
+
+    // Execute the drop
+    // Note: DROP INDEX CONCURRENTLY cannot run in a transaction
+    match client.batch_execute(&sql).await {
+        Ok(_) => Ok(FixResult {
+            executed: true,
+            success: true,
+            sql: vec![sql],
+            summary: format!(
+                "Dropped index {}.{} ({} reclaimed)",
+                schema, name, evidence.size_pretty
+            ),
+            error: None,
+            verification: None,
+        }),
+        Err(e) => Ok(FixResult {
+            executed: true,
+            success: false,
+            sql: vec![sql],
+            summary: format!("Failed to drop index {}.{}", schema, name),
+            error: Some(e.to_string()),
+            verification: None,
+        }),
+    }
+}
+
+/// Get verification steps for index drop.
+pub fn get_verify_steps(index_name: &str) -> Vec<VerifyStep> {
+    vec![VerifyStep {
+        description: format!(
+            "Verify index {} no longer exists in unused list",
+            index_name
+        ),
+        command: "pgcrate indexes --json".to_string(),
+        expected: format!("$.data.unused[?(@.index=='{}')] == null", index_name),
+    }]
+}
+
+/// Create a structured action for index drop
+pub fn create_drop_action(
+    evidence: &IndexDropEvidence,
+    read_write: bool,
+    is_primary: bool,
+    confirmed: bool,
+) -> StructuredAction {
+    let action_id = format!("fix.index.drop.{}.{}", evidence.schema, evidence.index_name);
+
+    let safety = check_safety(evidence);
+    let sql = generate_drop_sql(&evidence.schema, &evidence.index_name, true);
+
+    let verify_steps = get_verify_steps(&evidence.index_name);
+
+    let builder = StructuredAction::builder(action_id, ActionType::Fix)
+        .command("pgcrate")
+        .args(vec![
+            "fix".to_string(),
+            "index".to_string(),
+            "--drop".to_string(),
+            format!("{}.{}", evidence.schema, evidence.index_name),
+        ])
+        .description(format!(
+            "Drop unused index {}.{} ({}, {} scans since stats reset)",
+            evidence.schema, evidence.index_name, evidence.size_pretty, evidence.idx_scan
+        ))
+        .mutates(true)
+        .risk(Risk::Medium) // Requires confirmation, concurrent drop
+        .gates(ActionGates::write_primary_confirm())
+        .sql_preview(vec![sql])
+        .evidence(serde_json::to_value(evidence).unwrap_or_default())
+        .verify(verify_steps);
+
+    // If the safety check fails, we still build the action but mark it unavailable
+    let mut action = builder.build(read_write, is_primary, confirmed);
+
+    if !safety.can_drop {
+        action.available = false;
+        action.blocked_reason = safety.reason;
+    }
+
+    action
+}
+
+/// Print fix result in human-readable format
+pub fn print_human(result: &FixResult, quiet: bool) {
+    print_fix_result(
+        result,
+        quiet,
+        Some("Note: Uses DROP INDEX CONCURRENTLY to avoid blocking."),
+    );
+}
+
+/// Print fix result as JSON
+pub fn print_json(
+    result: &FixResult,
+    timeouts: Option<crate::output::Timeouts>,
+) -> Result<()> {
+    use crate::output::{DiagnosticOutput, Severity};
+
+    let severity = if result.success {
+        Severity::Healthy
+    } else {
+        Severity::Error
+    };
+
+    let output = match timeouts {
+        Some(t) => DiagnosticOutput::with_timeouts("pgcrate.fix.index", result, severity, t),
+        None => DiagnosticOutput::new("pgcrate.fix.index", result, severity),
+    };
+    output.print()?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn test_evidence() -> IndexDropEvidence {
+        IndexDropEvidence {
+            schema: "public".to_string(),
+            index_name: "idx_users_email".to_string(),
+            table_name: "users".to_string(),
+            idx_scan: 0,
+            idx_tup_read: 0,
+            size_bytes: 1048576,
+            size_pretty: "1 MB".to_string(),
+            is_unique: false,
+            is_primary_key: false,
+            is_replica_identity: false,
+            backing_constraint: None,
+            stats_since: Some("2024-01-01T00:00:00Z".to_string()),
+            stats_age_days: Some(30),
+        }
+    }
+
+    #[test]
+    fn test_safety_check_normal_index() {
+        let evidence = test_evidence();
+        let safety = check_safety(&evidence);
+        assert!(safety.can_drop);
+        assert!(safety.reason.is_none());
+    }
+
+    #[test]
+    fn test_safety_check_primary_key() {
+        let mut evidence = test_evidence();
+        evidence.is_primary_key = true;
+        let safety = check_safety(&evidence);
+        assert!(!safety.can_drop);
+        assert!(safety.reason.unwrap().contains("primary key"));
+    }
+
+    #[test]
+    fn test_safety_check_replica_identity() {
+        let mut evidence = test_evidence();
+        evidence.is_replica_identity = true;
+        let safety = check_safety(&evidence);
+        assert!(!safety.can_drop);
assert!(safety.reason.unwrap().contains("replica identity")); + } + + #[test] + fn test_safety_check_backing_constraint() { + let mut evidence = test_evidence(); + evidence.backing_constraint = Some("users_email_key".to_string()); + let safety = check_safety(&evidence); + assert!(!safety.can_drop); + assert!(safety.reason.unwrap().contains("constraint")); + } + + #[test] + fn test_generate_drop_sql_concurrent() { + let sql = generate_drop_sql("public", "idx_test", true); + assert_eq!(sql, "DROP INDEX CONCURRENTLY \"public\".\"idx_test\";"); + } + + #[test] + fn test_generate_drop_sql_blocking() { + let sql = generate_drop_sql("public", "idx_test", false); + assert_eq!(sql, "DROP INDEX \"public\".\"idx_test\";"); + } +} diff --git a/src/commands/fix/mod.rs b/src/commands/fix/mod.rs new file mode 100644 index 0000000..8590bf3 --- /dev/null +++ b/src/commands/fix/mod.rs @@ -0,0 +1,13 @@ +//! Fix commands: Remediation actions for diagnostics findings. +//! +//! Fix commands are separate from diagnostic commands because they mutate state. +//! They follow a diagnose → fix → verify workflow with proper gating. + +pub mod common; +pub mod index; +pub mod sequence; +pub mod vacuum; +pub mod verify; + +// Re-export StructuredAction for triage --include-fixes +pub use common::StructuredAction; diff --git a/src/commands/fix/sequence.rs b/src/commands/fix/sequence.rs new file mode 100644 index 0000000..ea03cb0 --- /dev/null +++ b/src/commands/fix/sequence.rs @@ -0,0 +1,325 @@ +//! Fix sequence command: Upgrade sequence types to prevent exhaustion. +//! +//! Sequences can exhaust their maximum value based on data type: +//! - smallint: 32,767 +//! - integer: 2,147,483,647 +//! - bigint: 9,223,372,036,854,775,807 +//! +//! This command upgrades sequences to larger types (ALTER SEQUENCE ... AS type). 
+
+use anyhow::{bail, Context, Result};
+use serde::Serialize;
+use tokio_postgres::Client;
+
+use super::common::{
+    print_fix_result, ActionGates, ActionType, FixResult, Risk, StructuredAction, VerifyStep,
+};
+use crate::sql::quote_ident;
+
+/// Sequence type hierarchy
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum SequenceType {
+    SmallInt,
+    Integer,
+    BigInt,
+}
+
+impl SequenceType {
+    pub fn from_str(s: &str) -> Option<Self> {
+        match s.to_lowercase().as_str() {
+            "smallint" => Some(SequenceType::SmallInt),
+            "integer" | "int" | "int4" => Some(SequenceType::Integer),
+            "bigint" | "int8" => Some(SequenceType::BigInt),
+            _ => None,
+        }
+    }
+
+    pub fn to_sql(self) -> &'static str {
+        match self {
+            SequenceType::SmallInt => "smallint",
+            SequenceType::Integer => "integer",
+            SequenceType::BigInt => "bigint",
+        }
+    }
+
+    pub fn max_value(&self) -> i64 {
+        match self {
+            SequenceType::SmallInt => 32_767,
+            SequenceType::Integer => 2_147_483_647,
+            SequenceType::BigInt => 9_223_372_036_854_775_807,
+        }
+    }
+}
+
+/// Evidence for sequence upgrade action
+#[derive(Debug, Clone, Serialize)]
+pub struct SequenceEvidence {
+    pub current_type: String,
+    pub target_type: String,
+    pub current_pct: f64,
+    pub last_value: i64,
+    pub max_value: i64,
+    pub upgrade_max_value: i64,
+}
+
+/// Information about a sequence for fix operations
+#[derive(Debug, Clone, Serialize)]
+pub struct SequenceFixInfo {
+    pub schema: String,
+    pub name: String,
+    pub data_type: String,
+    pub last_value: i64,
+    pub max_value: i64,
+    pub pct_used: f64,
+}
+
+/// Get information about a specific sequence
+pub async fn get_sequence_info(
+    client: &Client,
+    schema: &str,
+    name: &str,
+) -> Result<SequenceFixInfo> {
+    let query = r#"
+        SELECT
+            schemaname,
+            sequencename,
+            data_type::text as data_type,
+            COALESCE(last_value, 0) as last_value,
+            max_value,
+            CASE
+                WHEN increment_by > 0 AND max_value > 0 AND last_value IS NOT NULL
+                THEN round(100.0 * last_value / max_value, 2)::float8
+                ELSE 0::float8
+            END as pct_used
+        FROM pg_sequences
+        WHERE schemaname = $1 AND sequencename = $2
+    "#;
+
+    let row = client
+        .query_opt(query, &[&schema, &name])
+        .await
+        .context("Failed to query sequence")?
+ .ok_or_else(|| anyhow::anyhow!("Sequence {}.{} not found", schema, name))?; + + Ok(SequenceFixInfo { + schema: row.get("schemaname"), + name: row.get("sequencename"), + data_type: row.get("data_type"), + last_value: row.get("last_value"), + max_value: row.get("max_value"), + pct_used: row.get("pct_used"), + }) +} + +/// Generate SQL for upgrading a sequence type +pub fn generate_upgrade_sql(schema: &str, name: &str, target_type: SequenceType) -> String { + format!( + "ALTER SEQUENCE {}.{} AS {};", + quote_ident(schema), + quote_ident(name), + target_type.to_sql() + ) +} + +/// Execute sequence upgrade +pub async fn execute_upgrade( + client: &Client, + schema: &str, + name: &str, + target_type: SequenceType, + dry_run: bool, +) -> Result { + // Get current state + let info = get_sequence_info(client, schema, name).await?; + + // Validate upgrade path + let current_type = SequenceType::from_str(&info.data_type) + .ok_or_else(|| anyhow::anyhow!("Unknown sequence type: {}", info.data_type))?; + + if target_type <= current_type { + bail!( + "Cannot downgrade sequence from {} to {}", + current_type.to_sql(), + target_type.to_sql() + ); + } + + let sql = generate_upgrade_sql(schema, name, target_type); + + if dry_run { + return Ok(FixResult { + executed: false, + success: true, + sql: vec![sql], + summary: format!( + "Would upgrade {}.{} from {} to {}", + schema, + name, + current_type.to_sql(), + target_type.to_sql() + ), + error: None, + verification: None, + }); + } + + // Execute the upgrade + match client.batch_execute(&sql).await { + Ok(_) => Ok(FixResult { + executed: true, + success: true, + sql: vec![sql], + summary: format!( + "Upgraded {}.{} from {} to {}", + schema, + name, + current_type.to_sql(), + target_type.to_sql() + ), + error: None, + verification: None, + }), + Err(e) => Ok(FixResult { + executed: true, + success: false, + sql: vec![sql], + summary: format!("Failed to upgrade {}.{}", schema, name), + error: Some(e.to_string()), + verification: None, + }), + } +} + +/// Get verification steps for sequence upgrade. 
+pub fn get_verify_steps(schema: &str, name: &str) -> Vec<VerifyStep> {
+    vec![VerifyStep {
+        description: format!("Verify {}.{} is no longer critical", schema, name),
+        command: "pgcrate sequences --all --json".to_string(),
+        expected: format!(
+            "$.data.sequences[?(@.name=='{}')].status != 'critical'",
+            name
+        ),
+    }]
+}
+
+/// Create a structured action for sequence upgrade
+pub fn create_upgrade_action(
+    info: &SequenceFixInfo,
+    target_type: SequenceType,
+    read_write: bool,
+    is_primary: bool,
+    confirmed: bool,
+) -> StructuredAction {
+    let current_type = SequenceType::from_str(&info.data_type).unwrap_or(SequenceType::Integer);
+
+    let action_id = format!(
+        "fix.sequence.upgrade-{}.{}.{}",
+        target_type.to_sql(),
+        info.schema,
+        info.name
+    );
+
+    let sql = generate_upgrade_sql(&info.schema, &info.name, target_type);
+
+    let evidence = SequenceEvidence {
+        current_type: current_type.to_sql().to_string(),
+        target_type: target_type.to_sql().to_string(),
+        current_pct: info.pct_used,
+        last_value: info.last_value,
+        max_value: info.max_value,
+        upgrade_max_value: target_type.max_value(),
+    };
+
+    let verify_steps = get_verify_steps(&info.schema, &info.name);
+
+    StructuredAction::builder(action_id, ActionType::Fix)
+        .command("pgcrate")
+        .args(vec![
+            "fix".to_string(),
+            "sequence".to_string(),
+            format!("{}.{}", info.schema, info.name),
+            "--upgrade-to".to_string(),
+            target_type.to_sql().to_string(),
+        ])
+        .description(format!(
+            "Upgrade sequence {}.{} from {} to {} (currently at {:.1}%)",
+            info.schema,
+            info.name,
+            current_type.to_sql(),
+            target_type.to_sql(),
+            info.pct_used
+        ))
+        .mutates(true)
+        .risk(Risk::Low) // ALTER SEQUENCE is non-blocking and safe
+        .gates(ActionGates::write_primary())
+        .sql_preview(vec![sql])
+        .evidence(serde_json::to_value(evidence).unwrap_or_default())
+        .verify(verify_steps)
+        .build(read_write, is_primary, confirmed)
+}
+
+/// Print fix result in human-readable format
+pub fn print_human(result: &FixResult, quiet: bool) {
+    print_fix_result(result, quiet, None);
+}
+
+/// Print fix result as JSON
+pub fn print_json(
+    result: &FixResult,
+    timeouts: Option<crate::output::Timeouts>,
+) -> Result<()> {
+    use crate::output::{DiagnosticOutput, Severity};
+
+    let severity = if result.success {
+        Severity::Healthy
+    } else {
+        Severity::Error
+    };
+
+    let output = match timeouts {
+        Some(t) => DiagnosticOutput::with_timeouts("pgcrate.fix.sequence", result, severity, t),
+        None => DiagnosticOutput::new("pgcrate.fix.sequence", result, severity),
+    };
+    output.print()?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_sequence_type_ordering() {
+        assert!(SequenceType::SmallInt < SequenceType::Integer);
+        assert!(SequenceType::Integer < SequenceType::BigInt);
+    }
+
+    #[test]
+    fn test_sequence_type_from_str() {
+        assert_eq!(
+            SequenceType::from_str("smallint"),
+            Some(SequenceType::SmallInt)
+        );
+        assert_eq!(
+            SequenceType::from_str("integer"),
+            Some(SequenceType::Integer)
+        );
+        assert_eq!(SequenceType::from_str("int"), Some(SequenceType::Integer));
+        assert_eq!(SequenceType::from_str("int4"), Some(SequenceType::Integer));
+        assert_eq!(SequenceType::from_str("bigint"), Some(SequenceType::BigInt));
+        assert_eq!(SequenceType::from_str("int8"), Some(SequenceType::BigInt));
+        assert_eq!(SequenceType::from_str("unknown"), None);
+    }
+
+    #[test]
+    fn test_generate_upgrade_sql() {
+        let sql = generate_upgrade_sql("public", "order_seq", SequenceType::BigInt);
+        assert_eq!(sql, "ALTER SEQUENCE \"public\".\"order_seq\" AS bigint;");
+    }
+
+    #[test]
+    fn test_generate_upgrade_sql_special_chars() {
+        let sql = generate_upgrade_sql("My Schema", "Order-Seq", SequenceType::BigInt);
+        assert_eq!(sql, "ALTER SEQUENCE \"My Schema\".\"Order-Seq\" AS bigint;");
+    }
+}
diff --git a/src/commands/fix/vacuum.rs b/src/commands/fix/vacuum.rs
new file mode 100644
index 0000000..56390ce
--- /dev/null
+++ b/src/commands/fix/vacuum.rs
@@ -0,0 +1,390 @@
+//! Fix vacuum command: Trigger vacuum operations on tables.
+//!
+//! VACUUM reclaims storage from dead tuples and updates statistics.
+//! Options:
+//! - Regular VACUUM: Online, non-blocking
+//! - VACUUM FREEZE: Also freezes old XIDs
+//! - VACUUM FULL: Rewrites table, requires ACCESS EXCLUSIVE lock
+//! - VACUUM ANALYZE: Also updates statistics
+
+use anyhow::{Context, Result};
+use serde::Serialize;
+use tokio_postgres::Client;
+
+use super::common::{
+    print_fix_result, ActionGates, ActionType, FixResult, Risk, StructuredAction, VerifyStep,
+};
+use crate::sql::quote_ident;
+
+/// VACUUM options
+#[derive(Debug, Clone, Default)]
+pub struct VacuumOptions {
+    pub freeze: bool,
+    pub full: bool,
+    pub analyze: bool,
+}
+
+#[allow(dead_code)] // Used by --include-fixes
+impl VacuumOptions {
+    pub fn risk(&self) -> Risk {
+        if self.full {
+            Risk::High // ACCESS EXCLUSIVE lock
+        } else {
+            Risk::Low // Non-blocking
+        }
+    }
+
+    pub fn to_sql_options(&self) -> String {
+        let mut opts = Vec::new();
+        if self.freeze {
+            opts.push("FREEZE");
+        }
+        if self.full {
+            opts.push("FULL");
+        }
+        if self.analyze {
+            opts.push("ANALYZE");
+        }
+        if opts.is_empty() {
+            String::new()
+        } else {
+            format!("({})", opts.join(", "))
+        }
+    }
+}
+
+/// Evidence for vacuum action
+#[derive(Debug, Clone, Serialize)]
+pub struct VacuumEvidence {
+    pub schema: String,
+    pub table: String,
+    pub dead_tuples: i64,
+    pub live_tuples: i64,
+    pub dead_pct: f64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_vacuum: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_autovacuum: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_analyze: Option<String>,
+    pub table_size: String,
+    pub table_size_bytes: i64,
+}
+
+/// Get table vacuum status
+pub async fn get_table_vacuum_info(
+    client: &Client,
+    schema: &str,
+    table: &str,
+) -> Result<VacuumEvidence> {
+    let query = r#"
+        SELECT
+            schemaname,
+            relname,
+            n_dead_tup as dead_tuples,
+            n_live_tup as live_tuples,
+            CASE
+                WHEN n_live_tup + n_dead_tup = 0 THEN 0
+                ELSE round(100.0 * n_dead_tup / (n_live_tup + n_dead_tup), 2)::float8
+            END as dead_pct,
+            last_vacuum,
+            last_autovacuum,
+            last_analyze,
+            pg_size_pretty(pg_total_relation_size(relid)) as table_size,
+            pg_total_relation_size(relid) as table_size_bytes
+        FROM pg_stat_user_tables
+        WHERE schemaname = $1 AND relname = $2
+    "#;
+
+    let row = client
+        .query_opt(query, &[&schema, &table])
+        .await
+        .context("Failed to query table stats")?
+ .ok_or_else(|| anyhow::anyhow!("Table {}.{} not found", schema, table))?; + + let last_vacuum: Option> = row.get("last_vacuum"); + let last_autovacuum: Option> = row.get("last_autovacuum"); + let last_analyze: Option> = row.get("last_analyze"); + + Ok(VacuumEvidence { + schema: row.get("schemaname"), + table: row.get("relname"), + dead_tuples: row.get("dead_tuples"), + live_tuples: row.get("live_tuples"), + dead_pct: row.get("dead_pct"), + last_vacuum: last_vacuum.map(|t| t.to_rfc3339()), + last_autovacuum: last_autovacuum.map(|t| t.to_rfc3339()), + last_analyze: last_analyze.map(|t| t.to_rfc3339()), + table_size: row.get("table_size"), + table_size_bytes: row.get("table_size_bytes"), + }) +} + +/// Generate SQL for vacuum operation +pub fn generate_vacuum_sql(schema: &str, table: &str, options: &VacuumOptions) -> String { + let opts = options.to_sql_options(); + if opts.is_empty() { + format!("VACUUM {}.{};", quote_ident(schema), quote_ident(table)) + } else { + format!( + "VACUUM {} {}.{};", + opts, + quote_ident(schema), + quote_ident(table) + ) + } +} + +/// Execute vacuum operation +pub async fn execute_vacuum( + client: &Client, + schema: &str, + table: &str, + options: &VacuumOptions, + dry_run: bool, +) -> Result { + // Get current state + let evidence = get_table_vacuum_info(client, schema, table).await?; + + let sql = generate_vacuum_sql(schema, table, options); + + let mode = if options.full { + "VACUUM FULL" + } else if options.freeze { + "VACUUM FREEZE" + } else if options.analyze { + "VACUUM ANALYZE" + } else { + "VACUUM" + }; + + if dry_run { + let warning = if options.full { + "\n\nWARNING: VACUUM FULL requires ACCESS EXCLUSIVE lock and will block all operations on the table." + } else { + "" + }; + + return Ok(FixResult { + executed: false, + success: true, + sql: vec![sql], + summary: format!( + "Would run {} on {}.{} ({} dead tuples, {:.1}%){}", + mode, schema, table, evidence.dead_tuples, evidence.dead_pct, warning + ), + error: None, + verification: None, + }); + } + + // Execute vacuum + // Note: VACUUM cannot run in a transaction, so we use batch_execute + match client.batch_execute(&sql).await { + Ok(_) => Ok(FixResult { + executed: true, + success: true, + sql: vec![sql], + summary: format!("{} completed on {}.{}", mode, schema, table), + error: None, + verification: None, + }), + Err(e) => Ok(FixResult { + executed: true, + success: false, + sql: vec![sql], + summary: format!("Failed to {} {}.{}", mode, schema, table), + error: Some(e.to_string()), + verification: None, + }), + } +} + +/// Get verification steps for vacuum. +/// Note: Vacuum verification is limited - we check overall status improved, +/// not the specific table, since JSONPath comparison operators are limited. 
+
+/// Get verification steps for vacuum.
+///
+/// Vacuum verification is limited: we check that the overall vacuum status is
+/// no longer critical rather than asserting on the specific table, because the
+/// JSONPath evaluator supports only limited comparison operators.
+pub fn get_verify_steps() -> Vec<VerifyStep> {
+    vec![VerifyStep {
+        description: "Verify vacuum status is not critical".to_string(),
+        command: "pgcrate vacuum --json".to_string(),
+        expected: "$.data.overall_status != 'critical'".to_string(),
+    }]
+}
+
+/// Create a structured action for vacuum
+pub fn create_vacuum_action(
+    evidence: &VacuumEvidence,
+    options: &VacuumOptions,
+    read_write: bool,
+    is_primary: bool,
+    confirmed: bool,
+) -> StructuredAction {
+    let mode = if options.full {
+        "full"
+    } else if options.freeze {
+        "freeze"
+    } else if options.analyze {
+        "analyze"
+    } else {
+        "regular"
+    };
+
+    let action_id = format!("fix.vacuum.{}.{}.{}", mode, evidence.schema, evidence.table);
+
+    let sql = generate_vacuum_sql(&evidence.schema, &evidence.table, options);
+    let risk = options.risk();
+
+    // VACUUM FULL requires confirmation due to the ACCESS EXCLUSIVE lock
+    let gates = if options.full {
+        ActionGates::write_primary_confirm()
+    } else {
+        ActionGates::write_primary()
+    };
+
+    let mut args = vec![
+        "fix".to_string(),
+        "vacuum".to_string(),
+        format!("{}.{}", evidence.schema, evidence.table),
+    ];
+    if options.freeze {
+        args.push("--freeze".to_string());
+    }
+    if options.full {
+        args.push("--full".to_string());
+    }
+    if options.analyze {
+        args.push("--analyze".to_string());
+    }
+
+    let verify_steps = get_verify_steps();
+
+    let mode_desc = if options.full {
+        "VACUUM FULL (requires ACCESS EXCLUSIVE lock)"
+    } else if options.freeze {
+        "VACUUM FREEZE"
+    } else if options.analyze {
+        "VACUUM ANALYZE"
+    } else {
+        "VACUUM"
+    };
+
+    StructuredAction::builder(action_id, ActionType::Fix)
+        .command("pgcrate")
+        .args(args)
+        .description(format!(
+            "{} on {}.{} ({} dead tuples, {:.1}% bloat)",
+            mode_desc, evidence.schema, evidence.table, evidence.dead_tuples, evidence.dead_pct
+        ))
+        .mutates(true)
+        .risk(risk)
+        .gates(gates)
+        .sql_preview(vec![sql])
+        .evidence(serde_json::to_value(evidence).unwrap_or_default())
+        .verify(verify_steps)
+        .build(read_write, is_primary, confirmed)
+}
+
+/// Print fix result in human-readable format
+pub fn print_human(result: &FixResult, quiet: bool) {
+    print_fix_result(result, quiet, None);
+}
+
+/// Print fix result as JSON
+pub fn print_json(
+    result: &FixResult,
+    timeouts: Option<crate::output::Timeouts>,
+) -> Result<()> {
+    use crate::output::{DiagnosticOutput, Severity};
+
+    let severity = if result.success {
+        Severity::Healthy
+    } else {
+        Severity::Error
+    };
+
+    let output = match timeouts {
+        Some(t) => DiagnosticOutput::with_timeouts("pgcrate.fix.vacuum", result, severity, t),
+        None => DiagnosticOutput::new("pgcrate.fix.vacuum", result, severity),
+    };
+    output.print()?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_vacuum_options_regular() {
+        let opts = VacuumOptions::default();
+        assert_eq!(opts.to_sql_options(), "");
+        assert_eq!(opts.risk(), Risk::Low);
+    }
+
+    #[test]
+    fn test_vacuum_options_freeze() {
+        let opts = VacuumOptions {
+            freeze: true,
+            ..Default::default()
+        };
+        assert_eq!(opts.to_sql_options(), "(FREEZE)");
+        assert_eq!(opts.risk(), Risk::Low);
+    }
+
+    #[test]
+    fn test_vacuum_options_full() {
+        let opts = VacuumOptions {
+            full: true,
+            ..Default::default()
+        };
+        assert_eq!(opts.to_sql_options(), "(FULL)");
+        assert_eq!(opts.risk(), Risk::High);
+    }
+
+    #[test]
+    fn test_vacuum_options_analyze() {
+        let opts = VacuumOptions {
+            analyze: true,
+            ..Default::default()
+        };
+        assert_eq!(opts.to_sql_options(), "(ANALYZE)");
+        assert_eq!(opts.risk(), Risk::Low);
+    }
+
+    #[test]
+    fn test_vacuum_options_multiple() {
+        let opts = VacuumOptions {
+            freeze: true,
+            full: true,
+            analyze: true,
+        };
+        assert_eq!(opts.to_sql_options(), "(FREEZE, FULL, ANALYZE)");
+    }
+
+    #[test]
+    fn test_generate_vacuum_sql_regular() {
+        let opts = VacuumOptions::default();
+        let sql = generate_vacuum_sql("public", "orders", &opts);
+        assert_eq!(sql, "VACUUM \"public\".\"orders\";");
+    }
+
+    #[test]
+    fn test_generate_vacuum_sql_full() {
+        let opts = VacuumOptions {
+            full: true,
+            ..Default::default()
+        };
+        let sql = generate_vacuum_sql("public", "orders", &opts);
+        assert_eq!(sql, "VACUUM (FULL) \"public\".\"orders\";");
+    }
+
+    #[test]
+    fn test_generate_vacuum_sql_analyze() {
+        let opts = VacuumOptions {
+            analyze: true,
+            ..Default::default()
+        };
+        let sql = generate_vacuum_sql("public", "orders", &opts);
+        assert_eq!(sql, "VACUUM (ANALYZE) \"public\".\"orders\";");
+    }
+}
diff --git a/src/commands/fix/verify.rs b/src/commands/fix/verify.rs
new file mode 100644
index 0000000..e91e8d0
--- /dev/null
+++ b/src/commands/fix/verify.rs
@@ -0,0 +1,443 @@
+//! Verification runner for fix commands.
+//!
+//! After a fix is executed, verification steps can be run to confirm the fix worked.
+//! Each step specifies a command to run and an expected result.
+
+use anyhow::{Context, Result};
+use std::process::Command;
+
+use super::common::{VerificationResult, VerifyStep, VerifyStepResult};
+
+/// Run verification steps and return results.
+///
+/// This executes each verification step by running the specified command
+/// and checking if the output matches the expected condition.
+pub fn run_verification(steps: &[VerifyStep]) -> VerificationResult {
+    let mut step_results = Vec::new();
+    let mut all_passed = true;
+
+    for step in steps {
+        let result = run_single_step(step);
+        if !result.passed {
+            all_passed = false;
+        }
+        step_results.push(result);
+    }
+
+    VerificationResult {
+        passed: all_passed,
+        steps: step_results,
+    }
+}
+
+/// Run a single verification step.
+fn run_single_step(step: &VerifyStep) -> VerifyStepResult {
+    // Parse the command
+    let parts: Vec<&str> = step.command.split_whitespace().collect();
+    if parts.is_empty() {
+        return VerifyStepResult {
+            description: step.description.clone(),
+            passed: false,
+            actual: None,
+            error: Some("Empty command".to_string()),
+        };
+    }
+
+    let program = parts[0];
+    let args = &parts[1..];
+
+    // Execute the command
+    let output = match Command::new(program).args(args).output() {
+        Ok(o) => o,
+        Err(e) => {
+            return VerifyStepResult {
+                description: step.description.clone(),
+                passed: false,
+                actual: None,
+                error: Some(format!("Failed to execute command: {}", e)),
+            };
+        }
+    };
+
+    if !output.status.success() {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        return VerifyStepResult {
+            description: step.description.clone(),
+            passed: false,
+            actual: None,
+            error: Some(format!("Command failed: {}", stderr)),
+        };
+    }
+
+    let stdout = String::from_utf8_lossy(&output.stdout);
+
+    // Parse JSON output
+    let json: serde_json::Value = match serde_json::from_str(&stdout) {
+        Ok(v) => v,
+        Err(e) => {
+            return VerifyStepResult {
+                description: step.description.clone(),
+                passed: false,
+                actual: Some(stdout.to_string()),
+                error: Some(format!("Failed to parse JSON: {}", e)),
+            };
+        }
+    };
+
+    // Evaluate the expected condition
+    match evaluate_condition(&json, &step.expected) {
+        Ok(passed) => VerifyStepResult {
+            description: step.description.clone(),
+            passed,
+            actual: Some(format_actual(&json, &step.expected)),
+            error: if passed {
+                None
+            } else {
+                Some(format!("Condition not met: {}", step.expected))
+            },
+        },
+        Err(e) => VerifyStepResult {
+            description: step.description.clone(),
+            passed: false,
+            actual: Some(stdout.to_string()),
+            error: Some(format!("Failed to evaluate condition: {}", e)),
+        },
+    }
+}
+
+/// Find the outer comparison operator in a condition string.
+/// Returns the operator and its position, skipping operators inside brackets.
+fn find_outer_operator(s: &str) -> (Option<&'static str>, usize) {
+    let mut depth = 0;
+    let chars: Vec<char> = s.chars().collect();
+
+    for i in 0..chars.len() {
+        match chars[i] {
+            '[' | '(' => depth += 1,
+            ']' | ')' => depth -= 1,
+            '!' if depth == 0 && i + 1 < chars.len() && chars[i + 1] == '=' => {
+                return (Some("!="), i);
+            }
+            '=' if depth == 0 && i + 1 < chars.len() && chars[i + 1] == '=' => {
+                return (Some("=="), i);
+            }
+            _ => {}
+        }
+    }
+
+    (None, 0)
+}
+
+/// Evaluate a simplified JSONPath-like condition.
+///
+/// Supports basic patterns like:
+/// - `$.data.field` - check field exists and is truthy
+/// - `$.data.field == "value"` - equality check
+/// - `$.data.field != "value"` - inequality check
+/// - `$.data.array.length() == 0` - array length check
+/// - `$.data.status != 'critical'` - status check
+fn evaluate_condition(json: &serde_json::Value, condition: &str) -> Result<bool> {
+    // Parse the condition
+    let condition = condition.trim();
+
+    // Find the outer comparison operator (not inside brackets)
+    let (op, split_pos) = find_outer_operator(condition);
+
+    match op {
+        Some("==") => {
+            let path = condition[..split_pos].trim();
+            let expected = condition[split_pos + 2..]
+                .trim()
+                .trim_matches(|c| c == '\'' || c == '"');
+
+            // Handle .length() == N
+            if let Some(array_path) = path.strip_suffix(".length()") {
+                let value = get_json_path(json, array_path)?;
+                if let Some(arr) = value.as_array() {
+                    let expected_len: usize = expected.parse().context("Invalid length")?;
+                    return Ok(arr.len() == expected_len);
+                }
+                return Ok(false);
+            }
+
+            let value = get_json_path(json, path)?;
+            Ok(value_matches(&value, expected))
+        }
+        Some("!=") => {
+            let path = condition[..split_pos].trim();
+            let expected = condition[split_pos + 2..]
+                .trim()
+                .trim_matches(|c| c == '\'' || c == '"');
+
+            // Handle .length() != N
+            if let Some(array_path) = path.strip_suffix(".length()") {
+                let value = get_json_path(json, array_path)?;
+                if let Some(arr) = value.as_array() {
+                    let expected_len: usize = expected.parse().context("Invalid length")?;
+                    return Ok(arr.len() != expected_len);
+                }
+                return Ok(true); // Not an array, so != holds
+            }
+
+            let value = get_json_path(json, path)?;
+            Ok(!value_matches(&value, expected))
+        }
+        _ => {
+            // Simple existence/truthy check
+            let value = get_json_path(json, condition)?;
+            Ok(!value.is_null() && value != serde_json::Value::Bool(false))
+        }
+    }
+}
+
+/// Get a value from JSON using a simplified JSONPath.
+///
+/// Supports:
+/// - `$.foo.bar` - object traversal
+/// - `$.foo[0]` - array index
+/// - `$.foo[?(@.field=='value')]` - simple array filter (first match)
+fn get_json_path(json: &serde_json::Value, path: &str) -> Result<serde_json::Value> {
+    let path = path.trim_start_matches("$.");
+    let path = path.trim_start_matches('$');
+
+    let mut current = json.clone();
+
+    for part in split_path(path) {
+        if part.is_empty() {
+            continue;
+        }
+
+        // Check for array filter: [?(@.field=='value')]
+        if part.starts_with("[?(") && part.ends_with(")]") {
+            let filter = &part[3..part.len() - 2]; // Extract @.field=='value'
+            current = apply_array_filter(&current, filter)?;
+            continue;
+        }
+
+        // Check for array index only: [0]
+        if part.starts_with('[') && part.ends_with(']') {
+            let idx_str = &part[1..part.len() - 1];
+            let idx: usize = idx_str.parse().context("Invalid array index")?;
+            if let Some(arr) = current.as_array() {
+                current = arr.get(idx).cloned().unwrap_or(serde_json::Value::Null);
+            } else {
+                return Ok(serde_json::Value::Null);
+            }
+            continue;
+        }
+
+        // Check for field[index] pattern (e.g., items[0])
+        if let Some(bracket_idx) = part.find('[') {
+            let field_name = &part[..bracket_idx];
+            let bracket_part = &part[bracket_idx..];
+
+            // First access the field
+            if let Some(obj) = current.as_object() {
+                current = obj
+                    .get(field_name)
+                    .cloned()
+                    .unwrap_or(serde_json::Value::Null);
+            } else {
+                return Ok(serde_json::Value::Null);
+            }
+
+            // Then apply the bracket operation
+            if bracket_part.starts_with("[?(") && bracket_part.ends_with(")]") {
+                let filter = &bracket_part[3..bracket_part.len() - 2];
+                current = apply_array_filter(&current, filter)?;
+            } else if bracket_part.starts_with('[') && bracket_part.ends_with(']') {
+                let idx_str = &bracket_part[1..bracket_part.len() - 1];
+                let idx: usize = idx_str.parse().context("Invalid array index")?;
+                if let Some(arr) = current.as_array() {
+                    current = arr.get(idx).cloned().unwrap_or(serde_json::Value::Null);
+                } else {
+                    return Ok(serde_json::Value::Null);
+                }
+            }
+            continue;
+        }
+
+        // Object field access
+        if let Some(obj) = current.as_object() {
+            current = obj.get(part).cloned().unwrap_or(serde_json::Value::Null);
+        } else {
+            return Ok(serde_json::Value::Null);
+        }
+    }
+
+    Ok(current)
+}
+
+/// Split a path by dots, respecting brackets.
+fn split_path(path: &str) -> Vec<&str> {
+    let mut parts = Vec::new();
+    let mut start = 0;
+    let mut depth = 0;
+
+    for (i, c) in path.char_indices() {
+        match c {
+            '[' => depth += 1,
+            ']' => depth -= 1,
+            '.' if depth == 0 => {
+                if start < i {
+                    parts.push(&path[start..i]);
+                }
+                start = i + 1;
+            }
+            _ => {}
+        }
+    }
+
+    if start < path.len() {
+        parts.push(&path[start..]);
+    }
+
+    parts
+}
+
+/// Apply a simple array filter like @.field=='value'
+fn apply_array_filter(json: &serde_json::Value, filter: &str) -> Result<serde_json::Value> {
+    let arr = json.as_array().context("Expected array for filter")?;
+
+    // Parse filter: @.field=='value' or @.field=='value' && @.other=='val2'
+    // For simplicity, we only support single conditions with == for now
+    let filter = filter.trim_start_matches('@');
+
+    if let Some((field_path, value)) = filter.split_once("==") {
+        let field_path = field_path.trim().trim_start_matches('.');
+        let value = value.trim().trim_matches(|c| c == '\'' || c == '"');
+
+        for item in arr {
+            let item_value = get_json_path(item, field_path)?;
+            if value_matches(&item_value, value) {
+                return Ok(item.clone());
+            }
+        }
+    }
+
+    Ok(serde_json::Value::Null)
+}
+
+/// Check if a JSON value matches an expected string.
+fn value_matches(value: &serde_json::Value, expected: &str) -> bool {
+    match value {
+        serde_json::Value::String(s) => s == expected,
+        serde_json::Value::Number(n) => n.to_string() == expected,
+        serde_json::Value::Bool(b) => (*b && expected == "true") || (!*b && expected == "false"),
+        serde_json::Value::Null => expected == "null",
+        _ => false,
+    }
+}
+
+/// Format the actual value for display based on the condition type.
+fn format_actual(json: &serde_json::Value, condition: &str) -> String {
+    // Try to extract the relevant part of the JSON for display
+    if let Some((path, _)) = condition
+        .split_once("==")
+        .or_else(|| condition.split_once("!="))
+    {
+        let path = path.trim();
+        if let Ok(value) = get_json_path(json, path) {
+            return serde_json::to_string_pretty(&value).unwrap_or_else(|_| "?".to_string());
+        }
+    }
+    serde_json::to_string_pretty(json).unwrap_or_else(|_| "Failed to format".to_string())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_evaluate_equality() {
+        let json: serde_json::Value = serde_json::json!({
+            "data": {
+                "status": "healthy",
+                "count": 5
+            }
+        });
+
+        assert!(evaluate_condition(&json, "$.data.status == 'healthy'").unwrap());
+        assert!(!evaluate_condition(&json, "$.data.status == 'critical'").unwrap());
+        assert!(evaluate_condition(&json, "$.data.count == '5'").unwrap());
+    }
+
+    #[test]
+    fn test_evaluate_inequality() {
+        let json: serde_json::Value = serde_json::json!({
+            "data": {
+                "status": "healthy"
+            }
+        });
+
+        assert!(evaluate_condition(&json, "$.data.status != 'critical'").unwrap());
+        assert!(!evaluate_condition(&json, "$.data.status != 'healthy'").unwrap());
+    }
+
+    #[test]
+    fn test_evaluate_array_length() {
+        let json: serde_json::Value = serde_json::json!({
+            "data": {
+                "items": [1, 2, 3],
+                "empty": []
+            }
+        });
+
+        assert!(evaluate_condition(&json, "$.data.items.length() == 3").unwrap());
+        assert!(evaluate_condition(&json, "$.data.empty.length() == 0").unwrap());
+        assert!(evaluate_condition(&json, "$.data.items.length() != 0").unwrap());
+    }
+
+    #[test]
+    fn test_evaluate_array_filter() {
+        let json: serde_json::Value = serde_json::json!({
+            "data": {
+                "sequences": [
+                    {"name": "seq1", "status": "healthy"},
+                    {"name": "seq2", "status": "critical"}
+                ]
+            }
+        });
+
+        // Check that a filtered sequence has the expected status
+        assert!(evaluate_condition(
+            &json,
+            "$.data.sequences[?(@.name=='seq1')].status == 'healthy'"
+        )
+        .unwrap());
+
+        assert!(evaluate_condition(
+            &json,
+            "$.data.sequences[?(@.name=='seq2')].status != 'healthy'"
+        )
+        .unwrap());
+    }
+
+    #[test]
+    fn test_split_path() {
+        assert_eq!(split_path("foo.bar.baz"), vec!["foo", "bar", "baz"]);
+        assert_eq!(split_path("foo[0].bar"), vec!["foo[0]", "bar"]);
+        assert_eq!(
+            split_path("foo[?(@.x=='y')].bar"),
+            vec!["foo[?(@.x=='y')]", "bar"]
+        );
+    }
+
+    #[test]
+    fn test_get_json_path() {
+        let json: serde_json::Value = serde_json::json!({
+            "data": {
+                "items": [
+                    {"name": "a"},
+                    {"name": "b"}
+                ]
+            }
+        });
+
+        let result = get_json_path(&json, "$.data.items[0].name").unwrap();
+        assert_eq!(result, serde_json::Value::String("a".to_string()));
+
+        let result = get_json_path(&json, "data.items[1].name").unwrap();
+        assert_eq!(result, serde_json::Value::String("b".to_string()));
+    }
+}
diff --git a/src/commands/indexes.rs b/src/commands/indexes.rs
index a61a40c..f1b89c3 100644
--- a/src/commands/indexes.rs
+++ b/src/commands/indexes.rs
@@ -40,6 +40,18 @@ pub struct UnusedIndex {
     pub is_unique: bool,
     /// Whether this is a primary key (keep for data integrity)
     pub is_primary: bool,
+    /// Whether this index is used as replica identity for logical replication
+    #[serde(skip_serializing_if = "std::ops::Not::not")]
+    pub is_replica_identity: bool,
+    /// Name of constraint this index backs (if any)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub backing_constraint: Option<String>,
+    /// When stats were last reset (for confidence in usage data)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stats_since: Option<String>,
+    /// Days since stats reset
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stats_age_days: Option<i32>,
 }

 /// Indexes that cover the same columns
@@ -131,6 +143,24 @@ pub async fn get_unused_indexes(client: &Client, limit: usize) -> Result<Vec<UnusedIndex>> {
+    // Get stats reset time first
+    let stats_query = r#"
+        SELECT
+            stats_reset,
+            (EXTRACT(EPOCH FROM (now() - stats_reset)) / 86400)::int as days_old
+        FROM pg_stat_database
+        WHERE datname = current_database()
+    "#;
+
+    let (stats_since, stats_age_days) = match client.query_opt(stats_query, &[]).await? {
+        Some(stats_row) => {
+            let reset: Option<chrono::DateTime<chrono::Utc>> = stats_row.get("stats_reset");
+            let days: Option<i32> = stats_row.get("days_old");
+            (reset.map(|r| r.to_rfc3339()), days)
+        }
+        None => (None, None),
+    };
+
     let query = r#"
         SELECT
             s.schemaname,
@@ -140,9 +170,12 @@ pub async fn get_unused_indexes(client: &Client, limit: usize) -> Result<Vec<UnusedIndex>> {
         WHERE s.idx_scan = 0
         ORDER BY pg_relation_size(s.indexrelid) DESC
@@ -162,6 +195,10 @@ pub async fn get_unused_indexes(client: &Client, limit: usize) -> Result<Vec<UnusedIndex>> {
     let droppable: Vec<_> = result
         .unused
         .iter()
-        .filter(|u| !u.is_primary && !u.is_unique)
+        .filter(|u| {
+            !u.is_primary
+                && !u.is_unique
+                && !u.is_replica_identity
+                && u.backing_constraint.is_none()
+        })
         .collect();

     if !droppable.is_empty() {
@@ -477,7 +532,12 @@ pub fn print_human(result: &IndexesResult, verbose: bool) {
     let droppable_unused: Vec<_> = result
         .unused
         .iter()
-        .filter(|u| !u.is_primary && !u.is_unique)
+        .filter(|u| {
+            !u.is_primary
+                && !u.is_unique
+                && !u.is_replica_identity
+                && u.backing_constraint.is_none()
+        })
         .collect();

     if !droppable_unused.is_empty() || !result.duplicates.is_empty() {
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 64e7f08..6d5a7aa 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -9,6 +9,7 @@ pub mod context;
 mod db;
 mod doctor;
 mod extension;
+pub mod fix;
 pub mod indexes;
 pub mod locks;
 mod migrations;
@@ -20,6 +21,7 @@ pub mod sequences;
 mod snapshot;
 mod sql_cmd;
 pub mod triage;
+pub mod vacuum;
 pub mod xid;

 // Re-export snapshot commands from new module
diff --git a/src/commands/triage.rs b/src/commands/triage.rs
index a0fb572..55e4302 100644
--- a/src/commands/triage.rs
+++ b/src/commands/triage.rs
@@ -106,10 +106,21 @@ pub struct TriageResults {
     pub skipped_checks: Vec,
     /// Worst status across all checks
     pub overall_status: CheckStatus,
+    /// Structured fix actions (when --include-fixes is used)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub actions: Option<Vec<StructuredAction>>,
 }

 impl TriageResults {
-    pub fn new(mut checks: Vec, skipped_checks: Vec) -> Self {
+    pub fn new(checks: Vec, skipped_checks: Vec) -> Self {
+        Self::with_actions(checks, skipped_checks, None)
+    }
+
+    pub fn with_actions(
+        mut checks: Vec,
+        skipped_checks: Vec,
+        actions: Option<Vec<StructuredAction>>,
+    ) -> Self {
         // Sort by severity: critical first, then warning, then healthy
         checks.sort_by_key(|c| match c.status {
             CheckStatus::Critical => 0,
@@ -132,6 +143,7 @@ impl TriageResults {
             checks,
             skipped_checks,
             overall_status,
+            actions,
         }
     }

@@ -754,6 +766,130 @@ pub fn print_human(results: &TriageResults, quiet: bool) {
     }
 }

+/// Generate structured fix actions from triage findings.
+///
+/// This is called when --include-fixes is specified. It queries for detailed
+/// information about critical/warning findings and generates actionable fixes.
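+///
+/// Illustrative call site (a sketch mirroring the CLI wiring in main.rs; it
+/// assumes a connected `tokio_postgres::Client` and is not a doctest, hence
+/// `ignore`):
+///
+/// ```ignore
+/// let mut results = run_triage(&client).await;
+/// // The two booleans mirror the CLI gate flags: --read-write and --primary.
+/// let actions = generate_fix_actions(&client, &results, true, true).await;
+/// if !actions.is_empty() {
+///     results.actions = Some(actions);
+/// }
+/// ```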
+pub async fn generate_fix_actions(
+    client: &Client,
+    results: &TriageResults,
+    read_write: bool,
+    is_primary: bool,
+) -> Vec<StructuredAction> {
+    use super::fix::index::{create_drop_action, get_index_info};
+    use super::fix::sequence::{create_upgrade_action, get_sequence_info, SequenceType};
+    use super::fix::vacuum::{create_vacuum_action, get_table_vacuum_info, VacuumOptions};
+
+    let mut actions = Vec::new();
+
+    // Generate actions for sequence issues
+    if let Some(check) = results.checks.iter().find(|c| c.name == "sequences") {
+        if check.status == CheckStatus::Critical || check.status == CheckStatus::Warning {
+            // Query for critical sequences
+            let query = r#"
+                SELECT schemaname, sequencename
+                FROM pg_sequences
+                WHERE last_value IS NOT NULL
+                  AND CASE
+                      WHEN increment_by > 0 THEN (last_value::numeric / max_value::numeric * 100)::int
+                      ELSE ((min_value::numeric - last_value::numeric) / (min_value::numeric - max_value::numeric) * 100)::int
+                  END > 70
+                ORDER BY CASE
+                    WHEN increment_by > 0 THEN (last_value::numeric / max_value::numeric * 100)::int
+                    ELSE ((min_value::numeric - last_value::numeric) / (min_value::numeric - max_value::numeric) * 100)::int
+                END DESC
+                LIMIT 5
+            "#;
+
+            if let Ok(rows) = client.query(query, &[]).await {
+                for row in rows {
+                    let schema: String = row.get("schemaname");
+                    let name: String = row.get("sequencename");
+
+                    if let Ok(info) = get_sequence_info(client, &schema, &name).await {
+                        // Only suggest upgrade if not already bigint
+                        if info.data_type != "bigint" {
+                            let action = create_upgrade_action(
+                                &info,
+                                SequenceType::BigInt,
+                                read_write,
+                                is_primary,
+                                false, // Not confirmed
+                            );
+                            actions.push(action);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Generate actions for unused indexes
+    // Query for indexes with 0 scans since stats reset
+    let unused_indexes_query = r#"
+        SELECT
+            schemaname,
+            indexrelname as index_name
+        FROM pg_stat_user_indexes
+        WHERE idx_scan = 0
+          AND schemaname NOT IN ('pg_catalog', 'information_schema')
+        ORDER BY pg_relation_size(indexrelid) DESC
+        LIMIT 5
+    "#;
+
+    if let Ok(rows) = client.query(unused_indexes_query, &[]).await {
+        for row in rows {
+            let schema: String = row.get("schemaname");
+            let index_name: String = row.get("index_name");
+
+            if let Ok(evidence) = get_index_info(client, &schema, &index_name).await {
+                let action = create_drop_action(&evidence, read_write, is_primary, false);
+                actions.push(action);
+            }
+        }
+    }
+
+    // Generate actions for tables needing vacuum
+    // Query for tables with high dead tuple percentage
+    let vacuum_query = r#"
+        SELECT
+            schemaname,
+            relname as table_name,
+            n_dead_tup,
+            n_live_tup,
+            CASE
+                WHEN n_live_tup + n_dead_tup > 0 THEN
+                    (n_dead_tup::float / (n_live_tup + n_dead_tup) * 100)::int
+                ELSE 0
+            END as dead_pct
+        FROM pg_stat_user_tables
+        WHERE n_dead_tup > 10000
+          AND CASE
+              WHEN n_live_tup + n_dead_tup > 0 THEN
+                  (n_dead_tup::float / (n_live_tup + n_dead_tup) * 100)
+              ELSE 0
+          END > 15
+        ORDER BY n_dead_tup DESC
+        LIMIT 5
+    "#;
+
+    if let Ok(rows) = client.query(vacuum_query, &[]).await {
+        for row in rows {
+            let schema: String = row.get("schemaname");
+            let table_name: String = row.get("table_name");
+
+            if let Ok(evidence) = get_table_vacuum_info(client, &schema, &table_name).await {
+                let options = VacuumOptions::default();
+                let action =
+                    create_vacuum_action(&evidence, &options, read_write, is_primary, false);
+                actions.push(action);
+            }
+        }
+    }
+
+    actions
+}
+
+/// Print triage results as JSON with schema versioning.
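+///
+/// The envelope carries `ok`, `schema_id`, and a `data` payload whose fields
+/// mirror TriageResults; with --include-fixes, `data.actions` holds the
+/// structured fix actions (exercised by the integration tests). Trimmed shape
+/// sketch with illustrative values only:
+///
+/// ```json
+/// {
+///   "ok": true,
+///   "schema_id": "...",
+///   "data": {
+///     "checks": [],
+///     "skipped_checks": [],
+///     "overall_status": "healthy",
+///     "actions": []
+///   }
+/// }
+/// ```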
 pub fn print_json(
     results: &TriageResults,
diff --git a/src/commands/vacuum.rs b/src/commands/vacuum.rs
new file mode 100644
index 0000000..9a759ee
--- /dev/null
+++ b/src/commands/vacuum.rs
@@ -0,0 +1,482 @@
+//! Vacuum command: Monitor table bloat and vacuum health.
+//!
+//! Tables accumulate dead tuples from updates and deletes. VACUUM reclaims
+//! this space. This command identifies tables needing vacuum attention.
+//!
+//! Two modes:
+//! - Heuristic mode (always works): Uses pg_stat_user_tables dead tuple counts
+//! - Full mode (requires pgstattuple): Accurate bloat measurement
+
+use anyhow::Result;
+use serde::Serialize;
+use tokio_postgres::Client;
+
+/// Default thresholds for vacuum warnings
+const DEFAULT_WARNING_PCT: f64 = 10.0;
+const DEFAULT_CRITICAL_PCT: f64 = 25.0;
+const CRITICAL_DEAD_TUPLES: i64 = 1_000_000;
+
+/// Table vacuum status
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum VacuumStatus {
+    Healthy,
+    Warning,
+    Critical,
+}
+
+impl VacuumStatus {
+    pub fn from_dead_pct(pct: f64, dead_tuples: i64) -> Self {
+        if pct >= DEFAULT_CRITICAL_PCT || dead_tuples >= CRITICAL_DEAD_TUPLES {
+            VacuumStatus::Critical
+        } else if pct >= DEFAULT_WARNING_PCT {
+            VacuumStatus::Warning
+        } else {
+            VacuumStatus::Healthy
+        }
+    }
+
+    pub fn emoji(&self) -> &'static str {
+        match self {
+            VacuumStatus::Healthy => "✓",
+            VacuumStatus::Warning => "⚠",
+            VacuumStatus::Critical => "✗",
+        }
+    }
+}
+
+/// Information about a table's vacuum state
+#[derive(Debug, Clone, Serialize)]
+pub struct TableVacuumInfo {
+    pub schema: String,
+    pub table: String,
+    pub dead_tuples: i64,
+    pub live_tuples: i64,
+    pub dead_pct: f64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_vacuum: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_autovacuum: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_analyze: Option<String>,
+    pub table_size: String,
+    pub table_size_bytes: i64,
+    pub status: VacuumStatus,
+    /// Bloat estimate from pgstattuple (if available)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub bloat_bytes: Option<i64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub bloat_pct: Option<f64>,
+    /// Method used to estimate bloat
+    pub estimate_method: EstimateMethod,
+}
+
+/// How bloat was estimated
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum EstimateMethod {
+    /// Dead tuple count from pg_stat_user_tables
+    Heuristic,
+    /// Accurate measurement from pgstattuple extension
+    Pgstattuple,
+}
+
+/// Full vacuum analysis results
+#[derive(Debug, Serialize)]
+pub struct VacuumResult {
+    pub tables: Vec<TableVacuumInfo>,
+    pub overall_status: VacuumStatus,
+    /// Whether pgstattuple is available for accurate measurements
+    pub pgstattuple_available: bool,
+    /// Stats reset time (for confidence)
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stats_since: Option<String>,
+}
+
+/// Check if pgstattuple extension is available
+async fn check_pgstattuple(client: &Client) -> bool {
+    let query = r#"
+        SELECT EXISTS(
+            SELECT 1 FROM pg_extension WHERE extname = 'pgstattuple'
+        )
+    "#;
+
+    match client.query_one(query, &[]).await {
+        Ok(row) => row.get::<_, bool>(0),
+        Err(_) => false,
+    }
+}
+
+/// Get stats reset time
+async fn get_stats_since(client: &Client) -> Option<String> {
+    let query = r#"
+        SELECT stats_reset
+        FROM pg_stat_database
+        WHERE datname = current_database()
+    "#;
+
+    match client.query_opt(query, &[]).await {
+        Ok(Some(row)) => {
+            let reset: Option<chrono::DateTime<chrono::Utc>> = row.get("stats_reset");
+            reset.map(|r| r.to_rfc3339())
+        }
+        _ => None,
+    }
+}
+
+/// Get table vacuum info using heuristic method (pg_stat_user_tables)
+async fn get_tables_heuristic(
+    client: &Client,
+    schema_filter: Option<&str>,
+    table_filter: Option<&str>,
+    threshold: f64,
+) -> Result<Vec<TableVacuumInfo>> {
+    let mut query = String::from(
+        r#"
+        SELECT
+            schemaname,
+            relname,
+            n_dead_tup as dead_tuples,
+            n_live_tup as live_tuples,
+            CASE
+                WHEN n_live_tup + n_dead_tup = 0 THEN 0
+                ELSE round(100.0 * n_dead_tup / (n_live_tup + n_dead_tup), 2)::float8
+            END as dead_pct,
+            last_vacuum,
+            last_autovacuum,
+            last_analyze,
+            pg_size_pretty(pg_total_relation_size(relid)) as table_size,
+            pg_total_relation_size(relid) as table_size_bytes
+        FROM pg_stat_user_tables
+        WHERE 1=1
+    "#,
+    );
+
+    let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync>> = Vec::new();
+
+    if let Some(schema) = schema_filter {
+        params.push(Box::new(schema.to_string()));
+        query.push_str(&format!(" AND schemaname = ${}", params.len()));
+    }
+
+    if let Some(table) = table_filter {
+        params.push(Box::new(table.to_string()));
+        query.push_str(&format!(" AND relname = ${}", params.len()));
+    }
+
+    // Only include tables with some dead tuples or matching filter
+    if schema_filter.is_none() && table_filter.is_none() {
+        params.push(Box::new(threshold));
+        query.push_str(&format!(
+            " AND (n_dead_tup > 0 OR CASE WHEN n_live_tup + n_dead_tup = 0 THEN 0::float8 ELSE (100.0 * n_dead_tup / (n_live_tup + n_dead_tup))::float8 END >= ${})",
+            params.len()
+        ));
+    }
+
+    query.push_str(" ORDER BY dead_pct DESC, dead_tuples DESC LIMIT 50");
+
+    let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
+        params.iter().map(|p| p.as_ref()).collect();
+
+    let rows = client.query(&query, &param_refs).await?;
+    let mut results = Vec::new();
+
+    for row in rows {
+        let dead_tuples: i64 = row.get("dead_tuples");
+        let dead_pct: f64 = row.get("dead_pct");
+        let status = VacuumStatus::from_dead_pct(dead_pct, dead_tuples);
+
+        let last_vacuum: Option<chrono::DateTime<chrono::Utc>> = row.get("last_vacuum");
+        let last_autovacuum: Option<chrono::DateTime<chrono::Utc>> = row.get("last_autovacuum");
+        let last_analyze: Option<chrono::DateTime<chrono::Utc>> = row.get("last_analyze");
+
+        results.push(TableVacuumInfo {
+            schema: row.get("schemaname"),
+            table: row.get("relname"),
+            dead_tuples,
+            live_tuples: row.get("live_tuples"),
+            dead_pct,
+            last_vacuum: last_vacuum.map(|t| t.to_rfc3339()),
+            last_autovacuum: last_autovacuum.map(|t| t.to_rfc3339()),
+            last_analyze: last_analyze.map(|t| t.to_rfc3339()),
+            table_size: row.get("table_size"),
+            table_size_bytes: row.get("table_size_bytes"),
+            status,
+            bloat_bytes: None,
+            bloat_pct: None,
+            estimate_method: EstimateMethod::Heuristic,
+        });
+    }
+
+    Ok(results)
+}
+
+/// Get accurate bloat info using pgstattuple (for a specific table)
+async fn get_table_pgstattuple(
+    client: &Client,
+    schema: &str,
+    table: &str,
+) -> Result<Option<(i64, f64)>> {
+    // pgstattuple returns detailed tuple-level statistics
+    let query = format!(
+        r#"
+        SELECT
+            dead_tuple_len,
+            CASE
+                WHEN table_len = 0 THEN 0
+                ELSE round(100.0 * dead_tuple_len / table_len, 2)::float8
+            END as bloat_pct
+        FROM pgstattuple('"{}"."{}"')
+    "#,
+        schema.replace('"', "\"\""),
+        table.replace('"', "\"\"")
+    );
+
+    match client.query_opt(&query, &[]).await {
+        Ok(Some(row)) => {
+            let dead_len: i64 = row.get("dead_tuple_len");
+            let bloat_pct: f64 = row.get("bloat_pct");
+            Ok(Some((dead_len, bloat_pct)))
+        }
+        Ok(None) => Ok(None),
+        Err(_) => Ok(None), // pgstattuple may fail for some tables
+    }
+}
+
+/// Run vacuum analysis
+pub async fn run_vacuum(
+    client: &Client,
+    schema: Option<&str>,
+    table: Option<&str>,
+    threshold: Option<f64>,
+) -> Result<VacuumResult> {
+    let threshold = threshold.unwrap_or(DEFAULT_WARNING_PCT);
+    let pgstattuple_available = check_pgstattuple(client).await;
+    let stats_since = get_stats_since(client).await;
+
+    let mut tables = get_tables_heuristic(client, schema, table, threshold).await?;
+
+    // If pgstattuple is available and we're looking at specific tables, get accurate info
+    if pgstattuple_available && (schema.is_some() || table.is_some()) {
+        for t in &mut tables {
+            if let Ok(Some((bloat_bytes, bloat_pct))) =
+                get_table_pgstattuple(client, &t.schema, &t.table).await
+            {
+                t.bloat_bytes = Some(bloat_bytes);
+                t.bloat_pct = Some(bloat_pct);
+                t.estimate_method = EstimateMethod::Pgstattuple;
+                // Update status based on more accurate bloat measurement
+                t.status = VacuumStatus::from_dead_pct(bloat_pct, t.dead_tuples);
+            }
+        }
+    }
+
+    let overall_status = tables
+        .iter()
+        .map(|t| &t.status)
+        .max_by_key(|s| match s {
+            VacuumStatus::Healthy => 0,
+            VacuumStatus::Warning => 1,
+            VacuumStatus::Critical => 2,
+        })
+        .cloned()
+        .unwrap_or(VacuumStatus::Healthy);
+
+    Ok(VacuumResult {
+        tables,
+        overall_status,
+        pgstattuple_available,
+        stats_since,
+    })
+}
+
+/// Format number for display
+fn format_number(n: i64) -> String {
+    if n >= 1_000_000_000 {
+        format!("{:.1}B", n as f64 / 1_000_000_000.0)
+    } else if n >= 1_000_000 {
+        format!("{:.1}M", n as f64 / 1_000_000.0)
+    } else if n >= 1_000 {
+        format!("{:.1}K", n as f64 / 1_000.0)
+    } else {
+        format!("{}", n)
+    }
+}
+
+/// Print vacuum analysis in human-readable format
+pub fn print_human(result: &VacuumResult, quiet: bool) {
+    if result.tables.is_empty() {
+        if !quiet {
+            println!("No tables need vacuum attention (below threshold).");
+            if !result.pgstattuple_available {
+                println!();
+                println!(
+                    "Note: Install pgstattuple extension for more accurate bloat measurements."
+ ); + } + } + return; + } + + println!("VACUUM STATUS:"); + println!(); + + // Header + println!( + " {:3} {:40} {:>10} {:>10} {:>8} {:>10}", + "", "TABLE", "DEAD", "LIVE", "DEAD %", "SIZE" + ); + println!(" {}", "-".repeat(86)); + + for t in &result.tables { + let full_name = format!("{}.{}", t.schema, t.table); + println!( + " {} {:40} {:>10} {:>10} {:>7.1}% {:>10}", + t.status.emoji(), + if full_name.len() > 40 { + format!("{}...", &full_name[..37]) + } else { + full_name + }, + format_number(t.dead_tuples), + format_number(t.live_tuples), + t.dead_pct, + t.table_size + ); + + // Show pgstattuple data if available + if let (Some(bloat_bytes), Some(bloat_pct)) = (t.bloat_bytes, t.bloat_pct) { + println!( + " (pgstattuple: {:.1}% bloat, {} bytes)", + bloat_pct, bloat_bytes + ); + } + } + + // Summary + let warning_count = result + .tables + .iter() + .filter(|t| t.status == VacuumStatus::Warning) + .count(); + let critical_count = result + .tables + .iter() + .filter(|t| t.status == VacuumStatus::Critical) + .count(); + + if warning_count > 0 || critical_count > 0 { + println!(); + if critical_count > 0 { + println!( + " ✗ {} tables CRITICAL (>{:.0}% or >{} dead tuples)", + critical_count, DEFAULT_CRITICAL_PCT, CRITICAL_DEAD_TUPLES + ); + } + if warning_count > 0 { + println!( + " ⚠ {} tables WARNING (>{:.0}%)", + warning_count, DEFAULT_WARNING_PCT + ); + } + } + + // Recommendations + let critical_tables: Vec<_> = result + .tables + .iter() + .filter(|t| t.status == VacuumStatus::Critical) + .collect(); + + if !critical_tables.is_empty() { + println!(); + println!("RECOMMENDED ACTIONS:"); + println!(); + for t in critical_tables.iter().take(3) { + println!(" VACUUM {}.{};", t.schema, t.table); + } + if critical_tables.len() > 3 { + println!(" ... and {} more", critical_tables.len() - 3); + } + } + + if !result.pgstattuple_available && !quiet { + println!(); + println!("Note: Install pgstattuple for accurate bloat measurements:"); + println!(" CREATE EXTENSION pgstattuple;"); + } +} + +/// Print vacuum analysis as JSON with schema versioning. 
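+///
+/// Uses the shared DiagnosticOutput envelope with schema id
+/// "pgcrate.diagnostics.vacuum" (see src/output.rs). Trimmed shape sketch
+/// with illustrative values:
+///
+/// ```json
+/// {
+///   "ok": true,
+///   "schema_id": "pgcrate.diagnostics.vacuum",
+///   "data": {
+///     "tables": [],
+///     "overall_status": "healthy",
+///     "pgstattuple_available": false
+///   }
+/// }
+/// ```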
+pub fn print_json( + result: &VacuumResult, + timeouts: Option, +) -> Result<()> { + use crate::output::{schema, DiagnosticOutput, Severity}; + + let severity = match result.overall_status { + VacuumStatus::Healthy => Severity::Healthy, + VacuumStatus::Warning => Severity::Warning, + VacuumStatus::Critical => Severity::Critical, + }; + + let output = match timeouts { + Some(t) => DiagnosticOutput::with_timeouts(schema::VACUUM, result, severity, t), + None => DiagnosticOutput::new(schema::VACUUM, result, severity), + }; + output.print()?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_vacuum_status_healthy() { + assert_eq!( + VacuumStatus::from_dead_pct(5.0, 1000), + VacuumStatus::Healthy + ); + } + + #[test] + fn test_vacuum_status_warning() { + assert_eq!( + VacuumStatus::from_dead_pct(15.0, 50000), + VacuumStatus::Warning + ); + } + + #[test] + fn test_vacuum_status_critical_pct() { + assert_eq!( + VacuumStatus::from_dead_pct(30.0, 50000), + VacuumStatus::Critical + ); + } + + #[test] + fn test_vacuum_status_critical_count() { + // Even with low percentage, high absolute count triggers critical + assert_eq!( + VacuumStatus::from_dead_pct(5.0, 2_000_000), + VacuumStatus::Critical + ); + } + + #[test] + fn test_format_number_millions() { + assert_eq!(format_number(1_500_000), "1.5M"); + } + + #[test] + fn test_format_number_thousands() { + assert_eq!(format_number(5_500), "5.5K"); + } + + #[test] + fn test_format_number_small() { + assert_eq!(format_number(500), "500"); + } +} diff --git a/src/main.rs b/src/main.rs index 10af13f..d6cbcba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -70,11 +70,13 @@ fn json_supported(command: &Commands) -> bool { Commands::Describe { .. } => true, Commands::Diff { .. } => true, Commands::Doctor { .. } => true, - Commands::Triage => true, + Commands::Triage { .. } => true, Commands::Locks { .. } => true, Commands::Xid { .. } => true, Commands::Sequences { .. } => true, Commands::Indexes { .. } => true, + Commands::Vacuum { .. } => true, + Commands::Fix { .. } => true, Commands::Context => true, Commands::Capabilities => true, Commands::Sql { .. 
} => true,
@@ -285,7 +287,11 @@ enum Commands {
         strict: bool,
     },
     /// Quick database health triage (locks, transactions, XID, sequences, connections)
-    Triage,
+    Triage {
+        /// Include structured fix actions in output
+        #[arg(long)]
+        include_fixes: bool,
+    },
     /// Inspect blocking locks and long transactions
     Locks {
         /// Show only blocking chains
@@ -334,6 +340,20 @@ enum Commands {
         #[arg(long, default_value = "20")]
         unused_limit: usize,
     },
+    /// Monitor table bloat and vacuum health
+    Vacuum {
+        /// Filter to specific table (schema.table)
+        #[arg(long, value_name = "TABLE")]
+        table: Option<String>,
+        /// Warning threshold percentage (default: 10)
+        #[arg(long, value_name = "PCT")]
+        threshold: Option<f64>,
+    },
+    /// Fix commands for remediation
+    Fix {
+        #[command(subcommand)]
+        command: FixCommands,
+    },
     /// Show connection context, server info, extensions, and privileges
     Context,
     /// Show available capabilities based on privileges and connection mode
@@ -743,6 +763,65 @@ enum SeedCommands {
     },
 }

+#[derive(Subcommand)]
+enum FixCommands {
+    /// Upgrade sequence type to prevent exhaustion
+    Sequence {
+        /// Sequence to upgrade (schema.sequence)
+        sequence: String,
+        /// Target type to upgrade to (integer, bigint)
+        #[arg(long, value_name = "TYPE")]
+        upgrade_to: String,
+        /// Show what would be done without executing
+        #[arg(long)]
+        dry_run: bool,
+        /// Confirm execution (required for fixes)
+        #[arg(long)]
+        yes: bool,
+        /// Run verification after fix
+        #[arg(long)]
+        verify: bool,
+    },
+    /// Drop unused or duplicate index
+    Index {
+        /// Index to drop (schema.index)
+        #[arg(long, value_name = "INDEX")]
+        drop: String,
+        /// Show what would be done without executing
+        #[arg(long)]
+        dry_run: bool,
+        /// Confirm execution (required for fixes)
+        #[arg(long)]
+        yes: bool,
+        /// Run verification after fix
+        #[arg(long)]
+        verify: bool,
+    },
+    /// Run vacuum on a table
+    Vacuum {
+        /// Table to vacuum (schema.table)
+        table: String,
+        /// Include FREEZE option
+        #[arg(long)]
+        freeze: bool,
+        /// Include FULL option (requires ACCESS EXCLUSIVE lock)
+        #[arg(long)]
+        full: bool,
+        /// Include ANALYZE option
+        #[arg(long)]
+        analyze: bool,
+        /// Show what would be done without executing
+        #[arg(long)]
+        dry_run: bool,
+        /// Confirm execution (required for VACUUM FULL)
+        #[arg(long)]
+        yes: bool,
+        /// Run verification after fix
+        #[arg(long)]
+        verify: bool,
+    },
+}
+
 #[tokio::main]
 async fn main() {
     // Load .env file if present (before parsing CLI so env vars are available)
@@ -1130,7 +1209,7 @@ async fn run(cli: Cli, output: &Output) -> Result<()> {
                 std::process::exit(exit_code);
             }
         }
-        Commands::Triage => {
+        Commands::Triage { ref include_fixes } => {
             let config =
                 Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?;
             let conn_result = connection::resolve_and_validate(
@@ -1155,7 +1234,22 @@ async fn run(cli: Cli, output: &Output) -> Result<()> {
                 eprintln!("pgcrate: timeouts: {}", session.effective_timeouts());
             }

-            let results = commands::triage::run_triage(session.client()).await;
+            let mut results = commands::triage::run_triage(session.client()).await;
+
+            // Generate fix actions when --include-fixes is specified
+            if *include_fixes {
+                let actions = commands::triage::generate_fix_actions(
+                    session.client(),
+                    &results,
+                    cli.read_write,
+                    cli.allow_primary,
+                )
+                .await;
+
+                if !actions.is_empty() {
+                    results.actions = Some(actions);
+                }
+            }

             if cli.json {
                 commands::triage::print_json(&results, Some(session.effective_timeouts()))?;
@@ -1168,6 +1262,256 @@ async fn run(cli: 
Cli, output: &Output) -> Result<()> { std::process::exit(exit_code); } } + Commands::Vacuum { + ref table, + threshold, + } => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + let conn_result = connection::resolve_and_validate( + &config, + cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + cli.read_write, + cli.quiet, + )?; + + // Use DiagnosticSession with timeout enforcement + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + + // Set up Ctrl+C handler to cancel queries gracefully + setup_ctrlc_handler(session.cancel_token()); + + // Show effective timeouts unless quiet + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + // Parse table filter if provided + let (schema_filter, table_filter) = if let Some(ref t) = table { + if let Some((s, tbl)) = t.split_once('.') { + (Some(s), Some(tbl)) + } else { + (None, Some(t.as_str())) + } + } else { + (None, None) + }; + + let result = commands::vacuum::run_vacuum( + session.client(), + schema_filter, + table_filter, + threshold, + ) + .await?; + + if cli.json { + commands::vacuum::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::vacuum::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::vacuum::VacuumStatus::Critical => std::process::exit(2), + commands::vacuum::VacuumStatus::Warning => std::process::exit(1), + commands::vacuum::VacuumStatus::Healthy => {} + } + } + Commands::Fix { ref command } => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + + // Fix commands require read-write access + let conn_result = connection::resolve_and_validate( + &config, + cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + true, // Always require read-write for fix commands + cli.quiet, + )?; + + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + setup_ctrlc_handler(session.cancel_token()); + + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + match command { + FixCommands::Sequence { + sequence, + upgrade_to, + dry_run, + yes, + verify, + } => { + // Parse sequence name + let (schema, name) = if let Some((s, n)) = sequence.split_once('.') { + (s, n) + } else { + ("public", sequence.as_str()) + }; + + // Parse target type + let target_type = commands::fix::sequence::SequenceType::from_str(upgrade_to) + .ok_or_else(|| { + anyhow::anyhow!( + "Invalid target type '{}'. 
Use: integer, bigint", + upgrade_to + ) + })?; + + // Check gates + if !cli.read_write && !cli.allow_primary { + anyhow::bail!("Fix commands require --read-write and --primary flags"); + } + + let mut result = commands::fix::sequence::execute_upgrade( + session.client(), + schema, + name, + target_type, + *dry_run || !*yes, + ) + .await?; + + // Run verification if requested and fix was executed successfully + if *verify && result.executed && result.success { + let verify_steps = commands::fix::sequence::get_verify_steps(schema, name); + let verification = commands::fix::verify::run_verification(&verify_steps); + result.verification = Some(verification); + } + + if cli.json { + commands::fix::sequence::print_json( + &result, + Some(session.effective_timeouts()), + )?; + } else { + commands::fix::sequence::print_human(&result, cli.quiet); + } + + if !result.success { + std::process::exit(1); + } + } + FixCommands::Index { + drop, + dry_run, + yes, + verify, + } => { + // Parse index name + let (schema, name) = if let Some((s, n)) = drop.split_once('.') { + (s, n) + } else { + ("public", drop.as_str()) + }; + + // Check gates - index drop requires confirmation + if !cli.read_write && !cli.allow_primary { + anyhow::bail!("Fix commands require --read-write and --primary flags"); + } + + let mut result = commands::fix::index::execute_drop( + session.client(), + schema, + name, + *dry_run || !*yes, + ) + .await?; + + // Run verification if requested and fix was executed successfully + if *verify && result.executed && result.success { + let verify_steps = commands::fix::index::get_verify_steps(name); + let verification = commands::fix::verify::run_verification(&verify_steps); + result.verification = Some(verification); + } + + if cli.json { + commands::fix::index::print_json( + &result, + Some(session.effective_timeouts()), + )?; + } else { + commands::fix::index::print_human(&result, cli.quiet); + } + + if !result.success { + std::process::exit(1); + } + } + FixCommands::Vacuum { + table, + freeze, + full, + analyze, + dry_run, + yes, + verify, + } => { + // Parse table name + let (schema, name) = if let Some((s, n)) = table.split_once('.') { + (s, n) + } else { + ("public", table.as_str()) + }; + + // Check gates - VACUUM FULL requires confirmation + if !cli.read_write && !cli.allow_primary { + anyhow::bail!("Fix commands require --read-write and --primary flags"); + } + if *full && !*yes { + anyhow::bail!( + "VACUUM FULL requires ACCESS EXCLUSIVE lock. Use --yes to confirm." + ); + } + + let options = commands::fix::vacuum::VacuumOptions { + freeze: *freeze, + full: *full, + analyze: *analyze, + }; + + let mut result = commands::fix::vacuum::execute_vacuum( + session.client(), + schema, + name, + &options, + *dry_run || !*yes, + ) + .await?; + + // Run verification if requested and fix was executed successfully + if *verify && result.executed && result.success { + let verify_steps = commands::fix::vacuum::get_verify_steps(); + let verification = commands::fix::verify::run_verification(&verify_steps); + result.verification = Some(verification); + } + + if cli.json { + commands::fix::vacuum::print_json( + &result, + Some(session.effective_timeouts()), + )?; + } else { + commands::fix::vacuum::print_human(&result, cli.quiet); + } + + if !result.success { + std::process::exit(1); + } + } + } + } Commands::Locks { blocking, long_tx, @@ -1833,11 +2177,13 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { | Commands::Model { .. } | Commands::Init { .. } | Commands::Doctor { .. 
} - | Commands::Triage + | Commands::Triage { .. } | Commands::Locks { .. } | Commands::Xid { .. } | Commands::Sequences { .. } | Commands::Indexes { .. } + | Commands::Vacuum { .. } + | Commands::Fix { .. } | Commands::Context | Commands::Capabilities | Commands::Sql { .. } diff --git a/src/output.rs b/src/output.rs index a7795b6..95d8164 100644 --- a/src/output.rs +++ b/src/output.rs @@ -424,6 +424,7 @@ pub mod schema { pub const XID: &str = "pgcrate.diagnostics.xid"; pub const SEQUENCES: &str = "pgcrate.diagnostics.sequences"; pub const INDEXES: &str = "pgcrate.diagnostics.indexes"; + pub const VACUUM: &str = "pgcrate.diagnostics.vacuum"; pub const CONTEXT: &str = "pgcrate.diagnostics.context"; pub const CAPABILITIES: &str = "pgcrate.diagnostics.capabilities"; } diff --git a/studio/tasks/PGC-40-phase2a-fix-commands/review-request.md b/studio/tasks/PGC-40-phase2a-fix-commands/review-request.md new file mode 100644 index 0000000..e496eb6 --- /dev/null +++ b/studio/tasks/PGC-40-phase2a-fix-commands/review-request.md @@ -0,0 +1,154 @@ +# Phase 2a Code Review Request + +## Context + +**Project:** pgcrate-dba-diagnostics +**Branch:** `feature/phase2a-fix-commands` +**Base:** `c7d46cb` (Phase 1: JSON contracts foundation) +**Commits to review:** 3 commits totaling ~3,600 lines added + +This PR implements the fix command system for pgcrate—the ability to not just diagnose PostgreSQL issues but remediate them safely. + +--- + +## pgcrate Vision (from VISION.md) + +pgcrate is a **PostgreSQL CLI for application developers** with these principles: + +1. **Developer-first** - For app developers who manage their own DBs, not DBAs +2. **Safe defaults** - Read-only by default, explicit flags for mutations +3. **Diagnose-first** - Diagnostics identify issues, fixes require explicit action +4. **Machine-readable** - JSON output for automation and LLM integration +5. **Evidence-based** - Actions include evidence (stats, measurements) supporting recommendations + +The fix command system embodies the **diagnose → fix → verify** workflow: +- Diagnostics surface issues with evidence +- Fix commands show exact SQL that will run (`--dry-run`) +- Gates prevent accidental execution (`--read-write`, `--primary`, `--yes`) +- Verification confirms the fix worked + +--- + +## Commits + +### 1. `4d3bfaf` - Phase 2a: Add vacuum diagnostic and fix commands + +Core implementation: +- `src/commands/fix/` module with common types, sequence/index/vacuum fixes +- `src/commands/vacuum.rs` - New vacuum diagnostic +- Gate system for safe execution +- Verification runner for post-fix validation + +### 2. `3b1dfad` - Phase 2a: Add integration tests for fix commands + +- `tests/diagnostics/fix.rs` - 12 integration tests covering: + - Sequence upgrade (smallint→integer→bigint) + - Index drop with safety checks + - Vacuum operations + - Gate enforcement + - Dry-run behavior + +### 3. 
`a807cec` - Phase 2a: Update documentation for fix commands + +- README.md - Added vacuum and fix command sections +- CHANGELOG.md - Phase 2a release notes +- llms.txt - Updated for LLM consumption + +--- + +## Files to Review + +### Core Implementation (priority) + +| File | Lines | Purpose | +|------|-------|---------| +| `src/commands/fix/common.rs` | 335 | StructuredAction, ActionGates, Risk levels, gate checking | +| `src/commands/fix/sequence.rs` | 391 | ALTER SEQUENCE type upgrades | +| `src/commands/fix/index.rs` | 431 | DROP INDEX CONCURRENTLY with safety checks | +| `src/commands/fix/vacuum.rs` | 443 | VACUUM operations (regular/freeze/full/analyze) | +| `src/commands/fix/verify.rs` | 443 | JSONPath-based verification runner | +| `src/commands/vacuum.rs` | 482 | Vacuum health diagnostic | + +### Integration Points + +| File | Purpose | +|------|---------| +| `src/main.rs` | CLI routing for fix subcommands | +| `src/commands/triage.rs` | `--include-fixes` flag, action generation | +| `src/commands/indexes.rs` | Enhanced evidence collection | + +### Tests + +| File | Purpose | +|------|---------| +| `tests/diagnostics/fix.rs` | Integration tests for all fix commands | + +--- + +## Review Focus + +Please evaluate: + +### 1. Architecture +- Does the gate system (`requires_write`, `requires_primary`, `requires_confirmation`) provide appropriate safety? +- Is the StructuredAction pattern clear and well-designed? +- Does the separation between diagnostics (read-only) and fixes (mutations) make sense? + +### 2. Code Quality +- Any remaining bloat or unnecessary abstractions? +- Are error messages helpful? +- Is the SQL generation safe (identifier quoting, etc.)? + +### 3. Safety +- Are the safety checks in `fix/index.rs` sufficient? (primary key, replica identity, backing constraint) +- Is the risk level assignment appropriate? (Low for ALTER SEQUENCE, Medium for DROP INDEX, High for VACUUM FULL) +- Any SQL injection vectors? + +### 4. Testing +- Do the integration tests cover the important paths? +- Any edge cases missing? + +### 5. Documentation +- Is the README clear on how to use fix commands? +- Are the CLI help messages helpful? + +--- + +## Commands to Explore + +```bash +cd /Users/jackschultz/workspace/dev/pgcrate-studio/pgcrate-dba-diagnostics + +# View commits +git log --oneline c7d46cb..a807cec +git show 4d3bfaf --stat +git show 3b1dfad --stat +git show a807cec --stat + +# Read core files +cat src/commands/fix/common.rs +cat src/commands/fix/sequence.rs +cat src/commands/fix/index.rs + +# Run tests +cargo test --test integration fix + +# See CLI help +cargo run -- fix --help +cargo run -- fix sequence --help +cargo run -- vacuum --help +``` + +--- + +## Known Decisions + +These were intentional choices: + +1. **JSONPath evaluation is simple** - Uses basic path extraction, not full JSONPath spec. Complex conditions like `&&` or `<` are not supported. + +2. **Only Fix action type** - Removed Investigate/Monitor variants as they weren't used. Can add back if needed. + +3. **Three risk levels** - Removed None/Extreme as they weren't used. Low/Medium/High covers current operations. + +4. **VACUUM FULL requires --yes** - Despite being a standard operation, ACCESS EXCLUSIVE lock warrants explicit confirmation. diff --git a/tests/diagnostics/fix.rs b/tests/diagnostics/fix.rs new file mode 100644 index 0000000..6431e22 --- /dev/null +++ b/tests/diagnostics/fix.rs @@ -0,0 +1,531 @@ +//! Integration tests for fix commands. +//! +//! 
Tests fix sequence, fix index, and fix vacuum commands +//! including dry-run mode, gate checks, and safety blocks. + +use crate::common::{parse_json, stdout, TestDatabase, TestProject}; + +// ============================================================================ +// fix sequence +// ============================================================================ + +#[test] +fn test_fix_sequence_dry_run_shows_sql() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a test sequence + db.run_sql_ok("CREATE SEQUENCE test_seq AS integer;"); + + // Dry run should show SQL without executing + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "sequence", + "public.test_seq", + "--upgrade-to", + "bigint", + "--dry-run", + ]); + + assert!(output.status.success()); + let out = stdout(&output); + assert!(out.contains("DRY RUN"), "Should indicate dry run mode"); + assert!( + out.contains("ALTER SEQUENCE"), + "Should show the SQL: {}", + out + ); +} + +#[test] +fn test_fix_sequence_requires_gates() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + db.run_sql_ok("CREATE SEQUENCE test_seq AS integer;"); + + // Without --read-write and --primary, should fail + let output = project.run_pgcrate(&[ + "fix", + "sequence", + "public.test_seq", + "--upgrade-to", + "bigint", + "--dry-run", + ]); + + assert!(!output.status.success(), "Should fail without gate flags"); +} + +#[test] +fn test_fix_sequence_json_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + db.run_sql_ok("CREATE SEQUENCE test_seq AS integer;"); + + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "--json", + "fix", + "sequence", + "public.test_seq", + "--upgrade-to", + "bigint", + "--dry-run", + ]); + + assert!(output.status.success()); + let json = parse_json(&output); + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.fix.sequence")) + ); + + let data = json.get("data").expect("Should have data field"); + assert_eq!(data.get("executed"), Some(&serde_json::json!(false))); + assert_eq!(data.get("success"), Some(&serde_json::json!(true))); + assert!(data.get("sql").is_some(), "Should have sql field"); +} + +#[test] +fn test_fix_sequence_blocks_downgrade() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a bigint sequence + db.run_sql_ok("CREATE SEQUENCE test_seq AS bigint;"); + + // Trying to downgrade to integer should fail + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "sequence", + "public.test_seq", + "--upgrade-to", + "integer", + "--dry-run", + ]); + + assert!( + !output.status.success(), + "Should fail when trying to downgrade" + ); +} + +// ============================================================================ +// fix index +// ============================================================================ + +#[test] +fn test_fix_index_drop_dry_run() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = 
TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a table and unused index + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY, name text);"); + db.run_sql_ok("CREATE INDEX idx_test_name ON test_table(name);"); + + // Dry run should show SQL + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "index", + "--drop", + "public.idx_test_name", + "--dry-run", + ]); + + assert!( + output.status.success(), + "fix index failed: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + let out = stdout(&output); + assert!(out.contains("DRY RUN"), "Should indicate dry run mode"); + assert!( + out.contains("DROP INDEX CONCURRENTLY"), + "Should show concurrent drop SQL: {}", + out + ); +} + +#[test] +fn test_fix_index_blocks_primary_key() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a table with primary key + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY);"); + + // Get the primary key index name + let output = db.run_sql_ok( + "SELECT indexname FROM pg_indexes WHERE tablename = 'test_table' AND indexdef LIKE '%PRIMARY%' LIMIT 1;", + ); + let stdout_str = String::from_utf8_lossy(&output.stdout); + let index_name = stdout_str + .lines() + .skip(2) + .next() + .map(|s| s.trim()) + .unwrap_or("test_table_pkey"); + + // Trying to drop primary key index should fail + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "index", + "--drop", + &format!("public.{}", index_name), + "--yes", + ]); + + assert!( + !output.status.success(), + "Should fail when trying to drop primary key index" + ); +} + +#[test] +fn test_fix_index_json_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY, name text);"); + db.run_sql_ok("CREATE INDEX idx_test_name ON test_table(name);"); + + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "--json", + "fix", + "index", + "--drop", + "public.idx_test_name", + "--dry-run", + ]); + + assert!(output.status.success()); + let json = parse_json(&output); + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.fix.index")) + ); + + let data = json.get("data").expect("Should have data field"); + assert_eq!(data.get("executed"), Some(&serde_json::json!(false))); +} + +// ============================================================================ +// fix vacuum +// ============================================================================ + +#[test] +fn test_fix_vacuum_dry_run() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a table + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY, name text);"); + db.run_sql_ok("INSERT INTO test_table (name) VALUES ('test');"); + + // Dry run should show SQL + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "vacuum", + "public.test_table", + "--dry-run", + ]); + + assert!(output.status.success()); + let out = stdout(&output); + assert!(out.contains("DRY 
RUN"), "Should indicate dry run mode"); + assert!(out.contains("VACUUM"), "Should show VACUUM SQL: {}", out); +} + +#[test] +fn test_fix_vacuum_full_requires_yes() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY);"); + + // VACUUM FULL without --yes should fail + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "vacuum", + "public.test_table", + "--full", + ]); + + assert!( + !output.status.success(), + "VACUUM FULL should require --yes flag" + ); +} + +#[test] +fn test_fix_vacuum_json_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY, name text);"); + + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "--json", + "fix", + "vacuum", + "public.test_table", + "--dry-run", + ]); + + assert!(output.status.success()); + let json = parse_json(&output); + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.fix.vacuum")) + ); + + let data = json.get("data").expect("Should have data field"); + assert_eq!(data.get("executed"), Some(&serde_json::json!(false))); +} + +// ============================================================================ +// vacuum diagnostic +// ============================================================================ + +#[test] +fn test_vacuum_diagnostic_json() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a table with some data + db.run_sql_ok("CREATE TABLE test_table (id serial PRIMARY KEY, name text);"); + db.run_sql_ok("INSERT INTO test_table (name) SELECT 'test' FROM generate_series(1, 100);"); + + let output = project.run_pgcrate_ok(&["vacuum", "--json"]); + + let json = parse_json(&output); + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.diagnostics.vacuum")) + ); + + let data = json.get("data").expect("Should have data field"); + assert!(data.get("tables").is_some(), "Should have tables field"); +} + +// ============================================================================ +// triage --include-fixes +// ============================================================================ + +#[test] +fn test_triage_include_fixes_json() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["triage", "--include-fixes", "--json"]); + + // Should succeed (exit code 0, 1, or 2 depending on state) + assert!( + output.status.code().unwrap_or(99) <= 2, + "triage should return valid exit code" + ); + + let json = parse_json(&output); + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + + let data = json.get("data").expect("Should have data field"); + // With --include-fixes, actions should be present (even if empty) + assert!( + data.get("actions").is_some(), + "Should have actions field with --include-fixes: {}", + json + ); +} + +// 
============================================================================ +// Execution tests (actually run fixes) +// ============================================================================ + +#[test] +fn test_fix_sequence_executes_upgrade() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create an integer sequence + db.run_sql_ok("CREATE SEQUENCE exec_test_seq AS integer;"); + + // Verify it's integer before fix + let before = + db.run_sql_ok("SELECT data_type FROM pg_sequences WHERE sequencename = 'exec_test_seq';"); + let before_type = String::from_utf8_lossy(&before.stdout); + assert!( + before_type.contains("integer"), + "Sequence should be integer before fix: {}", + before_type + ); + + // Execute the upgrade with --yes + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "sequence", + "public.exec_test_seq", + "--upgrade-to", + "bigint", + "--yes", + ]); + + assert!( + output.status.success(), + "Fix should succeed: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + let out = stdout(&output); + assert!(out.contains("SUCCESS"), "Should indicate success: {}", out); + + // Verify it's now bigint + let after = + db.run_sql_ok("SELECT data_type FROM pg_sequences WHERE sequencename = 'exec_test_seq';"); + let after_type = String::from_utf8_lossy(&after.stdout); + assert!( + after_type.contains("bigint"), + "Sequence should be bigint after fix: {}", + after_type + ); +} + +// ============================================================================ +// Special identifier tests +// ============================================================================ + +#[test] +fn test_fix_sequence_with_reserved_word_schema() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a schema named "user" (reserved word) + db.run_sql_ok("CREATE SCHEMA \"user\";"); + db.run_sql_ok("CREATE SEQUENCE \"user\".test_seq AS integer;"); + + // Dry run should work with quoted identifier + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "sequence", + "user.test_seq", + "--upgrade-to", + "bigint", + "--dry-run", + ]); + + assert!( + output.status.success(), + "Should handle reserved word schema: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + let out = stdout(&output); + // The SQL should have properly quoted the "user" schema + assert!( + out.contains("\"user\""), + "Should quote reserved word 'user': {}", + out + ); +} + +#[test] +fn test_fix_vacuum_with_special_table_name() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Create a table with a name that needs quoting + db.run_sql_ok("CREATE TABLE \"My-Table\" (id serial PRIMARY KEY);"); + db.run_sql_ok("INSERT INTO \"My-Table\" DEFAULT VALUES;"); + + // Dry run should work with special characters + let output = project.run_pgcrate(&[ + "--read-write", + "--primary", + "fix", + "vacuum", + "public.My-Table", + "--dry-run", + ]); + + assert!( + output.status.success(), + "Should handle special table name: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout), + 
String::from_utf8_lossy(&output.stderr) + ); + let out = stdout(&output); + // The SQL should have properly quoted the table name + assert!( + out.contains("\"My-Table\""), + "Should quote special table name: {}", + out + ); +} diff --git a/tests/diagnostics/mod.rs b/tests/diagnostics/mod.rs index 57a662c..dbcf5ef 100644 --- a/tests/diagnostics/mod.rs +++ b/tests/diagnostics/mod.rs @@ -1,4 +1,5 @@ mod basic; +mod fix; mod indexes; mod locks; mod sequences_scenarios;