Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ const INITIAL_BATCH_SIZE: i64 = 10_000;
/// therefore tread lightly in that case
const INITIAL_BATCH_SIZE_LIST: i64 = 100;

/// If a batch takes less than this time, we might be in a situation
/// where vids are very sparse; in that case, explicitly query for the
/// next vid we need to copy so that we skip over large gaps instead of
/// copying many mostly-empty batches
const MIN_BATCH_DURATION: Duration = Duration::from_secs(5);

const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);

/// If replicas are lagging by more than this, the copying code will pause
Expand Down Expand Up @@ -389,7 +394,30 @@ impl BatchCopy {
// remember how far we got
self.next_vid = last_vid + 1;

self.batch_size.adapt(duration);
if duration < MIN_BATCH_DURATION {
// The last batch was very short, and we might be in a situation
// where there are large gaps in the vids. If we adjust the
// batch size, we might increase the batch size to enormous
// values if that happens repeatedly. Rather than adjust the
// batch size, jump to the next actually existing vid
let src = self.src.dsl_table();
let next_vid = src
.select(src.vid_column())
.filter(src.vid_column().ge(self.next_vid))
.get_result::<i64>(conn)
.optional()?;
if let Some(next_vid) = next_vid {
self.next_vid = next_vid;
if next_vid - last_vid > INITIAL_BATCH_SIZE {
// If we skipped over a large gap, we might have to
// adjust the batch size so we don't have too large a
// batch size
self.batch_size.size = INITIAL_BATCH_SIZE;
}
}
} else {
self.batch_size.adapt(duration);
}

Ok(duration)
}
Expand Down
55 changes: 54 additions & 1 deletion store/postgres/src/relational/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::relational::ColumnType;
use crate::relational_queries::PARENT_ID;

use super::value::FromOidRow;
use super::Column as RelColumn;
use super::SqlName;
use super::{Column as RelColumn, VID_COLUMN};
use super::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};

const TYPENAME: &str = "__typename";
Expand Down Expand Up @@ -224,6 +224,10 @@ impl<'a> Table<'a> {
BlockColumn::new(*self)
}

/// An expression for this table's `vid` column
pub(crate) fn vid_column(&self) -> VidColumn<'a> {
    VidColumn::new(*self)
}

/// An expression that is true if the entity has changed since `block`
pub fn changed_since(&self, block: BlockNumber) -> ChangedSince<'a> {
let column = self.block_column();
Expand Down Expand Up @@ -529,6 +533,55 @@ impl QueryFragment<Pg> for BlockColumn<'_> {
}
}

/// A reference to a table's `vid` column. Generated by
/// `Table.vid_column`
#[derive(Debug, Clone, Copy)]
pub struct VidColumn<'a> {
    table: Table<'a>,
}

impl<'a> VidColumn<'a> {
    fn new(table: Table<'a>) -> Self {
        VidColumn { table }
    }

    /// The name of the `vid` column
    pub fn name(&self) -> &str {
        VID_COLUMN
    }
}

impl std::fmt::Display for VidColumn<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Rendered as `<alias>.<column name>`
        f.write_str(self.table.alias.as_str())?;
        f.write_str(".")?;
        f.write_str(self.name())
    }
}

impl QueryFragment<Pg> for VidColumn<'_> {
    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
        // The column reference is spliced into the SQL text rather than
        // bound as a parameter, so the prepared statement must not be
        // cached under a single key
        out.unsafe_to_cache_prepared();
        // Emit `<table>.<vid column>`
        self.table.walk_ast(out.reborrow())?;
        out.push_sql(".");
        out.push_sql(self.name());
        Ok(())
    }
}

// Marker impls that let `VidColumn` be used wherever diesel expects a
// `BigInt` expression (in selects, filters, and group-by validation).
// Lifetimes are elided (`'_`) for consistency with the `Display` and
// `QueryFragment` impls above.

impl QueryId for VidColumn<'_> {
    type QueryId = ();
    // The generated SQL depends on the table, so there is no static
    // query id
    const HAS_STATIC_QUERY_ID: bool = false;
}

impl<QS> SelectableExpression<QS> for VidColumn<'_> where Self: Expression {}

impl<QS> AppearsOnTable<QS> for VidColumn<'_> where Self: Expression {}

impl Expression for VidColumn<'_> {
    // `vid` values are read as `i64` (see the `get_result::<i64>` call
    // in `copy.rs`), matching SQL `bigint`
    type SqlType = BigInt;
}

impl ValidGrouping<()> for VidColumn<'_> {
    type IsAggregate = is_aggregate::No;
}

/// Generated by `Table.at_block`
#[derive(Debug, Clone, Copy)]
pub struct AtBlock<'a> {
Expand Down