From 12b2184920b1a1ca188ad144ccba488ff3792257 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sun, 2 Feb 2025 18:15:23 -0800 Subject: [PATCH] store: Try to avoid pathological batch size adjustments If a batch is processed very fast, that might indicate that there are very few rows actually matching the range. If this happens repeatedly, we can end up with a gigantic batch size where processing a batch takes way too long once we get into a region with denser vids. This patch tries to address that by taking a fast batch as an indication that vids are distributed sparsely and that we should jump directly to the next existing vid rather than adjusting the batch size. --- store/postgres/src/copy.rs | 30 ++++++++++++++- store/postgres/src/relational/dsl.rs | 55 +++++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index c526a93c7b8..2411ffe685b 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -54,6 +54,11 @@ const INITIAL_BATCH_SIZE: i64 = 10_000; /// therefore tread lightly in that case const INITIAL_BATCH_SIZE_LIST: i64 = 100; +/// If a batch takes less than this time, we might be in a situation where +/// vids are very sparse and should explicitly query for the next vid we +/// need to copy so we skip over large gaps +const MIN_BATCH_DURATION: Duration = Duration::from_secs(5); + const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60); /// If replicas are lagging by more than this, the copying code will pause @@ -389,7 +394,30 @@ impl BatchCopy { // remember how far we got self.next_vid = last_vid + 1; - self.batch_size.adapt(duration); + if duration < MIN_BATCH_DURATION { + // The last batch was very short, and we might be in a situation + // where there are large gaps in the vids. If we adjust the + // batch size, we might increase the batch size to enormous + // values if that happens repeateadly. Rather than adjust the + // batch size, jump to the next actually existing vid + let src = self.src.dsl_table(); + let next_vid = src + .select(src.vid_column()) + .filter(src.vid_column().ge(self.next_vid)) + .get_result::(conn) + .optional()?; + if let Some(next_vid) = next_vid { + self.next_vid = next_vid; + if next_vid - last_vid > INITIAL_BATCH_SIZE { + // If we skipped over a large gap, we might have to + // adjust the batch size so we don't have too large a + // bach size + self.batch_size.size = INITIAL_BATCH_SIZE; + } + } + } else { + self.batch_size.adapt(duration); + } Ok(duration) } diff --git a/store/postgres/src/relational/dsl.rs b/store/postgres/src/relational/dsl.rs index 6812bbb37e9..437a72bda6b 100644 --- a/store/postgres/src/relational/dsl.rs +++ b/store/postgres/src/relational/dsl.rs @@ -30,8 +30,8 @@ use crate::relational::ColumnType; use crate::relational_queries::PARENT_ID; use super::value::FromOidRow; -use super::Column as RelColumn; use super::SqlName; +use super::{Column as RelColumn, VID_COLUMN}; use super::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; const TYPENAME: &str = "__typename"; @@ -224,6 +224,10 @@ impl<'a> Table<'a> { BlockColumn::new(*self) } + pub(crate) fn vid_column(&self) -> VidColumn<'a> { + VidColumn::new(*self) + } + /// An expression that is true if the entity has changed since `block` pub fn changed_since(&self, block: BlockNumber) -> ChangedSince<'a> { let column = self.block_column(); @@ -529,6 +533,55 @@ impl QueryFragment for BlockColumn<'_> { } } +/// Generated by `Table.vid_column` +#[derive(Debug, Clone, Copy)] +pub struct VidColumn<'a> { + table: Table<'a>, +} + +impl<'a> VidColumn<'a> { + fn new(table: Table<'a>) -> Self { + VidColumn { table } + } + + pub fn name(&self) -> &str { + VID_COLUMN + } +} + +impl std::fmt::Display for VidColumn<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}.{}", self.table.alias.as_str(), self.name()) + } +} + +impl QueryFragment for VidColumn<'_> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + self.table.walk_ast(out.reborrow())?; + out.push_sql("."); + out.push_sql(self.name()); + Ok(()) + } +} + +impl<'a> QueryId for VidColumn<'a> { + type QueryId = (); + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a, QS> SelectableExpression for VidColumn<'a> where Self: Expression {} + +impl<'a, QS> AppearsOnTable for VidColumn<'a> where Self: Expression {} + +impl<'a> Expression for VidColumn<'a> { + type SqlType = BigInt; +} + +impl<'a> ValidGrouping<()> for VidColumn<'a> { + type IsAggregate = is_aggregate::No; +} + /// Generated by `Table.at_block` #[derive(Debug, Clone, Copy)] pub struct AtBlock<'a> {