From 01e6dd5a71968fe55d0de0d6aad4fc7a8a12f086 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 3 Feb 2025 14:49:24 -0800 Subject: [PATCH 1/9] store: Do not assume that copies start at vid == 0 --- store/postgres/src/copy.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index c526a93c7b8..624a6cbbe94 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -525,7 +525,6 @@ impl TableState { &mut self, conn: &mut PgConnection, elapsed: Duration, - first_batch: bool, ) -> Result<(), StoreError> { use copy_table_state as cts; @@ -533,17 +532,17 @@ impl TableState { // 300B years self.duration_ms += i64::try_from(elapsed.as_millis()).unwrap_or(0); - if first_batch { - // Reset started_at so that finished_at - started_at is an - // accurate indication of how long we worked on a table. - update( - cts::table - .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())), - ) - .set(cts::started_at.eq(sql("now()"))) - .execute(conn)?; - } + // Reset started_at so that finished_at - started_at is an accurate + // indication of how long we worked on a table if we haven't worked + // on the table yet. 
+ update( + cts::table + .filter(cts::dst.eq(self.dst_site.id)) + .filter(cts::entity_type.eq(self.batch.dst.object.as_str())) + .filter(cts::duration_ms.eq(0)), + ) + .set(cts::started_at.eq(sql("now()"))) + .execute(conn)?; let values = ( cts::next_vid.eq(self.batch.next_vid), cts::batch_size.eq(self.batch.batch_size.size), @@ -591,11 +590,9 @@ impl TableState { } fn copy_batch(&mut self, conn: &mut PgConnection) -> Result { - let first_batch = self.batch.next_vid == 0; - let duration = self.batch.run(conn)?; - self.record_progress(conn, duration, first_batch)?; + self.record_progress(conn, duration)?; if self.finished() { self.record_finished(conn)?; From 229d95ef23fef0114ace105f269337139723795d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 3 Feb 2025 14:58:15 -0800 Subject: [PATCH 2/9] store: Start copies at the minimum vid, not just at 0 --- store/postgres/src/copy.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 624a6cbbe94..a567c8d09de 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -414,7 +414,9 @@ impl TableState { target_block: &BlockPtr, ) -> Result { #[derive(QueryableByName)] - struct MaxVid { + struct VidRange { + #[diesel(sql_type = BigInt)] + min_vid: i64, #[diesel(sql_type = BigInt)] max_vid: i64, } @@ -424,19 +426,21 @@ impl TableState { } else { "lower(block_range) <= $1" }; - let target_vid = sql_query(format!( - "select coalesce(max(vid), -1) as max_vid from {} where {}", + let (next_vid, target_vid) = sql_query(format!( + "select coalesce(min(vid), 0) as min_vid, \ + coalesce(max(vid), -1) as max_vid \ + from {} where {}", src.qualified_name.as_str(), max_block_clause )) .bind::(&target_block.number) - .load::(conn)? + .load::(conn)? 
.first() - .map(|v| v.max_vid) - .unwrap_or(-1); + .map(|v| (v.min_vid, v.max_vid)) + .unwrap_or((0, -1)); Ok(Self { - batch: BatchCopy::new(src, dst, 0, target_vid), + batch: BatchCopy::new(src, dst, next_vid, target_vid), dst_site, duration_ms: 0, }) From a0860d4f6cf5f324bcb2802c13f6cd939ad27902 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 4 Feb 2025 16:44:37 -0800 Subject: [PATCH 3/9] graph: Add utility for handling cumulative histograms --- graph/src/util/mod.rs | 2 + graph/src/util/ogive.rs | 279 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 graph/src/util/ogive.rs diff --git a/graph/src/util/mod.rs b/graph/src/util/mod.rs index 68c86daea3c..4cdf52a82a5 100644 --- a/graph/src/util/mod.rs +++ b/graph/src/util/mod.rs @@ -12,6 +12,8 @@ pub mod error; pub mod stats; +pub mod ogive; + pub mod cache_weight; pub mod timed_rw_lock; diff --git a/graph/src/util/ogive.rs b/graph/src/util/ogive.rs new file mode 100644 index 00000000000..476bfd76ce8 --- /dev/null +++ b/graph/src/util/ogive.rs @@ -0,0 +1,279 @@ +use std::ops::RangeInclusive; + +use crate::{constraint_violation, prelude::StoreError}; + +/// A helper to deal with cumulative histograms, also known as ogives. This +/// implementation is restricted to histograms where each bin has the same +/// size. As a cumulative function of a histogram, an ogive is a piecewise +/// linear function `f` and since it is strictly monotonically increasing, +/// it has an inverse `g`. +/// +/// For the given `points`, `f(points[i]) = i * bin_size` and `f` is the +/// piecewise linear interpolant between those points. The inverse `g` is +/// the piecewise linear interpolant of `g(i * bin_size) = points[i]`. Note +/// that that means that `f` divides the y-axis into `points.len()` equal +/// parts. +/// +/// The word 'ogive' is somewhat obscure, but has a lot fewer letters than +/// 'piecewise linear function'. 
Copilot also claims that it is a lot +/// more fun to say. +pub struct Ogive { + /// The breakpoints of the piecewise linear function + points: Vec, + /// The size of each bin; the linear piece from `points[i]` to + /// `points[i+1]` rises by this much + bin_size: f64, + /// The range of the ogive, i.e., the minimum and maximum entries from + /// points + range: RangeInclusive, +} + +impl Ogive { + /// Create an ogive from a histogram with breaks at the given points and + /// a total count of `total` entries. As a function, the ogive is 0 at + /// `points[0]` and `total` at `points[points.len() - 1]`. + /// + /// The `points` must have at least one entry. The `points` are sorted + /// and deduplicated, i.e., they don't have to be in ascending order. + pub fn from_equi_histogram(mut points: Vec, total: usize) -> Result { + if points.is_empty() { + return Err(constraint_violation!( + "histogram must have at least one point" + )); + } + + points.sort_unstable(); + points.dedup(); + + let bins = points.len() - 1; + let bin_size = total as f64 / bins as f64; + let range = points[0]..=points[bins]; + let points = points.into_iter().map(|p| p as f64).collect(); + Ok(Self { + points, + bin_size, + range, + }) + } + + pub fn start(&self) -> i64 { + *self.range.start() + } + + pub fn end(&self) -> i64 { + *self.range.end() + } + + /// Find the next point `next` such that there are `size` entries + /// between `point` and `next`, i.e., such that `f(next) - f(point) = + /// size`. + /// + /// It is an error if `point` is smaller than `points[0]`. If `point` is + /// bigger than `points.last()`, that is returned instead. 
+ /// + /// The method calculates `g(f(point) + size)` + pub fn next_point(&self, point: i64, size: usize) -> Result { + if point >= *self.range.end() { + return Ok(*self.range.end()); + } + // This can only fail if point < self.range.start + self.check_in_range(point)?; + + let point_value = self.value(point)?; + let next_value = point_value + size as i64; + let next_point = self.inverse(next_value)?; + Ok(next_point) + } + + /// Return the index of the support point immediately preceding `point`. + /// It is an error if `point` is outside the range of points of this + /// ogive; this also implies that the returned index is always strictly + /// less than `self.points.len() - 1` + fn interval_start(&self, point: i64) -> Result { + self.check_in_range(point)?; + + let point = point as f64; + let idx = self + .points + .iter() + .position(|&p| point < p) + .unwrap_or(self.points.len() - 1) + - 1; + Ok(idx) + } + + /// Return the value of the ogive at `point`, i.e., `f(point)`. It is an + /// error if `point` is outside the range of points of this ogive. + fn value(&self, point: i64) -> Result { + if self.points.len() == 1 { + return Ok(*self.range.end()); + } + + let idx = self.interval_start(point)?; + let bin_size = self.bin_size as f64; + let (a, b) = (self.points[idx], self.points[idx + 1]); + let point = point as f64; + let value = (idx as f64 + (point - a) / (b - a)) * bin_size; + Ok(value as i64) + } + + /// Return the value of the inverse ogive at `value`, i.e., `g(value)`. + /// It is an error if `value` is negative. If `value` is greater than + /// the total count of the ogive, the maximum point of the ogive is + /// returned. 
+ fn inverse(&self, value: i64) -> Result { + let value = value as f64; + if value < 0.0 { + return Err(constraint_violation!("value {} can not be negative", value)); + } + let idx = (value / self.bin_size) as usize; + if idx >= self.points.len() - 1 { + return Ok(*self.range.end()); + } + let (a, b) = (self.points[idx] as f64, self.points[idx + 1] as f64); + let lambda = (value - idx as f64 * self.bin_size) / self.bin_size; + let x = (1.0 - lambda) * a + lambda * b; + Ok(x as i64) + } + + fn check_in_range(&self, point: i64) -> Result<(), StoreError> { + if !self.range.contains(&point) { + return Err(constraint_violation!( + "point {} is outside of the range [{}, {}]", + point, + self.range.start(), + self.range.end(), + )); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn simple() { + // This is just the linear function y = (70 / 5) * (x - 10) + let points: Vec = vec![10, 20, 30, 40, 50, 60]; + let ogive = Ogive::from_equi_histogram(points, 700).unwrap(); + + // The function represented by `points` + fn f(x: i64) -> i64 { + 70 * (x - 10) / 5 + } + + // The inverse of `f` + fn g(x: i64) -> i64 { + x * 5 / 70 + 10 + } + + // Check that the ogive is correct + assert_eq!(ogive.bin_size, 700 as f64 / 5 as f64); + assert_eq!(ogive.range, 10..=60); + + // Test value method + for point in vec![20, 30, 45, 50, 60] { + assert_eq!(ogive.value(point).unwrap(), f(point), "value for {}", point); + } + + // Test next_point method + for step in vec![50, 140, 200] { + for value in vec![10, 20, 30, 35, 45, 50, 60] { + assert_eq!( + ogive.next_point(value, step).unwrap(), + g(f(value) + step as i64).min(60), + "inverse for {} with step {}", + value, + step + ); + } + } + + // Exceeding the range caps it at the maximum point + assert_eq!(ogive.next_point(50, 140).unwrap(), 60); + assert_eq!(ogive.next_point(50, 500).unwrap(), 60); + + // Point to the left of the range should return an error + assert!(ogive.next_point(9, 140).is_err()); + // Point to 
the right of the range gets capped + assert_eq!(ogive.next_point(61, 140).unwrap(), 60); + } + + #[test] + fn single_bin() { + // A histogram with only one bin + let points: Vec = vec![10, 20]; + let ogive = Ogive::from_equi_histogram(points, 700).unwrap(); + + // The function represented by `points` + fn f(x: i64) -> i64 { + 700 * (x - 10) / 10 + } + + // The inverse of `f` + fn g(x: i64) -> i64 { + x * 10 / 700 + 10 + } + + // Check that the ogive is correct + assert_eq!(ogive.bin_size, 700 as f64 / 1 as f64); + assert_eq!(ogive.range, 10..=20); + + // Test value method + for point in vec![10, 15, 20] { + assert_eq!(ogive.value(point).unwrap(), f(point), "value for {}", point); + } + + // Test next_point method + for step in vec![50, 140, 200] { + for value in vec![10, 15, 20] { + assert_eq!( + ogive.next_point(value, step).unwrap(), + g(f(value) + step as i64).min(20), + "inverse for {} with step {}", + value, + step + ); + } + } + + // Exceeding the range caps it at the maximum point + assert_eq!(ogive.next_point(20, 140).unwrap(), 20); + assert_eq!(ogive.next_point(20, 500).unwrap(), 20); + + // Point to the left of the range should return an error + assert!(ogive.next_point(9, 140).is_err()); + // Point to the right of the range gets capped + assert_eq!(ogive.next_point(21, 140).unwrap(), 20); + } + + #[test] + fn one_bin() { + let points: Vec = vec![10]; + let ogive = Ogive::from_equi_histogram(points, 700).unwrap(); + + assert_eq!(ogive.next_point(10, 1).unwrap(), 10); + assert_eq!(ogive.next_point(10, 4).unwrap(), 10); + assert_eq!(ogive.next_point(15, 1).unwrap(), 10); + + assert!(ogive.next_point(9, 1).is_err()); + } + + #[test] + fn exponential() { + let points: Vec = vec![32, 48, 56, 60, 62, 64]; + let ogive = Ogive::from_equi_histogram(points, 100).unwrap(); + + assert_eq!(ogive.value(50).unwrap(), 25); + assert_eq!(ogive.value(56).unwrap(), 40); + assert_eq!(ogive.value(58).unwrap(), 50); + assert_eq!(ogive.value(63).unwrap(), 90); + + 
assert_eq!(ogive.next_point(32, 40).unwrap(), 56); + assert_eq!(ogive.next_point(50, 10).unwrap(), 54); + assert_eq!(ogive.next_point(50, 50).unwrap(), 61); + assert_eq!(ogive.next_point(40, 40).unwrap(), 58); + } +} From f2d5e44aaa6b7310f776052033470831feecfb95 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 5 Feb 2025 09:06:57 -0800 Subject: [PATCH 4/9] store: Move AdaptiveBatchSize to its own module --- store/postgres/src/copy.rs | 61 ++----------------------- store/postgres/src/lib.rs | 1 + store/postgres/src/relational/prune.rs | 5 +- store/postgres/src/vid_batcher.rs | 63 ++++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 61 deletions(-) create mode 100644 store/postgres/src/vid_batcher.rs diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index a567c8d09de..595866709db 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -19,21 +19,17 @@ use std::{ }; use diesel::{ - deserialize::FromSql, dsl::sql, insert_into, - pg::Pg, r2d2::{ConnectionManager, PooledConnection}, - select, - serialize::{Output, ToSql}, - sql_query, + select, sql_query, sql_types::{BigInt, Integer}, update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, RunQueryDsl, }; use graph::{ constraint_violation, - prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError, ENV_VARS}, + prelude::{info, o, warn, BlockNumber, BlockPtr, Logger, StoreError}, schema::EntityType, }; use itertools::Itertools; @@ -43,17 +39,11 @@ use crate::{ dynds::DataSourcesTable, primary::{DeploymentId, Site}, relational::index::IndexList, + vid_batcher::AdaptiveBatchSize, }; use crate::{connection_pool::ConnectionPool, relational::Layout}; use crate::{relational::Table, relational_queries as rq}; -/// The initial batch size for tables that do not have an array column -const INITIAL_BATCH_SIZE: i64 = 10_000; -/// The initial batch size for tables that do have an array column; those -/// arrays can be large and large 
arrays will slow down copying a lot. We -/// therefore tread lightly in that case -const INITIAL_BATCH_SIZE_LIST: i64 = 100; - const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60); /// If replicas are lagging by more than this, the copying code will pause @@ -299,51 +289,6 @@ pub(crate) fn source( .map_err(StoreError::from) } -/// Track the desired size of a batch in such a way that doing the next -/// batch gets close to TARGET_DURATION for the time it takes to copy one -/// batch, but don't step up the size by more than 2x at once -#[derive(Debug, Queryable)] -pub(crate) struct AdaptiveBatchSize { - pub size: i64, -} - -impl AdaptiveBatchSize { - pub fn new(table: &Table) -> Self { - let size = if table.columns.iter().any(|col| col.is_list()) { - INITIAL_BATCH_SIZE_LIST - } else { - INITIAL_BATCH_SIZE - }; - - Self { size } - } - - // adjust batch size by trying to extrapolate in such a way that we - // get close to TARGET_DURATION for the time it takes to copy one - // batch, but don't step up batch_size by more than 2x at once - pub fn adapt(&mut self, duration: Duration) { - // Avoid division by zero - let duration = duration.as_millis().max(1); - let new_batch_size = self.size as f64 - * ENV_VARS.store.batch_target_duration.as_millis() as f64 - / duration as f64; - self.size = (2 * self.size).min(new_batch_size.round() as i64); - } -} - -impl ToSql for AdaptiveBatchSize { - fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { - >::to_sql(&self.size, out) - } -} - -impl FromSql for AdaptiveBatchSize { - fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result { - let size = >::from_sql(bytes)?; - Ok(AdaptiveBatchSize { size }) - } -} - /// A helper to copy entities from one table to another in batches that are /// small enough to not interfere with the rest of the operations happening /// in the database. 
The `src` and `dst` table must have the same structure diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index 409ce182d77..759e8601313 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -36,6 +36,7 @@ mod store; mod store_events; mod subgraph_store; pub mod transaction_receipt; +mod vid_batcher; mod writable; pub mod graphman; diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 10a9cff1626..e0dc5b7235f 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -18,10 +18,9 @@ use graph::{ use itertools::Itertools; use crate::{ - catalog, - copy::AdaptiveBatchSize, - deployment, + catalog, deployment, relational::{Table, VID_COLUMN}, + vid_batcher::AdaptiveBatchSize, }; use super::{Catalog, Layout, Namespace}; diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs new file mode 100644 index 00000000000..27d6f86cb8d --- /dev/null +++ b/store/postgres/src/vid_batcher.rs @@ -0,0 +1,63 @@ +use std::time::Duration; + +use diesel::{ + deserialize::FromSql, + pg::Pg, + serialize::{Output, ToSql}, + sql_types::BigInt, +}; +use graph::env::ENV_VARS; + +use crate::relational::Table; + +/// The initial batch size for tables that do not have an array column +const INITIAL_BATCH_SIZE: i64 = 10_000; +/// The initial batch size for tables that do have an array column; those +/// arrays can be large and large arrays will slow down copying a lot. 
We +/// therefore tread lightly in that case +const INITIAL_BATCH_SIZE_LIST: i64 = 100; + +/// Track the desired size of a batch in such a way that doing the next +/// batch gets close to TARGET_DURATION for the time it takes to copy one +/// batch, but don't step up the size by more than 2x at once +#[derive(Debug, Queryable)] +pub(crate) struct AdaptiveBatchSize { + pub size: i64, +} + +impl AdaptiveBatchSize { + pub fn new(table: &Table) -> Self { + let size = if table.columns.iter().any(|col| col.is_list()) { + INITIAL_BATCH_SIZE_LIST + } else { + INITIAL_BATCH_SIZE + }; + + Self { size } + } + + // adjust batch size by trying to extrapolate in such a way that we + // get close to TARGET_DURATION for the time it takes to copy one + // batch, but don't step up batch_size by more than 2x at once + pub fn adapt(&mut self, duration: Duration) { + // Avoid division by zero + let duration = duration.as_millis().max(1); + let new_batch_size = self.size as f64 + * ENV_VARS.store.batch_target_duration.as_millis() as f64 + / duration as f64; + self.size = (2 * self.size).min(new_batch_size.round() as i64); + } +} + +impl ToSql for AdaptiveBatchSize { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { + >::to_sql(&self.size, out) + } +} + +impl FromSql for AdaptiveBatchSize { + fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result { + let size = >::from_sql(bytes)?; + Ok(AdaptiveBatchSize { size }) + } +} From 8749f20a1152fa29efd4e94fa488501e93aa297c Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 5 Feb 2025 12:29:52 -0800 Subject: [PATCH 5/9] store: Move batching logic for copies into separate struct --- store/postgres/src/catalog.rs | 28 +++ store/postgres/src/copy.rs | 127 ++++------ store/postgres/src/vid_batcher.rs | 373 +++++++++++++++++++++++++++++- 3 files changed, 441 insertions(+), 87 deletions(-) diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 
dc73ec6f7f5..8e988e31522 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -912,3 +912,31 @@ fn has_minmax_multi_ops(conn: &mut PgConnection) -> Result { Ok(sql_query(QUERY).get_result::(conn)?.has_ops) } + +pub(crate) fn histogram_bounds( + conn: &mut PgConnection, + namespace: &Namespace, + table: &SqlName, + column: &str, +) -> Result, StoreError> { + const QUERY: &str = "select histogram_bounds::text::int8[] bounds \ + from pg_stats \ + where schemaname = $1 \ + and tablename = $2 \ + and attname = $3"; + + #[derive(Queryable, QueryableByName)] + struct Bounds { + #[diesel(sql_type = Array)] + bounds: Vec, + } + + sql_query(QUERY) + .bind::(namespace.as_str()) + .bind::(table.as_str()) + .bind::(column) + .get_result::(conn) + .optional() + .map(|bounds| bounds.map(|b| b.bounds).unwrap_or_default()) + .map_err(StoreError::from) +} diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 595866709db..9b64390581b 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -22,10 +22,8 @@ use diesel::{ dsl::sql, insert_into, r2d2::{ConnectionManager, PooledConnection}, - select, sql_query, - sql_types::{BigInt, Integer}, - update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, QueryDsl, - RunQueryDsl, + select, sql_query, update, Connection as _, ExpressionMethods, OptionalExtension, PgConnection, + QueryDsl, RunQueryDsl, }; use graph::{ constraint_violation, @@ -39,7 +37,7 @@ use crate::{ dynds::DataSourcesTable, primary::{DeploymentId, Site}, relational::index::IndexList, - vid_batcher::AdaptiveBatchSize, + vid_batcher::{VidBatcher, VidRange}, }; use crate::{connection_pool::ConnectionPool, relational::Layout}; use crate::{relational::Table, relational_queries as rq}; @@ -202,6 +200,7 @@ impl CopyState { TableState::init( conn, dst.site.clone(), + &src, src_table.clone(), dst_table.clone(), &target_block, @@ -217,9 +216,9 @@ impl CopyState { ( 
cts::entity_type.eq(table.batch.dst.object.as_str()), cts::dst.eq(dst.site.id), - cts::next_vid.eq(table.batch.next_vid), - cts::target_vid.eq(table.batch.target_vid), - cts::batch_size.eq(table.batch.batch_size.size), + cts::next_vid.eq(table.batch.next_vid()), + cts::target_vid.eq(table.batch.target_vid()), + cts::batch_size.eq(table.batch.batch_size()), ) }) .collect::>(); @@ -298,49 +297,42 @@ pub(crate) fn source( pub(crate) struct BatchCopy { src: Arc, dst: Arc
, - /// The `vid` of the next entity version that we will copy - next_vid: i64, - /// The last `vid` that should be copied - target_vid: i64, - batch_size: AdaptiveBatchSize, + batcher: VidBatcher, } impl BatchCopy { - pub fn new(src: Arc
, dst: Arc
, first_vid: i64, last_vid: i64) -> Self { - let batch_size = AdaptiveBatchSize::new(&dst); - - Self { - src, - dst, - next_vid: first_vid, - target_vid: last_vid, - batch_size, - } + pub fn new(batcher: VidBatcher, src: Arc
, dst: Arc
) -> Self { + Self { src, dst, batcher } } /// Copy one batch of entities and update internal state so that the /// next call to `run` will copy the next batch pub fn run(&mut self, conn: &mut PgConnection) -> Result { - let start = Instant::now(); - - // Copy all versions with next_vid <= vid <= next_vid + batch_size - 1, - // but do not go over target_vid - let last_vid = (self.next_vid + self.batch_size.size - 1).min(self.target_vid); - rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, self.next_vid, last_vid)? - .execute(conn)?; + let (duration, _) = self.batcher.step(|start, end| { + rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? + .execute(conn)?; + Ok(()) + })?; - let duration = start.elapsed(); + Ok(duration) + } - // remember how far we got - self.next_vid = last_vid + 1; + pub fn finished(&self) -> bool { + self.batcher.finished() + } - self.batch_size.adapt(duration); + /// The first `vid` that has not been copied yet + pub fn next_vid(&self) -> i64 { + self.batcher.next_vid() + } - Ok(duration) + /// The last `vid` that should be copied + pub fn target_vid(&self) -> i64 { + self.batcher.target_vid() } - pub fn finished(&self) -> bool { - self.next_vid > self.target_vid + pub fn batch_size(&self) -> i64 { + self.batcher.batch_size() as i64 } } @@ -354,38 +346,15 @@ impl TableState { fn init( conn: &mut PgConnection, dst_site: Arc, + src_layout: &Layout, src: Arc
, dst: Arc
, target_block: &BlockPtr, ) -> Result { - #[derive(QueryableByName)] - struct VidRange { - #[diesel(sql_type = BigInt)] - min_vid: i64, - #[diesel(sql_type = BigInt)] - max_vid: i64, - } - - let max_block_clause = if src.immutable { - "block$ <= $1" - } else { - "lower(block_range) <= $1" - }; - let (next_vid, target_vid) = sql_query(format!( - "select coalesce(min(vid), 0) as min_vid, \ - coalesce(max(vid), -1) as max_vid \ - from {} where {}", - src.qualified_name.as_str(), - max_block_clause - )) - .bind::(&target_block.number) - .load::(conn)? - .first() - .map(|v| (v.min_vid, v.max_vid)) - .unwrap_or((0, -1)); - + let vid_range = VidRange::for_copy(conn, &src, target_block)?; + let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?; Ok(Self { - batch: BatchCopy::new(src, dst, next_vid, target_vid), + batch: BatchCopy::new(batcher, src, dst), dst_site, duration_ms: 0, }) @@ -451,10 +420,14 @@ impl TableState { ); match (src, dst) { (Ok(src), Ok(dst)) => { - let mut batch = BatchCopy::new(src, dst, current_vid, target_vid); - let batch_size = AdaptiveBatchSize { size }; - - batch.batch_size = batch_size; + let batcher = VidBatcher::load( + conn, + &src_layout.site.namespace, + &src, + VidRange::new(current_vid, target_vid), + )? 
+ .with_batch_size(size as usize); + let batch = BatchCopy::new(batcher, src, dst); Ok(TableState { batch, @@ -493,8 +466,8 @@ impl TableState { .set(cts::started_at.eq(sql("now()"))) .execute(conn)?; let values = ( - cts::next_vid.eq(self.batch.next_vid), - cts::batch_size.eq(self.batch.batch_size.size), + cts::next_vid.eq(self.batch.next_vid()), + cts::batch_size.eq(self.batch.batch_size()), cts::duration_ms.eq(self.duration_ms), ); update( @@ -566,12 +539,12 @@ impl<'a> CopyProgress<'a> { let target_vid: i64 = state .tables .iter() - .map(|table| table.batch.target_vid) + .map(|table| table.batch.target_vid()) .sum(); let current_vid = state .tables .iter() - .map(|table| table.batch.next_vid.min(table.batch.target_vid)) + .map(|table| table.batch.next_vid()) .sum(); Self { logger, @@ -609,18 +582,18 @@ impl<'a> CopyProgress<'a> { info!( self.logger, "Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data", - Self::progress_pct(batch.next_vid, batch.target_vid), + Self::progress_pct(batch.next_vid(), batch.target_vid()), batch.dst.object, - batch.next_vid, - batch.target_vid, - Self::progress_pct(self.current_vid + batch.next_vid, self.target_vid) + batch.next_vid(), + batch.target_vid(), + Self::progress_pct(self.current_vid + batch.next_vid(), self.target_vid) ); self.last_log = Instant::now(); } } fn table_finished(&mut self, batch: &BatchCopy) { - self.current_vid += batch.next_vid; + self.current_vid += batch.next_vid(); } fn finished(&self) { diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index 27d6f86cb8d..658a1e3c78b 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -1,14 +1,24 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use diesel::{ deserialize::FromSql, pg::Pg, serialize::{Output, ToSql}, - sql_types::BigInt, + sql_query, + sql_types::{BigInt, Integer}, + PgConnection, RunQueryDsl as _, +}; +use graph::{ + env::ENV_VARS, + 
prelude::{BlockPtr, StoreError}, + util::ogive::Ogive, }; -use graph::env::ENV_VARS; -use crate::relational::Table; +use crate::{ + catalog, + primary::Namespace, + relational::{Table, VID_COLUMN}, +}; /// The initial batch size for tables that do not have an array column const INITIAL_BATCH_SIZE: i64 = 10_000; @@ -23,6 +33,7 @@ const INITIAL_BATCH_SIZE_LIST: i64 = 100; #[derive(Debug, Queryable)] pub(crate) struct AdaptiveBatchSize { pub size: i64, + pub target: Duration, } impl AdaptiveBatchSize { @@ -33,19 +44,21 @@ impl AdaptiveBatchSize { INITIAL_BATCH_SIZE }; - Self { size } + Self { + size, + target: ENV_VARS.store.batch_target_duration, + } } // adjust batch size by trying to extrapolate in such a way that we // get close to TARGET_DURATION for the time it takes to copy one // batch, but don't step up batch_size by more than 2x at once - pub fn adapt(&mut self, duration: Duration) { + pub fn adapt(&mut self, duration: Duration) -> i64 { // Avoid division by zero let duration = duration.as_millis().max(1); - let new_batch_size = self.size as f64 - * ENV_VARS.store.batch_target_duration.as_millis() as f64 - / duration as f64; + let new_batch_size = self.size as f64 * self.target.as_millis() as f64 / duration as f64; self.size = (2 * self.size).min(new_batch_size.round() as i64); + self.size } } @@ -58,6 +71,346 @@ impl ToSql for AdaptiveBatchSize { impl FromSql for AdaptiveBatchSize { fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result { let size = >::from_sql(bytes)?; - Ok(AdaptiveBatchSize { size }) + Ok(AdaptiveBatchSize { + size, + target: ENV_VARS.store.batch_target_duration, + }) + } +} + +/// A timer that works like `std::time::Instant` in non-test code, but +/// returns a fake elapsed value in tests +struct Timer { + start: Instant, + #[cfg(test)] + duration: Duration, +} + +impl Timer { + fn new() -> Self { + Self { + start: Instant::now(), + #[cfg(test)] + duration: Duration::from_secs(0), + } + } + + fn start(&mut self) { + 
self.start = Instant::now(); + } + + #[cfg(test)] + fn elapsed(&self) -> Duration { + self.duration + } + + #[cfg(not(test))] + fn elapsed(&self) -> Duration { + self.start.elapsed() + } + + #[cfg(test)] + fn set(&mut self, duration: Duration) { + self.duration = duration; + } +} + +/// A batcher for moving through a large range of `vid` values in a way such +/// that each batch takes approximately the same amount of time. The batcher +/// takes uneven distributions of `vid` values into account by using the +/// histogram from `pg_stats` for the table through which we are iterating. +pub(crate) struct VidBatcher { + batch_size: AdaptiveBatchSize, + start: i64, + end: i64, + max_vid: i64, + + ogive: Option, + + step_timer: Timer, +} + +impl VidBatcher { + fn histogram_bounds( + conn: &mut PgConnection, + nsp: &Namespace, + table: &Table, + range: VidRange, + ) -> Result, StoreError> { + let bounds = catalog::histogram_bounds(conn, nsp, &table.name, VID_COLUMN)? + .into_iter() + .filter(|bound| range.min < *bound && range.max > *bound) + .chain(vec![range.min, range.max].into_iter()) + .collect::>(); + Ok(bounds) + } + + /// Initialize a batcher for batching through entries in `table` with + /// `vid` in the given `vid_range` + /// + /// The `vid_range` is inclusive, i.e., the batcher will iterate over + /// all vids `vid_range.0 <= vid <= vid_range.1`; for an empty table, + /// the `vid_range` must be set to `(0, -1)` + pub fn load( + conn: &mut PgConnection, + nsp: &Namespace, + table: &Table, + vid_range: VidRange, + ) -> Result { + let bounds = Self::histogram_bounds(conn, nsp, table, vid_range)?; + let batch_size = AdaptiveBatchSize::new(table); + Self::new(bounds, vid_range, batch_size) + } + + fn new( + bounds: Vec, + range: VidRange, + batch_size: AdaptiveBatchSize, + ) -> Result { + let start = range.min; + + let mut ogive = if range.is_empty() { + None + } else { + Some(Ogive::from_equi_histogram(bounds, range.size())?) 
+ }; + let end = match ogive.as_mut() { + None => start + batch_size.size, + Some(ogive) => ogive.next_point(start, batch_size.size as usize)?, + }; + + Ok(Self { + batch_size, + start, + end, + max_vid: range.max, + ogive, + step_timer: Timer::new(), + }) + } + + /// Explicitly set the batch size + pub fn with_batch_size(mut self: VidBatcher, size: usize) -> Self { + self.batch_size.size = size as i64; + self + } + + pub(crate) fn next_vid(&self) -> i64 { + self.start + } + + pub(crate) fn target_vid(&self) -> i64 { + self.max_vid + } + + pub fn batch_size(&self) -> usize { + self.batch_size.size as usize + } + + pub fn finished(&self) -> bool { + self.start > self.max_vid + } + + /// Perform the work for one batch. The function `f` is called with the + /// start and end `vid` for this batch and should perform all the work + /// for rows with `start <= vid <= end`, i.e. the start and end values + /// are inclusive. + /// + /// Once `f` returns, the batch size will be adjusted so that the time + /// the next batch will take is close to the target duration. + /// + /// The function returns the time it took to process the batch and the + /// result of `f`. If the batcher is finished, `f` will not be called, + /// and `None` will be returned as its result. 
+    pub fn step<T, F>(&mut self, mut f: F) -> Result<(Duration, Option<T>), StoreError>
+    where
+        F: FnMut(i64, i64) -> Result<T, StoreError>,
+    {
+        if self.finished() {
+            return Ok((Duration::from_secs(0), None));
+        }
+
+        match self.ogive.as_mut() {
+            None => Ok((Duration::from_secs(0), None)),
+            Some(ogive) => {
+                self.step_timer.start();
+
+                let res = f(self.start, self.end)?;
+                let duration = self.step_timer.elapsed();
+
+                let batch_size = self.batch_size.adapt(duration);
+                self.start = self.end + 1;
+                self.end = ogive.next_point(self.start, batch_size as usize)?;
+
+                Ok((duration, Some(res)))
+            }
+        }
+    }
+}
+
+#[derive(Copy, Clone, QueryableByName)]
+pub(crate) struct VidRange {
+    #[diesel(sql_type = BigInt, column_name = "min_vid")]
+    min: i64,
+    #[diesel(sql_type = BigInt, column_name = "max_vid")]
+    max: i64,
+}
+
+const EMPTY_VID_RANGE: VidRange = VidRange { max: -1, min: 0 };
+
+impl VidRange {
+    pub fn new(min_vid: i64, max_vid: i64) -> Self {
+        Self {
+            min: min_vid,
+            max: max_vid,
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.max == -1
+    }
+
+    pub fn size(&self) -> usize {
+        (self.max - self.min + 1) as usize
+    }
+
+    pub fn for_copy(
+        conn: &mut PgConnection,
+        src: &Table,
+        target_block: &BlockPtr,
+    ) -> Result<VidRange, StoreError> {
+        let max_block_clause = if src.immutable {
+            "block$ <= $1"
+        } else {
+            "lower(block_range) <= $1"
+        };
+        let vid_range = sql_query(format!(
+            "select coalesce(min(vid), 0) as min_vid, \
+             coalesce(max(vid), -1) as max_vid \
+             from {} where {}",
+            src.qualified_name.as_str(),
+            max_block_clause
+        ))
+        .bind::<Integer, _>(&target_block.number)
+        .load::<VidRange>(conn)?
+        .pop()
+        .unwrap_or(EMPTY_VID_RANGE);
+        Ok(vid_range)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const S001: Duration = Duration::from_secs(1);
+    const S010: Duration = Duration::from_secs(10);
+    const S050: Duration = Duration::from_secs(50);
+    const S100: Duration = Duration::from_secs(100);
+    const S200: Duration = Duration::from_secs(200);
+
+    struct Batcher {
+        vid: VidBatcher,
+    }
+
+    impl Batcher {
+        fn new(bounds: Vec<i64>, size: i64) -> Self {
+            let batch_size = AdaptiveBatchSize { size, target: S100 };
+            let vid_range = VidRange::new(bounds[0], *bounds.last().unwrap());
+            Self {
+                vid: VidBatcher::new(bounds, vid_range, batch_size).unwrap(),
+            }
+        }
+
+        #[track_caller]
+        fn at(&self, start: i64, end: i64, size: i64) {
+            assert_eq!(self.vid.start, start, "at start");
+            assert_eq!(self.vid.end, end, "at end");
+            assert_eq!(self.vid.batch_size.size, size, "at size");
+        }
+
+        #[track_caller]
+        fn step(&mut self, start: i64, end: i64, duration: Duration) {
+            self.vid.step_timer.set(duration);
+
+            match self.vid.step(|s, e| Ok((s, e))).unwrap() {
+                (d, Some((s, e))) => {
+                    // Failing here indicates that our clever Timer is misbehaving
+                    assert_eq!(d, duration, "step duration");
+                    assert_eq!(s, start, "step start");
+                    assert_eq!(e, end, "step end");
+                }
+                (_, None) => {
+                    if start > end {
+                        // Expected, the batcher is exhausted
+                        return;
+                    } else {
+                        panic!("step didn't return start and end")
+                    }
+                }
+            }
+        }
+
+        #[track_caller]
+        fn run(&mut self, start: i64, end: i64, size: i64, duration: Duration) {
+            self.at(start, end, size);
+            self.step(start, end, duration);
+        }
+
+        fn finished(&self) -> bool {
+            self.vid.finished()
+        }
+    }
+
+    #[test]
+    fn simple() {
+        let bounds = vec![10, 20, 30, 40, 49];
+        let mut batcher = Batcher::new(bounds, 5);
+
+        batcher.at(10, 15, 5);
+
+        batcher.step(10, 15, S001);
+        batcher.at(16, 26, 10);
+
+        batcher.step(16, 26, S001);
+        batcher.at(27, 46, 20);
+        assert!(!batcher.finished());
+
+        batcher.step(27, 46, S001);
+
batcher.at(47, 49, 40); + assert!(!batcher.finished()); + + batcher.step(47, 49, S001); + assert!(batcher.finished()); + batcher.at(50, 49, 80); + } + + #[test] + fn non_uniform() { + // A distribution that is flat in the beginning and then steeper and + // linear towards the end. The easiest way to see this is to graph + // `(bounds[i], i*40)` + let bounds = vec![40, 180, 260, 300, 320, 330, 340, 350, 359]; + let mut batcher = Batcher::new(bounds, 10); + + // The schedule of how we move through the bounds above in batches, + // with varying timings for each batch + batcher.run(040, 075, 10, S010); + batcher.run(076, 145, 20, S010); + batcher.run(146, 240, 40, S200); + batcher.run(241, 270, 20, S200); + batcher.run(271, 281, 10, S200); + batcher.run(282, 287, 05, S050); + batcher.run(288, 298, 10, S050); + batcher.run(299, 309, 20, S050); + batcher.run(310, 325, 40, S100); + batcher.run(326, 336, 40, S100); + batcher.run(337, 347, 40, S100); + batcher.run(348, 357, 40, S100); + batcher.run(358, 359, 40, S010); + assert!(batcher.finished()); + + batcher.at(360, 359, 80); + batcher.step(360, 359, S010); } } From d63782c55d6ac604b204f396af27aff11b5661c8 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 7 Feb 2025 13:26:02 -0800 Subject: [PATCH 6/9] store: Introduce a VidRange struct --- store/postgres/src/vid_batcher.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index 658a1e3c78b..3744012f99b 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -271,9 +271,10 @@ impl VidRange { } pub fn size(&self) -> usize { - (self.max - self.min + 1) as usize + (self.max - self.min) as usize + 1 } + /// Return the full range of `vid` values in the table `src` pub fn for_copy( conn: &mut PgConnection, src: &Table, @@ -285,11 +286,13 @@ impl VidRange { "lower(block_range) <= $1" }; let vid_range = sql_query(format!( - "select 
coalesce(min(vid), 0) as min_vid, \ + "/* controller=copy,target={target_number} */ \ + select coalesce(min(vid), 0) as min_vid, \ coalesce(max(vid), -1) as max_vid \ - from {} where {}", - src.qualified_name.as_str(), - max_block_clause + from {src_name} where {max_block_clause}", + target_number = target_block.number, + src_name = src.qualified_name.as_str(), + max_block_clause = max_block_clause )) .bind::(&target_block.number) .load::(conn)? From c4844ce329c5e761551c4d041f371fed8b995c46 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 7 Feb 2025 15:35:24 -0800 Subject: [PATCH 7/9] store: Use VidRange for pruning --- store/postgres/src/relational/prune.rs | 65 ++++++-------------------- store/postgres/src/vid_batcher.rs | 31 ++++++++++-- 2 files changed, 41 insertions(+), 55 deletions(-) diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index e0dc5b7235f..742d994d2df 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -20,48 +20,11 @@ use itertools::Itertools; use crate::{ catalog, deployment, relational::{Table, VID_COLUMN}, - vid_batcher::AdaptiveBatchSize, + vid_batcher::{AdaptiveBatchSize, VidRange}, }; use super::{Catalog, Layout, Namespace}; -// Additions to `Table` that are useful for pruning -impl Table { - /// Return the first and last vid of any entity that is visible in the - /// block range from `first_block` (inclusive) to `last_block` - /// (exclusive) - fn vid_range( - &self, - conn: &mut PgConnection, - first_block: BlockNumber, - last_block: BlockNumber, - ) -> Result<(i64, i64), StoreError> { - #[derive(QueryableByName)] - struct VidRange { - #[diesel(sql_type = BigInt)] - min_vid: i64, - #[diesel(sql_type = BigInt)] - max_vid: i64, - } - - // Determine the last vid that we need to copy - let VidRange { min_vid, max_vid } = sql_query(format!( - "/* controller=prune,first={first_block},last={last_block} */ \ - select coalesce(min(vid), 0) as 
min_vid, \ - coalesce(max(vid), -1) as max_vid from {src} \ - where lower(block_range) <= $2 \ - and coalesce(upper(block_range), 2147483647) > $1 \ - and coalesce(upper(block_range), 2147483647) <= $2 \ - and block_range && int4range($1, $2)", - src = self.qualified_name, - )) - .bind::(first_block) - .bind::(last_block) - .get_result::(conn)?; - Ok((min_vid, max_vid)) - } -} - /// Utility to copy relevant data out of a source table and into a new /// destination table and replace the source table with the destination /// table @@ -122,12 +85,12 @@ impl TablePair { let column_list = self.column_list(); // Determine the last vid that we need to copy - let (min_vid, max_vid) = self.src.vid_range(conn, earliest_block, final_block)?; + let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?; let mut batch_size = AdaptiveBatchSize::new(&self.src); // The first vid we still need to copy - let mut next_vid = min_vid; - while next_vid <= max_vid { + let mut next_vid = range.min; + while next_vid <= range.max { let start = Instant::now(); let rows = conn.transaction(|conn| { // Page through all rows in `src` in batches of `batch_size` @@ -167,7 +130,7 @@ impl TablePair { self.src.name.as_str(), rows, PrunePhase::CopyFinal, - next_vid > max_vid, + next_vid > range.max, ); } Ok(()) @@ -185,14 +148,12 @@ impl TablePair { let column_list = self.column_list(); // Determine the last vid that we need to copy - let (min_vid, max_vid) = self - .src - .vid_range(conn, final_block + 1, BLOCK_NUMBER_MAX)?; + let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?; let mut batch_size = AdaptiveBatchSize::new(&self.src); // The first vid we still need to copy - let mut next_vid = min_vid; - while next_vid <= max_vid { + let mut next_vid = range.min; + while next_vid <= range.max { let start = Instant::now(); let rows = conn.transaction(|conn| { // Page through all the rows in `src` in batches of @@ -227,7 +188,7 @@ impl TablePair { 
self.src.name.as_str(), rows, PrunePhase::CopyNonfinal, - next_vid > max_vid, + next_vid > range.max, ); } Ok(()) @@ -459,10 +420,10 @@ impl Layout { PruningStrategy::Delete => { // Delete all entity versions whose range was closed // before `req.earliest_block` - let (min_vid, max_vid) = table.vid_range(conn, 0, req.earliest_block)?; + let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?; let mut batch_size = AdaptiveBatchSize::new(&table); - let mut next_vid = min_vid; - while next_vid <= max_vid { + let mut next_vid = range.min; + while next_vid <= range.max { let start = Instant::now(); let rows = sql_query(format!( "/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \ @@ -485,7 +446,7 @@ impl Layout { table.name.as_str(), rows as usize, PrunePhase::Delete, - next_vid > max_vid, + next_vid > range.max, ); } } diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index 3744012f99b..e6dbd065888 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -10,7 +10,7 @@ use diesel::{ }; use graph::{ env::ENV_VARS, - prelude::{BlockPtr, StoreError}, + prelude::{BlockNumber, BlockPtr, StoreError}, util::ogive::Ogive, }; @@ -251,9 +251,9 @@ impl VidBatcher { #[derive(Copy, Clone, QueryableByName)] pub(crate) struct VidRange { #[diesel(sql_type = BigInt, column_name = "min_vid")] - min: i64, + pub min: i64, #[diesel(sql_type = BigInt, column_name = "max_vid")] - max: i64, + pub max: i64, } const EMPTY_VID_RANGE: VidRange = VidRange { max: -1, min: 0 }; @@ -300,6 +300,31 @@ impl VidRange { .unwrap_or(EMPTY_VID_RANGE); Ok(vid_range) } + + /// Return the first and last vid of any entity that is visible in the + /// block range from `first_block` (inclusive) to `last_block` + /// (exclusive) + pub fn for_prune( + conn: &mut PgConnection, + src: &Table, + first_block: BlockNumber, + last_block: BlockNumber, + ) -> Result { + sql_query(format!( + "/* 
controller=prune,first={first_block},last={last_block} */ \ + select coalesce(min(vid), 0) as min_vid, \ + coalesce(max(vid), -1) as max_vid from {src} \ + where lower(block_range) <= $2 \ + and coalesce(upper(block_range), 2147483647) > $1 \ + and coalesce(upper(block_range), 2147483647) <= $2 \ + and block_range && int4range($1, $2)", + src = src.qualified_name, + )) + .bind::(first_block) + .bind::(last_block) + .get_result::(conn) + .map_err(StoreError::from) + } } #[cfg(test)] From 5f648ffd409295812ab9149225500c14d89af07e Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 7 Feb 2025 15:49:27 -0800 Subject: [PATCH 8/9] store: Use VidBatcher to batch pruning queries --- store/postgres/src/relational/prune.rs | 136 +++++++++++-------------- 1 file changed, 61 insertions(+), 75 deletions(-) diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 742d994d2df..0e82c1bfcc7 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -1,4 +1,4 @@ -use std::{fmt::Write, sync::Arc, time::Instant}; +use std::{fmt::Write, sync::Arc}; use diesel::{ connection::SimpleConnection, @@ -20,7 +20,7 @@ use itertools::Itertools; use crate::{ catalog, deployment, relational::{Table, VID_COLUMN}, - vid_batcher::{AdaptiveBatchSize, VidRange}, + vid_batcher::{VidBatcher, VidRange}, }; use super::{Catalog, Layout, Namespace}; @@ -86,51 +86,47 @@ impl TablePair { // Determine the last vid that we need to copy let range = VidRange::for_prune(conn, &self.src, earliest_block, final_block)?; - - let mut batch_size = AdaptiveBatchSize::new(&self.src); - // The first vid we still need to copy - let mut next_vid = range.min; - while next_vid <= range.max { - let start = Instant::now(); - let rows = conn.transaction(|conn| { - // Page through all rows in `src` in batches of `batch_size` - // and copy the ones that are visible to queries at block - // heights between `earliest_block` and `final_block`, but 
- // whose block_range does not extend past `final_block` - // since they could still be reverted while we copy. - // The conditions on `block_range` are expressed redundantly - // to make more indexes useable - sql_query(format!( - "/* controller=prune,phase=final,start_vid={next_vid},batch_size={batch_size} */ \ + let mut batcher = VidBatcher::load(conn, &self.src_nsp, &self.src, range)?; + + while !batcher.finished() { + let (_, rows) = batcher.step(|start, end| { + conn.transaction(|conn| { + // Page through all rows in `src` in batches of `batch_size` + // and copy the ones that are visible to queries at block + // heights between `earliest_block` and `final_block`, but + // whose block_range does not extend past `final_block` + // since they could still be reverted while we copy. + // The conditions on `block_range` are expressed redundantly + // to make more indexes useable + sql_query(format!( + "/* controller=prune,phase=final,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ where lower(block_range) <= $2 \ and coalesce(upper(block_range), 2147483647) > $1 \ and coalesce(upper(block_range), 2147483647) <= $2 \ and block_range && int4range($1, $2, '[]') \ - and vid >= $3 and vid < $3 + $4 \ + and vid >= $3 and vid <= $4 \ order by vid", src = self.src.qualified_name, dst = self.dst.qualified_name, - batch_size = batch_size.size, + batch_size = end - start + 1, )) - .bind::(earliest_block) - .bind::(final_block) - .bind::(next_vid) - .bind::(&batch_size) - .execute(conn) + .bind::(earliest_block) + .bind::(final_block) + .bind::(start) + .bind::(end) + .execute(conn) + .map_err(StoreError::from) + }) })?; cancel.check_cancel()?; - next_vid += batch_size.size; - - batch_size.adapt(start.elapsed()); - reporter.prune_batch( self.src.name.as_str(), - rows, + rows.unwrap_or(0), PrunePhase::CopyFinal, - next_vid > range.max, + batcher.finished(), ); } Ok(()) @@ -149,46 +145,41 @@ impl TablePair { 
// Determine the last vid that we need to copy let range = VidRange::for_prune(conn, &self.src, final_block + 1, BLOCK_NUMBER_MAX)?; + let mut batcher = VidBatcher::load(conn, &self.src.nsp, &self.src, range)?; - let mut batch_size = AdaptiveBatchSize::new(&self.src); - // The first vid we still need to copy - let mut next_vid = range.min; - while next_vid <= range.max { - let start = Instant::now(); - let rows = conn.transaction(|conn| { + while !batcher.finished() { + let (_, rows) = batcher.step(|start, end| { // Page through all the rows in `src` in batches of // `batch_size` that are visible to queries at block heights - // starting right after `final_block`. - // The conditions on `block_range` are expressed redundantly - // to make more indexes useable - sql_query(format!( - "/* controller=prune,phase=nonfinal,start_vid={next_vid},batch_size={batch_size} */ \ + // starting right after `final_block`. The conditions on + // `block_range` are expressed redundantly to make more + // indexes useable + conn.transaction(|conn| { + sql_query(format!( + "/* controller=prune,phase=nonfinal,start_vid={start},batch_size={batch_size} */ \ insert into {dst}({column_list}) \ select {column_list} from {src} \ where coalesce(upper(block_range), 2147483647) > $1 \ and block_range && int4range($1, null) \ - and vid >= $2 and vid < $2 + $3 \ + and vid >= $2 and vid <= $3 \ order by vid", - dst = self.dst.qualified_name, - src = self.src.qualified_name, - batch_size = batch_size.size - )) - .bind::(final_block) - .bind::(next_vid) - .bind::(&batch_size) - .execute(conn) - .map_err(StoreError::from) + dst = self.dst.qualified_name, + src = self.src.qualified_name, + batch_size = end - start + 1, + )) + .bind::(final_block) + .bind::(start) + .bind::(end) + .execute(conn) + .map_err(StoreError::from) + }) })?; - next_vid += batch_size.size; - - batch_size.adapt(start.elapsed()); - reporter.prune_batch( self.src.name.as_str(), - rows, + rows.unwrap_or(0), PrunePhase::CopyNonfinal, 
- next_vid > range.max, + batcher.finished(), ); } Ok(()) @@ -421,32 +412,27 @@ impl Layout { // Delete all entity versions whose range was closed // before `req.earliest_block` let range = VidRange::for_prune(conn, &table, 0, req.earliest_block)?; - let mut batch_size = AdaptiveBatchSize::new(&table); - let mut next_vid = range.min; - while next_vid <= range.max { - let start = Instant::now(); - let rows = sql_query(format!( - "/* controller=prune,phase=delete,start_vid={next_vid},batch_size={batch_size} */ \ + let mut batcher = VidBatcher::load(conn, &self.site.namespace, &table, range)?; + + while !batcher.finished() { + let (_, rows) = batcher.step(|start, end| {sql_query(format!( + "/* controller=prune,phase=delete,start_vid={start},batch_size={batch_size} */ \ delete from {qname} \ where coalesce(upper(block_range), 2147483647) <= $1 \ - and vid >= $2 and vid < $2 + $3", + and vid >= $2 and vid <= $3", qname = table.qualified_name, - batch_size = batch_size.size + batch_size = end - start + 1 )) .bind::(req.earliest_block) - .bind::(next_vid) - .bind::(&batch_size) - .execute(conn)?; - - next_vid += batch_size.size; - - batch_size.adapt(start.elapsed()); + .bind::(start) + .bind::(end) + .execute(conn).map_err(StoreError::from)})?; reporter.prune_batch( table.name.as_str(), - rows as usize, + rows.unwrap_or(0), PrunePhase::Delete, - next_vid > range.max, + batcher.finished(), ); } } From 157c2915386941e35d5acd2813d841ebec140718 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 7 Feb 2025 16:18:03 -0800 Subject: [PATCH 9/9] store: Remove unused ToSql/FromSql impls for AdaptiveBatchSize --- store/postgres/src/vid_batcher.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index e6dbd065888..81da5382e3d 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -1,9 +1,6 @@ use std::time::{Duration, Instant}; use diesel::{ - 
deserialize::FromSql, - pg::Pg, - serialize::{Output, ToSql}, sql_query, sql_types::{BigInt, Integer}, PgConnection, RunQueryDsl as _, @@ -62,22 +59,6 @@ impl AdaptiveBatchSize { } } -impl ToSql for AdaptiveBatchSize { - fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result { - >::to_sql(&self.size, out) - } -} - -impl FromSql for AdaptiveBatchSize { - fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result { - let size = >::from_sql(bytes)?; - Ok(AdaptiveBatchSize { - size, - target: ENV_VARS.store.batch_target_duration, - }) - } -} - /// A timer that works like `std::time::Instant` in non-test code, but /// returns a fake elapsed value in tests struct Timer {