@@ -12,11 +12,11 @@ use starcoin_service_registry::{
12
12
ActorService , ServiceContext , ServiceFactory , ServiceHandler , ServiceRequest ,
13
13
} ;
14
14
use starcoin_storage:: {
15
- flexi_dag:: { KTotalDifficulty , SyncFlexiDagSnapshot , SyncFlexiDagSnapshotHasher } ,
15
+ flexi_dag:: { SyncFlexiDagSnapshot , SyncFlexiDagSnapshotHasher } ,
16
16
storage:: CodecKVStore ,
17
17
BlockStore , Storage , SyncFlexiDagStore , block_info:: BlockInfoStore , Store ,
18
18
} ;
19
- use starcoin_types:: { block:: BlockHeader , header:: DagHeader , startup_info} ;
19
+ use starcoin_types:: { block:: BlockHeader , header:: DagHeader , startup_info, dag_block :: KTotalDifficulty } ;
20
20
21
21
#[ derive( Debug , Clone ) ]
22
22
pub struct DumpTipsToAccumulator {
@@ -163,40 +163,41 @@ impl FlexidagService {
163
163
}
164
164
}
165
165
166
- fn create_snapshot_by_tips ( tips : Vec < HashValue > , head_block_id : HashValue ) -> Result < ( HashValue , SyncFlexiDagSnapshotHasher ) > {
167
- let k_total_difficulties = BTreeSet :: new ( ) ;
168
- tips. iter ( ) . for_each ( |block_id| {
166
+ fn create_snapshot_by_tips ( tips : Vec < HashValue > , head_block_id : HashValue , storage : Arc < Storage > ) -> Result < ( HashValue , SyncFlexiDagSnapshotHasher ) > {
167
+ let mut k_total_difficulties = BTreeSet :: new ( ) ;
168
+ tips. iter ( ) . try_for_each ( |block_id| {
169
169
k_total_difficulties. insert ( KTotalDifficulty {
170
170
head_block_id : block_id. clone ( ) ,
171
- total_difficulty : self . storage . get_block_info ( block_id. clone ( ) ) . expect ( "block info should not be none" ) . ok_or_else ( error || anyhow ! ( "block info should not be none" ) ) ?. total_difficulty ,
171
+ total_difficulty : storage. get_block_info ( block_id. clone ( ) ) . expect ( "block info should not be none" ) . ok_or_else ( || anyhow ! ( "block info should not be none" ) ) ?. total_difficulty ,
172
172
} ) ;
173
- } ) ;
173
+ Ok ( ( ) )
174
+ } ) ?;
174
175
175
- let snaphot_hasher = SyncFlexiDagSnapshotHasher {
176
+ let snapshot_hasher = SyncFlexiDagSnapshotHasher {
176
177
child_hashes : tips,
177
178
head_block_id,
178
179
k_total_difficulties,
179
180
} ;
180
181
181
- Ok ( ( BlockDAG :: calculate_dag_accumulator_key ( & snapshot_hasher) ?, snaphot_hasher ) )
182
+ Ok ( ( BlockDAG :: calculate_dag_accumulator_key ( & snapshot_hasher) ?, snapshot_hasher ) )
182
183
}
183
184
184
185
fn merge_from_big_dag ( & mut self , msg : ForkDagAccumulator ) -> Result < AccumulatorInfo > {
185
- let dag_accumulator = self . dag_accumulator . as_mut ( ) . ok_or_else ( "the dag accumulator should not be none" ) ?;
186
+ let dag_accumulator = self . dag_accumulator . as_mut ( ) . ok_or_else ( || anyhow ! ( "the dag accumulator should not be none" ) ) ?;
186
187
if dag_accumulator. num_leaves ( ) != msg. dag_accumulator_index {
187
188
bail ! ( "cannot merge dag accumulator since its number is not the same as other" ) ;
188
189
}
189
- let tip_info = self . tip_info . as_mut ( ) . ok_or_else ( "the tips should not be none" ) ?;
190
+ let tip_info = self . tip_info . as_mut ( ) . ok_or_else ( || anyhow ! ( "the tips should not be none" ) ) ?;
190
191
msg. new_blocks . iter ( ) . for_each ( |block_id| {
191
- if !tip_info. tips . contains ( block_id) {
192
- tip_info. tips . push ( block_id. clone ( ) ) ;
192
+ if !tip_info. tips . as_ref ( ) . expect ( "tips should not be none" ) . contains ( block_id) {
193
+ tip_info. tips . as_mut ( ) . expect ( "tips should not be none" ) . push ( block_id. clone ( ) ) ;
193
194
}
194
195
} ) ;
195
196
196
- let ( key, snaphot_hasher) = Self :: create_snapshot_by_tips ( tip_info. tips , msg. block_header_id ) ?;
197
+ let ( key, snaphot_hasher) = Self :: create_snapshot_by_tips ( tip_info. tips . as_ref ( ) . expect ( "tips should not be none" ) . clone ( ) , msg. block_header_id , self . storage . clone ( ) ) ?;
197
198
dag_accumulator. append ( & vec ! [ key] ) ?;
198
199
let dag_accumulator_info = dag_accumulator. get_info ( ) ;
199
- self . storage . get_accumulator_snapshot_storage ( ) . put ( key, snaphot_hasher. to_snapshot ( dag_accumulator_info) ) ?;
200
+ self . storage . get_accumulator_snapshot_storage ( ) . put ( key, snaphot_hasher. to_snapshot ( dag_accumulator_info. clone ( ) ) ) ?;
200
201
dag_accumulator. flush ( ) ?;
201
202
Ok ( dag_accumulator_info)
202
203
}
@@ -205,25 +206,25 @@ impl FlexidagService {
205
206
let dag_accumulator = self
206
207
. dag_accumulator
207
208
. as_mut ( )
208
- . ok_or_else ( error || anyhow ! ( "dag accumulator is none" ) ) ?;
209
+ . ok_or_else ( || anyhow ! ( "dag accumulator is none" ) ) ?;
209
210
// fetch the block in the dag according to the dag accumulator index
210
211
let previous_key = dag_accumulator. get_leaf ( msg. dag_accumulator_index - 1 ) ?
211
- . ok_or_else ( error || anyhow ! ( "the dag snapshot hash is none" ) ) ?;
212
+ . ok_or_else ( || anyhow ! ( "the dag snapshot hash is none" ) ) ?;
212
213
213
214
let current_key = dag_accumulator. get_leaf ( msg. dag_accumulator_index ) ?
214
- . ok_or_else ( error || anyhow ! ( "the dag snapshot hash is none" ) ) ?;
215
+ . ok_or_else ( || anyhow ! ( "the dag snapshot hash is none" ) ) ?;
215
216
216
217
let pre_snapshot = self
217
218
. storage
218
219
. get_accumulator_snapshot_storage ( )
219
220
. get ( previous_key) ?
220
- . ok_or_else ( error || anyhow ! ( "the dag snapshot is none" ) ) ?;
221
+ . ok_or_else ( || anyhow ! ( "the dag snapshot is none" ) ) ?;
221
222
222
223
let current_snapshot = self
223
224
. storage
224
225
. get_accumulator_snapshot_storage ( )
225
226
. get ( current_key) ?
226
- . ok_or_else ( error || anyhow ! ( "the dag snapshot is none" ) ) ?;
227
+ . ok_or_else ( || anyhow ! ( "the dag snapshot is none" ) ) ?;
227
228
228
229
// fork the dag accumulator according to the ForkDagAccumulator.dag_accumulator_index
229
230
let fork = dag_accumulator. fork ( Some ( pre_snapshot. accumulator_info ) ) ;
@@ -235,10 +236,10 @@ impl FlexidagService {
235
236
}
236
237
} ) ;
237
238
238
- let ( key, snaphot_hasher) = Self :: create_snapshot_by_tips ( new_blocks, msg. block_header_id ) ?;
239
+ let ( key, snaphot_hasher) = Self :: create_snapshot_by_tips ( new_blocks, msg. block_header_id , self . storage . clone ( ) ) ?;
239
240
fork. append ( & vec ! [ key] ) ?;
240
241
let dag_accumulator_info = fork. get_info ( ) ;
241
- self . storage . get_accumulator_snapshot_storage ( ) . put ( key, snaphot_hasher. to_snapshot ( dag_accumulator_info) ) ?;
242
+ self . storage . get_accumulator_snapshot_storage ( ) . put ( key, snaphot_hasher. to_snapshot ( dag_accumulator_info. clone ( ) ) ) ?;
242
243
fork. flush ( ) ?;
243
244
Ok ( dag_accumulator_info)
244
245
}
@@ -298,9 +299,9 @@ impl ServiceHandler<Self, DumpTipsToAccumulator> for FlexidagService {
298
299
ctx : & mut ServiceContext < FlexidagService > ,
299
300
) -> Result < ( ) > {
300
301
let storage = ctx. get_shared :: < Arc < Storage > > ( ) ?;
301
- if self . tips . is_none ( ) {
302
+ if self . tip_info . is_none ( ) {
302
303
let config = ctx. get_shared :: < Arc < NodeConfig > > ( ) ?;
303
- let ( dag, dag_accumulator) = BlockDAG :: try_init_with_storage ( storage, config) ?;
304
+ let ( dag, dag_accumulator) = BlockDAG :: try_init_with_storage ( storage. clone ( ) , config) ?;
304
305
if dag. is_none ( ) {
305
306
Ok ( ( ) ) // the chain is still in single chain
306
307
} else {
@@ -309,41 +310,36 @@ impl ServiceHandler<Self, DumpTipsToAccumulator> for FlexidagService {
309
310
self . dag_accumulator = dag_accumulator;
310
311
self . tip_info = Some ( TipInfo {
311
312
tips : Some ( vec ! [ msg. block_header. id( ) ] ) ,
312
- k_total_difficulties : [ msg. block_header . id ( ) ] . into_iter ( ) . cloned ( ) . collect ( ) ,
313
+ k_total_difficulties : [ msg. k_total_difficulty ] . into_iter ( ) . collect ( ) ,
313
314
} ) ;
314
315
self . storage = storage. clone ( ) ;
315
316
Ok ( ( ) )
316
317
}
317
318
} else {
318
319
// the chain had became the flexidag chain
319
- let tip_info = self
320
+ let mut tip_info = self
320
321
. tip_info
321
322
. take ( )
322
323
. expect ( "the tips should not be none in this branch" ) ;
323
- let key = BlockDAG :: calculate_dag_accumulator_key ( tips. clone ( ) ) ?;
324
+ let snapshot_hasher = SyncFlexiDagSnapshotHasher {
325
+ child_hashes : tip_info. tips . expect ( "the tips should not be none" ) ,
326
+ head_block_id : msg. current_head_block_id ,
327
+ k_total_difficulties : tip_info. k_total_difficulties ,
328
+ } ;
329
+ let key = BlockDAG :: calculate_dag_accumulator_key ( & snapshot_hasher) ?;
324
330
let dag = self
325
331
. dag_accumulator
326
332
. as_mut ( )
327
333
. expect ( "the tips is not none but the dag accumulator is none" ) ;
328
334
dag. append ( & vec ! [ key] ) ?;
329
335
storage. get_accumulator_snapshot_storage ( ) . put (
330
336
key,
331
- SyncFlexiDagSnapshot {
332
- child_hashes : tip_info. tips . expect ( "the tips should not be none" ) ,
333
- accumulator_info : dag. get_info ( ) ,
334
- head_block_id : msg. current_head_block_id ,
335
- k_total_difficulties : tip_info
336
- . k_total_difficulties
337
- . into_iter ( )
338
- . take ( 16 )
339
- . cloned ( )
340
- . collect ( ) ,
341
- } ,
337
+ snapshot_hasher. to_snapshot ( dag. get_info ( ) )
342
338
) ?;
343
339
dag. flush ( ) ?;
344
340
self . tip_info = Some ( TipInfo {
345
341
tips : Some ( vec ! [ msg. block_header. id( ) ] ) ,
346
- k_total_difficulties : [ msg. block_header . id ( ) ] . into_iter ( ) . cloned ( ) . collect ( ) ,
342
+ k_total_difficulties : [ msg. k_total_difficulty ] . into_iter ( ) . collect ( ) ,
347
343
} ) ;
348
344
self . storage = storage. clone ( ) ;
349
345
Ok ( ( ) )
@@ -360,8 +356,8 @@ impl ServiceHandler<Self, UpdateDagTips> for FlexidagService {
360
356
let header = msg. block_header ;
361
357
match & mut self . tip_info {
362
358
Some ( tip_info) => {
363
- if !tip_info. tips . contains ( & header. id ( ) ) {
364
- tip_info. tips . push ( header. id ( ) ) ;
359
+ if !tip_info. tips . as_ref ( ) . expect ( "tips should not be none" ) . contains ( & header. id ( ) ) {
360
+ tip_info. tips . as_mut ( ) . expect ( "tips should not be none" ) . push ( header. id ( ) ) ;
365
361
tip_info. k_total_difficulties . insert ( KTotalDifficulty {
366
362
head_block_id : msg. k_total_difficulty . head_block_id ,
367
363
total_difficulty : msg. k_total_difficulty . total_difficulty ,
@@ -381,10 +377,9 @@ impl ServiceHandler<Self, UpdateDagTips> for FlexidagService {
381
377
// initialize the dag data, the chain will be the dag chain at next block
382
378
self . dag = dag;
383
379
self . tip_info = Some ( TipInfo {
384
- tips : Some ( vec ! [ msg . block_header . id( ) ] ) ,
385
- k_total_difficulties : [ msg. block_header . id ( ) ]
380
+ tips : Some ( vec ! [ header . id( ) ] ) ,
381
+ k_total_difficulties : [ msg. k_total_difficulty ]
386
382
. into_iter ( )
387
- . cloned ( )
388
383
. collect ( ) ,
389
384
} ) ;
390
385
self . dag_accumulator = dag_accumulator;
@@ -411,7 +406,7 @@ impl ServiceHandler<Self, GetDagTips> for FlexidagService {
411
406
_msg : GetDagTips ,
412
407
_ctx : & mut ServiceContext < FlexidagService > ,
413
408
) -> Result < Option < Vec < HashValue > > > {
414
- Ok ( self . tips . clone ( ) )
409
+ Ok ( self . tip_info . as_ref ( ) . ok_or_else ( || anyhow ! ( "tip info is none" ) ) ? . tips . clone ( ) )
415
410
}
416
411
}
417
412
@@ -538,7 +533,7 @@ impl ServiceHandler<Self, ForkDagAccumulator> for FlexidagService {
538
533
let dag_accumulator = self
539
534
. dag_accumulator
540
535
. as_ref ( )
541
- . ok_or_else ( error || anyhow ! ( "dag accumulator is none" ) ) ?;
536
+ . ok_or_else ( || anyhow ! ( "dag accumulator is none" ) ) ?;
542
537
543
538
if msg. dag_accumulator_index > dag_accumulator. num_leaves ( ) {
544
539
self . merge_from_big_dag ( msg)
@@ -554,11 +549,12 @@ impl ServiceHandler<Self, FinishSync> for FlexidagService {
554
549
msg : FinishSync ,
555
550
_ctx : & mut ServiceContext < FlexidagService > ,
556
551
) -> Result < ( ) > {
557
- let dag_accumulator = self . dag_accumulator . ok_or_else ( || anyhow ! ( "the dag_accumulator is none when sync finish" ) ) ?;
552
+ let dag_accumulator = self . dag_accumulator . as_mut ( ) . ok_or_else ( || anyhow ! ( "the dag_accumulator is none when sync finish" ) ) ?;
558
553
let local_info = dag_accumulator. get_info ( ) ;
559
554
if msg. dag_accumulator_info . get_num_leaves ( ) < local_info. get_num_leaves ( ) {
560
- let mut new_dag_accumulator = MerkleAccumulator :: new_with_info ( msg. dag_accumulator_info , self . storage . get_accumulator_store ( AccumulatorStoreType :: SyncDag ) ) ;
561
- for index in msg. dag_accumulator_info . get_num_leaves ( ) ..local_info. get_num_leaves ( ) {
555
+ let start_idnex = msg. dag_accumulator_info . get_num_leaves ( ) ;
556
+ let new_dag_accumulator = MerkleAccumulator :: new_with_info ( msg. dag_accumulator_info , self . storage . get_accumulator_store ( AccumulatorStoreType :: SyncDag ) ) ;
557
+ for index in start_idnex..local_info. get_num_leaves ( ) {
562
558
let key = dag_accumulator. get_leaf ( index) ?. ok_or_else ( || anyhow ! ( "the dag_accumulator leaf is none when sync finish" ) ) ?;
563
559
new_dag_accumulator. append ( & [ key] ) ?;
564
560
}
0 commit comments