From e361e0c9c0ad63ad646507cb8d18a4906fe2343b Mon Sep 17 00:00:00 2001 From: Jack Huang Date: Fri, 15 Sep 2023 23:54:46 +0800 Subject: [PATCH] update dag accumulator if the main append some new tips (#3968) * debug sync successfully * dag sync in a synchronized way * fix sync bugs * fix bug: update the dag accumulator at necessary point --- chain/src/chain.rs | 20 +++++- .../block_connector_service.rs | 3 +- sync/src/block_connector/write_block_chain.rs | 70 +++++++++++++------ 3 files changed, 70 insertions(+), 23 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 73903ff151..52bbabbbd8 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -23,6 +23,7 @@ use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter} use starcoin_statedb::ChainStateDB; use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot; use starcoin_storage::Store; +use starcoin_storage::storage::CodecKVStore; use starcoin_time_service::TimeService; use starcoin_types::block::BlockIdAndNumber; use starcoin_types::contract_event::ContractEventInfo; @@ -107,6 +108,7 @@ impl BlockChain { )), None => None, }; + let dag_snapshot_tips = storage.get_accumulator_snapshot_storage().get(head_id)?.map(|snapshot| snapshot.child_hashes); let mut chain = Self { genesis_hash: genesis, time_service, @@ -124,7 +126,7 @@ impl BlockChain { status: ChainStatus::new( head_block.header.clone(), block_info, - Some(vec![head_block.id()]), + dag_snapshot_tips, ), head: head_block, }, @@ -636,6 +638,22 @@ impl BlockChain { ); Ok(()) } + + pub fn dag_parents_in_tips(&self, dag_parents: Vec) -> Result { + Ok(dag_parents.into_iter().all(|parent| { + match &self.status.status.tips_hash { + Some(tips) => tips.contains(&parent), + None => false, + } + })) + } + + pub fn is_head_of_dag_accumulator(&self, next_tips: Vec) -> Result { + let key = Self::calculate_dag_accumulator_key(next_tips)?; + let next_tips_info = self.storage.get_dag_accumulator_info(key)?; + + return Ok(next_tips_info == self.dag_accumulator.as_ref().map(|accumulator| accumulator.get_info())); + } } impl ChainReader for BlockChain { diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index 50bad1d002..b5766e6bd5 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -312,6 +312,7 @@ impl ServiceHandler for BlockConnectorService { //TODO refactor connect and execute let block = msg.block; - self.chain_service.try_connect(block, msg.dag_parents) + let result = self.chain_service.try_connect(block, msg.dag_parents); + result } } diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index b79bbb5b53..0ff446779f 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -448,6 +448,19 @@ where let mut retracted = vec![]; let mut enacted = vec![]; + let snapshot = new_branch.get_dag_accumulator_snapshot(new_branch.head_block().header().id())?; + let mut children = snapshot.child_hashes.clone(); + children.sort(); + for child in children { + match self + .storage + .get_block(child)? { + Some(block) => enacted.push(block), + None => bail!("the block{} dose not exist in new branch, ignore", child.clone()), + } + } + enacted.reverse(); + loop { if min_leaf_index == 0 { break; @@ -467,7 +480,7 @@ where if let anyhow::Result::Ok(Some(block)) = block { rollback_blocks.push(block); } else { - warn!("the block{} dose not exist in main branch, ignore", child.clone()); + bail!("the block{} dose not exist in main branch, ignore", child.clone()); } return Ok(rollback_blocks); })?.into_iter()); @@ -482,7 +495,7 @@ where if let anyhow::Result::Ok(Some(block)) = block { rollback_blocks.push(block); } else { - warn!("the block{} dose not exist in new branch, ignore", child.clone()); + bail!("the block{} dose not exist in new branch, ignore", child.clone()); } return Ok(rollback_blocks); })?.into_iter()); @@ -861,28 +874,43 @@ where bail!("no new block has been executed successfully!"); } - // 1, write to disc - self.main - .append_dag_accumulator_leaf(new_tips.clone())?; - - // 2, broadcast the blocks sorted by their id - executed_blocks - .iter() - .for_each(|(exe_block, dag_block_parents)| { - if let Some(block) = exe_block { - self.broadcast_new_head( - block.clone(), - Some(dag_block_parents.clone()), - Some(new_tips.clone()), - ); - } - }); + let mut connected = self.main.is_head_of_dag_accumulator(new_tips.clone())?; + if self.main.dag_parents_in_tips(new_tips.clone())? { + // 1, write to disc + if !connected { + self.main + .append_dag_accumulator_leaf(new_tips.clone())?; + connected = true; + } + } + + if connected { + // 2, broadcast the blocks sorted by their id + executed_blocks + .iter() + .for_each(|(exe_block, dag_block_parents)| { + if let Some(block) = exe_block { + self.broadcast_new_head( + block.clone(), + Some(dag_block_parents.clone()), + Some(new_tips.clone()), + ); + } + }); + } + return executed_blocks .last() .map(|(exe_block, _)| { - ConnectOk::ExeConnectMain( - exe_block.as_ref().expect("exe block should not be None!").clone(), - ) + if connected { + ConnectOk::ExeConnectMain( + exe_block.as_ref().expect("exe block should not be None!").clone(), + ) + } else { + ConnectOk::ExeConnectBranch( + exe_block.as_ref().expect("exe block should not be None!").clone(), + ) + } }) .ok_or_else(|| format_err!("no block has been executed successfully!")); }