11use std:: { collections:: HashMap , ops:: Bound } ;
22
33use diesel:: {
4- pg:: sql_types,
4+ pg:: { sql_types, Pg } ,
55 prelude:: * ,
6+ query_builder:: { AstPass , QueryFragment , QueryId } ,
67 sql_query,
78 sql_types:: { Binary , Bool , Integer , Jsonb , Nullable } ,
89 PgConnection , QueryDsl , RunQueryDsl ,
@@ -16,7 +17,7 @@ use graph::{
1617 prelude:: { serde_json, BlockNumber , StoreError } ,
1718} ;
1819
19- use crate :: primary:: Namespace ;
20+ use crate :: { primary:: Namespace , relational_queries :: POSTGRES_MAX_PARAMETERS } ;
2021
2122type DynTable = diesel_dynamic_schema:: Table < String , Namespace > ;
2223type DynColumn < ST > = diesel_dynamic_schema:: Column < DynTable , & ' static str , ST > ;
@@ -226,16 +227,12 @@ impl DataSourcesTable {
226227 return Ok ( count as usize ) ;
227228 }
228229
229- type Tuple = (
230- ( Bound < i32 > , Bound < i32 > ) ,
231- i32 ,
232- Option < Vec < u8 > > ,
233- Option < serde_json:: Value > ,
234- i32 ,
235- Option < i32 > ,
236- ) ;
230+ let manifest_map =
231+ ManifestIdxMap :: new ( src_manifest_idx_and_name, dst_manifest_idx_and_name) ;
237232
238- let src_tuples = self
233+ // Load all data sources that were created up to and including
234+ // `target_block` and transform them ready for insertion
235+ let dss: Vec < _ > = self
239236 . table
240237 . clone ( )
241238 . filter (
@@ -250,34 +247,18 @@ impl DataSourcesTable {
250247 & self . done_at ,
251248 ) )
252249 . order_by ( & self . vid )
253- . load :: < Tuple > ( conn) ?;
250+ . load :: < DsForCopy > ( conn) ?
251+ . into_iter ( )
252+ . map ( |ds| ds. src_to_dst ( target_block, & manifest_map) )
253+ . collect :: < Result < _ , _ > > ( ) ?;
254254
255- let manifest_map =
256- ManifestIdxMap :: new ( src_manifest_idx_and_name, dst_manifest_idx_and_name) ;
255+ // Split all dss into chunks so that we never use more than
256+ // `POSTGRES_MAX_PARAMETERS` bind variables per chunk
257+ let chunk_size = POSTGRES_MAX_PARAMETERS / CopyDsQuery :: BIND_PARAMS ;
257258 let mut count = 0 ;
258- for ( block_range, src_idx, param, context, causality_region, done_at) in src_tuples {
259- let dst_idx = manifest_map. dst_idx ( src_idx) ?;
260- let query = format ! (
261- "\
262- insert into {dst}(block_range, manifest_idx, param, context, causality_region, done_at)
263- values(case
264- when upper($2) <= $1 then $2
265- else int4range(lower($2), null)
266- end,
267- $3, $4, $5, $6, $7)
268- " ,
269- dst = dst. qname
270- ) ;
271-
272- count += sql_query ( query)
273- . bind :: < Integer , _ > ( target_block)
274- . bind :: < sql_types:: Range < Integer > , _ > ( block_range)
275- . bind :: < Integer , _ > ( dst_idx)
276- . bind :: < Nullable < Binary > , _ > ( param)
277- . bind :: < Nullable < Jsonb > , _ > ( context)
278- . bind :: < Integer , _ > ( causality_region)
279- . bind :: < Nullable < Integer > , _ > ( done_at)
280- . execute ( conn) ?;
259+ for chunk in dss. chunks ( chunk_size) {
260+ let query = CopyDsQuery :: new ( dst, chunk) ?;
261+ count += query. execute ( conn) ?;
281262 }
282263
283264 // If the manifest idxes remained constant, we can test that both tables have the same
@@ -344,12 +325,12 @@ impl DataSourcesTable {
344325/// Map src manifest indexes to dst manifest indexes. If the
345326/// destination is missing an entry, put `None` as the value for the
346327/// source index
347- struct ManifestIdxMap < ' a > {
348- map : HashMap < i32 , ( Option < i32 > , & ' a String ) > ,
328+ struct ManifestIdxMap {
329+ map : HashMap < i32 , ( Option < i32 > , String ) > ,
349330}
350331
351- impl < ' a > ManifestIdxMap < ' a > {
352- fn new ( src : & ' a [ ( i32 , String ) ] , dst : & ' a [ ( i32 , String ) ] ) -> Self {
332+ impl ManifestIdxMap {
333+ fn new ( src : & [ ( i32 , String ) ] , dst : & [ ( i32 , String ) ] ) -> Self {
353334 let map = src
354335 . iter ( )
355336 . map ( |( src_idx, src_name) | {
@@ -359,7 +340,7 @@ impl<'a> ManifestIdxMap<'a> {
359340 dst. iter ( )
360341 . find ( |( _, dst_name) | src_name == dst_name)
361342 . map ( |( dst_idx, _) | * dst_idx) ,
362- src_name,
343+ src_name. to_string ( ) ,
363344 ) ,
364345 )
365346 } )
@@ -380,3 +361,85 @@ impl<'a> ManifestIdxMap<'a> {
380361 Ok ( dst_idx)
381362 }
382363}
364+
365+ #[ derive( Queryable ) ]
366+ struct DsForCopy {
367+ block_range : ( Bound < i32 > , Bound < i32 > ) ,
368+ idx : i32 ,
369+ param : Option < Vec < u8 > > ,
370+ context : Option < serde_json:: Value > ,
371+ causality_region : i32 ,
372+ done_at : Option < i32 > ,
373+ }
374+
375+ impl DsForCopy {
376+ fn src_to_dst (
377+ mut self ,
378+ target_block : BlockNumber ,
379+ map : & ManifestIdxMap ,
380+ ) -> Result < Self , StoreError > {
381+ // unclamp block range if it ends beyond target block
382+ match self . block_range . 1 {
383+ Bound :: Included ( block) if block > target_block => self . block_range . 1 = Bound :: Unbounded ,
384+ _ => { /* use block range as is */ }
385+ }
386+ // Translate manifest index
387+ self . idx = map. dst_idx ( self . idx ) ?;
388+ Ok ( self )
389+ }
390+ }
391+
392+ struct CopyDsQuery < ' a > {
393+ dst : & ' a DataSourcesTable ,
394+ dss : & ' a [ DsForCopy ] ,
395+ }
396+
397+ impl < ' a > CopyDsQuery < ' a > {
398+ const BIND_PARAMS : usize = 6 ;
399+
400+ fn new ( dst : & ' a DataSourcesTable , dss : & ' a [ DsForCopy ] ) -> Result < Self , StoreError > {
401+ Ok ( CopyDsQuery { dst, dss } )
402+ }
403+ }
404+
405+ impl < ' a > QueryFragment < Pg > for CopyDsQuery < ' a > {
406+ fn walk_ast < ' b > ( & ' b self , mut out : AstPass < ' _ , ' b , Pg > ) -> QueryResult < ( ) > {
407+ out. unsafe_to_cache_prepared ( ) ;
408+ out. push_sql ( "insert into " ) ;
409+ out. push_sql ( & self . dst . qname ) ;
410+ out. push_sql (
411+ "(block_range, manifest_idx, param, context, causality_region, done_at) values " ,
412+ ) ;
413+ let mut first = true ;
414+ for ds in self . dss . iter ( ) {
415+ if first {
416+ first = false ;
417+ } else {
418+ out. push_sql ( ", " ) ;
419+ }
420+ out. push_sql ( "(" ) ;
421+ out. push_bind_param :: < sql_types:: Range < Integer > , _ > ( & ds. block_range ) ?;
422+ out. push_sql ( ", " ) ;
423+ out. push_bind_param :: < Integer , _ > ( & ds. idx ) ?;
424+ out. push_sql ( ", " ) ;
425+ out. push_bind_param :: < Nullable < Binary > , _ > ( & ds. param ) ?;
426+ out. push_sql ( ", " ) ;
427+ out. push_bind_param :: < Nullable < Jsonb > , _ > ( & ds. context ) ?;
428+ out. push_sql ( ", " ) ;
429+ out. push_bind_param :: < Integer , _ > ( & ds. causality_region ) ?;
430+ out. push_sql ( ", " ) ;
431+ out. push_bind_param :: < Nullable < Integer > , _ > ( & ds. done_at ) ?;
432+ out. push_sql ( ")" ) ;
433+ }
434+
435+ Ok ( ( ) )
436+ }
437+ }
438+
439+ impl < ' a > QueryId for CopyDsQuery < ' a > {
440+ type QueryId = ( ) ;
441+
442+ const HAS_STATIC_QUERY_ID : bool = false ;
443+ }
444+
445+ impl < ' a , Conn > RunQueryDsl < Conn > for CopyDsQuery < ' a > { }
0 commit comments