diff --git a/Cargo.lock b/Cargo.lock index 0f4d8c0405..81ff007b28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2814,7 +2814,7 @@ dependencies = [ ] [[package]] -name = "flexidag" +name = "flexidag-service" version = "1.13.7" dependencies = [ "anyhow", @@ -9332,6 +9332,7 @@ name = "starcoin-chain" version = "1.13.7" dependencies = [ "anyhow", + "async-std", "bcs-ext", "clap 3.2.23", "proptest", @@ -9438,6 +9439,7 @@ name = "starcoin-chain-service" version = "1.13.7" dependencies = [ "anyhow", + "async-std", "async-trait", "futures 0.3.26", "rand 0.8.5", @@ -9449,6 +9451,7 @@ dependencies = [ "starcoin-config", "starcoin-consensus", "starcoin-crypto", + "starcoin-flexidag", "starcoin-logger", "starcoin-network-rpc-api", "starcoin-service-registry", @@ -9583,6 +9586,7 @@ dependencies = [ "rust-argon2", "serde 1.0.152", "sha3", + "starcoin-accumulator", "starcoin-chain-api", "starcoin-config", "starcoin-crypto", @@ -9760,6 +9764,43 @@ dependencies = [ "tokio-executor 0.2.0-alpha.6", ] +[[package]] +name = "starcoin-flexidag" +version = "1.13.7" +dependencies = [ + "anyhow", + "async-trait", + "bcs-ext", + "bincode", + "byteorder", + "futures 0.3.26", + "hex", + "itertools", + "once_cell", + "parking_lot 0.12.1", + "proptest", + "proptest-derive", + "rand 0.8.5", + "rand_core 0.6.4", + "rocksdb", + "rust-argon2", + "serde 1.0.152", + "sha3", + "starcoin-accumulator", + "starcoin-chain-api", + "starcoin-config", + "starcoin-crypto", + "starcoin-logger", + "starcoin-service-registry", + "starcoin-state-api", + "starcoin-storage", + "starcoin-time-service", + "starcoin-types", + "starcoin-vm-types", + "thiserror", + "tokio", +] + [[package]] name = "starcoin-framework" version = "11.0.0" @@ -9950,6 +9991,7 @@ name = "starcoin-miner" version = "1.13.7" dependencies = [ "anyhow", + "async-std", "bcs-ext", "futures 0.3.26", "futures-timer", @@ -9965,6 +10007,7 @@ dependencies = [ "starcoin-consensus", "starcoin-crypto", "starcoin-executor", + "starcoin-flexidag", "starcoin-genesis", "starcoin-logger", "starcoin-metrics", @@ -10812,6 +10855,7 @@ dependencies = [ "starcoin-consensus", "starcoin-crypto", "starcoin-executor", + "starcoin-flexidag", "starcoin-genesis", "starcoin-logger", "starcoin-metrics", diff --git a/Cargo.toml b/Cargo.toml index 3886e5a707..6fab846e5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -503,7 +503,7 @@ starcoin-parallel-executor = { path = "vm/parallel-executor" } starcoin-transaction-benchmarks = { path = "vm/transaction-benchmarks" } starcoin-language-e2e-tests = { path = "vm/e2e-tests" } starcoin-proptest-helpers = { path = "vm/proptest-helpers" } -flexidag = { path ="flexidag" } +starcoin-flexidag = { path = "flexidag" } flexidag-service = { path ="flexidag/service" } syn = { version = "1.0.107", features = [ "full", diff --git a/block-relayer/src/block_relayer.rs b/block-relayer/src/block_relayer.rs index c49969cb3c..e82259aa77 100644 --- a/block-relayer/src/block_relayer.rs +++ b/block-relayer/src/block_relayer.rs @@ -240,11 +240,7 @@ impl BlockRelayer { ) .await?; - block_connector_service.notify(PeerNewBlock::new( - peer_id, - block, - compact_block_msg.message.tips_hash, - ))?; + block_connector_service.notify(PeerNewBlock::new(peer_id, block))?; } Ok(()) }; diff --git a/chain/Cargo.toml b/chain/Cargo.toml index 8dac82c310..5a01426439 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -24,6 +24,7 @@ starcoin-vm-types = { workspace = true } starcoin-storage = { workspace = true } thiserror = { workspace = true } starcoin-network-rpc-api = { workspace = true } +async-std = { workspace = true } [dev-dependencies] proptest = { workspace = true } diff --git a/chain/api/Cargo.toml b/chain/api/Cargo.toml index 1648fcdee5..094c6edcb8 100644 --- a/chain/api/Cargo.toml +++ b/chain/api/Cargo.toml @@ -18,7 +18,6 @@ thiserror = { workspace = true } starcoin-network-rpc-api = { workspace = true } starcoin-config = { workspace = true } - [dev-dependencies] [features] diff --git a/chain/api/src/chain.rs b/chain/api/src/chain.rs index 016ef4df36..271d0d7ab8 100644 --- a/chain/api/src/chain.rs +++ b/chain/api/src/chain.rs @@ -103,8 +103,6 @@ pub trait ChainReader { access_path: Option, ) -> Result>; - /// get the current tips hash value - fn current_tips_hash(&self) -> Option; fn net_id(&self) -> ChainNetworkID; } @@ -117,18 +115,6 @@ pub trait ChainWriter { fn apply(&mut self, block: Block) -> Result; fn chain_state(&mut self) -> &ChainStateDB; - - /// Get the dag accumulator info - fn get_current_dag_accumulator_info(&self) -> Result; - - /// Fork the accumulator - fn fork_dag_accumulator(&mut self, accumulator_info: AccumulatorInfo) -> Result<()>; - - /// Append the dag accumulator leaf - fn append_dag_accumulator_leaf( - &mut self, - tips: Vec, - ) -> Result<(HashValue, AccumulatorInfo)>; } /// `Chain` is a trait that defines a single Chain. diff --git a/chain/chain-notify/src/lib.rs b/chain/chain-notify/src/lib.rs index de3f0900e5..0cd0a22d6e 100644 --- a/chain/chain-notify/src/lib.rs +++ b/chain/chain-notify/src/lib.rs @@ -52,7 +52,7 @@ impl EventHandler for ChainNotifyHandlerService { item: NewHeadBlock, ctx: &mut ServiceContext, ) { - let NewHeadBlock(block_detail, _tips_hash) = item; + let NewHeadBlock(block_detail) = item; let block = block_detail.block(); // notify header. self.notify_new_block(block, ctx); diff --git a/chain/service/Cargo.toml b/chain/service/Cargo.toml index e1dbd7f95d..b772b9e707 100644 --- a/chain/service/Cargo.toml +++ b/chain/service/Cargo.toml @@ -1,5 +1,6 @@ [dependencies] anyhow = { workspace = true } +async-std = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } rand = { workspace = true } @@ -21,6 +22,7 @@ tokio = { workspace = true } starcoin-network-rpc-api = { workspace = true } starcoin-consensus = { workspace = true } starcoin-accumulator = { package = "starcoin-accumulator", workspace = true } +starcoin-flexidag = { workspace = true } [dev-dependencies] stest = { workspace = true } diff --git a/chain/service/src/chain_service.rs b/chain/service/src/chain_service.rs index 058c7c6301..72439f83ea 100644 --- a/chain/service/src/chain_service.rs +++ b/chain/service/src/chain_service.rs @@ -1,8 +1,9 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use anyhow::{bail, format_err, Error, Result}; -use starcoin_accumulator::Accumulator; +use anyhow::{bail, format_err, Error, Ok, Result}; +use starcoin_accumulator::node::AccumulatorStoreType; +use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_chain::BlockChain; use starcoin_chain_api::message::{ChainRequest, ChainResponse}; use starcoin_chain_api::{ @@ -11,17 +12,22 @@ use starcoin_chain_api::{ use starcoin_config::NodeConfig; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; +use starcoin_flexidag::flexidag_service::{ + GetDagAccumulatorLeafDetail, GetDagBlockParents, UpdateDagTips, +}; +use starcoin_flexidag::{flexidag_service, FlexidagService}; use starcoin_logger::prelude::*; use starcoin_network_rpc_api::dag_protocol::{ GetDagAccumulatorLeaves, GetTargetDagAccumulatorLeafDetail, TargetDagAccumulatorLeaf, TargetDagAccumulatorLeafDetail, }; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRef, }; use starcoin_storage::{BlockStore, Storage, Store}; use starcoin_types::block::ExecutedBlock; use starcoin_types::contract_event::ContractEventInfo; +use starcoin_types::dag_block::KTotalDifficulty; use starcoin_types::filter::Filter; use starcoin_types::system_events::NewHeadBlock; use starcoin_types::transaction::RichTransactionInfo; @@ -33,7 +39,7 @@ use starcoin_types::{ }; use starcoin_vm_runtime::metrics::VMMetrics; use starcoin_vm_types::access_path::AccessPath; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; /// A Chain reader service to provider Reader API. pub struct ChainReaderService { @@ -45,7 +51,7 @@ impl ChainReaderService { config: Arc, startup_info: StartupInfo, storage: Arc, - dag: BlockDAG, + flexidag_service: ServiceRef, vm_metrics: Option, ) -> Result { Ok(Self { @@ -53,7 +59,7 @@ impl ChainReaderService { config.clone(), startup_info, storage.clone(), - dag, + flexidag_service, vm_metrics.clone(), )?, }) @@ -68,10 +74,8 @@ impl ServiceFactory for ChainReaderService { .get_startup_info()? .ok_or_else(|| format_err!("StartupInfo should exist at service init."))?; let vm_metrics = ctx.get_shared_opt::()?; - let dag = ctx - .get_shared_opt::()? - .expect("dag should be initialized at service init"); - Self::new(config, startup_info, storage, dag, vm_metrics) + let flexidag_service = ctx.service_ref::()?.clone(); + Self::new(config, startup_info, storage, flexidag_service, vm_metrics) } } @@ -88,11 +92,11 @@ impl ActorService for ChainReaderService { } impl EventHandler for ChainReaderService { - fn handle_event(&mut self, event: NewHeadBlock, _ctx: &mut ServiceContext) { - let new_head = event.0.block().header(); + fn handle_event(&mut self, event: NewHeadBlock, ctx: &mut ServiceContext) { + let new_head = event.0.block().header().clone(); if let Err(e) = if self.inner.get_main().can_connect(event.0.as_ref()) { match self.inner.update_chain_head(event.0.as_ref().clone()) { - Ok(_) => self.inner.update_dag_accumulator(new_head.id()), + std::result::Result::Ok(_) => (), Err(e) => Err(e), } } else { @@ -107,7 +111,7 @@ impl ServiceHandler for ChainReaderService { fn handle( &mut self, msg: ChainRequest, - _ctx: &mut ServiceContext, + ctx: &mut ServiceContext, ) -> Result { match msg { ChainRequest::CurrentHeader() => Ok(ChainResponse::BlockHeader(Box::new( @@ -281,8 +285,8 @@ pub struct ChainReaderServiceInner { startup_info: StartupInfo, main: BlockChain, storage: Arc, + flexidag_service: ServiceRef, vm_metrics: Option, - dag: BlockDAG, } impl ChainReaderServiceInner { @@ -290,7 +294,7 @@ impl ChainReaderServiceInner { config: Arc, startup_info: StartupInfo, storage: Arc, - dag: BlockDAG, + flexidag_service: ServiceRef, vm_metrics: Option, ) -> Result { let net = config.net(); @@ -300,14 +304,13 @@ impl ChainReaderServiceInner { storage.clone(), config.net().id().clone(), vm_metrics.clone(), - Some(dag.clone()), )?; Ok(Self { config, startup_info, main, storage, - dag, + flexidag_service, vm_metrics, }) } @@ -329,13 +332,19 @@ impl ChainReaderServiceInner { self.storage.clone(), self.config.net().id().clone(), self.vm_metrics.clone(), - Some(self.dag.clone()), )?; Ok(()) } - pub fn update_dag_accumulator(&mut self, head_id: HashValue) -> Result<()> { - self.main.update_dag_accumulator(head_id) + pub fn update_dag_accumulator(&mut self, new_block_header: BlockHeader) -> Result<()> { + async_std::task::block_on(self.flexidag_service.send(UpdateDagTips { + block_header: new_block_header, + current_head_block_id: self.main.status().info().id(), + k_total_difficulty: KTotalDifficulty { + head_block_id: self.main.status().info().id(), + total_difficulty: self.main.status().info().get_total_difficulty(), + }, + }))? } } @@ -472,54 +481,37 @@ impl ReadableChainService for ChainReaderServiceInner { &self, req: GetDagAccumulatorLeaves, ) -> anyhow::Result> { - match self - .main - .get_dag_leaves(req.accumulator_leaf_index, true, req.batch_size) - { - Ok(leaves) => Ok(leaves - .into_iter() - .enumerate() - .map( - |(index, leaf)| match self.main.get_dag_accumulator_snapshot(leaf) { - Ok(snapshot) => TargetDagAccumulatorLeaf { - accumulator_root: snapshot.accumulator_info.accumulator_root, - leaf_index: req.accumulator_leaf_index.saturating_sub(index as u64), - }, - Err(error) => { - panic!( - "error occured when query the accumulator snapshot: {}", - error.to_string() - ); - } - }, - ) - .collect()), - Err(error) => { - bail!( - "an error occured when getting the leaves of the accumulator, {}", - error.to_string() - ); - } - } + Ok(async_std::task::block_on(self.flexidag_service.send( + flexidag_service::GetDagAccumulatorLeaves { + leaf_index: req.accumulator_leaf_index, + batch_size: req.batch_size, + reverse: true, + }, + ))?? + .into_iter() + .map(|leaf| TargetDagAccumulatorLeaf { + accumulator_root: leaf.dag_accumulator_root, + leaf_index: leaf.leaf_index, + }) + .collect()) } fn get_target_dag_accumulator_leaf_detail( &self, req: GetTargetDagAccumulatorLeafDetail, ) -> anyhow::Result> { - let end_index = std::cmp::min( - req.leaf_index + req.batch_size - 1, - self.main.get_dag_current_leaf_number()? - 1, - ); - let mut details = [].to_vec(); - for index in req.leaf_index..=end_index { - let snapshot = self.main.get_dag_accumulator_snapshot_by_index(index)?; - details.push(TargetDagAccumulatorLeafDetail { - accumulator_root: snapshot.accumulator_info.accumulator_root, - tips: snapshot.child_hashes, - }); - } - Ok(details) + let dag_details = + async_std::task::block_on(self.flexidag_service.send(GetDagAccumulatorLeafDetail { + leaf_index: req.leaf_index, + batch_size: req.batch_size, + }))??; + Ok(dag_details + .into_iter() + .map(|detail| TargetDagAccumulatorLeafDetail { + accumulator_root: detail.accumulator_root, + tips: detail.tips, + }) + .collect()) } } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index fd7935dbd9..e577128e52 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -14,6 +14,7 @@ use starcoin_chain_api::{ ExecutedBlock, MintedUncleNumber, TransactionInfoWithProof, VerifiedBlock, VerifyBlockField, }; use starcoin_config::ChainNetworkID; +use starcoin_consensus::dag::types::ghostdata::GhostdagData; use starcoin_consensus::{BlockDAG, Consensus, FlexiDagStorage}; use starcoin_crypto::hash::PlainCryptoHash; use starcoin_crypto::HashValue; @@ -23,11 +24,14 @@ use starcoin_open_block::OpenedBlock; use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter}; use starcoin_statedb::ChainStateDB; use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot; +use starcoin_storage::storage::CodecKVStore; use starcoin_storage::Store; use starcoin_time_service::TimeService; -use starcoin_types::block::{BlockIdAndNumber, ParentsHash}; +use starcoin_types::block::BlockIdAndNumber; use starcoin_types::contract_event::ContractEventInfo; +use starcoin_types::dag_block::KTotalDifficulty; use starcoin_types::filter::Filter; +use starcoin_types::header::DagHeader; use starcoin_types::startup_info::{ChainInfo, ChainStatus}; use starcoin_types::transaction::RichTransactionInfo; use starcoin_types::{ @@ -43,6 +47,7 @@ use starcoin_vm_types::account_config::genesis_address; use starcoin_vm_types::genesis_config::ConsensusStrategy; use starcoin_vm_types::on_chain_resource::Epoch; use std::cmp::min; +use std::collections::BTreeSet; use std::iter::Extend; use std::option::Option::{None, Some}; use std::{collections::HashMap, sync::Arc}; @@ -52,19 +57,6 @@ pub struct ChainStatusWithBlock { pub head: Block, } -impl ChainStatusWithBlock { - pub fn new(head_block: Block, block_info: BlockInfo, dag_tips: Vec) -> Self { - Self { - status: ChainStatus::new(head_block.header.clone(), block_info, Some(dag_tips)), - head: head_block, - } - } - - pub fn dag_tips(&self) -> Option<&Vec> { - self.status.tips_hash.as_ref() - } -} - pub struct BlockChain { genesis_hash: HashValue, txn_accumulator: MerkleAccumulator, @@ -76,9 +68,7 @@ pub struct BlockChain { uncles: HashMap, epoch: Epoch, vm_metrics: Option, - dag_accumulator: Option, net: ChainNetworkID, - dag: Option, } impl BlockChain { @@ -88,35 +78,11 @@ impl BlockChain { storage: Arc, net: ChainNetworkID, vm_metrics: Option, - dag: Option, ) -> Result { let head = storage .get_block_by_hash(head_block_hash)? .ok_or_else(|| format_err!("Can not find block by hash {:?}", head_block_hash))?; - - Self::new_with_uncles(time_service, head, None, storage, net, vm_metrics, dag) - } - - fn get_dag_data( - storage: Arc, - header: &BlockHeader, - net: ChainNetworkID, - ) -> Result<(Option, Option>)> { - let (op_dag_accumulator_info, op_tips) = storage.get_flexidag_init_data(header, net)?; - match (op_dag_accumulator_info, op_tips.clone()) { - (Some(dag_accumulator_info), Some(_tips)) => { - let dag_accumulator = Some(info_2_accumulator( - dag_accumulator_info, - AccumulatorStoreType::SyncDag, - storage.as_ref(), - )); - Ok((dag_accumulator, op_tips)) - } - (None, None) => Ok((None, None)), - _ => { - bail!("dag accumulator info and tips should be both None or Some") - } - } + Self::new_with_uncles(time_service, head, None, storage, net, vm_metrics) } fn new_with_uncles( @@ -126,7 +92,6 @@ impl BlockChain { storage: Arc, net: ChainNetworkID, vm_metrics: Option, - dag: Option, ) -> Result { let block_info = storage .get_block_info(head_block.id())? @@ -155,8 +120,6 @@ impl BlockChain { // .get_accumulator_snapshot_storage() // .get(head_id)? // .map(|snapshot| snapshot.child_hashes); - let (dag_accumulator, dag_snapshot_tips) = - Self::get_dag_data(storage.clone(), head_block.header(), net.clone())?; let mut chain = Self { genesis_hash: genesis, time_service, @@ -171,7 +134,7 @@ impl BlockChain { storage.as_ref(), ), status: ChainStatusWithBlock { - status: ChainStatus::new(head_block.header.clone(), block_info, dag_snapshot_tips), + status: ChainStatus::new(head_block.header.clone(), block_info), head: head_block, }, statedb: chain_state, @@ -179,9 +142,7 @@ impl BlockChain { uncles: HashMap::new(), epoch, vm_metrics, - dag_accumulator, net, - dag, }; watch(CHAIN_WATCH_NAME, "n1251"); match uncles { @@ -217,6 +178,7 @@ impl BlockChain { None, genesis_block, None, + None, )?; let new_tips = vec![genesis_id]; @@ -230,15 +192,15 @@ impl BlockChain { .expect("failed to calculate the dag key"), new_tips, dag_accumulator.get_info(), + genesis_id, + [KTotalDifficulty { + head_block_id: genesis_id, + total_difficulty: executed_block.block_info().get_total_difficulty(), + }] + .into_iter() + .collect(), )?; - Self::new( - time_service, - executed_block.block.id(), - storage, - net, - None, - None, - ) + Self::new(time_service, executed_block.block.id(), storage, net, None) } pub fn current_epoch_uncles_size(&self) -> u64 { @@ -346,7 +308,7 @@ impl BlockChain { let final_block_gas_limit = block_gas_limit .map(|block_gas_limit| min(block_gas_limit, on_chain_block_gas_limit)) .unwrap_or(on_chain_block_gas_limit); - let tips_hash = self.status().tips_hash; + let strategy = epoch.strategy(); let difficulty = strategy.calculate_next_difficulty(self)?; let mut opened_block = OpenedBlock::new( @@ -359,7 +321,6 @@ impl BlockChain { difficulty, strategy, None, - tips_hash, None, )?; let excluded_txns = opened_block.push_txns(user_txns)?; @@ -435,11 +396,7 @@ impl BlockChain { } //TODO remove this function. - pub fn update_chain_head( - &mut self, - block: Block, - parents_hash: Option>, - ) -> Result { + pub fn update_chain_head(&mut self, block: Block) -> Result { let block_info = self .storage .get_block_info(block.id())? @@ -466,13 +423,15 @@ impl BlockChain { .storage .get_block_by_hash(blue.id())? .expect("block blue need exist"); - transactions.extend( - blue_block - .transactions() - .iter() - .cloned() - .map(Transaction::UserTransaction), - ); + // Todo: we already added txns of blue_blocks to target block when mining it. + // see create_block_template in MinerService + //transactions.extend( + // blue_block + // .transactions() + // .iter() + // .cloned() + // .map(Transaction::UserTransaction), + //); total_difficulty += blue_block.header.difficulty(); } transactions.extend( @@ -796,97 +755,19 @@ impl BlockChain { pub fn get_block_accumulator(&self) -> &MerkleAccumulator { &self.block_accumulator } - - pub fn get_dag_leaves( - &self, - start_index: u64, - reverse: bool, - batch_size: u64, - ) -> Result> { - self.dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumulator is None"))? - .get_leaves(start_index, reverse, batch_size) - } - - pub fn get_dag_current_leaf_number(&self) -> Result { - Ok(self - .dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumulator is None"))? - .num_leaves()) - } - - pub fn get_dag_accumulator_snapshot(&self, key: HashValue) -> Result { - Ok(self - .storage - .query_by_hash(key)? - .expect("dag accumualator's snapshot should not be None")) - } - - pub fn get_dag_accumulator_snapshot_by_index( - &self, - leaf_index: u64, - ) -> Result { - let leaf_hash = self - .dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumulator is None"))? - .get_leaf(leaf_index)? - .expect("dag accumulator's leaf should not be None"); - Ok(self - .storage - .query_by_hash(leaf_hash)? - .expect("dag accumualator's snapshot should not be None")) - } - - pub fn update_dag_accumulator(&mut self, head_id: HashValue) -> Result<()> { - self.dag_accumulator = Some( - self.dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumulator is None"))? - .fork( - self.storage - .get_dag_accumulator_info(head_id) - .expect("accumulator info should not be None"), - ), - ); - Ok(()) - } - - pub fn dag_parents_in_tips(&self, dag_parents: Vec) -> Result { - Ok(dag_parents - .into_iter() - .all(|parent| match &self.status.status.tips_hash { - Some(tips) => tips.contains(&parent), - None => false, - })) - } - - pub fn is_head_of_dag_accumulator(&self, next_tips: Vec) -> Result { - let key = Self::calculate_dag_accumulator_key(next_tips)?; - let next_tips_info = self.storage.get_dag_accumulator_info(key)?; - - return Ok(next_tips_info - == self - .dag_accumulator - .as_ref() - .map(|accumulator| accumulator.get_info())); - } } impl ChainReader for BlockChain { fn info(&self) -> ChainInfo { + let (dag_accumulator, k_total_difficulties) = self.storage.get_lastest_snapshot()?.map(|snapshot| { + (Some(snapshot.accumulator_info), Some(snapshot.k_total_difficulties)) + }).unwrap_or((None, None)); ChainInfo::new( self.status.head.header().chain_id(), self.genesis_hash, self.status.status.clone(), - self.storage - .get_dag_accumulator_info(self.status.head.header().id()) - .expect(&format!( - "the dag accumulator info cannot be found by id: {}", - self.status.head.header().id() - )), + dag_accumulator, + k_total_difficulties, ) } @@ -902,19 +783,6 @@ impl ChainReader for BlockChain { self.status.status.head().clone() } - fn current_tips_hash(&self) -> Option { - match self.status.status.tips_hash.clone() { - Some(tips_hash) => { - assert!(!tips_hash.is_empty()); - Some( - Self::calculate_dag_accumulator_key(tips_hash) - .expect("calculate dag key should be successful"), - ) - } - None => None, - } - } - fn net_id(&self) -> ChainNetworkID { self.net.clone() } @@ -1096,7 +964,6 @@ impl ChainReader for BlockChain { self.storage.clone(), self.net.clone(), self.vm_metrics.clone(), - self.dag.clone(), //TODO: check missing blocks need to be clean ) } @@ -1431,10 +1298,17 @@ impl ChainWriter for BlockChain { } fn connect(&mut self, executed_block: ExecutedBlock) -> Result { - if executed_block.block.is_dag() { - return self.connect_dag(executed_block); - } let (block, block_info) = (executed_block.block(), executed_block.block_info()); + if self.status.status.tips_hash.is_some() { + let mut tips = self.status.status.tips_hash.clone().unwrap(); + tips.sort(); + debug_assert!( + block.header().parent_hash() == Self::calculate_dag_accumulator_key(tips.clone())? + || block.header().parent_hash() == self.status.status.head().id() + ); + } else { + debug_assert!(block.header().parent_hash() == self.status.status.head().id()); + } //TODO try reuse accumulator and state db. let txn_accumulator_info = block_info.get_txn_accumulator_info(); let block_accumulator_info = block_info.get_block_accumulator_info(); @@ -1484,43 +1358,6 @@ impl ChainWriter for BlockChain { fn chain_state(&mut self) -> &ChainStateDB { &self.statedb } - - fn get_current_dag_accumulator_info(&self) -> Result { - Ok(self - .dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumualator is None"))? - .get_info()) - } - - fn fork_dag_accumulator(&mut self, accumulator_info: AccumulatorInfo) -> Result<()> { - self.dag_accumulator = Some( - self.dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumulator is None"))? - .fork(Some(accumulator_info)), - ); - Ok(()) - } - - fn append_dag_accumulator_leaf( - &mut self, - tips: Vec, - ) -> Result<(HashValue, AccumulatorInfo)> { - let key = Self::calculate_dag_accumulator_key(tips.clone())?; - let dag_accumulator = self - .dag_accumulator - .as_ref() - .ok_or_else(|| anyhow!("dag accumulator is None"))? - .fork(None); - dag_accumulator.append(&[key])?; - dag_accumulator.flush()?; - self.storage - .append_dag_accumulator_leaf(key, tips, dag_accumulator.get_info())?; - let accumulator_info = dag_accumulator.get_info(); - self.dag_accumulator = Some(dag_accumulator); - Ok((key, accumulator_info)) - } } pub(crate) fn info_2_accumulator( diff --git a/chain/src/verifier/mod.rs b/chain/src/verifier/mod.rs index aec2c961a1..2b93cb8da7 100644 --- a/chain/src/verifier/mod.rs +++ b/chain/src/verifier/mod.rs @@ -189,6 +189,10 @@ impl BlockVerifier for BasicVerifier { let current = chain_status.head(); let current_id = current.id(); let expect_number = current.number().saturating_add(1); + + // if chain_status.tips_hash.is_some() { + // let mut tips_hash = chain_status.tips_hash.clone().unwrap(); + // tips_hash.sort(); // dag // todo: For a dag block diff --git a/chain/tests/test_block_chain.rs b/chain/tests/test_block_chain.rs index 0ef43579f3..6ae4469cb3 100644 --- a/chain/tests/test_block_chain.rs +++ b/chain/tests/test_block_chain.rs @@ -131,11 +131,11 @@ fn test_block_chain() -> Result<()> { let mut mock_chain = MockChain::new(ChainNetwork::new_test())?; let block = mock_chain.produce()?; assert_eq!(block.header().number(), 1); - mock_chain.apply(block, None)?; + mock_chain.apply(block)?; assert_eq!(mock_chain.head().current_header().number(), 1); let block = mock_chain.produce()?; assert_eq!(block.header().number(), 2); - mock_chain.apply(block, None)?; + mock_chain.apply(block)?; assert_eq!(mock_chain.head().current_header().number(), 2); Ok(()) } diff --git a/cmd/peer-watcher/src/lib.rs b/cmd/peer-watcher/src/lib.rs index 8b9d2d2500..bb75a86819 100644 --- a/cmd/peer-watcher/src/lib.rs +++ b/cmd/peer-watcher/src/lib.rs @@ -24,6 +24,7 @@ pub fn build_lighting_network( chain_info.genesis_hash(), chain_info.status().clone(), chain_info.dag_accumulator_info().clone(), + chain_info.k_total_difficulties().clone(), ); build_network_worker( network_config, diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 1514cd0c26..3c6203ffb3 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -27,6 +27,7 @@ parking_lot = { workspace = true } itertools = { workspace = true } starcoin-config = { workspace = true } bcs-ext = { workspace = true } +starcoin-accumulator = { package = "starcoin-accumulator", workspace = true } [dev-dependencies] proptest = { workspace = true } diff --git a/consensus/src/dag/blockdag.rs b/consensus/src/dag/blockdag.rs index ed36b7cd73..9398592aac 100644 --- a/consensus/src/dag/blockdag.rs +++ b/consensus/src/dag/blockdag.rs @@ -10,15 +10,29 @@ use crate::consensusdb::{ HeaderStore, ReachabilityStoreReader, RelationsStore, RelationsStoreReader, }, }; +use crate::FlexiDagStorageConfig; use anyhow::{anyhow, bail, Ok}; +use bcs_ext::BCSCodec; use parking_lot::RwLock; -use starcoin_crypto::{HashValue as Hash, HashValue}; -use starcoin_types::block::BlockHeader; +use starcoin_accumulator::accumulator_info::AccumulatorInfo; +use starcoin_accumulator::node::AccumulatorStoreType; +use starcoin_accumulator::{Accumulator, MerkleAccumulator}; +use starcoin_config::{NodeConfig, RocksdbConfig}; +use starcoin_crypto::HashValue as Hash; +use starcoin_storage::flexi_dag::SyncFlexiDagSnapshotHasher; +use starcoin_storage::storage::CodecKVStore; +use starcoin_storage::{BlockStore, Storage, Store, SyncFlexiDagStore}; +use starcoin_types::block::BlockNumber; +use starcoin_types::dag_block::KTotalDifficulty; +use starcoin_types::startup_info; use starcoin_types::{ blockhash::{BlockHashes, KType}, consensus_header::ConsensusHeader, }; -use std::sync::Arc; +use std::collections::{HashSet, BTreeSet}; +use std::collections::{BinaryHeap, HashMap}; +use std::path::Path; +use std::sync::{Arc, Mutex}; pub type DbGhostdagManager = GhostdagManager< DbGhostdagStore, @@ -57,6 +71,90 @@ impl BlockDAG { dag } + pub fn calculate_dag_accumulator_key(snapshot: &SyncFlexiDagSnapshotHasher) -> anyhow::Result { + Ok(Hash::sha3_256_of(&snapshot.encode().expect( + "encoding the sorted relatship set must be successful", + ))) + } + + pub fn try_init_with_storage( + storage: Arc, + config: Arc, + ) -> anyhow::Result<(Option, Option)> { + let startup_info = storage + .get_startup_info()? + .expect("startup info must exist"); + if let Some(key) = startup_info.get_dag_main() { + let accumulator_info = storage + .get_dag_accumulator_info()? + .expect("dag accumulator info should exist"); + assert!( + accumulator_info.get_num_leaves() > 0, + "the number of dag accumulator leaf must be greater than 0" + ); + let dag_accumulator = MerkleAccumulator::new_with_info( + accumulator_info, + storage.get_accumulator_store(AccumulatorStoreType::SyncDag), + ); + + Ok(( + Some(Self::new_by_config( + config.data_dir().join("flexidag").as_path(), + )?), + Some(dag_accumulator), + )) + } else { + let block_header = storage + .get_block_header_by_hash(startup_info.get_main().clone())? + .expect("the genesis block in dag accumulator must none be none"); + let fork_height = storage.dag_fork_height(config.net().id().clone()); + if block_header.number() < fork_height { + Ok((None, None)) + } else if block_header.number() == fork_height { + let dag_accumulator = MerkleAccumulator::new_with_info( + AccumulatorInfo::default(), + storage.get_accumulator_store(AccumulatorStoreType::SyncDag), + ); + + + let mut k_total_difficulties = BTreeSet::new(); + k_total_difficulties.insert(KTotalDifficulty { + head_block_id: block_header.id(), + total_difficulty: storage + .get_block_info(block_header.id())? + .expect("block info must exist") + .get_total_difficulty(), + }); + let snapshot_hasher = SyncFlexiDagSnapshotHasher { + child_hashes: vec![block_header.id()], + head_block_id: block_header.id(), + k_total_difficulties, + }; + let key = Self::calculate_dag_accumulator_key(&snapshot_hasher)?; + dag_accumulator.append(&[key])?; + storage.get_accumulator_snapshot_storage().put( + key, + snapshot_hasher.to_snapshot(dag_accumulator.get_info()), + )?; + dag_accumulator.flush()?; + Ok(( + Some(Self::new_by_config( + config.data_dir().join("flexidag").as_path(), + )?), + Some(dag_accumulator), + )) + } else { + bail!("failed to init dag") + } + } + } + + pub fn new_by_config(db_path: &Path) -> anyhow::Result { + let config = FlexiDagStorageConfig::create_with_params(1, RocksdbConfig::default()); + let db = FlexiDagStorage::create_from_path(db_path, config)?; + let dag = Self::new(16, db); + Ok(dag) + } pub fn init_with_genesis(&self, genesis: BlockHeader) -> anyhow::Result<()> { let origin = genesis.parent_hash(); if self.storage.relations_store.has(origin)? { diff --git a/flexidag/Cargo.toml b/flexidag/Cargo.toml index b005187ef3..00e042c08b 100644 --- a/flexidag/Cargo.toml +++ b/flexidag/Cargo.toml @@ -1,8 +1,8 @@ [package] +name = "starcoin-flexidag" authors = { workspace = true } edition = { workspace = true } license = { workspace = true } -name = "flexidag" publish = { workspace = true } version = "1.13.7" homepage = { workspace = true } @@ -11,6 +11,7 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } +async-trait = { workspace = true } byteorder = { workspace = true } futures = { workspace = true } @@ -22,13 +23,17 @@ rand = { workspace = true } rand_core = { default-features = false, workspace = true } rust-argon2 = { workspace = true } sha3 = { workspace = true } +starcoin-config = { workspace = true } starcoin-chain-api = { workspace = true } starcoin-crypto = { workspace = true } starcoin-logger = { workspace = true } +starcoin-service-registry = { workspace = true } starcoin-state-api = { workspace = true } starcoin-time-service = { workspace = true } starcoin-types = { workspace = true } starcoin-vm-types = { workspace = true } +tokio = { workspace = true } +starcoin-accumulator = { workspace = true } thiserror = { workspace = true } rocksdb = { workspace = true } bincode = { workspace = true } diff --git a/flexidag/service/Cargo.toml b/flexidag/service/Cargo.toml index e024ed7d3a..96b5a0cf2d 100644 --- a/flexidag/service/Cargo.toml +++ b/flexidag/service/Cargo.toml @@ -38,4 +38,4 @@ parking_lot = { workspace = true } itertools = { workspace = true } starcoin-config = { workspace = true } bcs-ext = { workspace = true } -flexidag = {workspace = true} \ No newline at end of file +starcoin-flexidag = {workspace = true} \ No newline at end of file diff --git a/flexidag/src/lib.rs b/flexidag/src/lib.rs index 8335f05670..fd332ebcee 100644 --- a/flexidag/src/lib.rs +++ b/flexidag/src/lib.rs @@ -7,3 +7,5 @@ pub use consensusdb::consensus_relations::{ }; pub use consensusdb::prelude::{FlexiDagStorage, FlexiDagStorageConfig, StoreError}; pub use consensusdb::schema; +pub mod flexidag_service; +pub use flexidag_service::FlexidagService; diff --git a/miner/Cargo.toml b/miner/Cargo.toml index d5180be4e1..6a023bf48c 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -27,6 +27,8 @@ starcoin-txpool-api = { workspace = true } starcoin-vm-types = { workspace = true } tokio = { features = ["full"], workspace = true } starcoin-types = { package = "starcoin-types", workspace = true } +starcoin-flexidag = { workspace = true } +async-std = { workspace = true } [dev-dependencies] starcoin-network-rpc = { package = "starcoin-network-rpc", workspace = true } diff --git a/miner/src/create_block_template/mod.rs b/miner/src/create_block_template/mod.rs index 1ea977f4d2..c64accdd13 100644 --- a/miner/src/create_block_template/mod.rs +++ b/miner/src/create_block_template/mod.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::create_block_template::metrics::BlockBuilderMetrics; -use anyhow::{format_err, Result}; +use anyhow::{bail, format_err, Result}; use futures::executor::block_on; use starcoin_account_api::{AccountAsyncService, AccountInfo, DefaultAccountChangeEvent}; use starcoin_account_service::AccountService; @@ -13,10 +13,13 @@ use starcoin_config::NodeConfig; use starcoin_consensus::{BlockDAG, Consensus}; use starcoin_crypto::hash::HashValue; use starcoin_executor::VMMetrics; +use starcoin_flexidag::flexidag_service::GetDagTips; +use starcoin_flexidag::{flexidag_service, FlexidagService}; use starcoin_logger::prelude::*; use starcoin_open_block::OpenedBlock; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRequest, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRef, + ServiceRequest, }; use starcoin_storage::{BlockStore, Storage, Store}; use starcoin_txpool::TxPoolService; @@ -79,8 +82,7 @@ impl ServiceFactory for BlockBuilderService { .and_then(|registry| BlockBuilderMetrics::register(registry).ok()); let vm_metrics = ctx.get_shared_opt::()?; - let dag = ctx.get_shared::()?; - + let flexidag_service = ctx.service_ref::()?.clone(); let inner = Inner::new( config.net(), storage, @@ -89,8 +91,8 @@ impl ServiceFactory for BlockBuilderService { config.miner.block_gas_limit, miner_account, metrics, + flexidag_service, vm_metrics, - dag, )?; Ok(Self { inner }) } @@ -193,8 +195,8 @@ pub struct Inner

{ local_block_gas_limit: Option, miner_account: AccountInfo, metrics: Option, + flexidag_service: ServiceRef, vm_metrics: Option, - dag: BlockDAG, } impl

Inner

@@ -209,8 +211,8 @@ where local_block_gas_limit: Option, miner_account: AccountInfo, metrics: Option, + flexidag_service: ServiceRef, vm_metrics: Option, - dag: BlockDAG, ) -> Result { let chain = BlockChain::new( net.time_service(), @@ -218,7 +220,6 @@ where storage.clone(), net.id().clone(), vm_metrics.clone(), - Some(dag.clone()), )?; Ok(Inner { @@ -230,8 +231,8 @@ where local_block_gas_limit, miner_account, metrics, + flexidag_service, vm_metrics, - dag, }) } @@ -260,7 +261,6 @@ where self.storage.clone(), self.chain.net_id(), self.vm_metrics.clone(), - Some(self.dag.clone()), )?; //current block possible be uncle. self.uncles.insert(current_id, current_header); @@ -318,7 +318,8 @@ where // block_gas_limit / min_gas_per_txn let max_txns = (block_gas_limit / 200) * 2; - let txns = self.tx_provider.get_txns(max_txns); + let mut txns = self.tx_provider.get_txns(max_txns); + let author = *self.miner_account.address(); let previous_header = self.chain.current_header(); diff --git a/network-p2p/types/src/peer_id.rs b/network-p2p/types/src/peer_id.rs index 7b84e334d4..88049e769a 100644 --- a/network-p2p/types/src/peer_id.rs +++ b/network-p2p/types/src/peer_id.rs @@ -12,7 +12,7 @@ use std::convert::TryFrom; use std::fmt; use std::str::FromStr; -#[derive(Eq, PartialEq, Hash, Clone, Debug, JsonSchema)] +#[derive(Eq, PartialEq, Hash, Clone, Debug, JsonSchema, Copy)] pub struct PeerId(#[schemars(with = "String")] Libp2pPeerId); impl PeerId { diff --git a/network/api/src/peer_provider.rs b/network/api/src/peer_provider.rs index 0987895bbf..52ef7d8271 100644 --- a/network/api/src/peer_provider.rs +++ b/network/api/src/peer_provider.rs @@ -7,6 +7,7 @@ use anyhow::Result; use futures::channel::oneshot::Receiver; use futures::future::BoxFuture; use itertools::Itertools; +use network_p2p_types::peer_id; use network_p2p_types::{peer_id::PeerId, ReputationChange}; use parking_lot::Mutex; use rand::prelude::IteratorRandom; @@ -91,6 +92,7 @@ pub enum PeerStrategy { WeightedRandom, Best, Avg, + DagSync(PeerId), } impl Default for PeerStrategy { @@ -106,6 +108,7 @@ impl std::fmt::Display for PeerStrategy { Self::WeightedRandom => "weighted", Self::Best => "top", Self::Avg => "avg", + PeerStrategy::DagSync(_) => "dag_sync", }; write!(f, "{}", display) } @@ -314,6 +317,14 @@ impl PeerSelector { } } + pub fn peer_infos(&self) -> Vec { + self.details + .lock() + .iter() + .map(|peer| peer.peer_info.clone()) + .collect() + } + pub fn peers(&self) -> Vec { self.details .lock() @@ -384,13 +395,18 @@ impl PeerSelector { .load(Ordering::SeqCst) .checked_div(self.len() as u64)?; if avg_score < 200 { - return self.random(); + if let PeerStrategy::DagSync(peer_id) = &self.strategy { + return Some(peer_id.clone()); + } else { + return self.random(); + } } match &self.strategy { PeerStrategy::Random => self.random(), PeerStrategy::WeightedRandom => self.weighted_random(), PeerStrategy::Best => self.top_score(), PeerStrategy::Avg => self.avg_score(), + PeerStrategy::DagSync(peer_id) => Some(peer_id.clone()), } } diff --git a/network/api/src/tests.rs b/network/api/src/tests.rs index cf18f2ce9b..5264d236b9 100644 --- a/network/api/src/tests.rs +++ b/network/api/src/tests.rs @@ -41,6 +41,7 @@ fn test_peer_selector() { HashValue::zero(), mock_chain_status(100.into()), None, + None, ), vec![], vec![], @@ -53,6 +54,7 @@ fn test_peer_selector() { HashValue::zero(), mock_chain_status(99.into()), None, + None, ), vec![], vec![], @@ -65,6 +67,7 @@ fn test_peer_selector() { HashValue::zero(), mock_chain_status(100.into()), None, + None, ), vec![], vec![], @@ -77,6 +80,7 @@ fn test_peer_selector() { HashValue::zero(), mock_chain_status(1.into()), None, + None, ), vec![], vec![], diff --git a/network/src/service.rs b/network/src/service.rs index ee81010783..dc9f1c188e 100644 --- a/network/src/service.rs +++ b/network/src/service.rs @@ -559,10 +559,6 @@ impl Inner { peer_info.peer_info.update_chain_status(ChainStatus::new( block_header.clone(), compact_block_message.block_info.clone(), - compact_block_message - .compact_block - .dag_parent_and_tips() - .map(|s| s.1.iter().map(|h| h.id()).collect::>()), )); if self.self_peer.known_blocks.contains(&block_id) { @@ -721,15 +717,11 @@ impl Inner { //2. Sync status change. // may be update by repeat message, but can not find a more good way. self.network_service.update_business_status( - ChainStatus::new( - msg.compact_block.header.clone(), - msg.block_info.clone(), - msg.compact_block - .dag_parent_and_tips() - .map(|s| s.1.iter().map(|h| h.id()).collect::>()), - ) - .encode() - .expect("Encoding the compact_block.header and block_info must be successful"), + ChainStatus::new(msg.compact_block.header.clone(), msg.block_info.clone()) + .encode() + .expect( + "Encoding the compact_block.header and block_info must be successful", + ), ); self.self_peer.known_blocks.put(id, ()); diff --git a/network/tests/network_service_test.rs b/network/tests/network_service_test.rs index 15b6b9318f..897a0c35c9 100644 --- a/network/tests/network_service_test.rs +++ b/network/tests/network_service_test.rs @@ -39,6 +39,7 @@ fn build_test_network_services(num: usize) -> Vec { HashValue::random(), ChainStatus::random(), None, + None, ); for _index in 0..num { let mut boot_nodes = Vec::new(); diff --git a/node/src/node.rs b/node/src/node.rs index 356120448b..9a476de876 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -13,11 +13,13 @@ use futures::executor::block_on; use futures_timer::Delay; use network_api::{PeerProvider, PeerSelector, PeerStrategy}; use starcoin_account_service::{AccountEventService, AccountService, AccountStorage}; +use starcoin_accumulator::node::AccumulatorStoreType; use starcoin_block_relayer::BlockRelayer; use starcoin_chain_notify::ChainNotifyHandlerService; use starcoin_chain_service::ChainReaderService; use starcoin_config::NodeConfig; -use starcoin_consensus::{BlockDAG, FlexiDagStorage}; +use starcoin_consensus::{BlockDAG, FlexiDagStorage, FlexiDagStorageConfig}; +use starcoin_crypto::HashValue; use starcoin_genesis::{Genesis, GenesisError}; use starcoin_logger::prelude::*; use starcoin_logger::structured_log::init_slog_logger; @@ -220,13 +222,7 @@ impl ServiceHandler for NodeService { block_hash ) })?; - let result = connect_service - .send(ExecuteRequest { - block, - dag_block_parent: parents, - dag_transaction_parent: None, - }) - .await??; + let result = connect_service.send(ExecuteRequest { block }).await??; info!("Re execute result: {:?}", result); Ok(()) }; diff --git a/rpc/server/src/module/chain_rpc.rs b/rpc/server/src/module/chain_rpc.rs index eea1652095..db62630e2e 100644 --- a/rpc/server/src/module/chain_rpc.rs +++ b/rpc/server/src/module/chain_rpc.rs @@ -73,7 +73,7 @@ where let fut = async move { let chain_status = service.main_status().await?; //TODO get chain info from chain service. - Ok(ChainInfo::new(chain_id, genesis_hash, chain_status, None).into()) + Ok(ChainInfo::new(chain_id, genesis_hash, chain_status, None, None).into()) }; Box::pin(fut.boxed().map_err(map_err)) } diff --git a/state/service/src/service.rs b/state/service/src/service.rs index 6f86426341..f54738a1e8 100644 --- a/state/service/src/service.rs +++ b/state/service/src/service.rs @@ -131,7 +131,7 @@ impl ServiceHandler for ChainStateService { impl EventHandler for ChainStateService { fn handle_event(&mut self, msg: NewHeadBlock, _ctx: &mut ServiceContext) { - let NewHeadBlock(block, _) = msg; + let NewHeadBlock(block) = msg; let state_root = block.header().state_root(); debug!("ChainStateActor change StateRoot to : {:?}", state_root); diff --git a/storage/src/flexi_dag/mod.rs b/storage/src/flexi_dag/mod.rs index c3333272d7..e79065d98d 100644 --- a/storage/src/flexi_dag/mod.rs +++ b/storage/src/flexi_dag/mod.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::{ + collections::BTreeSet, + sync::Arc, +}; use crate::{ accumulator::{AccumulatorStorage, DagBlockAccumulatorStorage}, @@ -11,11 +14,43 @@ use bcs_ext::BCSCodec; use serde::{Deserialize, Serialize}; use starcoin_accumulator::accumulator_info::AccumulatorInfo; use starcoin_crypto::HashValue; +use starcoin_types::dag_block::KTotalDifficulty; #[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] pub struct SyncFlexiDagSnapshot { pub child_hashes: Vec, // child nodes(tips), to get the relationship, use dag's relationship store pub accumulator_info: AccumulatorInfo, + pub head_block_id: HashValue, // to initialize the BlockInfo + pub k_total_difficulties: BTreeSet, // the k-th smallest total difficulty +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] +pub struct SyncFlexiDagSnapshotHasher { + pub child_hashes: Vec, // child nodes(tips), to get the relationship, use dag's relationship store + pub head_block_id: HashValue, // to initialize the BlockInfo + pub k_total_difficulties: BTreeSet, // the k-th smallest total difficulty +} + +impl SyncFlexiDagSnapshotHasher { + pub fn to_snapshot(self, accumulator_info: AccumulatorInfo) -> SyncFlexiDagSnapshot { + SyncFlexiDagSnapshot { + child_hashes: self.child_hashes, + accumulator_info, + head_block_id: self.head_block_id, + k_total_difficulties: self.k_total_difficulties, + } + } +} + +impl From for SyncFlexiDagSnapshotHasher { + fn from(mut value: SyncFlexiDagSnapshot) -> Self { + value.child_hashes.sort(); + SyncFlexiDagSnapshotHasher { + child_hashes: value.child_hashes, + head_block_id: value.head_block_id, + k_total_difficulties: value.k_total_difficulties + } + } } impl ValueCodec for SyncFlexiDagSnapshot { diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 56c5efda89..9902e9f564 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -15,13 +15,15 @@ use crate::{ transaction::TransactionStorage, transaction_info::{TransactionInfoHashStorage, TransactionInfoStorage}, }; -use anyhow::{bail, format_err, Error, Ok, Result}; +use anyhow::{anyhow, bail, format_err, Error, Ok, Result}; use flexi_dag::{SyncFlexiDagSnapshot, SyncFlexiDagSnapshotStorage, SyncFlexiDagStorage}; use network_p2p_types::peer_id::PeerId; use num_enum::{IntoPrimitive, TryFromPrimitive}; use once_cell::sync::Lazy; use starcoin_accumulator::{ - accumulator_info::AccumulatorInfo, node::AccumulatorStoreType, AccumulatorTreeStore, + accumulator_info::{self, AccumulatorInfo}, + node::AccumulatorStoreType, + AccumulatorTreeStore, MerkleAccumulator, Accumulator, }; use starcoin_config::ChainNetworkID; use starcoin_crypto::HashValue; @@ -29,7 +31,9 @@ use starcoin_state_store_api::{StateNode, StateNodeStore}; use starcoin_types::{ block::{Block, BlockBody, BlockHeader, BlockInfo}, contract_event::ContractEvent, - startup_info::{ChainInfo, ChainStatus, SnapshotRange, StartupInfo}, + dag_block::KTotalDifficulty, + header, + startup_info::{self, ChainInfo, ChainStatus, SnapshotRange, StartupInfo}, transaction::{RichTransactionInfo, Transaction}, }; use starcoin_vm_types::{ @@ -37,7 +41,7 @@ use starcoin_vm_types::{ state_store::table::{TableHandle, TableInfo}, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, fmt::{Debug, Display, Formatter}, sync::Arc, }; @@ -326,14 +330,13 @@ pub trait SyncFlexiDagStore { key: HashValue, new_tips: Vec, accumulator_info: AccumulatorInfo, + head_block_id: HashValue, + k_total_difficulties: BTreeSet, ) -> Result<()>; - fn get_dag_accumulator_info(&self, block_id: HashValue) -> Result>; + fn get_dag_accumulator_info(&self) -> Result>; fn get_tips_by_block_id(&self, block_id: HashValue) -> Result>; - fn get_flexidag_init_data( - &self, - head_block: &BlockHeader, - id: ChainNetworkID, - ) -> Result<(Option, Option>)>; + fn dag_fork_height(&self, id: ChainNetworkID) -> BlockNumber; + fn get_lastest_snapshot(&self) -> Result>; } // TODO: remove Arc, we can clone Storage directly. @@ -451,15 +454,13 @@ impl BlockStore for Storage { let head_block_info = self.get_block_info(head_block.id())?.ok_or_else(|| { format_err!("Startup block info {:?} should exist", startup_info.main) })?; - - let (flexi_dag_accumulator_info, tips_header_hash) = - self.get_flexidag_init_data(&head_block, id)?; - + let snapshot = self.get_lastest_snapshot()?.ok_or_else(|| anyhow!("latest snapshot is none"))?; let chain_info = ChainInfo::new( head_block.chain_id(), genesis_hash, - ChainStatus::new(head_block.clone(), head_block_info, tips_header_hash), - flexi_dag_accumulator_info, + ChainStatus::new(head_block.clone(), head_block_info), + Some(snapshot.accumulator_info), + Some(snapshot.k_total_difficulties), ); Ok(Some(chain_info)) } @@ -672,68 +673,73 @@ impl SyncFlexiDagStore for Storage { self.flexi_dag_storage.get_snapshot_storage() } - fn get_dag_accumulator_info(&self, block_id: HashValue) -> Result> { - match self - .flexi_dag_storage - .get_snapshot_storage() - .get(block_id)? - { - Some(snapshot) => Ok(Some(snapshot.accumulator_info)), - None => Ok(None), + fn get_lastest_snapshot(&self) -> Result> { + let info = self.get_dag_accumulator_info()?.ok_or_else(|| anyhow!("dag startup info is none"))?; + let merkle_tree = MerkleAccumulator::new_with_info(info, self.get_accumulator_store(AccumulatorStoreType::SyncDag)); + let key = merkle_tree.get_leaf(merkle_tree.num_leaves() - 1)?.ok_or_else(|| anyhow!("faile to get the key since it is none"))?; + self.query_by_hash(key) + } + + fn get_dag_accumulator_info(&self) -> Result> { + let startup_info = self.get_startup_info()?; + if startup_info.is_none() { + return Ok(None); } + + let dag_main = startup_info.unwrap().get_dag_main(); + if dag_main.is_none() { + return Ok(None); + } + + let dag_main = dag_main.unwrap(); + + Ok(Some( + self.flexi_dag_storage + .get_snapshot_storage() + .get(dag_main)? + .expect("snapshot should not be none") + .accumulator_info, + )) } + // update dag accumulator fn append_dag_accumulator_leaf( &self, key: HashValue, new_tips: Vec, accumulator_info: AccumulatorInfo, + head_block_id: HashValue, + k_total_difficulties: BTreeSet, ) -> Result<()> { let snapshot = SyncFlexiDagSnapshot { child_hashes: new_tips.clone(), accumulator_info: accumulator_info.clone(), + head_block_id, + k_total_difficulties, }; // for sync - if self.flexi_dag_storage.get_hashes_by_hash(key)?.is_some() { - if let Some(t) = self.flexi_dag_storage.get_hashes_by_hash(key)? { - if t != snapshot { - bail!("the accumulator differ from other"); - } + if let Some(t) = self.flexi_dag_storage.get_hashes_by_hash(key)? { + if t != snapshot { + panic!("the accumulator differ from other"); } + } else { + self.flexi_dag_storage.put_hashes(key, snapshot)?; } - self.flexi_dag_storage.put_hashes(key, snapshot.clone())?; - - // for block chain - new_tips.iter().try_fold((), |_, block_id| { - if let Some(t) = self - .flexi_dag_storage - .get_hashes_by_hash(block_id.clone())? - { - if t != snapshot { - bail!("the key {} should not exists", block_id); - } - } - self.flexi_dag_storage - .put_hashes(block_id.clone(), snapshot.clone()) - })?; + Ok(()) } - fn get_tips_by_block_id(&self, block_id: HashValue) -> Result> { - match self.query_by_hash(block_id)? { + fn get_tips_by_block_id(&self, key: HashValue) -> Result> { + match self.query_by_hash(key)? { Some(snapshot) => Ok(snapshot.child_hashes), None => { - bail!("failed to get snapshot by hash: {}", block_id); + bail!("failed to get snapshot by hash: {}", key); } } } - fn get_flexidag_init_data( - &self, - head_block: &BlockHeader, - id: ChainNetworkID, - ) -> Result<(Option, Option>)> { - let flexi_dag_number = match id { + fn dag_fork_height(&self, id: ChainNetworkID) -> BlockNumber { + match id { ChainNetworkID::Builtin(network_id) => match network_id { starcoin_config::BuiltinNetworkID::Test => TEST_FLEXIDAG_FORK_HEIGHT, starcoin_config::BuiltinNetworkID::Dev => DEV_FLEXIDAG_FORK_HEIGHT, @@ -743,28 +749,7 @@ impl SyncFlexiDagStore for Storage { starcoin_config::BuiltinNetworkID::Main => MAIN_FLEXIDAG_FORK_HEIGHT, }, ChainNetworkID::Custom(_) => DEV_FLEXIDAG_FORK_HEIGHT, - }; - - let (dag_accumulator_info, tips) = if flexi_dag_number == head_block.number() { - ( - Some(AccumulatorInfo::default()), - Some(vec![head_block.id()]), - ) - } else if flexi_dag_number < head_block.number() { - let dag_accumulator_info = self - .get_dag_accumulator_info(head_block.id())? - .expect("the dag accumulator info must exist!"); - let tips = self.get_tips_by_block_id(head_block.id())?; - assert!( - tips.len() > 0, - "the length of the tips must be greater than 0" - ); - (Some(dag_accumulator_info), Some(tips)) - } else { - (None, None) - }; - - Ok((dag_accumulator_info, tips)) + } } } diff --git a/storage/src/tests/test_dag.rs b/storage/src/tests/test_dag.rs index 159c905ba2..046b400498 100644 --- a/storage/src/tests/test_dag.rs +++ b/storage/src/tests/test_dag.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeSet; + use starcoin_accumulator::{accumulator_info::AccumulatorInfo, Accumulator, MerkleAccumulator}; use starcoin_config::RocksdbConfig; use starcoin_crypto::HashValue; @@ -52,6 +54,7 @@ impl SyncFlexiDagManagerImp { } } +// jacktest todo: fix this impl SyncFlexiDagManager for SyncFlexiDagManagerImp { fn insert_hashes(&self, mut child_hashes: Vec) -> Result { child_hashes.sort(); @@ -62,6 +65,8 @@ impl SyncFlexiDagManager for SyncFlexiDagManagerImp { SyncFlexiDagSnapshot { child_hashes, accumulator_info: self.get_accumulator_info(), + k_total_difficulties: BTreeSet::new(), + head_block_id: accumulator_key, }, )?; Ok(accumulator_key) @@ -194,6 +199,7 @@ fn test_syn_dag_accumulator_insert_and_find() { ); } +#[ignore = "todo to use a new test"] #[test] fn test_syn_dag_accumulator_fork() { let mut syn_accumulator = SyncFlexiDagManagerImp::new(); diff --git a/storage/src/tests/test_storage.rs b/storage/src/tests/test_storage.rs index 90e2cdecff..1c098eeba8 100644 --- a/storage/src/tests/test_storage.rs +++ b/storage/src/tests/test_storage.rs @@ -25,6 +25,9 @@ use starcoin_types::transaction::{ RichTransactionInfo, SignedUserTransaction, Transaction, TransactionInfo, }; use starcoin_types::vm_error::KeptVMStatus; +use starcoin_vm_types::account_address::AccountAddress; +use starcoin_vm_types::language_storage::TypeTag; +use starcoin_vm_types::state_store::table::{TableHandle, TableInfo}; //use starcoin_vm_types::account_address::AccountAddress; //use starcoin_vm_types::state_store::table::{TableHandle, TableInfo}; use std::path::Path; @@ -296,7 +299,7 @@ fn generate_old_db(path: &Path) -> Result> { BlockBody::new(vec![txn.clone()], None), ); let mut txn_inf_ids = vec![]; - let block_metadata = block.to_metadata(0, None); + let block_metadata = block.to_metadata(0); let txn_info_0 = TransactionInfo::new( block_metadata.id(), HashValue::random(), diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 256457897d..7210d528ca 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -44,6 +44,7 @@ sysinfo = { workspace = true } thiserror = { workspace = true } starcoin-consensus = { workspace = true } timeout-join-handler = { workspace = true } +starcoin-flexidag = { workspace = true } [dev-dependencies] hex = { workspace = true } diff --git a/sync/api/src/lib.rs b/sync/api/src/lib.rs index d541abfa25..60f4c869b2 100644 --- a/sync/api/src/lib.rs +++ b/sync/api/src/lib.rs @@ -27,16 +27,11 @@ pub struct StartSyncTxnEvent; pub struct PeerNewBlock { peer_id: PeerId, new_block: Block, - dag_parents: Option>, } impl PeerNewBlock { - pub fn new(peer_id: PeerId, new_block: Block, dag_parents: Option>) -> Self { - PeerNewBlock { - peer_id, - new_block, - dag_parents, - } + pub fn new(peer_id: PeerId, new_block: Block) -> Self { + PeerNewBlock { peer_id, new_block } } pub fn get_peer_id(&self) -> PeerId { @@ -46,10 +41,6 @@ impl PeerNewBlock { pub fn get_block(&self) -> &Block { &self.new_block } - - pub fn get_dag_parents(&self) -> &Option> { - &self.dag_parents - } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index d53cee6b16..2cbd91a85a 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -3,6 +3,7 @@ #[cfg(test)] use super::CheckBlockConnectorHashValue; +use crate::block_connector::write_block_chain::ConnectOk; use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService}; use crate::sync::{CheckSyncEvent, SyncService}; use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent}; @@ -13,15 +14,17 @@ use network_api::PeerProvider; use parking_lot::Mutex; use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; +use starcoin_consensus::dag::blockdag::InitDagState; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; +use starcoin_flexidag::FlexidagService; use starcoin_logger::prelude::*; use starcoin_network::NetworkServiceRef; use starcoin_service_registry::{ ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, }; -use starcoin_storage::{BlockStore, Storage}; +use starcoin_storage::{flexi_dag, BlockStore, Storage}; use starcoin_sync_api::PeerNewBlock; use starcoin_txpool::TxPoolService; use starcoin_txpool_api::TxPoolSyncService; @@ -30,7 +33,8 @@ use starcoin_txpool_mock_service::MockTxPoolService; use starcoin_types::block::ExecutedBlock; use starcoin_types::sync_status::SyncStatus; use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown}; -use std::sync::Arc; +use std::result; +use std::sync::{Arc, Mutex}; use sysinfo::{DiskExt, System, SystemExt}; const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3; @@ -141,7 +145,7 @@ where txpool, bus, vm_metrics, - dag.clone(), + ctx.service_ref::()?.clone(), )?; Ok(Self::new(chain_service, config)) @@ -240,7 +244,7 @@ impl EventHandler for BlockConnectorService { - if let Err(e) = self.chain_service.apply_failed(block, msg.dag_parents) { + if let Err(e) = self.chain_service.apply_failed(block) { error!("Process connected new block from sync error: {:?}", e); } } @@ -260,13 +264,19 @@ impl EventHandler where TransactionPoolServiceT: TxPoolSyncService + 'static, { - fn handle_event(&mut self, msg: MinedBlock, _ctx: &mut ServiceContext) { + fn handle_event(&mut self, msg: MinedBlock, ctx: &mut ServiceContext) { let MinedBlock(new_block) = msg; + let block_header = new_block.header().clone(); let id = new_block.header().id(); debug!("try connect mined block: {}", id); - match self.chain_service.try_connect(new_block.as_ref().clone()) { - std::result::Result::Ok(_) => debug!("Process mined block {} success.", id), + match self.chain_service.try_connect(block) { + std::result::Result::Ok(ConnectOk::DagConnected) => { + match self.chain_service.dump_tips(block_header) { + std::result::Result::Ok(_) => (), + Err(e) => error!("failed to dump tips to dag accumulator: {}", e), + } + } Err(e) => { warn!("Process mined block {} fail, error: {:?}", id, e); } @@ -300,6 +310,8 @@ where std::result::Result::Ok(connect_error) => { match connect_error { ConnectBlockError::FutureBlock(block) => { + self.chain_service + .update_tips(msg.get_block().header().clone())?; //TODO cache future block if let std::result::Result::Ok(sync_service) = ctx.service_ref::() diff --git a/sync/src/block_connector/mod.rs b/sync/src/block_connector/mod.rs index 3e69c86527..a567f76c35 100644 --- a/sync/src/block_connector/mod.rs +++ b/sync/src/block_connector/mod.rs @@ -38,8 +38,6 @@ impl ServiceRequest for ResetRequest { #[derive(Debug, Clone)] pub struct ExecuteRequest { pub block: Block, - pub dag_block_parent: Option>, - pub dag_transaction_parent: Option, } impl ServiceRequest for ExecuteRequest { diff --git a/sync/src/block_connector/test_write_block_chain.rs b/sync/src/block_connector/test_write_block_chain.rs index 304cd72a82..62ffc34ca4 100644 --- a/sync/src/block_connector/test_write_block_chain.rs +++ b/sync/src/block_connector/test_write_block_chain.rs @@ -6,9 +6,11 @@ use parking_lot::Mutex; use starcoin_account_api::AccountInfo; use starcoin_chain::{BlockChain, ChainReader}; use starcoin_chain_service::WriteableChainService; -use starcoin_config::NodeConfig; +use starcoin_config::{NodeConfig, RocksdbConfig}; +use starcoin_consensus::Consensus; use starcoin_consensus::{BlockDAG, Consensus, FlexiDagStorage, FlexiDagStorageConfig}; use starcoin_crypto::HashValue; +use starcoin_genesis::Genesis as StarcoinGenesis; use starcoin_service_registry::bus::BusService; use starcoin_service_registry::{RegistryAsyncService, RegistryService}; use starcoin_storage::Store; @@ -18,7 +20,7 @@ use starcoin_types::block::Block; use starcoin_types::blockhash::ORIGIN; use starcoin_types::consensus_header::Header; use starcoin_types::startup_info::StartupInfo; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub async fn create_writeable_block_chain() -> ( WriteBlockChainService, @@ -43,15 +45,11 @@ pub async fn create_writeable_block_chain() -> ( ) .expect("init chain and genesis error"); - let flex_dag_config = FlexiDagStorageConfig::create_with_params(1, 0, 1024); + let flex_dag_config = FlexiDagStorageConfig::create_with_params(1, RocksdbConfig::default()); let flex_dag_db = FlexiDagStorage::create_from_path("./smolstc", flex_dag_config) .expect("Failed to create flexidag storage"); let dag = BlockDAG::new( - Header::new( - genesis.block().header().clone(), - vec![HashValue::new(ORIGIN)], - ), 3, flex_dag_db, ); diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index 9efccd1424..a5e8228665 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::block_connector::metrics::ChainMetrics; +use crate::tasks::BlockDiskCheckEvent; use anyhow::{bail, format_err, Ok, Result}; +use async_std::stream::StreamExt; +use futures::executor::block_on; use parking_lot::Mutex; use starcoin_chain::BlockChain; use starcoin_chain_api::{ChainReader, ChainWriter, ConnectBlockError, WriteableChainService}; @@ -10,18 +13,26 @@ use starcoin_config::NodeConfig; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; +use starcoin_flexidag::flexidag_service::{AddToDag, DumpTipsToAccumulator, UpdateDagTips}; +use starcoin_flexidag::FlexidagService; use starcoin_logger::prelude::*; use starcoin_service_registry::bus::{Bus, BusService}; use starcoin_service_registry::{ServiceContext, ServiceRef}; +use starcoin_storage::flexi_dag::KTotalDifficulty; +use starcoin_storage::storage::CodecKVStore; use starcoin_storage::Store; +use starcoin_time_service::{DagBlockTimeWindowService, TimeWindowResult}; use starcoin_txpool_api::TxPoolSyncService; use starcoin_types::block::BlockInfo; +use starcoin_types::blockhash::BlockHashMap; +use starcoin_types::dag_block::KTotalDifficulty; +use starcoin_types::consensus_header::DagHeader; use starcoin_types::{ block::{Block, BlockHeader, ExecutedBlock}, startup_info::StartupInfo, system_events::{NewBranch, NewHeadBlock}, }; -use std::{fmt::Formatter, sync::Arc}; +use std::{fmt::Formatter, sync::Arc, sync::Mutex}; use super::BlockConnectorService; @@ -40,7 +51,7 @@ where metrics: Option, vm_metrics: Option, dag_block_pool: Arc)>>>, - dag: Arc>, + flexidag_service: ServiceRef, } #[derive(Clone, Debug)] @@ -136,7 +147,7 @@ where txpool: TransactionPoolServiceT, bus: ServiceRef, vm_metrics: Option, - dag: Arc>, + flexidag_service: ServiceRef, ) -> Result { let net = config.net(); let main = BlockChain::new( @@ -161,7 +172,7 @@ where metrics, vm_metrics, dag_block_pool: Arc::new(Mutex::new(vec![])), - dag, + flexidag_service, }) } @@ -255,13 +266,7 @@ where let branch_total_difficulty = new_branch.get_total_difficulty()?; if branch_total_difficulty > main_total_difficulty { self.update_startup_info(new_branch.head_block().header())?; - - let dag_parents = self.dag.lock().get_parents(new_head_block)?; - ctx.broadcast(NewHeadBlock( - Arc::new(new_branch.head_block()), - Some(dag_parents), - )); - + ctx.broadcast(NewHeadBlock(Arc::new(new_branch.head_block()))); Ok(()) } else { bail!("no need to switch"); @@ -398,7 +403,7 @@ where .storage .get_tips_by_block_id(executed_block.block.header().id()) .ok(); - self.broadcast_new_head(executed_block, dag_block_parents, next_tips); + self.broadcast_new_head(executed_block); Ok(()) } @@ -602,12 +607,7 @@ where Ok(blocks) } - fn broadcast_new_head( - &self, - block: ExecutedBlock, - dag_parents: Option>, - next_tips: Option>, - ) { + fn broadcast_new_head(&self, block: ExecutedBlock) { if let Some(metrics) = self.metrics.as_ref() { metrics .chain_select_head_total @@ -617,7 +617,7 @@ where if let Err(e) = self .bus - .broadcast(NewHeadBlock(Arc::new(block), dag_parents)) + .broadcast(NewHeadBlock(Arc::new(block))) { error!("Broadcast NewHeadBlock error: {:?}", e); } diff --git a/sync/src/sync.rs b/sync/src/sync.rs index 856ab5ffc5..cdab025c5d 100644 --- a/sync/src/sync.rs +++ b/sync/src/sync.rs @@ -5,7 +5,8 @@ use crate::block_connector::BlockConnectorService; use crate::sync_metrics::SyncMetrics; use crate::tasks::{full_sync_task, sync_dag_full_task, AncestorEvent, SyncFetcher}; use crate::verified_rpc_client::{RpcVerifyError, VerifiedRpcClient}; -use anyhow::{format_err, Result}; +use anyhow::{format_err, Ok, Result}; +use forkable_jellyfish_merkle::node_type::Node; use futures::executor::block_on; use futures::FutureExt; use futures_timer::Delay; @@ -17,11 +18,13 @@ use starcoin_chain_api::{ChainReader, ChainWriter}; use starcoin_config::NodeConfig; use starcoin_consensus::BlockDAG; use starcoin_executor::VMMetrics; +use starcoin_flexidag::flexidag_service::GetDagAccumulatorInfo; +use starcoin_flexidag::{flexidag_service, FlexidagService}; use starcoin_logger::prelude::*; use starcoin_network::NetworkServiceRef; use starcoin_network::PeerEvent; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRef, }; use starcoin_storage::block_info::BlockInfoStore; use starcoin_storage::{BlockStore, Storage, Store, SyncFlexiDagStore}; @@ -65,6 +68,7 @@ pub struct SyncService { storage: Arc, metrics: Option, peer_score_metrics: Option, + flexidag_service: ServiceRef, vm_metrics: Option, } @@ -72,6 +76,7 @@ impl SyncService { pub fn new( config: Arc, storage: Arc, + flexidag_service: ServiceRef, vm_metrics: Option, ) -> Result { let startup_info = storage @@ -96,24 +101,19 @@ impl SyncService { // let genesis = storage // .get_genesis()? // .ok_or_else(|| format_err!("Can not find genesis hash in storage."))?; - let dag_accumulator_info = - match storage.get_dag_accumulator_info(head_block_info.block_id().clone())? { - Some(info) => Some(info), - None => { - warn!( - "Can not find dag accumulator info by head block id: {}, use genesis info.", - head_block_info.block_id(), - ); - None - } - }; + let dag_accumulator_info = match storage.get_dag_accumulator_info()? { + Some(info) => Some(info), + None => { + warn!( + "Can not find dag accumulator info by head block id: {}, use genesis info.", + head_block_info.block_id(), + ); + None + } + }; Ok(Self { sync_status: SyncStatus::new( - ChainStatus::new( - head_block.header.clone(), - head_block_info, - Some(storage.get_tips_by_block_id(head_block_hash)?), - ), + ChainStatus::new(head_block.header.clone(), head_block_info), dag_accumulator_info, ), stage: SyncStage::NotStart, @@ -121,10 +121,78 @@ impl SyncService { storage, metrics, peer_score_metrics, + flexidag_service, vm_metrics, }) } + pub async fn create_verified_client( + &self, + network: NetworkServiceRef, + config: Arc, + peer_strategy: Option, + peers: Vec, + ) -> Result> { + let peer_select_strategy = + peer_strategy.unwrap_or_else(|| config.sync.peer_select_strategy()); + + let mut peer_set = network.peer_set().await?; + + loop { + if peer_set.is_empty() || peer_set.len() < (config.net().min_peers() as usize) { + let level = if config.net().is_dev() || config.net().is_test() { + Level::Debug + } else { + Level::Info + }; + log!( + level, + "[sync]Waiting enough peers to sync, current: {:?} peers, min peers: {:?}", + peer_set.len(), + config.net().min_peers() + ); + + Delay::new(Duration::from_secs(1)).await; + peer_set = network.peer_set().await?; + } else { + break; + } + } + + let peer_reputations = network + .reputations(REPUTATION_THRESHOLD) + .await? + .await? + .into_iter() + .map(|(peer, reputation)| { + ( + peer, + (REPUTATION_THRESHOLD.abs().saturating_add(reputation)) as u64, + ) + }) + .collect(); + + let peer_selector = PeerSelector::new_with_reputation( + peer_reputations, + peer_set, + peer_select_strategy, + peer_score_metrics, + ); + + peer_selector.retain_rpc_peers(); + if !peers.is_empty() { + peer_selector.retain(peers.as_ref()) + } + if peer_selector.is_empty() { + return Err(format_err!("[sync] No peers to sync.")); + } + + Ok(Arc::new(VerifiedRpcClient::new( + peer_selector.clone(), + network.clone(), + ))) + } + pub fn check_and_start_sync( &mut self, peers: Vec, @@ -190,63 +258,8 @@ impl SyncService { .expect("storage must exist") .get_accumulator_snapshot_storage(); - let dag = ctx.get_shared::>>()?; - - let test_storage = storage.clone(); let fut = async move { - let peer_select_strategy = - peer_strategy.unwrap_or_else(|| config.sync.peer_select_strategy()); - - let mut peer_set = network.peer_set().await?; - - loop { - if peer_set.is_empty() || peer_set.len() < (config.net().min_peers() as usize) { - let level = if config.net().is_dev() || config.net().is_test() { - Level::Debug - } else { - Level::Info - }; - log!( - level, - "[sync]Waiting enough peers to sync, current: {:?} peers, min peers: {:?}", - peer_set.len(), - config.net().min_peers() - ); - - Delay::new(Duration::from_secs(1)).await; - peer_set = network.peer_set().await?; - } else { - break; - } - } - - let peer_reputations = network - .reputations(REPUTATION_THRESHOLD) - .await? - .await? - .into_iter() - .map(|(peer, reputation)| { - ( - peer, - (REPUTATION_THRESHOLD.abs().saturating_add(reputation)) as u64, - ) - }) - .collect(); - - let peer_selector = PeerSelector::new_with_reputation( - peer_reputations, - peer_set, - peer_select_strategy, - peer_score_metrics, - ); - - peer_selector.retain_rpc_peers(); - if !peers.is_empty() { - peer_selector.retain(peers.as_ref()) - } - if peer_selector.is_empty() { - return Err(format_err!("[sync] No peers to sync.")); - } + let rpc_client = self.create_verified_client(network.clone(), config.clone(), peer_strategy, peers).await?; let startup_info = storage .get_startup_info()? @@ -257,46 +270,56 @@ impl SyncService { format_err!("Can not find block info by id: {}", current_block_id) })?; - let rpc_client = Arc::new(VerifiedRpcClient::new( - peer_selector.clone(), - network.clone(), - )); - - // for testing, we start dag sync directly - if let Some((target, op_dag_accumulator_info)) = - rpc_client.get_best_target(current_block_info.get_total_difficulty())? - { - if let Some(target_accumulator_info) = op_dag_accumulator_info { - let local_dag_accumulator_info = storage - .get_dag_accumulator_info(current_block_id)? - .expect("current dag accumulator info should exist"); - let (fut, task_handle, task_event_handle) = sync_dag_full_task( - local_dag_accumulator_info, - target_accumulator_info, - rpc_client.clone(), - dag_accumulator_store, - dag_accumulator_snapshot, - storage.clone(), - config.net().time_service(), - vm_metrics.clone(), - connector_service.clone(), - network.clone(), - skip_pow_verify, - dag.clone(), - block_chain_service.clone(), - config.net().id().clone(), - )?; - self_ref.notify(SyncBeginEvent { - target, - task_handle, - task_event_handle, - peer_selector, - })?; - if let Some(sync_task_total) = sync_task_total.as_ref() { - sync_task_total.with_label_values(&["start"]).inc(); - } - Ok(Some(fut.await?)) + + let op_local_dag_accumulator_info = + self.flexidag_service.send(GetDagAccumulatorInfo).await??; + + if let Some(local_dag_accumulator_info) = op_local_dag_accumulator_info { + let dag_sync_futs = rpc_client + .get_dag_targets(current_block_info.get_total_difficulty(), local_dag_accumulator_info.get_num_leaves())? + .into_iter() + .fold(Ok(vec![]), |mut futs, (peer_id, target_accumulator_infos)| { + rpc_client.switch_strategy(PeerStrategy::DagSync(peer_id)); + let (fut, task_handle, task_event_handle) = sync_dag_full_task( + local_dag_accumulator_info, + target_accumulator_info, + rpc_client.clone(), + dag_accumulator_store, + dag_accumulator_snapshot, + storage.clone(), + config.net().time_service(), + vm_metrics.clone(), + connector_service.clone(), + network.clone(), + skip_pow_verify, + dag.clone(), + block_chain_service.clone(), + config.net().id().clone(), + )?; + self_ref.notify(SyncBeginEvent { + target, + task_handle, + task_event_handle, + peer_selector, + })?; + if let Some(sync_task_total) = sync_task_total.as_ref() { + sync_task_total.with_label_values(&["start"]).inc(); + } + futs.and_then(|v| v.push(fut)) + })? + .into_iter() + .fold(Ok(vec![]), |chain, fut| Ok(vec![fut.await?]))?; + assert!(dag_sync_futs.len() <= 1); + if dag_sync_futs.len() == 1 { + Ok(Some(dag_sync_futs[0])) } else { + debug!("[sync]No best peer to request, current is beast."); + Ok(None) + } + } else { + if let Some((target, _)) = + rpc_client.get_best_target(current_block_info.get_total_difficulty())? + { info!("[sync] Find target({}), total_difficulty:{}, current head({})'s total_difficulty({})", target.target_id.id(), target.block_info.total_difficulty, current_block_id, current_block_info.total_difficulty); let (fut, task_handle, task_event_handle) = full_sync_task( @@ -326,10 +349,10 @@ impl SyncService { sync_task_total.with_label_values(&["start"]).inc(); } Ok(Some(fut.await?)) + } else { + debug!("[sync]No best peer to request, current is beast."); + Ok(None) } - } else { - debug!("[sync]No best peer to request, current is beast."); - Ok(None) } }; let network = ctx.get_shared::()?; @@ -357,7 +380,7 @@ impl SyncService { let current_block_id = startup_info.main; let local_dag_accumulator_info = test_storage - .get_dag_accumulator_info(current_block_id).unwrap() + .get_dag_accumulator_info().unwrap() .expect("current dag accumulator info should exist"); if let Some(sync_task_total) = sync_task_total.as_ref() { @@ -456,8 +479,9 @@ impl ServiceFactory for SyncService { fn create(ctx: &mut ServiceContext) -> Result { let config = ctx.get_shared::>()?; let storage = ctx.get_shared::>()?; + let flexidag_service = ctx.service_ref::()?.clone(); let vm_metrics = ctx.get_shared_opt::()?; - Self::new(config, storage, vm_metrics) + Self::new(config, storage, flexidag_service, vm_metrics) } } @@ -667,15 +691,14 @@ impl EventHandler for SyncService { impl EventHandler for SyncService { fn handle_event(&mut self, msg: NewHeadBlock, ctx: &mut ServiceContext) { - let NewHeadBlock(block, tips_hash) = msg; + let NewHeadBlock(block) = msg; if self.sync_status.update_chain_status(ChainStatus::new( block.header().clone(), block.block_info.clone(), - tips_hash, )) { self.sync_status.update_dag_accumulator_info( self.storage - .get_dag_accumulator_info(block.header().id()) + .get_dag_accumulator_info() .expect("dag accumulator info must exist"), ); ctx.broadcast(SyncStatusChangeEvent(self.sync_status.clone())); diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index edc8a419e7..7f0e0cdaba 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -8,13 +8,17 @@ use futures::future::BoxFuture; use futures::FutureExt; use network_api::PeerId; use network_api::PeerProvider; +use starcoin_accumulator::accumulator_info::AccumulatorInfo; use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_chain::{verifier::BasicVerifier, BlockChain}; use starcoin_chain_api::{ChainReader, ChainWriter, ConnectBlockError, ExecutedBlock}; -use starcoin_config::G_CRATE_VERSION; +use starcoin_config::{Connect, G_CRATE_VERSION}; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; +use starcoin_flexidag::flexidag_service::{AddToDag, GetDagTips, ForkDagAccumulator, FinishSync}; +use starcoin_flexidag::FlexidagService; use starcoin_logger::prelude::*; +use starcoin_service_registry::ServiceRef; use starcoin_storage::BARNARD_HARD_FORK_HASH; use starcoin_sync_api::SyncTarget; use starcoin_types::block::{Block, BlockIdAndNumber, BlockInfo, BlockNumber}; @@ -30,10 +34,13 @@ pub struct SyncBlockData { pub(crate) block: Block, pub(crate) info: Option, pub(crate) peer_id: Option, - pub(crate) accumulator_root: Option, // the block belongs to this accumulator leaf - pub(crate) count_in_leaf: u64, // the number of the block in the accumulator leaf + pub(crate) accumulator_root: Option, // the block belongs to this dag accumulator leaf + pub(crate) count_in_leaf: u64, // the count of the block in the dag accumulator leaf + pub(crate) dag_accumulator_index: Option, // the index of the accumulator leaf which the block belogs to } + + impl SyncBlockData { pub fn new( block: Block, @@ -41,8 +48,7 @@ impl SyncBlockData { peer_id: Option, accumulator_root: Option, count_in_leaf: u64, - dag_block_headers: Option>, - dag_transaction_header: Option, + dag_acccumulator_index: Option, ) -> Self { Self { block, @@ -50,6 +56,7 @@ impl SyncBlockData { peer_id, accumulator_root, count_in_leaf, + dag_accumulator_index, } } } @@ -142,10 +149,10 @@ impl TaskState for BlockSyncTask { .fetch_blocks(no_exist_block_ids) .await? .into_iter() - .fold(result_map, |mut result_map, (block, peer_id)| { + .fold(result_map, |mut result_map, (block, peer_id, _, _)| { result_map.insert( block.id(), - SyncBlockData::new(block, None, peer_id, None, 1, None, None), + SyncBlockData::new(block, None, peer_id, None, 1, None), ); result_map }) @@ -166,8 +173,8 @@ impl TaskState for BlockSyncTask { .fetch_blocks(block_ids) .await? .into_iter() - .map(|(block, peer_id)| { - SyncBlockData::new(block, None, peer_id, None, 1, None, None) + .map(|(block, peer_id, _, _)| { + SyncBlockData::new(block, None, peer_id, None, 1, None) }) .collect()) } @@ -203,7 +210,7 @@ impl TaskState for BlockSyncTask { pub struct BlockCollector { //node's current block info current_block_info: BlockInfo, - target: Option, + target: Option, // single chain use only // the block chain init by ancestor chain: BlockChain, event_handle: H, @@ -212,7 +219,8 @@ pub struct BlockCollector { last_accumulator_root: HashValue, dag_block_pool: Vec, target_accumulator_root: HashValue, - dag: Option>>, + flexidag_service: ServiceRef, + new_dag_accumulator_info: Option, } impl BlockCollector @@ -228,8 +236,13 @@ where peer_provider: N, skip_pow_verify: bool, target_accumulator_root: HashValue, - dag: Option>>, + flexidag_service: ServiceRef, ) -> Self { + if let Some(dag) = &dag { + dag.lock() + .expect("failed to lock the dag") + .clear_missing_block(); + } Self { current_block_info, target, @@ -240,10 +253,15 @@ where last_accumulator_root: HashValue::zero(), dag_block_pool: Vec::new(), target_accumulator_root, - dag: dag.clone(), + flexidag_service, + new_dag_accumulator_info: None, } } + pub fn check_if_became_dag(&self) -> Result { + Ok(async_std::task::block_on(self.flexidag_service.send(GetDagTips))??.is_some()) + } + #[cfg(test)] pub fn apply_block_for_test( &mut self, @@ -251,7 +269,7 @@ where parents_hash: Option>, next_tips: &mut Option>, ) -> Result<()> { - self.apply_block(block, None, parents_hash, next_tips) + self.apply_block(block, None) } fn notify_connected_block( @@ -260,7 +278,6 @@ where block_info: BlockInfo, action: BlockConnectAction, state: CollectorState, - dag_parents: Option>, ) -> Result { let total_difficulty = block_info.get_total_difficulty(); @@ -286,7 +303,6 @@ where // second, construct the block connect event. let block_connect_event = BlockConnectedEvent { block, - dag_parents, feedback: sender, action, }; @@ -319,13 +335,7 @@ where Ok(state) } - fn apply_block( - &mut self, - block: Block, - peer_id: Option, - parents_hash: Option>, - next_tips: &mut Option>, - ) -> Result<()> { + fn apply_block(&mut self, block: Block, peer_id: Option) -> Result<()> { if let Some((_failed_block, pre_peer_id, err, version)) = self .chain .get_storage() @@ -393,7 +403,8 @@ where fn broadcast_dag_chain_block( &mut self, - broadcast_blocks: Vec<(Block, BlockInfo, Option>, BlockConnectAction)>, + broadcast_blocks: Vec<(Block, BlockInfo, BlockConnectAction)>, + start_index: u64, ) -> Result { let state = if self.last_accumulator_root == self.target_accumulator_root { CollectorState::Enough @@ -401,29 +412,16 @@ where CollectorState::Need }; - let last_index = broadcast_blocks.len() - 1; - broadcast_blocks.into_iter().enumerate().for_each( - |(index, (block, block_info, dag_parents, action))| { - if last_index == index && state == CollectorState::Enough { - let _ = self.notify_connected_block( - block, - block_info, - action, - CollectorState::Enough, - dag_parents, - ); - } else { - let _ = self.notify_connected_block( - block, - block_info, - action, - CollectorState::Need, - dag_parents, - ); - } - }, - ); - + self.new_dag_accumulator_info = Some(async_std::task::block_on(self.flexidag_service.send(ForkDagAccumulator { + new_blocks: broadcast_blocks.into_iter().map(|(block, _, _)| block.id()).collect(), + dag_accumulator_index: start_index, + block_header_id: self.chain.head_block().id(), + }))??); + if state == State::Enough { + async_std::task::block_on(self.flexidag_service.send(FinishSync { + dag_accumulator_info: self.new_dag_accumulator_info.clone(), + }))?? + } return Ok(state); } @@ -460,53 +458,66 @@ where Ok(CollectorState::Need) }; - self.notify_connected_block(block, block_info, action, state?, None) + let result = self.notify_connected_block(block, block_info, action, state?); + match result { + Ok(state) => {} + Err(e) => { + error!("notify connected block error: {:?}", e); + Err(e) + } + } } - fn collect_item( - &mut self, - item: SyncBlockData, - next_tips: &mut Option>, - ) -> Result<(Block, BlockInfo, Option>, BlockConnectAction)> { + fn collect_dag_item(&mut self, item: SyncBlockData) -> Result<()> { let (block, block_info, peer_id) = item.into(); let block_id = block.id(); let timestamp = block.header().timestamp(); - if let Some(parents) = block.header().parents_hash().clone() { - if let Some(dag) = &self.dag { - // let color = dag - // .lock() - // .unwrap() - // .commit_header(&Header::new(block.header().clone(), parents.clone()))?; - // if let ColoringOutput::Red = color { - // panic!("the red block should not be applied or connected!"); - // } - let _ = dag - .lock() - .unwrap() - .push_parent_children(block_id, Arc::new(parents)); + let add_dag_result = async_std::task::block_on(self.flexidag_service.send(AddToDag { + block_header: block.header().clone(), + }))??; + let selected_parent = self + .storage + .get_block_by_hash(add_dag_result.selected_parent)? + .expect("selected parent should in storage"); + let mut chain = self.chain.fork(selected_parent.header.parent_hash())?; + for blue_hash in add_dag_result.mergeset_blues.mergeset_blues.iter() { + if let Some(blue_block) = self.storage.get_block(blue_hash.to_owned())? { + match chain.apply(blue_block) { + Ok(_executed_block) => (), + Err(e) => warn!("failed to connect dag block: {:?}", e), + } } else { - panic!("in dag sync, the dag should not be None") + error!("Failed to get block {:?}", blue_hash); } } + if chain.status().info().total_difficulty > self.chain.status().info().total_difficulty { + self.chain = chain; + } + + Ok(()) + } + + fn collect_item( + &mut self, + item: SyncBlockData, + ) -> Result<(Block, BlockInfo, BlockConnectAction)> { + let (block, block_info, peer_id) = item.into(); + let block_id = block.id(); + let timestamp = block.header().timestamp(); + return match block_info { Some(block_info) => { - //If block_info exists, it means that this block was already executed and try connect in the previous sync, but the sync task was interrupted. + //If block_info exists, it means that this block was already executed and + // try connect in the previous sync, but the sync task was interrupted. //So, we just need to update chain and continue self.chain.connect(ExecutedBlock { block: block.clone(), block_info: block_info.clone(), })?; - let block_info = self.chain.status().info; - let tips_hash = self.chain.status().tips_hash; - Ok(( - block, - block_info, - tips_hash, - BlockConnectAction::ConnectExecutedBlock, - )) + Ok((block, block_info, BlockConnectAction::ConnectExecutedBlock)) } None => { self.apply_block(block.clone(), peer_id)?; @@ -516,60 +527,6 @@ where } }; } - - // fn process_received_block(&self, item: SyncBlockData, next_tips: &mut Option>) -> Result { - - // let (block, block_info, parent_hash, action) = self.collect_item(item, next_tips)?; - // ///////// - // // let (block, block_info, peer_id) = item.into(); - // // let timestamp = block.header().timestamp(); - // // let (block_info, action) = match block_info { - // // Some(block_info) => { - // // //If block_info exists, it means that this block was already executed and try connect in the previous sync, but the sync task was interrupted. - // // //So, we just need to update chain and continue - // // self.chain.connect(ExecutedBlock { - // // block: block.clone(), - // // block_info: block_info.clone(), - // // })?; - // // (block_info, BlockConnectAction::ConnectExecutedBlock) - // // } - // // None => { - // // self.apply_block(block.clone(), peer_id)?; - // // self.chain.time_service().adjust(timestamp); - // // ( - // // self.chain.status().info, - // // BlockConnectAction::ConnectNewBlock, - // // ) - // // } - // // }; - - // //verify target - // let state: Result = - // if block_info.block_accumulator_info.num_leaves - // == self.target.block_info.block_accumulator_info.num_leaves - // { - // if block_info != self.target.block_info { - // Err(TaskError::BreakError( - // RpcVerifyError::new_with_peers( - // self.target.peers.clone(), - // format!( - // "Verify target error, expect target: {:?}, collect target block_info:{:?}", - // self.target.block_info, - // block_info - // ), - // ) - // .into(), - // ) - // .into()) - // } else { - // Ok(CollectorState::Enough) - // } - // } else { - // Ok(CollectorState::Need) - // }; - - // self.notify_connected_block(block, block_info, action, state?, parent_hash) - // } } impl TaskResultCollector for BlockCollector @@ -590,14 +547,6 @@ where return Ok(CollectorState::Need); } else { process_block_pool = std::mem::take(&mut self.dag_block_pool); - - self.chain.status().tips_hash = Some( - process_block_pool - .iter() - .clone() - .map(|item| item.block.header().id()) - .collect(), - ); } } else { // it is a single chain @@ -606,12 +555,17 @@ where assert!(!process_block_pool.is_empty()); - let mut next_tips = Some(vec![]); + // let mut next_tips = Some(vec![]); let mut block_to_broadcast = vec![]; - for item in process_block_pool { - block_to_broadcast.push(self.collect_item(item, &mut next_tips)?) + if item.accumulator_root.is_some() { + for item in process_block_pool { + self.collect_dag_item(item)? + } + } else { + for item in process_block_pool { + block_to_broadcast.push(self.collect_item(item)?) + } } - //verify target match self.target { Some(_) => { @@ -622,19 +576,16 @@ where ); let (block, block_info, _, action) = block_to_broadcast.pop().unwrap(); // self.check_if_sync_complete(block_info) - self.broadcast_single_chain_block(block, block_info, action) - } - None => { - // dag - assert!(!next_tips - .as_ref() - .expect("next_tips should not be None") - .is_empty()); - self.chain.append_dag_accumulator_leaf( - next_tips.expect("next_tips should not be None"), - )?; - self.broadcast_dag_chain_block(block_to_broadcast) + match self.broadcast_single_chain_block(block, block_info, action) { + Ok(_) => { + if self.check_if_became_dag()? { + Ok(CollectorState::Enough) + } + } + Err(e) => Err(e), + } } + None => self.broadcast_dag_chain_block(block_to_broadcast, item.dag_accumulator_index), } } diff --git a/sync/src/tasks/inner_sync_task.rs b/sync/src/tasks/inner_sync_task.rs index e47a4bc9f9..b893b3da89 100644 --- a/sync/src/tasks/inner_sync_task.rs +++ b/sync/src/tasks/inner_sync_task.rs @@ -144,7 +144,6 @@ where self.storage.clone(), self.net_id.clone(), vm_metrics, - None, //TODO: FIXME )?; let block_collector = BlockCollector::new_with_handle( current_block_info.clone(), diff --git a/sync/src/tasks/mod.rs b/sync/src/tasks/mod.rs index 46d6a82ded..f85b5a2e79 100644 --- a/sync/src/tasks/mod.rs +++ b/sync/src/tasks/mod.rs @@ -39,6 +39,29 @@ use stream_task::{ }; pub trait SyncFetcher: PeerOperator + BlockIdFetcher + BlockFetcher + BlockInfoFetcher { + fn get_dag_targets(&self, total_difficulty: U256, local_dag_accumulator_leaf_num: u64) -> Result> { + Ok(self + .peer_selector() + .peer_infos() + .into_iter() + .filter(|peer_info| { + match (peer_info.chain_info().dag_accumulator_info(), peer_info.chain_info().k_total_difficulties()) { + (Some(info), Some(k)) => { + k.first() <= total_difficulty || info.get_num_leaves() > local_dag_accumulator_leaf_num + } + (None, None) => false, + _ => { + warn!("dag accumulator is inconsistent with k total difficulty"); + false + } + } + }) + .map(|peer_info| { + (peer_info.peer_id, peer_info.chain_info().dag_accumulator_info().clone()) + }) + .collect()); + } + fn get_best_target( &self, min_difficulty: U256, @@ -320,7 +343,7 @@ impl BlockFetcher for VerifiedRpcClient { ) -> BoxFuture<'_, Result)>>> { self.get_blocks(block_ids.clone()) .and_then(|blocks| async move { - let results: Result)>> = block_ids + let results = block_ids .iter() .zip(blocks) .map(|(id, block)| { @@ -393,7 +416,7 @@ impl BlockLocalStore for Arc { let block_info = self.get_block_info(id)?; Ok(Some(SyncBlockData::new( - block, block_info, None, None, 1, None, None, + block, block_info, None, None, 1, None, ))) } None => Ok(None), @@ -411,7 +434,6 @@ pub enum BlockConnectAction { #[derive(Clone, Debug)] pub struct BlockConnectedEvent { pub block: Block, - pub dag_parents: Option>, pub feedback: Option>, pub action: BlockConnectAction, } diff --git a/sync/src/tasks/sync_dag_block_task.rs b/sync/src/tasks/sync_dag_block_task.rs index e3e36d6cff..2428898926 100644 --- a/sync/src/tasks/sync_dag_block_task.rs +++ b/sync/src/tasks/sync_dag_block_task.rs @@ -58,13 +58,6 @@ impl SyncDagBlockTask { .expect(format!("index: {} must be valid for getting snapshot", index).as_str()) .expect(format!("index: {} should not be None for getting snapshot", index).as_str()); - // let block_with_infos = self - // .local_store - // .get_block_with_info(snapshot.child_hashes.clone())?; - - // assert_eq!(block_with_infos.len(), snapshot.child_hashes.len()); - - // the order must be the same between snapshot.child_hashes and block_with_infos let mut absent_block = vec![]; let mut result = vec![]; snapshot.child_hashes.iter().for_each(|block_id| { @@ -99,7 +92,6 @@ impl SyncDagBlockTask { .1 .to_owned(); }); - result.sort_by_key(|item| item.block_id); let block_info = self .local_store @@ -114,6 +106,7 @@ impl SyncDagBlockTask { peer_id: item.peer_id, accumulator_root: Some(snapshot.accumulator_info.get_accumulator_root().clone()), count_in_leaf: snapshot.child_hashes.len() as u64, + dag_accumulator_index: Some(index), }) .collect()) } diff --git a/sync/src/tasks/sync_dag_full_task.rs b/sync/src/tasks/sync_dag_full_task.rs index c6b1a2c3d3..80af07a501 100644 --- a/sync/src/tasks/sync_dag_full_task.rs +++ b/sync/src/tasks/sync_dag_full_task.rs @@ -194,17 +194,16 @@ where let event_handle = Arc::new(TaskEventCounterHandle::new()); let ext_error_handle = Arc::new(ExtSyncTaskErrorHandle::new(fetcher.clone())); - let start_block_id = get_start_block_id(&accumulator, start_index, local_store.clone()) - .map_err(|err| TaskError::BreakError(anyhow!(err))); - let chain = BlockChain::new( - time_service.clone(), - start_block_id?, - local_store.clone(), - net_id, - vm_metrics, - None, //TODO: FIXME - ) - .map_err(|err| TaskError::BreakError(anyhow!(err))); + // let start_block_id = get_start_block_id(&accumulator, start_index, local_store.clone()) + // .map_err(|err| TaskError::BreakError(anyhow!(err))); + // let chain = BlockChain::new( + // time_service.clone(), + // start_block_id?, + // local_store.clone(), + // net_id, + // vm_metrics, + // ) + // .map_err(|err| TaskError::BreakError(anyhow!(err))); let leaf = accumulator .get_leaf(start_index) @@ -222,16 +221,16 @@ where .as_str(), ); - snapshot.child_hashes.sort(); - let last_chain_block = snapshot - .child_hashes - .iter() - .last() - .expect("block id should not be None") - .clone(); + let chain = BlockChain::new( + time_service.clone(), + snapshot.head_block_id?, + local_store.clone(), + net_id, + vm_metrics, + )?; let current_block_info = local_store - .get_block_info(last_chain_block)? + .get_block_info(snapshot.head_block_id)? .ok_or_else(|| format_err!("Can not find block info by id: {}", last_chain_block)) .map_err(|err| TaskError::BreakError(anyhow!(err))); diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index e0382bad68..88895a69c4 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -766,7 +766,7 @@ impl MockLocalBlockStore { ); self.store.lock().unwrap().insert( block.id(), - SyncBlockData::new(block.clone(), Some(block_info), None, None, 1, None, None), + SyncBlockData::new(block.clone(), Some(block_info), None, None, 1, None), ); } } diff --git a/sync/src/verified_rpc_client.rs b/sync/src/verified_rpc_client.rs index 0381a077e1..57c682d320 100644 --- a/sync/src/verified_rpc_client.rs +++ b/sync/src/verified_rpc_client.rs @@ -381,7 +381,16 @@ impl VerifiedRpcClient { pub async fn get_blocks( &self, ids: Vec, - ) -> Result)>>> { + ) -> Result< + Vec< + Option<( + Block, + Option, + Option>, + Option, + )>, + >, + > { let peer_id = self.select_a_peer()?; let start_time = Instant::now(); let blocks = self.client.get_blocks(peer_id.clone(), ids.clone()).await?; @@ -395,7 +404,7 @@ impl VerifiedRpcClient { .zip(blocks) .map(|(id, block)| { if let Some(block) = block { - let actual_id = block.id(); + let actual_id = block.0.id(); if actual_id != id { warn!( "Get block by id: {:?} from peer: {:?}, but got block: {:?}", @@ -403,7 +412,7 @@ impl VerifiedRpcClient { ); None } else { - Some((block, Some(peer_id.clone()))) + Some((block.0, Some(peer_id.clone()), block.1, block.2)) } } else { None diff --git a/types/src/dag_block.rs b/types/src/dag_block.rs index bc089a92e5..672b728850 100644 --- a/types/src/dag_block.rs +++ b/types/src/dag_block.rs @@ -945,3 +945,20 @@ impl EpochUncleSummary { } } } +#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] +pub struct KTotalDifficulty { + pub head_block_id: HashValue, + pub total_difficulty: U256, +} + +impl Ord for KTotalDifficulty { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.total_difficulty.cmp(&other.total_difficulty) + } +} + +impl PartialOrd for KTotalDifficulty { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} diff --git a/types/src/startup_info.rs b/types/src/startup_info.rs index 8ff3d94dc8..4e08226411 100644 --- a/types/src/startup_info.rs +++ b/types/src/startup_info.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::block::{BlockHeader, BlockInfo, BlockNumber}; +use crate::dag_block::KTotalDifficulty; use anyhow::Result; use bcs_ext::{BCSCodec, Sample}; use schemars::JsonSchema; @@ -11,6 +12,7 @@ use starcoin_accumulator::MerkleAccumulator; use starcoin_crypto::HashValue; use starcoin_uint::U256; use starcoin_vm_types::genesis_config::ChainId; +use std::collections::BTreeSet; use std::convert::{TryFrom, TryInto}; use std::fmt; use std::fmt::Formatter; @@ -23,6 +25,7 @@ pub struct ChainInfo { genesis_hash: HashValue, status: ChainStatus, flexi_dag_accumulator_info: Option, + k_total_difficulties: Option>, } impl ChainInfo { @@ -31,12 +34,14 @@ impl ChainInfo { genesis_hash: HashValue, status: ChainStatus, flexi_dag_accumulator_info: Option, + k_total_difficulties: Option>, ) -> Self { Self { chain_id, genesis_hash, status, flexi_dag_accumulator_info, + k_total_difficulties, } } @@ -75,6 +80,10 @@ impl ChainInfo { self.status.info.get_total_difficulty() } + pub fn k_total_difficulties(&self) -> &Option> { + &self.k_total_difficulties + } + pub fn into_inner(self) -> (ChainId, HashValue, ChainStatus) { (self.chain_id, self.genesis_hash, self.status) } @@ -90,6 +99,7 @@ impl ChainInfo { rand::random::(), rand::random::(), )), + k_total_difficulties: Some(BTreeSet::new()), } } } @@ -101,6 +111,7 @@ impl std::default::Default for ChainInfo { genesis_hash: HashValue::default(), status: ChainStatus::sample(), flexi_dag_accumulator_info: Some(AccumulatorInfo::default()), + k_total_difficulties: Some(BTreeSet::new()), } } } @@ -122,17 +133,11 @@ pub struct ChainStatus { pub head: BlockHeader, /// Chain block info pub info: BlockInfo, - /// tips of the dag chain in dag accumulator snapshots - pub tips_hash: Option>, } impl ChainStatus { - pub fn new(head: BlockHeader, info: BlockInfo, tips_hash: Option>) -> Self { - Self { - head, - info, - tips_hash, - } + pub fn new(head: BlockHeader, info: BlockInfo) -> Self { + Self { head, info } } pub fn random() -> Self { @@ -156,7 +161,6 @@ impl ChainStatus { Self { head: head.clone(), info: block_info, - tips_hash: Some(vec![head.id()]), } } @@ -175,14 +179,6 @@ impl ChainStatus { pub fn into_inner(self) -> (BlockHeader, BlockInfo) { (self.head, self.info) } - - pub fn get_last_tip_block_id(&self) -> Option { - if let Some(tips) = &self.tips_hash { - tips.into_iter().max().cloned() - } else { - None - } - } } impl Sample for ChainStatus { @@ -190,7 +186,6 @@ impl Sample for ChainStatus { Self { head: BlockHeader::sample(), info: BlockInfo::sample(), - tips_hash: Some(vec![HashValue::zero()]), } } } @@ -230,6 +225,9 @@ impl DagChainStatus { pub struct StartupInfo { /// main chain head block hash pub main: HashValue, + + /// dag accumulator info hash + pub dag_main: Option, } impl fmt::Display for StartupInfo { @@ -243,7 +241,14 @@ impl fmt::Display for StartupInfo { impl StartupInfo { pub fn new(main: HashValue) -> Self { - Self { main } + Self { + main, + dag_main: None, + } + } + + pub fn new_with_dag(main: HashValue, dag_main: Option) -> Self { + Self { main, dag_main } } pub fn update_main(&mut self, new_head: HashValue) { @@ -253,6 +258,14 @@ impl StartupInfo { pub fn get_main(&self) -> &HashValue { &self.main } + + pub fn update_dag_main(&mut self, new_head: HashValue) { + self.dag_main = Some(new_head); + } + + pub fn get_dag_main(&self) -> Option { + self.dag_main + } } impl TryFrom> for StartupInfo { diff --git a/types/src/system_events.rs b/types/src/system_events.rs index d6c9ce2773..0a84fe1a2d 100644 --- a/types/src/system_events.rs +++ b/types/src/system_events.rs @@ -10,7 +10,7 @@ use starcoin_crypto::HashValue; use starcoin_vm_types::genesis_config::ConsensusStrategy; use std::sync::Arc; #[derive(Clone, Debug)] -pub struct NewHeadBlock(pub Arc, pub Option>); +pub struct NewHeadBlock(pub Arc); /// may be uncle block #[derive(Clone, Debug)] diff --git a/vm/starcoin-transactional-test-harness/src/fork_chain.rs b/vm/starcoin-transactional-test-harness/src/fork_chain.rs index 2ad3fb7311..b5cf984a64 100644 --- a/vm/starcoin-transactional-test-harness/src/fork_chain.rs +++ b/vm/starcoin-transactional-test-harness/src/fork_chain.rs @@ -132,7 +132,7 @@ impl ForkBlockChain { self.number_hash_map .insert(self.current_number, block.header().id()); self.status = Some(ChainStatusWithBlock { - status: ChainStatus::new(block.header().clone(), block_info.clone(), None), + status: ChainStatus::new(block.header().clone(), block_info.clone()), head: block.clone(), }); self.storage.save_block_info(block_info)?; @@ -199,6 +199,7 @@ impl ChainApi for MockChainApi { HashValue::random(), status.status, None, + None, ))), None => match client { Some(client) => client.info().await.map_err(|e| anyhow!("{}", e)),