Skip to content

Commit 12b2184

Browse files
committed
store: Try to avoid pathological batch size adjustments
If a batch is processed very fast, that might indicate that there are very few rows actually matching the range. If this happens repeatedly, we can end up with a gigantic batch size where processing a batch takes way too long once we get into a region with denser vids. This patch tries to address that by taking a fast batch as an indication that vids are distributed sparsely and that we should jump directly to the next existing vid rather than adjusting the batch size.
1 parent 070072b commit 12b2184

File tree

2 files changed

+83
-2
lines changed

2 files changed

+83
-2
lines changed

store/postgres/src/copy.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ const INITIAL_BATCH_SIZE: i64 = 10_000;
5454
/// therefore tread lightly in that case
5555
const INITIAL_BATCH_SIZE_LIST: i64 = 100;
5656

57+
/// If a batch takes less than this time, we might be in a situation where
58+
/// vids are very sparse and should explicitly query for the next vid we
59+
/// need to copy so we skip over large gaps
60+
const MIN_BATCH_DURATION: Duration = Duration::from_secs(5);
61+
5762
const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);
5863

5964
/// If replicas are lagging by more than this, the copying code will pause
@@ -389,7 +394,30 @@ impl BatchCopy {
389394
// remember how far we got
390395
self.next_vid = last_vid + 1;
391396

392-
self.batch_size.adapt(duration);
397+
if duration < MIN_BATCH_DURATION {
398+
// The last batch was very short, and we might be in a situation
399+
// where there are large gaps in the vids. If we adjust the
400+
// batch size, we might increase the batch size to enormous
401+
// values if that happens repeateadly. Rather than adjust the
402+
// batch size, jump to the next actually existing vid
403+
let src = self.src.dsl_table();
404+
let next_vid = src
405+
.select(src.vid_column())
406+
.filter(src.vid_column().ge(self.next_vid))
407+
.get_result::<i64>(conn)
408+
.optional()?;
409+
if let Some(next_vid) = next_vid {
410+
self.next_vid = next_vid;
411+
if next_vid - last_vid > INITIAL_BATCH_SIZE {
412+
// If we skipped over a large gap, we might have to
413+
// adjust the batch size so we don't have too large a
414+
// bach size
415+
self.batch_size.size = INITIAL_BATCH_SIZE;
416+
}
417+
}
418+
} else {
419+
self.batch_size.adapt(duration);
420+
}
393421

394422
Ok(duration)
395423
}

store/postgres/src/relational/dsl.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use crate::relational::ColumnType;
3030
use crate::relational_queries::PARENT_ID;
3131

3232
use super::value::FromOidRow;
33-
use super::Column as RelColumn;
3433
use super::SqlName;
34+
use super::{Column as RelColumn, VID_COLUMN};
3535
use super::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
3636

3737
const TYPENAME: &str = "__typename";
@@ -224,6 +224,10 @@ impl<'a> Table<'a> {
224224
BlockColumn::new(*self)
225225
}
226226

227+
pub(crate) fn vid_column(&self) -> VidColumn<'a> {
228+
VidColumn::new(*self)
229+
}
230+
227231
/// An expression that is true if the entity has changed since `block`
228232
pub fn changed_since(&self, block: BlockNumber) -> ChangedSince<'a> {
229233
let column = self.block_column();
@@ -529,6 +533,55 @@ impl QueryFragment<Pg> for BlockColumn<'_> {
529533
}
530534
}
531535

536+
/// Generated by `Table.vid_column`
537+
#[derive(Debug, Clone, Copy)]
538+
pub struct VidColumn<'a> {
539+
table: Table<'a>,
540+
}
541+
542+
impl<'a> VidColumn<'a> {
543+
fn new(table: Table<'a>) -> Self {
544+
VidColumn { table }
545+
}
546+
547+
pub fn name(&self) -> &str {
548+
VID_COLUMN
549+
}
550+
}
551+
552+
impl std::fmt::Display for VidColumn<'_> {
553+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
554+
write!(f, "{}.{}", self.table.alias.as_str(), self.name())
555+
}
556+
}
557+
558+
impl QueryFragment<Pg> for VidColumn<'_> {
559+
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
560+
out.unsafe_to_cache_prepared();
561+
self.table.walk_ast(out.reborrow())?;
562+
out.push_sql(".");
563+
out.push_sql(self.name());
564+
Ok(())
565+
}
566+
}
567+
568+
impl<'a> QueryId for VidColumn<'a> {
569+
type QueryId = ();
570+
const HAS_STATIC_QUERY_ID: bool = false;
571+
}
572+
573+
impl<'a, QS> SelectableExpression<QS> for VidColumn<'a> where Self: Expression {}
574+
575+
impl<'a, QS> AppearsOnTable<QS> for VidColumn<'a> where Self: Expression {}
576+
577+
impl<'a> Expression for VidColumn<'a> {
578+
type SqlType = BigInt;
579+
}
580+
581+
impl<'a> ValidGrouping<()> for VidColumn<'a> {
582+
type IsAggregate = is_aggregate::No;
583+
}
584+
532585
/// Generated by `Table.at_block`
533586
#[derive(Debug, Clone, Copy)]
534587
pub struct AtBlock<'a> {

0 commit comments

Comments
 (0)