Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion node/src/manager/color.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Mutex;
use std::{io, sync::Mutex};
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};

use graph::prelude::{isatty, lazy_static};
Expand Down Expand Up @@ -53,6 +53,11 @@ impl Terminal {
self.out.set_color(&self.spec).map_err(Into::into)
}

pub fn red(&mut self) -> CmdResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this one is not used anymore.

self.spec.set_fg(Some(Color::Red));
self.out.set_color(&self.spec).map_err(Into::into)
}

pub fn dim(&mut self) -> CmdResult {
self.spec.set_dimmed(true);
self.out.set_color(&self.spec).map_err(Into::into)
Expand All @@ -67,6 +72,18 @@ impl Terminal {
self.spec = ColorSpec::new();
self.out.reset().map_err(Into::into)
}

/// Run `f` with the terminal's foreground set to `color`, clearing the
/// color spec afterwards regardless of whether `f` succeeded. An error
/// from applying or clearing the color takes precedence over the
/// result of `f`.
pub fn with_color<F, R>(&mut self, color: Color, f: F) -> io::Result<R>
where
    F: FnOnce(&mut Self) -> io::Result<R>,
{
    self.spec.set_fg(Some(color));
    if let Err(e) = self.out.set_color(&self.spec) {
        return Err(io::Error::from(e));
    }
    // Capture the closure's outcome before resetting so the color is
    // cleared even when `f` fails
    let outcome = f(self);
    self.spec = ColorSpec::new();
    self.out.set_color(&self.spec).map_err(io::Error::from)?;
    outcome
}
}

impl std::io::Write for Terminal {
Expand Down
43 changes: 29 additions & 14 deletions node/src/manager/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use graph_store_postgres::{
command_support::{Phase, PruneTableState},
ConnectionPool, Store,
};
use termcolor::Color;

use crate::manager::{
color::Terminal,
commands::stats::show_stats,
deployment::DeploymentSearch,
fmt::{self, MapOrNull as _},
Expand Down Expand Up @@ -329,6 +331,8 @@ pub async fn status(
}
}

let mut term = Terminal::new();

let deployment = search.locate_unique(&primary_pool)?;

let viewer = store.subgraph_store().prune_viewer(&deployment).await?;
Expand All @@ -353,32 +357,43 @@ pub async fn status(
"No information about prune run #{run} found for deployment {deployment}.\n {runs}"
));
};
println!("prune {deployment} (run #{run})");
println!(
writeln!(term, "prune {deployment} (run #{run})")?;

if let (Some(errored_at), Some(error)) = (&state.errored_at, &state.error) {
term.with_color(Color::Red, |term| {
writeln!(term, " error: {error}")?;
writeln!(term, " at: {}", fmt::date_time(errored_at))
})?;
}
writeln!(
term,
" range: {} - {} ({} blocks, should keep {} blocks)",
state.first_block,
state.latest_block,
state.latest_block - state.first_block,
state.history_blocks
);
println!(" started: {}", fmt::date_time(&state.started_at));
)?;
writeln!(term, " started: {}", fmt::date_time(&state.started_at))?;
match &state.finished_at {
Some(finished_at) => println!(" finished: {}", fmt::date_time(finished_at)),
None => println!(" finished: still running"),
Some(finished_at) => writeln!(term, " finished: {}", fmt::date_time(finished_at))?,
None => writeln!(term, " finished: still running")?,
}
println!(
writeln!(
term,
" duration: {}",
fmt::duration(&state.started_at, &state.finished_at)
);
)?;

println!(
writeln!(
term,
"\n{:^30} | {:^22} | {:^8} | {:^11} | {:^8}",
"table", "status", "rows", "batch_size", "duration"
);
println!(
)?;
writeln!(
term,
"{:-^30}-+-{:-^22}-+-{:-^8}-+-{:-^11}-+-{:-^8}",
"", "", "", "", ""
);
)?;
for ts in table_states {
#[allow(unused_variables)]
let PruneTableState {
Expand Down Expand Up @@ -411,10 +426,10 @@ pub async fn status(
let batch_size = batch_size.map_or_null(|b| b.to_string());
let duration = started_at.map_or_null(|s| fmt::duration(&s, &finished_at));
let phase = phase.as_str();
println!(
writeln!(term,
"{table_name:<30} | {:<15} {complete:>6} | {rows:>8} | {batch_size:>11} | {duration:>8}",
format!("{strategy}/{phase}")
);
)?;
}
Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Revert the error-tracking columns added by the corresponding up
-- migration; dropping errored_at and error also removes the error_ck
-- constraint that referenced them
alter table subgraphs.prune_state
drop column errored_at,
drop column error;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Record the outcome of a failed prune run. When a run errors, both
-- errored_at and error are set together; the check constraint keeps the
-- two columns in lockstep (both null or both non-null)
alter table subgraphs.prune_state
add column errored_at timestamptz,
add column error text,
add constraint error_ck check ((errored_at is null) = (error is null));
39 changes: 39 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,3 +1002,42 @@ pub(crate) fn histogram_bounds(
.map(|bounds| bounds.map(|b| b.bounds).unwrap_or_default())
.map_err(StoreError::from)
}

/// Return the name of the sequence that Postgres uses to handle
/// auto-incrementing columns, i.e. `{table}_{column}_seq`, taking
/// Postgres' way of dealing with long table and sequence names into
/// account.
///
/// Postgres limits all identifiers to 63 bytes; when it derives a
/// sequence name for a serial column it truncates the table name so
/// that appending `_{column}_seq` stays within that limit.
pub(crate) fn seq_name(table_name: &str, column_name: &str) -> String {
    // The suffix appended to the (possibly truncated) table name is
    // `_{column}_seq`, i.e. `5 + column_name.len()` bytes. Use
    // `saturating_sub` so a pathologically long column name cannot
    // underflow `usize` (panic in debug builds, wrap in release)
    let len = 63usize.saturating_sub(5 + column_name.len());
    let len = len.min(table_name.len());
    format!("{}_{column_name}_seq", &table_name[0..len])
}

#[cfg(test)]
mod test {
    use super::seq_name;

    /// Check that `seq_name` matches Postgres' truncation behavior for
    /// both short table names and names that hit the 63-byte limit
    #[test]
    fn seq_name_works() {
        assert_eq!(seq_name("token", "vid"), "token_vid_seq");
        assert_eq!(
            seq_name(
                "frax_vst_curve_strategy_total_reward_token_collected_event",
                "vid"
            ),
            "frax_vst_curve_strategy_total_reward_token_collected_ev_vid_seq"
        );
        assert_eq!(
            seq_name(
                "rolling_asset_sent_for_last_24_hours_per_chain_and_token",
                "vid"
            ),
            "rolling_asset_sent_for_last_24_hours_per_chain_and_toke_vid_seq"
        );
    }
}
16 changes: 12 additions & 4 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,13 +1181,20 @@ impl DeploymentStore {
// how long pruning itself takes
let _section = stopwatch.start_section("transact_blocks_prune");

self.spawn_prune(
if let Err(res) = self.spawn_prune(
logger,
site,
site.cheap_clone(),
layout.history_blocks,
earliest_block,
batch.block_ptr.number,
)?;
) {
warn!(
logger,
"Failed to spawn prune task. Will try to prune again later";
"subgraph" => site.deployment.to_string(),
"error" => res.to_string(),
);
}
}

Ok(())
Expand Down Expand Up @@ -1270,7 +1277,8 @@ impl DeploymentStore {
)?;

let deployment_id = site.id;
let handle = graph::spawn(run(logger.cheap_clone(), self.clone(), site, req));
let logger = Logger::new(&logger, o!("component" => "Prune"));
let handle = graph::spawn(run(logger, self.clone(), site, req));
self.prune_handles
.lock()
.unwrap()
Expand Down
82 changes: 55 additions & 27 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ impl TablePair {
let src_nsp = &self.src_nsp;
let dst_nsp = &self.dst_nsp;

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
Expand All @@ -229,6 +227,8 @@ impl TablePair {
// Make sure the vid sequence continues from where it was in case
// that we use autoincrementing order of the DB
if !self.src.object.has_vid_seq() {
let vid_seq = catalog::seq_name(&self.src.name, VID_COLUMN);

writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
Expand Down Expand Up @@ -363,6 +363,11 @@ impl Layout {
/// also block queries to the deployment, often for extended periods of
/// time. The rebuild strategy never blocks reads, it only ever blocks
/// writes.
///
/// This method will only return an `Err` if storing pruning status
/// fails, e.g. because the database is not available. All errors that
/// happen during pruning itself will be stored in the `prune_state`
/// table and this method will return `Ok`
pub fn prune(
self: Arc<Self>,
logger: &Logger,
Expand All @@ -373,28 +378,38 @@ impl Layout {
) -> Result<(), CancelableError<StoreError>> {
let tracker = status::Tracker::new(conn, self.clone())?;

reporter.start(req);
let res = self.prune_inner(logger, reporter, conn, req, cancel, &tracker);

let stats = self.version_stats(conn, reporter, true, cancel)?;
match res {
Ok(_) => {
tracker.finish(conn)?;
}
Err(e) => {
// If we get an error, we need to set the error in the
// database and finish the tracker
let err = e.to_string();
tracker.error(conn, &err)?;
}
}

Ok(())
}

fn prune_inner(
self: Arc<Self>,
logger: &Logger,
reporter: &mut dyn PruneReporter,
conn: &mut PgConnection,
req: &PruneRequest,
cancel: &CancelHandle,
tracker: &status::Tracker,
) -> Result<(), CancelableError<StoreError>> {
reporter.start(req);
let stats = self.version_stats(conn, reporter, true, cancel)?;
let prunable_tables: Vec<_> = self.prunable_tables(&stats, req).into_iter().collect();
tracker.start(conn, req, &prunable_tables)?;

// create a shadow namespace where we will put the copies of our
// tables, but only create it in the database if we really need it
let dst_nsp = Namespace::prune(self.site.id);
let mut recreate_dst_nsp = true;

// Go table by table; note that the subgraph writer can write in
// between the execution of the `with_lock` block below, and might
// therefore work with tables where some are pruned and some are not
// pruned yet. That does not affect correctness since we make no
// assumption about where the subgraph head is. If the subgraph
// advances during this loop, we might have an unnecessarily
// pessimistic but still safe value for `final_block`. We do assume
// that `final_block` is far enough from the subgraph head that it
// stays final even if a revert happens during this loop, but that
// is the definition of 'final'
for (table, strat) in &prunable_tables {
reporter.start_table(table.name.as_str());
tracker.start_table(conn, table)?;
Expand All @@ -417,7 +432,7 @@ impl Layout {
pair.copy_final_entities(
conn,
reporter,
&tracker,
tracker,
req.earliest_block,
req.final_block,
cancel,
Expand All @@ -427,7 +442,7 @@ impl Layout {
// see also: deployment-lock-for-update
reporter.start_switch();
deployment::with_lock(conn, &self.site, |conn| -> Result<_, StoreError> {
pair.copy_nonfinal_entities(conn, reporter, &tracker, req.final_block)?;
pair.copy_nonfinal_entities(conn, reporter, tracker, req.final_block)?;
cancel.check_cancel().map_err(CancelableError::from)?;

conn.transaction(|conn| pair.switch(logger, conn))?;
Expand Down Expand Up @@ -473,22 +488,15 @@ impl Layout {
reporter.finish_table(table.name.as_str());
tracker.finish_table(conn, table)?;
}
// Get rid of the temporary prune schema if we actually created it
if !recreate_dst_nsp {
catalog::drop_schema(conn, dst_nsp.as_str())?;
}

for (table, _) in &prunable_tables {
catalog::set_last_pruned_block(conn, &self.site, &table.name, req.earliest_block)?;
}

// Analyze the new tables
let tables = prunable_tables.iter().map(|(table, _)| *table).collect();
self.analyze_tables(conn, reporter, tables, cancel)?;

reporter.finish();
tracker.finish(conn)?;

Ok(())
}
}
Expand Down Expand Up @@ -534,6 +542,8 @@ mod status {

started_at -> Timestamptz,
finished_at -> Nullable<Timestamptz>,
errored_at -> Nullable<Timestamptz>,
error -> Nullable<Text>,
}
}

Expand Down Expand Up @@ -633,6 +643,9 @@ mod status {

pub started_at: DateTime<Utc>,
pub finished_at: Option<DateTime<Utc>>,

pub errored_at: Option<DateTime<Utc>>,
pub error: Option<String>,
}

/// Per-table information about the pruning run for a deployment
Expand Down Expand Up @@ -870,6 +883,21 @@ mod status {
.execute(conn)?;
Ok(())
}

/// Mark this prune run as failed by recording `err`, stamping the
/// current time into both `finished_at` and `errored_at`
pub(crate) fn error(&self, conn: &mut PgConnection, err: &str) -> StoreResult<()> {
    use prune_state as ps;

    // Set finished_at and errored_at to the same timestamp so a failed
    // run still shows up as finished
    let changes = (
        ps::finished_at.eq(diesel::dsl::now),
        ps::errored_at.eq(diesel::dsl::now),
        ps::error.eq(err),
    );
    update(ps::table)
        .filter(ps::id.eq(self.layout.site.id))
        .filter(ps::run.eq(self.run))
        .set(changes)
        .execute(conn)?;
    Ok(())
}
}

/// A helper to read pruning progress from the database
Expand Down
Loading