Skip to content

Commit

Permalink
update dag accumulator if the main append some new tips (#3968)
Browse files Browse the repository at this point in the history
* debug sync successfully

* dag sync in a synchronized way

* fix sync bugs

* fix bug: update the dag accumulator at necessary point
  • Loading branch information
jackzhhuang authored Sep 15, 2023
1 parent 6101e5d commit e361e0c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 23 deletions.
20 changes: 19 additions & 1 deletion chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter}
use starcoin_statedb::ChainStateDB;
use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot;
use starcoin_storage::Store;
use starcoin_storage::storage::CodecKVStore;
use starcoin_time_service::TimeService;
use starcoin_types::block::BlockIdAndNumber;
use starcoin_types::contract_event::ContractEventInfo;
Expand Down Expand Up @@ -107,6 +108,7 @@ impl BlockChain {
)),
None => None,
};
let dag_snapshot_tips = storage.get_accumulator_snapshot_storage().get(head_id)?.map(|snapshot| snapshot.child_hashes);
let mut chain = Self {
genesis_hash: genesis,
time_service,
Expand All @@ -124,7 +126,7 @@ impl BlockChain {
status: ChainStatus::new(
head_block.header.clone(),
block_info,
Some(vec![head_block.id()]),
dag_snapshot_tips,
),
head: head_block,
},
Expand Down Expand Up @@ -636,6 +638,22 @@ impl BlockChain {
);
Ok(())
}

pub fn dag_parents_in_tips(&self, dag_parents: Vec<HashValue>) -> Result<bool> {
Ok(dag_parents.into_iter().all(|parent| {
match &self.status.status.tips_hash {
Some(tips) => tips.contains(&parent),
None => false,
}
}))
}

pub fn is_head_of_dag_accumulator(&self, next_tips: Vec<HashValue>) -> Result<bool> {
let key = Self::calculate_dag_accumulator_key(next_tips)?;
let next_tips_info = self.storage.get_dag_accumulator_info(key)?;

return Ok(next_tips_info == self.dag_accumulator.as_ref().map(|accumulator| accumulator.get_info()));
}
}

impl ChainReader for BlockChain {
Expand Down
3 changes: 2 additions & 1 deletion sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl ServiceHandler<Self, BlockConnectedRequest> for BlockConnectorService {
//TODO refactor connect and execute

let block = msg.block;
self.chain_service.try_connect(block, msg.dag_parents)
let result = self.chain_service.try_connect(block, msg.dag_parents);
result
}
}
70 changes: 49 additions & 21 deletions sync/src/block_connector/write_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,19 @@ where
let mut retracted = vec![];
let mut enacted = vec![];

let snapshot = new_branch.get_dag_accumulator_snapshot(new_branch.head_block().header().id())?;
let mut children = snapshot.child_hashes.clone();
children.sort();
for child in children {
match self
.storage
.get_block(child)? {
Some(block) => enacted.push(block),
None => bail!("the block{} dose not exist in new branch, ignore", child.clone()),
}
}
enacted.reverse();

loop {
if min_leaf_index == 0 {
break;
Expand All @@ -467,7 +480,7 @@ where
if let anyhow::Result::Ok(Some(block)) = block {
rollback_blocks.push(block);
} else {
warn!("the block{} dose not exist in main branch, ignore", child.clone());
bail!("the block{} dose not exist in main branch, ignore", child.clone());
}
return Ok(rollback_blocks);
})?.into_iter());
Expand All @@ -482,7 +495,7 @@ where
if let anyhow::Result::Ok(Some(block)) = block {
rollback_blocks.push(block);
} else {
warn!("the block{} dose not exist in new branch, ignore", child.clone());
bail!("the block{} dose not exist in new branch, ignore", child.clone());
}
return Ok(rollback_blocks);
})?.into_iter());
Expand Down Expand Up @@ -861,28 +874,43 @@ where
bail!("no new block has been executed successfully!");
}

// 1, write to disc
self.main
.append_dag_accumulator_leaf(new_tips.clone())?;

// 2, broadcast the blocks sorted by their id
executed_blocks
.iter()
.for_each(|(exe_block, dag_block_parents)| {
if let Some(block) = exe_block {
self.broadcast_new_head(
block.clone(),
Some(dag_block_parents.clone()),
Some(new_tips.clone()),
);
}
});
let mut connected = self.main.is_head_of_dag_accumulator(new_tips.clone())?;
if self.main.dag_parents_in_tips(new_tips.clone())? {
// 1, write to disc
if !connected {
self.main
.append_dag_accumulator_leaf(new_tips.clone())?;
connected = true;
}
}

if connected {
// 2, broadcast the blocks sorted by their id
executed_blocks
.iter()
.for_each(|(exe_block, dag_block_parents)| {
if let Some(block) = exe_block {
self.broadcast_new_head(
block.clone(),
Some(dag_block_parents.clone()),
Some(new_tips.clone()),
);
}
});
}

return executed_blocks
.last()
.map(|(exe_block, _)| {
ConnectOk::ExeConnectMain(
exe_block.as_ref().expect("exe block should not be None!").clone(),
)
if connected {
ConnectOk::ExeConnectMain(
exe_block.as_ref().expect("exe block should not be None!").clone(),
)
} else {
ConnectOk::ExeConnectBranch(
exe_block.as_ref().expect("exe block should not be None!").clone(),
)
}
})
.ok_or_else(|| format_err!("no block has been executed successfully!"));
}
Expand Down

0 comments on commit e361e0c

Please sign in to comment.