diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index b285f49a8d..5e26f08428 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -24,13 +24,11 @@ use starcoin_open_block::OpenedBlock;
 use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter};
 use starcoin_statedb::ChainStateDB;
 use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot;
-use starcoin_storage::storage::CodecKVStore;
 use starcoin_storage::Store;
 use starcoin_time_service::TimeService;
 use starcoin_types::block::BlockIdAndNumber;
 use starcoin_types::contract_event::ContractEventInfo;
 use starcoin_types::filter::Filter;
-use starcoin_types::header::DagHeader;
 use starcoin_types::startup_info::{ChainInfo, ChainStatus};
 use starcoin_types::transaction::RichTransactionInfo;
 use starcoin_types::{
@@ -55,6 +53,19 @@ pub struct ChainStatusWithBlock {
     pub head: Block,
 }
 
+impl ChainStatusWithBlock {
+    pub fn new(head_block: Block, block_info: BlockInfo, dag_tips: Vec<HashValue>) -> Self {
+        Self {
+            status: ChainStatus::new(head_block.header.clone(), block_info, Some(dag_tips)),
+            head: head_block,
+        }
+    }
+
+    pub fn dag_tips(&self) -> Option<&Vec<HashValue>> {
+        self.status.tips_hash.as_ref()
+    }
+}
+
 pub struct BlockChain {
     genesis_hash: HashValue,
     txn_accumulator: MerkleAccumulator,
diff --git a/chain/src/lib.rs b/chain/src/lib.rs
index 70c73faab9..adbc37c90d 100644
--- a/chain/src/lib.rs
+++ b/chain/src/lib.rs
@@ -3,5 +3,5 @@
 #![deny(clippy::integer_arithmetic)]
 mod chain;
 pub mod verifier;
-pub use chain::BlockChain;
+pub use chain::{BlockChain, ChainStatusWithBlock};
 pub use starcoin_chain_api::{ChainReader, ChainWriter};
diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs
index b191273678..9f3358adc0 100644
--- a/sync/src/block_connector/block_connector_service.rs
+++ b/sync/src/block_connector/block_connector_service.rs
@@ -10,6 +10,7 @@ use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskChec
 use anyhow::bail;
 use anyhow::{format_err, Ok, Result};
 use network_api::PeerProvider;
+use parking_lot::Mutex;
 use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService};
 use starcoin_config::{NodeConfig, G_CRATE_VERSION};
 use starcoin_consensus::BlockDAG;
@@ -29,7 +30,7 @@ use starcoin_txpool_mock_service::MockTxPoolService;
 use starcoin_types::block::ExecutedBlock;
 use starcoin_types::sync_status::SyncStatus;
 use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use sysinfo::{DiskExt, System, SystemExt};
 
 const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3;
diff --git a/sync/src/block_connector/test_write_block_chain.rs b/sync/src/block_connector/test_write_block_chain.rs
index 71a264861b..e55f414d2e 100644
--- a/sync/src/block_connector/test_write_block_chain.rs
+++ b/sync/src/block_connector/test_write_block_chain.rs
@@ -2,14 +2,13 @@
 // SPDX-License-Identifier: Apache-2.0
 #![allow(clippy::integer_arithmetic)]
 use crate::block_connector::WriteBlockChainService;
+use parking_lot::Mutex;
 use starcoin_account_api::AccountInfo;
 use starcoin_chain::{BlockChain, ChainReader};
 use starcoin_chain_service::WriteableChainService;
 use starcoin_config::NodeConfig;
-use starcoin_consensus::Consensus;
 use starcoin_consensus::{BlockDAG, Consensus, FlexiDagStorage, FlexiDagStorageConfig};
 use starcoin_crypto::HashValue;
-use starcoin_genesis::Genesis as StarcoinGenesis;
 use starcoin_service_registry::bus::BusService;
 use starcoin_service_registry::{RegistryAsyncService, RegistryService};
 use starcoin_storage::Store;
@@ -19,7 +18,7 @@ use starcoin_types::block::Block;
 use starcoin_types::blockhash::ORIGIN;
 use starcoin_types::header::Header;
 use starcoin_types::startup_info::StartupInfo;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 pub async fn create_writeable_block_chain() -> (
     WriteBlockChainService<MockTxPoolService>,
diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs
index 75d38f0585..502e09548c 100644
--- a/sync/src/block_connector/write_block_chain.rs
+++ b/sync/src/block_connector/write_block_chain.rs
@@ -3,7 +3,8 @@
 
 use crate::block_connector::metrics::ChainMetrics;
 use anyhow::{bail, format_err, Ok, Result};
-use starcoin_chain::BlockChain;
+use parking_lot::Mutex;
+use starcoin_chain::{BlockChain, ChainStatusWithBlock};
 use starcoin_chain_api::{ChainReader, ChainWriter, ConnectBlockError, WriteableChainService};
 use starcoin_config::NodeConfig;
 use starcoin_consensus::BlockDAG;
@@ -21,11 +22,54 @@ use starcoin_types::{
     startup_info::StartupInfo,
     system_events::{NewBranch, NewHeadBlock},
 };
-use std::fmt::Formatter;
-use std::sync::{Arc, Mutex};
+use std::{collections::HashMap, fmt::Formatter, sync::Arc};
 
 use super::BlockConnectorService;
 
+struct DagBranch {
+    selected_parent: HashValue,
+    status: ChainStatusWithBlock,
+}
+
+impl DagBranch {
+    pub fn new(head: Block, selected_parent: HashValue, dag_tips: Vec<HashValue>) -> Self {
+        todo!()
+        //Self {
+        //    selected_parent,
+        //    status: ChainStatusWithBlock::new(block, dag_tips),
+        //}
+    }
+
+    pub fn selected_parent(&self) -> HashValue {
+        self.selected_parent
+    }
+
+    pub fn dag_tips(&self) -> Vec<HashValue> {
+        self.status
+            .dag_tips()
+            .clone()
+            .expect("invalid tips_hash for a dag branch")
+    }
+
+    pub fn appendable(&self, other: &DagBranch) -> Option {
+        if self.selected_parent == other.selected_parent {
+            match (self.status.dag_tips(), other.status.dag_tips()) {
+                (None, _) | (_, None) => None,
+                (Some(me), Some(other)) => {
+                    todo!()
+                }
+            }
+        } else {
+            None
+        }
+    }
+}
+
+#[derive(Default)]
+struct DagBranchesStatus {
+    branches: HashMap<HashValue, DagBranch>,
+}
+
 const MAX_ROLL_BACK_BLOCK: usize = 10;
 
 pub struct WriteBlockChainService<P>
@@ -35,6 +79,7 @@ where
     config: Arc<NodeConfig>,
     startup_info: StartupInfo,
     main: BlockChain,
+    dag_branches: DagBranchesStatus,
     storage: Arc<dyn Store>,
     txpool: P,
     bus: ServiceRef<BusService>,
@@ -104,15 +149,7 @@ where
             .as_ref()
             .map(|metrics| metrics.chain_block_connect_time.start_timer());
 
-        //Todo: make sure the parents have been see
-        // 1. check if parents block exists in block_storage/dag_storage.
-        // 2. evict a event to activate a task to fetch the missing blocks
-        // 3. return error for further retrying
-        let result = if block.is_dag() {
-            self.connect_dag_inner(block)
-        } else {
-            self.connect_inner(block)
-        };
+        let result = self.connect_inner(block);
 
         if let Some(metrics) = self.metrics.as_ref() {
             let result = match result.as_ref() {
@@ -164,6 +201,7 @@ where
             config,
             startup_info,
             main,
+            dag_branches: Default::default(),
            storage,
            txpool,
            bus,
@@ -177,8 +215,6 @@ where
     fn find_or_fork(
         &self,
         header: &BlockHeader,
-        dag_block_next_parent: Option<HashValue>,
-        dag_block_parents: Option<Vec<HashValue>>,
     ) -> Result<(Option<BlockInfo>, Option<BlockChain>)> {
         let block_id = header.id();
         let block_info = self.storage.get_block_info(block_id)?;
@@ -195,11 +231,11 @@ where
                 self.vm_metrics.clone(),
             )?)
             }
-        } else if self.block_exist(header.parent_hash())? || self.blocks_exist(dag_block_parents)? {
+        } else if self.block_exist(header.parent_hash())? {
             let net = self.config.net();
             Some(BlockChain::new(
                 net.time_service(),
-                dag_block_next_parent.unwrap_or(header.parent_hash()),
+                header.parent_hash(),
                 self.storage.clone(),
                 net.id().clone(),
                 self.vm_metrics.clone(),
@@ -209,7 +245,6 @@ where
         };
         Ok((block_info, block_chain))
     }
-
     fn block_exist(&self, block_id: HashValue) -> Result<bool> {
         Ok(matches!(self.storage.get_block_info(block_id)?, Some(_)))
     }
@@ -270,7 +305,7 @@ where
 
         if branch_total_difficulty > main_total_difficulty {
             self.update_startup_info(new_branch.head_block().header())?;
-            let dag_parents = self.dag.lock().unwrap().get_parents(new_head_block)?;
+            let dag_parents = self.dag.lock().get_parents(new_head_block)?;
             ctx.broadcast(NewHeadBlock(
                 Arc::new(new_branch.head_block()),
                 Some(dag_parents),
@@ -282,11 +317,7 @@ where
         }
     }
 
-    pub fn select_head(
-        &mut self,
-        new_branch: BlockChain,
-        dag_block_parents: Option<Vec<HashValue>>,
-    ) -> Result<()> {
+    pub fn select_head(&mut self, new_branch: BlockChain) -> Result<()> {
         let executed_block = new_branch.head_block();
         let main_total_difficulty = self.main.get_total_difficulty()?;
         let branch_total_difficulty = new_branch.get_total_difficulty()?;
@@ -294,16 +325,10 @@ where
 
         if branch_total_difficulty > main_total_difficulty {
             let (enacted_count, enacted_blocks, retracted_count, retracted_blocks) =
-                if dag_block_parents.is_some() {
-                    // for dag
-                    self.find_ancestors_from_dag_accumulator(&new_branch)?
+                if !parent_is_main_head {
+                    self.find_ancestors_from_accumulator(&new_branch)?
                 } else {
-                    // for single chain
-                    if !parent_is_main_head {
-                        self.find_ancestors_from_accumulator(&new_branch)?
-                    } else {
-                        (1, vec![executed_block.block.clone()], 0, vec![])
-                    }
+                    (1, vec![executed_block.block.clone()], 0, vec![])
                 };
 
             self.main = new_branch;
@@ -316,7 +341,7 @@ where
             )?;
         } else {
             //send new branch event
-            self.broadcast_new_branch(executed_block, dag_block_parents);
+            self.broadcast_new_branch(executed_block);
         }
         Ok(())
     }
@@ -647,120 +672,93 @@ where
         }
     }
 
-    fn broadcast_new_branch(
-        &self,
-        block: ExecutedBlock,
-        dag_block_parents: Option<Vec<HashValue>>,
-    ) {
+    fn broadcast_new_branch(&self, block: ExecutedBlock) {
         if let Some(metrics) = self.metrics.as_ref() {
             metrics
                 .chain_select_head_total
                 .with_label_values(&["new_branch"])
                 .inc()
         }
-        if let Err(e) = self
-            .bus
-            .broadcast(NewBranch(Arc::new(block), dag_block_parents))
-        {
+        let dag_parents = block.block.header().clone().parents_hash();
+        if let Err(e) = self.bus.broadcast(NewBranch(Arc::new(block), dag_parents)) {
             error!("Broadcast NewBranch error: {:?}", e);
         }
     }
 
-    fn switch_branch(
-        &mut self,
-        block: Block,
-        dag_block_parents: Option<Vec<HashValue>>,
-        dag_block_next_parent: Option<HashValue>,
-        next_tips: &mut Option<Vec<HashValue>>,
-    ) -> Result<ConnectOk> {
-        let (block_info, fork) = self.find_or_fork(
-            block.header(),
-            dag_block_next_parent,
-            dag_block_parents.clone(),
-        )?;
-        match (block_info, fork) {
-            //block has been processed in some branch, so just trigger a head selection.
-            (Some(block_info), Some(branch)) => {
-                // both are different, select one
-                debug!(
-                    "Block {} has been processed, trigger head selection, total_difficulty: {}",
-                    block_info.block_id(),
-                    branch.get_total_difficulty()?
-                );
-                let exe_block = branch.head_block();
-                self.select_head(branch, dag_block_parents)?;
-                if let Some(new_tips) = next_tips {
-                    new_tips.push(block_info.block_id().clone());
-                }
-                Ok(ConnectOk::Duplicate(exe_block))
-            }
-            //block has been processed, and its parent is main chain, so just connect it to main chain.
-            (Some(block_info), None) => {
-                // both are identical
-                let block_id: HashValue = block_info.block_id().clone();
-                let executed_block = self.main.connect(ExecutedBlock {
-                    block: block.clone(),
-                    block_info,
-                })?;
-                info!(
-                    "Block {} main has been processed, trigger head selection",
-                    block_id,
-                );
-                self.do_new_head(executed_block.clone(), 1, vec![block], 0, vec![])?;
-                Ok(ConnectOk::Connect(executed_block))
+    fn connect_dag_inner(&mut self, block: Block) -> Result<ConnectOk> {
+        let block_id = block.id();
+        let ghost_dag_data = self.dag.lock().addToDag(DagHeader::new(block.header))?;
+        let selected_parent = self
+            .storage
+            .get_block_by_hash(ghost_dag_data.selected_parent)?
+            .expect("selected parent should in storage");
+        // todo:
+        // 1. keep tracking received blocks in a multi-queue
+        // 2. for a block, check if [selected_parent, ... mergeset_blues] is one queue
+        // 2a. if there is a existing queue, append the block to it.
+        // 2b. if there is a shorter queue, extend it.
+        // 2c. if not, create a new queue, and append the block to it
+        // 3. select the longest queue as main
+
+        // This block has been processed before, should we update here?
+        if let Some(branch) = self.dag_branches.branches.get(&block_id) {
+            if branch.selected_parent != selected_parent.id() {
+                // inconsistency found
+                todo!()
             }
-            (None, Some(mut branch)) => {
-                // the block is not in the block, but the parent is
-                let result = branch.apply(block);
-                let executed_block = result?;
-                self.select_head(branch, dag_block_parents)?;
-                Ok(ConnectOk::ExeConnectBranch(executed_block))
+            let mut branch_tips_iter = branch.status.dag_tips().unwrap().iter();
+            let mut mine_tips_iter = ghost_dag_data.mergeset_blues.iter();
+            loop {
+                match (branch_tips_iter.next(), mine_tips_iter.next()) {
+                    (Some(b), Some(m)) if b == m => { /* nothing to do*/ }
+                    (None, None) => {
+                        // we're done iterating. everything is good.
+                        break;
+                    }
+                    _ => {
+                        // inconsistency found
+                        todo!()
+                    }
+                }
             }
-            (None, None) => Err(ConnectBlockError::FutureBlock(Box::new(block)).into()),
-        }
-    }
-
-    fn connect_to_main(&mut self, block: Block) -> Result<ConnectOk> {
-        let block_id = block.id();
-        if block_id == *starcoin_storage::BARNARD_HARD_FORK_HASH
-            && block.header().number() == starcoin_storage::BARNARD_HARD_FORK_HEIGHT
-        {
-            debug!("barnard hard fork {}", block_id);
-            return Err(ConnectBlockError::BarnardHardFork(Box::new(block)).into());
-        }
-        if self.main.current_header().id() == block_id {
-            debug!("Repeat connect, current header is {} already.", block_id);
-            return Ok(ConnectOk::MainDuplicate);
-        }
-        if self.main.current_header().id() == block.header().parent_hash()
-            && !self.block_exist(block_id)?
-        {
-            return self.apply_and_select_head(block);
+        // find the longest-same-prefix branch
+        let mut longest = (None, 0);
+        for branch in &self.dag_branches.branches {
+            let mut branch_tips_iter = branch.1.status.dag_tips().unwrap().iter();
+            let mut mine_tips_iter = ghost_dag_data.mergeset_blues.iter();
+            let mut index = 1usize;
+
+            let res = loop {
+                match (branch_tips_iter.next(), mine_tips_iter.next()) {
+                    (Some(b), Some(m)) if b == m => index += 1,
+                    _ => break (branch.0, index - 1),
+                }
+            };
+            if res.1 > longest.1 {
+                longest = (Some(res.0), res.1)
+            }
         }
-        // todo: should switch dag together
-        self.switch_branch(block, None, None, &mut None)
-    }
-    fn apply_and_select_head(&mut self, block: Block) -> Result<ConnectOk> {
-        let executed_block = self.main.apply(block)?;
-        let enacted_blocks = vec![executed_block.block().clone()];
-        self.do_new_head(executed_block.clone(), 1, enacted_blocks, 0, vec![])?;
-        return Ok(ConnectOk::ExeConnectMain(executed_block));
-    }
+        let mut chain = match longest {
+            (None, _) => self.main.fork(selected_parent.header.parent_hash())?,
+            (Some(v), index) => {
+                let _parent_hash = self
+                    .dag_branches
+                    .branches
+                    .get(v)
+                    .and_then(|branch| branch.status.dag_tips())
+                    .and_then(|tips| tips.get(index))
+                    .expect("must exist")
+                    .clone();
+                // fork chain
+                //self.main.fork(parent_hash);
+                todo!()
+            }
+        };
-    fn connect_dag_inner(&mut self, block: Block) -> Result<ConnectOk> {
-        let ghost_dag_data = self
-            .dag
-            .lock()
-            .unwrap()
-            .addToDag(DagHeader::new(block.header))?;
-        let selected_parent = self
-            .storage
-            .get_block_by_hash(ghost_dag_data.selected_parent)?
-            .expect("selected parent should in storage");
-        // Todo: check if select_parent is current_head of main, if not, fork it with `select_parent`.
-        let mut chain = self.main.fork(selected_parent.header.parent_hash())?;
+        //let mut chain = self.main.fork(selected_parent.header.parent_hash())?;
         for blue_hash in ghost_dag_data.mergeset_blues.iter() {
             if let Some(blue_block) = self.storage.get_block(blue_hash.to_owned())? {
                 chain.apply(blue_block);
@@ -769,7 +767,6 @@ where
                 return Ok(ConnectOk::DagConnectMissingBlock);
             }
         }
-        // Todo: if forking chain fails,
         //self.broadcast_new_head();
         Ok(ConnectOk::DagConnected)
     }
@@ -782,17 +779,56 @@ where
             debug!("barnard hard fork {}", block_id);
             return Err(ConnectBlockError::BarnardHardFork(Box::new(block)).into());
         }
+
+        if block.is_dag() {
+            return self.connect_dag_inner(block);
+        }
+
         if self.main.current_header().id() == block_id {
             debug!("Repeat connect, current header is {} already.", block_id);
             return Ok(ConnectOk::MainDuplicate);
         }
-        // normal block, just connect to main
-        // let mut next_tips = Some(vec![]);
-        let executed_block = self.connect_to_main(block)?.clone();
-        if let Some(block) = executed_block.block() {
-            self.broadcast_new_head(block.clone(), None, None);
+
+        if self.main.current_header().id() == block.header().parent_hash()
+            && !self.block_exist(block_id)?
+        {
+            let executed_block = self.main.apply(block)?;
+            let enacted_blocks = vec![executed_block.block().clone()];
+            self.do_new_head(executed_block.clone(), 1, enacted_blocks, 0, vec![])?;
+            return Ok(ConnectOk::ExeConnectMain(executed_block));
+        }
+        let (block_info, fork) = self.find_or_fork(block.header())?;
+        match (block_info, fork) {
+            //block has been processed in some branch, so just trigger a head selection.
+            (Some(block_info), Some(branch)) => {
+                debug!(
+                    "Block {} has been processed, trigger head selection, total_difficulty: {}",
+                    block_id,
+                    branch.get_total_difficulty()?
+                );
+                self.select_head(branch)?;
+                Ok(ConnectOk::Duplicate(ExecutedBlock { block, block_info }))
+            }
+            //block has been processed, and its parent is main chain, so just connect it to main chain.
+            (Some(block_info), None) => {
+                let executed_block = self.main.connect(ExecutedBlock {
+                    block: block.clone(),
+                    block_info,
+                })?;
+                info!(
+                    "Block {} main has been processed, trigger head selection",
+                    block_id
+                );
+                self.do_new_head(executed_block.clone(), 1, vec![block], 0, vec![])?;
+                Ok(ConnectOk::Connect(executed_block))
+            }
+            (None, Some(mut branch)) => {
+                let executed_block = branch.apply(block)?;
+                self.select_head(branch)?;
+                Ok(ConnectOk::ExeConnectBranch(executed_block))
+            }
+            (None, None) => Err(ConnectBlockError::FutureBlock(Box::new(block)).into()),
         }
-        return Ok(executed_block);
     }
 
 #[cfg(test)]