From 36e3ad5af490db571e3beaac7e3555a9430dd0f5 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 12 Feb 2025 10:16:06 -0800 Subject: [PATCH 1/5] store: Fold BatchCopy into TableState With recent changes, BatchCopy has become unnecessary --- store/postgres/src/copy.rs | 117 +++++++++++++------------------------ 1 file changed, 39 insertions(+), 78 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 9b64390581b..45d0240de7f 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -208,17 +208,17 @@ impl CopyState { }) }) .collect::>()?; - tables.sort_by_key(|table| table.batch.dst.object.to_string()); + tables.sort_by_key(|table| table.dst.object.to_string()); let values = tables .iter() .map(|table| { ( - cts::entity_type.eq(table.batch.dst.object.as_str()), + cts::entity_type.eq(table.dst.object.as_str()), cts::dst.eq(dst.site.id), - cts::next_vid.eq(table.batch.next_vid()), - cts::target_vid.eq(table.batch.target_vid()), - cts::batch_size.eq(table.batch.batch_size()), + cts::next_vid.eq(table.batcher.next_vid()), + cts::target_vid.eq(table.batcher.target_vid()), + cts::batch_size.eq(table.batcher.batch_size() as i64), ) }) .collect::>(); @@ -294,51 +294,11 @@ pub(crate) fn source( /// so that we can copy rows from one to the other with very little /// transformation. See `CopyEntityBatchQuery` for the details of what /// exactly that means -pub(crate) struct BatchCopy { +struct TableState { src: Arc, dst: Arc
, - batcher: VidBatcher, -} - -impl BatchCopy { - pub fn new(batcher: VidBatcher, src: Arc
, dst: Arc
) -> Self { - Self { src, dst, batcher } - } - - /// Copy one batch of entities and update internal state so that the - /// next call to `run` will copy the next batch - pub fn run(&mut self, conn: &mut PgConnection) -> Result { - let (duration, _) = self.batcher.step(|start, end| { - rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? - .execute(conn)?; - Ok(()) - })?; - - Ok(duration) - } - - pub fn finished(&self) -> bool { - self.batcher.finished() - } - - /// The first `vid` that has not been copied yet - pub fn next_vid(&self) -> i64 { - self.batcher.next_vid() - } - - /// The last `vid` that should be copied - pub fn target_vid(&self) -> i64 { - self.batcher.target_vid() - } - - pub fn batch_size(&self) -> i64 { - self.batcher.batch_size() as i64 - } -} - -struct TableState { - batch: BatchCopy, dst_site: Arc, + batcher: VidBatcher, duration_ms: i64, } @@ -354,14 +314,16 @@ impl TableState { let vid_range = VidRange::for_copy(conn, &src, target_block)?; let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?; Ok(Self { - batch: BatchCopy::new(batcher, src, dst), + src, + dst, dst_site, + batcher, duration_ms: 0, }) } fn finished(&self) -> bool { - self.batch.finished() + self.batcher.finished() } fn load( @@ -427,11 +389,12 @@ impl TableState { VidRange::new(current_vid, target_vid), )? .with_batch_size(size as usize); - let batch = BatchCopy::new(batcher, src, dst); Ok(TableState { - batch, + src, + dst, dst_site: dst_layout.site.clone(), + batcher, duration_ms, }) } @@ -460,20 +423,20 @@ impl TableState { update( cts::table .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())) + .filter(cts::entity_type.eq(self.dst.object.as_str())) .filter(cts::duration_ms.eq(0)), ) .set(cts::started_at.eq(sql("now()"))) .execute(conn)?; let values = ( - cts::next_vid.eq(self.batch.next_vid()), - cts::batch_size.eq(self.batch.batch_size()), + cts::next_vid.eq(self.batcher.next_vid()), + cts::batch_size.eq(self.batcher.batch_size() as i64), cts::duration_ms.eq(self.duration_ms), ); update( cts::table .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())), + .filter(cts::entity_type.eq(self.dst.object.as_str())), ) .set(values) .execute(conn)?; @@ -486,7 +449,7 @@ impl TableState { update( cts::table .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())), + .filter(cts::entity_type.eq(self.dst.object.as_str())), ) .set(cts::finished_at.eq(sql("now()"))) .execute(conn)?; @@ -512,7 +475,11 @@ impl TableState { } fn copy_batch(&mut self, conn: &mut PgConnection) -> Result { - let duration = self.batch.run(conn)?; + let (duration, _) = self.batcher.step(|start, end| { + rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? + .execute(conn)?; + Ok(()) + })?; self.record_progress(conn, duration)?; @@ -539,12 +506,12 @@ impl<'a> CopyProgress<'a> { let target_vid: i64 = state .tables .iter() - .map(|table| table.batch.target_vid()) + .map(|table| table.batcher.target_vid()) .sum(); let current_vid = state .tables .iter() - .map(|table| table.batch.next_vid()) + .map(|table| table.batcher.next_vid()) .sum(); Self { logger, @@ -577,23 +544,23 @@ impl<'a> CopyProgress<'a> { } } - fn update(&mut self, batch: &BatchCopy) { + fn update(&mut self, entity_type: &EntityType, batcher: &VidBatcher) { if self.last_log.elapsed() > LOG_INTERVAL { info!( self.logger, "Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data", - Self::progress_pct(batch.next_vid(), batch.target_vid()), - batch.dst.object, - batch.next_vid(), - batch.target_vid(), - Self::progress_pct(self.current_vid + batch.next_vid(), self.target_vid) + Self::progress_pct(batcher.next_vid(), batcher.target_vid()), + entity_type, + batcher.next_vid(), + batcher.target_vid(), + Self::progress_pct(self.current_vid + batcher.next_vid(), self.target_vid) ); self.last_log = Instant::now(); } } - fn table_finished(&mut self, batch: &BatchCopy) { - self.current_vid += batch.next_vid(); + fn table_finished(&mut self, batcher: &VidBatcher) { + self.current_vid += batcher.next_vid(); } fn finished(&self) { @@ -728,9 +695,9 @@ impl Connection { if status == Status::Cancelled { return Ok(status); } - progress.update(&table.batch); + progress.update(&table.dst.object, &table.batcher); } - progress.table_finished(&table.batch); + progress.table_finished(&table.batcher); } // Create indexes for all the attributes that were postponed at the start of @@ -740,8 +707,8 @@ impl Connection { for table in state.tables.iter() { let arr = index_list.indexes_for_table( &self.dst.site.namespace, - &table.batch.src.name.to_string(), - &table.batch.dst, + &table.src.name.to_string(), + &table.dst, true, true, )?; @@ -756,18 +723,12 @@ impl Connection { // Here we need to skip those created in the first step for the old fields. for table in state.tables.iter() { let orig_colums = table - .batch .src .columns .iter() .map(|c| c.name.to_string()) .collect_vec(); - for sql in table - .batch - .dst - .create_postponed_indexes(orig_colums) - .into_iter() - { + for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() { let query = sql_query(sql); query.execute(conn)?; } From b089776a55528fd746cb9c7b965092253adf3f26 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 12 Feb 2025 11:25:49 -0800 Subject: [PATCH 2/5] store: Count entities during copying We used to count entities after all the data had been copied, but that is a very slow operation for large subgraphs; that in turn can lead to bad side-effects like connection timeouts and the copy ultimately failing. We now keep track of the number of current entity versions that we copy with each batch and calculate the entity count incrementally --- store/postgres/src/copy.rs | 16 ++++++--- store/postgres/src/deployment_store.rs | 9 ++--- store/postgres/src/relational.rs | 3 ++ store/postgres/src/relational_queries.rs | 46 ++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 11 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 45d0240de7f..f2f7e9f1d66 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -33,7 +33,7 @@ use graph::{ use itertools::Itertools; use crate::{ - advisory_lock, catalog, + advisory_lock, catalog, deployment, dynds::DataSourcesTable, primary::{DeploymentId, Site}, relational::index::IndexList, @@ -475,12 +475,18 @@ impl TableState { } fn copy_batch(&mut self, conn: &mut PgConnection) -> Result { - let (duration, _) = self.batcher.step(|start, end| { - rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? - .execute(conn)?; - Ok(()) + let (duration, count) = self.batcher.step(|start, end| { + let count = rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? + .count_current() + .get_result::(conn) + .optional()?; + Ok(count.unwrap_or(0) as i32) })?; + let count = count.unwrap_or(0); + + deployment::update_entity_count(conn, &self.dst_site, count)?; + self.record_progress(conn, duration)?; if self.finished() { diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 92b1a27a7e5..e4878b82c9c 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1570,13 +1570,10 @@ impl DeploymentStore { .number .checked_add(1) .expect("block numbers fit into an i32"); - dst.revert_block(conn, block_to_revert)?; - info!(logger, "Rewound subgraph to block {}", block.number; - "time_ms" => start.elapsed().as_millis()); + let (_, count) = dst.revert_block(conn, block_to_revert)?; + deployment::update_entity_count(conn, &dst.site, count)?; - let start = Instant::now(); - deployment::set_entity_count(conn, &dst.site, &dst.count_query)?; - info!(logger, "Counted the entities"; + info!(logger, "Rewound subgraph to block {}", block.number; "time_ms" => start.elapsed().as_millis()); deployment::set_history_blocks( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index b12cf790697..74cb31f5b4d 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -1037,6 +1037,9 @@ impl Layout { /// numbers. After this operation, only entity versions inserted or /// updated at blocks with numbers strictly lower than `block` will /// remain + /// + /// The `i32` that is returned is the amount by which the entity count + /// for the subgraph needs to be adjusted pub fn revert_block( &self, conn: &mut PgConnection, diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index f4b55e89150..19f9400c470 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -4799,6 +4799,10 @@ impl<'a> CopyEntityBatchQuery<'a> { last_vid, }) } + + pub fn count_current(self) -> CountCurrentVersionsQuery<'a> { + CountCurrentVersionsQuery::new(self) + } } impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { @@ -4810,6 +4814,8 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { // Construct a query // insert into {dst}({columns}) // select {columns} from {src} + // where vid >= {first_vid} and vid <= {last_vid} + // returning {upper_inf(block_range)|true} out.push_sql("insert into "); out.push_sql(self.dst.qualified_name.as_str()); out.push_sql("("); @@ -4905,6 +4911,12 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_bind_param::(&self.first_vid)?; out.push_sql(" and vid <= "); out.push_bind_param::(&self.last_vid)?; + out.push_sql("\n returning "); + if self.dst.immutable { + out.push_sql("true"); + } else { + out.push_sql(BLOCK_RANGE_CURRENT); + } Ok(()) } } @@ -4917,6 +4929,40 @@ impl<'a> QueryId for CopyEntityBatchQuery<'a> { impl<'a, Conn> RunQueryDsl for CopyEntityBatchQuery<'a> {} +#[derive(Debug, Clone)] +pub struct CountCurrentVersionsQuery<'a> { + copy: CopyEntityBatchQuery<'a>, +} + +impl<'a> CountCurrentVersionsQuery<'a> { + pub fn new(copy: CopyEntityBatchQuery<'a>) -> Self { + Self { copy } + } +} +impl<'a> QueryFragment for CountCurrentVersionsQuery<'a> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + // Generate a query + // with copy_cte as ( {copy} ) + // select count(*) from copy_cte where {block_range_current} + out.push_sql("with copy_cte(current) as ("); + self.copy.walk_ast(out.reborrow())?; + out.push_sql(")\nselect count(*) from copy_cte where current"); + Ok(()) + } +} + +impl<'a> QueryId for CountCurrentVersionsQuery<'a> { + type QueryId = (); + + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a> Query for CountCurrentVersionsQuery<'a> { + type SqlType = BigInt; +} + +impl<'a, Conn> RunQueryDsl for CountCurrentVersionsQuery<'a> {} + /// Helper struct for returning the id's touched by the RevertRemove and /// RevertExtend queries #[derive(QueryableByName, PartialEq, Eq, Hash)] From d36df68ff79a1ddad3317a43da3f36b6b19ceb60 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 12 Feb 2025 11:31:22 -0800 Subject: [PATCH 3/5] store: Clear the entity count when truncating a deployment We used to count entities, but we know the result is 0 because all the tables we are counting are empty --- store/postgres/src/deployment.rs | 11 +++-------- store/postgres/src/deployment_store.rs | 5 +++-- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 836048912b1..92181ac5a6c 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -1249,17 +1249,12 @@ pub fn update_entity_count( Ok(()) } -/// Set the deployment's entity count to whatever `full_count_query` produces -pub fn set_entity_count( - conn: &mut PgConnection, - site: &Site, - full_count_query: &str, -) -> Result<(), StoreError> { +/// Set the deployment's entity count back to `0` +pub fn clear_entity_count(conn: &mut PgConnection, site: &Site) -> Result<(), StoreError> { use subgraph_deployment as d; - let full_count_query = format!("({})", full_count_query); update(d::table.filter(d::id.eq(site.id))) - .set(d::entity_count.eq(sql(&full_count_query))) + .set(d::entity_count.eq(BigDecimal::from(0))) .execute(conn)?; Ok(()) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index e4878b82c9c..05f2b44a33e 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1306,7 +1306,8 @@ impl DeploymentStore { let layout = self.layout(conn, site.clone())?; if truncate { - deployment::set_entity_count(conn, site.as_ref(), layout.count_query.as_str())?; + layout.truncate_tables(conn)?; + deployment::clear_entity_count(conn, site.as_ref())?; } else { let count = layout.revert_block(conn, block)?; deployment::update_entity_count(conn, site.as_ref(), count)?; @@ -1570,7 +1571,7 @@ impl DeploymentStore { .number .checked_add(1) .expect("block numbers fit into an i32"); - let (_, count) = dst.revert_block(conn, block_to_revert)?; + let count = dst.revert_block(conn, block_to_revert)?; deployment::update_entity_count(conn, &dst.site, count)?; info!(logger, "Rewound subgraph to block {}", block.number; From 9886aa181bc0a63fb457a4a44c7e0a6c885a5a1d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 12 Feb 2025 11:33:18 -0800 Subject: [PATCH 4/5] store: Remove unused Layout.count_query --- store/postgres/src/relational.rs | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 74cb31f5b4d..8a8f47b6560 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -73,7 +73,7 @@ use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR}; use graph::data::subgraph::schema::POI_TABLE; use graph::prelude::{ anyhow, info, BlockNumber, DeploymentHash, Entity, EntityOperation, Logger, - QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX, + QueryExecutionError, StoreError, StoreEvent, ValueType, }; use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; @@ -231,8 +231,6 @@ pub struct Layout { pub tables: HashMap>, /// The database schema for this subgraph pub catalog: Catalog, - /// The query to count all entities - pub count_query: String, /// How many blocks of history the subgraph should keep pub history_blocks: BlockNumber, @@ -290,25 +288,6 @@ impl Layout { )) } - let count_query = tables - .iter() - .map(|table| { - if table.immutable { - format!( - "select count(*) from \"{}\".\"{}\"", - &catalog.site.namespace, table.name - ) - } else { - format!( - "select count(*) from \"{}\".\"{}\" where block_range @> {}", - &catalog.site.namespace, table.name, BLOCK_NUMBER_MAX - ) - } - }) - .collect::>() - .join("\nunion all\n"); - let count_query = format!("select sum(e.count) from ({}) e", count_query); - let tables: HashMap<_, _> = tables .into_iter() .fold(HashMap::new(), |mut tables, table| { @@ -322,7 +301,6 @@ impl Layout { site, catalog, tables, - count_query, history_blocks: i32::MAX, input_schema: schema.cheap_clone(), rollups, From 5a1bb5e2c6d04a816e890243781e345a6971e50a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 4 Mar 2025 18:38:47 +0100 Subject: [PATCH 5/5] store: Do not return a StoreEvent from Layout.truncate_tables --- store/postgres/src/relational.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 8a8f47b6560..c1e7bceef60 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -73,7 +73,7 @@ use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR}; use graph::data::subgraph::schema::POI_TABLE; use graph::prelude::{ anyhow, info, BlockNumber, DeploymentHash, Entity, EntityOperation, Logger, - QueryExecutionError, StoreError, StoreEvent, ValueType, + QueryExecutionError, StoreError, ValueType, }; use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; @@ -1004,11 +1004,11 @@ impl Layout { Ok(count) } - pub fn truncate_tables(&self, conn: &mut PgConnection) -> Result { + pub fn truncate_tables(&self, conn: &mut PgConnection) -> Result<(), StoreError> { for table in self.tables.values() { sql_query(&format!("TRUNCATE TABLE {}", table.qualified_name)).execute(conn)?; } - Ok(StoreEvent::new(vec![])) + Ok(()) } /// Revert the block with number `block` and all blocks with higher