From 553bf841c4d8250b057c3793b2eeec568777a6df Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 13 May 2025 09:47:48 -0700 Subject: [PATCH 1/2] store: Make VidBatcher::step take a FnOnce instead of FnMut We didn't really need the FnMut --- store/postgres/src/vid_batcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index c1e69ebe017..feb58787c43 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -209,9 +209,9 @@ impl VidBatcher { /// The function returns the time it took to process the batch and the /// result of `f`. If the batcher is finished, `f` will not be called, /// and `None` will be returned as its result. - pub fn step(&mut self, mut f: F) -> Result<(Duration, Option), StoreError> + pub fn step(&mut self, f: F) -> Result<(Duration, Option), StoreError> where - F: FnMut(i64, i64) -> Result, + F: FnOnce(i64, i64) -> Result, { if self.finished() { return Ok((Duration::from_secs(0), None)); From 34b3fcae5a281b7fb92445329991cd52782fe432 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 13 May 2025 09:48:27 -0700 Subject: [PATCH 2/2] store: Kill and retry long running prune operations Uses the same timeout as copying. If a prune operation now takes longer than `GRAPH_STORE_BATCH_TIMEOUT` it is aborted and retried with a batch size of 1 --- docs/environment-variables.md | 8 +-- store/postgres/src/copy.rs | 4 +- store/postgres/src/relational/prune.rs | 84 ++++++++++++++++++-------- 3 files changed, 66 insertions(+), 30 deletions(-) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 04523e22c3b..e7ce5b028c8 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -226,10 +226,10 @@ those. copying or grafting should take. This limits how long transactions for such long running operations will be, and therefore helps control bloat in other tables. 
Value is in seconds and defaults to 180s. -- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying or - grafting is allowed to take at most. This is meant to guard against - batches that are catastrophically big and should be set to a small - multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that +- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying, + grafting, or pruning is allowed to take at most. This is meant to guard + against batches that are catastrophically big and should be set to a + small multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that value, and needs to be at least 2 times that value when set. If this timeout is hit, the batch size is reset to 1 so we can be sure that batches stay below `GRAPH_STORE_BATCH_TARGET_DURATION` and the smaller diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 58420b053a5..9a8b4fd4328 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -65,7 +65,7 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30); const REPLICATION_SLEEP: Duration = Duration::from_secs(10); lazy_static! 
{ - static ref STATEMENT_TIMEOUT: Option = ENV_VARS + pub(crate) static ref BATCH_STATEMENT_TIMEOUT: Option = ENV_VARS .store .batch_timeout .map(|duration| format!("set local statement_timeout={}", duration.as_millis())); @@ -792,7 +792,7 @@ impl CopyTableWorker { } match conn.transaction(|conn| { - if let Some(timeout) = STATEMENT_TIMEOUT.as_ref() { + if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { conn.batch_execute(timeout)?; } self.table.copy_batch(conn) diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 60386370b9c..6e3af20179d 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -18,7 +18,9 @@ use graph::{ use itertools::Itertools; use crate::{ - catalog, deployment, + catalog, + copy::BATCH_STATEMENT_TIMEOUT, + deployment, relational::{Table, VID_COLUMN}, vid_batcher::{VidBatcher, VidRange}, }; @@ -105,16 +107,15 @@ impl TablePair { tracker.start_copy_final(conn, &self.src, range)?; while !batcher.finished() { - let (_, rows) = batcher.step(|start, end| { - conn.transaction(|conn| { - // Page through all rows in `src` in batches of `batch_size` - // and copy the ones that are visible to queries at block - // heights between `earliest_block` and `final_block`, but - // whose block_range does not extend past `final_block` - // since they could still be reverted while we copy. - // The conditions on `block_range` are expressed redundantly - // to make more indexes useable - sql_query(format!( + let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { + // Page through all rows in `src` in batches of `batch_size` + // and copy the ones that are visible to queries at block + // heights between `earliest_block` and `final_block`, but + // whose block_range does not extend past `final_block` + // since they could still be reverted while we copy. 
+ // The conditions on `block_range` are expressed redundantly + // to make more indexes useable + sql_query(format!( "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ @@ -128,13 +129,12 @@ impl TablePair { dst = self.dst.qualified_name, batch_size = end - start + 1, )) - .bind::(earliest_block) - .bind::(final_block) - .bind::(start) - .bind::(end) - .execute(conn) - .map_err(StoreError::from) - }) + .bind::(earliest_block) + .bind::(final_block) + .bind::(start) + .bind::(end) + .execute(conn) + .map_err(StoreError::from) })?; let rows = rows.unwrap_or(0); tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?; @@ -168,14 +168,13 @@ impl TablePair { tracker.start_copy_nonfinal(conn, &self.src, range)?; while !batcher.finished() { - let (_, rows) = batcher.step(|start, end| { + let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { // Page through all the rows in `src` in batches of // `batch_size` that are visible to queries at block heights // starting right after `final_block`. 
The conditions on // `block_range` are expressed redundantly to make more // indexes useable - conn.transaction(|conn| { - sql_query(format!( + sql_query(format!( "/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ @@ -192,7 +191,6 @@ impl TablePair { .bind::(end) .execute(conn) .map_err(StoreError::from) - }) })?; let rows = rows.unwrap_or(0); @@ -460,7 +458,8 @@ impl Layout { tracker.start_delete(conn, table, range, &batcher)?; while !batcher.finished() { - let (_, rows) = batcher.step(|start, end| {sql_query(format!( + let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { + sql_query(format!( "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ delete from {qname} \ where coalesce(upper(block_range), 2147483647) <= $1 \ @@ -471,7 +470,8 @@ impl Layout { .bind::(req.earliest_block) .bind::(start) .bind::(end) - .execute(conn).map_err(StoreError::from)})?; + .execute(conn).map_err(StoreError::from) + })?; let rows = rows.unwrap_or(0); tracker.finish_batch(conn, table, -(rows as i64), &batcher)?; @@ -501,6 +501,42 @@ impl Layout { } } +/// Perform a step with the `batcher`. If that step takes longer than +/// `BATCH_STATEMENT_TIMEOUT`, kill the query and reset the batch size of +/// the batcher to 1 and perform a step with that size which we assume takes +/// less than `BATCH_STATEMENT_TIMEOUT`. 
+///
+/// Doing this serves as a safeguard against very bad batch size estimations
+/// so that batches never take longer than `BATCH_STATEMENT_TIMEOUT`
+fn batch_with_timeout<T, F>(
+    conn: &mut PgConnection,
+    batcher: &mut VidBatcher,
+    query: F,
+) -> Result<Option<T>, StoreError>
+where
+    F: Fn(&mut PgConnection, i64, i64) -> Result<T, StoreError>,
+{
+    let res = batcher
+        .step(|start, end| {
+            conn.transaction(|conn| {
+                if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
+                    conn.batch_execute(timeout)?;
+                }
+                query(conn, start, end)
+            })
+        })
+        .map(|(_, res)| res);
+
+    if !matches!(res, Err(StoreError::StatementTimeout)) {
+        return res;
+    }
+
+    batcher.set_batch_size(1);
+    batcher
+        .step(|start, end| conn.transaction(|conn| query(conn, start, end)))
+        .map(|(_, res)| res)
+}
+
 mod status {
     use std::sync::Arc;