Skip to content

Commit 2be3f44

Browse files
committed
store: Kill and retry long running prune operations
Uses the same timeout as copying. If a prune operation takes longer than `GRAPH_STORE_BATCH_TIMEOUT`, it is aborted and retried with a batch size of 1
1 parent ef15800 commit 2be3f44

File tree

2 files changed

+62
-26
lines changed

2 files changed

+62
-26
lines changed

store/postgres/src/copy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
6565
const REPLICATION_SLEEP: Duration = Duration::from_secs(10);
6666

6767
lazy_static! {
68-
static ref STATEMENT_TIMEOUT: Option<String> = ENV_VARS
68+
pub(crate) static ref BATCH_STATEMENT_TIMEOUT: Option<String> = ENV_VARS
6969
.store
7070
.batch_timeout
7171
.map(|duration| format!("set local statement_timeout={}", duration.as_millis()));
@@ -792,7 +792,7 @@ impl CopyTableWorker {
792792
}
793793

794794
match conn.transaction(|conn| {
795-
if let Some(timeout) = STATEMENT_TIMEOUT.as_ref() {
795+
if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
796796
conn.batch_execute(timeout)?;
797797
}
798798
self.table.copy_batch(conn)

store/postgres/src/relational/prune.rs

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use graph::{
1818
use itertools::Itertools;
1919

2020
use crate::{
21-
catalog, deployment,
21+
catalog,
22+
copy::BATCH_STATEMENT_TIMEOUT,
23+
deployment,
2224
relational::{Table, VID_COLUMN},
2325
vid_batcher::{VidBatcher, VidRange},
2426
};
@@ -105,16 +107,15 @@ impl TablePair {
105107
tracker.start_copy_final(conn, &self.src, range)?;
106108

107109
while !batcher.finished() {
108-
let (_, rows) = batcher.step(|start, end| {
109-
conn.transaction(|conn| {
110-
// Page through all rows in `src` in batches of `batch_size`
111-
// and copy the ones that are visible to queries at block
112-
// heights between `earliest_block` and `final_block`, but
113-
// whose block_range does not extend past `final_block`
114-
// since they could still be reverted while we copy.
115-
// The conditions on `block_range` are expressed redundantly
116-
// to make more indexes useable
117-
sql_query(format!(
110+
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
111+
// Page through all rows in `src` in batches of `batch_size`
112+
// and copy the ones that are visible to queries at block
113+
// heights between `earliest_block` and `final_block`, but
114+
// whose block_range does not extend past `final_block`
115+
// since they could still be reverted while we copy.
116+
// The conditions on `block_range` are expressed redundantly
117+
// to make more indexes useable
118+
sql_query(format!(
118119
"/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \
119120
insert into {dst}({column_list}) \
120121
select {column_list} from {src} \
@@ -128,13 +129,12 @@ impl TablePair {
128129
dst = self.dst.qualified_name,
129130
batch_size = end - start + 1,
130131
))
131-
.bind::<Integer, _>(earliest_block)
132-
.bind::<Integer, _>(final_block)
133-
.bind::<BigInt, _>(start)
134-
.bind::<BigInt, _>(end)
135-
.execute(conn)
136-
.map_err(StoreError::from)
137-
})
132+
.bind::<Integer, _>(earliest_block)
133+
.bind::<Integer, _>(final_block)
134+
.bind::<BigInt, _>(start)
135+
.bind::<BigInt, _>(end)
136+
.execute(conn)
137+
.map_err(StoreError::from)
138138
})?;
139139
let rows = rows.unwrap_or(0);
140140
tracker.finish_batch(conn, &self.src, rows as i64, &batcher)?;
@@ -168,14 +168,13 @@ impl TablePair {
168168
tracker.start_copy_nonfinal(conn, &self.src, range)?;
169169

170170
while !batcher.finished() {
171-
let (_, rows) = batcher.step(|start, end| {
171+
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
172172
// Page through all the rows in `src` in batches of
173173
// `batch_size` that are visible to queries at block heights
174174
// starting right after `final_block`. The conditions on
175175
// `block_range` are expressed redundantly to make more
176176
// indexes useable
177-
conn.transaction(|conn| {
178-
sql_query(format!(
177+
sql_query(format!(
179178
"/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \
180179
insert into {dst}({column_list}) \
181180
select {column_list} from {src} \
@@ -192,7 +191,6 @@ impl TablePair {
192191
.bind::<BigInt, _>(end)
193192
.execute(conn)
194193
.map_err(StoreError::from)
195-
})
196194
})?;
197195
let rows = rows.unwrap_or(0);
198196

@@ -460,7 +458,8 @@ impl Layout {
460458

461459
tracker.start_delete(conn, table, range, &batcher)?;
462460
while !batcher.finished() {
463-
let (_, rows) = batcher.step(|start, end| {sql_query(format!(
461+
let rows = batch_with_timeout(conn, &mut batcher, |conn, start, end| {
462+
sql_query(format!(
464463
"/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \
465464
delete from {qname} \
466465
where coalesce(upper(block_range), 2147483647) <= $1 \
@@ -471,7 +470,8 @@ impl Layout {
471470
.bind::<Integer, _>(req.earliest_block)
472471
.bind::<BigInt, _>(start)
473472
.bind::<BigInt, _>(end)
474-
.execute(conn).map_err(StoreError::from)})?;
473+
.execute(conn).map_err(StoreError::from)
474+
})?;
475475
let rows = rows.unwrap_or(0);
476476

477477
tracker.finish_batch(conn, table, -(rows as i64), &batcher)?;
@@ -501,6 +501,42 @@ impl Layout {
501501
}
502502
}
503503

504+
/// Perform a step with the `batcher`. If that step takes longer than
505+
/// `BATCH_STATEMENT_TIMEOUT`, kill the query and reset the batch size of
506+
/// the batcher to 1 and perform a step with that size which we assume takes
507+
/// less than `BATCH_STATEMENT_TIMEOUT`.
508+
///
509+
/// Doing this serves as a safeguard against very bad batch size estimations
510+
/// so that batches never take much longer than `BATCH_STATEMENT_TIMEOUT`
511+
fn batch_with_timeout<F, T>(
512+
conn: &mut PgConnection,
513+
batcher: &mut VidBatcher,
514+
query: F,
515+
) -> Result<Option<T>, StoreError>
516+
where
517+
F: Fn(&mut PgConnection, i64, i64) -> Result<T, StoreError>,
518+
{
519+
let res = batcher
520+
.step(|start, end| {
521+
conn.transaction(|conn| {
522+
if let Some(timeout) = BATCH_STATEMENT_TIMEOUT.as_ref() {
523+
conn.batch_execute(timeout)?;
524+
}
525+
query(conn, start, end)
526+
})
527+
})
528+
.map(|(_, res)| res);
529+
530+
if !matches!(res, Err(StoreError::StatementTimeout)) {
531+
return res;
532+
}
533+
534+
batcher.set_batch_size(1);
535+
batcher
536+
.step(|start, end| conn.transaction(|conn| query(conn, start, end)))
537+
.map(|(_, res)| res)
538+
}
539+
504540
mod status {
505541
use std::sync::Arc;
506542

0 commit comments

Comments
 (0)