Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 157 additions & 59 deletions store/postgres/src/dynds/private.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::ops::Bound;
use std::{collections::HashMap, i32, ops::Bound};

use diesel::{
pg::sql_types,
pg::{sql_types, Pg},
prelude::*,
query_builder::{AstPass, QueryFragment, QueryId},
sql_query,
sql_types::{Binary, Bool, Integer, Jsonb, Nullable},
PgConnection, QueryDsl, RunQueryDsl,
Expand All @@ -16,7 +17,7 @@ use graph::{
prelude::{serde_json, BlockNumber, StoreError},
};

use crate::primary::Namespace;
use crate::{primary::Namespace, relational_queries::POSTGRES_MAX_PARAMETERS};

type DynTable = diesel_dynamic_schema::Table<String, Namespace>;
type DynColumn<ST> = diesel_dynamic_schema::Column<DynTable, &'static str, ST>;
Expand Down Expand Up @@ -226,16 +227,12 @@ impl DataSourcesTable {
return Ok(count as usize);
}

type Tuple = (
(Bound<i32>, Bound<i32>),
i32,
Option<Vec<u8>>,
Option<serde_json::Value>,
i32,
Option<i32>,
);
let manifest_map =
ManifestIdxMap::new(src_manifest_idx_and_name, dst_manifest_idx_and_name);

let src_tuples = self
// Load all data sources that were created up to and including
// `target_block` and transform them ready for insertion
let dss: Vec<_> = self
.table
.clone()
.filter(
Expand All @@ -250,55 +247,18 @@ impl DataSourcesTable {
&self.done_at,
))
.order_by(&self.vid)
.load::<Tuple>(conn)?;
.load::<DsForCopy>(conn)?
.into_iter()
.map(|ds| ds.src_to_dst(target_block, &manifest_map, &self.namespace, &dst.namespace))
.collect::<Result<_, _>>()?;

// Split all dss into chunks so that we never use more than
// `POSTGRES_MAX_PARAMETERS` bind variables per chunk
let chunk_size = POSTGRES_MAX_PARAMETERS / CopyDsQuery::BIND_PARAMS;
let mut count = 0;
for (block_range, src_manifest_idx, param, context, causality_region, done_at) in src_tuples
{
let name = &src_manifest_idx_and_name
.iter()
.find(|(idx, _)| idx == &src_manifest_idx)
.with_context(|| {
anyhow!(
"the source {} does not have a template with index {}",
self.namespace,
src_manifest_idx
)
})?
.1;
let dst_manifest_idx = dst_manifest_idx_and_name
.iter()
.find(|(_, n)| n == name)
.with_context(|| {
anyhow!(
"the destination {} is missing a template with name {}. The source {} created one at block {:?}",
dst.namespace,
name, self.namespace, block_range.0
)
})?
.0;

let query = format!(
"\
insert into {dst}(block_range, manifest_idx, param, context, causality_region, done_at)
values(case
when upper($2) <= $1 then $2
else int4range(lower($2), null)
end,
$3, $4, $5, $6, $7)
",
dst = dst.qname
);

count += sql_query(query)
.bind::<Integer, _>(target_block)
.bind::<sql_types::Range<Integer>, _>(block_range)
.bind::<Integer, _>(dst_manifest_idx)
.bind::<Nullable<Binary>, _>(param)
.bind::<Nullable<Jsonb>, _>(context)
.bind::<Integer, _>(causality_region)
.bind::<Nullable<Integer>, _>(done_at)
.execute(conn)?;
for chunk in dss.chunks(chunk_size) {
let query = CopyDsQuery::new(dst, chunk)?;
count += query.execute(conn)?;
}

// If the manifest idxes remained constant, we can test that both tables have the same
Expand Down Expand Up @@ -361,3 +321,141 @@ impl DataSourcesTable {
.optional()?)
}
}

/// Map src manifest indexes to dst manifest indexes. If the
/// destination is missing an entry, put `None` as the value for the
/// source index
struct ManifestIdxMap {
map: HashMap<i32, (Option<i32>, String)>,
}

impl ManifestIdxMap {
fn new(src: &[(i32, String)], dst: &[(i32, String)]) -> Self {
let dst_idx_map: HashMap<&String, i32> =
HashMap::from_iter(dst.iter().map(|(idx, name)| (name, *idx)));
let map = src
.iter()
.map(|(src_idx, src_name)| {
(
*src_idx,
(dst_idx_map.get(src_name).copied(), src_name.to_string()),
)
})
.collect();
ManifestIdxMap { map }
}

fn dst_idx(
&self,
src_idx: i32,
src_nsp: &Namespace,
src_created: BlockNumber,
dst_nsp: &Namespace,
) -> Result<i32, StoreError> {
let (dst_idx, name) = self.map.get(&src_idx).with_context(|| {
anyhow!(
"the source {src_nsp} does not have a template with \
index {src_idx} but created one at block {src_created}"
)
})?;
let dst_idx = dst_idx.with_context(|| {
anyhow!(
"the destination {dst_nsp} is missing a template with \
name {name}. The source {src_nsp} created one at block {src_created}"
)
})?;
Ok(dst_idx)
}
}

#[derive(Queryable)]
struct DsForCopy {
block_range: (Bound<i32>, Bound<i32>),
idx: i32,
param: Option<Vec<u8>>,
context: Option<serde_json::Value>,
causality_region: i32,
done_at: Option<i32>,
}

impl DsForCopy {
fn src_to_dst(
mut self,
target_block: BlockNumber,
map: &ManifestIdxMap,
src_nsp: &Namespace,
dst_nsp: &Namespace,
) -> Result<Self, StoreError> {
// unclamp block range if it ends beyond target block
match self.block_range.1 {
Bound::Included(block) if block > target_block => self.block_range.1 = Bound::Unbounded,
Bound::Excluded(block) if block - 1 > target_block => {
self.block_range.1 = Bound::Unbounded
}
_ => { /* use block range as is */ }
}
// Translate manifest index
let src_created = match self.block_range.0 {
Bound::Included(block) => block,
Bound::Excluded(block) => block + 1,
Bound::Unbounded => 0,
};
self.idx = map.dst_idx(self.idx, src_nsp, src_created, dst_nsp)?;
Ok(self)
}
}

struct CopyDsQuery<'a> {
dst: &'a DataSourcesTable,
dss: &'a [DsForCopy],
}

impl<'a> CopyDsQuery<'a> {
const BIND_PARAMS: usize = 6;

fn new(dst: &'a DataSourcesTable, dss: &'a [DsForCopy]) -> Result<Self, StoreError> {
Ok(CopyDsQuery { dst, dss })
}
}

impl<'a> QueryFragment<Pg> for CopyDsQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();
out.push_sql("insert into ");
out.push_sql(&self.dst.qname);
out.push_sql(
"(block_range, manifest_idx, param, context, causality_region, done_at) values ",
);
let mut first = true;
for ds in self.dss.iter() {
if first {
first = false;
} else {
out.push_sql(", ");
}
out.push_sql("(");
out.push_bind_param::<sql_types::Range<Integer>, _>(&ds.block_range)?;
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&ds.idx)?;
out.push_sql(", ");
out.push_bind_param::<Nullable<Binary>, _>(&ds.param)?;
out.push_sql(", ");
out.push_bind_param::<Nullable<Jsonb>, _>(&ds.context)?;
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&ds.causality_region)?;
out.push_sql(", ");
out.push_bind_param::<Nullable<Integer>, _>(&ds.done_at)?;
out.push_sql(")");
}

Ok(())
}
}

impl<'a> QueryId for CopyDsQuery<'a> {
type QueryId = ();

const HAS_STATIC_QUERY_ID: bool = false;
}

impl<'a, Conn> RunQueryDsl<Conn> for CopyDsQuery<'a> {}
2 changes: 1 addition & 1 deletion store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::{
const BASE_SQL_COLUMNS: [&str; 2] = ["id", "vid"];

/// The maximum number of bind variables that can be used in a query
const POSTGRES_MAX_PARAMETERS: usize = u16::MAX as usize; // 65535
pub(crate) const POSTGRES_MAX_PARAMETERS: usize = u16::MAX as usize; // 65535

const SORT_KEY_COLUMN: &str = "sort_key$";

Expand Down
Loading