Skip to content

Commit 580558d

Browse files
committed
store: Batch data_sources$ inserts during restore
Also move restoring private data sources into the DataSourcesTable
1 parent fb0584b commit 580558d

File tree

2 files changed

+93
-47
lines changed

2 files changed

+93
-47
lines changed

store/postgres/src/dynds/private.rs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use diesel::{
44
pg::{sql_types, Pg},
55
query_builder::{AstPass, QueryFragment, QueryId},
66
sql_query,
7-
sql_types::{Binary, Bool, Integer, Jsonb, Nullable},
7+
sql_types::{BigInt, Binary, Bool, Integer, Jsonb, Nullable, Text},
88
ExpressionMethods, OptionalExtension, QueryDsl, QueryResult,
99
};
1010
use diesel_async::RunQueryDsl;
@@ -17,6 +17,8 @@ use graph::{
1717
prelude::{serde_json, BlockNumber, StoreError},
1818
};
1919

20+
use crate::parquet::convert::DataSourceRestoreRow;
21+
2022
use crate::{primary::Namespace, relational_queries::POSTGRES_MAX_PARAMETERS, AsyncPgConnection};
2123

2224
type DynTable = diesel_dynamic_schema::Table<String, Namespace>;
@@ -57,6 +59,10 @@ impl DataSourcesTable {
5759
}
5860
}
5961

62+
pub(crate) fn qualified_name(&self) -> &str {
63+
&self.qname
64+
}
65+
6066
pub(crate) fn as_ddl(&self) -> String {
6167
format!(
6268
"
@@ -332,6 +338,25 @@ impl DataSourcesTable {
332338
.await
333339
.optional()?)
334340
}
341+
342+
/// Insert rows from a dump/restore into the `data_sources$` table.
343+
/// Rows are batched to stay within PostgreSQL's bind parameter limit.
344+
pub(crate) async fn insert_rows(
345+
&self,
346+
conn: &mut AsyncPgConnection,
347+
rows: &[DataSourceRestoreRow],
348+
) -> Result<usize, StoreError> {
349+
let chunk_size = POSTGRES_MAX_PARAMETERS / RestoreDsQuery::BIND_PARAMS;
350+
let mut count = 0;
351+
for chunk in rows.chunks(chunk_size) {
352+
let query = RestoreDsQuery {
353+
qname: &self.qname,
354+
rows: chunk,
355+
};
356+
count += query.execute(conn).await?;
357+
}
358+
Ok(count)
359+
}
335360
}
336361

337362
/// Map src manifest indexes to dst manifest indexes. If the
@@ -469,3 +494,58 @@ impl<'a> QueryId for CopyDsQuery<'a> {
469494

470495
const HAS_STATIC_QUERY_ID: bool = false;
471496
}
497+
498+
/// Batched `INSERT` statement that restores rows into a `data_sources$`
/// table. Built as a [`QueryFragment`] so that every column value is
/// passed as a bind parameter rather than interpolated into the SQL.
struct RestoreDsQuery<'a> {
    // Fully qualified name of the target `data_sources$` table; spliced
    // verbatim into the SQL text (see `walk_ast`).
    qname: &'a str,
    // The chunk of restore rows this single statement inserts.
    rows: &'a [DataSourceRestoreRow],
}
502+
503+
impl<'a> RestoreDsQuery<'a> {
    /// Number of bind parameters each row contributes in `walk_ast`:
    /// vid, the two `block_range` bounds, causality_region,
    /// manifest_idx, parent, id, param, context, and done_at — ten in
    /// total. Used by `insert_rows` to size batches.
    const BIND_PARAMS: usize = 10;
}
506+
507+
impl<'a> QueryFragment<Pg> for RestoreDsQuery<'a> {
508+
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
509+
out.unsafe_to_cache_prepared();
510+
out.push_sql("insert into ");
511+
out.push_sql(self.qname);
512+
out.push_sql(
513+
"(vid, block_range, causality_region, manifest_idx, \
514+
parent, id, param, context, done_at) values ",
515+
);
516+
for (i, row) in self.rows.iter().enumerate() {
517+
if i > 0 {
518+
out.push_sql(", ");
519+
}
520+
out.push_sql("(");
521+
out.push_bind_param::<BigInt, _>(&row.vid)?;
522+
out.push_sql(", int4range(");
523+
out.push_bind_param::<Integer, _>(&row.block_range_start)?;
524+
out.push_sql(", ");
525+
out.push_bind_param::<Nullable<Integer>, _>(&row.block_range_end)?;
526+
out.push_sql("), ");
527+
out.push_bind_param::<Integer, _>(&row.causality_region)?;
528+
out.push_sql(", ");
529+
out.push_bind_param::<Integer, _>(&row.manifest_idx)?;
530+
out.push_sql(", ");
531+
out.push_bind_param::<Nullable<Integer>, _>(&row.parent)?;
532+
out.push_sql(", ");
533+
out.push_bind_param::<Nullable<Binary>, _>(&row.id)?;
534+
out.push_sql(", ");
535+
out.push_bind_param::<Nullable<Binary>, _>(&row.param)?;
536+
out.push_sql(", ");
537+
out.push_bind_param::<Nullable<Text>, _>(&row.context)?;
538+
out.push_sql("::jsonb, ");
539+
out.push_bind_param::<Nullable<Integer>, _>(&row.done_at)?;
540+
out.push_sql(")");
541+
}
542+
543+
Ok(())
544+
}
545+
}
546+
547+
impl<'a> QueryId for RestoreDsQuery<'a> {
    type QueryId = ();

    // The generated SQL depends on the table name and on how many rows
    // are in the batch, so the query cannot be identified (and cached)
    // by a static id.
    const HAS_STATIC_QUERY_ID: bool = false;
}

store/postgres/src/relational/restore.rs

Lines changed: 12 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use std::sync::Arc;
1212

1313
use diesel::dsl::update;
1414
use diesel::prelude::{ExpressionMethods, QueryDsl};
15-
use diesel::sql_types::{BigInt, Binary, Integer, Nullable, Text};
1615
use diesel_async::{RunQueryDsl, SimpleAsyncConnection};
1716
use graph::blockchain::BlockHash;
1817
use graph::data::subgraph::schema::{DeploymentCreate, SubgraphManifestEntity};
@@ -23,9 +22,7 @@ use graph::semver::Version;
2322
use crate::catalog;
2423
use crate::deployment::create_deployment;
2524
use crate::dynds::DataSourcesTable;
26-
use crate::parquet::convert::{
27-
record_batch_to_data_source_rows, record_batch_to_restore_rows, DataSourceRestoreRow,
28-
};
25+
use crate::parquet::convert::{record_batch_to_data_source_rows, record_batch_to_restore_rows};
2926
use crate::parquet::reader::read_batches;
3027
use crate::primary::Site;
3128
use crate::relational::dump::{Metadata, TableInfo};
@@ -133,39 +130,10 @@ async fn import_entity_table(
133130
Ok(total_inserted)
134131
}
135132

136-
/// Insert a single data_sources$ row via raw SQL.
137-
async fn insert_data_source_row(
138-
conn: &mut AsyncPgConnection,
139-
qualified_table: &str,
140-
row: &DataSourceRestoreRow,
141-
) -> Result<(), StoreError> {
142-
let query = format!(
143-
"INSERT INTO {} (vid, block_range, causality_region, manifest_idx, \
144-
parent, id, param, context, done_at) \
145-
VALUES ($1, int4range($2, $3), $4, $5, $6, $7, $8, $9::jsonb, $10)",
146-
qualified_table,
147-
);
148-
diesel::sql_query(&query)
149-
.bind::<BigInt, _>(row.vid)
150-
.bind::<Integer, _>(row.block_range_start)
151-
.bind::<Nullable<Integer>, _>(row.block_range_end)
152-
.bind::<Integer, _>(row.causality_region)
153-
.bind::<Integer, _>(row.manifest_idx)
154-
.bind::<Nullable<Integer>, _>(row.parent)
155-
.bind::<Nullable<Binary>, _>(row.id.as_deref())
156-
.bind::<Nullable<Binary>, _>(row.param.as_deref())
157-
.bind::<Nullable<Text>, _>(row.context.as_deref())
158-
.bind::<Nullable<Integer>, _>(row.done_at)
159-
.execute(conn)
160-
.await
161-
.map_err(StoreError::from)?;
162-
Ok(())
163-
}
164-
165133
/// Import the `data_sources$` table from Parquet chunks.
166134
async fn import_data_sources(
167135
conn: &mut AsyncPgConnection,
168-
namespace: &str,
136+
ds_table: &DataSourcesTable,
169137
table_info: &TableInfo,
170138
dir: &Path,
171139
logger: &Logger,
@@ -174,8 +142,8 @@ async fn import_data_sources(
174142
return Ok(0);
175143
}
176144

177-
let qualified = format!("\"{}\".\"{DATA_SOURCES_TABLE}\"", namespace);
178-
let max_vid_db = current_max_vid(conn, &qualified).await?;
145+
let qualified = ds_table.qualified_name();
146+
let max_vid_db = current_max_vid(conn, qualified).await?;
179147
if max_vid_db >= table_info.max_vid {
180148
info!(logger, "data_sources$ already fully restored, skipping");
181149
return Ok(0);
@@ -193,15 +161,13 @@ async fn import_data_sources(
193161

194162
for batch in batches {
195163
let batch = batch?;
196-
let rows = record_batch_to_data_source_rows(&batch)?;
197-
198-
for row in &rows {
199-
if max_vid_db >= 0 && row.vid <= max_vid_db {
200-
continue;
201-
}
202-
insert_data_source_row(conn, &qualified, row).await?;
203-
total_inserted += 1;
164+
let mut rows = record_batch_to_data_source_rows(&batch)?;
165+
166+
if max_vid_db >= 0 {
167+
rows.retain(|row| row.vid > max_vid_db);
204168
}
169+
170+
total_inserted += ds_table.insert_rows(conn, &rows).await?;
205171
}
206172
}
207173

@@ -352,8 +318,8 @@ pub async fn import_data(
352318

353319
// Import data_sources$ if present
354320
if let Some(ds_info) = metadata.tables.get(DATA_SOURCES_TABLE) {
355-
let namespace = layout.site.namespace.as_str();
356-
import_data_sources(conn, namespace, ds_info, dir, logger).await?;
321+
let ds_table = DataSourcesTable::new(layout.site.namespace.clone());
322+
import_data_sources(conn, &ds_table, ds_info, dir, logger).await?;
357323
}
358324

359325
Ok(())

0 commit comments

Comments
 (0)