From f2c8a261d3b9733badd50c07a8b59aba2ddc6f25 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 24 Mar 2025 16:21:34 -0700 Subject: [PATCH 1/5] store: Refactor handling of manifest indexes for ds copying --- store/postgres/src/dynds/private.rs | 73 ++++++++++++++++++----------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index e8e7f4ce992..f48192b45d4 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -1,4 +1,4 @@ -use std::ops::Bound; +use std::{collections::HashMap, ops::Bound}; use diesel::{ pg::sql_types, @@ -252,32 +252,11 @@ impl DataSourcesTable { .order_by(&self.vid) .load::(conn)?; + let manifest_map = + ManifestIdxMap::new(src_manifest_idx_and_name, dst_manifest_idx_and_name); let mut count = 0; - for (block_range, src_manifest_idx, param, context, causality_region, done_at) in src_tuples - { - let name = &src_manifest_idx_and_name - .iter() - .find(|(idx, _)| idx == &src_manifest_idx) - .with_context(|| { - anyhow!( - "the source {} does not have a template with index {}", - self.namespace, - src_manifest_idx - ) - })? - .1; - let dst_manifest_idx = dst_manifest_idx_and_name - .iter() - .find(|(_, n)| n == name) - .with_context(|| { - anyhow!( - "the destination {} is missing a template with name {}. The source {} created one at block {:?}", - dst.namespace, - name, self.namespace, block_range.0 - ) - })? - .0; - + for (block_range, src_idx, param, context, causality_region, done_at) in src_tuples { + let dst_idx = manifest_map.dst_idx(src_idx)?; let query = format!( "\ insert into {dst}(block_range, manifest_idx, param, context, causality_region, done_at) @@ -293,7 +272,7 @@ impl DataSourcesTable { count += sql_query(query) .bind::(target_block) .bind::, _>(block_range) - .bind::(dst_manifest_idx) + .bind::(dst_idx) .bind::, _>(param) .bind::, _>(context) .bind::(causality_region) @@ -361,3 +340,43 @@ impl DataSourcesTable { .optional()?) } } + +/// Map src manifest indexes to dst manifest indexes. If the +/// destination is missing an entry, put `None` as the value for the +/// source index +struct ManifestIdxMap<'a> { + map: HashMap, &'a String)>, +} + +impl<'a> ManifestIdxMap<'a> { + fn new(src: &'a [(i32, String)], dst: &'a [(i32, String)]) -> Self { + let map = src + .iter() + .map(|(src_idx, src_name)| { + ( + *src_idx, + ( + dst.iter() + .find(|(_, dst_name)| src_name == dst_name) + .map(|(dst_idx, _)| *dst_idx), + src_name, + ), + ) + }) + .collect(); + ManifestIdxMap { map } + } + + fn dst_idx(&self, src_idx: i32) -> Result { + let (dst_idx, name) = self.map.get(&src_idx).with_context(|| { + anyhow!("the source does not have a template with index {}", src_idx) + })?; + let dst_idx = dst_idx.with_context(|| { + anyhow!( + "the destination does not have a template with name {}", + name + ) + })?; + Ok(dst_idx) + } +} From a20cc8ad67e763b7798c0cd680fc0bd75a19e524 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 24 Mar 2025 18:05:59 -0700 Subject: [PATCH 2/5] store: Copy private data sources in batches For large numbers of data sources, the existing RBAR behavior can be very slow --- store/postgres/src/dynds/private.rs | 147 ++++++++++++++++------- store/postgres/src/relational_queries.rs | 2 +- 2 files changed, 106 insertions(+), 43 deletions(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index f48192b45d4..50a433df006 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -1,8 +1,9 @@ use std::{collections::HashMap, ops::Bound}; use diesel::{ - pg::sql_types, + pg::{sql_types, Pg}, prelude::*, + query_builder::{AstPass, QueryFragment, QueryId}, sql_query, sql_types::{Binary, Bool, Integer, Jsonb, Nullable}, PgConnection, QueryDsl, RunQueryDsl, @@ -16,7 +17,7 @@ use graph::{ prelude::{serde_json, BlockNumber, StoreError}, }; -use crate::primary::Namespace; +use crate::{primary::Namespace, relational_queries::POSTGRES_MAX_PARAMETERS}; type DynTable = diesel_dynamic_schema::Table; type DynColumn = diesel_dynamic_schema::Column; @@ -226,16 +227,12 @@ impl DataSourcesTable { return Ok(count as usize); } - type Tuple = ( - (Bound, Bound), - i32, - Option>, - Option, - i32, - Option, - ); + let manifest_map = + ManifestIdxMap::new(src_manifest_idx_and_name, dst_manifest_idx_and_name); - let src_tuples = self + // Load all data sources that were created up to and including + // `target_block` and transform them ready for insertion + let dss: Vec<_> = self .table .clone() .filter( @@ -250,34 +247,18 @@ impl DataSourcesTable { &self.done_at, )) .order_by(&self.vid) - .load::(conn)?; + .load::(conn)? + .into_iter() + .map(|ds| ds.src_to_dst(target_block, &manifest_map)) + .collect::>()?; - let manifest_map = - ManifestIdxMap::new(src_manifest_idx_and_name, dst_manifest_idx_and_name); + // Split all dss into chunks so that we never use more than + // `POSTGRES_MAX_PARAMETERS` bind variables per chunk + let chunk_size = POSTGRES_MAX_PARAMETERS / CopyDsQuery::BIND_PARAMS; let mut count = 0; - for (block_range, src_idx, param, context, causality_region, done_at) in src_tuples { - let dst_idx = manifest_map.dst_idx(src_idx)?; - let query = format!( - "\ - insert into {dst}(block_range, manifest_idx, param, context, causality_region, done_at) - values(case - when upper($2) <= $1 then $2 - else int4range(lower($2), null) - end, - $3, $4, $5, $6, $7) - ", - dst = dst.qname - ); - - count += sql_query(query) - .bind::(target_block) - .bind::, _>(block_range) - .bind::(dst_idx) - .bind::, _>(param) - .bind::, _>(context) - .bind::(causality_region) - .bind::, _>(done_at) - .execute(conn)?; + for chunk in dss.chunks(chunk_size) { + let query = CopyDsQuery::new(dst, chunk)?; + count += query.execute(conn)?; } // If the manifest idxes remained constant, we can test that both tables have the same @@ -344,12 +325,12 @@ impl DataSourcesTable { /// Map src manifest indexes to dst manifest indexes. If the /// destination is missing an entry, put `None` as the value for the /// source index -struct ManifestIdxMap<'a> { - map: HashMap, &'a String)>, +struct ManifestIdxMap { + map: HashMap, String)>, } -impl<'a> ManifestIdxMap<'a> { - fn new(src: &'a [(i32, String)], dst: &'a [(i32, String)]) -> Self { +impl ManifestIdxMap { + fn new(src: &[(i32, String)], dst: &[(i32, String)]) -> Self { let map = src .iter() .map(|(src_idx, src_name)| { @@ -359,7 +340,7 @@ impl<'a> ManifestIdxMap<'a> { dst.iter() .find(|(_, dst_name)| src_name == dst_name) .map(|(dst_idx, _)| *dst_idx), - src_name, + src_name.to_string(), ), ) }) @@ -380,3 +361,85 @@ impl<'a> ManifestIdxMap<'a> { Ok(dst_idx) } } + +#[derive(Queryable)] +struct DsForCopy { + block_range: (Bound, Bound), + idx: i32, + param: Option>, + context: Option, + causality_region: i32, + done_at: Option, +} + +impl DsForCopy { + fn src_to_dst( + mut self, + target_block: BlockNumber, + map: &ManifestIdxMap, + ) -> Result { + // unclamp block range if it ends beyond target block + match self.block_range.1 { + Bound::Included(block) if block > target_block => self.block_range.1 = Bound::Unbounded, + _ => { /* use block range as is */ } + } + // Translate manifest index + self.idx = map.dst_idx(self.idx)?; + Ok(self) + } +} + +struct CopyDsQuery<'a> { + dst: &'a DataSourcesTable, + dss: &'a [DsForCopy], +} + +impl<'a> CopyDsQuery<'a> { + const BIND_PARAMS: usize = 6; + + fn new(dst: &'a DataSourcesTable, dss: &'a [DsForCopy]) -> Result { + Ok(CopyDsQuery { dst, dss }) + } +} + +impl<'a> QueryFragment for CopyDsQuery<'a> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + out.push_sql("insert into "); + out.push_sql(&self.dst.qname); + out.push_sql( + "(block_range, manifest_idx, param, context, causality_region, done_at) values ", + ); + let mut first = true; + for ds in self.dss.iter() { + if first { + first = false; + } else { + out.push_sql(", "); + } + out.push_sql("("); + out.push_bind_param::, _>(&ds.block_range)?; + out.push_sql(", "); + out.push_bind_param::(&ds.idx)?; + out.push_sql(", "); + out.push_bind_param::, _>(&ds.param)?; + out.push_sql(", "); + out.push_bind_param::, _>(&ds.context)?; + out.push_sql(", "); + out.push_bind_param::(&ds.causality_region)?; + out.push_sql(", "); + out.push_bind_param::, _>(&ds.done_at)?; + out.push_sql(")"); + } + + Ok(()) + } +} + +impl<'a> QueryId for CopyDsQuery<'a> { + type QueryId = (); + + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a, Conn> RunQueryDsl for CopyDsQuery<'a> {} diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index c6567c5d4f7..028f6044c34 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -53,7 +53,7 @@ use crate::{ const BASE_SQL_COLUMNS: [&str; 2] = ["id", "vid"]; /// The maximum number of bind variables that can be used in a query -const POSTGRES_MAX_PARAMETERS: usize = u16::MAX as usize; // 65535 +pub(crate) const POSTGRES_MAX_PARAMETERS: usize = u16::MAX as usize; // 65535 const SORT_KEY_COLUMN: &str = "sort_key$"; From 6e6ea3b0e370cdaa865e48d90ac819cecab3c6a4 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 24 Mar 2025 20:48:19 -0700 Subject: [PATCH 3/5] store: Provide more detail in errors from private data source copy --- store/postgres/src/dynds/private.rs | 30 ++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index 50a433df006..ebfd109b206 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::Bound}; +use std::{collections::HashMap, i32, ops::Bound}; use diesel::{ pg::{sql_types, Pg}, @@ -249,7 +249,7 @@ impl DataSourcesTable { .order_by(&self.vid) .load::(conn)? .into_iter() - .map(|ds| ds.src_to_dst(target_block, &manifest_map)) + .map(|ds| ds.src_to_dst(target_block, &manifest_map, &self.namespace, &dst.namespace)) .collect::>()?; // Split all dss into chunks so that we never use more than @@ -348,14 +348,23 @@ impl ManifestIdxMap { ManifestIdxMap { map } } - fn dst_idx(&self, src_idx: i32) -> Result { + fn dst_idx( + &self, + src_idx: i32, + src_nsp: &Namespace, + src_created: BlockNumber, + dst_nsp: &Namespace, + ) -> Result { let (dst_idx, name) = self.map.get(&src_idx).with_context(|| { - anyhow!("the source does not have a template with index {}", src_idx) + anyhow!( + "the source {src_nsp} does not have a template with \ + index {src_idx} but created one at block {src_created}" + ) })?; let dst_idx = dst_idx.with_context(|| { anyhow!( - "the destination does not have a template with name {}", - name + "the destination {dst_nsp} is missing a template with \ + name {name}. The source {src_nsp} created one at block {src_created}" ) })?; Ok(dst_idx) @@ -377,6 +386,8 @@ impl DsForCopy { mut self, target_block: BlockNumber, map: &ManifestIdxMap, + src_nsp: &Namespace, + dst_nsp: &Namespace, ) -> Result { // unclamp block range if it ends beyond target block match self.block_range.1 { @@ -384,7 +395,12 @@ impl DsForCopy { _ => { /* use block range as is */ } } // Translate manifest index - self.idx = map.dst_idx(self.idx)?; + let src_created = match self.block_range.0 { + Bound::Included(block) => block, + Bound::Excluded(block) => block + 1, + Bound::Unbounded => i32::MAX, + }; + self.idx = map.dst_idx(self.idx, src_nsp, src_created, dst_nsp)?; Ok(self) } } From aa43630155ccf46c1c994183132209145fc2ac15 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 27 Mar 2025 16:57:13 -0700 Subject: [PATCH 4/5] store: Make ManifestIdxMap::new a little more efficient --- store/postgres/src/dynds/private.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index ebfd109b206..e22d58f4fae 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -331,17 +331,14 @@ struct ManifestIdxMap { impl ManifestIdxMap { fn new(src: &[(i32, String)], dst: &[(i32, String)]) -> Self { + let dst_idx_map: HashMap<&String, i32> = + HashMap::from_iter(dst.iter().map(|(idx, name)| (name, *idx))); let map = src .iter() .map(|(src_idx, src_name)| { ( *src_idx, - ( - dst.iter() - .find(|(_, dst_name)| src_name == dst_name) - .map(|(dst_idx, _)| *dst_idx), - src_name.to_string(), - ), + (dst_idx_map.get(src_name).copied(), src_name.to_string()), ) }) .collect(); From 776afa14c324614abc84a72587f199cd7681153a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 27 Mar 2025 17:05:31 -0700 Subject: [PATCH 5/5] store: Fix handling of bounds in DsForCopy::src_to_dst --- store/postgres/src/dynds/private.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index e22d58f4fae..243a7dc5a57 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -389,13 +389,16 @@ impl DsForCopy { // unclamp block range if it ends beyond target block match self.block_range.1 { Bound::Included(block) if block > target_block => self.block_range.1 = Bound::Unbounded, + Bound::Excluded(block) if block - 1 > target_block => { + self.block_range.1 = Bound::Unbounded + } _ => { /* use block range as is */ } } // Translate manifest index let src_created = match self.block_range.0 { Bound::Included(block) => block, Bound::Excluded(block) => block + 1, - Bound::Unbounded => i32::MAX, + Bound::Unbounded => 0, }; self.idx = map.dst_idx(self.idx, src_nsp, src_created, dst_nsp)?; Ok(self)