From 1aa9af154cbd4f2a92a92bd013c2f3c79a4f81bd Mon Sep 17 00:00:00 2001 From: Jack Schultz Date: Mon, 19 Jan 2026 11:49:01 -0600 Subject: [PATCH 1/6] Add pgcrate bloat command for table/index bloat estimation - Statistical index bloat estimation using pg_class/pg_stats (ioguix-style) - Table bloat from dead tuple ratios via pg_stat_user_tables - Status thresholds: 20% warning, 50% critical - Recommendations for VACUUM FULL and REINDEX when critical - Full JSON support with pgcrate.diagnostics.bloat schema - Added diagnostics.bloat capability check - Integration tests for empty DB, tables, JSON structure, limit option --- CHANGELOG.md | 13 ++ README.md | 2 + llms.txt | 2 + src/commands/bloat.rs | 391 +++++++++++++++++++++++++++++++++++ src/commands/capabilities.rs | 31 +++ src/commands/mod.rs | 1 + src/main.rs | 44 ++++ src/output.rs | 1 + tests/diagnostics/bloat.rs | 121 +++++++++++ tests/diagnostics/mod.rs | 1 + 10 files changed, 607 insertions(+) create mode 100644 src/commands/bloat.rs create mode 100644 tests/diagnostics/bloat.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index dbc0b6d..c40240c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,19 @@ ## Unreleased +**Phase 3b: Bloat Diagnostics** + +### New Commands + +- **`pgcrate bloat`**: Estimate table and index bloat + - Statistical index bloat estimation (ioguix-style, works without extensions) + - Table bloat from dead tuple ratios (pg_stat_user_tables) + - Recommendations for VACUUM FULL and REINDEX when critical + - `--limit` option to control number of results + - Full JSON support with `pgcrate.diagnostics.bloat` schema + +--- + **Phase 2a: Fix Commands** Complete the diagnose→fix→verify loop with safe remediation commands. 
diff --git a/README.md b/README.md index 11d40ff..2276b60 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,7 @@ pgcrate xid # Transaction ID wraparound analysis pgcrate sequences # Sequence exhaustion check pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health +pgcrate bloat # Estimate table and index bloat ``` All diagnostic commands support timeout flags for production safety: @@ -309,6 +310,7 @@ DROP TABLE users; | `pgcrate sequences` | Sequence exhaustion check | | `pgcrate indexes` | Missing, unused, duplicate indexes | | `pgcrate vacuum` | Table bloat and vacuum health | +| `pgcrate bloat` | Estimate table and index bloat | | `pgcrate fix sequence` | Upgrade sequence type to prevent exhaustion | | `pgcrate fix index` | Drop unused/duplicate indexes | | `pgcrate fix vacuum` | Run VACUUM on tables | diff --git a/llms.txt b/llms.txt index 0a35f45..8b59a98 100644 --- a/llms.txt +++ b/llms.txt @@ -752,6 +752,7 @@ pgcrate xid # Transaction ID wraparound analysis pgcrate sequences # Sequence exhaustion check pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health +pgcrate bloat # Estimate table and index bloat # Connection context pgcrate context --json # Connection info, server version, privileges @@ -837,6 +838,7 @@ Currently, `--json` is supported for these commands: - `sequences` - Sequence exhaustion check - `indexes` - Index health analysis - `vacuum` - Table bloat analysis +- `bloat` - Table and index bloat estimation - `fix sequence` - Sequence upgrade result - `fix index` - Index drop result - `fix vacuum` - Vacuum result diff --git a/src/commands/bloat.rs b/src/commands/bloat.rs new file mode 100644 index 0000000..746b3bf --- /dev/null +++ b/src/commands/bloat.rs @@ -0,0 +1,391 @@ +//! Bloat command: Estimate table and index bloat. +//! +//! Shows wasted disk space from fragmentation. Tables bloat from updates/deletes; +//! 
indexes bloat from page splits and deletions. + +use anyhow::Result; +use serde::Serialize; +use tokio_postgres::Client; + +const WARNING_PCT: f64 = 20.0; +const CRITICAL_PCT: f64 = 50.0; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum BloatStatus { + Healthy, + Warning, + Critical, +} + +impl BloatStatus { + fn from_pct(pct: f64) -> Self { + if pct >= CRITICAL_PCT { + BloatStatus::Critical + } else if pct >= WARNING_PCT { + BloatStatus::Warning + } else { + BloatStatus::Healthy + } + } + + fn emoji(&self) -> &'static str { + match self { + BloatStatus::Healthy => "✓", + BloatStatus::Warning => "⚠", + BloatStatus::Critical => "✗", + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct TableBloat { + pub schema: String, + pub table: String, + pub size_bytes: i64, + pub bloat_bytes: i64, + pub bloat_pct: f64, + pub status: BloatStatus, +} + +#[derive(Debug, Clone, Serialize)] +pub struct IndexBloat { + pub schema: String, + pub table: String, + pub index: String, + pub size_bytes: i64, + pub bloat_bytes: i64, + pub bloat_pct: f64, + pub status: BloatStatus, +} + +#[derive(Debug, Serialize)] +pub struct BloatResult { + pub tables: Vec, + pub indexes: Vec, + pub total_table_bloat_bytes: i64, + pub total_index_bloat_bytes: i64, + pub overall_status: BloatStatus, +} + +/// Statistical index bloat estimation (ioguix-style). +/// Works without extensions by using pg_class and pg_stats. +async fn get_index_bloat(client: &Client, limit: usize) -> Result> { + // This query estimates index bloat using statistics. + // It calculates expected index size based on tuple counts and widths, + // then compares to actual size. 
+ let query = r#" +WITH btree_index_atts AS ( + SELECT + n.nspname, + ci.relname AS index_relname, + ct.relname AS table_relname, + i.indexrelid, + i.indrelid, + pg_catalog.array_agg(a.attnum ORDER BY a.attnum) AS indkey, + pg_catalog.array_agg(a.attname ORDER BY a.attnum) AS indkeys + FROM pg_catalog.pg_index i + JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid + JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid + JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace + JOIN pg_catalog.pg_attribute a ON a.attrelid = ct.oid AND a.attnum = ANY(i.indkey) + WHERE ci.relam = (SELECT oid FROM pg_am WHERE amname = 'btree') + AND n.nspname NOT IN ('pg_catalog', 'information_schema') + GROUP BY n.nspname, ci.relname, ct.relname, i.indexrelid, i.indrelid +), +index_stats AS ( + SELECT + b.nspname AS schema, + b.table_relname AS "table", + b.index_relname AS index, + pg_relation_size(b.indexrelid) AS size_bytes, + ci.relpages, + ci.reltuples, + coalesce( + ceil( + ci.reltuples * ( + sum(coalesce(s.avg_width, 8)) + 8 + 6 + ) / ( + current_setting('block_size')::int - 24 + ) + ), + 0 + )::bigint AS est_pages + FROM btree_index_atts b + JOIN pg_catalog.pg_class ci ON ci.oid = b.indexrelid + LEFT JOIN pg_catalog.pg_stats s ON s.schemaname = b.nspname + AND s.tablename = b.table_relname + AND s.attname = ANY(b.indkeys) + GROUP BY b.nspname, b.table_relname, b.index_relname, b.indexrelid, ci.relpages, ci.reltuples +) +SELECT + schema, + "table", + index, + size_bytes, + GREATEST(0, (relpages - est_pages) * current_setting('block_size')::int)::bigint AS bloat_bytes, + CASE WHEN relpages > 0 + THEN round(100.0 * GREATEST(0, relpages - est_pages) / relpages, 1) + ELSE 0 + END::float8 AS bloat_pct +FROM index_stats +WHERE relpages > 1 + AND size_bytes > 65536 +ORDER BY bloat_bytes DESC +LIMIT $1 +"#; + + let rows = client.query(query, &[&(limit as i64)]).await?; + let mut results = Vec::with_capacity(rows.len()); + + for row in rows { + let bloat_pct: f64 = 
row.get("bloat_pct"); + results.push(IndexBloat { + schema: row.get("schema"), + table: row.get("table"), + index: row.get("index"), + size_bytes: row.get("size_bytes"), + bloat_bytes: row.get("bloat_bytes"), + bloat_pct, + status: BloatStatus::from_pct(bloat_pct), + }); + } + + Ok(results) +} + +/// Table bloat from dead tuples (pg_stat_user_tables). +/// For accurate measurement, use pgstattuple extension. +async fn get_table_bloat(client: &Client, limit: usize) -> Result> { + let query = r#" +SELECT + schemaname AS schema, + relname AS "table", + pg_total_relation_size(relid) AS size_bytes, + CASE WHEN n_live_tup + n_dead_tup > 0 + THEN (n_dead_tup::float8 / (n_live_tup + n_dead_tup) * pg_total_relation_size(relid))::bigint + ELSE 0 + END AS bloat_bytes, + CASE WHEN n_live_tup + n_dead_tup > 0 + THEN round(100.0 * n_dead_tup / (n_live_tup + n_dead_tup), 1) + ELSE 0 + END::float8 AS bloat_pct +FROM pg_stat_user_tables +WHERE n_dead_tup > 0 + AND pg_total_relation_size(relid) > 65536 +ORDER BY bloat_bytes DESC +LIMIT $1 +"#; + + let rows = client.query(query, &[&(limit as i64)]).await?; + let mut results = Vec::with_capacity(rows.len()); + + for row in rows { + let bloat_pct: f64 = row.get("bloat_pct"); + results.push(TableBloat { + schema: row.get("schema"), + table: row.get("table"), + size_bytes: row.get("size_bytes"), + bloat_bytes: row.get("bloat_bytes"), + bloat_pct, + status: BloatStatus::from_pct(bloat_pct), + }); + } + + Ok(results) +} + +pub async fn get_bloat(client: &Client, limit: usize) -> Result { + let tables = get_table_bloat(client, limit).await?; + let indexes = get_index_bloat(client, limit).await?; + + let total_table_bloat: i64 = tables.iter().map(|t| t.bloat_bytes).sum(); + let total_index_bloat: i64 = indexes.iter().map(|i| i.bloat_bytes).sum(); + + let worst_table = tables.iter().map(|t| &t.status).max_by_key(|s| match s { + BloatStatus::Healthy => 0, + BloatStatus::Warning => 1, + BloatStatus::Critical => 2, + }); + + let worst_index = 
indexes.iter().map(|i| &i.status).max_by_key(|s| match s { + BloatStatus::Healthy => 0, + BloatStatus::Warning => 1, + BloatStatus::Critical => 2, + }); + + let overall_status = match (worst_table, worst_index) { + (Some(t), Some(i)) => { + if matches!(t, BloatStatus::Critical) || matches!(i, BloatStatus::Critical) { + BloatStatus::Critical + } else if matches!(t, BloatStatus::Warning) || matches!(i, BloatStatus::Warning) { + BloatStatus::Warning + } else { + BloatStatus::Healthy + } + } + (Some(s), None) | (None, Some(s)) => *s, + (None, None) => BloatStatus::Healthy, + }; + + Ok(BloatResult { + tables, + indexes, + total_table_bloat_bytes: total_table_bloat, + total_index_bloat_bytes: total_index_bloat, + overall_status, + }) +} + +fn format_bytes(bytes: i64) -> String { + if bytes >= 1_073_741_824 { + format!("{:.1} GB", bytes as f64 / 1_073_741_824.0) + } else if bytes >= 1_048_576 { + format!("{:.1} MB", bytes as f64 / 1_048_576.0) + } else if bytes >= 1024 { + format!("{:.1} KB", bytes as f64 / 1024.0) + } else { + format!("{} B", bytes) + } +} + +pub fn print_human(result: &BloatResult, quiet: bool) { + if result.tables.is_empty() && result.indexes.is_empty() { + if !quiet { + println!("No significant bloat detected."); + } + return; + } + + let total_bloat = result.total_table_bloat_bytes + result.total_index_bloat_bytes; + println!( + "BLOAT SUMMARY: {} estimated reclaimable", + format_bytes(total_bloat) + ); + println!(); + + // Tables + if !result.tables.is_empty() { + println!( + "TABLES ({} reclaimable):", + format_bytes(result.total_table_bloat_bytes) + ); + println!( + " {:3} {:40} {:>10} {:>10} {:>6}", + "", "TABLE", "SIZE", "BLOAT", "%" + ); + println!(" {}", "-".repeat(74)); + + for t in &result.tables { + let name = format!("{}.{}", t.schema, t.table); + let display_name = if name.len() > 40 { + format!("{}...", &name[..37]) + } else { + name + }; + println!( + " {} {:40} {:>10} {:>10} {:>5.1}%", + t.status.emoji(), + display_name, + 
format_bytes(t.size_bytes), + format_bytes(t.bloat_bytes), + t.bloat_pct + ); + } + println!(); + } + + // Indexes + if !result.indexes.is_empty() { + println!( + "INDEXES ({} reclaimable):", + format_bytes(result.total_index_bloat_bytes) + ); + println!( + " {:3} {:40} {:>10} {:>10} {:>6}", + "", "INDEX", "SIZE", "BLOAT", "%" + ); + println!(" {}", "-".repeat(74)); + + for i in &result.indexes { + let name = format!("{}.{}", i.schema, i.index); + let display_name = if name.len() > 40 { + format!("{}...", &name[..37]) + } else { + name + }; + println!( + " {} {:40} {:>10} {:>10} {:>5.1}%", + i.status.emoji(), + display_name, + format_bytes(i.size_bytes), + format_bytes(i.bloat_bytes), + i.bloat_pct + ); + } + } + + // Recommendations + let critical_tables: Vec<_> = result + .tables + .iter() + .filter(|t| t.status == BloatStatus::Critical) + .collect(); + let critical_indexes: Vec<_> = result + .indexes + .iter() + .filter(|i| i.status == BloatStatus::Critical) + .collect(); + + if !critical_tables.is_empty() || !critical_indexes.is_empty() { + println!(); + println!("RECOMMENDATIONS:"); + for t in critical_tables.iter().take(3) { + println!(" VACUUM FULL {}.{};", t.schema, t.table); + } + for i in critical_indexes.iter().take(3) { + println!(" REINDEX INDEX {}.{};", i.schema, i.index); + } + } +} + +pub fn print_json( + result: &BloatResult, + timeouts: Option, +) -> Result<()> { + use crate::output::{schema, DiagnosticOutput, Severity}; + + let severity = match result.overall_status { + BloatStatus::Healthy => Severity::Healthy, + BloatStatus::Warning => Severity::Warning, + BloatStatus::Critical => Severity::Critical, + }; + + let output = match timeouts { + Some(t) => DiagnosticOutput::with_timeouts(schema::BLOAT, result, severity, t), + None => DiagnosticOutput::new(schema::BLOAT, result, severity), + }; + output.print()?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_status_thresholds() { + 
assert_eq!(BloatStatus::from_pct(10.0), BloatStatus::Healthy); + assert_eq!(BloatStatus::from_pct(20.0), BloatStatus::Warning); + assert_eq!(BloatStatus::from_pct(50.0), BloatStatus::Critical); + assert_eq!(BloatStatus::from_pct(75.0), BloatStatus::Critical); + } + + #[test] + fn test_format_bytes() { + assert_eq!(format_bytes(500), "500 B"); + assert_eq!(format_bytes(1024), "1.0 KB"); + assert_eq!(format_bytes(1_500_000), "1.4 MB"); + assert_eq!(format_bytes(2_000_000_000), "1.9 GB"); + } +} diff --git a/src/commands/capabilities.rs b/src/commands/capabilities.rs index 6c35c97..3da30a3 100644 --- a/src/commands/capabilities.rs +++ b/src/commands/capabilities.rs @@ -106,6 +106,8 @@ pub async fn run_capabilities(client: &Client, read_only: bool) -> Result CapabilityInfo { + let requirements = vec![Requirement { + what: "pg_stat_user_tables SELECT".to_string(), + met: has_pg_stat_user_tables, + }]; + + let (status, reasons) = if !has_pg_stat_user_tables { + ( + CapabilityStatus::Unavailable, + vec![ReasonInfo::new( + ReasonCode::MissingPrivilege, + "Cannot read pg_stat_user_tables", + )], + ) + } else { + (CapabilityStatus::Available, vec![]) + }; + + CapabilityInfo { + id: "diagnostics.bloat", + name: "Bloat", + description: "Table and index bloat estimation", + status, + reasons, + requirements, + limitations: vec![], + } +} + fn check_xid_capability(has_pg_database: bool) -> CapabilityInfo { let requirements = vec![Requirement { what: "pg_database SELECT".to_string(), diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 6d5a7aa..19f6c92 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -3,6 +3,7 @@ //! Each submodule contains related command functions. mod anonymize; +pub mod bloat; mod bootstrap; pub mod capabilities; pub mod context; diff --git a/src/main.rs b/src/main.rs index d6cbcba..016831d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -76,6 +76,7 @@ fn json_supported(command: &Commands) -> bool { Commands::Sequences { .. 
} => true, Commands::Indexes { .. } => true, Commands::Vacuum { .. } => true, + Commands::Bloat { .. } => true, Commands::Fix { .. } => true, Commands::Context => true, Commands::Capabilities => true, @@ -349,6 +350,12 @@ enum Commands { #[arg(long, value_name = "PCT")] threshold: Option, }, + /// Estimate table and index bloat + Bloat { + /// Number of items to show (default: 10) + #[arg(long, default_value = "10")] + limit: usize, + }, /// Fix commands for remediation Fix { #[command(subcommand)] @@ -1322,6 +1329,42 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { commands::vacuum::VacuumStatus::Healthy => {} } } + Commands::Bloat { limit } => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + let conn_result = connection::resolve_and_validate( + &config, + cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + cli.read_write, + cli.quiet, + )?; + + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + setup_ctrlc_handler(session.cancel_token()); + + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + let result = commands::bloat::get_bloat(session.client(), limit).await?; + + if cli.json { + commands::bloat::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::bloat::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::bloat::BloatStatus::Critical => std::process::exit(2), + commands::bloat::BloatStatus::Warning => std::process::exit(1), + commands::bloat::BloatStatus::Healthy => {} + } + } Commands::Fix { ref command } => { let config = Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; @@ -2183,6 +2226,7 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { | Commands::Sequences { .. 
} | Commands::Indexes { .. } | Commands::Vacuum { .. } + | Commands::Bloat { .. } | Commands::Fix { .. } | Commands::Context | Commands::Capabilities diff --git a/src/output.rs b/src/output.rs index 95d8164..0a34c5b 100644 --- a/src/output.rs +++ b/src/output.rs @@ -425,6 +425,7 @@ pub mod schema { pub const SEQUENCES: &str = "pgcrate.diagnostics.sequences"; pub const INDEXES: &str = "pgcrate.diagnostics.indexes"; pub const VACUUM: &str = "pgcrate.diagnostics.vacuum"; + pub const BLOAT: &str = "pgcrate.diagnostics.bloat"; pub const CONTEXT: &str = "pgcrate.diagnostics.context"; pub const CAPABILITIES: &str = "pgcrate.diagnostics.capabilities"; } diff --git a/tests/diagnostics/bloat.rs b/tests/diagnostics/bloat.rs new file mode 100644 index 0000000..e23784a --- /dev/null +++ b/tests/diagnostics/bloat.rs @@ -0,0 +1,121 @@ +//! Integration tests for bloat diagnostic command. + +use crate::common::{parse_json, stdout, TestDatabase, TestProject}; + +#[test] +fn test_bloat_empty_database() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::empty(&db); + + std::fs::write( + project.path("pgcrate.toml"), + format!( + r#"[database] +url = "{}" +"#, + db.url() + ), + ) + .unwrap(); + + let output = project.run_pgcrate(&["bloat"]); + + // Empty database should be healthy + assert!( + output.status.success(), + "Empty database should have no bloat" + ); +} + +#[test] +fn test_bloat_with_tables() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["bloat"]); + + // Fresh tables should have minimal bloat (healthy) + assert!( + output.status.code().unwrap_or(99) <= 2, + "bloat should return valid exit code" + ); +} + +#[test] +fn test_bloat_json_structure() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + 
project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate_ok(&["bloat", "--json"]); + + let json = parse_json(&output); + assert!(json.is_object(), "Should return JSON object"); + + // Schema versioning fields + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.diagnostics.bloat")) + ); + assert!(json.get("schema_version").is_some()); + + // Data fields + let data = json.get("data").expect("JSON should have data field"); + assert!(data.get("tables").is_some(), "Should have tables array"); + assert!(data.get("indexes").is_some(), "Should have indexes array"); + assert!( + data.get("overall_status").is_some(), + "Should have overall_status" + ); + assert!( + data.get("total_table_bloat_bytes").is_some(), + "Should have total_table_bloat_bytes" + ); + assert!( + data.get("total_index_bloat_bytes").is_some(), + "Should have total_index_bloat_bytes" + ); +} + +#[test] +fn test_bloat_limit_option() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Test with custom limit + let output = project.run_pgcrate(&["bloat", "--limit", "5"]); + + assert!( + output.status.code().unwrap_or(99) <= 2, + "bloat with limit should work" + ); +} + +#[test] +fn test_bloat_human_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["bloat"]); + let out = stdout(&output); + + // Should show bloat summary + assert!( + out.contains("BLOAT") || out.contains("bloat") || out.contains("No significant bloat"), + "Should show bloat information: {}", + out + ); +} diff --git a/tests/diagnostics/mod.rs b/tests/diagnostics/mod.rs index dbcf5ef..8276cd1 100644 --- a/tests/diagnostics/mod.rs +++ 
b/tests/diagnostics/mod.rs @@ -1,4 +1,5 @@ mod basic; +mod bloat; mod fix; mod indexes; mod locks; From 6d646c32178fda2664eec3e653cb0f0ac4197d34 Mon Sep 17 00:00:00 2001 From: Jack Schultz Date: Mon, 19 Jan 2026 11:49:56 -0600 Subject: [PATCH 2/6] Fix UTF-8 slicing bug in bloat display name truncation Use chars().count() and chars().take() instead of byte slicing to avoid panic on multi-byte UTF-8 characters in table/index names. --- src/commands/bloat.rs | 10 +- .../PGC-50-phase3b-bloat-replication/task.md | 217 ++++++++++++++++++ 2 files changed, 223 insertions(+), 4 deletions(-) create mode 100644 studio/tasks/PGC-50-phase3b-bloat-replication/task.md diff --git a/src/commands/bloat.rs b/src/commands/bloat.rs index 746b3bf..6e65dd8 100644 --- a/src/commands/bloat.rs +++ b/src/commands/bloat.rs @@ -278,8 +278,9 @@ pub fn print_human(result: &BloatResult, quiet: bool) { for t in &result.tables { let name = format!("{}.{}", t.schema, t.table); - let display_name = if name.len() > 40 { - format!("{}...", &name[..37]) + let display_name = if name.chars().count() > 40 { + let truncated: String = name.chars().take(37).collect(); + format!("{}...", truncated) } else { name }; @@ -309,8 +310,9 @@ pub fn print_human(result: &BloatResult, quiet: bool) { for i in &result.indexes { let name = format!("{}.{}", i.schema, i.index); - let display_name = if name.len() > 40 { - format!("{}...", &name[..37]) + let display_name = if name.chars().count() > 40 { + let truncated: String = name.chars().take(37).collect(); + format!("{}...", truncated) } else { name }; diff --git a/studio/tasks/PGC-50-phase3b-bloat-replication/task.md b/studio/tasks/PGC-50-phase3b-bloat-replication/task.md new file mode 100644 index 0000000..02cbd50 --- /dev/null +++ b/studio/tasks/PGC-50-phase3b-bloat-replication/task.md @@ -0,0 +1,217 @@ +# Phase 3b/4: Bloat + Replication Diagnostics + +**Branch:** `feature/phase3b-bloat-replication` +**Location:** 
`/Users/jackschultz/workspace/dev/pgcrate-studio/pgcrate-dba-diagnostics` + +--- + +## Summary + +Add `pgcrate bloat` and `pgcrate replication` to complete production diagnostics. Defer `pgcrate pooler` (architecture mismatch - requires separate endpoint). + +--- + +## Commands + +### `pgcrate bloat` + +```bash +pgcrate bloat # All tables + indexes +pgcrate bloat --table schema.tbl # Specific table +pgcrate bloat --schema app # All in schema +pgcrate bloat --limit 20 # Top N by bloat +pgcrate bloat --json # JSON output +``` + +**Metrics:** +- Table bloat: dead tuples ratio, pgstattuple if available +- Index bloat: statistical estimation (ioguix-style) +- Bloat bytes and percentage + +**Thresholds:** +- Warning: >20% bloat +- Critical: >50% bloat + +**Capability:** `diagnostics.bloat` - always available (statistical), enhanced with pgstattuple + +### `pgcrate replication` + +```bash +pgcrate replication # Full overview +pgcrate replication --slots-only # Just slots +pgcrate replication --json # JSON output +``` + +**Metrics:** +- Server role (primary/standby via pg_is_in_recovery()) +- Replica lag (write_lag, flush_lag, replay_lag, byte lag) +- Slot status (active, wal_status, retained bytes) +- WAL receiver status (standby only) + +**Thresholds:** +- Warning: replay_lag >30s OR inactive slot retaining >1GB +- Critical: replay_lag >5min OR wal_status='lost' + +**Capability:** `diagnostics.replication` - requires pg_stat_replication SELECT + +--- + +## Task Breakdown + +### PGC-50a: `pgcrate bloat` + +1. Create `src/commands/bloat.rs` +2. Statistical index bloat query (no extension) +3. Table bloat from pg_stat_user_tables + optional pgstattuple +4. Human + JSON output +5. Wire up in main.rs, capabilities.rs, output.rs +6. Integration tests + +### PGC-50b: `pgcrate replication` + +1. Create `src/commands/replication.rs` +2. Server role detection +3. Query pg_stat_replication, pg_replication_slots, pg_stat_wal_receiver +4. Human + JSON output +5. Wire up +6. 
Integration tests (limited - hard to test without replica) + +--- + +## SQL Queries + +### Bloat - Index Statistical Estimation + +```sql +WITH index_stats AS ( + SELECT + schemaname, + tablename, + indexname, + pg_relation_size(indexrelid) as index_size, + idx_scan, + idx_tup_read, + idx_tup_fetch + FROM pg_stat_user_indexes + JOIN pg_index ON indexrelid = pg_stat_user_indexes.indexrelid + WHERE NOT indisunique -- unique indexes don't bloat the same way +), +index_bloat AS ( + SELECT + nspname AS schema, + tblname AS table, + idxname AS index, + bs*(relpages)::bigint AS real_size, + bs*(relpages-est_pages)::bigint AS bloat_bytes, + 100 * (relpages-est_pages)::float / relpages AS bloat_pct + FROM ( + SELECT + coalesce(1 + ceil(reltuples/floor((bs-pageopqdata-pagehdr)/(4+nulldatahdrwidth)::float)), 0) AS est_pages, + bs, nspname, tblname, idxname, relpages + FROM ( + SELECT + maxalign, bs, nspname, tblname, idxname, reltuples, relpages, + pagehdr, pageopqdata, + (index_tuple_hdr_bm + maxalign - CASE WHEN index_tuple_hdr_bm%maxalign = 0 THEN maxalign ELSE index_tuple_hdr_bm%maxalign END + nulldatawidth + maxalign - CASE WHEN nulldatawidth%maxalign = 0 THEN maxalign ELSE nulldatawidth%maxalign END)::float AS nulldatahdrwidth + FROM ( + SELECT + n.nspname, ct.relname AS tblname, ci.relname AS idxname, + ci.reltuples, ci.relpages, + current_setting('block_size')::int AS bs, + CASE WHEN version() ~ 'mingw32|64-bit' THEN 8 ELSE 4 END AS maxalign, + 24 AS pagehdr, 16 AS pageopqdata, + CASE WHEN max(coalesce(s.null_frac,0)) = 0 THEN 2 ELSE 2 + (32 + 8 - 1) / 8 END AS index_tuple_hdr_bm, + sum((1-coalesce(s.null_frac, 0)) * coalesce(s.avg_width, 1024)) AS nulldatawidth + FROM pg_index i + JOIN pg_class ct ON ct.oid = i.indrelid + JOIN pg_class ci ON ci.oid = i.indexrelid + JOIN pg_namespace n ON n.oid = ct.relnamespace + JOIN pg_stats s ON s.schemaname = n.nspname AND s.tablename = ct.relname AND s.attname = ANY(ARRAY(SELECT a.attname FROM pg_attribute a WHERE a.attrelid = 
ct.oid AND a.attnum = ANY(i.indkey))) + WHERE NOT i.indisunique + AND n.nspname NOT IN ('pg_catalog', 'information_schema') + GROUP BY 1,2,3,4,5,6,7,8,9 + ) sub1 + ) sub2 + ) sub3 + WHERE relpages > 0 +) +SELECT * FROM index_bloat WHERE bloat_pct > 0 ORDER BY bloat_bytes DESC; +``` + +### Bloat - Table (pg_stat_user_tables) + +```sql +SELECT + schemaname, + relname, + pg_total_relation_size(relid) as total_bytes, + pg_table_size(relid) as table_bytes, + n_live_tup, + n_dead_tup, + CASE WHEN n_live_tup > 0 + THEN round(100.0 * n_dead_tup / (n_live_tup + n_dead_tup), 1) + ELSE 0 + END as dead_tuple_pct, + last_vacuum, + last_autovacuum +FROM pg_stat_user_tables +WHERE n_dead_tup > 0 +ORDER BY n_dead_tup DESC; +``` + +### Replication - Primary + +```sql +SELECT + application_name, + client_addr::text, + state, + sync_state, + sent_lsn::text, + write_lsn::text, + flush_lsn::text, + replay_lsn::text, + write_lag::text, + flush_lag::text, + replay_lag::text, + pg_wal_lsn_diff(sent_lsn, replay_lsn) as lag_bytes +FROM pg_stat_replication; +``` + +### Replication - Slots + +```sql +SELECT + slot_name, + slot_type, + database, + active, + wal_status, + pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as retained_bytes +FROM pg_replication_slots; +``` + +### Replication - Standby + +```sql +SELECT + status, + sender_host, + sender_port, + slot_name, + pg_wal_lsn_diff(latest_end_lsn, received_lsn) as lag_bytes +FROM pg_stat_wal_receiver; +``` + +--- + +## Success Criteria + +- [ ] `pgcrate bloat` shows table and index bloat estimates +- [ ] `pgcrate bloat --json` matches schema patterns +- [ ] Graceful degradation without pgstattuple +- [ ] `pgcrate replication` detects primary vs standby +- [ ] `pgcrate replication` shows replica lag and slot status +- [ ] `pgcrate replication` handles "no replication" gracefully +- [ ] Capabilities updated for both commands +- [ ] All tests pass From 90eeb2c2b16336c219fc13224b058ad1f2be90fb Mon Sep 17 00:00:00 2001 From: Jack Schultz Date: 
Mon, 19 Jan 2026 12:01:33 -0600 Subject: [PATCH 3/6] Add pgcrate replication command for streaming replication health - Server role detection (primary vs standby via pg_is_in_recovery()) - Replica lag monitoring from pg_stat_replication (write, flush, replay lag) - Replication slot status from pg_replication_slots (active, wal_status, retained) - WAL receiver info on standby servers from pg_stat_wal_receiver - Status thresholds: - Warning: replay_lag >30s or inactive slot retaining >1GB - Critical: replay_lag >5min or wal_status='lost' or slot retaining >10GB - Full JSON support with pgcrate.diagnostics.replication schema - Added diagnostics.replication capability (degraded if no pg_stat_replication access) - Integration tests for standalone server (no replica) scenarios --- CHANGELOG.md | 11 +- README.md | 2 + llms.txt | 2 + src/commands/capabilities.rs | 36 +++ src/commands/mod.rs | 1 + src/commands/replication.rs | 460 +++++++++++++++++++++++++++++++ src/main.rs | 40 +++ src/output.rs | 1 + tests/diagnostics/mod.rs | 1 + tests/diagnostics/replication.rs | 136 +++++++++ 10 files changed, 689 insertions(+), 1 deletion(-) create mode 100644 src/commands/replication.rs create mode 100644 tests/diagnostics/replication.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index c40240c..bca675a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## Unreleased -**Phase 3b: Bloat Diagnostics** +**Phase 3b: Bloat + Replication Diagnostics** ### New Commands @@ -13,6 +13,15 @@ - `--limit` option to control number of results - Full JSON support with `pgcrate.diagnostics.bloat` schema +- **`pgcrate replication`**: Monitor streaming replication health + - Server role detection (primary vs standby) + - Replica lag monitoring (write, flush, replay lag) + - Replication slot status and WAL retention + - WAL receiver info (standby only) + - Warning: replay_lag >30s or inactive slot retaining >1GB + - Critical: replay_lag >5min or wal_status='lost' or slot retaining >10GB + - Full JSON support with 
`pgcrate.diagnostics.replication` schema + --- **Phase 2a: Fix Commands** diff --git a/README.md b/README.md index 2276b60..69fd148 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,7 @@ pgcrate sequences # Sequence exhaustion check pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health pgcrate bloat # Estimate table and index bloat +pgcrate replication # Streaming replication health ``` All diagnostic commands support timeout flags for production safety: @@ -311,6 +312,7 @@ DROP TABLE users; | `pgcrate indexes` | Missing, unused, duplicate indexes | | `pgcrate vacuum` | Table bloat and vacuum health | | `pgcrate bloat` | Estimate table and index bloat | +| `pgcrate replication` | Streaming replication health monitoring | | `pgcrate fix sequence` | Upgrade sequence type to prevent exhaustion | | `pgcrate fix index` | Drop unused/duplicate indexes | | `pgcrate fix vacuum` | Run VACUUM on tables | diff --git a/llms.txt b/llms.txt index 8b59a98..dead58f 100644 --- a/llms.txt +++ b/llms.txt @@ -753,6 +753,7 @@ pgcrate sequences # Sequence exhaustion check pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health pgcrate bloat # Estimate table and index bloat +pgcrate replication # Streaming replication health # Connection context pgcrate context --json # Connection info, server version, privileges @@ -839,6 +840,7 @@ Currently, `--json` is supported for these commands: - `indexes` - Index health analysis - `vacuum` - Table bloat analysis - `bloat` - Table and index bloat estimation +- `replication` - Streaming replication health - `fix sequence` - Sequence upgrade result - `fix index` - Index drop result - `fix vacuum` - Vacuum result diff --git a/src/commands/capabilities.rs b/src/commands/capabilities.rs index 3da30a3..32767d7 100644 --- a/src/commands/capabilities.rs +++ b/src/commands/capabilities.rs @@ -81,6 +81,7 @@ pub async fn run_capabilities(client: &Client, read_only: 
bool) -> Result Result CapabilityInfo { + let requirements = vec![Requirement { + what: "pg_stat_replication SELECT".to_string(), + met: has_pg_stat_replication, + }]; + + let (status, reasons) = if !has_pg_stat_replication { + ( + CapabilityStatus::Degraded, + vec![ReasonInfo::new( + ReasonCode::MissingPrivilege, + "Cannot read pg_stat_replication (replica info unavailable)", + )], + ) + } else { + (CapabilityStatus::Available, vec![]) + }; + + CapabilityInfo { + id: "diagnostics.replication", + name: "Replication", + description: "Streaming replication health monitoring", + status, + reasons, + requirements, + limitations: if !has_pg_stat_replication { + vec!["Replica lag information not available".to_string()] + } else { + vec![] + }, + } +} + fn check_bloat_capability(has_pg_stat_user_tables: bool) -> CapabilityInfo { let requirements = vec![Requirement { what: "pg_stat_user_tables SELECT".to_string(), diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 19f6c92..64bcd18 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -15,6 +15,7 @@ pub mod indexes; pub mod locks; mod migrations; pub mod model; +pub mod replication; mod role; mod schema; mod seed; diff --git a/src/commands/replication.rs b/src/commands/replication.rs new file mode 100644 index 0000000..2ec14f7 --- /dev/null +++ b/src/commands/replication.rs @@ -0,0 +1,460 @@ +//! Replication command: Monitor streaming replication health. +//! +//! Shows replica lag, slot status, and WAL receiver info. +//! Works on both primary and standby servers. 
+ +use anyhow::Result; +use serde::Serialize; +use tokio_postgres::Client; + +const LAG_WARNING_SECS: f64 = 30.0; +const LAG_CRITICAL_SECS: f64 = 300.0; // 5 minutes +const SLOT_RETAINED_WARNING_BYTES: i64 = 1_073_741_824; // 1GB +const SLOT_RETAINED_CRITICAL_BYTES: i64 = 10_737_418_240; // 10GB + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ReplicationStatus { + Healthy, + Warning, + Critical, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ServerRole { + Primary, + Standby, +} + +#[derive(Debug, Clone, Serialize)] +pub struct ReplicaInfo { + pub application_name: String, + pub client_addr: Option, + pub state: String, + pub sync_state: String, + pub sent_lsn: Option, + pub write_lsn: Option, + pub flush_lsn: Option, + pub replay_lsn: Option, + pub write_lag_secs: Option, + pub flush_lag_secs: Option, + pub replay_lag_secs: Option, + pub lag_bytes: Option, + pub status: ReplicationStatus, +} + +#[derive(Debug, Clone, Serialize)] +pub struct SlotInfo { + pub slot_name: String, + pub slot_type: String, + pub database: Option, + pub active: bool, + pub wal_status: Option, + pub retained_bytes: Option, + pub status: ReplicationStatus, +} + +#[derive(Debug, Clone, Serialize)] +pub struct WalReceiverInfo { + pub status: String, + pub sender_host: Option, + pub sender_port: Option, + pub slot_name: Option, + pub received_lsn: Option, + pub latest_end_lsn: Option, + pub lag_bytes: Option, +} + +#[derive(Debug, Serialize)] +pub struct ReplicationResult { + pub server_role: ServerRole, + pub replicas: Vec, + pub slots: Vec, + pub wal_receiver: Option, + pub overall_status: ReplicationStatus, +} + +async fn get_server_role(client: &Client) -> Result { + let row = client.query_one("SELECT pg_is_in_recovery()", &[]).await?; + let in_recovery: bool = row.get(0); + Ok(if in_recovery { + ServerRole::Standby + } else { + ServerRole::Primary + }) +} + +async fn 
get_replicas(client: &Client) -> Result> { + let query = r#" +SELECT + application_name, + client_addr::text, + state, + sync_state, + sent_lsn::text, + write_lsn::text, + flush_lsn::text, + replay_lsn::text, + EXTRACT(EPOCH FROM write_lag)::float8 AS write_lag_secs, + EXTRACT(EPOCH FROM flush_lag)::float8 AS flush_lag_secs, + EXTRACT(EPOCH FROM replay_lag)::float8 AS replay_lag_secs, + pg_wal_lsn_diff(sent_lsn, replay_lsn)::bigint AS lag_bytes +FROM pg_stat_replication +ORDER BY application_name +"#; + + let rows = client.query(query, &[]).await?; + let mut results = Vec::with_capacity(rows.len()); + + for row in rows { + let replay_lag_secs: Option = row.get("replay_lag_secs"); + let status = match replay_lag_secs { + Some(lag) if lag >= LAG_CRITICAL_SECS => ReplicationStatus::Critical, + Some(lag) if lag >= LAG_WARNING_SECS => ReplicationStatus::Warning, + _ => ReplicationStatus::Healthy, + }; + + results.push(ReplicaInfo { + application_name: row.get("application_name"), + client_addr: row.get("client_addr"), + state: row.get("state"), + sync_state: row.get("sync_state"), + sent_lsn: row.get("sent_lsn"), + write_lsn: row.get("write_lsn"), + flush_lsn: row.get("flush_lsn"), + replay_lsn: row.get("replay_lsn"), + write_lag_secs: row.get("write_lag_secs"), + flush_lag_secs: row.get("flush_lag_secs"), + replay_lag_secs, + lag_bytes: row.get("lag_bytes"), + status, + }); + } + + Ok(results) +} + +async fn get_slots(client: &Client) -> Result> { + // Check if pg_current_wal_lsn exists (PG10+) or use pg_current_xlog_location (PG9) + let query = r#" +SELECT + slot_name, + slot_type, + database, + active, + wal_status, + pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::bigint AS retained_bytes +FROM pg_replication_slots +ORDER BY slot_name +"#; + + let rows = client.query(query, &[]).await?; + let mut results = Vec::with_capacity(rows.len()); + + for row in rows { + let active: bool = row.get("active"); + let wal_status: Option = row.get("wal_status"); + let 
retained_bytes: Option<i64> = row.get("retained_bytes"); + + let status = if wal_status.as_deref() == Some("lost") { + ReplicationStatus::Critical + } else if !active { + match retained_bytes { + Some(bytes) if bytes >= SLOT_RETAINED_CRITICAL_BYTES => ReplicationStatus::Critical, + Some(bytes) if bytes >= SLOT_RETAINED_WARNING_BYTES => ReplicationStatus::Warning, + _ => ReplicationStatus::Healthy, + } + } else { + ReplicationStatus::Healthy + }; + + results.push(SlotInfo { + slot_name: row.get("slot_name"), + slot_type: row.get("slot_type"), + database: row.get("database"), + active, + wal_status, + retained_bytes, + status, + }); + } + + Ok(results) +} + +async fn get_wal_receiver(client: &Client) -> Result<Option<WalReceiverInfo>> { + let query = r#" +SELECT + COALESCE(status, 'unknown') AS status, + sender_host, + sender_port, + slot_name, + flushed_lsn::text AS received_lsn, + latest_end_lsn::text, + CASE + WHEN latest_end_lsn IS NOT NULL AND flushed_lsn IS NOT NULL + THEN pg_wal_lsn_diff(latest_end_lsn, flushed_lsn)::bigint + ELSE NULL + END AS lag_bytes +FROM pg_stat_wal_receiver +LIMIT 1 +"#; + + let rows = client.query(query, &[]).await?; + if rows.is_empty() { + return Ok(None); + } + + let row = &rows[0]; + Ok(Some(WalReceiverInfo { + status: row.get("status"), + sender_host: row.get("sender_host"), + sender_port: row.get("sender_port"), + slot_name: row.get("slot_name"), + received_lsn: row.get("received_lsn"), + latest_end_lsn: row.get("latest_end_lsn"), + lag_bytes: row.get("lag_bytes"), + })) +} + +pub async fn get_replication(client: &Client) -> Result<ReplicationResult> { + let server_role = get_server_role(client).await?; + + let replicas = if server_role == ServerRole::Primary { + get_replicas(client).await? + } else { + vec![] + }; + + let slots = get_slots(client).await?; + + let wal_receiver = if server_role == ServerRole::Standby { + get_wal_receiver(client).await? 
+ } else { + None + }; + + // Calculate overall status + let worst_replica = replicas.iter().map(|r| &r.status).max_by_key(|s| match s { + ReplicationStatus::Healthy => 0, + ReplicationStatus::Warning => 1, + ReplicationStatus::Critical => 2, + }); + + let worst_slot = slots.iter().map(|s| &s.status).max_by_key(|s| match s { + ReplicationStatus::Healthy => 0, + ReplicationStatus::Warning => 1, + ReplicationStatus::Critical => 2, + }); + + let overall_status = match (worst_replica, worst_slot) { + (Some(r), Some(s)) => { + if matches!(r, ReplicationStatus::Critical) || matches!(s, ReplicationStatus::Critical) + { + ReplicationStatus::Critical + } else if matches!(r, ReplicationStatus::Warning) + || matches!(s, ReplicationStatus::Warning) + { + ReplicationStatus::Warning + } else { + ReplicationStatus::Healthy + } + } + (Some(s), None) | (None, Some(s)) => *s, + (None, None) => ReplicationStatus::Healthy, + }; + + Ok(ReplicationResult { + server_role, + replicas, + slots, + wal_receiver, + overall_status, + }) +} + +fn format_bytes(bytes: i64) -> String { + if bytes >= 1_073_741_824 { + format!("{:.1} GB", bytes as f64 / 1_073_741_824.0) + } else if bytes >= 1_048_576 { + format!("{:.1} MB", bytes as f64 / 1_048_576.0) + } else if bytes >= 1024 { + format!("{:.1} KB", bytes as f64 / 1024.0) + } else { + format!("{} B", bytes) + } +} + +fn format_lag(secs: Option) -> String { + match secs { + Some(s) if s >= 60.0 => format!("{:.1}m", s / 60.0), + Some(s) => format!("{:.1}s", s), + None => "-".to_string(), + } +} + +fn status_emoji(status: &ReplicationStatus) -> &'static str { + match status { + ReplicationStatus::Healthy => "✓", + ReplicationStatus::Warning => "⚠", + ReplicationStatus::Critical => "✗", + } +} + +pub fn print_human(result: &ReplicationResult, quiet: bool) { + let role_str = match result.server_role { + ServerRole::Primary => "PRIMARY", + ServerRole::Standby => "STANDBY", + }; + + if !quiet { + println!( + "REPLICATION STATUS: {} ({})", + role_str, + 
status_emoji(&result.overall_status) + ); + println!(); + } + + // Replicas (primary only) + if result.server_role == ServerRole::Primary { + if result.replicas.is_empty() { + if !quiet { + println!("REPLICAS: none"); + } + } else { + println!("REPLICAS:"); + println!( + " {:3} {:20} {:15} {:10} {:>10} {:>10}", + "", "APPLICATION", "CLIENT", "STATE", "LAG", "BYTES" + ); + println!(" {}", "-".repeat(72)); + + for r in &result.replicas { + let client = r.client_addr.as_deref().unwrap_or("-"); + let client_display = if client.len() > 15 { + format!("{}...", &client[..12]) + } else { + client.to_string() + }; + let app_display = if r.application_name.len() > 20 { + format!("{}...", &r.application_name[..17]) + } else { + r.application_name.clone() + }; + + println!( + " {} {:20} {:15} {:10} {:>10} {:>10}", + status_emoji(&r.status), + app_display, + client_display, + r.state, + format_lag(r.replay_lag_secs), + r.lag_bytes + .map(format_bytes) + .unwrap_or_else(|| "-".to_string()) + ); + } + println!(); + } + } + + // WAL Receiver (standby only) + if let Some(ref wr) = result.wal_receiver { + println!("WAL RECEIVER:"); + println!(" Status: {}", wr.status); + if let Some(ref host) = wr.sender_host { + let port = wr + .sender_port + .map(|p| format!(":{}", p)) + .unwrap_or_default(); + println!(" Sender: {}{}", host, port); + } + if let Some(ref slot) = wr.slot_name { + println!(" Slot: {}", slot); + } + if let Some(bytes) = wr.lag_bytes { + println!(" Lag: {}", format_bytes(bytes)); + } + println!(); + } + + // Slots + if result.slots.is_empty() { + if !quiet { + println!("REPLICATION SLOTS: none"); + } + } else { + println!("REPLICATION SLOTS:"); + println!( + " {:3} {:30} {:10} {:8} {:>12} {:>10}", + "", "SLOT", "TYPE", "ACTIVE", "WAL STATUS", "RETAINED" + ); + println!(" {}", "-".repeat(78)); + + for s in &result.slots { + let name_display = if s.slot_name.len() > 30 { + format!("{}...", &s.slot_name[..27]) + } else { + s.slot_name.clone() + }; + let active_str = if 
s.active { "yes" } else { "no" }; + let wal_status = s.wal_status.as_deref().unwrap_or("-"); + let retained = s + .retained_bytes + .map(format_bytes) + .unwrap_or_else(|| "-".to_string()); + + println!( + " {} {:30} {:10} {:8} {:>12} {:>10}", + status_emoji(&s.status), + name_display, + s.slot_type, + active_str, + wal_status, + retained + ); + } + } +} + +pub fn print_json( + result: &ReplicationResult, + timeouts: Option, +) -> Result<()> { + use crate::output::{schema, DiagnosticOutput, Severity}; + + let severity = match result.overall_status { + ReplicationStatus::Healthy => Severity::Healthy, + ReplicationStatus::Warning => Severity::Warning, + ReplicationStatus::Critical => Severity::Critical, + }; + + let output = match timeouts { + Some(t) => DiagnosticOutput::with_timeouts(schema::REPLICATION, result, severity, t), + None => DiagnosticOutput::new(schema::REPLICATION, result, severity), + }; + output.print()?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_lag() { + assert_eq!(format_lag(Some(5.0)), "5.0s"); + assert_eq!(format_lag(Some(90.0)), "1.5m"); + assert_eq!(format_lag(None), "-"); + } + + #[test] + fn test_format_bytes() { + assert_eq!(format_bytes(500), "500 B"); + assert_eq!(format_bytes(1024), "1.0 KB"); + assert_eq!(format_bytes(1_500_000), "1.4 MB"); + assert_eq!(format_bytes(2_000_000_000), "1.9 GB"); + } +} diff --git a/src/main.rs b/src/main.rs index 016831d..e542d1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -77,6 +77,7 @@ fn json_supported(command: &Commands) -> bool { Commands::Indexes { .. } => true, Commands::Vacuum { .. } => true, Commands::Bloat { .. } => true, + Commands::Replication => true, Commands::Fix { .. 
} => true, Commands::Context => true, Commands::Capabilities => true, @@ -356,6 +357,8 @@ enum Commands { #[arg(long, default_value = "10")] limit: usize, }, + /// Monitor streaming replication health + Replication, /// Fix commands for remediation Fix { #[command(subcommand)] @@ -1365,6 +1368,42 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { commands::bloat::BloatStatus::Healthy => {} } } + Commands::Replication => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + let conn_result = connection::resolve_and_validate( + &config, + cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + cli.read_write, + cli.quiet, + )?; + + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + setup_ctrlc_handler(session.cancel_token()); + + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + let result = commands::replication::get_replication(session.client()).await?; + + if cli.json { + commands::replication::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::replication::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::replication::ReplicationStatus::Critical => std::process::exit(2), + commands::replication::ReplicationStatus::Warning => std::process::exit(1), + commands::replication::ReplicationStatus::Healthy => {} + } + } Commands::Fix { ref command } => { let config = Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; @@ -2227,6 +2266,7 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { | Commands::Indexes { .. } | Commands::Vacuum { .. } | Commands::Bloat { .. } + | Commands::Replication | Commands::Fix { .. 
} | Commands::Context | Commands::Capabilities diff --git a/src/output.rs b/src/output.rs index 0a34c5b..a729a00 100644 --- a/src/output.rs +++ b/src/output.rs @@ -426,6 +426,7 @@ pub mod schema { pub const INDEXES: &str = "pgcrate.diagnostics.indexes"; pub const VACUUM: &str = "pgcrate.diagnostics.vacuum"; pub const BLOAT: &str = "pgcrate.diagnostics.bloat"; + pub const REPLICATION: &str = "pgcrate.diagnostics.replication"; pub const CONTEXT: &str = "pgcrate.diagnostics.context"; pub const CAPABILITIES: &str = "pgcrate.diagnostics.capabilities"; } diff --git a/tests/diagnostics/mod.rs b/tests/diagnostics/mod.rs index 8276cd1..ded4bcb 100644 --- a/tests/diagnostics/mod.rs +++ b/tests/diagnostics/mod.rs @@ -3,4 +3,5 @@ mod bloat; mod fix; mod indexes; mod locks; +mod replication; mod sequences_scenarios; diff --git a/tests/diagnostics/replication.rs b/tests/diagnostics/replication.rs new file mode 100644 index 0000000..c13c95d --- /dev/null +++ b/tests/diagnostics/replication.rs @@ -0,0 +1,136 @@ +//! Integration tests for replication diagnostic command. +//! +//! Note: These tests run against a standalone database without replicas, +//! so they test the "no replication" path and JSON structure. 
+ +use crate::common::{parse_json, stdout, TestDatabase, TestProject}; + +#[test] +fn test_replication_standalone_server() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::empty(&db); + + std::fs::write( + project.path("pgcrate.toml"), + format!( + r#"[database] +url = "{}" +"#, + db.url() + ), + ) + .unwrap(); + + // Standalone server should succeed with healthy status + let output = project.run_pgcrate(&["replication"]); + + assert!( + output.status.success(), + "Standalone server should report healthy replication status" + ); +} + +#[test] +fn test_replication_json_structure() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::empty(&db); + + std::fs::write( + project.path("pgcrate.toml"), + format!( + r#"[database] +url = "{}" +"#, + db.url() + ), + ) + .unwrap(); + + let output = project.run_pgcrate_ok(&["replication", "--json"]); + + let json = parse_json(&output); + assert!(json.is_object(), "Should return JSON object"); + + // Schema versioning fields + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.diagnostics.replication")) + ); + assert!(json.get("schema_version").is_some()); + + // Data fields + let data = json.get("data").expect("JSON should have data field"); + assert!(data.get("server_role").is_some(), "Should have server_role"); + assert!(data.get("replicas").is_some(), "Should have replicas array"); + assert!(data.get("slots").is_some(), "Should have slots array"); + assert!( + data.get("overall_status").is_some(), + "Should have overall_status" + ); +} + +#[test] +fn test_replication_detects_primary() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::empty(&db); + + std::fs::write( + project.path("pgcrate.toml"), + format!( + r#"[database] +url = "{}" +"#, + db.url() + ), + ) + .unwrap(); + + let output = project.run_pgcrate_ok(&["replication", "--json"]); + + let 
json = parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + + // Standalone server should report as primary + assert_eq!( + data.get("server_role"), + Some(&serde_json::json!("primary")), + "Standalone server should report as primary" + ); +} + +#[test] +fn test_replication_human_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::empty(&db); + + std::fs::write( + project.path("pgcrate.toml"), + format!( + r#"[database] +url = "{}" +"#, + db.url() + ), + ) + .unwrap(); + + let output = project.run_pgcrate(&["replication"]); + let out = stdout(&output); + + // Should show role and status + assert!( + out.contains("PRIMARY") || out.contains("STANDBY"), + "Should show server role: {}", + out + ); + assert!( + out.contains("REPLICATION") || out.contains("replication"), + "Should contain replication info: {}", + out + ); +} From 05d8074be032c5ed1a8902dbec0db70893dc63f0 Mon Sep 17 00:00:00 2001 From: Jack Schultz Date: Mon, 19 Jan 2026 14:30:54 -0600 Subject: [PATCH 4/6] Fix bugs and add UX improvements from agent feedback Bug fixes: - Fix UTF-8 slicing in sequences.rs display (same issue as bloat.rs) - Fix xid_age type overflow in triage.rs (i32 -> i64 for large XID ages) - Add better error context in xid.rs for empty database handling - Align triage sequences check with sequences.rs (consistent float calculation) UX improvements: - Add 'migration' as visible alias for 'migrate' command - Add 'create' as visible alias for 'migrate new' command These fixes address issues reported in 25+ agent feedback sessions. 
--- src/commands/sequences.rs | 12 +++++++----- src/commands/triage.rs | 32 ++++++++++++++++---------------- src/commands/xid.rs | 21 ++++++++++++++++----- src/main.rs | 2 ++ 4 files changed, 41 insertions(+), 26 deletions(-) diff --git a/src/commands/sequences.rs b/src/commands/sequences.rs index 1ebccb2..34230ea 100644 --- a/src/commands/sequences.rs +++ b/src/commands/sequences.rs @@ -175,14 +175,16 @@ pub fn print_human(result: &SequencesResult, quiet: bool, show_all: bool) { for seq in to_show { let full_name = format!("{}.{}", seq.schema, seq.name); + // Use chars().count() for UTF-8 safe length check + let display_name = if full_name.chars().count() > 40 { + format!("{}...", full_name.chars().take(37).collect::()) + } else { + full_name + }; println!( " {} {:40} {:>12} {:>12} {:>6.1}%", seq.status.emoji(), - if full_name.len() > 40 { - format!("{}...", &full_name[..37]) - } else { - full_name - }, + display_name, format_number(seq.last_value), format_number(seq.max_value), seq.pct_used diff --git a/src/commands/triage.rs b/src/commands/triage.rs index 55e4302..6b5aaf4 100644 --- a/src/commands/triage.rs +++ b/src/commands/triage.rs @@ -321,7 +321,7 @@ async fn check_xid_age(client: &Client) -> CheckOutcome { let query = r#" SELECT datname, - age(datfrozenxid) as xid_age + age(datfrozenxid)::bigint as xid_age FROM pg_database WHERE datallowconn ORDER BY age(datfrozenxid) DESC @@ -331,7 +331,8 @@ async fn check_xid_age(client: &Client) -> CheckOutcome { match client.query_one(query, &[]).await { Ok(row) => { let datname: String = row.get("datname"); - let xid_age: i32 = row.get("xid_age"); + // Use i64 to handle XID ages that could exceed i32 max + let xid_age: i64 = row.get("xid_age"); // XID wraparound happens at 2^31 (~2.1 billion) let max_xid: i64 = 2_147_483_648; @@ -387,18 +388,17 @@ async fn check_sequences(client: &Client) -> CheckOutcome { let label = "SEQUENCES"; // Check sequences that are >70% exhausted + // Use same calculation as sequences.rs for 
consistency (float with rounding) let query = r#" SELECT schemaname || '.' || sequencename as seq_name, - last_value, + COALESCE(last_value, 0) as last_value, CASE - WHEN increment_by > 0 THEN - (last_value::numeric / max_value::numeric * 100)::int - ELSE - ((min_value::numeric - last_value::numeric) / (min_value::numeric - max_value::numeric) * 100)::int + WHEN increment_by > 0 AND max_value > 0 AND last_value IS NOT NULL + THEN round(100.0 * last_value / max_value, 1)::double precision + ELSE 0::double precision END as pct_used FROM pg_sequences - WHERE last_value IS NOT NULL ORDER BY pct_used DESC LIMIT 5 "#; @@ -408,27 +408,27 @@ async fn check_sequences(client: &Client) -> CheckOutcome { let critical: Vec<_> = rows .iter() .filter(|r| { - let pct: i32 = r.get("pct_used"); - pct > 85 + let pct: f64 = r.get("pct_used"); + pct > 85.0 }) .collect(); let warning: Vec<_> = rows .iter() .filter(|r| { - let pct: i32 = r.get("pct_used"); - pct > 70 && pct <= 85 + let pct: f64 = r.get("pct_used"); + pct > 70.0 && pct <= 85.0 }) .collect(); if !critical.is_empty() { let seq_name: String = critical[0].get("seq_name"); - let pct: i32 = critical[0].get("pct_used"); + let pct: f64 = critical[0].get("pct_used"); CheckOutcome::Ok(CheckResult { name, label, status: CheckStatus::Critical, - summary: format!("{} at {}% (+ {} more)", seq_name, pct, critical.len() - 1), + summary: format!("{} at {:.1}% (+ {} more)", seq_name, pct, critical.len() - 1), details: None, next_actions: vec![NextAction::pgcrate( &["sequences"], @@ -437,12 +437,12 @@ async fn check_sequences(client: &Client) -> CheckOutcome { }) } else if !warning.is_empty() { let seq_name: String = warning[0].get("seq_name"); - let pct: i32 = warning[0].get("pct_used"); + let pct: f64 = warning[0].get("pct_used"); CheckOutcome::Ok(CheckResult { name, label, status: CheckStatus::Warning, - summary: format!("{} at {}%", seq_name, pct), + summary: format!("{} at {:.1}%", seq_name, pct), details: None, next_actions: 
vec![NextAction::pgcrate( &["sequences"], diff --git a/src/commands/xid.rs b/src/commands/xid.rs index e157fec..d88efed 100644 --- a/src/commands/xid.rs +++ b/src/commands/xid.rs @@ -4,7 +4,7 @@ //! If XID age gets too high, the database will shut down to prevent data corruption. //! This command helps monitor XID age at database and table levels. -use anyhow::Result; +use anyhow::{Context, Result}; use serde::Serialize; use tokio_postgres::Client; @@ -79,25 +79,31 @@ pub struct XidResult { /// Get database-level XID ages pub async fn get_database_xid(client: &Client) -> Result> { + // Use explicit double precision cast to avoid deserialization issues let query = r#" SELECT datname, age(datfrozenxid)::bigint as xid_age, - round(100.0 * age(datfrozenxid) / 2147483647, 2)::float8 as pct_used + (100.0 * age(datfrozenxid)::double precision / 2147483647.0)::double precision as pct_used FROM pg_database WHERE datallowconn ORDER BY age(datfrozenxid) DESC "#; - let rows = client.query(query, &[]).await?; + let rows = client + .query(query, &[]) + .await + .context("Failed to query database XID ages")?; + let mut results = Vec::new(); for row in rows { let xid_age: i64 = row.get("xid_age"); + let pct_used: f64 = row.get("pct_used"); results.push(DatabaseXid { datname: row.get("datname"), xid_age, - pct_used: row.get("pct_used"), + pct_used, status: XidStatus::from_age(xid_age), }); } @@ -107,6 +113,7 @@ pub async fn get_database_xid(client: &Client) -> Result> { /// Get table-level XID ages (oldest unfrozen tables) pub async fn get_table_xid(client: &Client, limit: usize) -> Result> { + // Query user tables with XID age. Returns empty vec for databases with no tables. 
let query = r#" SELECT s.schemaname, @@ -120,7 +127,11 @@ pub async fn get_table_xid(client: &Client, limit: usize) -> Result Date: Mon, 19 Jan 2026 14:40:24 -0600 Subject: [PATCH 5/6] docs: Update CHANGELOG with bug fixes and UX improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug Fixes: - UTF-8 string slicing in bloat/sequences (prevents panic on non-ASCII) - XID age type overflow in triage (i32 → i64) - Triage sequences percentage calculation alignment - Better error context for XID on empty databases UX Improvements: - Add 'migration' as alias for 'migrate' command - Add 'create' as alias for 'migrate new' command --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bca675a..c2c2aca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,18 @@ - Critical: replay_lag >5min or wal_status='lost' - Full JSON support with `pgcrate.diagnostics.replication` schema +### Bug Fixes + +- Fix UTF-8 string slicing in `bloat` and `sequences` display (prevents panic on non-ASCII names) +- Fix XID age type overflow in triage (i32 → i64 for databases with high XID age) +- Fix triage sequences percentage calculation to match `sequences` command (consistent rounding) +- Add better error context for XID command on empty databases + +### UX Improvements + +- Add `migration` as alias for `migrate` command (reduces confusion) +- Add `create` as alias for `migrate new` command + --- **Phase 2a: Fix Commands** From accc23a83c73050f0d3f263bb81a9f7f7a5822fc Mon Sep 17 00:00:00 2001 From: Jack Schultz Date: Mon, 19 Jan 2026 14:52:07 -0600 Subject: [PATCH 6/6] Add pgcrate queries and connections commands (Phase 3a) Completes the "why is prod slow?" 
workflow with: pgcrate queries: - Top queries from pg_stat_statements by total time, mean time, or calls - Cache hit ratio per query - Status thresholds: warning >1s mean, critical >5s mean - Graceful degradation when extension not installed - JSON output with pgcrate.diagnostics.queries schema pgcrate connections: - Connection usage vs max_connections with percentage - Breakdown by state (active, idle, idle in transaction) - Group by user, database, or application - Status thresholds: warning >75%, critical >90% - JSON output with pgcrate.diagnostics.connections schema Updated capabilities: - diagnostics.queries: available when pg_stat_statements installed - diagnostics.connections: always available (uses pg_stat_activity) --- CHANGELOG.md | 27 ++ README.md | 6 + src/commands/capabilities.rs | 43 ++- src/commands/connections.rs | 495 +++++++++++++++++++++++++++++++++++ src/commands/mod.rs | 2 + src/commands/queries.rs | 427 ++++++++++++++++++++++++++++++ src/main.rs | 118 +++++++++ src/output.rs | 2 + 8 files changed, 1111 insertions(+), 9 deletions(-) create mode 100644 src/commands/connections.rs create mode 100644 src/commands/queries.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index c2c2aca..2d2fd6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,33 @@ ## Unreleased +**Phase 3a: Query Performance Diagnostics** + +Complete the "why is prod slow?" workflow with query and connection analysis. 
+ +### New Commands + +- **`pgcrate queries`**: Top queries from pg_stat_statements + - Sort by total time, mean time, or call count (`--by total|mean|calls`) + - Cache hit ratio per query + - Status thresholds: warning >1s mean, critical >5s mean + - Graceful degradation when pg_stat_statements not installed + - Full JSON support with `pgcrate.diagnostics.queries` schema + +- **`pgcrate connections`**: Connection usage analysis + - Usage vs max_connections with percentage + - Breakdown by state (active, idle, idle in transaction) + - Group by user, database, or application (`--by-user`, `--by-database`, `--by-application`) + - Status thresholds: warning >75%, critical >90% + - Full JSON support with `pgcrate.diagnostics.connections` schema + +### Capabilities + +- `diagnostics.queries` - Available when pg_stat_statements extension is installed +- `diagnostics.connections` - Always available (uses pg_stat_activity) + +--- + **Phase 3b: Bloat + Replication Diagnostics** ### New Commands diff --git a/README.md b/README.md index 69fd148..3c0664b 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,10 @@ pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health pgcrate bloat # Estimate table and index bloat pgcrate replication # Streaming replication health +pgcrate queries # Top queries (requires pg_stat_statements) +pgcrate queries --by mean # Sort by mean execution time +pgcrate connections # Connection usage vs max_connections +pgcrate connections --by-user # Group by user ``` All diagnostic commands support timeout flags for production safety: @@ -313,6 +317,8 @@ DROP TABLE users; | `pgcrate vacuum` | Table bloat and vacuum health | | `pgcrate bloat` | Estimate table and index bloat | | `pgcrate replication` | Streaming replication health monitoring | +| `pgcrate queries` | Top queries from pg_stat_statements | +| `pgcrate connections` | Connection usage vs max_connections | | `pgcrate fix sequence` | Upgrade sequence 
type to prevent exhaustion | | `pgcrate fix index` | Drop unused/duplicate indexes | | `pgcrate fix vacuum` | Run VACUUM on tables | diff --git a/src/commands/capabilities.rs b/src/commands/capabilities.rs index 32767d7..9c17c87 100644 --- a/src/commands/capabilities.rs +++ b/src/commands/capabilities.rs @@ -121,8 +121,10 @@ pub async fn run_capabilities(client: &Client, read_only: bool) -> Result CapabilityInfo { met: has_pg_stat_statements, }]; - let (status, reasons) = if !has_pg_stat_statements { + let (status, reasons, limitations) = if !has_pg_stat_statements { ( CapabilityStatus::Unavailable, vec![ReasonInfo::new( ReasonCode::MissingExtension, "pg_stat_statements extension not installed or accessible", )], + vec!["Install pg_stat_statements extension to enable query analysis".to_string()], ) } else { - // Even with the extension, this capability is not yet implemented + (CapabilityStatus::Available, vec![], vec![]) + }; + + CapabilityInfo { + id: "diagnostics.queries", + name: "Query Analysis", + description: "Slow query identification (pg_stat_statements)", + status, + reasons, + requirements, + limitations, + } +} + +fn check_connections_capability(has_pg_stat_activity: bool) -> CapabilityInfo { + let requirements = vec![Requirement { + what: "pg_stat_activity SELECT".to_string(), + met: has_pg_stat_activity, + }]; + + let (status, reasons) = if !has_pg_stat_activity { ( CapabilityStatus::Unavailable, vec![ReasonInfo::new( - ReasonCode::NotApplicable, - "Query analysis not yet implemented (planned for Phase 3)", + ReasonCode::MissingPrivilege, + "Cannot read pg_stat_activity", )], ) + } else { + (CapabilityStatus::Available, vec![]) }; CapabilityInfo { - id: "diagnostics.queries", - name: "Query Analysis", - description: "Slow query identification (pg_stat_statements)", + id: "diagnostics.connections", + name: "Connections", + description: "Connection usage analysis vs max_connections", status, reasons, requirements, - limitations: vec!["Not yet 
implemented".to_string()], + limitations: vec![], } } diff --git a/src/commands/connections.rs b/src/commands/connections.rs new file mode 100644 index 0000000..9b1355f --- /dev/null +++ b/src/commands/connections.rs @@ -0,0 +1,495 @@ +//! Connections command: Connection usage analysis. +//! +//! Monitors connection pool usage against max_connections to identify +//! connection exhaustion risks and connection management issues. + +use anyhow::{Context, Result}; +use serde::Serialize; +use std::collections::HashMap; +use tokio_postgres::Client; + +/// Status thresholds (percentage of max_connections) +const CONN_WARNING_PCT: f64 = 75.0; +const CONN_CRITICAL_PCT: f64 = 90.0; + +/// Connection status level +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ConnectionStatus { + Healthy, + Warning, + Critical, +} + +impl ConnectionStatus { + pub fn from_pct(pct: f64) -> Self { + if pct >= CONN_CRITICAL_PCT { + ConnectionStatus::Critical + } else if pct >= CONN_WARNING_PCT { + ConnectionStatus::Warning + } else { + ConnectionStatus::Healthy + } + } + + pub fn emoji(&self) -> &'static str { + match self { + ConnectionStatus::Healthy => "✓", + ConnectionStatus::Warning => "⚠", + ConnectionStatus::Critical => "✗", + } + } +} + +/// Connection statistics +#[derive(Debug, Clone, Serialize)] +pub struct ConnectionStats { + pub total: i32, + pub max_connections: i32, + pub reserved_connections: i32, + pub available: i32, + pub usage_pct: f64, + pub by_state: HashMap, + pub status: ConnectionStatus, +} + +/// Connections grouped by user +#[derive(Debug, Clone, Serialize)] +pub struct UserConnections { + pub username: String, + pub count: i32, + pub by_state: HashMap, +} + +/// Connections grouped by database +#[derive(Debug, Clone, Serialize)] +pub struct DatabaseConnections { + pub database: String, + pub count: i32, + pub by_state: HashMap, +} + +/// Connections grouped by application +#[derive(Debug, Clone, Serialize)] +pub 
struct ApplicationConnections { + pub application_name: String, + pub count: i32, + pub by_state: HashMap, +} + +/// Full connections results +#[derive(Debug, Serialize)] +pub struct ConnectionsResult { + pub stats: ConnectionStats, + #[serde(skip_serializing_if = "Option::is_none")] + pub by_user: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub by_database: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub by_application: Option>, + pub overall_status: ConnectionStatus, +} + +/// Get max_connections setting +async fn get_max_connections(client: &Client) -> Result { + let query = "SELECT setting::int FROM pg_settings WHERE name = 'max_connections'"; + let row = client + .query_one(query, &[]) + .await + .context("Failed to get max_connections")?; + Ok(row.get::<_, i32>(0)) +} + +/// Get superuser_reserved_connections setting +async fn get_reserved_connections(client: &Client) -> Result { + let query = "SELECT setting::int FROM pg_settings WHERE name = 'superuser_reserved_connections'"; + let row = client + .query_one(query, &[]) + .await + .context("Failed to get superuser_reserved_connections")?; + Ok(row.get::<_, i32>(0)) +} + +/// Get current connection count +async fn get_total_connections(client: &Client) -> Result { + let query = "SELECT COUNT(*)::int FROM pg_stat_activity WHERE pid != pg_backend_pid()"; + let row = client + .query_one(query, &[]) + .await + .context("Failed to count connections")?; + Ok(row.get::<_, i32>(0)) +} + +/// Get connection counts by state +async fn get_connections_by_state(client: &Client) -> Result> { + let query = r#" + SELECT + COALESCE(state, 'null') as state, + COUNT(*)::int as count + FROM pg_stat_activity + WHERE pid != pg_backend_pid() + GROUP BY state + ORDER BY count DESC + "#; + + let rows = client + .query(query, &[]) + .await + .context("Failed to get connections by state")?; + + let mut by_state = HashMap::new(); + for row in rows { + let state: String = row.get("state"); + let 
count: i32 = row.get("count"); + by_state.insert(state, count); + } + + Ok(by_state) +} + +/// Get connection counts grouped by user +async fn get_connections_by_user(client: &Client) -> Result> { + let query = r#" + SELECT + COALESCE(usename, '') as username, + COALESCE(state, 'null') as state, + COUNT(*)::int as count + FROM pg_stat_activity + WHERE pid != pg_backend_pid() + GROUP BY usename, state + ORDER BY usename, count DESC + "#; + + let rows = client + .query(query, &[]) + .await + .context("Failed to get connections by user")?; + + // Aggregate into UserConnections + let mut user_map: HashMap = HashMap::new(); + for row in rows { + let username: String = row.get("username"); + let state: String = row.get("state"); + let count: i32 = row.get("count"); + + let entry = user_map.entry(username.clone()).or_insert_with(|| UserConnections { + username, + count: 0, + by_state: HashMap::new(), + }); + entry.count += count; + entry.by_state.insert(state, count); + } + + let mut users: Vec<_> = user_map.into_values().collect(); + users.sort_by(|a, b| b.count.cmp(&a.count)); + Ok(users) +} + +/// Get connection counts grouped by database +async fn get_connections_by_database(client: &Client) -> Result> { + let query = r#" + SELECT + COALESCE(datname, '') as database, + COALESCE(state, 'null') as state, + COUNT(*)::int as count + FROM pg_stat_activity + WHERE pid != pg_backend_pid() + GROUP BY datname, state + ORDER BY datname, count DESC + "#; + + let rows = client + .query(query, &[]) + .await + .context("Failed to get connections by database")?; + + // Aggregate into DatabaseConnections + let mut db_map: HashMap = HashMap::new(); + for row in rows { + let database: String = row.get("database"); + let state: String = row.get("state"); + let count: i32 = row.get("count"); + + let entry = db_map.entry(database.clone()).or_insert_with(|| DatabaseConnections { + database, + count: 0, + by_state: HashMap::new(), + }); + entry.count += count; + entry.by_state.insert(state, 
count); + } + + let mut dbs: Vec<_> = db_map.into_values().collect(); + dbs.sort_by(|a, b| b.count.cmp(&a.count)); + Ok(dbs) +} + +/// Get connection counts grouped by application +async fn get_connections_by_application(client: &Client) -> Result> { + let query = r#" + SELECT + COALESCE(NULLIF(application_name, ''), '') as app_name, + COALESCE(state, 'null') as state, + COUNT(*)::int as count + FROM pg_stat_activity + WHERE pid != pg_backend_pid() + GROUP BY application_name, state + ORDER BY application_name, count DESC + "#; + + let rows = client + .query(query, &[]) + .await + .context("Failed to get connections by application")?; + + // Aggregate into ApplicationConnections + let mut app_map: HashMap = HashMap::new(); + for row in rows { + let application_name: String = row.get("app_name"); + let state: String = row.get("state"); + let count: i32 = row.get("count"); + + let entry = app_map + .entry(application_name.clone()) + .or_insert_with(|| ApplicationConnections { + application_name, + count: 0, + by_state: HashMap::new(), + }); + entry.count += count; + entry.by_state.insert(state, count); + } + + let mut apps: Vec<_> = app_map.into_values().collect(); + apps.sort_by(|a, b| b.count.cmp(&a.count)); + Ok(apps) +} + +/// Run full connections analysis +pub async fn run_connections( + client: &Client, + include_by_user: bool, + include_by_database: bool, + include_by_application: bool, +) -> Result { + let max_connections = get_max_connections(client).await?; + let reserved_connections = get_reserved_connections(client).await?; + let total = get_total_connections(client).await?; + let by_state = get_connections_by_state(client).await?; + + let available = max_connections - reserved_connections; + let usage_pct = if available > 0 { + (100.0 * total as f64) / available as f64 + } else { + 0.0 + }; + let status = ConnectionStatus::from_pct(usage_pct); + + let stats = ConnectionStats { + total, + max_connections, + reserved_connections, + available, + usage_pct, 
+ by_state, + status, + }; + + let by_user = if include_by_user { + Some(get_connections_by_user(client).await?) + } else { + None + }; + + let by_database = if include_by_database { + Some(get_connections_by_database(client).await?) + } else { + None + }; + + let by_application = if include_by_application { + Some(get_connections_by_application(client).await?) + } else { + None + }; + + Ok(ConnectionsResult { + stats, + by_user, + by_database, + by_application, + overall_status: status, + }) +} + +/// Print connections in human-readable format +pub fn print_human(result: &ConnectionsResult, quiet: bool) { + let stats = &result.stats; + + println!("CONNECTIONS:"); + println!(); + println!( + " {} {}/{} ({:.1}%)", + stats.status.emoji(), + stats.total, + stats.available, + stats.usage_pct + ); + println!(); + println!( + " max_connections: {}", + stats.max_connections + ); + println!( + " superuser_reserved: {}", + stats.reserved_connections + ); + println!( + " available: {}", + stats.available + ); + println!(" in use: {}", stats.total); + + // State breakdown + if !stats.by_state.is_empty() { + println!(); + println!(" BY STATE:"); + let mut states: Vec<_> = stats.by_state.iter().collect(); + states.sort_by(|a, b| b.1.cmp(a.1)); + for (state, count) in states { + let state_display = if state == "null" { "null (backend)" } else { state }; + println!(" {:30} {:>5}", state_display, count); + } + } + + // By user breakdown + if let Some(ref users) = result.by_user { + println!(); + println!(" BY USER:"); + for user in users.iter().take(10) { + let states: String = user + .by_state + .iter() + .map(|(s, c)| format!("{}:{}", s, c)) + .collect::>() + .join(", "); + println!(" {:30} {:>5} ({})", user.username, user.count, states); + } + if users.len() > 10 { + println!(" ... 
and {} more users", users.len() - 10); + } + } + + // By database breakdown + if let Some(ref dbs) = result.by_database { + println!(); + println!(" BY DATABASE:"); + for db in dbs.iter().take(10) { + let states: String = db + .by_state + .iter() + .map(|(s, c)| format!("{}:{}", s, c)) + .collect::>() + .join(", "); + println!(" {:30} {:>5} ({})", db.database, db.count, states); + } + if dbs.len() > 10 { + println!(" ... and {} more databases", dbs.len() - 10); + } + } + + // By application breakdown + if let Some(ref apps) = result.by_application { + println!(); + println!(" BY APPLICATION:"); + for app in apps.iter().take(10) { + let app_name = if app.application_name.chars().count() > 30 { + format!("{}...", app.application_name.chars().take(27).collect::()) + } else { + app.application_name.clone() + }; + println!(" {:30} {:>5}", app_name, app.count); + } + if apps.len() > 10 { + println!(" ... and {} more applications", apps.len() - 10); + } + } + + // Warnings + if !quiet { + match stats.status { + ConnectionStatus::Critical => { + println!(); + println!(" ✗ CRITICAL: Connection usage >{}%", CONN_CRITICAL_PCT); + println!(" Consider increasing max_connections or investigating connection leaks"); + } + ConnectionStatus::Warning => { + println!(); + println!(" ⚠ WARNING: Connection usage >{}%", CONN_WARNING_PCT); + println!(" Monitor for potential connection exhaustion"); + } + ConnectionStatus::Healthy => {} + } + + // Check for idle in transaction + if let Some(idle_in_tx) = stats.by_state.get("idle in transaction") { + if *idle_in_tx > 5 { + println!(); + println!( + " ⚠ {} connections idle in transaction (potential lock holders)", + idle_in_tx + ); + } + } + } +} + +/// Print connections as JSON with schema versioning +pub fn print_json( + result: &ConnectionsResult, + timeouts: Option, +) -> Result<()> { + use crate::output::{schema, DiagnosticOutput, Severity}; + + let severity = match result.overall_status { + ConnectionStatus::Healthy => 
Severity::Healthy, + ConnectionStatus::Warning => Severity::Warning, + ConnectionStatus::Critical => Severity::Critical, + }; + + let output = match timeouts { + Some(t) => DiagnosticOutput::with_timeouts(schema::CONNECTIONS, result, severity, t), + None => DiagnosticOutput::new(schema::CONNECTIONS, result, severity), + }; + output.print()?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connection_status_healthy() { + assert_eq!(ConnectionStatus::from_pct(50.0), ConnectionStatus::Healthy); + } + + #[test] + fn test_connection_status_warning() { + assert_eq!(ConnectionStatus::from_pct(80.0), ConnectionStatus::Warning); + } + + #[test] + fn test_connection_status_critical() { + assert_eq!(ConnectionStatus::from_pct(95.0), ConnectionStatus::Critical); + } + + #[test] + fn test_connection_status_boundary() { + assert_eq!(ConnectionStatus::from_pct(74.9), ConnectionStatus::Healthy); + assert_eq!(ConnectionStatus::from_pct(75.0), ConnectionStatus::Warning); + assert_eq!(ConnectionStatus::from_pct(89.9), ConnectionStatus::Warning); + assert_eq!(ConnectionStatus::from_pct(90.0), ConnectionStatus::Critical); + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 64bcd18..58755fb 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -6,6 +6,7 @@ mod anonymize; pub mod bloat; mod bootstrap; pub mod capabilities; +pub mod connections; pub mod context; mod db; mod doctor; @@ -15,6 +16,7 @@ pub mod indexes; pub mod locks; mod migrations; pub mod model; +pub mod queries; pub mod replication; mod role; mod schema; diff --git a/src/commands/queries.rs b/src/commands/queries.rs new file mode 100644 index 0000000..e7be033 --- /dev/null +++ b/src/commands/queries.rs @@ -0,0 +1,427 @@ +//! Queries command: Top queries from pg_stat_statements. +//! +//! Shows the most expensive queries by execution time, helping identify +//! performance bottlenecks and optimization opportunities. 
+ +use anyhow::{Context, Result}; +use serde::Serialize; +use tokio_postgres::Client; + +/// Status thresholds (in milliseconds) +const QUERY_WARNING_MS: f64 = 1000.0; // 1 second mean time +const QUERY_CRITICAL_MS: f64 = 5000.0; // 5 seconds mean time + +/// Sort order for query results +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum QuerySortBy { + /// Sort by total execution time (default) + #[default] + TotalTime, + /// Sort by mean execution time per call + MeanTime, + /// Sort by number of calls + Calls, +} + +impl QuerySortBy { + pub fn from_str(s: &str) -> Option { + match s.to_lowercase().as_str() { + "total" | "total_time" => Some(QuerySortBy::TotalTime), + "mean" | "mean_time" | "avg" => Some(QuerySortBy::MeanTime), + "calls" | "count" => Some(QuerySortBy::Calls), + _ => None, + } + } + + fn order_by_clause(&self) -> &'static str { + match self { + QuerySortBy::TotalTime => "total_exec_time DESC", + QuerySortBy::MeanTime => "mean_exec_time DESC", + QuerySortBy::Calls => "calls DESC", + } + } +} + +/// Query information from pg_stat_statements +#[derive(Debug, Clone, Serialize)] +pub struct QueryInfo { + pub queryid: i64, + pub query: String, + pub calls: i64, + pub total_exec_time_ms: f64, + pub mean_exec_time_ms: f64, + pub rows: i64, + pub cache_hit_ratio: Option, + pub status: QueryStatus, +} + +/// Query status level +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum QueryStatus { + Healthy, + Warning, + Critical, +} + +impl QueryStatus { + pub fn from_mean_time(mean_ms: f64) -> Self { + if mean_ms >= QUERY_CRITICAL_MS { + QueryStatus::Critical + } else if mean_ms >= QUERY_WARNING_MS { + QueryStatus::Warning + } else { + QueryStatus::Healthy + } + } + + pub fn emoji(&self) -> &'static str { + match self { + QueryStatus::Healthy => "✓", + QueryStatus::Warning => "⚠", + QueryStatus::Critical => "✗", + } + } +} + +/// Full queries results +#[derive(Debug, Serialize)] +pub struct 
QueriesResult { + pub queries: Vec, + pub overall_status: QueryStatus, + pub extension_available: bool, + pub stats_since: Option, + pub total_queries_tracked: i64, +} + +/// Check if pg_stat_statements extension is installed and accessible +pub async fn check_extension(client: &Client) -> Result { + let query = r#" + SELECT EXISTS ( + SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements' + ) + "#; + + let row = client + .query_one(query, &[]) + .await + .context("Failed to check pg_stat_statements extension")?; + + Ok(row.get::<_, bool>(0)) +} + +/// Get stats reset time if available +async fn get_stats_since(client: &Client) -> Option { + // pg_stat_statements_reset() time isn't directly available + // We use pg_stat_statements_info if on PG14+, otherwise skip + let query = r#" + SELECT stats_reset::text + FROM pg_stat_statements_info + LIMIT 1 + "#; + + client + .query_opt(query, &[]) + .await + .ok() + .flatten() + .and_then(|row| row.get::<_, Option>(0)) +} + +/// Get total count of tracked queries +async fn get_total_queries(client: &Client) -> i64 { + let query = "SELECT COUNT(*)::bigint FROM pg_stat_statements"; + client + .query_one(query, &[]) + .await + .map(|r| r.get::<_, i64>(0)) + .unwrap_or(0) +} + +/// Get top queries from pg_stat_statements +pub async fn get_queries( + client: &Client, + sort_by: QuerySortBy, + limit: usize, +) -> Result> { + // Build query with dynamic ORDER BY + // Using pg_stat_statements columns available in PG13+ + let query = format!( + r#" + SELECT + queryid, + LEFT(query, 500) as query, + calls, + total_exec_time as total_exec_time_ms, + mean_exec_time as mean_exec_time_ms, + rows, + CASE + WHEN shared_blks_hit + shared_blks_read > 0 + THEN (100.0 * shared_blks_hit / (shared_blks_hit + shared_blks_read))::double precision + ELSE NULL + END as cache_hit_ratio + FROM pg_stat_statements + WHERE query NOT LIKE '%pg_stat_statements%' + ORDER BY {} + LIMIT $1 + "#, + sort_by.order_by_clause() + ); + + let rows = client 
+ .query(&query, &[&(limit as i64)]) + .await + .context("Failed to query pg_stat_statements")?; + + let mut queries = Vec::new(); + for row in rows { + let mean_exec_time_ms: f64 = row.get("mean_exec_time_ms"); + queries.push(QueryInfo { + queryid: row.get("queryid"), + query: row.get("query"), + calls: row.get("calls"), + total_exec_time_ms: row.get("total_exec_time_ms"), + mean_exec_time_ms, + rows: row.get("rows"), + cache_hit_ratio: row.get("cache_hit_ratio"), + status: QueryStatus::from_mean_time(mean_exec_time_ms), + }); + } + + Ok(queries) +} + +/// Run full queries analysis +pub async fn run_queries( + client: &Client, + sort_by: QuerySortBy, + limit: usize, +) -> Result { + // Check extension availability first + let extension_available = check_extension(client).await?; + + if !extension_available { + return Ok(QueriesResult { + queries: vec![], + overall_status: QueryStatus::Healthy, + extension_available: false, + stats_since: None, + total_queries_tracked: 0, + }); + } + + let queries = get_queries(client, sort_by, limit).await?; + let stats_since = get_stats_since(client).await; + let total_queries_tracked = get_total_queries(client).await; + + // Overall status is worst of query statuses + let overall_status = queries + .iter() + .map(|q| &q.status) + .max_by_key(|s| match s { + QueryStatus::Healthy => 0, + QueryStatus::Warning => 1, + QueryStatus::Critical => 2, + }) + .cloned() + .unwrap_or(QueryStatus::Healthy); + + Ok(QueriesResult { + queries, + overall_status, + extension_available: true, + stats_since, + total_queries_tracked, + }) +} + +/// Format duration in human-readable form +fn format_duration_ms(ms: f64) -> String { + if ms >= 60000.0 { + format!("{:.1}m", ms / 60000.0) + } else if ms >= 1000.0 { + format!("{:.2}s", ms / 1000.0) + } else { + format!("{:.1}ms", ms) + } +} + +/// Format large numbers for display +fn format_number(n: i64) -> String { + if n >= 1_000_000_000 { + format!("{:.1}B", n as f64 / 1_000_000_000.0) + } else if n >= 
1_000_000 { + format!("{:.1}M", n as f64 / 1_000_000.0) + } else if n >= 1_000 { + format!("{:.1}K", n as f64 / 1_000.0) + } else { + format!("{}", n) + } +} + +/// Truncate query for display +fn truncate_query(query: &str, max_len: usize) -> String { + let clean = query.replace('\n', " ").replace(" ", " "); + // Use chars for UTF-8 safe truncation + if clean.chars().count() <= max_len { + clean + } else { + format!("{}...", clean.chars().take(max_len - 3).collect::()) + } +} + +/// Print queries in human-readable format +pub fn print_human(result: &QueriesResult, quiet: bool) { + if !result.extension_available { + if !quiet { + println!("pg_stat_statements extension not installed."); + println!(); + println!("To enable query tracking:"); + println!(" 1. Add to postgresql.conf: shared_preload_libraries = 'pg_stat_statements'"); + println!(" 2. Restart PostgreSQL"); + println!(" 3. Run: CREATE EXTENSION pg_stat_statements;"); + } + return; + } + + if result.queries.is_empty() { + if !quiet { + println!("No queries recorded in pg_stat_statements."); + } + return; + } + + println!("TOP QUERIES:"); + if let Some(ref since) = result.stats_since { + println!(" Stats since: {}", since); + } + println!(" Total tracked: {}", format_number(result.total_queries_tracked)); + println!(); + + // Header + println!( + " {:3} {:>10} {:>10} {:>10} {:>8} {:>6} QUERY", + "", "CALLS", "TOTAL", "MEAN", "ROWS", "CACHE" + ); + println!(" {}", "-".repeat(90)); + + for query in &result.queries { + let cache_str = query + .cache_hit_ratio + .map(|r| format!("{:.0}%", r)) + .unwrap_or_else(|| "-".to_string()); + + println!( + " {} {:>10} {:>10} {:>10} {:>8} {:>6} {}", + query.status.emoji(), + format_number(query.calls), + format_duration_ms(query.total_exec_time_ms), + format_duration_ms(query.mean_exec_time_ms), + format_number(query.rows), + cache_str, + truncate_query(&query.query, 50) + ); + } + + // Summary of issues + let warning_count = result + .queries + .iter() + .filter(|q| 
q.status == QueryStatus::Warning) + .count(); + let critical_count = result + .queries + .iter() + .filter(|q| q.status == QueryStatus::Critical) + .count(); + + if warning_count > 0 || critical_count > 0 { + println!(); + if critical_count > 0 { + println!( + " ✗ {} queries with mean time >{}s (CRITICAL)", + critical_count, + QUERY_CRITICAL_MS / 1000.0 + ); + } + if warning_count > 0 { + println!( + " ⚠ {} queries with mean time >{}s (WARNING)", + warning_count, + QUERY_WARNING_MS / 1000.0 + ); + } + } +} + +/// Print queries as JSON with schema versioning +pub fn print_json( + result: &QueriesResult, + timeouts: Option, +) -> Result<()> { + use crate::output::{schema, DiagnosticOutput, Severity}; + + let severity = if !result.extension_available { + // Extension not available - report as healthy (not an error condition) + Severity::Healthy + } else { + match result.overall_status { + QueryStatus::Healthy => Severity::Healthy, + QueryStatus::Warning => Severity::Warning, + QueryStatus::Critical => Severity::Critical, + } + }; + + let output = match timeouts { + Some(t) => DiagnosticOutput::with_timeouts(schema::QUERIES, result, severity, t), + None => DiagnosticOutput::new(schema::QUERIES, result, severity), + }; + output.print()?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_query_status_healthy() { + assert_eq!(QueryStatus::from_mean_time(500.0), QueryStatus::Healthy); + } + + #[test] + fn test_query_status_warning() { + assert_eq!(QueryStatus::from_mean_time(1500.0), QueryStatus::Warning); + } + + #[test] + fn test_query_status_critical() { + assert_eq!(QueryStatus::from_mean_time(6000.0), QueryStatus::Critical); + } + + #[test] + fn test_format_duration_ms() { + assert_eq!(format_duration_ms(500.0), "500.0ms"); + assert_eq!(format_duration_ms(1500.0), "1.50s"); + assert_eq!(format_duration_ms(90000.0), "1.5m"); + } + + #[test] + fn test_sort_by_from_str() { + assert_eq!(QuerySortBy::from_str("total"), 
Some(QuerySortBy::TotalTime)); + assert_eq!(QuerySortBy::from_str("mean"), Some(QuerySortBy::MeanTime)); + assert_eq!(QuerySortBy::from_str("calls"), Some(QuerySortBy::Calls)); + assert_eq!(QuerySortBy::from_str("invalid"), None); + } + + #[test] + fn test_truncate_query() { + let short = "SELECT 1"; + assert_eq!(truncate_query(short, 20), "SELECT 1"); + + let long = "SELECT * FROM users WHERE id = 1 AND name = 'test'"; + let result = truncate_query(long, 20); + assert!(result.ends_with("...")); + assert!(result.chars().count() <= 20); + } +} diff --git a/src/main.rs b/src/main.rs index ed2e257..12aabe1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,6 +78,8 @@ fn json_supported(command: &Commands) -> bool { Commands::Vacuum { .. } => true, Commands::Bloat { .. } => true, Commands::Replication => true, + Commands::Queries { .. } => true, + Commands::Connections { .. } => true, Commands::Fix { .. } => true, Commands::Context => true, Commands::Capabilities => true, @@ -360,6 +362,27 @@ enum Commands { }, /// Monitor streaming replication health Replication, + /// Show top queries from pg_stat_statements + Queries { + /// Sort by: total (default), mean, calls + #[arg(long, value_name = "FIELD")] + by: Option, + /// Number of queries to show (default: 10) + #[arg(long, default_value = "10")] + limit: usize, + }, + /// Analyze connection usage vs max_connections + Connections { + /// Group by user + #[arg(long)] + by_user: bool, + /// Group by database + #[arg(long)] + by_database: bool, + /// Group by application + #[arg(long)] + by_application: bool, + }, /// Fix commands for remediation Fix { #[command(subcommand)] @@ -1406,6 +1429,99 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { commands::replication::ReplicationStatus::Healthy => {} } } + Commands::Queries { ref by, limit } => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + let conn_result = connection::resolve_and_validate( + &config, + 
cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + cli.read_write, + cli.quiet, + )?; + + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + setup_ctrlc_handler(session.cancel_token()); + + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + // Parse sort order + let sort_by = by + .as_ref() + .map(|s| { + commands::queries::QuerySortBy::from_str(s).ok_or_else(|| { + anyhow::anyhow!("Invalid --by value '{}'. Use: total, mean, calls", s) + }) + }) + .transpose()? + .unwrap_or_default(); + + let result = commands::queries::run_queries(session.client(), sort_by, limit).await?; + + if cli.json { + commands::queries::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::queries::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::queries::QueryStatus::Critical => std::process::exit(2), + commands::queries::QueryStatus::Warning => std::process::exit(1), + commands::queries::QueryStatus::Healthy => {} + } + } + Commands::Connections { + by_user, + by_database, + by_application, + } => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + let conn_result = connection::resolve_and_validate( + &config, + cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + cli.read_write, + cli.quiet, + )?; + + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + setup_ctrlc_handler(session.cancel_token()); + + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + let result = commands::connections::run_connections( + session.client(), + by_user, + by_database, + 
by_application, + ) + .await?; + + if cli.json { + commands::connections::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::connections::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::connections::ConnectionStatus::Critical => std::process::exit(2), + commands::connections::ConnectionStatus::Warning => std::process::exit(1), + commands::connections::ConnectionStatus::Healthy => {} + } + } Commands::Fix { ref command } => { let config = Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; @@ -2269,6 +2385,8 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { | Commands::Vacuum { .. } | Commands::Bloat { .. } | Commands::Replication + | Commands::Queries { .. } + | Commands::Connections { .. } | Commands::Fix { .. } | Commands::Context | Commands::Capabilities diff --git a/src/output.rs b/src/output.rs index a729a00..2661610 100644 --- a/src/output.rs +++ b/src/output.rs @@ -429,6 +429,8 @@ pub mod schema { pub const REPLICATION: &str = "pgcrate.diagnostics.replication"; pub const CONTEXT: &str = "pgcrate.diagnostics.context"; pub const CAPABILITIES: &str = "pgcrate.diagnostics.capabilities"; + pub const QUERIES: &str = "pgcrate.diagnostics.queries"; + pub const CONNECTIONS: &str = "pgcrate.diagnostics.connections"; } // =============================================================================