diff --git a/CHANGELOG.md b/CHANGELOG.md index dbc0b6d..a3fac8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,37 @@ ## Unreleased +**Phase 3a: Query Performance Diagnostics** + +Complete the "why is prod slow?" workflow with query and connection analysis. + +### New Commands + +- **`pgcrate queries`**: Top queries from pg_stat_statements by execution time, mean time, or call count +- **`pgcrate connections`**: Connection usage analysis vs max_connections with breakdown by state, user, and database + +### Queries Command Features + +- **Sort options**: `--by total` (default), `--by mean`, `--by calls` +- **Limit control**: `--limit N` (default: 10) or `--all` for all tracked queries +- **Graceful degradation**: Works without pg_stat_statements extension (shows unavailable message) +- **Cache hit ratio**: Shows buffer cache efficiency per query +- **Status thresholds**: Critical (mean > 5s), Warning (mean > 1s), Healthy (< 1s) + +### Connections Command Features + +- **Usage analysis**: Total connections vs max_connections with percentage +- **State breakdown**: Connections grouped by state (active, idle, idle in transaction) +- **Grouping options**: `--by-user` and `--by-database` for detailed analysis +- **Status thresholds**: Critical (> 90%), Warning (> 75%), Healthy (< 75%) + +### Capabilities + +- `diagnostics.queries` capability added (requires pg_stat_statements extension) +- `diagnostics.connections` capability added (always available via pg_stat_activity) + +--- + **Phase 2a: Fix Commands** Complete the diagnose→fix→verify loop with safe remediation commands. diff --git a/README.md b/README.md index 11d40ff..4155715 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,8 @@ pgcrate xid # Transaction ID wraparound analysis pgcrate sequences # Sequence exhaustion check pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health +pgcrate queries # Top queries from pg_stat_statements +pgcrate connections # Connection usage vs max_connections ``` All diagnostic commands support timeout flags for production safety: @@ -309,6 +311,8 @@ DROP TABLE users; | `pgcrate sequences` | Sequence exhaustion check | | `pgcrate indexes` | Missing, unused, duplicate indexes | | `pgcrate vacuum` | Table bloat and vacuum health | +| `pgcrate queries` | Top queries from pg_stat_statements | +| `pgcrate connections` | Connection usage vs max_connections | | `pgcrate fix sequence` | Upgrade sequence type to prevent exhaustion | | `pgcrate fix index` | Drop unused/duplicate indexes | | `pgcrate fix vacuum` | Run VACUUM on tables | diff --git a/llms.txt b/llms.txt index 0a35f45..479a8fc 100644 --- a/llms.txt +++ b/llms.txt @@ -752,6 +752,13 @@ pgcrate xid # Transaction ID wraparound analysis pgcrate sequences # Sequence exhaustion check pgcrate indexes # Missing, unused, duplicate indexes pgcrate vacuum # Table bloat and vacuum health +pgcrate queries # Top queries from pg_stat_statements +pgcrate queries --by mean # Sort by mean execution time +pgcrate queries --by calls # Sort by call count +pgcrate queries --limit 20 # Show more queries +pgcrate connections # Connection usage vs max_connections +pgcrate connections --by-user # Group by user +pgcrate connections --by-database # Group by database # Connection context pgcrate context --json # Connection info, server version, privileges @@ -837,6 +844,8 @@ Currently, `--json` is supported for these commands: - `sequences` - Sequence exhaustion check - `indexes` - Index health analysis - `vacuum` - Table bloat 
analysis
+- `queries` - Query performance analysis
+- `connections` - Connection usage analysis
 - `fix sequence` - Sequence upgrade result
 - `fix index` - Index drop result
 - `fix vacuum` - Vacuum result
diff --git a/src/commands/capabilities.rs b/src/commands/capabilities.rs
index 6c35c97..b8e9a9a 100644
--- a/src/commands/capabilities.rs
+++ b/src/commands/capabilities.rs
@@ -116,23 +116,43 @@ fn check_queries_capability(has_pg_stat_statements: bool) -> CapabilityInfo {
             )],
         )
     } else {
-        // Even with the extension, this capability is not yet implemented
+        (CapabilityStatus::Available, vec![])
+    };
+
+    CapabilityInfo {
+        id: "diagnostics.queries",
+        name: "Query Analysis",
+        description: "Slow query identification (pg_stat_statements)",
+        status,
+        reasons,
+        requirements,
+        limitations: vec![],
+    }
+}
+
+fn check_connections_capability(has_pg_stat_activity: bool) -> CapabilityInfo {
+    let requirements = vec![Requirement {
+        what: "pg_stat_activity SELECT".to_string(),
+        met: has_pg_stat_activity,
+    }];
+
+    let (status, reasons) = if !has_pg_stat_activity {
         (
             CapabilityStatus::Unavailable,
             vec![ReasonInfo::new(
-                ReasonCode::NotApplicable,
-                "Query analysis not yet implemented (planned for Phase 3)",
+                ReasonCode::MissingPrivilege,
+                "Cannot read pg_stat_activity",
             )],
         )
+    } else {
+        (CapabilityStatus::Available, vec![])
     };
 
     CapabilityInfo {
-        id: "diagnostics.queries",
-        name: "Query Analysis",
-        description: "Slow query identification (pg_stat_statements)",
+        id: "diagnostics.connections",
+        name: "Connections",
+        description: "Connection usage analysis",
         status,
         reasons,
         requirements,
-        limitations: vec!["Not yet implemented".to_string()],
+        limitations: vec![],
     }
 }
diff --git a/src/commands/connections.rs b/src/commands/connections.rs
new file mode 100644
index 0000000..4bd0e14
--- /dev/null
+++ b/src/commands/connections.rs
@@ -0,0 +1,414 @@
+//! Connections command: Connection analysis vs max_connections.
+//!
+//! Shows connection usage vs limits, with breakdowns by state,
+//! user, and database.
+
+use anyhow::Result;
+use serde::Serialize;
+use std::collections::HashMap;
+use tokio_postgres::Client;
+
+/// Status level for connection usage
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum ConnectionStatus {
+    Healthy,
+    Warning,
+    Critical,
+}
+
+impl ConnectionStatus {
+    /// Derive status from usage percentage.
+    /// - Critical: > 90%
+    /// - Warning: > 75%
+    /// - Healthy: <= 75%
+    pub fn from_usage_pct(pct: f64) -> Self {
+        if pct > 90.0 {
+            ConnectionStatus::Critical
+        } else if pct > 75.0 {
+            ConnectionStatus::Warning
+        } else {
+            ConnectionStatus::Healthy
+        }
+    }
+
+    pub fn emoji(&self) -> &'static str {
+        match self {
+            ConnectionStatus::Healthy => "✓",
+            ConnectionStatus::Warning => "⚠",
+            ConnectionStatus::Critical => "✗",
+        }
+    }
+}
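+
+// Note: the comparisons above are strict, so boundary values stay at the
+// lower severity: exactly 75.0% is Healthy and exactly 90.0% is Warning.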
+
+/// Connection statistics summary
+#[derive(Debug, Clone, Serialize)]
+pub struct ConnectionStats {
+    pub total: i32,
+    pub max_connections: i32,
+    pub usage_pct: f64,
+    pub reserved_superuser: i32,
+    pub available: i32,
+    pub by_state: HashMap<String, i32>,
+    pub status: ConnectionStatus,
+}
+
+/// Connections grouped by user
+#[derive(Debug, Clone, Serialize)]
+pub struct UserConnections {
+    pub username: String,
+    pub total: i32,
+    pub by_state: HashMap<String, i32>,
+}
+
+/// Connections grouped by database
+#[derive(Debug, Clone, Serialize)]
+pub struct DatabaseConnections {
+    pub database: String,
+    pub total: i32,
+    pub by_state: HashMap<String, i32>,
+}
+
+/// Full connections results
+#[derive(Debug, Serialize)]
+pub struct ConnectionsResult {
+    pub stats: ConnectionStats,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub by_user: Option<Vec<UserConnections>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub by_database: Option<Vec<DatabaseConnections>>,
+    pub overall_status: ConnectionStatus,
+}
+
+async fn get_max_connections(client: &Client) -> Result<i32> {
+    let row = client
+        .query_one(
+            "SELECT setting::int FROM pg_settings WHERE name = 'max_connections'",
+            &[],
+        )
+        .await?;
+    Ok(row.get(0))
+}
+
+async fn get_reserved_superuser(client: &Client) -> Result<i32> {
+    let row = client
+        .query_one(
+            "SELECT setting::int FROM pg_settings WHERE name = 'superuser_reserved_connections'",
+            &[],
+        )
+        .await?;
+    Ok(row.get(0))
+}
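+
+// `superuser_reserved_connections` slots are held back for superusers, so the
+// headroom reported below is max_connections - reserved - current total.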
+
+/// Get connection statistics
+pub async fn get_connections(
+    client: &Client,
+    include_by_user: bool,
+    include_by_database: bool,
+) -> Result<ConnectionsResult> {
+    let max_connections = get_max_connections(client).await?;
+    let reserved_superuser = get_reserved_superuser(client).await?;
+
+    // Get connection counts by state
+    let state_query = r#"
+        SELECT
+            COALESCE(state, 'unknown') as state,
+            count(*)::int as count
+        FROM pg_stat_activity
+        WHERE pid != pg_backend_pid()
+        GROUP BY state
+        ORDER BY count DESC
+    "#;
+
+    let state_rows = client.query(state_query, &[]).await?;
+
+    let mut by_state: HashMap<String, i32> = HashMap::new();
+    let mut total = 0i32;
+
+    for row in state_rows {
+        let state: String = row.get("state");
+        let count: i32 = row.get("count");
+        by_state.insert(state, count);
+        total += count;
+    }
+
+    // Add 1 for our own connection
+    total += 1;
+
+    let usage_pct = if max_connections > 0 {
+        100.0 * total as f64 / max_connections as f64
+    } else {
+        0.0
+    };
+
+    let available = max_connections - reserved_superuser - total;
+    let status = ConnectionStatus::from_usage_pct(usage_pct);
+
+    let stats = ConnectionStats {
+        total,
+        max_connections,
+        usage_pct,
+        reserved_superuser,
+        available: available.max(0),
+        by_state,
+        status,
+    };
+
+    // Get by user if requested
+    let by_user = if include_by_user {
+        let user_query = r#"
+            SELECT
+                COALESCE(usename, 'unknown') as username,
+                COALESCE(state, 'unknown') as state,
+                count(*)::int as count
+            FROM pg_stat_activity
+            WHERE pid != pg_backend_pid()
+            GROUP BY usename, state
+            ORDER BY usename, count DESC
+        "#;
+
+        let user_rows = client.query(user_query, &[]).await?;
+
+        let mut user_map: HashMap<String, UserConnections> = HashMap::new();
+
+        for row in user_rows {
+            let username: String = row.get("username");
+            let state: String = row.get("state");
+            let count: i32 = row.get("count");
+
+            let entry = user_map.entry(username.clone()).or_insert(UserConnections {
+                username,
+                total: 0,
+                by_state: HashMap::new(),
+            });
+
+            entry.total += count;
+            entry.by_state.insert(state, count);
+        }
+
+        let mut users: Vec<_> = user_map.into_values().collect();
+        users.sort_by(|a, b| b.total.cmp(&a.total));
+        Some(users)
+    } else {
+        None
+    };
+
+    // Get by database if requested
+    let by_database = if include_by_database {
+        let db_query = r#"
+            SELECT
+                COALESCE(datname, 'unknown') as database,
+                COALESCE(state, 'unknown') as state,
+                count(*)::int as count
+            FROM pg_stat_activity
+            WHERE pid != pg_backend_pid()
+            GROUP BY datname, state
+            ORDER BY datname, count DESC
+        "#;
+
+        let db_rows = client.query(db_query, &[]).await?;
+
+        let mut db_map: HashMap<String, DatabaseConnections> = HashMap::new();
+
+        for row in db_rows {
+            let database: String = row.get("database");
+            let state: String = row.get("state");
+            let count: i32 = row.get("count");
+
+            let entry = db_map
+                .entry(database.clone())
+                .or_insert(DatabaseConnections {
+                    database,
+                    total: 0,
+                    by_state: HashMap::new(),
+                });
+
+            entry.total += count;
+            entry.by_state.insert(state, count);
+        }
+
+        let mut dbs: Vec<_> = db_map.into_values().collect();
+        dbs.sort_by(|a, b| b.total.cmp(&a.total));
+        Some(dbs)
+    } else {
+        None
+    };
+
+    Ok(ConnectionsResult {
+        stats,
+        by_user,
+        by_database,
+        overall_status: status,
+    })
+}
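+
+// Illustrative human-readable output (example values, not real data):
+//
+//   CONNECTIONS:
+//
+//     ✓ 12/100 connections (12.0%)
+//     Reserved for superuser: 3, Available: 85
+//
+//     BY STATE:
+//     idle                           8
+//     active                         4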
+
+/// Print connections in human-readable format
+pub fn print_human(result: &ConnectionsResult, quiet: bool) {
+    let stats = &result.stats;
+
+    println!("CONNECTIONS:");
+    println!();
+
+    // Summary line
+    println!(
+        "  {} {}/{} connections ({:.1}%)",
+        stats.status.emoji(),
+        stats.total,
+        stats.max_connections,
+        stats.usage_pct
+    );
+    println!(
+        "  Reserved for superuser: {}, Available: {}",
+        stats.reserved_superuser, stats.available
+    );
+    println!();
+
+    // By state breakdown
+    println!("  BY STATE:");
+    let mut states: Vec<_> = stats.by_state.iter().collect();
+    states.sort_by(|a, b| b.1.cmp(a.1));
+
+    for (state, count) in states {
+        let state_display = match state.as_str() {
+            "idle in transaction (aborted)" => "idle in tx (aborted)",
+            s => s,
+        };
+        println!("    {:25} {:>5}", state_display, count);
+    }
+
+    // By user if available
+    if let Some(ref by_user) = result.by_user {
+        println!();
+        println!("  BY USER:");
+        for user in by_user.iter().take(10) {
+            let states_str: String = user
+                .by_state
+                .iter()
+                .map(|(s, c)| format!("{}:{}", abbreviate_state(s), c))
+                .collect::<Vec<_>>()
+                .join(", ");
+            println!(
+                "    {:25} {:>5} ({})",
+                user.username, user.total, states_str
+            );
+        }
+        if by_user.len() > 10 && !quiet {
+            println!("    ... and {} more users", by_user.len() - 10);
+        }
+    }
+
+    // By database if available
+    if let Some(ref by_database) = result.by_database {
+        println!();
+        println!("  BY DATABASE:");
+        for db in by_database.iter().take(10) {
+            let states_str: String = db
+                .by_state
+                .iter()
+                .map(|(s, c)| format!("{}:{}", abbreviate_state(s), c))
+                .collect::<Vec<_>>()
+                .join(", ");
+            println!("    {:25} {:>5} ({})", db.database, db.total, states_str);
+        }
+        if by_database.len() > 10 && !quiet {
+            println!("    ... and {} more databases", by_database.len() - 10);
+        }
+    }
+
+    // Status-based recommendations
+    if stats.status == ConnectionStatus::Critical {
+        println!();
+        println!("  CRITICAL: Connection pool near exhaustion!");
+        println!("  Recommendations:");
+        println!("    - Check for connection leaks in application code");
+        println!("    - Review idle connections that could be closed");
+        println!("    - Consider using a connection pooler (PgBouncer, pgpool)");
+        println!("    - Increase max_connections if resources allow");
+    } else if stats.status == ConnectionStatus::Warning {
+        println!();
+        println!("  WARNING: Connection usage above 75%");
+        println!("  Monitor closely and consider optimization.");
+    }
+}
+
+fn abbreviate_state(state: &str) -> &str {
+    match state {
+        "active" => "act",
+        "idle" => "idl",
+        "idle in transaction" => "itx",
+        "idle in transaction (aborted)" => "abt",
+        "fastpath function call" => "fpc",
+        "disabled" => "dis",
+        _ => state,
+    }
+}
+
+/// Print connections as JSON with schema versioning
+pub fn print_json(
+    result: &ConnectionsResult,
+    timeouts: Option,
+) -> Result<()> {
+    use crate::output::{schema, DiagnosticOutput, Severity};
+
+    // Convert ConnectionStatus to Severity
+    let severity = match result.overall_status {
+        ConnectionStatus::Healthy => Severity::Healthy,
+        ConnectionStatus::Warning => Severity::Warning,
+        ConnectionStatus::Critical => Severity::Critical,
+    };
+
+    let output = match timeouts {
+        Some(t) => DiagnosticOutput::with_timeouts(schema::CONNECTIONS, result, severity, t),
+        None => DiagnosticOutput::new(schema::CONNECTIONS, result, severity),
+    };
+    output.print()?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_connection_status_healthy() {
+        assert_eq!(
+            ConnectionStatus::from_usage_pct(50.0),
+            ConnectionStatus::Healthy
+        );
+        assert_eq!(
+            ConnectionStatus::from_usage_pct(75.0),
+            ConnectionStatus::Healthy
+        );
+    }
+
+    #[test]
+    fn test_connection_status_warning() {
+        assert_eq!(
+            ConnectionStatus::from_usage_pct(76.0),
+            ConnectionStatus::Warning
+        );
+        assert_eq!(
+            ConnectionStatus::from_usage_pct(90.0),
+            ConnectionStatus::Warning
+        );
+    }
+
+    #[test]
+    fn test_connection_status_critical() {
+        assert_eq!(
+            ConnectionStatus::from_usage_pct(91.0),
+            ConnectionStatus::Critical
+        );
+        assert_eq!(
+            ConnectionStatus::from_usage_pct(100.0),
+            ConnectionStatus::Critical
+        );
+    }
+
+    #[test]
+    fn test_abbreviate_state() {
+        assert_eq!(abbreviate_state("active"), "act");
+        assert_eq!(abbreviate_state("idle"), "idl");
+        assert_eq!(abbreviate_state("idle in transaction"), "itx");
+    }
+}
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 6d5a7aa..d8410e5 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -5,6 +5,7 @@
 mod anonymize;
 mod bootstrap;
 pub mod capabilities;
+pub mod connections;
 pub mod context;
 mod db;
 mod doctor;
@@ -14,6 +15,7 @@ pub mod indexes;
 pub mod locks;
 mod migrations;
 pub mod model;
+pub mod queries;
 mod role;
 mod schema;
 mod seed;
diff --git a/src/commands/queries.rs b/src/commands/queries.rs
new file mode 100644
index 0000000..255124e
--- /dev/null
+++ b/src/commands/queries.rs
@@ -0,0 +1,438 @@
+//! Queries command: Top queries from pg_stat_statements.
+//!
+//! Shows top queries by execution time, mean time, or call count.
+//! Requires pg_stat_statements extension to be installed.
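+//!
+//! Sort fields map to pg_stat_statements columns: `total` -> total_exec_time,
+//! `mean` -> mean_exec_time, `calls` -> calls. Mean-time thresholds: above 1s
+//! is a warning, above 5s is critical.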
+
+use anyhow::Result;
+use serde::Serialize;
+use tokio_postgres::Client;
+
+/// Default limit when --all is specified (arbitrarily high)
+pub const ALL_QUERIES_LIMIT: usize = 1000;
+
+/// Sort order for queries
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum QuerySortBy {
+    /// Sort by total execution time (default)
+    #[default]
+    TotalTime,
+    /// Sort by mean execution time per call
+    MeanTime,
+    /// Sort by number of calls
+    Calls,
+}
+
+impl QuerySortBy {
+    pub fn from_str(s: &str) -> Option<Self> {
+        match s.to_lowercase().as_str() {
+            "total" | "total_time" => Some(Self::TotalTime),
+            "mean" | "mean_time" | "avg" => Some(Self::MeanTime),
+            "calls" | "count" => Some(Self::Calls),
+            _ => None,
+        }
+    }
+}
+
+/// Status level for individual queries
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum QueryStatus {
+    Healthy,
+    Warning,
+    Critical,
+}
+
+impl QueryStatus {
+    /// Derive status from mean execution time in milliseconds.
+    /// - Critical: > 5000ms (5 seconds)
+    /// - Warning: > 1000ms (1 second)
+    /// - Healthy: <= 1000ms
+    pub fn from_mean_time(mean_ms: f64) -> Self {
+        if mean_ms > 5000.0 {
+            QueryStatus::Critical
+        } else if mean_ms > 1000.0 {
+            QueryStatus::Warning
+        } else {
+            QueryStatus::Healthy
+        }
+    }
+
+    pub fn emoji(&self) -> &'static str {
+        match self {
+            QueryStatus::Healthy => "✓",
+            QueryStatus::Warning => "⚠",
+            QueryStatus::Critical => "✗",
+        }
+    }
+}
+
+/// Information about a single query
+#[derive(Debug, Clone, Serialize)]
+pub struct QueryInfo {
+    pub queryid: i64,
+    pub query: String,
+    pub calls: i64,
+    pub total_exec_time_ms: f64,
+    pub mean_exec_time_ms: f64,
+    pub rows: i64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub cache_hit_ratio: Option<f64>,
+    pub status: QueryStatus,
+}
+
+impl QueryInfo {
+    /// Redact the query text to remove string literals.
+    pub fn redact_query(&mut self) {
+        use crate::redact;
+        self.query = redact::redact_query(&self.query);
+    }
+}
+
+/// Full queries results
+#[derive(Debug, Serialize)]
+pub struct QueriesResult {
+    pub queries: Vec<QueryInfo>,
+    pub overall_status: QueryStatus,
+    pub extension_available: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub stats_since: Option<String>,
+    pub sort_by: String,
+    pub limit: usize,
+}
+
+impl QueriesResult {
+    /// Apply redaction to all query text in the result.
+    pub fn redact(&mut self) {
+        for q in &mut self.queries {
+            q.redact_query();
+        }
+    }
+}
+
+/// Check if pg_stat_statements extension is available
+pub async fn check_extension_available(client: &Client) -> bool {
+    client
+        .query_one(
+            "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements')",
+            &[],
+        )
+        .await
+        .map(|r| r.get::<_, bool>(0))
+        .unwrap_or(false)
+}
+
+/// Get stats reset timestamp if available
+pub async fn get_stats_since(client: &Client) -> Option<String> {
+    client
+        .query_opt(
+            "SELECT stats_reset::text FROM pg_stat_statements_info LIMIT 1",
+            &[],
+        )
+        .await
+        .ok()
+        .flatten()
+        .and_then(|r| r.get::<_, Option<String>>(0))
+}
+
+/// Get top queries from pg_stat_statements
+pub async fn get_queries(
+    client: &Client,
+    sort_by: QuerySortBy,
+    limit: usize,
+) -> Result<QueriesResult> {
+    // First check if extension is available
+    let extension_available = check_extension_available(client).await;
+
+    if !extension_available {
+        let sort_by_str = match sort_by {
+            QuerySortBy::TotalTime => "total_time",
+            QuerySortBy::MeanTime => "mean_time",
+            QuerySortBy::Calls => "calls",
+        };
+        return Ok(QueriesResult {
+            queries: vec![],
+            overall_status: QueryStatus::Healthy,
+            extension_available: false,
+            stats_since: None,
+            sort_by: sort_by_str.to_string(),
+            limit,
+        });
+    }
+
+    // Get stats reset time
+    let stats_since = get_stats_since(client).await;
+
+    // Build query with appropriate ORDER BY
+    let order_clause = match sort_by {
+        QuerySortBy::TotalTime => "total_exec_time DESC",
+        QuerySortBy::MeanTime => "mean_exec_time DESC",
+        QuerySortBy::Calls => "calls DESC",
+    };
+
+    let query = format!(
+        r#"
+        SELECT
+            queryid,
+            left(query, 500) as query,
+            calls,
+            total_exec_time as total_exec_time_ms,
+            mean_exec_time as mean_exec_time_ms,
+            rows,
+            shared_blks_hit,
+            shared_blks_read
+        FROM pg_stat_statements
+        WHERE queryid IS NOT NULL
+          AND query NOT LIKE '%pg_stat_statements%'
+        ORDER BY {}
+        LIMIT $1
+        "#,
+        order_clause
+    );
+
+    let rows = client.query(&query, &[&(limit as i64)]).await?;
+
+    let mut queries = Vec::new();
+    for row in rows {
+        let mean_exec_time_ms: f64 = row.get("mean_exec_time_ms");
+        let shared_blks_hit: i64 = row.get("shared_blks_hit");
+        let shared_blks_read: i64 = row.get("shared_blks_read");
+
+        let cache_hit_ratio = if shared_blks_hit + shared_blks_read > 0 {
+            Some(100.0 * shared_blks_hit as f64 / (shared_blks_hit + shared_blks_read) as f64)
+        } else {
+            None
+        };
+
+        queries.push(QueryInfo {
+            queryid: row.get("queryid"),
+            query: row.get("query"),
+            calls: row.get("calls"),
+            total_exec_time_ms: row.get("total_exec_time_ms"),
+            mean_exec_time_ms,
+            rows: row.get("rows"),
+            cache_hit_ratio,
+            status: QueryStatus::from_mean_time(mean_exec_time_ms),
+        });
+    }
+
+    // Calculate overall status (worst among all queries)
+    let overall_status = queries
+        .iter()
+        .map(|q| &q.status)
+        .max_by_key(|s| match s {
+            QueryStatus::Healthy => 0,
+            QueryStatus::Warning => 1,
+            QueryStatus::Critical => 2,
+        })
+        .cloned()
+        .unwrap_or(QueryStatus::Healthy);
+
+    let sort_by_str = match sort_by {
+        QuerySortBy::TotalTime => "total_time",
+        QuerySortBy::MeanTime => "mean_time",
+        QuerySortBy::Calls => "calls",
+    };
+
+    Ok(QueriesResult {
+        queries,
+        overall_status,
+        extension_available: true,
+        stats_since,
+        sort_by: sort_by_str.to_string(),
+        limit,
+    })
+}
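+
+// Duration formatting below picks the largest fitting unit, e.g. 0.5 ->
+// "500.00µs", 50.0 -> "50.0ms", 5000.0 -> "5.00s", 120000.0 -> "2.0m"
+// (exercised by the unit tests at the bottom of this file).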
+
+fn format_duration_ms(ms: f64) -> String {
+    if ms < 1.0 {
+        format!("{:.2}µs", ms * 1000.0)
+    } else if ms < 1000.0 {
+        format!("{:.1}ms", ms)
+    } else if ms < 60000.0 {
+        format!("{:.2}s", ms / 1000.0)
+    } else {
+        format!("{:.1}m", ms / 60000.0)
+    }
+}
+
+fn format_number(n: i64) -> String {
+    if n >= 1_000_000_000 {
+        format!("{:.1}B", n as f64 / 1_000_000_000.0)
+    } else if n >= 1_000_000 {
+        format!("{:.1}M", n as f64 / 1_000_000.0)
+    } else if n >= 1_000 {
+        format!("{:.1}K", n as f64 / 1_000.0)
+    } else {
+        format!("{}", n)
+    }
+}
+
+fn truncate_query(query: &str, max_len: usize) -> String {
+    let clean = query.replace('\n', " ").replace("  ", " ");
+    let char_count = clean.chars().count();
+    if char_count <= max_len {
+        return clean;
+    }
+    // Safe UTF-8 truncation using char boundaries
+    let byte_pos = clean
+        .char_indices()
+        .nth(max_len.saturating_sub(3))
+        .map(|(i, _)| i)
+        .unwrap_or(clean.len());
+    format!("{}...", &clean[..byte_pos])
+}
+
+/// Print queries in human-readable format
+pub fn print_human(result: &QueriesResult, quiet: bool) {
+    if !result.extension_available {
+        if !quiet {
+            println!("pg_stat_statements extension not available.");
+            println!();
+            println!("To enable query analysis:");
+            println!("  1. Install extension: CREATE EXTENSION pg_stat_statements;");
+            println!(
+                "  2. Add to postgresql.conf: shared_preload_libraries = 'pg_stat_statements'"
+            );
+            println!("  3. Restart PostgreSQL");
+        }
+        return;
+    }
+
+    if result.queries.is_empty() {
+        if !quiet {
+            println!("No query statistics found.");
+        }
+        return;
+    }
+
+    println!("TOP QUERIES (by {}):", result.sort_by);
+    if let Some(ref since) = result.stats_since {
+        println!("Stats since: {}", since);
+    }
+    println!();
+
+    // Header
+    println!(
+        "  {:3} {:>10} {:>10} {:>10} {:>8} {:>6} QUERY",
+        "", "CALLS", "TOTAL", "MEAN", "ROWS", "CACHE"
+    );
+    println!("  {}", "-".repeat(90));
+
+    for query in &result.queries {
+        let cache_str = query
+            .cache_hit_ratio
+            .map(|r| format!("{:.0}%", r))
+            .unwrap_or_else(|| "-".to_string());
+
+        println!(
+            "  {} {:>10} {:>10} {:>10} {:>8} {:>6} {}",
+            query.status.emoji(),
+            format_number(query.calls),
+            format_duration_ms(query.total_exec_time_ms),
+            format_duration_ms(query.mean_exec_time_ms),
+            format_number(query.rows),
+            cache_str,
+            truncate_query(&query.query, 40)
+        );
+    }
+
+    // Summary
+    let warning_count = result
+        .queries
+        .iter()
+        .filter(|q| q.status == QueryStatus::Warning)
+        .count();
+    let critical_count = result
+        .queries
+        .iter()
+        .filter(|q| q.status == QueryStatus::Critical)
+        .count();
+
+    if warning_count > 0 || critical_count > 0 {
+        println!();
+        if critical_count > 0 {
+            println!("  ✗ {} queries CRITICAL (mean > 5s)", critical_count);
+        }
+        if warning_count > 0 {
+            println!("  ⚠ {} queries WARNING (mean > 1s)", warning_count);
+        }
+    }
+}
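+
+// Callers map overall_status to process exit codes (see src/main.rs):
+// healthy -> 0, warning -> 1, critical -> 2.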
+
+/// Print queries as JSON with schema versioning
+pub fn print_json(
+    result: &QueriesResult,
+    timeouts: Option,
+) -> Result<()> {
+    use crate::output::{schema, DiagnosticOutput, Severity};
+
+    // Convert QueryStatus to Severity
+    let severity = match result.overall_status {
+        QueryStatus::Healthy => Severity::Healthy,
+        QueryStatus::Warning => Severity::Warning,
+        QueryStatus::Critical => Severity::Critical,
+    };
+
+    let output = match timeouts {
+        Some(t) => DiagnosticOutput::with_timeouts(schema::QUERIES, result, severity, t),
+        None => DiagnosticOutput::new(schema::QUERIES, result, severity),
+    };
+    output.print()?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_query_status_healthy() {
+        assert_eq!(QueryStatus::from_mean_time(500.0), QueryStatus::Healthy);
+    }
+
+    #[test]
+    fn test_query_status_warning() {
+        assert_eq!(QueryStatus::from_mean_time(2000.0), QueryStatus::Warning);
+    }
+
+    #[test]
+    fn test_query_status_critical() {
+        assert_eq!(QueryStatus::from_mean_time(6000.0), QueryStatus::Critical);
+    }
+
+    #[test]
+    fn test_sort_by_parsing() {
+        assert_eq!(QuerySortBy::from_str("total"), Some(QuerySortBy::TotalTime));
+        assert_eq!(QuerySortBy::from_str("mean"), Some(QuerySortBy::MeanTime));
+        assert_eq!(QuerySortBy::from_str("calls"), Some(QuerySortBy::Calls));
+        assert_eq!(QuerySortBy::from_str("invalid"), None);
+    }
+
+    #[test]
+    fn test_format_duration_microseconds() {
+        assert_eq!(format_duration_ms(0.5), "500.00µs");
+    }
+
+    #[test]
+    fn test_format_duration_milliseconds() {
+        assert_eq!(format_duration_ms(50.0), "50.0ms");
+    }
+
+    #[test]
+    fn test_format_duration_seconds() {
+        assert_eq!(format_duration_ms(5000.0), "5.00s");
+    }
+
+    #[test]
+    fn test_format_duration_minutes() {
+        assert_eq!(format_duration_ms(120000.0), "2.0m");
+    }
+
+    #[test]
+    fn test_format_number() {
+        assert_eq!(format_number(500), "500");
+        assert_eq!(format_number(5000), "5.0K");
+        assert_eq!(format_number(5000000), "5.0M");
+        assert_eq!(format_number(5000000000), "5.0B");
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index d6cbcba..f046733 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -76,6 +76,8 @@ fn json_supported(command: &Commands) -> bool {
         Commands::Sequences { .. } => true,
         Commands::Indexes { .. } => true,
         Commands::Vacuum { .. } => true,
+        Commands::Queries { .. } => true,
+        Commands::Connections { .. } => true,
         Commands::Fix { .. } => true,
         Commands::Context => true,
         Commands::Capabilities => true,
@@ -349,6 +351,27 @@ enum Commands {
         #[arg(long, value_name = "PCT")]
         threshold: Option<f64>,
     },
+    /// Top queries from pg_stat_statements
+    Queries {
+        /// Sort by: total (default), mean, calls
+        #[arg(long, value_name = "FIELD")]
+        by: Option<String>,
+        /// Number of queries to show (default: 10)
+        #[arg(long, default_value = "10")]
+        limit: usize,
+        /// Show all tracked queries (ignores --limit)
+        #[arg(long)]
+        all: bool,
+    },
+    /// Analyze connection usage vs max_connections
+    Connections {
+        /// Group connections by user
+        #[arg(long)]
+        by_user: bool,
+        /// Group connections by database
+        #[arg(long)]
+        by_database: bool,
+    },
     /// Fix commands for remediation
     Fix {
         #[command(subcommand)]
@@ -1322,6 +1345,116 @@ async fn run(cli: Cli, output: &Output) -> Result<()> {
                 commands::vacuum::VacuumStatus::Healthy => {}
             }
         }
+        Commands::Queries { ref by, limit, all } => {
+            let config =
+                Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?;
+            let conn_result = connection::resolve_and_validate(
+                &config,
+                cli.database_url.as_deref(),
+                cli.connection.as_deref(),
+                cli.env_var.as_deref(),
+                cli.allow_primary,
+                cli.read_write,
+                cli.quiet,
+            )?;
+
+            // Use DiagnosticSession with timeout enforcement
+            let timeout_config = parse_timeout_config(&cli)?;
+            let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?;
+
+            // Set up Ctrl+C handler to cancel queries gracefully
+            setup_ctrlc_handler(session.cancel_token());
+
+            // Show effective timeouts unless quiet
+            if !cli.quiet && !cli.json {
+                eprintln!("pgcrate: timeouts: {}", session.effective_timeouts());
+            }
+
+            // Parse sort order
+            let sort_by = by
+                .as_ref()
+                .map(|s| {
+                    commands::queries::QuerySortBy::from_str(s).ok_or_else(|| {
+                        anyhow::anyhow!("Invalid sort field '{}'. Use: total, mean, or calls", s)
+                    })
+                })
+                .transpose()?
+ .unwrap_or_default(); + + let effective_limit = if all { + commands::queries::ALL_QUERIES_LIMIT + } else { + limit + }; + + let mut result = + commands::queries::get_queries(session.client(), sort_by, effective_limit).await?; + + // Apply redaction unless explicitly disabled + if !cli.no_redact { + result.redact(); + } else { + eprintln!("pgcrate: WARNING: --no-redact disables query text redaction. Output may contain sensitive data."); + } + + if cli.json { + commands::queries::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::queries::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::queries::QueryStatus::Critical => std::process::exit(2), + commands::queries::QueryStatus::Warning => std::process::exit(1), + commands::queries::QueryStatus::Healthy => {} + } + } + Commands::Connections { + by_user, + by_database, + } => { + let config = + Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; + let conn_result = connection::resolve_and_validate( + &config, + cli.database_url.as_deref(), + cli.connection.as_deref(), + cli.env_var.as_deref(), + cli.allow_primary, + cli.read_write, + cli.quiet, + )?; + + // Use DiagnosticSession with timeout enforcement + let timeout_config = parse_timeout_config(&cli)?; + let session = DiagnosticSession::connect(&conn_result.url, timeout_config).await?; + + // Set up Ctrl+C handler to cancel queries gracefully + setup_ctrlc_handler(session.cancel_token()); + + // Show effective timeouts unless quiet + if !cli.quiet && !cli.json { + eprintln!("pgcrate: timeouts: {}", session.effective_timeouts()); + } + + let result = + commands::connections::get_connections(session.client(), by_user, by_database) + .await?; + + if cli.json { + commands::connections::print_json(&result, Some(session.effective_timeouts()))?; + } else { + commands::connections::print_human(&result, cli.quiet); + } + + // Exit with appropriate code + match result.overall_status { + commands::connections::ConnectionStatus::Critical => std::process::exit(2), + commands::connections::ConnectionStatus::Warning => std::process::exit(1), + commands::connections::ConnectionStatus::Healthy => {} + } + } Commands::Fix { ref command } => { let config = Config::load(cli.config_path.as_deref()).context("Failed to load configuration")?; @@ -2183,6 +2316,8 @@ async fn run(cli: Cli, output: &Output) -> Result<()> { | Commands::Sequences { .. } | Commands::Indexes { .. } | Commands::Vacuum { .. } + | Commands::Queries { .. } + | Commands::Connections { .. } | Commands::Fix { .. } | Commands::Context | Commands::Capabilities diff --git a/src/output.rs b/src/output.rs index 95d8164..063a170 100644 --- a/src/output.rs +++ b/src/output.rs @@ -427,6 +427,8 @@ pub mod schema { pub const VACUUM: &str = "pgcrate.diagnostics.vacuum"; pub const CONTEXT: &str = "pgcrate.diagnostics.context"; pub const CAPABILITIES: &str = "pgcrate.diagnostics.capabilities"; + pub const QUERIES: &str = "pgcrate.diagnostics.queries"; + pub const CONNECTIONS: &str = "pgcrate.diagnostics.connections"; } // ============================================================================= diff --git a/tests/diagnostics/connections.rs b/tests/diagnostics/connections.rs new file mode 100644 index 0000000..44749f8 --- /dev/null +++ b/tests/diagnostics/connections.rs @@ -0,0 +1,222 @@ +//! Integration tests for `pgcrate connections` command. +//! +//! Tests connection analysis vs max_connections. 
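+//!
+//! These tests need a live database and are skipped otherwise (see
+//! `skip_if_no_db!`); pg_stat_activity is always readable, so the command
+//! itself should not fail outright.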
+ +use crate::common::{parse_json, stdout, TestDatabase, TestProject}; + +// ============================================================================ +// connections - basic functionality +// ============================================================================ + +#[test] +fn test_connections_runs_without_error() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // connections should succeed - pg_stat_activity is always available + let output = project.run_pgcrate(&["connections"]); + + assert!( + output.status.code().unwrap_or(99) <= 2, + "connections should return valid exit code (0=healthy, 1=warning, 2=critical)" + ); +} + +#[test] +fn test_connections_json_structure() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate_ok(&["connections", "--json"]); + + let json = parse_json(&output); + assert!(json.is_object(), "Should return JSON object"); + + // Schema versioning fields + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.diagnostics.connections")) + ); + assert!( + json.get("schema_version").is_some(), + "JSON should have schema_version: {}", + json + ); + + // Data fields (nested in data object) + let data = json.get("data").expect("JSON should have data field"); + assert!( + data.get("stats").is_some(), + "JSON should have data.stats: {}", + json + ); + assert!( + data.get("overall_status").is_some(), + "JSON should have data.overall_status: {}", + json + ); + + // Stats should have expected fields + let stats = data.get("stats").expect("should have stats"); + assert!( + stats.get("total").is_some(), + "stats should have total: {}", + json + ); + assert!( + stats.get("max_connections").is_some(), + "stats should have max_connections: {}", + json + ); + assert!( + stats.get("usage_pct").is_some(), + "stats should have usage_pct: {}", + json + ); + assert!( + stats.get("by_state").is_some(), + "stats should have by_state: {}", + json + ); +} + +#[test] +fn test_connections_by_user_option() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate_ok(&["connections", "--by-user", "--json"]); + + let json = parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + assert!( + data.get("by_user").is_some(), + "JSON should have data.by_user when --by-user is specified: {}", + json + ); +} + +#[test] +fn test_connections_by_database_option() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate_ok(&["connections", "--by-database", "--json"]); + + let json = parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + assert!( + data.get("by_database").is_some(), + "JSON should have data.by_database when --by-database is specified: {}", + json + ); +} + +#[test] +fn test_connections_both_options() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + 
+ let output = project.run_pgcrate_ok(&["connections", "--by-user", "--by-database", "--json"]); + + let json = parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + assert!( + data.get("by_user").is_some(), + "JSON should have data.by_user: {}", + json + ); + assert!( + data.get("by_database").is_some(), + "JSON should have data.by_database: {}", + json + ); +} + +#[test] +fn test_connections_human_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["connections"]); + + let out = stdout(&output); + // Should show connection summary + assert!( + out.contains("CONNECTIONS") || out.contains("connections"), + "Should show connections header: {}", + out + ); + assert!( + out.contains("max_connections") || out.contains("/") || out.contains("%"), + "Should show connection usage: {}", + out + ); +} + +#[test] +fn test_connections_shows_states() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["connections"]); + + let out = stdout(&output); + // Should show state breakdown + assert!( + out.contains("STATE") || out.contains("idle") || out.contains("active"), + "Should show connection states: {}", + out + ); +} + +#[test] +fn test_connections_healthy_on_fresh_db() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // A test database should have very few connections (healthy) + let output = project.run_pgcrate(&["connections", "--json"]); + + assert!( + output.status.success(), + "Fresh DB should have healthy connection count" + ); + + let json = parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + let status = data + .get("overall_status") + .expect("should have overall_status"); + assert_eq!( + status, + &serde_json::json!("healthy"), + "Fresh DB should be healthy: {}", + json + ); +} diff --git a/tests/diagnostics/mod.rs b/tests/diagnostics/mod.rs index dbcf5ef..24b36a8 100644 --- a/tests/diagnostics/mod.rs +++ b/tests/diagnostics/mod.rs @@ -1,5 +1,7 @@ mod basic; +mod connections; mod fix; mod indexes; mod locks; +mod queries; mod sequences_scenarios; diff --git a/tests/diagnostics/queries.rs b/tests/diagnostics/queries.rs new file mode 100644 index 0000000..07afd58 --- /dev/null +++ b/tests/diagnostics/queries.rs @@ -0,0 +1,177 @@ +//! Integration tests for `pgcrate queries` command. +//! +//! Tests query analysis from pg_stat_statements. 
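+//!
+//! pg_stat_statements may or may not be installed in the test environment,
+//! so these tests assert exit codes (0-2) and the JSON envelope rather than
+//! concrete query rows.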
+ +use crate::common::{parse_json, stdout, TestDatabase, TestProject}; + +// ============================================================================ +// queries - basic functionality +// ============================================================================ + +#[test] +fn test_queries_runs_without_error() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // queries may return 0 (healthy) even without pg_stat_statements + // or may return non-zero if there are slow queries + let output = project.run_pgcrate(&["queries"]); + + assert!( + output.status.code().unwrap_or(99) <= 2, + "queries should return valid exit code (0=healthy, 1=warning, 2=critical)" + ); +} + +#[test] +fn test_queries_json_structure() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["queries", "--json"]); + + // Should succeed (extension may or may not be available) + assert!( + output.status.code().unwrap_or(99) <= 2, + "queries --json should return valid exit code" + ); + + let json = parse_json(&output); + assert!(json.is_object(), "Should return JSON object"); + + // Schema versioning fields + assert_eq!(json.get("ok"), Some(&serde_json::json!(true))); + assert_eq!( + json.get("schema_id"), + Some(&serde_json::json!("pgcrate.diagnostics.queries")) + ); + assert!( + json.get("schema_version").is_some(), + "JSON should have schema_version: {}", + json + ); + + // Data fields (nested in data object) + let data = json.get("data").expect("JSON should have data field"); + assert!( + data.get("extension_available").is_some(), + "JSON should have data.extension_available: {}", + json + ); + assert!( + data.get("queries").is_some(), + "JSON should have data.queries: {}", + json + ); + assert!( + data.get("overall_status").is_some(), + "JSON should have data.overall_status: {}", + json + ); +} + +#[test] +fn test_queries_sort_options() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + // Test different sort options + for sort_by in &["total", "mean", "calls"] { + let output = project.run_pgcrate(&["queries", "--by", sort_by]); + assert!( + output.status.code().unwrap_or(99) <= 2, + "queries --by {} should succeed", + sort_by + ); + } +} + +#[test] +fn test_queries_limit_option() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["queries", "--limit", "5", "--json"]); + assert!( + output.status.code().unwrap_or(99) <= 2, + "queries --limit should succeed" + ); + + let json = parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + assert_eq!( + data.get("limit"), + Some(&serde_json::json!(5)), + "JSON should reflect limit: {}", + json + ); +} + +#[test] +fn test_queries_all_option() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["queries", "--all", "--json"]); + assert!( + output.status.code().unwrap_or(99) <= 2, + "queries --all should succeed" + ); + + let json = 
parse_json(&output); + let data = json.get("data").expect("JSON should have data field"); + assert_eq!( + data.get("limit"), + Some(&serde_json::json!(1000)), + "JSON should reflect large limit for --all: {}", + json + ); +} + +#[test] +fn test_queries_invalid_sort_option() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["queries", "--by", "invalid"]); + assert!(!output.status.success(), "queries --by invalid should fail"); +} + +#[test] +fn test_queries_human_output() { + skip_if_no_db!(); + let db = TestDatabase::new(); + let project = TestProject::from_fixture("with_migrations", &db); + + project.run_pgcrate_ok(&["migrate", "up"]); + + let output = project.run_pgcrate(&["queries"]); + + let out = stdout(&output); + // Should show either query info or extension unavailable message + assert!( + out.contains("QUERIES") + || out.contains("pg_stat_statements") + || out.contains("No query statistics"), + "Should show query info or extension message: {}", + out + ); +}
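+
+// A minimal extra check (sketch, reusing the harness above): `--by mean`
+// should be echoed back as "mean_time" in the JSON payload, mirroring the
+// sort_by mapping in src/commands/queries.rs.
+#[test]
+fn test_queries_by_mean_reflected_in_json() {
+    skip_if_no_db!();
+    let db = TestDatabase::new();
+    let project = TestProject::from_fixture("with_migrations", &db);
+
+    project.run_pgcrate_ok(&["migrate", "up"]);
+
+    let output = project.run_pgcrate(&["queries", "--by", "mean", "--json"]);
+    assert!(
+        output.status.code().unwrap_or(99) <= 2,
+        "queries --by mean --json should return valid exit code"
+    );
+
+    let json = parse_json(&output);
+    let data = json.get("data").expect("JSON should have data field");
+    assert_eq!(
+        data.get("sort_by"),
+        Some(&serde_json::json!("mean_time")),
+        "JSON should echo the sort field: {}",
+        json
+    );
+}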