Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ those.
copying or grafting should take. This limits how long transactions for
such long running operations will be, and therefore helps control bloat
in other tables. Value is in seconds and defaults to 180s.
- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying or
grafting is allowed to take at most. This is meant to guard against
batches that are catastrophically big and should be set to a small
multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that
- `GRAPH_STORE_BATCH_TIMEOUT`: How long a batch operation during copying,
grafting, or pruning is allowed to take at most. This is meant to guard
against batches that are catastrophically big and should be set to a
small multiple of `GRAPH_STORE_BATCH_TARGET_DURATION`, like 10 times that
value, and needs to be at least 2 times that value when set. If this
timeout is hit, the batch size is reset to 1 so we can be sure that
batches stay below `GRAPH_STORE_BATCH_TARGET_DURATION` and the smaller
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
const REPLICATION_SLEEP: Duration = Duration::from_secs(10);

lazy_static! {
static ref STATEMENT_TIMEOUT: Option<String> = ENV_VARS
pub(crate) static ref BATCH_STATEMENT_TIMEOUT: Option<String> = ENV_VARS
.store
.batch_timeout
.map(|duration| format!("set local statement_timeout={}", duration.as_millis()));
Expand Down Expand Up @@ -792,7 +792,7 @@ impl CopyTableWorker {
}

match conn.transaction(|conn| {
if let Some(timeout) = STATEMENT_TIMEOUT.as_ref() {
if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
conn.batch_execute(timeout)?;
}
self.table.copy_batch(conn)
Expand Down
84 changes: 60 additions & 24 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use graph::{
use itertools::Itertools;

use crate::{
catalog, deployment,
catalog,
copy::BATCH_STATEMENT_TIMEOUT,
deployment,
relational::{Table, VID_COLUMN},
vid_batcher::{VidBatcher, VidRange},
};
Expand Down Expand Up @@ -105,16 +107,15 @@ impl TablePair {
tracker.start_copy_final(conn, &self.src, range)?;

while !batcher.finished() {
let (_, rows) = batcher.step(|start, end| {
conn.transaction(|conn| {
// Page through all rows in `src` in batches of `batch_size`
// and copy the ones that are visible to queries at block
// heights between `earliest_block` and `final_block`, but
// whose block_range does not extend past `final_block`
// since they could still be reverted while we copy.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(format!(
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
// Page through all rows in `src` in batches of `batch_size`
// and copy the ones that are visible to queries at block
// heights between `earliest_block` and `final_block`, but
// whose block_range does not extend past `final_block`
// since they could still be reverted while we copy.
// The conditions on `block_range` are expressed redundantly
// to make more indexes useable
sql_query(format!(
"/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \
insert into {dst}({column_list}) \
select {column_list} from {src} \
Expand All @@ -128,13 +129,12 @@ impl TablePair {
dst = self.dst.qualified_name,
batch_size = end - start + 1,
))
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(start)
.bind::<BigInt, _>(end)
.execute(conn)
.map_err(StoreError::from)
})
.bind::<Integer, _>(earliest_block)
.bind::<Integer, _>(final_block)
.bind::<BigInt, _>(start)
.bind::<BigInt, _>(end)
.execute(conn)
.map_err(StoreError::from)
})?;
let rows = rows.unwrap_or(0);
tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?;
Expand Down Expand Up @@ -168,14 +168,13 @@ impl TablePair {
tracker.start_copy_nonfinal(conn, &self.src, range)?;

while !batcher.finished() {
let (_, rows) = batcher.step(|start, end| {
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
// Page through all the rows in `src` in batches of
// `batch_size` that are visible to queries at block heights
// starting right after `final_block`. The conditions on
// `block_range` are expressed redundantly to make more
// indexes useable
conn.transaction(|conn| {
sql_query(format!(
sql_query(format!(
"/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \
insert into {dst}({column_list}) \
select {column_list} from {src} \
Expand All @@ -192,7 +191,6 @@ impl TablePair {
.bind::<BigInt, _>(end)
.execute(conn)
.map_err(StoreError::from)
})
})?;
let rows = rows.unwrap_or(0);

Expand Down Expand Up @@ -460,7 +458,8 @@ impl Layout {

tracker.start_delete(conn, table, range, &batcher)?;
while !batcher.finished() {
let (_, rows) = batcher.step(|start, end| {sql_query(format!(
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
sql_query(format!(
"/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \
delete from {qname} \
where coalesce(upper(block_range), 2147483647) <= $1 \
Expand All @@ -471,7 +470,8 @@ impl Layout {
.bind::<Integer, _>(req.earliest_block)
.bind::<BigInt, _>(start)
.bind::<BigInt, _>(end)
.execute(conn).map_err(StoreError::from)})?;
.execute(conn).map_err(StoreError::from)
})?;
let rows = rows.unwrap_or(0);

tracker.finish_batch(conn, table, -(rows as i64), &batcher)?;
Expand Down Expand Up @@ -501,6 +501,42 @@ impl Layout {
}
}

/// Perform a step with the `batcher`, guarded by the configured batch
/// statement timeout. If that step takes longer than
/// `BATCH_STATEMENT_TIMEOUT`, Postgres kills the query; we then reset the
/// batch size of the batcher to 1 and perform a step with that size, which
/// we assume takes less than `BATCH_STATEMENT_TIMEOUT`.
///
/// Doing this serves as a safeguard against very bad batch size estimations
/// so that batches never take longer than `BATCH_STATEMENT_TIMEOUT`
/// (set via the `GRAPH_STORE_BATCH_TIMEOUT` environment variable).
fn batch_with_timeout<F, T>(
    conn: &mut PgConnection,
    batcher: &mut VidBatcher,
    query: F,
) -> Result<Option<T>, StoreError>
where
    F: Fn(&mut PgConnection, i64, i64) -> Result<T, StoreError>,
{
    // First attempt: run the batch in a transaction with a local
    // `statement_timeout` applied (when one is configured), so a
    // catastrophically oversized batch is cancelled by the database
    // instead of running indefinitely. `batch_execute` runs the
    // `set local statement_timeout=...` statement, which only lasts
    // for the duration of this transaction.
    let res = batcher
        .step(|start, end| {
            conn.transaction(|conn| {
                if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
                    conn.batch_execute(timeout)?;
                }
                query(conn, start, end)
            })
        })
        .map(|(_, res)| res);

    // Success, or any error other than a statement timeout, is passed
    // straight back to the caller; only a timeout triggers the retry below.
    if !matches!(res, Err(StoreError::StatementTimeout)) {
        return res;
    }

    // The batch was killed by the timeout: shrink to the smallest possible
    // batch (a single entry) and retry once. No timeout is set on the retry;
    // we assume a batch of size 1 completes well within the limit.
    batcher.set_batch_size(1);
    batcher
        .step(|start, end| conn.transaction(|conn| query(conn, start, end)))
        .map(|(_, res)| res)
}

mod status {
use std::sync::Arc;

Expand Down
4 changes: 2 additions & 2 deletions store/postgres/src/vid_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ impl VidBatcher {
/// The function returns the time it took to process the batch and the
/// result of `f`. If the batcher is finished, `f` will not be called,
/// and `None` will be returned as its result.
pub fn step<F, T>(&mut self, mut f: F) -> Result<(Duration, Option<T>), StoreError>
pub fn step<F, T>(&mut self, f: F) -> Result<(Duration, Option<T>), StoreError>
where
F: FnMut(i64, i64) -> Result<T, StoreError>,
F: FnOnce(i64, i64) -> Result<T, StoreError>,
{
if self.finished() {
return Ok((Duration::from_secs(0), None));
Expand Down
Loading