diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 04523e22c3b..e7ce5b028c8 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -226,10 +226,10 @@ those. copying or grafting should take. This limits how long transactions for such long running operations will be, and therefore helps control bloat in other tables. Value is in seconds and defaults to 180s. -- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying or - grafting is allowed to take at most. This is meant to guard against - batches that are catastrophically big and should be set to a small - multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that +- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying, + grafting, or pruning is allowed to take at most. This is meant to guard + against batches that are catastrophically big and should be set to a + small multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that value, and needs to be at least 2 times that value when set. If this timeout is hit, the batch size is reset to 1 so we can be sure that batches stay below `GRAPH_STORE_BATCH_TARGET_DURATION` and the smaller diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 58420b053a5..9a8b4fd4328 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -65,7 +65,7 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30); const REPLICATION_SLEEP: Duration = Duration::from_secs(10); lazy_static! 
{ - static ref STATEMENT_TIMEOUT: Option<String> = ENV_VARS + pub(crate) static ref BATCH_STATEMENT_TIMEOUT: Option<String> = ENV_VARS .store .batch_timeout .map(|duration| format!("set local statement_timeout={}", duration.as_millis())); @@ -792,7 +792,7 @@ impl CopyTableWorker { } match conn.transaction(|conn| { - if let Some(timeout) = STATEMENT_TIMEOUT.as_ref() { + if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { conn.batch_execute(timeout)?; } self.table.copy_batch(conn) diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 60386370b9c..6e3af20179d 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -18,7 +18,9 @@ use graph::{ use itertools::Itertools; use crate::{ - catalog, deployment, + catalog, + copy::BATCH_STATEMENT_TIMEOUT, + deployment, relational::{Table, VID_COLUMN}, vid_batcher::{VidBatcher, VidRange}, }; @@ -105,16 +107,15 @@ impl TablePair { tracker.start_copy_final(conn, &self.src, range)?; while !batcher.finished() { - let (_, rows) = batcher.step(|start, end| { - conn.transaction(|conn| { - // Page through all rows in `src` in batches of `batch_size` - // and copy the ones that are visible to queries at block - // heights between `earliest_block` and `final_block`, but - // whose block_range does not extend past `final_block` - // since they could still be reverted while we copy. - // The conditions on `block_range` are expressed redundantly - // to make more indexes useable - sql_query(format!( + let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { + // Page through all rows in `src` in batches of `batch_size` + // and copy the ones that are visible to queries at block + // heights between `earliest_block` and `final_block`, but + // whose block_range does not extend past `final_block` + // since they could still be reverted while we copy. 
+ // The conditions on `block_range` are expressed redundantly + // to make more indexes useable + sql_query(format!( "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ @@ -128,13 +129,12 @@ impl TablePair { dst = self.dst.qualified_name, batch_size = end - start + 1, )) - .bind::<Integer, _>(earliest_block) - .bind::<Integer, _>(final_block) - .bind::<BigInt, _>(start) - .bind::<BigInt, _>(end) - .execute(conn) - .map_err(StoreError::from) - }) + .bind::<Integer, _>(earliest_block) + .bind::<Integer, _>(final_block) + .bind::<BigInt, _>(start) + .bind::<BigInt, _>(end) + .execute(conn) + .map_err(StoreError::from) })?; let rows = rows.unwrap_or(0); tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?; @@ -168,14 +168,13 @@ impl TablePair { tracker.start_copy_nonfinal(conn, &self.src, range)?; while !batcher.finished() { - let (_, rows) = batcher.step(|start, end| { + let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { // Page through all the rows in `src` in batches of // `batch_size` that are visible to queries at block heights // starting right after `final_block`. 
The conditions on // `block_range` are expressed redundantly to make more // indexes useable - conn.transaction(|conn| { - sql_query(format!( + sql_query(format!( "/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ @@ -192,7 +191,6 @@ impl TablePair { .bind::<BigInt, _>(end) .execute(conn) .map_err(StoreError::from) - }) })?; let rows = rows.unwrap_or(0); @@ -460,7 +458,8 @@ impl Layout { tracker.start_delete(conn, table, range, &batcher)?; while !batcher.finished() { - let (_, rows) = batcher.step(|start, end| {sql_query(format!( + let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| { + sql_query(format!( "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ delete from {qname} \ where coalesce(upper(block_range), 2147483647) <= $1 \ and vid >= $2 and vid <= $3", qname = table.qualified_name, )) @@ -471,7 +470,8 @@ impl Layout { .bind::<Integer, _>(req.earliest_block) .bind::<BigInt, _>(start) .bind::<BigInt, _>(end) - .execute(conn).map_err(StoreError::from)})?; + .execute(conn).map_err(StoreError::from) + })?; let rows = rows.unwrap_or(0); tracker.finish_batch(conn, table, -(rows as i64), &batcher)?; @@ -501,6 +501,42 @@ impl Layout { } } +/// Perform a step with the `batcher`. If that step takes longer than +/// `BATCH_STATEMENT_TIMEOUT`, kill the query and reset the batch size of +/// the batcher to 1 and perform a step with that size which we assume takes +/// less than `BATCH_STATEMENT_TIMEOUT`. 
+/// +/// Doing this serves as a safeguard against very bad batch size estimations +/// so that batches never take longer than `BATCH_STATEMENT_TIMEOUT` +fn batch_with_timeout<F>( + conn: &mut PgConnection, + batcher: &mut VidBatcher, + query: F, +) -> Result<Option<usize>, StoreError> +where + F: Fn(&mut PgConnection, i64, i64) -> Result<usize, StoreError>, +{ + let res = batcher + .step(|start, end| { + conn.transaction(|conn| { + if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() { + conn.batch_execute(timeout)?; + } + query(conn, start, end) + }) + }) + .map(|(_, res)| res); + + if !matches!(res, Err(StoreError::StatementTimeout)) { + return res; + } + + batcher.set_batch_size(1); + batcher + .step(|start, end| conn.transaction(|conn| query(conn, start, end))) + .map(|(_, res)| res) +} + mod status { use std::sync::Arc; diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index c1e69ebe017..feb58787c43 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -209,9 +209,9 @@ impl VidBatcher { /// The function returns the time it took to process the batch and the /// result of `f`. If the batcher is finished, `f` will not be called, /// and `None` will be returned as its result. - pub fn step<F, T>(&mut self, mut f: F) -> Result<(Duration, Option<T>), StoreError> + pub fn step<F, T>(&mut self, f: F) -> Result<(Duration, Option<T>), StoreError> where - F: FnMut(i64, i64) -> Result<T, StoreError>, + F: FnOnce(i64, i64) -> Result<T, StoreError>, { if self.finished() { return Ok((Duration::from_secs(0), None));