diff --git a/node/src/manager/color.rs b/node/src/manager/color.rs index 3b1f4dfe4fa..54d65e88ac0 100644 --- a/node/src/manager/color.rs +++ b/node/src/manager/color.rs @@ -1,4 +1,4 @@ -use std::sync::Mutex; +use std::{io, sync::Mutex}; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; use graph::prelude::{isatty, lazy_static}; @@ -53,6 +53,11 @@ impl Terminal { self.out.set_color(&self.spec).map_err(Into::into) } + pub fn red(&mut self) -> CmdResult { + self.spec.set_fg(Some(Color::Red)); + self.out.set_color(&self.spec).map_err(Into::into) + } + pub fn dim(&mut self) -> CmdResult { self.spec.set_dimmed(true); self.out.set_color(&self.spec).map_err(Into::into) @@ -67,6 +72,18 @@ impl Terminal { self.spec = ColorSpec::new(); self.out.reset().map_err(Into::into) } + + pub fn with_color<F, T>(&mut self, color: Color, f: F) -> io::Result<T> + where + F: FnOnce(&mut Self) -> io::Result<T>, + { + self.spec.set_fg(Some(color)); + self.out.set_color(&self.spec).map_err(io::Error::from)?; + let res = f(self); + self.spec = ColorSpec::new(); + self.out.set_color(&self.spec).map_err(io::Error::from)?; + res + } } impl std::io::Write for Terminal { diff --git a/node/src/manager/commands/prune.rs b/node/src/manager/commands/prune.rs index 05b1730806d..ea46d77d0de 100644 --- a/node/src/manager/commands/prune.rs +++ b/node/src/manager/commands/prune.rs @@ -18,8 +18,10 @@ use graph_store_postgres::{ command_support::{Phase, PruneTableState}, ConnectionPool, Store, }; +use termcolor::Color; use crate::manager::{ + color::Terminal, commands::stats::show_stats, deployment::DeploymentSearch, fmt::{self, MapOrNull as _}, @@ -329,6 +331,8 @@ pub async fn status( } } + let mut term = Terminal::new(); + let deployment = search.locate_unique(&primary_pool)?; let viewer = store.subgraph_store().prune_viewer(&deployment).await?; @@ -353,32 +357,43 @@ pub async fn status( "No information about prune run #{run} found for deployment {deployment}.\n {runs}" )); }; - 
println!("prune {deployment} (run #{run})"); - println!( + writeln!(term, "prune {deployment} (run #{run})")?; + + if let (Some(errored_at), Some(error)) = (&state.errored_at, &state.error) { + term.with_color(Color::Red, |term| { + writeln!(term, " error: {error}")?; + writeln!(term, " at: {}", fmt::date_time(errored_at)) + })?; + } + writeln!( + term, " range: {} - {} ({} blocks, should keep {} blocks)", state.first_block, state.latest_block, state.latest_block - state.first_block, state.history_blocks - ); - println!(" started: {}", fmt::date_time(&state.started_at)); + )?; + writeln!(term, " started: {}", fmt::date_time(&state.started_at))?; match &state.finished_at { - Some(finished_at) => println!(" finished: {}", fmt::date_time(finished_at)), - None => println!(" finished: still running"), + Some(finished_at) => writeln!(term, " finished: {}", fmt::date_time(finished_at))?, + None => writeln!(term, " finished: still running")?, } - println!( + writeln!( + term, " duration: {}", fmt::duration(&state.started_at, &state.finished_at) - ); + )?; - println!( + writeln!( + term, "\n{:^30} | {:^22} | {:^8} | {:^11} | {:^8}", "table", "status", "rows", "batch_size", "duration" - ); - println!( + )?; + writeln!( + term, "{:-^30}-+-{:-^22}-+-{:-^8}-+-{:-^11}-+-{:-^8}", "", "", "", "", "" - ); + )?; for ts in table_states { #[allow(unused_variables)] let PruneTableState { @@ -411,10 +426,10 @@ pub async fn status( let batch_size = batch_size.map_or_null(|b| b.to_string()); let duration = started_at.map_or_null(|s| fmt::duration(&s, &finished_at)); let phase = phase.as_str(); - println!( + writeln!(term, "{table_name:<30} | {:<15} {complete:>6} | {rows:>8} | {batch_size:>11} | {duration:>8}", format!("{strategy}/{phase}") - ); + )?; } Ok(()) } diff --git a/store/postgres/migrations/2025-04-25-163121_prune_error/down.sql b/store/postgres/migrations/2025-04-25-163121_prune_error/down.sql new file mode 100644 index 00000000000..02c16447136 --- /dev/null +++ 
b/store/postgres/migrations/2025-04-25-163121_prune_error/down.sql @@ -0,0 +1,3 @@ +alter table subgraphs.prune_state + drop column errored_at, + drop column error; diff --git a/store/postgres/migrations/2025-04-25-163121_prune_error/up.sql b/store/postgres/migrations/2025-04-25-163121_prune_error/up.sql new file mode 100644 index 00000000000..39e12cd3508 --- /dev/null +++ b/store/postgres/migrations/2025-04-25-163121_prune_error/up.sql @@ -0,0 +1,4 @@ +alter table subgraphs.prune_state + add column errored_at timestamptz, + add column error text, + add constraint error_ck check ((errored_at is null) = (error is null)); diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index a6767082555..a4ed8fa55d3 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -1002,3 +1002,42 @@ pub(crate) fn histogram_bounds( .map(|bounds| bounds.map(|b| b.bounds).unwrap_or_default()) .map_err(StoreError::from) } + +/// Return the name of the sequence that Postgres uses to handle +/// auto-incrementing columns. This takes Postgres' way of dealing with long +/// table and sequence names into account. +pub(crate) fn seq_name(table_name: &str, column_name: &str) -> String { + // Postgres limits all identifiers to 63 characters. 
When it + // constructs the name of a sequence for a column in a table, it + // truncates the table name so that appending '_{column}_seq' to + // it is at most 63 characters + let len = 63 - (5 + column_name.len()); + let len = len.min(table_name.len()); + format!("{}_{column_name}_seq", &table_name[0..len]) +} + +#[cfg(test)] +mod test { + use super::seq_name; + + #[test] + fn seq_name_works() { + // Pairs of (table_name, vid_seq_name) + const DATA: &[(&str, &str)] = &[ + ("token", "token_vid_seq"), + ( + "frax_vst_curve_strategy_total_reward_token_collected_event", + "frax_vst_curve_strategy_total_reward_token_collected_ev_vid_seq", + ), + ( + "rolling_asset_sent_for_last_24_hours_per_chain_and_token", + "rolling_asset_sent_for_last_24_hours_per_chain_and_toke_vid_seq", + ), + ]; + + for (tbl, exp) in DATA { + let act = seq_name(tbl, "vid"); + assert_eq!(exp, &act); + } + } +} diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index b02d076fcb0..4703d59f0c1 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1181,13 +1181,20 @@ impl DeploymentStore { // how long pruning itself takes let _section = stopwatch.start_section("transact_blocks_prune"); - self.spawn_prune( + if let Err(res) = self.spawn_prune( logger, - site, + site.cheap_clone(), layout.history_blocks, earliest_block, batch.block_ptr.number, - )?; + ) { + warn!( + logger, + "Failed to spawn prune task. 
Will try to prune again later"; + "subgraph" => site.deployment.to_string(), + "error" => res.to_string(), + ); + } } Ok(()) @@ -1270,7 +1277,8 @@ impl DeploymentStore { )?; let deployment_id = site.id; - let handle = graph::spawn(run(logger.cheap_clone(), self.clone(), site, req)); + let logger = Logger::new(&logger, o!("component" => "Prune")); + let handle = graph::spawn(run(logger, self.clone(), site, req)); self.prune_handles .lock() .unwrap() diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 6d5295e5535..60386370b9c 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -215,8 +215,6 @@ impl TablePair { let src_nsp = &self.src_nsp; let dst_nsp = &self.dst_nsp; - let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name); - let mut query = String::new(); // What we are about to do would get blocked by autovacuum on our @@ -229,6 +227,8 @@ impl TablePair { // Make sure the vid sequence continues from where it was in case // that we use autoincrementing order of the DB if !self.src.object.has_vid_seq() { + let vid_seq = catalog::seq_name(&self.src.name, VID_COLUMN); + writeln!( query, "select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));" @@ -363,6 +363,11 @@ impl Layout { /// also block queries to the deployment, often for extended periods of /// time. The rebuild strategy never blocks reads, it only ever blocks /// writes. + /// + /// This method will only return an `Err` if storing pruning status + /// fails, e.g. because the database is not available. 
All errors that + /// happen during pruning itself will be stored in the `prune_state` + /// table and this method will return `Ok` pub fn prune( self: Arc<Self>, logger: &Logger, @@ -373,28 +378,38 @@ ) -> Result<(), CancelableError<StoreError>> { let tracker = status::Tracker::new(conn, self.clone())?; - reporter.start(req); + let res = self.prune_inner(logger, reporter, conn, req, cancel, &tracker); - let stats = self.version_stats(conn, reporter, true, cancel)?; + match res { + Ok(_) => { + tracker.finish(conn)?; + } + Err(e) => { + // If we get an error, we need to set the error in the + // database and finish the tracker + let err = e.to_string(); + tracker.error(conn, &err)?; + } + } + Ok(()) + } + + fn prune_inner( + self: Arc<Self>, + logger: &Logger, + reporter: &mut dyn PruneReporter, + conn: &mut PgConnection, + req: &PruneRequest, + cancel: &CancelHandle, + tracker: &status::Tracker, + ) -> Result<(), CancelableError<StoreError>> { + reporter.start(req); + let stats = self.version_stats(conn, reporter, true, cancel)?; let prunable_tables: Vec<_> = self.prunable_tables(&stats, req).into_iter().collect(); tracker.start(conn, req, &prunable_tables)?; - - // create a shadow namespace where we will put the copies of our - // tables, but only create it in the database if we really need it let dst_nsp = Namespace::prune(self.site.id); let mut recreate_dst_nsp = true; - - // Go table by table; note that the subgraph writer can write in - // between the execution of the `with_lock` block below, and might - // therefore work with tables where some are pruned and some are not - // pruned yet. That does not affect correctness since we make no - // assumption about where the subgraph head is. If the subgraph - // advances during this loop, we might have an unnecessarily - // pessimistic but still safe value for `final_block`. 
We do assume - // that `final_block` is far enough from the subgraph head that it - // stays final even if a revert happens during this loop, but that - // is the definition of 'final' for (table, strat) in &prunable_tables { reporter.start_table(table.name.as_str()); tracker.start_table(conn, table)?; @@ -417,7 +432,7 @@ pair.copy_final_entities( conn, reporter, - &tracker, + tracker, req.earliest_block, req.final_block, cancel, @@ -427,7 +442,7 @@ // see also: deployment-lock-for-update reporter.start_switch(); deployment::with_lock(conn, &self.site, |conn| -> Result<_, StoreError> { - pair.copy_nonfinal_entities(conn, reporter, &tracker, req.final_block)?; + pair.copy_nonfinal_entities(conn, reporter, tracker, req.final_block)?; cancel.check_cancel().map_err(CancelableError::from)?; conn.transaction(|conn| pair.switch(logger, conn))?; @@ -473,22 +488,15 @@ reporter.finish_table(table.name.as_str()); tracker.finish_table(conn, table)?; } - // Get rid of the temporary prune schema if we actually created it if !recreate_dst_nsp { catalog::drop_schema(conn, dst_nsp.as_str())?; } - for (table, _) in &prunable_tables { catalog::set_last_pruned_block(conn, &self.site, &table.name, req.earliest_block)?; } - - // Analyze the new tables let tables = prunable_tables.iter().map(|(table, _)| *table).collect(); self.analyze_tables(conn, reporter, tables, cancel)?; - reporter.finish(); - tracker.finish(conn)?; - Ok(()) } } @@ -534,6 +542,8 @@ mod status { started_at -> Timestamptz, finished_at -> Nullable<Timestamptz>, + errored_at -> Nullable<Timestamptz>, + error -> Nullable<Text>, } } @@ -633,6 +643,9 @@ mod status { pub started_at: DateTime<Utc>, pub finished_at: Option<DateTime<Utc>>, + + pub errored_at: Option<DateTime<Utc>>, + pub error: Option<String>, } /// Per-table information about the pruning run for a deployment @@ -870,6 +883,21 @@ .execute(conn)?; Ok(()) } + + pub(crate) fn error(&self, conn: &mut PgConnection, err: &str) -> StoreResult<()> { + use prune_state as ps; + + 
update(ps::table) + .filter(ps::id.eq(self.layout.site.id)) + .filter(ps::run.eq(self.run)) + .set(( + ps::finished_at.eq(diesel::dsl::now), + ps::errored_at.eq(diesel::dsl::now), + ps::error.eq(err), + )) + .execute(conn)?; + Ok(()) + } } /// A helper to read pruning progress from the database