From a10ac6dac7f115a1794663ee183d578cdc9d5b47 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 15 Apr 2025 14:55:19 -0700 Subject: [PATCH 1/9] graph, store: 'graphman info': Do not get confused by copies Because the code in primary::queries::fill_assignments used the deployment hash to reference a deployment, it would get confused by copies since for those several deployments have the same hash --- graph/src/components/store/mod.rs | 33 ++++++++++++++++++++++++++++++- store/postgres/src/primary.rs | 15 +++++++------- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index efe16c90ee6..a4ed96b0ba1 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -3,6 +3,11 @@ mod err; mod traits; pub mod write; +use diesel::deserialize::FromSql; +use diesel::pg::Pg; +use diesel::serialize::{Output, ToSql}; +use diesel::sql_types::Integer; +use diesel_derives::{AsExpression, FromSqlRow}; pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCache}; use slog::Logger; @@ -691,7 +696,20 @@ pub struct StoredDynamicDataSource { /// identifier only has meaning in the context of a specific instance of /// graph-node. Only store code should ever construct or consume it; all /// other code passes it around as an opaque token. 
-#[derive(Copy, Clone, CheapClone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[derive( + Copy, + Clone, + CheapClone, + Debug, + Serialize, + Deserialize, + PartialEq, + Eq, + Hash, + AsExpression, + FromSqlRow, +)] +#[diesel(sql_type = Integer)] pub struct DeploymentId(pub i32); impl Display for DeploymentId { @@ -706,6 +724,19 @@ impl DeploymentId { } } +impl FromSql for DeploymentId { + fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result { + let id = >::from_sql(bytes)?; + Ok(DeploymentId(id)) + } +} + +impl ToSql for DeploymentId { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + >::to_sql(&self.0, out) + } +} + /// A unique identifier for a deployment that specifies both its external /// identifier (`hash`) and its unique internal identifier (`id`) which /// ensures we are talking about a unique location for the deployment's data diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 8d49153d214..6b22b8c8e35 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -450,6 +450,7 @@ mod queries { use diesel::sql_types::Text; use graph::prelude::NodeId; use graph::{ + components::store::DeploymentId as GraphDeploymentId, data::subgraph::status, internal_error, prelude::{DeploymentHash, StoreError, SubgraphName}, @@ -646,18 +647,18 @@ mod queries { conn: &mut PgConnection, infos: &mut [status::Info], ) -> Result<(), StoreError> { - let ids: Vec<_> = infos.iter().map(|info| &info.subgraph).collect(); + let ids: Vec<_> = infos.iter().map(|info| &info.id).collect(); let nodes: HashMap<_, _> = a::table .inner_join(ds::table.on(ds::id.eq(a::id))) - .filter(ds::subgraph.eq_any(ids)) - .select((ds::subgraph, a::node_id, a::paused_at.is_not_null())) - .load::<(String, String, bool)>(conn)? + .filter(ds::id.eq_any(ids)) + .select((ds::id, a::node_id, a::paused_at.is_not_null())) + .load::<(GraphDeploymentId, String, bool)>(conn)? 
.into_iter() - .map(|(subgraph, node, paused)| (subgraph, (node, paused))) + .map(|(id, node, paused)| (id, (node, paused))) .collect(); for info in infos { - info.node = nodes.get(&info.subgraph).map(|(node, _)| node.clone()); - info.paused = nodes.get(&info.subgraph).map(|(_, paused)| *paused); + info.node = nodes.get(&info.id).map(|(node, _)| node.clone()); + info.paused = nodes.get(&info.id).map(|(_, paused)| *paused); } Ok(()) } From 2985a10084a03398c2460a1ce5f5b3ad7fb21a31 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 8 Apr 2025 15:48:17 -0700 Subject: [PATCH 2/9] graph: Introduce a StoreResult --- graph/src/components/store/err.rs | 2 ++ graph/src/components/store/mod.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/graph/src/components/store/err.rs b/graph/src/components/store/err.rs index 264c1b80df2..b8743658030 100644 --- a/graph/src/components/store/err.rs +++ b/graph/src/components/store/err.rs @@ -7,6 +7,8 @@ use diesel::result::Error as DieselError; use thiserror::Error; use tokio::task::JoinError; +pub type StoreResult = Result; + #[derive(Error, Debug)] pub enum StoreError { #[error("store error: {0:#}")] diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index a4ed96b0ba1..88cc49d024a 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -12,7 +12,7 @@ pub use entity_cache::{EntityCache, EntityLfuCache, GetScope, ModificationsAndCa use slog::Logger; pub use super::subgraph::Entity; -pub use err::StoreError; +pub use err::{StoreError, StoreResult}; use itertools::Itertools; use strum_macros::Display; pub use traits::*; From ddf9042babbd7d5cdffff6878e320e6302c2a235 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 16 Apr 2025 10:54:04 -0700 Subject: [PATCH 3/9] node: Extract some formatting helpers into a module --- node/src/manager/commands/copy.rs | 28 ++--------- node/src/manager/commands/prune.rs | 7 +-- 
node/src/manager/commands/stats.rs | 16 +----- node/src/manager/fmt.rs | 78 ++++++++++++++++++++++++++++++ node/src/manager/mod.rs | 1 + 5 files changed, 87 insertions(+), 43 deletions(-) create mode 100644 node/src/manager/fmt.rs diff --git a/node/src/manager/commands/copy.rs b/node/src/manager/commands/copy.rs index a7857476c58..c09630ae261 100644 --- a/node/src/manager/commands/copy.rs +++ b/node/src/manager/commands/copy.rs @@ -1,5 +1,5 @@ use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl}; -use std::{collections::HashMap, sync::Arc, time::SystemTime}; +use std::{collections::HashMap, sync::Arc}; use graph::{ components::store::{BlockStore as _, DeploymentId, DeploymentLocator}, @@ -19,8 +19,8 @@ use graph_store_postgres::{ }; use graph_store_postgres::{ConnectionPool, Shard, Store, SubgraphStore}; -use crate::manager::deployment::DeploymentSearch; use crate::manager::display::List; +use crate::manager::{deployment::DeploymentSearch, fmt}; type UtcDateTime = DateTime; @@ -260,26 +260,6 @@ pub fn status(pools: HashMap, dst: &DeploymentSearch) -> use catalog::active_copies as ac; use catalog::deployment_schemas as ds; - fn duration(start: &UtcDateTime, end: &Option) -> String { - let start = *start; - let end = *end; - - let end = end.unwrap_or(UtcDateTime::from(SystemTime::now())); - let duration = end - start; - - human_duration(duration) - } - - fn human_duration(duration: Duration) -> String { - if duration.num_seconds() < 5 { - format!("{}ms", duration.num_milliseconds()) - } else if duration.num_minutes() < 5 { - format!("{}s", duration.num_seconds()) - } else { - format!("{}m", duration.num_minutes()) - } - } - let primary = pools .get(&*PRIMARY_SHARD) .ok_or_else(|| anyhow!("can not find deployment with id {}", dst))?; @@ -336,7 +316,7 @@ pub fn status(pools: HashMap, dst: &DeploymentSearch) -> state.dst.to_string(), state.target_block_number.to_string(), on_sync.to_str().to_string(), - duration(&state.started_at, 
&state.finished_at), + fmt::duration(&state.started_at, &state.finished_at), progress, ]; match (cancelled_at, state.cancelled_at) { @@ -378,7 +358,7 @@ pub fn status(pools: HashMap, dst: &DeploymentSearch) -> table.next_vid, table.target_vid, table.batch_size, - human_duration(Duration::milliseconds(table.duration_ms)), + fmt::human_duration(Duration::milliseconds(table.duration_ms)), ); } diff --git a/node/src/manager/commands/prune.rs b/node/src/manager/commands/prune.rs index dbf114453e8..4ca058525e2 100644 --- a/node/src/manager/commands/prune.rs +++ b/node/src/manager/commands/prune.rs @@ -16,10 +16,7 @@ use graph::{ }; use graph_store_postgres::{ConnectionPool, Store}; -use crate::manager::{ - commands::stats::{abbreviate_table_name, show_stats}, - deployment::DeploymentSearch, -}; +use crate::manager::{commands::stats::show_stats, deployment::DeploymentSearch, fmt}; struct Progress { start: Instant, @@ -66,7 +63,7 @@ fn print_batch( }; print!( "\r{:<30} | {:>10} | {:>9}s {phase}", - abbreviate_table_name(table, 30), + fmt::abbreviate(table, 30), total_rows, elapsed.as_secs() ); diff --git a/node/src/manager/commands/stats.rs b/node/src/manager/commands/stats.rs index bb3a928b1ad..abb02fdb77c 100644 --- a/node/src/manager/commands/stats.rs +++ b/node/src/manager/commands/stats.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::sync::Arc; use crate::manager::deployment::DeploymentSearch; +use crate::manager::fmt; use diesel::r2d2::ConnectionManager; use diesel::r2d2::PooledConnection; use diesel::PgConnection; @@ -51,19 +52,6 @@ pub async fn account_like( Ok(()) } -pub fn abbreviate_table_name(table: &str, size: usize) -> String { - if table.len() > size { - let fragment = size / 2 - 2; - let last = table.len() - fragment; - let mut table = table.to_string(); - table.replace_range(fragment..last, ".."); - let table = table.trim().to_string(); - table - } else { - table.to_string() - } -} - pub fn show_stats( stats: &[VersionStats], account_like: 
HashSet, @@ -83,7 +71,7 @@ pub fn show_stats( fn print_stats(s: &VersionStats, account_like: bool) { println!( "{:<26} {:3} | {:>10} | {:>10} | {:>5.1}%", - abbreviate_table_name(&s.tablename, 26), + fmt::abbreviate(&s.tablename, 26), if account_like { "(a)" } else { " " }, s.entities, s.versions, diff --git a/node/src/manager/fmt.rs b/node/src/manager/fmt.rs new file mode 100644 index 00000000000..549d173ca1a --- /dev/null +++ b/node/src/manager/fmt.rs @@ -0,0 +1,78 @@ +use std::time::SystemTime; + +use graph::prelude::chrono::{DateTime, Duration, Local, Utc}; + +pub const NULL: &str = "ø"; +const CHECK: &str = "✓"; + +pub fn null() -> String { + NULL.to_string() +} + +pub fn check() -> String { + CHECK.to_string() +} + +pub trait MapOrNull { + fn map_or_null(&self, f: F) -> String + where + F: FnOnce(&T) -> String; } + +impl MapOrNull for Option { + fn map_or_null(&self, f: F) -> String + where + F: FnOnce(&T) -> String, + { + self.as_ref() + .map(|value| f(value)) + .unwrap_or_else(|| NULL.to_string()) + } +} + +/// Return the duration from `start` to `end` formatted using +/// `human_duration`. Use now if `end` is `None` +pub fn duration(start: &DateTime, end: &Option>) -> String { + let start = *start; + let end = *end; + + let end = end.unwrap_or(DateTime::::from(SystemTime::now())); + let duration = end - start; + + human_duration(duration) +} + +/// Format a duration using ms/s/m as units depending on how long the +/// duration was +pub fn human_duration(duration: Duration) -> String { + if duration.num_seconds() < 5 { + format!("{}ms", duration.num_milliseconds()) + } else if duration.num_minutes() < 5 { + format!("{}s", duration.num_seconds()) + } else { + format!("{}m", duration.num_minutes()) + } +} + +/// Abbreviate a long name to fit into `size` characters. The abbreviation +/// is done by replacing the middle of the name with `..`. For example, if +/// `name` is `foo_bar_baz` and `size` is 10, the result will be +/// `foo..baz`. 
If the name is shorter than `size`, it is returned +/// unchanged. +pub fn abbreviate(name: &str, size: usize) -> String { + if name.len() > size { + let fragment = size / 2 - 2; + let last = name.len() - fragment; + let mut name = name.to_string(); + name.replace_range(fragment..last, ".."); + let table = name.trim().to_string(); + table + } else { + name.to_string() + } +} + +pub fn date_time(date: &DateTime) -> String { + let date = DateTime::::from(*date); + date.format("%Y-%m-%d %H:%M:%S%Z").to_string() +} diff --git a/node/src/manager/mod.rs b/node/src/manager/mod.rs index 6a332653ca8..d95e5fbadc1 100644 --- a/node/src/manager/mod.rs +++ b/node/src/manager/mod.rs @@ -8,6 +8,7 @@ pub mod color; pub mod commands; pub mod deployment; mod display; +pub mod fmt; pub mod prompt; /// A dummy subscription manager that always panics From 4a15ec43a4b037b60e0841d72cb00259cc8ab138 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 8 Apr 2025 15:48:35 -0700 Subject: [PATCH 4/9] store: Track pruning progress in the database Also adds graphman commands to view the progress --- graph/src/components/store/mod.rs | 4 + node/src/bin/manager.rs | 154 ++++-- node/src/manager/commands/prune.rs | 242 ++++++++- .../down.sql | 2 + .../2025-04-08-224710_add_prune_state/up.sql | 60 +++ store/postgres/src/deployment_store.rs | 15 +- store/postgres/src/lib.rs | 1 + store/postgres/src/relational.rs | 2 +- store/postgres/src/relational/prune.rs | 486 +++++++++++++++++- store/postgres/src/subgraph_store.rs | 11 + 10 files changed, 903 insertions(+), 74 deletions(-) create mode 100644 store/postgres/migrations/2025-04-08-224710_add_prune_state/down.sql create mode 100644 store/postgres/migrations/2025-04-08-224710_add_prune_state/up.sql diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 88cc49d024a..ab30caeda75 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1003,6 +1003,9 @@ pub struct PruneRequest { 
pub earliest_block: BlockNumber, /// The last block that contains final entities not subject to a reorg pub final_block: BlockNumber, + /// The first block for which the deployment contained entities when the + /// request was made + pub first_block: BlockNumber, /// The latest block, i.e., the subgraph head pub latest_block: BlockNumber, /// Use the rebuild strategy when removing more than this fraction of @@ -1066,6 +1069,7 @@ impl PruneRequest { earliest_block, final_block, latest_block, + first_block, rebuild_threshold, delete_threshold, }) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 803625a6021..81c794485d4 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -297,35 +297,13 @@ pub enum Command { #[clap(subcommand)] Index(IndexCommand), - /// Prune a deployment + /// Prune subgraphs by removing old entity versions /// /// Keep only entity versions that are needed to respond to queries at /// block heights that are within `history` blocks of the subgraph head; /// all other entity versions are removed. - /// - /// Unless `--once` is given, this setting is permanent and the subgraph - /// will periodically be pruned to remove history as the subgraph head - /// moves forward. - Prune { - /// The deployment to prune (see `help info`) - deployment: DeploymentSearch, - /// Prune by rebuilding tables when removing more than this fraction - /// of history. Defaults to GRAPH_STORE_HISTORY_REBUILD_THRESHOLD - #[clap(long, short)] - rebuild_threshold: Option, - /// Prune by deleting when removing more than this fraction of - /// history but less than rebuild_threshold. Defaults to - /// GRAPH_STORE_HISTORY_DELETE_THRESHOLD - #[clap(long, short)] - delete_threshold: Option, - /// How much history to keep in blocks. 
Defaults to - /// GRAPH_MIN_HISTORY_BLOCKS - #[clap(long, short = 'y')] - history: Option, - /// Prune only this once - #[clap(long, short)] - once: bool, - }, + #[clap(subcommand)] + Prune(PruneCommand), /// General database management #[clap(subcommand)] @@ -694,6 +672,67 @@ pub enum StatsCommand { }, } +#[derive(Clone, Debug, Subcommand)] +pub enum PruneCommand { + /// Prune a deployment in the foreground + /// + /// Unless `--once` is given, this setting is permanent and the subgraph + /// will periodically be pruned to remove history as the subgraph head + /// moves forward. + Run { + /// The deployment to prune (see `help info`) + deployment: DeploymentSearch, + /// Prune by rebuilding tables when removing more than this fraction + /// of history. Defaults to GRAPH_STORE_HISTORY_REBUILD_THRESHOLD + #[clap(long, short)] + rebuild_threshold: Option, + /// Prune by deleting when removing more than this fraction of + /// history but less than rebuild_threshold. Defaults to + /// GRAPH_STORE_HISTORY_DELETE_THRESHOLD + #[clap(long, short)] + delete_threshold: Option, + /// How much history to keep in blocks. Defaults to + /// GRAPH_MIN_HISTORY_BLOCKS + #[clap(long, short = 'y')] + history: Option, + /// Prune only this once + #[clap(long, short)] + once: bool, + }, + /// Prune a deployment in the background + /// + /// Set the amount of history the subgraph should retain. The actual + /// data removal happens in the background and can be monitored with + /// `prune status`. It can take several minutes for the first pruning to + /// start, during which time `prune status` will not return any + /// information + Set { + /// The deployment to prune (see `help info`) + deployment: DeploymentSearch, + /// Prune by rebuilding tables when removing more than this fraction + /// of history. 
Defaults to GRAPH_STORE_HISTORY_REBUILD_THRESHOLD + #[clap(long, short)] + rebuild_threshold: Option, + /// Prune by deleting when removing more than this fraction of + /// history but less than rebuild_threshold. Defaults to + /// GRAPH_STORE_HISTORY_DELETE_THRESHOLD + #[clap(long, short)] + delete_threshold: Option, + /// How much history to keep in blocks. Defaults to + /// GRAPH_MIN_HISTORY_BLOCKS + #[clap(long, short = 'y')] + history: Option, + }, + /// Show the status of a pruning operation + Status { + /// The number of the pruning run + #[clap(long, short)] + run: Option, + /// The deployment to check (see `help info`) + deployment: DeploymentSearch, + }, +} + #[derive(Clone, Debug, Subcommand)] pub enum IndexCommand { /// Creates a new database index. @@ -1613,25 +1652,52 @@ async fn main() -> anyhow::Result<()> { } } } - Prune { - deployment, - history, - rebuild_threshold, - delete_threshold, - once, - } => { - let (store, primary_pool) = ctx.store_and_primary(); - let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?); - commands::prune::run( - store, - primary_pool, - deployment, - history, - rebuild_threshold, - delete_threshold, - once, - ) - .await + Prune(cmd) => { + use PruneCommand::*; + match cmd { + Run { + deployment, + history, + rebuild_threshold, + delete_threshold, + once, + } => { + let (store, primary_pool) = ctx.store_and_primary(); + let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?); + commands::prune::run( + store, + primary_pool, + deployment, + history, + rebuild_threshold, + delete_threshold, + once, + ) + .await + } + Set { + deployment, + rebuild_threshold, + delete_threshold, + history, + } => { + let (store, primary_pool) = ctx.store_and_primary(); + let history = history.unwrap_or(ENV_VARS.min_history_blocks.try_into()?); + commands::prune::set( + store, + primary_pool, + deployment, + history, + rebuild_threshold, + delete_threshold, + ) + .await + } + Status { run, deployment } => { 
+ let (store, primary_pool) = ctx.store_and_primary(); + commands::prune::status(store, primary_pool, deployment, run).await + } + } } Drop { deployment, diff --git a/node/src/manager/commands/prune.rs b/node/src/manager/commands/prune.rs index 4ca058525e2..0fc5538fc71 100644 --- a/node/src/manager/commands/prune.rs +++ b/node/src/manager/commands/prune.rs @@ -6,7 +6,7 @@ use std::{ }; use graph::{ - components::store::{PrunePhase, PruneRequest}, + components::store::{DeploymentLocator, PrunePhase, PruneRequest}, env::ENV_VARS, }; use graph::{ @@ -14,9 +14,16 @@ use graph::{ data::subgraph::status, prelude::{anyhow, BlockNumber}, }; -use graph_store_postgres::{ConnectionPool, Store}; +use graph_store_postgres::{ + command_support::{Phase, PruneTableState}, + ConnectionPool, Store, +}; -use crate::manager::{commands::stats::show_stats, deployment::DeploymentSearch, fmt}; +use crate::manager::{ + commands::stats::show_stats, + deployment::DeploymentSearch, + fmt::{self, MapOrNull as _}, +}; struct Progress { start: Instant, @@ -153,15 +160,19 @@ impl PruneReporter for Progress { } } -pub async fn run( - store: Arc, +struct Args { + history: BlockNumber, + deployment: DeploymentLocator, + earliest_block: BlockNumber, + latest_block: BlockNumber, +} + +fn check_args( + store: &Arc, primary_pool: ConnectionPool, search: DeploymentSearch, history: usize, - rebuild_threshold: Option, - delete_threshold: Option, - once: bool, -) -> Result<(), anyhow::Error> { +) -> Result { let history = history as BlockNumber; let deployment = search.locate_unique(&primary_pool)?; let mut info = store @@ -178,22 +189,38 @@ pub async fn run( .chains .pop() .ok_or_else(|| anyhow!("deployment {} does not index any chain", deployment))?; - let latest = status.latest_block.map(|ptr| ptr.number()).unwrap_or(0); - if latest <= history { - return Err(anyhow!("deployment {deployment} has only indexed up to block {latest} and we can't preserve {history} blocks of history")); + let latest_block = 
status.latest_block.map(|ptr| ptr.number()).unwrap_or(0); + if latest_block <= history { + return Err(anyhow!("deployment {deployment} has only indexed up to block {latest_block} and we can't preserve {history} blocks of history")); } + Ok(Args { + history, + deployment, + earliest_block: status.earliest_block_number, + latest_block, + }) +} - println!("prune {deployment}"); - println!(" latest: {latest}"); - println!(" final: {}", latest - ENV_VARS.reorg_threshold()); - println!(" earliest: {}\n", latest - history); +async fn first_prune( + store: &Arc, + args: &Args, + rebuild_threshold: Option, + delete_threshold: Option, +) -> Result<(), anyhow::Error> { + println!("prune {}", args.deployment); + println!( + " range: {} - {} ({} blocks)", + args.earliest_block, + args.latest_block, + args.latest_block - args.earliest_block + ); let mut req = PruneRequest::new( - &deployment, - history, + &args.deployment, + args.history, ENV_VARS.reorg_threshold(), - status.earliest_block_number, - latest, + args.earliest_block, + args.latest_block, )?; if let Some(rebuild_threshold) = rebuild_threshold { req.rebuild_threshold = rebuild_threshold; @@ -206,17 +233,186 @@ pub async fn run( store .subgraph_store() - .prune(reporter, &deployment, req) + .prune(reporter, &args.deployment, req) .await?; + Ok(()) +} + +async fn run_inner( + store: Arc, + primary_pool: ConnectionPool, + search: DeploymentSearch, + history: usize, + rebuild_threshold: Option, + delete_threshold: Option, + once: bool, + do_first_prune: bool, +) -> Result<(), anyhow::Error> { + let args = check_args(&store, primary_pool, search, history)?; + + if do_first_prune { + first_prune(&store, &args, rebuild_threshold, delete_threshold).await?; + } // Only after everything worked out, make the history setting permanent if !once { store.subgraph_store().set_history_blocks( - &deployment, - history, + &args.deployment, + args.history, ENV_VARS.reorg_threshold(), )?; } Ok(()) } + +pub async fn run( + store: Arc, + 
primary_pool: ConnectionPool, + search: DeploymentSearch, + history: usize, + rebuild_threshold: Option, + delete_threshold: Option, + once: bool, +) -> Result<(), anyhow::Error> { + run_inner( + store, + primary_pool, + search, + history, + rebuild_threshold, + delete_threshold, + once, + true, + ) + .await +} + +pub async fn set( + store: Arc, + primary_pool: ConnectionPool, + search: DeploymentSearch, + history: usize, + rebuild_threshold: Option, + delete_threshold: Option, +) -> Result<(), anyhow::Error> { + run_inner( + store, + primary_pool, + search, + history, + rebuild_threshold, + delete_threshold, + false, + false, + ) + .await +} + +pub async fn status( + store: Arc, + primary_pool: ConnectionPool, + search: DeploymentSearch, + run: Option, +) -> Result<(), anyhow::Error> { + fn percentage(left: Option, x: Option, right: Option) -> String { + match (left, x, right) { + (Some(left), Some(x), Some(right)) => { + let range = right - left; + if range == 0 { + return fmt::null(); + } + let percent = (x - left) as f64 / range as f64 * 100.0; + format!("{:.0}%", percent.min(100.0)) + } + _ => fmt::null(), + } + } + + let deployment = search.locate_unique(&primary_pool)?; + + let viewer = store.subgraph_store().prune_viewer(&deployment).await?; + let runs = viewer.runs()?; + if runs.is_empty() { + return Err(anyhow!("No prune runs found for deployment {deployment}")); + } + let run = run.unwrap_or(*runs.last().unwrap()); + let Some((state, table_states)) = viewer.state(run)? else { + let runs = match runs.len() { + 0 => unreachable!("we checked that runs is not empty"), + 1 => format!("There is only one prune run #{}", runs[0]), + _ => format!( + "Only prune runs #{} up to #{} exist", + runs[0], + runs.last().unwrap() + ), + }; + return Err(anyhow!( + "No information about prune run #{run} found for deployment {deployment}. 
{runs}" + )); + }; + println!("prune {deployment} (run #{run})"); + println!( + " range: {} - {} ({} blocks, should keep {} blocks)", + state.first_block, + state.latest_block, + state.latest_block - state.first_block, + state.history_blocks + ); + println!(" started: {}", fmt::date_time(&state.started_at)); + match &state.finished_at { + Some(finished_at) => println!(" finished: {}", fmt::date_time(finished_at)), + None => println!(" finished: still running"), + } + println!( + " duration: {}", + fmt::duration(&state.started_at, &state.finished_at) + ); + + println!( + "\n{:^30} | {:^22} | {:^8} | {:^11} | {:^8}", + "table", "status", "rows", "batch_size", "duration" + ); + println!( + "{:-^30}-+-{:-^22}-+-{:-^8}-+-{:-^11}-+-{:-^8}", + "", "", "", "", "" + ); + for ts in table_states { + #[allow(unused_variables)] + let PruneTableState { + vid: _, + id: _, + run: _, + table_name, + strategy, + phase, + start_vid, + final_vid, + nonfinal_vid, + rows, + next_vid, + batch_size, + started_at, + finished_at, + } = ts; + + let complete = match phase { + Phase::Queued | Phase::Started => "0%".to_string(), + Phase::CopyFinal => percentage(start_vid, next_vid, final_vid), + Phase::CopyNonfinal | Phase::Delete => percentage(start_vid, next_vid, nonfinal_vid), + Phase::Done => fmt::check(), + Phase::Unknown => fmt::null(), + }; + + let table_name = fmt::abbreviate(&table_name, 30); + let rows = rows.map_or_null(|rows| rows.to_string()); + let batch_size = batch_size.map_or_null(|b| b.to_string()); + let duration = started_at.map_or_null(|s| fmt::duration(&s, &finished_at)); + let phase = phase.as_str(); + println!( + "{table_name:<30} | {:<15} {complete:>6} | {rows:>8} | {batch_size:>11} | {duration:>8}", + format!("{strategy}/{phase}") + ); + } + Ok(()) +} diff --git a/store/postgres/migrations/2025-04-08-224710_add_prune_state/down.sql b/store/postgres/migrations/2025-04-08-224710_add_prune_state/down.sql new file mode 100644 index 00000000000..324bc18f154 --- /dev/null 
+++ b/store/postgres/migrations/2025-04-08-224710_add_prune_state/down.sql @@ -0,0 +1,2 @@ +drop table subgraphs.prune_table_state; +drop table subgraphs.prune_state; diff --git a/store/postgres/migrations/2025-04-08-224710_add_prune_state/up.sql b/store/postgres/migrations/2025-04-08-224710_add_prune_state/up.sql new file mode 100644 index 00000000000..8c767ed7384 --- /dev/null +++ b/store/postgres/migrations/2025-04-08-224710_add_prune_state/up.sql @@ -0,0 +1,60 @@ +create table subgraphs.prune_state( + -- diesel can't deal with composite primary keys + vid int primary key + generated always as identity, + + -- id of the deployment + id int not null, + -- how many times the deployment has been pruned + run int not null, + + -- from PruneRequest + first_block int not null, + final_block int not null, + latest_block int not null, + history_blocks int not null, + + started_at timestamptz not null, + finished_at timestamptz, + + constraint prune_state_id_run_uq unique(id, run) +); + +create table subgraphs.prune_table_state( + -- diesel can't deal with composite primary keys + vid int primary key + generated always as identity, + + id int not null, + run int not null, + table_name text not null, + -- 'r' (rebuild) or 'd' (delete) + strategy char not null, + phase text not null, + + start_vid int8, + final_vid int8, + nonfinal_vid int8, + rows int8, + + next_vid int8, + batch_size int8, + + started_at timestamptz, + finished_at timestamptz, + + constraint prune_table_state_id_run_table_name_uq + unique(id, run, table_name), + + constraint prune_table_state_strategy_ck + check(strategy in ('r', 'd')), + + constraint prune_table_state_phase_ck + check(phase in ('queued', 'started', 'copy_final', + 'copy_nonfinal', 'delete', 'done')), + + constraint prune_table_state_id_run_fk + foreign key(id, run) + references subgraphs.prune_state(id, run) + on delete cascade +); diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 
bb7f842843b..1cb569730a0 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -53,7 +53,7 @@ use crate::detail::ErrorDetail; use crate::dynds::DataSourcesTable; use crate::primary::{DeploymentId, Primary}; use crate::relational::index::{CreateIndex, IndexList, Method}; -use crate::relational::{Layout, LayoutCache, SqlName, Table}; +use crate::relational::{self, Layout, LayoutCache, SqlName, Table}; use crate::relational_queries::FromEntityData; use crate::{advisory_lock, catalog, retry}; use crate::{detail, ConnectionPool}; @@ -876,6 +876,19 @@ impl DeploymentStore { }) .await } + + pub(crate) async fn prune_viewer( + self: &Arc, + site: Arc, + ) -> Result { + let store = self.cheap_clone(); + let layout = self + .pool + .with_conn(move |conn, _| store.layout(conn, site.clone()).map_err(|e| e.into())) + .await?; + + Ok(relational::prune::Viewer::new(self.pool.clone(), layout)) + } } /// Methods that back the trait `WritableStore`, but have small variations in their signatures diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index 25c8c285910..baf4d523ed5 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -85,5 +85,6 @@ pub mod command_support { } pub use crate::deployment::{on_sync, OnSync}; pub use crate::primary::Namespace; + pub use crate::relational::prune::{Phase, PruneState, PruneTableState, Viewer}; pub use crate::relational::{Catalog, Column, ColumnType, Layout, SqlName}; } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 27cee515265..35e35a35746 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -16,7 +16,7 @@ mod query_tests; pub(crate) mod dsl; pub(crate) mod index; -mod prune; +pub(crate) mod prune; mod rollup; pub(crate) mod value; diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 5c3035ce172..37a053f6b2b 100644 --- 
a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -28,6 +28,8 @@ use super::{ Catalog, Layout, Namespace, }; +pub use status::{Phase, PruneState, PruneTableState, Viewer}; + /// Utility to copy relevant data out of a source table and into a new /// destination table and replace the source table with the destination /// table @@ -90,6 +92,7 @@ impl TablePair { &self, conn: &mut PgConnection, reporter: &mut dyn PruneReporter, + tracker: &status::Tracker, earliest_block: BlockNumber, final_block: BlockNumber, cancel: &CancelHandle, @@ -99,6 +102,7 @@ impl TablePair { // Determine the last vid that we need to copy let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?; let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?; + tracker.start_copy_final(conn, &self.src, range)?; while !batcher.finished() { let (_, rows) = batcher.step(|start, end| { @@ -132,11 +136,13 @@ impl TablePair { .map_err(StoreError::from) }) })?; + let rows = rows.unwrap_or(0); + tracker.copy_final_batch(conn, &self.src, rows, &batcher)?; cancel.check_cancel()?; reporter.prune_batch( self.src.name.as_str(), - rows.unwrap_or(0), + rows, PrunePhase::CopyFinal, batcher.finished(), ); @@ -151,6 +157,7 @@ impl TablePair { &self, conn: &mut PgConnection, reporter: &mut dyn PruneReporter, + tracker: &status::Tracker, final_block: BlockNumber, ) -> Result<(), StoreError> { let column_list = self.column_list(); @@ -158,6 +165,7 @@ impl TablePair { // Determine the last vid that we need to copy let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?; let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?; + tracker.start_copy_nonfinal(conn, &self.src, range)?; while !batcher.finished() { let (_, rows) = batcher.step(|start, end| { @@ -186,10 +194,13 @@ impl TablePair { .map_err(StoreError::from) }) })?; + let rows = rows.unwrap_or(0); + + tracker.copy_nonfinal_batch(conn, 
&self.src, rows as i64, &batcher)?; reporter.prune_batch( self.src.name.as_str(), - rows.unwrap_or(0), + rows, PrunePhase::CopyNonfinal, batcher.finished(), ); @@ -352,18 +363,21 @@ impl Layout { /// time. The rebuild strategy never blocks reads, it only ever blocks /// writes. pub fn prune( - &self, + self: Arc, logger: &Logger, reporter: &mut dyn PruneReporter, conn: &mut PgConnection, req: &PruneRequest, cancel: &CancelHandle, ) -> Result<(), CancelableError> { + let tracker = status::Tracker::new(conn, self.clone())?; + reporter.start(req); let stats = self.version_stats(conn, reporter, true, cancel)?; let prunable_tables: Vec<_> = self.prunable_tables(&stats, req).into_iter().collect(); + tracker.start(conn, req, &prunable_tables)?; // create a shadow namespace where we will put the copies of our // tables, but only create it in the database if we really need it @@ -382,6 +396,7 @@ impl Layout { // is the definition of 'final' for (table, strat) in &prunable_tables { reporter.start_table(table.name.as_str()); + tracker.start_table(conn, table)?; match strat { PruningStrategy::Rebuild => { if recreate_dst_nsp { @@ -401,6 +416,7 @@ impl Layout { pair.copy_final_entities( conn, reporter, + &tracker, req.earliest_block, req.final_block, cancel, @@ -410,7 +426,7 @@ impl Layout { // see also: deployment-lock-for-update reporter.start_switch(); deployment::with_lock(conn, &self.site, |conn| -> Result<_, StoreError> { - pair.copy_nonfinal_entities(conn, reporter, req.final_block)?; + pair.copy_nonfinal_entities(conn, reporter, &tracker, req.final_block)?; cancel.check_cancel().map_err(CancelableError::from)?; conn.transaction(|conn| pair.switch(logger, conn))?; @@ -426,6 +442,7 @@ impl Layout { let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?; let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?; + tracker.start_delete(conn, table, range, &batcher)?; while !batcher.finished() { let (_, rows) = batcher.step(|start, end| 
{sql_query(format!( "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ @@ -439,10 +456,13 @@ impl Layout { .bind::(start) .bind::(end) .execute(conn).map_err(StoreError::from)})?; + let rows = rows.unwrap_or(0); + + tracker.delete_batch(conn, table, rows, &batcher)?; reporter.prune_batch( table.name.as_str(), - rows.unwrap_or(0), + rows, PrunePhase::Delete, batcher.finished(), ); @@ -450,6 +470,7 @@ impl Layout { } } reporter.finish_table(table.name.as_str()); + tracker.finish_table(conn, table)?; } // Get rid of the temporary prune schema if we actually created it if !recreate_dst_nsp { @@ -465,7 +486,462 @@ impl Layout { self.analyze_tables(conn, reporter, tables, cancel)?; reporter.finish(); + tracker.finish(conn)?; Ok(()) } } + +mod status { + use std::sync::Arc; + + use chrono::{DateTime, Utc}; + use diesel::{ + deserialize::FromSql, + dsl::insert_into, + pg::{Pg, PgValue}, + query_builder::QueryFragment, + serialize::{Output, ToSql}, + sql_types::Text, + table, update, AsChangeset, Connection, ExpressionMethods as _, OptionalExtension, + PgConnection, QueryDsl as _, RunQueryDsl as _, + }; + use graph::{ + components::store::{PruneRequest, PruningStrategy, StoreResult}, + prelude::StoreError, + }; + + use crate::{ + relational::{Layout, Table}, + vid_batcher::{VidBatcher, VidRange}, + ConnectionPool, + }; + + table! { + subgraphs.prune_state(vid) { + vid -> Integer, + // Deployment id (sgd) + id -> Integer, + run -> Integer, + // The first block in the subgraph when the prune started + first_block -> Integer, + final_block -> Integer, + latest_block -> Integer, + // The amount of history configured + history_blocks -> Integer, + + started_at -> Timestamptz, + finished_at -> Nullable, + } + } + + table! 
{ + subgraphs.prune_table_state(vid) { + vid -> Integer, + // Deployment id (sgd) + id -> Integer, + run -> Integer, + table_name -> Text, + + strategy -> Char, + // see enum Phase + phase -> Text, + + start_vid -> Nullable, + final_vid -> Nullable, + nonfinal_vid -> Nullable, + rows -> Nullable, + + next_vid -> Nullable, + batch_size -> Nullable, + + started_at -> Nullable, + finished_at -> Nullable, + } + } + + #[derive(Clone, Copy, Debug, AsExpression, FromSqlRow)] + #[diesel(sql_type = Text)] + pub enum Phase { + Queued, + Started, + /// Only used when strategy is Rebuild + CopyFinal, + /// Only used when strategy is Rebuild + CopyNonfinal, + /// Only used when strategy is Delete + Delete, + Done, + /// Not a real phase, indicates that the database has an invalid + /// value + Unknown, + } + + impl Phase { + pub fn from_str(phase: &str) -> Self { + use Phase::*; + match phase { + "queued" => Queued, + "started" => Started, + "copy_final" => CopyFinal, + "copy_nonfinal" => CopyNonfinal, + "delete" => Delete, + "done" => Done, + _ => Unknown, + } + } + + pub fn as_str(&self) -> &str { + use Phase::*; + match self { + Queued => "queued", + Started => "started", + CopyFinal => "copy_final", + CopyNonfinal => "copy_nonfinal", + Delete => "delete", + Done => "done", + Unknown => "*unknown*", + } + } + } + + impl ToSql for Phase { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + let phase = self.as_str(); + >::to_sql(phase, &mut out.reborrow()) + } + } + + impl FromSql for Phase { + fn from_sql(bytes: PgValue) -> diesel::deserialize::Result { + Ok(Phase::from_str(std::str::from_utf8(bytes.as_bytes())?)) + } + } + + /// Information about one pruning run for a deployment + #[derive(Queryable)] + pub struct PruneState { + pub vid: i32, + pub id: i32, + pub run: i32, + pub first_block: i32, + pub final_block: i32, + pub latest_block: i32, + pub history_blocks: i32, + + pub started_at: DateTime, + pub finished_at: Option>, + } + + 
/// Per-table information about the pruning run for a deployment + #[derive(Queryable)] + pub struct PruneTableState { + pub vid: i32, + pub id: i32, + pub run: i32, + pub table_name: String, + + // 'r' for rebuild or 'd' for delete + pub strategy: String, + pub phase: Phase, + + pub start_vid: Option, + pub final_vid: Option, + pub nonfinal_vid: Option, + pub rows: Option, + + pub next_vid: Option, + pub batch_size: Option, + + pub started_at: Option>, + pub finished_at: Option>, + } + + /// A helper to persist pruning progress in the database + pub(super) struct Tracker { + layout: Arc, + run: i32, + } + + impl Tracker { + pub(super) fn new(conn: &mut PgConnection, layout: Arc) -> StoreResult { + use prune_state as ps; + let run = ps::table + .filter(ps::id.eq(layout.site.id)) + .order(ps::run.desc()) + .select(ps::run) + .get_result::(conn) + .optional() + .map_err(StoreError::from)? + .unwrap_or(0) + + 1; + + Ok(Tracker { layout, run }) + } + + pub(super) fn start( + &self, + conn: &mut PgConnection, + req: &PruneRequest, + prunable_tables: &[(&Arc, PruningStrategy)], + ) -> StoreResult<()> { + use prune_state as ps; + use prune_table_state as pts; + + conn.transaction(|conn| { + insert_into(ps::table) + .values(( + ps::id.eq(self.layout.site.id), + ps::run.eq(self.run), + ps::first_block.eq(req.first_block), + ps::final_block.eq(req.final_block), + ps::latest_block.eq(req.latest_block), + ps::history_blocks.eq(req.history_blocks), + ps::started_at.eq(diesel::dsl::now), + )) + .execute(conn)?; + + for (table, strat) in prunable_tables { + let strat = match strat { + PruningStrategy::Rebuild => "r", + PruningStrategy::Delete => "d", + }; + insert_into(pts::table) + .values(( + pts::id.eq(self.layout.site.id), + pts::run.eq(self.run), + pts::table_name.eq(table.name.as_str()), + pts::strategy.eq(strat), + pts::phase.eq(Phase::Queued), + )) + .execute(conn)?; + } + Ok(()) + }) + } + + pub(crate) fn start_table( + &self, + conn: &mut PgConnection, + table: &Table, 
+ ) -> StoreResult<()> { + use prune_table_state as pts; + + self.update_table_state( + conn, + table, + ( + pts::started_at.eq(diesel::dsl::now), + pts::phase.eq(Phase::Started), + ), + )?; + + Ok(()) + } + + pub(crate) fn start_copy_final( + &self, + conn: &mut PgConnection, + table: &Table, + range: VidRange, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::phase.eq(Phase::CopyFinal), + pts::start_vid.eq(range.min), + pts::next_vid.eq(range.min), + pts::final_vid.eq(range.max), + pts::rows.eq(0), + ); + + self.update_table_state(conn, table, values) + } + + pub(crate) fn copy_final_batch( + &self, + conn: &mut PgConnection, + table: &Table, + rows: usize, + batcher: &VidBatcher, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::next_vid.eq(batcher.next_vid()), + pts::batch_size.eq(batcher.batch_size() as i64), + pts::rows.eq(pts::rows + (rows as i64)), + ); + + self.update_table_state(conn, table, values) + } + + pub(crate) fn start_copy_nonfinal( + &self, + conn: &mut PgConnection, + table: &Table, + range: VidRange, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::phase.eq(Phase::CopyNonfinal), + pts::nonfinal_vid.eq(range.max), + ); + self.update_table_state(conn, table, values) + } + + pub(crate) fn copy_nonfinal_batch( + &self, + conn: &mut PgConnection, + src: &Table, + rows: i64, + batcher: &VidBatcher, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::next_vid.eq(batcher.next_vid()), + pts::batch_size.eq(batcher.batch_size() as i64), + pts::rows.eq(pts::rows + rows), + ); + + self.update_table_state(conn, src, values) + } + + pub(crate) fn finish_table( + &self, + conn: &mut PgConnection, + table: &Table, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::finished_at.eq(diesel::dsl::now), + pts::phase.eq(Phase::Done), + ); + + self.update_table_state(conn, table, values) + } + + pub(crate) 
fn start_delete( + &self, + conn: &mut PgConnection, + table: &Table, + range: VidRange, + batcher: &VidBatcher, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::phase.eq(Phase::Delete), + pts::start_vid.eq(range.min), + pts::final_vid.eq(range.max), + pts::nonfinal_vid.eq(range.max), + pts::rows.eq(0), + pts::next_vid.eq(range.min), + pts::batch_size.eq(batcher.batch_size() as i64), + ); + + self.update_table_state(conn, table, values) + } + + pub(crate) fn delete_batch( + &self, + conn: &mut PgConnection, + table: &Table, + rows: usize, + batcher: &VidBatcher, + ) -> StoreResult<()> { + use prune_table_state as pts; + + let values = ( + pts::next_vid.eq(batcher.next_vid()), + pts::batch_size.eq(batcher.batch_size() as i64), + pts::rows.eq(pts::rows - (rows as i64)), + ); + + self.update_table_state(conn, table, values) + } + + fn update_table_state( + &self, + conn: &mut PgConnection, + table: &Table, + values: V, + ) -> StoreResult<()> + where + V: AsChangeset, + C: QueryFragment, + { + use prune_table_state as pts; + + update(pts::table) + .filter(pts::id.eq(self.layout.site.id)) + .filter(pts::run.eq(self.run)) + .filter(pts::table_name.eq(table.name.as_str())) + .set(values) + .execute(conn)?; + Ok(()) + } + + pub(crate) fn finish(&self, conn: &mut PgConnection) -> StoreResult<()> { + use prune_state as ps; + + update(ps::table) + .filter(ps::id.eq(self.layout.site.id)) + .filter(ps::run.eq(self.run)) + .set((ps::finished_at.eq(diesel::dsl::now),)) + .execute(conn)?; + Ok(()) + } + } + + /// A helper to read pruning progress from the database + pub struct Viewer { + pool: ConnectionPool, + layout: Arc, + } + + impl Viewer { + pub fn new(pool: ConnectionPool, layout: Arc) -> Self { + Self { pool, layout } + } + + pub fn runs(&self) -> StoreResult> { + use prune_state as ps; + + let mut conn = self.pool.get()?; + let runs = ps::table + .filter(ps::id.eq(self.layout.site.id)) + .select(ps::run) + .order(ps::run.asc()) + 
.load::(&mut conn) + .map_err(StoreError::from)?; + let runs = runs.into_iter().map(|run| run as usize).collect::>(); + Ok(runs) + } + + pub fn state(&self, run: usize) -> StoreResult)>> { + use prune_state as ps; + use prune_table_state as pts; + + let mut conn = self.pool.get()?; + + let ptss = pts::table + .filter(pts::id.eq(self.layout.site.id)) + .filter(pts::run.eq(run as i32)) + .order(pts::table_name.asc()) + .load::(&mut conn) + .map_err(StoreError::from)?; + + ps::table + .filter(ps::id.eq(self.layout.site.id)) + .filter(ps::run.eq(run as i32)) + .first::(&mut conn) + .optional() + .map_err(StoreError::from) + .map(|state| state.map(|state| (state, ptss))) + } + } +} diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index a4e7ffda659..d19cc68f44a 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -40,6 +40,7 @@ use crate::{ deployment::{OnSync, SubgraphHealth}, primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site}, relational::{ + self, index::{IndexList, Method}, Layout, }, @@ -1249,6 +1250,16 @@ impl SubgraphStoreInner { store.prune(reporter, site, req).await } + pub async fn prune_viewer( + &self, + deployment: &DeploymentLocator, + ) -> Result { + let site = self.find_site(deployment.id.into())?; + let store = self.for_site(&site)?; + + store.prune_viewer(site).await + } + pub fn set_history_blocks( &self, deployment: &DeploymentLocator, From b3d423e8aca4fef2ab42484f3741c867bba38db8 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 16 Apr 2025 12:56:38 -0700 Subject: [PATCH 5/9] store: Limit for how many prune runs we keep status info --- graph/src/env/store.rs | 6 ++++++ node/src/manager/commands/prune.rs | 6 ++++-- store/postgres/src/relational/prune.rs | 10 ++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 8197d07b6bc..1c768f45bed 100644 --- 
a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -111,6 +111,9 @@ pub struct EnvVarsStore { /// blocks) than its history limit. The default value is 1.2 and the /// value must be at least 1.01 pub history_slack_factor: f64, + /// For how many prune runs per deployment to keep status information. + /// Set by `GRAPH_STORE_HISTORY_KEEP_STATUS`. The default is 5 + pub prune_keep_history: usize, /// How long to accumulate changes into a batch before a write has to /// happen. Set by the environment variable /// `GRAPH_STORE_WRITE_BATCH_DURATION` in seconds. The default is 300s. @@ -184,6 +187,7 @@ impl TryFrom for EnvVarsStore { rebuild_threshold: x.rebuild_threshold.0, delete_threshold: x.delete_threshold.0, history_slack_factor: x.history_slack_factor.0, + prune_keep_history: x.prune_keep_status, write_batch_duration: Duration::from_secs(x.write_batch_duration_in_secs), write_batch_size: x.write_batch_size * 1_000, create_gin_indexes: x.create_gin_indexes, @@ -257,6 +261,8 @@ pub struct InnerStore { delete_threshold: ZeroToOneF64, #[envconfig(from = "GRAPH_STORE_HISTORY_SLACK_FACTOR", default = "1.2")] history_slack_factor: HistorySlackF64, + #[envconfig(from = "GRAPH_STORE_HISTORY_KEEP_STATUS", default = "5")] + prune_keep_status: usize, #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_DURATION", default = "300")] write_batch_duration_in_secs: u64, #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")] diff --git a/node/src/manager/commands/prune.rs b/node/src/manager/commands/prune.rs index 0fc5538fc71..05b1730806d 100644 --- a/node/src/manager/commands/prune.rs +++ b/node/src/manager/commands/prune.rs @@ -341,14 +341,16 @@ pub async fn status( let runs = match runs.len() { 0 => unreachable!("we checked that runs is not empty"), 1 => format!("There is only one prune run #{}", runs[0]), + 2 => format!("Only prune runs #{} and #{} exist", runs[0], runs[1]), _ => format!( - "Only prune runs #{} up to #{} exist", + "Only prune runs #{} and #{} up to 
#{} exist", runs[0], + runs[1], runs.last().unwrap() ), }; return Err(anyhow!( - "No information about prune run #{run} found for deployment {deployment}. {runs}" + "No information about prune run #{run} found for deployment {deployment}.\n {runs}" )); }; println!("prune {deployment} (run #{run})"); diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 37a053f6b2b..3748dde587c 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -508,6 +508,7 @@ mod status { }; use graph::{ components::store::{PruneRequest, PruningStrategy, StoreResult}, + env::ENV_VARS, prelude::StoreError, }; @@ -676,6 +677,15 @@ mod status { .unwrap_or(0) + 1; + // Delete old prune state. Keep the initial run and the last + // `prune_keep_history` runs (including this one) + diesel::delete(ps::table) + .filter(ps::id.eq(layout.site.id)) + .filter(ps::run.gt(1)) + .filter(ps::run.lt(run - (ENV_VARS.store.prune_keep_history - 1) as i32)) + .execute(conn) + .map_err(StoreError::from)?; + Ok(Tracker { layout, run }) } From 29ac576259a74f339c33b660afa19993d1f8d1f5 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 16 Apr 2025 13:04:18 -0700 Subject: [PATCH 6/9] store: Map the pruning status tables in the sharded schema in the primary --- store/postgres/src/pool/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index 5fcc7b0cd1c..628a977ff9b 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -66,6 +66,8 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [ "subgraph", "subgraph_version", "subgraph_deployment_assignment", + "prune_state", + "prune_table_state", ], ), ]; From 5e565a72eb7ea4246c5ae3324d33a85a5787ca5d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 16 Apr 2025 14:41:49 -0700 Subject: [PATCH 7/9] node: Format larger durations as days/hours/minutes --- node/src/manager/fmt.rs | 47 
++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/node/src/manager/fmt.rs b/node/src/manager/fmt.rs index 549d173ca1a..6aaa12192a7 100644 --- a/node/src/manager/fmt.rs +++ b/node/src/manager/fmt.rs @@ -50,7 +50,20 @@ pub fn human_duration(duration: Duration) -> String { } else if duration.num_minutes() < 5 { format!("{}s", duration.num_seconds()) } else { - format!("{}m", duration.num_minutes()) + let minutes = duration.num_minutes(); + if minutes < 90 { + format!("{}m", duration.num_minutes()) + } else { + let hours = minutes / 60; + let minutes = minutes % 60; + if hours < 24 { + format!("{}h {}m", hours, minutes) + } else { + let days = hours / 24; + let hours = hours % 24; + format!("{}d {}h {}m", days, hours, minutes) + } + } } } @@ -76,3 +89,35 @@ pub fn date_time(date: &DateTime) -> String { let date = DateTime::::from(*date); date.format("%Y-%m-%d %H:%M:%S%Z").to_string() } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_human_duration() { + let duration = Duration::seconds(1); + assert_eq!(human_duration(duration), "1000ms"); + + let duration = Duration::seconds(10); + assert_eq!(human_duration(duration), "10s"); + + let duration = Duration::minutes(5); + assert_eq!(human_duration(duration), "5m"); + + let duration = Duration::hours(1); + assert_eq!(human_duration(duration), "60m"); + + let duration = Duration::minutes(100); + assert_eq!(human_duration(duration), "1h 40m"); + + let duration = Duration::days(1); + assert_eq!(human_duration(duration), "1d 0h 0m"); + + let duration = Duration::days(1) + Duration::minutes(35); + assert_eq!(human_duration(duration), "1d 0h 35m"); + + let duration = Duration::days(1) + Duration::minutes(95); + assert_eq!(human_duration(duration), "1d 1h 35m"); + } +} From 0912ff58da3b9a2de3f428b0238fd2e8a90c20e3 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 16 Apr 2025 16:04:39 -0700 Subject: [PATCH 8/9] Cargo.toml: Enable line wrapping for help 
text in clap --- Cargo.lock | 13 ++++++++++++- Cargo.toml | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec8c31e1233..abf9368d3e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -724,6 +724,7 @@ dependencies = [ "anstyle", "clap_lex", "strsim", + "terminal_size 0.3.0", ] [[package]] @@ -773,7 +774,7 @@ dependencies = [ "lazy_static", "libc", "regex", - "terminal_size", + "terminal_size 0.1.17", "unicode-width", "winapi", "winapi-util", @@ -5235,6 +5236,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "terminal_size" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" +dependencies = [ + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "test-store" version = "0.36.0" diff --git a/Cargo.toml b/Cargo.toml index ffc3961d405..e258a84082a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ async-graphql-axum = "7.0.15" axum = "0.8.1" chrono = "0.4.38" bs58 = "0.5.1" -clap = { version = "4.5.4", features = ["derive", "env"] } +clap = { version = "4.5.4", features = ["derive", "env", "wrap_help"] } derivative = "2.2.0" diesel = { version = "2.2.7", features = [ "postgres", From 28fa4445d0ca42e0ea234826ea553533c8e9a4da Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 17 Apr 2025 18:58:52 -0700 Subject: [PATCH 9/9] store: Address review comments for tracking pruning status --- store/postgres/src/relational/prune.rs | 46 +++----------------------- 1 file changed, 5 insertions(+), 41 deletions(-) diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 3748dde587c..1a1236e2aaf 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -137,7 +137,7 @@ impl TablePair { }) })?; let rows = rows.unwrap_or(0); - tracker.copy_final_batch(conn, &self.src, rows, &batcher)?; + tracker.finish_batch(conn, &self.src, 
rows as i64, &batcher)?; cancel.check_cancel()?; reporter.prune_batch( @@ -196,7 +196,7 @@ impl TablePair { })?; let rows = rows.unwrap_or(0); - tracker.copy_nonfinal_batch(conn, &self.src, rows as i64, &batcher)?; + tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?; reporter.prune_batch( self.src.name.as_str(), @@ -458,7 +458,7 @@ impl Layout { .execute(conn).map_err(StoreError::from)})?; let rows = rows.unwrap_or(0); - tracker.delete_batch(conn, table, rows, &batcher)?; + tracker.finish_batch(conn, table, -(rows as i64), &batcher)?; reporter.prune_batch( table.name.as_str(), @@ -682,7 +682,7 @@ mod status { diesel::delete(ps::table) .filter(ps::id.eq(layout.site.id)) .filter(ps::run.gt(1)) - .filter(ps::run.lt(run - (ENV_VARS.store.prune_keep_history - 1) as i32)) + .filter(ps::run.lt(run - (ENV_VARS.store.prune_keep_history as i32 - 1))) .execute(conn) .map_err(StoreError::from)?; @@ -768,24 +768,6 @@ mod status { self.update_table_state(conn, table, values) } - pub(crate) fn copy_final_batch( - &self, - conn: &mut PgConnection, - table: &Table, - rows: usize, - batcher: &VidBatcher, - ) -> StoreResult<()> { - use prune_table_state as pts; - - let values = ( - pts::next_vid.eq(batcher.next_vid()), - pts::batch_size.eq(batcher.batch_size() as i64), - pts::rows.eq(pts::rows + (rows as i64)), - ); - - self.update_table_state(conn, table, values) - } - pub(crate) fn start_copy_nonfinal( &self, conn: &mut PgConnection, @@ -801,7 +783,7 @@ mod status { self.update_table_state(conn, table, values) } - pub(crate) fn copy_nonfinal_batch( + pub(crate) fn finish_batch( &self, conn: &mut PgConnection, src: &Table, @@ -856,24 +838,6 @@ mod status { self.update_table_state(conn, table, values) } - pub(crate) fn delete_batch( - &self, - conn: &mut PgConnection, - table: &Table, - rows: usize, - batcher: &VidBatcher, - ) -> StoreResult<()> { - use prune_table_state as pts; - - let values = ( - pts::next_vid.eq(batcher.next_vid()), - 
pts::batch_size.eq(batcher.batch_size() as i64), - pts::rows.eq(pts::rows - (rows as i64)), - ); - - self.update_table_state(conn, table, values) - } - fn update_table_state( &self, conn: &mut PgConnection,