@@ -339,101 +339,75 @@ where
339
339
let change_set = self
340
340
. relay_client
341
341
. rpc ( )
342
- . query_storage ( keys. clone ( ) , start, Some ( latest_finalized_hash) )
343
- . await ?;
344
-
345
- let mut change_set_join_set: JoinSet < Result < Option < _ > , anyhow:: Error > > = JoinSet :: new ( ) ;
346
- let mut parachain_headers_with_proof = BTreeMap :: < H256 , ParachainHeaderProofs > :: default ( ) ;
347
- log:: debug!( target: "hyperspace" , "Got {} authority set changes" , change_set. len( ) ) ;
348
-
349
- fn clone_storage_change_sets < T : light_client_common:: config:: Config + Send + Sync > (
350
- changes : & [ StorageChangeSet < T :: Hash > ] ,
351
- ) -> Vec < StorageChangeSet < T :: Hash > > {
352
- changes
353
- . iter ( )
354
- . map ( |change| StorageChangeSet {
355
- block : change. block . clone ( ) ,
356
- changes : change. changes . clone ( ) ,
357
- } )
358
- . collect ( )
359
- }
342
+ . query_storage ( keys. clone ( ) , latest_finalized_hash, Some ( latest_finalized_hash) )
343
+ . await ?
344
+ . pop ( )
345
+ . unwrap ( ) ;
346
+
347
+ log:: debug!( target: "hyperspace" , "Got {} authority set changes" , change_set. changes. len( ) ) ;
348
+
349
+ let change = StorageChangeSet {
350
+ block : change_set. block . clone ( ) ,
351
+ changes : change_set. changes . clone ( ) ,
352
+ } ;
360
353
let latest_para_height = Arc :: new ( AtomicU32 :: new ( 0u32 ) ) ;
361
- for changes in change_set. chunks ( PROCESS_CHANGES_SET_BATCH_SIZE ) {
362
- for change in clone_storage_change_sets :: < T > ( changes) {
363
- let header_numbers = header_numbers. clone ( ) ;
364
- let keys = vec ! [ para_storage_key. clone( ) ] ;
365
- let client = self . clone ( ) ;
366
- let to = self . rpc_call_delay . as_millis ( ) ;
367
- let duration1 = Duration :: from_millis ( rand:: thread_rng ( ) . gen_range ( 1 ..to) as u64 ) ;
368
- let latest_para_height = latest_para_height. clone ( ) ;
369
- change_set_join_set. spawn ( async move {
370
- sleep ( duration1) . await ;
371
- let header = client
372
- . relay_client
373
- . rpc ( )
374
- . header ( Some ( change. block ) )
375
- . await ?
376
- . ok_or_else ( || anyhow ! ( "block not found {:?}" , change. block) ) ?;
377
-
378
- let parachain_header_bytes = {
379
- let key = T :: Storage :: paras_heads ( client. para_id ) ;
380
- let data = client
381
- . relay_client
382
- . storage ( )
383
- . at ( header. hash ( ) )
384
- . fetch ( & key)
385
- . await ?
386
- . expect ( "Header exists in its own changeset; qed" ) ;
387
- <T :: Storage as RuntimeStorage >:: HeadData :: from_inner ( data)
388
- } ;
389
-
390
- let para_header: T :: Header =
391
- Decode :: decode ( & mut parachain_header_bytes. as_ref ( ) ) ?;
392
- let para_block_number = para_header. number ( ) ;
393
- // skip genesis header or any unknown headers
394
- if para_block_number == Zero :: zero ( ) ||
395
- !header_numbers. contains ( & para_block_number)
396
- {
397
- return Ok ( None )
398
- }
399
-
400
- let state_proof = client
401
- . relay_client
402
- . rpc ( )
403
- . read_proof ( keys. iter ( ) . map ( AsRef :: as_ref) , Some ( header. hash ( ) ) )
404
- . await ?
405
- . proof
406
- . into_iter ( )
407
- . map ( |p| p. 0 )
408
- . collect ( ) ;
409
-
410
- let TimeStampExtWithProof { ext : extrinsic, proof : extrinsic_proof } =
411
- fetch_timestamp_extrinsic_with_proof (
412
- & client. para_client ,
413
- Some ( para_header. hash ( ) ) ,
414
- )
415
- . await
416
- . map_err ( |err| anyhow ! ( "Error fetching timestamp with proof: {err:?}" ) ) ?;
417
- let proofs = ParachainHeaderProofs { state_proof, extrinsic, extrinsic_proof } ;
418
- latest_para_height. fetch_max ( u32:: from ( para_block_number) , Ordering :: SeqCst ) ;
419
- Ok ( Some ( ( H256 :: from ( header. hash ( ) ) , proofs) ) )
420
- } ) ;
421
- }
422
354
423
- while let Some ( res) = change_set_join_set. join_next ( ) . await {
424
- if let Some ( ( hash, proofs) ) = res?? {
425
- parachain_headers_with_proof. insert ( hash, proofs) ;
426
- }
427
- }
355
+ let header_numbers = header_numbers. clone ( ) ;
356
+ let keys = vec ! [ para_storage_key. clone( ) ] ;
357
+ let client = self . clone ( ) ;
358
+ let to = self . rpc_call_delay . as_millis ( ) ;
359
+ let duration1 = Duration :: from_millis ( rand:: thread_rng ( ) . gen_range ( 1 ..to) as u64 ) ;
360
+ let latest_para_height = latest_para_height. clone ( ) ;
361
+ let header = client
362
+ . relay_client
363
+ . rpc ( )
364
+ . header ( Some ( change. block ) )
365
+ . await ?
366
+ . ok_or_else ( || anyhow ! ( "block not found {:?}" , change. block) ) ?;
367
+
368
+ let parachain_header_bytes = {
369
+ let key = T :: Storage :: paras_heads ( client. para_id ) ;
370
+ let data = client
371
+ . relay_client
372
+ . storage ( )
373
+ . at ( header. hash ( ) )
374
+ . fetch ( & key)
375
+ . await ?
376
+ . expect ( "Header exists in its own changeset; qed" ) ;
377
+ <T :: Storage as RuntimeStorage >:: HeadData :: from_inner ( data)
378
+ } ;
379
+
380
+ let para_header: T :: Header = Decode :: decode ( & mut parachain_header_bytes. as_ref ( ) ) ?;
381
+ let para_block_number = para_header. number ( ) ;
382
+ // skip genesis header or any unknown headers
383
+ if para_block_number == Zero :: zero ( ) || !header_numbers. contains ( & para_block_number) {
384
+ return Err ( anyhow ! ( "genesis header or unknown header" ) )
428
385
}
429
386
387
+ let state_proof = client
388
+ . relay_client
389
+ . rpc ( )
390
+ . read_proof ( keys. iter ( ) . map ( AsRef :: as_ref) , Some ( header. hash ( ) ) )
391
+ . await ?
392
+ . proof
393
+ . into_iter ( )
394
+ . map ( |p| p. 0 )
395
+ . collect ( ) ;
396
+
397
+ let TimeStampExtWithProof { ext : extrinsic, proof : extrinsic_proof } =
398
+ fetch_timestamp_extrinsic_with_proof ( & client. para_client , Some ( para_header. hash ( ) ) )
399
+ . await
400
+ . map_err ( |err| anyhow ! ( "Error fetching timestamp with proof: {err:?}" ) ) ?;
401
+ let proofs = ParachainHeaderProofs { state_proof, extrinsic, extrinsic_proof } ;
402
+ latest_para_height. fetch_max ( u32:: from ( para_block_number) , Ordering :: SeqCst ) ;
403
+
430
404
unknown_headers. sort_by_key ( |header| header. number ( ) ) ;
431
405
// overwrite unknown headers
432
406
finality_proof. unknown_headers = unknown_headers;
433
407
434
408
Ok ( ParachainHeadersWithFinalityProof {
435
409
finality_proof,
436
- parachain_headers : parachain_headers_with_proof ,
410
+ parachain_header : ( H256 :: from ( header . hash ( ) ) , proofs ) ,
437
411
latest_para_height : latest_para_height. load ( Ordering :: SeqCst ) ,
438
412
} )
439
413
}
0 commit comments