diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 9b64390581b..f2f7e9f1d66 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -33,7 +33,7 @@ use graph::{ use itertools::Itertools; use crate::{ - advisory_lock, catalog, + advisory_lock, catalog, deployment, dynds::DataSourcesTable, primary::{DeploymentId, Site}, relational::index::IndexList, @@ -208,17 +208,17 @@ impl CopyState { }) }) .collect::>()?; - tables.sort_by_key(|table| table.batch.dst.object.to_string()); + tables.sort_by_key(|table| table.dst.object.to_string()); let values = tables .iter() .map(|table| { ( - cts::entity_type.eq(table.batch.dst.object.as_str()), + cts::entity_type.eq(table.dst.object.as_str()), cts::dst.eq(dst.site.id), - cts::next_vid.eq(table.batch.next_vid()), - cts::target_vid.eq(table.batch.target_vid()), - cts::batch_size.eq(table.batch.batch_size()), + cts::next_vid.eq(table.batcher.next_vid()), + cts::target_vid.eq(table.batcher.target_vid()), + cts::batch_size.eq(table.batcher.batch_size() as i64), ) }) .collect::>(); @@ -294,51 +294,11 @@ pub(crate) fn source( /// so that we can copy rows from one to the other with very little /// transformation. See `CopyEntityBatchQuery` for the details of what /// exactly that means -pub(crate) struct BatchCopy { +struct TableState { src: Arc, dst: Arc
, - batcher: VidBatcher, -} - -impl BatchCopy { - pub fn new(batcher: VidBatcher, src: Arc
<Table>, dst: Arc<Table>
) -> Self { - Self { src, dst, batcher } - } - - /// Copy one batch of entities and update internal state so that the - /// next call to `run` will copy the next batch - pub fn run(&mut self, conn: &mut PgConnection) -> Result { - let (duration, _) = self.batcher.step(|start, end| { - rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? - .execute(conn)?; - Ok(()) - })?; - - Ok(duration) - } - - pub fn finished(&self) -> bool { - self.batcher.finished() - } - - /// The first `vid` that has not been copied yet - pub fn next_vid(&self) -> i64 { - self.batcher.next_vid() - } - - /// The last `vid` that should be copied - pub fn target_vid(&self) -> i64 { - self.batcher.target_vid() - } - - pub fn batch_size(&self) -> i64 { - self.batcher.batch_size() as i64 - } -} - -struct TableState { - batch: BatchCopy, dst_site: Arc, + batcher: VidBatcher, duration_ms: i64, } @@ -354,14 +314,16 @@ impl TableState { let vid_range = VidRange::for_copy(conn, &src, target_block)?; let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?; Ok(Self { - batch: BatchCopy::new(batcher, src, dst), + src, + dst, dst_site, + batcher, duration_ms: 0, }) } fn finished(&self) -> bool { - self.batch.finished() + self.batcher.finished() } fn load( @@ -427,11 +389,12 @@ impl TableState { VidRange::new(current_vid, target_vid), )? 
.with_batch_size(size as usize); - let batch = BatchCopy::new(batcher, src, dst); Ok(TableState { - batch, + src, + dst, dst_site: dst_layout.site.clone(), + batcher, duration_ms, }) } @@ -460,20 +423,20 @@ impl TableState { update( cts::table .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())) + .filter(cts::entity_type.eq(self.dst.object.as_str())) .filter(cts::duration_ms.eq(0)), ) .set(cts::started_at.eq(sql("now()"))) .execute(conn)?; let values = ( - cts::next_vid.eq(self.batch.next_vid()), - cts::batch_size.eq(self.batch.batch_size()), + cts::next_vid.eq(self.batcher.next_vid()), + cts::batch_size.eq(self.batcher.batch_size() as i64), cts::duration_ms.eq(self.duration_ms), ); update( cts::table .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())), + .filter(cts::entity_type.eq(self.dst.object.as_str())), ) .set(values) .execute(conn)?; @@ -486,7 +449,7 @@ impl TableState { update( cts::table .filter(cts::dst.eq(self.dst_site.id)) - .filter(cts::entity_type.eq(self.batch.dst.object.as_str())), + .filter(cts::entity_type.eq(self.dst.object.as_str())), ) .set(cts::finished_at.eq(sql("now()"))) .execute(conn)?; @@ -512,7 +475,17 @@ impl TableState { } fn copy_batch(&mut self, conn: &mut PgConnection) -> Result { - let duration = self.batch.run(conn)?; + let (duration, count) = self.batcher.step(|start, end| { + let count = rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? 
+ .count_current() + .get_result::(conn) + .optional()?; + Ok(count.unwrap_or(0) as i32) + })?; + + let count = count.unwrap_or(0); + + deployment::update_entity_count(conn, &self.dst_site, count)?; self.record_progress(conn, duration)?; @@ -539,12 +512,12 @@ impl<'a> CopyProgress<'a> { let target_vid: i64 = state .tables .iter() - .map(|table| table.batch.target_vid()) + .map(|table| table.batcher.target_vid()) .sum(); let current_vid = state .tables .iter() - .map(|table| table.batch.next_vid()) + .map(|table| table.batcher.next_vid()) .sum(); Self { logger, @@ -577,23 +550,23 @@ impl<'a> CopyProgress<'a> { } } - fn update(&mut self, batch: &BatchCopy) { + fn update(&mut self, entity_type: &EntityType, batcher: &VidBatcher) { if self.last_log.elapsed() > LOG_INTERVAL { info!( self.logger, "Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data", - Self::progress_pct(batch.next_vid(), batch.target_vid()), - batch.dst.object, - batch.next_vid(), - batch.target_vid(), - Self::progress_pct(self.current_vid + batch.next_vid(), self.target_vid) + Self::progress_pct(batcher.next_vid(), batcher.target_vid()), + entity_type, + batcher.next_vid(), + batcher.target_vid(), + Self::progress_pct(self.current_vid + batcher.next_vid(), self.target_vid) ); self.last_log = Instant::now(); } } - fn table_finished(&mut self, batch: &BatchCopy) { - self.current_vid += batch.next_vid(); + fn table_finished(&mut self, batcher: &VidBatcher) { + self.current_vid += batcher.next_vid(); } fn finished(&self) { @@ -728,9 +701,9 @@ impl Connection { if status == Status::Cancelled { return Ok(status); } - progress.update(&table.batch); + progress.update(&table.dst.object, &table.batcher); } - progress.table_finished(&table.batch); + progress.table_finished(&table.batcher); } // Create indexes for all the attributes that were postponed at the start of @@ -740,8 +713,8 @@ impl Connection { for table in state.tables.iter() { let arr = index_list.indexes_for_table( 
&self.dst.site.namespace, - &table.batch.src.name.to_string(), - &table.batch.dst, + &table.src.name.to_string(), + &table.dst, true, true, )?; @@ -756,18 +729,12 @@ impl Connection { // Here we need to skip those created in the first step for the old fields. for table in state.tables.iter() { let orig_colums = table - .batch .src .columns .iter() .map(|c| c.name.to_string()) .collect_vec(); - for sql in table - .batch - .dst - .create_postponed_indexes(orig_colums) - .into_iter() - { + for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() { let query = sql_query(sql); query.execute(conn)?; } diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 836048912b1..92181ac5a6c 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -1249,17 +1249,12 @@ pub fn update_entity_count( Ok(()) } -/// Set the deployment's entity count to whatever `full_count_query` produces -pub fn set_entity_count( - conn: &mut PgConnection, - site: &Site, - full_count_query: &str, -) -> Result<(), StoreError> { +/// Set the deployment's entity count back to `0` +pub fn clear_entity_count(conn: &mut PgConnection, site: &Site) -> Result<(), StoreError> { use subgraph_deployment as d; - let full_count_query = format!("({})", full_count_query); update(d::table.filter(d::id.eq(site.id))) - .set(d::entity_count.eq(sql(&full_count_query))) + .set(d::entity_count.eq(BigDecimal::from(0))) .execute(conn)?; Ok(()) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 92b1a27a7e5..05f2b44a33e 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1306,7 +1306,8 @@ impl DeploymentStore { let layout = self.layout(conn, site.clone())?; if truncate { - deployment::set_entity_count(conn, site.as_ref(), layout.count_query.as_str())?; + layout.truncate_tables(conn)?; + deployment::clear_entity_count(conn, site.as_ref())?; } else { let count 
= layout.revert_block(conn, block)?; deployment::update_entity_count(conn, site.as_ref(), count)?; @@ -1570,13 +1571,10 @@ impl DeploymentStore { .number .checked_add(1) .expect("block numbers fit into an i32"); - dst.revert_block(conn, block_to_revert)?; - info!(logger, "Rewound subgraph to block {}", block.number; - "time_ms" => start.elapsed().as_millis()); + let count = dst.revert_block(conn, block_to_revert)?; + deployment::update_entity_count(conn, &dst.site, count)?; - let start = Instant::now(); - deployment::set_entity_count(conn, &dst.site, &dst.count_query)?; - info!(logger, "Counted the entities"; + info!(logger, "Rewound subgraph to block {}", block.number; "time_ms" => start.elapsed().as_millis()); deployment::set_history_blocks( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index b12cf790697..c1e7bceef60 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -73,7 +73,7 @@ use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR}; use graph::data::subgraph::schema::POI_TABLE; use graph::prelude::{ anyhow, info, BlockNumber, DeploymentHash, Entity, EntityOperation, Logger, - QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX, + QueryExecutionError, StoreError, ValueType, }; use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN}; @@ -231,8 +231,6 @@ pub struct Layout { pub tables: HashMap>, /// The database schema for this subgraph pub catalog: Catalog, - /// The query to count all entities - pub count_query: String, /// How many blocks of history the subgraph should keep pub history_blocks: BlockNumber, @@ -290,25 +288,6 @@ impl Layout { )) } - let count_query = tables - .iter() - .map(|table| { - if table.immutable { - format!( - "select count(*) from \"{}\".\"{}\"", - &catalog.site.namespace, table.name - ) - } else { - format!( - "select count(*) from \"{}\".\"{}\" where block_range @> {}", - &catalog.site.namespace, table.name, 
BLOCK_NUMBER_MAX - ) - } - }) - .collect::>() - .join("\nunion all\n"); - let count_query = format!("select sum(e.count) from ({}) e", count_query); - let tables: HashMap<_, _> = tables .into_iter() .fold(HashMap::new(), |mut tables, table| { @@ -322,7 +301,6 @@ impl Layout { site, catalog, tables, - count_query, history_blocks: i32::MAX, input_schema: schema.cheap_clone(), rollups, @@ -1026,17 +1004,20 @@ impl Layout { Ok(count) } - pub fn truncate_tables(&self, conn: &mut PgConnection) -> Result { + pub fn truncate_tables(&self, conn: &mut PgConnection) -> Result<(), StoreError> { for table in self.tables.values() { sql_query(&format!("TRUNCATE TABLE {}", table.qualified_name)).execute(conn)?; } - Ok(StoreEvent::new(vec![])) + Ok(()) } /// Revert the block with number `block` and all blocks with higher /// numbers. After this operation, only entity versions inserted or /// updated at blocks with numbers strictly lower than `block` will /// remain + /// + /// The `i32` that is returned is the amount by which the entity count + /// for the subgraph needs to be adjusted pub fn revert_block( &self, conn: &mut PgConnection, diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index f4b55e89150..19f9400c470 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -4799,6 +4799,10 @@ impl<'a> CopyEntityBatchQuery<'a> { last_vid, }) } + + pub fn count_current(self) -> CountCurrentVersionsQuery<'a> { + CountCurrentVersionsQuery::new(self) + } } impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { @@ -4810,6 +4814,8 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { // Construct a query // insert into {dst}({columns}) // select {columns} from {src} + // where vid >= {first_vid} and vid <= {last_vid} + // returning {upper_inf(block_range)|true} out.push_sql("insert into "); out.push_sql(self.dst.qualified_name.as_str()); out.push_sql("("); @@ -4905,6 +4911,12 @@ impl<'a> 
QueryFragment for CopyEntityBatchQuery<'a> { out.push_bind_param::(&self.first_vid)?; out.push_sql(" and vid <= "); out.push_bind_param::(&self.last_vid)?; + out.push_sql("\n returning "); + if self.dst.immutable { + out.push_sql("true"); + } else { + out.push_sql(BLOCK_RANGE_CURRENT); + } Ok(()) } } @@ -4917,6 +4929,40 @@ impl<'a> QueryId for CopyEntityBatchQuery<'a> { impl<'a, Conn> RunQueryDsl for CopyEntityBatchQuery<'a> {} +#[derive(Debug, Clone)] +pub struct CountCurrentVersionsQuery<'a> { + copy: CopyEntityBatchQuery<'a>, +} + +impl<'a> CountCurrentVersionsQuery<'a> { + pub fn new(copy: CopyEntityBatchQuery<'a>) -> Self { + Self { copy } + } +} +impl<'a> QueryFragment for CountCurrentVersionsQuery<'a> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + // Generate a query + // with copy_cte as ( {copy} ) + // select count(*) from copy_cte where {block_range_current} + out.push_sql("with copy_cte(current) as ("); + self.copy.walk_ast(out.reborrow())?; + out.push_sql(")\nselect count(*) from copy_cte where current"); + Ok(()) + } +} + +impl<'a> QueryId for CountCurrentVersionsQuery<'a> { + type QueryId = (); + + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a> Query for CountCurrentVersionsQuery<'a> { + type SqlType = BigInt; +} + +impl<'a, Conn> RunQueryDsl for CountCurrentVersionsQuery<'a> {} + /// Helper struct for returning the id's touched by the RevertRemove and /// RevertExtend queries #[derive(QueryableByName, PartialEq, Eq, Hash)]