From 5a44f52fea4e0fe400c99d1e4ab607fae82823d1 Mon Sep 17 00:00:00 2001 From: Jack Huang Date: Fri, 10 Nov 2023 14:05:57 +0800 Subject: [PATCH] add sync for dag accumulator (#3986) --- block-relayer/src/block_relayer.rs | 6 +- chain/api/src/chain.rs | 5 +- chain/service/src/chain_service.rs | 83 +++-- chain/src/chain.rs | 54 ++- chain/src/verifier/mod.rs | 82 ++--- consensus/src/dag/blockdag.rs | 101 ++++-- flexidag/src/flexidag_service.rs | 337 ++++++++++++++---- miner/src/create_block_template/mod.rs | 9 +- network-rpc/api/src/dag_protocol.rs | 2 - network/api/src/messages.rs | 2 - network/src/service.rs | 11 +- network/types/src/peer_info.rs | 2 - node/src/node.rs | 6 +- rpc/api/src/types.rs | 4 +- storage/src/flexi_dag/mod.rs | 37 +- storage/src/lib.rs | 25 +- sync/api/src/lib.rs | 5 +- .../block_connector_service.rs | 15 +- .../block_connector/test_write_block_chain.rs | 4 +- .../test_write_dag_block_chain.rs | 8 +- sync/src/block_connector/write_block_chain.rs | 123 +++---- sync/src/sync.rs | 98 +++-- sync/src/tasks/block_sync_task.rs | 194 +++------- sync/src/tasks/mod.rs | 41 +-- sync/src/tasks/sync_dag_block_task.rs | 13 +- sync/src/tasks/tests.rs | 2 +- sync/src/verified_rpc_client.rs | 9 +- test-helper/src/network.rs | 2 +- types/src/block.rs | 38 +- types/src/dag_block.rs | 17 + types/src/header.rs | 4 +- types/src/startup_info.rs | 29 +- types/src/system_events.rs | 2 +- 33 files changed, 760 insertions(+), 610 deletions(-) diff --git a/block-relayer/src/block_relayer.rs b/block-relayer/src/block_relayer.rs index 168c902c77..33baf7b34d 100644 --- a/block-relayer/src/block_relayer.rs +++ b/block-relayer/src/block_relayer.rs @@ -241,11 +241,7 @@ impl BlockRelayer { ) .await?; - block_connector_service.notify(PeerNewBlock::new( - peer_id, - block, - compact_block_msg.message.tips_hash, - ))?; + block_connector_service.notify(PeerNewBlock::new(peer_id, block))?; } Ok(()) }; diff --git a/chain/api/src/chain.rs b/chain/api/src/chain.rs index 8e872f5315..7943c64919 100644 --- a/chain/api/src/chain.rs +++ b/chain/api/src/chain.rs @@ -116,10 +116,7 @@ pub trait ChainWriter { fn connect(&mut self, executed_block: ExecutedBlock) -> Result; /// Verify, Execute and Connect block to current chain. - fn apply( - &mut self, - block: Block, - ) -> Result; + fn apply(&mut self, block: Block) -> Result; fn chain_state(&mut self) -> &ChainStateDB; } diff --git a/chain/service/src/chain_service.rs b/chain/service/src/chain_service.rs index 1b61e0c4ec..4df9f0581e 100644 --- a/chain/service/src/chain_service.rs +++ b/chain/service/src/chain_service.rs @@ -1,7 +1,7 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use anyhow::{bail, format_err, Error, Result, Ok}; +use anyhow::{bail, format_err, Error, Ok, Result}; use starcoin_accumulator::node::AccumulatorStoreType; use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_chain::BlockChain; @@ -12,8 +12,10 @@ use starcoin_chain_api::{ use starcoin_config::NodeConfig; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; -use starcoin_flexidag::{FlexidagService, flexidag_service}; -use starcoin_flexidag::flexidag_service::{GetDagAccumulatorLeafDetail, UpdateDagTips, GetDagBlockParents}; +use starcoin_flexidag::flexidag_service::{ + GetDagAccumulatorLeafDetail, GetDagBlockParents, UpdateDagTips, +}; +use starcoin_flexidag::{flexidag_service, FlexidagService}; use starcoin_logger::prelude::*; use starcoin_network_rpc_api::dag_protocol::{ GetDagAccumulatorLeaves, GetTargetDagAccumulatorLeafDetail, TargetDagAccumulatorLeaf, @@ -25,6 +27,7 @@ use starcoin_service_registry::{ use starcoin_storage::{BlockStore, Storage, Store}; use starcoin_types::block::ExecutedBlock; use starcoin_types::contract_event::ContractEventInfo; +use starcoin_types::dag_block::KTotalDifficulty; use starcoin_types::filter::Filter; use starcoin_types::system_events::NewHeadBlock; use starcoin_types::transaction::RichTransactionInfo; @@ -93,8 +96,7 @@ impl EventHandler for ChainReaderService { let new_head = event.0.block().header().clone(); if let Err(e) = if self.inner.get_main().can_connect(event.0.as_ref()) { match self.inner.update_chain_head(event.0.as_ref().clone()) { - // wait for fixing: update_dag_accumulator should be in BlockChain - std::result::Result::Ok(_) => self.inner.update_dag_accumulator(new_head), + std::result::Result::Ok(_) => (), Err(e) => Err(e), } } else { @@ -266,12 +268,14 @@ impl ServiceHandler for ChainReaderService { ChainRequest::GetTargetDagAccumulatorLeafDetail { leaf_index, batch_size, - } => { - Ok(ChainResponse::TargetDagAccumulatorLeafDetail(self.inner.get_target_dag_accumulator_leaf_detail(GetTargetDagAccumulatorLeafDetail { - leaf_index, - batch_size, - })?)) - }, + } => Ok(ChainResponse::TargetDagAccumulatorLeafDetail( + self.inner.get_target_dag_accumulator_leaf_detail( + GetTargetDagAccumulatorLeafDetail { + leaf_index, + batch_size, + }, + )?, + )), } } } @@ -335,6 +339,11 @@ impl ChainReaderServiceInner { pub fn update_dag_accumulator(&mut self, new_block_header: BlockHeader) -> Result<()> { async_std::task::block_on(self.flexidag_service.send(UpdateDagTips { block_header: new_block_header, + current_head_block_id: self.main.status().info().id(), + k_total_difficulty: KTotalDifficulty { + head_block_id: self.main.status().info().id(), + total_difficulty: self.main.status().info().get_total_difficulty(), + }, }))? } } @@ -357,11 +366,12 @@ impl ReadableChainService for ChainReaderServiceInner { .into_iter() .map(|block| { if let Some(block) = block { - let result_parents = async_std::task::block_on(self.flexidag_service.send(GetDagBlockParents { - block_id: block.id(), - })).expect("failed to get the dag block parents"); - let parents = match result_parents - { + let result_parents = + async_std::task::block_on(self.flexidag_service.send(GetDagBlockParents { + block_id: block.id(), + })) + .expect("failed to get the dag block parents"); + let parents = match result_parents { std::result::Result::Ok(parents) => parents.parents, Err(_) => panic!("failed to get parents of block {}", block.id()), }; @@ -510,32 +520,37 @@ impl ReadableChainService for ChainReaderServiceInner { &self, req: GetDagAccumulatorLeaves, ) -> anyhow::Result> { - Ok(async_std::task::block_on(self.flexidag_service.send(flexidag_service::GetDagAccumulatorLeaves { - leaf_index: req.accumulator_leaf_index, - batch_size: req.batch_size, - reverse: true, - }))??.into_iter().map(|leaf| { - TargetDagAccumulatorLeaf { - accumulator_root: leaf.dag_accumulator_root, - leaf_index: leaf.leaf_index, - } - }).collect()) + Ok(async_std::task::block_on(self.flexidag_service.send( + flexidag_service::GetDagAccumulatorLeaves { + leaf_index: req.accumulator_leaf_index, + batch_size: req.batch_size, + reverse: true, + }, + ))?? + .into_iter() + .map(|leaf| TargetDagAccumulatorLeaf { + accumulator_root: leaf.dag_accumulator_root, + leaf_index: leaf.leaf_index, + }) + .collect()) } fn get_target_dag_accumulator_leaf_detail( &self, req: GetTargetDagAccumulatorLeafDetail, ) -> anyhow::Result> { - let dag_details = async_std::task::block_on(self.flexidag_service.send(GetDagAccumulatorLeafDetail { - leaf_index: req.leaf_index, - batch_size: req.batch_size, - }))??; - Ok(dag_details.into_iter().map(|detail| { - TargetDagAccumulatorLeafDetail { + let dag_details = + async_std::task::block_on(self.flexidag_service.send(GetDagAccumulatorLeafDetail { + leaf_index: req.leaf_index, + batch_size: req.batch_size, + }))??; + Ok(dag_details + .into_iter() + .map(|detail| TargetDagAccumulatorLeafDetail { accumulator_root: detail.accumulator_root, tips: detail.tips, - } - }).collect()) + }) + .collect()) } } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 59078ce555..51f8147297 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -29,6 +29,7 @@ use starcoin_storage::Store; use starcoin_time_service::TimeService; use starcoin_types::block::BlockIdAndNumber; use starcoin_types::contract_event::ContractEventInfo; +use starcoin_types::dag_block::KTotalDifficulty; use starcoin_types::filter::Filter; use starcoin_types::header::DagHeader; use starcoin_types::startup_info::{ChainInfo, ChainStatus}; @@ -47,6 +48,7 @@ use starcoin_vm_types::effects::Op; use starcoin_vm_types::genesis_config::ConsensusStrategy; use starcoin_vm_types::on_chain_resource::Epoch; use std::cmp::min; +use std::collections::BTreeSet; use std::iter::Extend; use std::option::Option::{None, Some}; use std::{collections::HashMap, sync::Arc}; @@ -192,6 +194,13 @@ impl BlockChain { .expect("failed to calculate the dag key"), new_tips, dag_accumulator.get_info(), + genesis_id, + [KTotalDifficulty { + head_block_id: genesis_id, + total_difficulty: executed_block.block_info().get_total_difficulty(), + }] + .into_iter() + .collect(), )?; Self::new(time_service, executed_block.block.id(), storage, net, None) } @@ -378,10 +387,7 @@ impl BlockChain { V::verify_block(self, block) } - pub fn apply_with_verifier( - &mut self, - block: Block, - ) -> Result + pub fn apply_with_verifier(&mut self, block: Block) -> Result where V: BlockVerifier, { @@ -393,18 +399,12 @@ impl BlockChain { } //TODO remove this function. - pub fn update_chain_head( - &mut self, - block: Block, - ) -> Result { + pub fn update_chain_head(&mut self, block: Block) -> Result { let block_info = self .storage .get_block_info(block.id())? .ok_or_else(|| format_err!("Can not find block info by hash {:?}", block.id()))?; - self.connect(ExecutedBlock { - block, - block_info, - }) + self.connect(ExecutedBlock { block, block_info }) } //TODO consider move this logic to BlockExecutor @@ -572,10 +572,7 @@ impl BlockChain { storage.save_table_infos(txn_table_infos)?; watch(CHAIN_WATCH_NAME, "n26"); - Ok(ExecutedBlock { - block, - block_info, - }) + Ok(ExecutedBlock { block, block_info }) } pub fn get_txn_accumulator(&self) -> &MerkleAccumulator { @@ -593,12 +590,10 @@ impl ChainReader for BlockChain { self.status.head.header().chain_id(), self.genesis_hash, self.status.status.clone(), - self.storage - .get_dag_accumulator_info() - .expect(&format!( - "the dag accumulator info cannot be found by id: {}", - self.status.head.header().id() - )), + self.storage.get_dag_accumulator_info().expect(&format!( + "the dag accumulator info cannot be found by id: {}", + self.status.head.header().id() + )), ) } @@ -607,10 +602,7 @@ impl ChainReader for BlockChain { } fn head_block(&self) -> ExecutedBlock { - ExecutedBlock::new( - self.status.head.clone(), - self.status.status.info.clone(), - ) + ExecutedBlock::new(self.status.head.clone(), self.status.status.info.clone()) } fn current_header(&self) -> BlockHeader { @@ -1082,10 +1074,7 @@ impl ChainWriter for BlockChain { self.statedb = ChainStateDB::new(self.storage.clone().into_super_arc(), Some(state_root)); self.status = ChainStatusWithBlock { - status: ChainStatus::new( - block.header().clone(), - block_info.clone(), - ), + status: ChainStatus::new(block.header().clone(), block_info.clone()), head: block.clone(), }; if self.epoch.end_block_number() == block.header().number() { @@ -1100,10 +1089,7 @@ impl ChainWriter for BlockChain { Ok(executed_block) } - fn apply( - &mut self, - block: Block, - ) -> Result { + fn apply(&mut self, block: Block) -> Result { self.apply_with_verifier::(block) } diff --git a/chain/src/verifier/mod.rs b/chain/src/verifier/mod.rs index e398bee747..eb8ed93d29 100644 --- a/chain/src/verifier/mod.rs +++ b/chain/src/verifier/mod.rs @@ -184,49 +184,49 @@ impl BlockVerifier for BasicVerifier { // dag // jacktest: TODO: the verifying should be modified!!! // if chain_status.tips_hash.is_some() { - // let mut tips_hash = chain_status.tips_hash.clone().unwrap(); - // tips_hash.sort(); - - // if it is a dag block - // if HashValue::sha3_256_of(&tips_hash.encode().expect("hash encode must be successful")) - // != new_block_parent - // { - // // or a block of a single chain - // verify_block!( - // VerifyBlockField::Header, - // expect_number == new_block_header.number(), - // "Invalid block: Unexpect block number, expect:{}, got: {}.", - // expect_number, - // new_block_header.number() - // ); - - // verify_block!( - // VerifyBlockField::Header, - // current_id == new_block_parent, - // "Invalid block: Parent id mismatch, expect:{}, got: {}, number:{}.", - // current_id, - // new_block_parent, - // new_block_header.number() - // ); - // } + // let mut tips_hash = chain_status.tips_hash.clone().unwrap(); + // tips_hash.sort(); + + // if it is a dag block + // if HashValue::sha3_256_of(&tips_hash.encode().expect("hash encode must be successful")) + // != new_block_parent + // { + // // or a block of a single chain + // verify_block!( + // VerifyBlockField::Header, + // expect_number == new_block_header.number(), + // "Invalid block: Unexpect block number, expect:{}, got: {}.", + // expect_number, + // new_block_header.number() + // ); + + // verify_block!( + // VerifyBlockField::Header, + // current_id == new_block_parent, + // "Invalid block: Parent id mismatch, expect:{}, got: {}, number:{}.", + // current_id, + // new_block_parent, + // new_block_header.number() + // ); + // } // } else { - // or a block of a single chain - verify_block!( - VerifyBlockField::Header, - expect_number == new_block_header.number(), - "Invalid block: Unexpect block number, expect:{}, got: {}.", - expect_number, - new_block_header.number() - ); + // or a block of a single chain + verify_block!( + VerifyBlockField::Header, + expect_number == new_block_header.number(), + "Invalid block: Unexpect block number, expect:{}, got: {}.", + expect_number, + new_block_header.number() + ); - verify_block!( - VerifyBlockField::Header, - current_id == new_block_parent, - "Invalid block: Parent id mismatch, expect:{}, got: {}, number:{}.", - current_id, - new_block_parent, - new_block_header.number() - ); + verify_block!( + VerifyBlockField::Header, + current_id == new_block_parent, + "Invalid block: Parent id mismatch, expect:{}, got: {}, number:{}.", + current_id, + new_block_parent, + new_block_header.number() + ); // } verify_block!( VerifyBlockField::Header, diff --git a/consensus/src/dag/blockdag.rs b/consensus/src/dag/blockdag.rs index 728f76c5cd..a50b4459fb 100644 --- a/consensus/src/dag/blockdag.rs +++ b/consensus/src/dag/blockdag.rs @@ -1,7 +1,6 @@ use super::ghostdag::protocol::{ColoringOutput, GhostdagManager}; use super::reachability::{inquirer, reachability_service::MTReachabilityService}; use super::types::ghostdata::GhostdagData; -use crate::FlexiDagStorageConfig; use crate::consensusdb::prelude::StoreError; use crate::consensusdb::schemadb::GhostdagStoreReader; use crate::consensusdb::{ @@ -11,25 +10,26 @@ use crate::consensusdb::{ HeaderStore, ReachabilityStoreReader, RelationsStore, RelationsStoreReader, }, }; +use crate::FlexiDagStorageConfig; use anyhow::{anyhow, bail, Ok}; use bcs_ext::BCSCodec; use parking_lot::RwLock; use starcoin_accumulator::accumulator_info::AccumulatorInfo; -use starcoin_accumulator::{MerkleAccumulator, Accumulator}; use starcoin_accumulator::node::AccumulatorStoreType; +use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_config::NodeConfig; use starcoin_crypto::HashValue as Hash; -use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot; +use starcoin_storage::flexi_dag::{KTotalDifficulty, SyncFlexiDagSnapshot, SyncFlexiDagSnapshotHasher}; use starcoin_storage::storage::CodecKVStore; -use starcoin_storage::{Store, SyncFlexiDagStore, Storage, BlockStore}; +use starcoin_storage::{BlockStore, Storage, Store, SyncFlexiDagStore}; use starcoin_types::block::BlockNumber; use starcoin_types::startup_info; use starcoin_types::{ blockhash::{BlockHashes, KType, ORIGIN}, header::{ConsensusHeader, DagHeader}, }; -use std::collections::HashMap; use std::collections::HashSet; +use std::collections::{BinaryHeap, HashMap}; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -90,41 +90,93 @@ impl BlockDAG { dag } - pub fn try_init_with_storage(storage: Arc, config: Arc) -> anyhow::Result<(Option, Option)> { - let startup_info = storage.get_startup_info()?.expect("startup info must exist"); + pub fn calculate_dag_accumulator_key(snapshot: &SyncFlexiDagSnapshotHasher) -> anyhow::Result { + Ok(Hash::sha3_256_of(&snapshot.encode().expect( + "encoding the sorted relatship set must be successful", + ))) + } + + pub fn try_init_with_storage( + storage: Arc, + config: Arc, + ) -> anyhow::Result<(Option, Option)> { + let startup_info = storage + .get_startup_info()? + .expect("startup info must exist"); if let Some(key) = startup_info.get_dag_main() { - let accumulator_info = storage.get_dag_accumulator_info()?.expect("dag accumulator info should exist"); - assert!(accumulator_info.get_num_leaves() > 0, "the number of dag accumulator leaf must be greater than 0"); + let accumulator_info = storage + .get_dag_accumulator_info()? + .expect("dag accumulator info should exist"); + assert!( + accumulator_info.get_num_leaves() > 0, + "the number of dag accumulator leaf must be greater than 0" + ); let dag_accumulator = MerkleAccumulator::new_with_info( accumulator_info, storage.get_accumulator_store(AccumulatorStoreType::SyncDag), ); - let dag_genesis_hash = dag_accumulator.get_leaf(0)?.expect("the genesis in dag accumulator must none be none"); - - let dag_genesis_header = storage.get_block_header_by_hash(dag_genesis_hash)?.expect("the genesis block in dag accumulator must none be none"); + let dag_genesis_hash = dag_accumulator + .get_leaf(0)? + .expect("the genesis in dag accumulator must none be none"); - Ok((Some(Self::new_by_config(DagHeader::new_genesis(dag_genesis_header), config.data_dir().join("flexidag").as_path())?), Some(dag_accumulator))) + let dag_genesis_header = storage + .get_block_header_by_hash(dag_genesis_hash)? + .expect("the genesis block in dag accumulator must none be none"); + Ok(( + Some(Self::new_by_config( + DagHeader::new_genesis(dag_genesis_header), + config.data_dir().join("flexidag").as_path(), + )?), + Some(dag_accumulator), + )) } else { - let block_header = storage.get_block_header_by_hash(startup_info.get_main().clone())?.expect("the genesis block in dag accumulator must none be none"); + let block_header = storage + .get_block_header_by_hash(startup_info.get_main().clone())? + .expect("the genesis block in dag accumulator must none be none"); let fork_height = storage.dag_fork_height(config.net().id().clone()); if block_header.number() < fork_height { Ok((None, None)) } else if block_header.number() == fork_height { - let dag_accumulator = MerkleAccumulator::new_with_info(AccumulatorInfo::default(), storage.get_accumulator_store(AccumulatorStoreType::SyncDag)); - dag_accumulator.append(&[block_header.id()])?; - storage.get_accumulator_snapshot_storage().put(Self::calculate_dag_accumulator_key(vec![block_header.id()])?, SyncFlexiDagSnapshot { + let dag_accumulator = MerkleAccumulator::new_with_info( + AccumulatorInfo::default(), + storage.get_accumulator_store(AccumulatorStoreType::SyncDag), + ); + + + let k_total_difficulties = BinaryHeap::new(); + k_total_difficulties.push(KTotalDifficulty { + head_block_id: block_header.id(), + total_difficulty: storage + .get_block_info(block_header.id())? + .expect("block info must exist") + .get_total_difficulty(), + }); + let snapshot_hasher = SyncFlexiDagSnapshotHasher { child_hashes: vec![block_header.id()], - accumulator_info: dag_accumulator.get_info(), head_block_id: block_header.id(), - })?; - Ok((Some(Self::new_by_config(DagHeader::new_genesis(block_header), config.data_dir().join("flexidag").as_path())?), Some(dag_accumulator))) + k_total_difficulties, + }; + let key = Self::calculate_dag_accumulator_key(&snapshot_hasher)?; + dag_accumulator.append(&[key])?; + storage.get_accumulator_snapshot_storage().put( + key, + snapshot_hasher.to_snapshot(dag_accumulator.get_info()), + )?; + dag_accumulator.flush()?; + Ok(( + Some(Self::new_by_config( + DagHeader::new_genesis(block_header), + config.data_dir().join("flexidag").as_path(), + )?), + Some(dag_accumulator), + )) } else { bail!("failed to init dag") } } } - + pub fn new_by_config(header: DagHeader, db_path: &Path) -> anyhow::Result { let config = FlexiDagStorageConfig::create_with_params(1, 0, 1024); let db = FlexiDagStorage::create_from_path(db_path, config)?; @@ -132,13 +184,6 @@ impl BlockDAG { Ok(dag) } - pub fn calculate_dag_accumulator_key(mut tips: Vec) -> anyhow::Result { - tips.sort(); - Ok(Hash::sha3_256_of(&tips.encode().expect( - "encoding the sorted relatship set must be successful", - ))) - } - pub fn clear_missing_block(&mut self) { self.missing_blocks.clear(); } diff --git a/flexidag/src/flexidag_service.rs b/flexidag/src/flexidag_service.rs index 84e78050f3..8966d0340e 100644 --- a/flexidag/src/flexidag_service.rs +++ b/flexidag/src/flexidag_service.rs @@ -1,18 +1,28 @@ -use std::sync::Arc; - -use anyhow::{anyhow, Result, Ok, bail, Error}; -use starcoin_accumulator::{Accumulator, MerkleAccumulator, accumulator_info::AccumulatorInfo}; -use starcoin_config::NodeConfig; -use starcoin_consensus::{BlockDAG, dag::types::ghostdata::GhostdagData}; +use std::{ + collections::{BTreeSet, BinaryHeap}, + sync::Arc, +}; + +use anyhow::{anyhow, bail, Error, Ok, Result}; +use starcoin_accumulator::{accumulator_info::AccumulatorInfo, Accumulator, MerkleAccumulator}; +use starcoin_config::{NodeConfig, TimeService}; +use starcoin_consensus::{dag::types::ghostdata::GhostdagData, BlockDAG}; use starcoin_crypto::HashValue; -use starcoin_service_registry::{ActorService, ServiceContext, ServiceFactory, ServiceHandler, ServiceRequest}; -use starcoin_storage::{storage::CodecKVStore, flexi_dag::SyncFlexiDagSnapshot, Storage, SyncFlexiDagStore, BlockStore}; -use starcoin_types::{block::BlockHeader, startup_info, header::DagHeader}; +use starcoin_service_registry::{ + ActorService, ServiceContext, ServiceFactory, ServiceHandler, ServiceRequest, +}; +use starcoin_storage::{ + flexi_dag::{KTotalDifficulty, SyncFlexiDagSnapshot, SyncFlexiDagSnapshotHasher}, + storage::CodecKVStore, + BlockStore, Storage, SyncFlexiDagStore, block_info::BlockInfoStore, +}; +use starcoin_types::{block::BlockHeader, header::DagHeader, startup_info}; #[derive(Debug, Clone)] pub struct DumpTipsToAccumulator { pub block_header: BlockHeader, pub current_head_block_id: HashValue, + pub k_total_difficulty: KTotalDifficulty, } impl ServiceRequest for DumpTipsToAccumulator { @@ -23,6 +33,7 @@ impl ServiceRequest for DumpTipsToAccumulator { pub struct UpdateDagTips { pub block_header: BlockHeader, pub current_head_block_id: HashValue, + pub k_total_difficulty: KTotalDifficulty, } impl ServiceRequest for UpdateDagTips { @@ -105,25 +116,36 @@ impl ServiceRequest for AddToDag { type Response = anyhow::Result; } +#[derive(Debug, Clone)] +pub struct ForkDagAccumulator { + pub new_blocks: Vec, + pub dag_accumulator_index: u64, + pub block_header_id: HashValue, +} + +impl ServiceRequest for ForkDagAccumulator { + type Response = anyhow::Result; +} + +pub struct TipInfo { + tips: Option>, // some is for dag or the state of the chain is still in old version + k_total_difficulties: BTreeSet, +} pub struct FlexidagService { dag: Option, dag_accumulator: Option, - tips: Option>, // some is for dag or the state of the chain is still in old version + tip_info: Option, storage: Arc, } impl FlexidagService { - pub fn add_to_dag( - &mut self, - header: BlockHeader, - ) -> Result> { + pub fn add_to_dag(&mut self, header: BlockHeader) -> Result> { let dag = match &mut self.dag { Some(dag) => dag, None => bail!("dag is none"), }; - match dag.get_ghostdag_data(header.id()) - { + match dag.get_ghostdag_data(header.id()) { std::result::Result::Ok(ghost_dag_data) => Ok(ghost_dag_data), Err(_) => std::result::Result::Ok(Arc::new( // jacktest: TODO:add_to_dag should not use parents hash since the block header has them @@ -131,30 +153,115 @@ impl FlexidagService { )), } } + + fn create_snapshot_by_tips(tips: Vec, head_block_id: HashValue) -> Result<(HashValue, SyncFlexiDagSnapshotHasher)> { + let k_total_difficulties = BTreeSet::new(); + tips.iter().for_each(|block_id| { + k_total_difficulties.insert(KTotalDifficulty { + head_block_id: block_id.clone(), + total_difficulty: self.storage.get_block_info(block_id.clone()).expect("block info should not be none").ok_or_else(error || anyhow!("block info should not be none"))?.total_difficulty, + }); + }); + + let snaphot_hasher = SyncFlexiDagSnapshotHasher { + child_hashes: tips, + head_block_id, + k_total_difficulties, + }; + + Ok((BlockDAG::calculate_dag_accumulator_key(&snapshot_hasher)?, snaphot_hasher)) + } + + fn merge_from_big_dag(&mut self, msg: ForkDagAccumulator) -> Result { + let dag_accumulator = self.dag_accumulator.as_mut().ok_or_else("the dag accumulator should not be none")?; + if dag_accumulator.num_leaves() != msg.dag_accumulator_index { + bail!("cannot merge dag accumulator since its number is not the same as other"); + } + let tip_info = self.tip_info.as_mut().ok_or_else("the tips should not be none")?; + msg.new_blocks.iter().for_each(|block_id| { + if !tip_info.tips.contains(block_id) { + tip_info.tips.push(block_id.clone()); + } + }); + + let (key, snaphot_hasher) = Self::create_snapshot_by_tips(tip_info.tips, msg.block_header_id)?; + dag_accumulator.append(&vec![key])?; + let dag_accumulator_info = dag_accumulator.get_info(); + self.storage.get_accumulator_snapshot_storage().put(key, snaphot_hasher.to_snapshot(dag_accumulator_info))?; + dag_accumulator.flush()?; + Ok(dag_accumulator_info) + } + + fn merge_from_small_dag(&mut self, msg: ForkDagAccumulator) -> Result { + let dag_accumulator = self + .dag_accumulator + .as_mut() + .ok_or_else(error || anyhow!("dag accumulator is none"))?; + // fetch the block in the dag according to the dag accumulator index + let previous_key = dag_accumulator.get_leaf(msg.dag_accumulator_index - 1)? + .ok_or_else(error || anyhow!("the dag snapshot hash is none"))?; + + let current_key = dag_accumulator.get_leaf(msg.dag_accumulator_index)? + .ok_or_else(error || anyhow!("the dag snapshot hash is none"))?; + + let pre_snapshot = self + .storage + .get_accumulator_snapshot_storage() + .get(previous_key)? + .ok_or_else(error || anyhow!("the dag snapshot is none"))?; + + let current_snapshot = self + .storage + .get_accumulator_snapshot_storage() + .get(current_key)? + .ok_or_else(error || anyhow!("the dag snapshot is none"))?; + + // fork the dag accumulator according to the ForkDagAccumulator.dag_accumulator_index + let fork = dag_accumulator.fork(Some(pre_snapshot.accumulator_info)); + + let mut new_blocks = msg.new_blocks; + current_snapshot.child_hashes.iter().for_each(|block_id| { + if !new_blocks.contains(block_id) { + new_blocks.push(block_id.clone()); + } + }); + + let (key, snaphot_hasher) = Self::create_snapshot_by_tips(new_blocks, msg.block_header_id)?; + fork.append(&vec![key])?; + let dag_accumulator_info = fork.get_info(); + self.storage.get_accumulator_snapshot_storage().put(key, snaphot_hasher.to_snapshot(dag_accumulator_info))?; + fork.flush()?; + Ok(dag_accumulator_info) + } + } impl ServiceFactory for FlexidagService { fn create(ctx: &mut ServiceContext) -> Result { let storage = ctx.get_shared::>()?; let config = ctx.get_shared::>()?; - let (dag, dag_accumulator) = BlockDAG::try_init_with_storage(storage.clone(), config.clone())?; - let tips = dag_accumulator.as_ref().map(|accumulator| { + let (dag, dag_accumulator) = + BlockDAG::try_init_with_storage(storage.clone(), config.clone())?; + let tip_info = dag_accumulator.as_ref().map(|accumulator| { let tips_index = accumulator.num_leaves(); let tips_key = accumulator .get_leaf(tips_index) .expect("failed to read the dag snapshot hash") .expect("the dag snapshot hash is none"); - storage + let snapshot = storage .get_accumulator_snapshot_storage() .get(tips_key) .expect("failed to read the snapsho object") - .expect("dag snapshot object is none") - .child_hashes + .expect("dag snapshot object is none"); + TipInfo { + tips: Some(snapshot.child_hashes), + k_total_difficulties: snapshot.k_total_difficulties, + } }); Ok(Self { dag, dag_accumulator, - tips, + tip_info, storage: storage.clone(), }) } @@ -172,11 +279,15 @@ impl ActorService for FlexidagService { } } -// send this message after minting a new block -// and the block was committed +// send this message after minting a new block +// and the block was committed // and startup info was updated impl ServiceHandler for FlexidagService { - fn handle(&mut self, msg: DumpTipsToAccumulator, ctx: &mut ServiceContext) -> Result<()> { + fn handle( + &mut self, + msg: DumpTipsToAccumulator, + ctx: &mut ServiceContext, + ) -> Result<()> { let storage = ctx.get_shared::>()?; if self.tips.is_none() { let config = ctx.get_shared::>()?; @@ -186,37 +297,66 @@ impl ServiceHandler for FlexidagService { } else { // initialize the dag data, the chain will be the dag chain at next block self.dag = dag; - self.tips = Some(vec![msg.block_header.id()]); self.dag_accumulator = dag_accumulator; - + self.tip_info = Some(TipInfo { + tips: Some(vec![msg.block_header.id()]), + k_total_difficulties: [msg.block_header.id()].into_iter().cloned().collect(), + }); + self.storage = storage.clone(); Ok(()) } } else { // the chain had became the flexidag chain - let tips = self.tips.take().expect("the tips should not be none in this branch"); + let tip_info = self + .tip_info + .take() + .expect("the tips should not be none in this branch"); let key = BlockDAG::calculate_dag_accumulator_key(tips.clone())?; - let dag = self.dag_accumulator.as_mut().expect("the tips is not none but the dag accumulator is none"); + let dag = self + .dag_accumulator + .as_mut() + .expect("the tips is not none but the dag accumulator is none"); dag.append(&vec![key])?; - storage.get_accumulator_snapshot_storage().put(key, SyncFlexiDagSnapshot { - child_hashes: tips, - accumulator_info: dag.get_info(), - head_block_id: msg.current_head_block_id, - })?; + storage.get_accumulator_snapshot_storage().put( + key, + SyncFlexiDagSnapshot { + child_hashes: tip_info.tips.expect("the tips should not be none"), + accumulator_info: dag.get_info(), + head_block_id: msg.current_head_block_id, + k_total_difficulties: tip_info + .k_total_difficulties + .into_iter() + .take(16) + .cloned() + .collect(), + }, + )?; dag.flush()?; - self.tips = Some(vec![msg.block_header.id()]); + self.tip_info = Some(TipInfo { + tips: Some(vec![msg.block_header.id()]), + k_total_difficulties: [msg.block_header.id()].into_iter().cloned().collect(), + }); + self.storage = storage.clone(); Ok(()) } } } impl ServiceHandler for FlexidagService { - fn handle(&mut self, msg: UpdateDagTips, ctx: &mut ServiceContext) -> Result<()> { + fn handle( + &mut self, + msg: UpdateDagTips, + ctx: &mut ServiceContext, + ) -> Result<()> { let header = msg.block_header; - match self.tips.clone() { - Some(mut tips) => { - if !tips.contains(&header.id()) { - tips.push(header.id()); - self.tips = Some(tips); + match &mut self.tip_info { + Some(tip_info) => { + if !tip_info.tips.contains(&header.id()) { + tip_info.tips.push(header.id()); + tip_info.k_total_difficulties.insert(KTotalDifficulty { + head_block_id: msg.k_total_difficulty.head_block_id, + total_difficulty: msg.k_total_difficulty.total_difficulty, + }); } Ok(()) } @@ -224,19 +364,29 @@ impl ServiceHandler for FlexidagService { let storage = ctx.get_shared::>()?; let config = ctx.get_shared::>()?; if header.number() == storage.dag_fork_height(config.net().id().clone()) { - let (dag, dag_accumulator) = BlockDAG::try_init_with_storage(storage.clone(), config)?; + let (dag, dag_accumulator) = + BlockDAG::try_init_with_storage(storage.clone(), config)?; if dag.is_none() { Ok(()) // the chain is still in single chain } else { // initialize the dag data, the chain will be the dag chain at next block self.dag = dag; - self.tips = Some(vec![header.id()]); + self.tip_info = Some(TipInfo { + tips: Some(vec![msg.block_header.id()]), + k_total_difficulties: [msg.block_header.id()] + .into_iter() + .cloned() + .collect(), + }); self.dag_accumulator = dag_accumulator; - storage.get_startup_info()?.map(|mut startup_info| { - startup_info.dag_main = Some(header.id()); - storage.save_startup_info(startup_info) - }).expect("starup info should not be none") + storage + .get_startup_info()? + .map(|mut startup_info| { + startup_info.dag_main = Some(header.id()); + storage.save_startup_info(startup_info) + }) + .expect("starup info should not be none") } } else { Ok(()) // drop the block, the chain is still in single chain @@ -247,21 +397,34 @@ impl ServiceHandler for FlexidagService { } impl ServiceHandler for FlexidagService { - fn handle(&mut self, _msg: GetDagTips, _ctx: &mut ServiceContext) -> Result>> { + fn handle( + &mut self, + _msg: GetDagTips, + _ctx: &mut ServiceContext, + ) -> Result>> { Ok(self.tips.clone()) } } impl ServiceHandler for FlexidagService { - fn handle(&mut self, _msg: GetDagAccumulatorInfo, _ctx: &mut ServiceContext) -> Result> { - Ok(self.dag_accumulator.as_ref().map(|dag_accumulator_info| { - dag_accumulator_info.get_info() - })) + fn handle( + &mut self, + _msg: GetDagAccumulatorInfo, + _ctx: &mut ServiceContext, + ) -> Result> { + Ok(self + .dag_accumulator + .as_ref() + .map(|dag_accumulator_info| dag_accumulator_info.get_info())) } } impl ServiceHandler for FlexidagService { - fn handle(&mut self, msg: GetDagAccumulatorLeaves, _ctx: &mut ServiceContext) -> Result> { + fn handle( + &mut self, + msg: GetDagAccumulatorLeaves, + _ctx: &mut ServiceContext, + ) -> Result> { match &self.dag_accumulator { Some(dag_accumulator) => { let end_index = std::cmp::min( @@ -275,8 +438,14 @@ impl ServiceHandler for FlexidagService { } else { index }; - let key = dag_accumulator.get_leaf(real_index)?.ok_or_else(|| anyhow!("the dag snapshot hash is none"))?; - let snaptshot = self.storage.get_accumulator_snapshot_storage().get(key)?.expect("the snapshot should not be none"); + let key = dag_accumulator + .get_leaf(real_index)? + .ok_or_else(|| anyhow!("the dag snapshot hash is none"))?; + let snaptshot = self + .storage + .get_accumulator_snapshot_storage() + .get(key)? + .expect("the snapshot should not be none"); result.push(DagAccumulatorLeaf { leaf_index: real_index, dag_accumulator_root: snaptshot.accumulator_info.accumulator_root, @@ -290,16 +459,26 @@ impl ServiceHandler for FlexidagService { } impl ServiceHandler for FlexidagService { - fn handle(&mut self, msg: GetDagBlockParents, _ctx: &mut ServiceContext) -> Result { + fn handle( + &mut self, + msg: GetDagBlockParents, + _ctx: &mut ServiceContext, + ) -> Result { match &self.dag { - Some(dag) => Ok(DagBlockParents { parents: dag.get_parents(msg.block_id)? } ) , + Some(dag) => Ok(DagBlockParents { + parents: dag.get_parents(msg.block_id)?, + }), None => bail!("dag is none"), } } } impl ServiceHandler for FlexidagService { - fn handle(&mut self, msg: GetDagAccumulatorLeafDetail, _ctx: &mut ServiceContext) -> Result> { + fn handle( + &mut self, + msg: GetDagAccumulatorLeafDetail, + _ctx: &mut ServiceContext, + ) -> Result> { match &self.dag_accumulator { Some(dag_accumulator) => { let end_index = std::cmp::min( @@ -307,10 +486,14 @@ impl ServiceHandler for FlexidagService { dag_accumulator.num_leaves() - 1, ); let mut details = vec![]; - let snapshot_storage = self.storage.get_accumulator_snapshot_storage(); + let snapshot_storage = self.storage.get_accumulator_snapshot_storage(); for index in msg.leaf_index..=end_index { - let key = dag_accumulator.get_leaf(index)?.ok_or_else(|| anyhow!("the dag snapshot hash is none"))?; - let snapshot = snapshot_storage.get(key)?.ok_or_else(|| anyhow!("the dag snapshot is none"))?; + let key = dag_accumulator + .get_leaf(index)? + .ok_or_else(|| anyhow!("the dag snapshot hash is none"))?; + let snapshot = snapshot_storage + .get(key)? + .ok_or_else(|| anyhow!("the dag snapshot is none"))?; details.push(DagAccumulatorLeafDetail { accumulator_root: snapshot.accumulator_info.accumulator_root, tips: snapshot.child_hashes, @@ -324,11 +507,37 @@ impl ServiceHandler for FlexidagService { } impl ServiceHandler for FlexidagService { - fn handle(&mut self, msg: AddToDag, _ctx: &mut ServiceContext) -> Result { + fn handle( + &mut self, + msg: AddToDag, + _ctx: &mut ServiceContext, + ) -> Result { let ghost_dag_data = self.add_to_dag(msg.block_header)?; - Ok(MergesetBlues { + Ok(MergesetBlues { selected_parent: ghost_dag_data.selected_parent, - mergeset_blues: ghost_dag_data.mergeset_blues.as_ref().clone(), + mergeset_blues: ghost_dag_data.mergeset_blues.as_ref().clone(), }) } } + +impl ServiceHandler for FlexidagService { + fn handle( + &mut self, + msg: ForkDagAccumulator, + _ctx: &mut ServiceContext, + ) -> Result { + let dag_accumulator = self + .dag_accumulator + .as_ref() + .ok_or_else(error || anyhow!("dag accumulator is none"))?; + + if msg.dag_accumulator_index > dag_accumulator.num_leaves() { + self.merge_from_big_dag(msg) + } else { + self.merge_from_small_dag(msg) + } + + + // append the ForkDagAccumulator.new_blocks and the fetched blocks above into the forked dag accumulator + } +} diff --git a/miner/src/create_block_template/mod.rs b/miner/src/create_block_template/mod.rs index 4a91130f14..17764bcb83 100644 --- a/miner/src/create_block_template/mod.rs +++ b/miner/src/create_block_template/mod.rs @@ -14,11 +14,12 @@ use starcoin_consensus::Consensus; use starcoin_crypto::hash::HashValue; use starcoin_executor::VMMetrics; use starcoin_flexidag::flexidag_service::GetDagTips; -use starcoin_flexidag::{FlexidagService, flexidag_service}; +use starcoin_flexidag::{flexidag_service, FlexidagService}; use starcoin_logger::prelude::*; use starcoin_open_block::OpenedBlock; use starcoin_service_registry::{ - ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRequest, ServiceRef, + ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, ServiceRef, + ServiceRequest, }; use starcoin_storage::{BlockStore, Storage, Store}; use starcoin_txpool::TxPoolService; @@ -372,6 +373,8 @@ where } fn get_dag_block_parents(&self) -> Result>> { - Ok(async_std::task::block_on(self.flexidag_service.send(GetDagTips))??) + Ok(async_std::task::block_on( + self.flexidag_service.send(GetDagTips), + )??) } } diff --git a/network-rpc/api/src/dag_protocol.rs b/network-rpc/api/src/dag_protocol.rs index c47b28ac52..17b2936f7d 100644 --- a/network-rpc/api/src/dag_protocol.rs +++ b/network-rpc/api/src/dag_protocol.rs @@ -44,6 +44,4 @@ pub struct SyncDagBlockInfo { pub block_id: HashValue, pub block: Option, pub peer_id: Option, - pub dag_parents: Vec, - pub dag_transaction_header: Option, } diff --git a/network/api/src/messages.rs b/network/api/src/messages.rs index 053fa3a5c5..0e0de5351b 100644 --- a/network/api/src/messages.rs +++ b/network/api/src/messages.rs @@ -48,7 +48,6 @@ impl Sample for TransactionsMessage { pub struct CompactBlockMessage { pub compact_block: CompactBlock, pub block_info: BlockInfo, - pub tips_hash: Option>, } impl CompactBlockMessage { @@ -60,7 +59,6 @@ impl CompactBlockMessage { Self { compact_block, block_info, - tips_hash, } } } diff --git a/network/src/service.rs b/network/src/service.rs index cdad26aadb..15985eea9f 100644 --- a/network/src/service.rs +++ b/network/src/service.rs @@ -717,12 +717,11 @@ impl Inner { //2. Sync status change. // may be update by repeat message, but can not find a more good way. self.network_service.update_business_status( - ChainStatus::new( - msg.compact_block.header.clone(), - msg.block_info.clone(), - ) - .encode() - .expect("Encoding the compact_block.header and block_info must be successful"), + ChainStatus::new(msg.compact_block.header.clone(), msg.block_info.clone()) + .encode() + .expect( + "Encoding the compact_block.header and block_info must be successful", + ), ); self.self_peer.known_blocks.put(id, ()); diff --git a/network/types/src/peer_info.rs b/network/types/src/peer_info.rs index ca3c898301..13b0463afb 100644 --- a/network/types/src/peer_info.rs +++ b/network/types/src/peer_info.rs @@ -71,8 +71,6 @@ impl PeerInfo { pub fn update_dag_accumulator_info(&mut self, dag_accumulator_info: Option) { self.chain_info .update_dag_accumulator_info(dag_accumulator_info) - - } /// This peer is support notification diff --git a/node/src/node.rs b/node/src/node.rs index a15d0534f7..23d09bb621 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -224,11 +224,7 @@ impl ServiceHandler for NodeService { block_hash ) })?; - let result = connect_service - .send(ExecuteRequest { - block, - }) - .await??; + let result = connect_service.send(ExecuteRequest { block }).await??; info!("Re execute result: {:?}", result); Ok(()) }; diff --git a/rpc/api/src/types.rs b/rpc/api/src/types.rs index cfed8fab61..523be0cb14 100644 --- a/rpc/api/src/types.rs +++ b/rpc/api/src/types.rs @@ -23,7 +23,9 @@ use starcoin_crypto::{CryptoMaterialError, HashValue, ValidCryptoMaterialStringE use starcoin_resource_viewer::{AnnotatedMoveStruct, AnnotatedMoveValue}; use starcoin_service_registry::ServiceRequest; use starcoin_state_api::{StateProof, StateWithProof, StateWithTableItemProof}; -use starcoin_types::block::{Block, BlockBody, BlockHeader, BlockHeaderExtra, BlockInfo, BlockNumber, ParentsHash}; +use starcoin_types::block::{ + Block, BlockBody, BlockHeader, BlockHeaderExtra, BlockInfo, BlockNumber, ParentsHash, +}; use starcoin_types::contract_event::{ContractEvent, ContractEventInfo}; use starcoin_types::event::EventKey; use starcoin_types::genesis_config; diff --git a/storage/src/flexi_dag/mod.rs b/storage/src/flexi_dag/mod.rs index 6c86f98551..789dc31fe1 100644 --- a/storage/src/flexi_dag/mod.rs +++ b/storage/src/flexi_dag/mod.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::{ + collections::{BTreeSet, BinaryHeap}, + sync::Arc, +}; use crate::{ accumulator::{AccumulatorStorage, DagBlockAccumulatorStorage}, @@ -11,12 +14,44 @@ use bcs_ext::BCSCodec; use serde::{Deserialize, Serialize}; use starcoin_accumulator::accumulator_info::AccumulatorInfo; use starcoin_crypto::HashValue; +use starcoin_types::dag_block::KTotalDifficulty; +use starcoin_uint::U256; #[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] pub struct SyncFlexiDagSnapshot { pub child_hashes: Vec, // child nodes(tips), to get the relationship, use dag's relationship store pub accumulator_info: AccumulatorInfo, pub head_block_id: HashValue, // to initialize the BlockInfo + pub k_total_difficulties: BTreeSet, // the k-th smallest total difficulty +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] +pub struct SyncFlexiDagSnapshotHasher { + pub child_hashes: Vec, // child nodes(tips), to get the relationship, use dag's relationship store + pub head_block_id: HashValue, // to initialize the BlockInfo + pub k_total_difficulties: BTreeSet, // the k-th smallest total difficulty +} + +impl SyncFlexiDagSnapshotHasher { + pub fn to_snapshot(self, accumulator_info: AccumulatorInfo) -> SyncFlexiDagSnapshot { + SyncFlexiDagSnapshot { + child_hashes: self.child_hashes, + accumulator_info, + head_block_id: self.head_block_id, + k_total_difficulties: self.k_total_difficulties, + } + } +} + +impl From for SyncFlexiDagSnapshotHasher { + fn from(mut value: SyncFlexiDagSnapshot) -> Self { + value.child_hashes.sort(); + SyncFlexiDagSnapshotHasher { + child_hashes: value.child_hashes, + head_block_id: value.head_block_id, + k_total_difficulties: value.k_total_difficulties + } + } } impl ValueCodec for SyncFlexiDagSnapshot { diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 598b72d764..a3546ee7ec 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -21,7 +21,9 @@ use network_p2p_types::peer_id::PeerId; use num_enum::{IntoPrimitive, TryFromPrimitive}; use once_cell::sync::Lazy; use starcoin_accumulator::{ - accumulator_info::{AccumulatorInfo, self}, node::AccumulatorStoreType, AccumulatorTreeStore, + accumulator_info::{self, AccumulatorInfo}, + node::AccumulatorStoreType, + AccumulatorTreeStore, }; use starcoin_config::ChainNetworkID; use starcoin_crypto::HashValue; @@ -31,8 +33,9 @@ use starcoin_types::{ block::{Block, BlockBody, BlockHeader, BlockInfo, BlockNumber}, blockhash::ORIGIN, contract_event::ContractEvent, + dag_block::KTotalDifficulty, header, - startup_info::{ChainInfo, ChainStatus, SnapshotRange, StartupInfo, self}, + startup_info::{self, ChainInfo, ChainStatus, SnapshotRange, StartupInfo}, transaction::{RichTransactionInfo, Transaction}, }; use starcoin_vm_types::{ @@ -41,7 +44,7 @@ use starcoin_vm_types::{ state_store::table::{TableHandle, TableInfo}, }; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, fmt::{Debug, Display, Formatter}, sync::Arc, }; @@ -330,6 +333,8 @@ pub trait SyncFlexiDagStore { key: HashValue, new_tips: Vec, accumulator_info: AccumulatorInfo, + head_block_id: HashValue, + k_total_difficulties: BTreeSet, ) -> Result<()>; fn get_dag_accumulator_info(&self) -> Result>; fn get_tips_by_block_id(&self, block_id: HashValue) -> Result>; @@ -682,19 +687,29 @@ impl SyncFlexiDagStore for Storage { let dag_main = dag_main.unwrap(); - Ok(Some(self.flexi_dag_storage.get_snapshot_storage().get(dag_main)?.expect("snapshot should not be none").accumulator_info)) + Ok(Some( + self.flexi_dag_storage + .get_snapshot_storage() + .get(dag_main)? + .expect("snapshot should not be none") + .accumulator_info, + )) } - // update dag accumulator + // update dag accumulator fn append_dag_accumulator_leaf( &self, key: HashValue, new_tips: Vec, accumulator_info: AccumulatorInfo, + head_block_id: HashValue, + k_total_difficulties: BTreeSet, ) -> Result<()> { let snapshot = SyncFlexiDagSnapshot { child_hashes: new_tips.clone(), accumulator_info: accumulator_info.clone(), + head_block_id, + k_total_difficulties, }; // for sync if let Some(t) = self.flexi_dag_storage.get_hashes_by_hash(key)? { diff --git a/sync/api/src/lib.rs b/sync/api/src/lib.rs index 5bb31fea85..60f4c869b2 100644 --- a/sync/api/src/lib.rs +++ b/sync/api/src/lib.rs @@ -31,10 +31,7 @@ pub struct PeerNewBlock { impl PeerNewBlock { pub fn new(peer_id: PeerId, new_block: Block) -> Self { - PeerNewBlock { - peer_id, - new_block, - } + PeerNewBlock { peer_id, new_block } } pub fn get_peer_id(&self) -> PeerId { diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index 52ea7ed771..7890f58fd9 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -3,6 +3,7 @@ #[cfg(test)] use super::CheckBlockConnectorHashValue; +use crate::block_connector::write_block_chain::ConnectOk; use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService}; use crate::sync::{CheckSyncEvent, SyncService}; use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent}; @@ -12,8 +13,8 @@ use anyhow::{format_err, Ok, Result}; use network_api::PeerProvider; use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; -use starcoin_consensus::BlockDAG; use starcoin_consensus::dag::blockdag::InitDagState; +use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; use starcoin_flexidag::FlexidagService; @@ -22,7 +23,7 @@ use starcoin_network::NetworkServiceRef; use starcoin_service_registry::{ ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, }; -use starcoin_storage::{BlockStore, Storage, flexi_dag}; +use starcoin_storage::{flexi_dag, BlockStore, Storage}; use starcoin_sync_api::PeerNewBlock; use starcoin_txpool::TxPoolService; use starcoin_txpool_api::TxPoolSyncService; @@ -268,7 +269,7 @@ where debug!("try connect mined block: {}", id); match self.chain_service.try_connect(block) { - std::result::Result::Ok(_) => { + std::result::Result::Ok(ConnectOk::DagConnected) => { match self.chain_service.dump_tips(block_header) { std::result::Result::Ok(_) => (), Err(e) => error!("failed to dump tips to dag accumulator: {}", e), @@ -302,15 +303,13 @@ where return; } let peer_id = msg.get_peer_id(); - if let Err(e) = self - .chain_service - .try_connect(msg.get_block().clone()) - { + if let Err(e) = self.chain_service.try_connect(msg.get_block().clone()) { match e.downcast::() { std::result::Result::Ok(connect_error) => { match connect_error { ConnectBlockError::FutureBlock(block) => { - self.chain_service.update_tips(msg.get_block().header().clone())?; + self.chain_service + .update_tips(msg.get_block().header().clone())?; //TODO cache future block if let std::result::Result::Ok(sync_service) = ctx.service_ref::() diff --git a/sync/src/block_connector/test_write_block_chain.rs b/sync/src/block_connector/test_write_block_chain.rs index 39401352c3..73b78a3dfa 100644 --- a/sync/src/block_connector/test_write_block_chain.rs +++ b/sync/src/block_connector/test_write_block_chain.rs @@ -164,9 +164,7 @@ fn gen_fork_block_chain( .unwrap(); parent_id = block.id(); - writeable_block_chain_service - .try_connect(block) - .unwrap(); + writeable_block_chain_service.try_connect(block).unwrap(); } } } diff --git a/sync/src/block_connector/test_write_dag_block_chain.rs b/sync/src/block_connector/test_write_dag_block_chain.rs index 21932ea913..20d3479214 100644 --- a/sync/src/block_connector/test_write_dag_block_chain.rs +++ b/sync/src/block_connector/test_write_dag_block_chain.rs @@ -36,9 +36,7 @@ pub fn gen_dag_blocks( time_service, ); last_block_hash = Some(block.id()); - let e = writeable_block_chain_service.try_connect( - block, - ); + let e = writeable_block_chain_service.try_connect(block); println!("try_connect result: {:?}", e); assert!(e.is_ok()); if (i + 1) % 3 == 0 { @@ -135,9 +133,7 @@ fn gen_fork_dag_block_chain( .unwrap(); parent_id = block.id(); - writeable_block_chain_service - .try_connect(block) - .unwrap(); + writeable_block_chain_service.try_connect(block).unwrap(); } return Some(parent_id); } diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index 697432b491..0095cde74c 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -16,17 +16,19 @@ use starcoin_consensus::dag::types::ghostdata::GhostdagData; use starcoin_consensus::{BlockDAG, FlexiDagStorage, FlexiDagStorageConfig}; use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; -use starcoin_flexidag::FlexidagService; use starcoin_flexidag::flexidag_service::{AddToDag, DumpTipsToAccumulator, UpdateDagTips}; +use starcoin_flexidag::FlexidagService; use starcoin_logger::prelude::*; use starcoin_service_registry::bus::{Bus, BusService}; use starcoin_service_registry::{ServiceContext, ServiceRef}; +use starcoin_storage::flexi_dag::KTotalDifficulty; use starcoin_storage::storage::CodecKVStore; use starcoin_storage::Store; use starcoin_time_service::{DagBlockTimeWindowService, TimeWindowResult}; use starcoin_txpool_api::TxPoolSyncService; use starcoin_types::block::BlockInfo; use starcoin_types::blockhash::BlockHashMap; +use starcoin_types::dag_block::KTotalDifficulty; use starcoin_types::header::DagHeader; use starcoin_types::{ block::{Block, BlockHeader, ExecutedBlock}, @@ -117,10 +119,11 @@ where .map(|metrics| metrics.chain_block_connect_time.start_timer()); let result = if block.header().parents_hash().is_some() { - assert!(transaction_parent.is_some(), "in dag branch, the transaction parent should not be none"); - self.connect_dag_inner( - block, - ) + assert!( + transaction_parent.is_some(), + "in dag branch, the transaction parent should not be none" + ); + self.connect_dag_inner(block) } else { self.connect_inner(block) }; @@ -245,10 +248,7 @@ where } #[cfg(test)] - pub fn apply_failed( - &mut self, - block: Block, - ) -> Result<()> { + pub fn apply_failed(&mut self, block: Block) -> Result<()> { use anyhow::bail; use starcoin_chain::verifier::FullVerifier; @@ -283,44 +283,26 @@ where let branch_total_difficulty = new_branch.get_total_difficulty()?; if branch_total_difficulty > main_total_difficulty { self.update_startup_info(new_branch.head_block().header())?; - - if self.dag.is_none() { - ctx.broadcast(NewHeadBlock(Arc::new(new_branch.head_block()), None)); - } else { - let dag_parents = self - .dag - .as_ref() - .expect("the dag should not be None") - .lock() - .expect("failed to lock the dag") - .get_parents(new_head_block)?; - ctx.broadcast(NewHeadBlock( - Arc::new(new_branch.head_block()), - Some(dag_parents), - )); - } - + ctx.broadcast(NewHeadBlock(Arc::new(new_branch.head_block()))); Ok(()) } else { bail!("no need to switch"); } } - pub fn select_head( - &mut self, - new_branch: BlockChain, - ) -> Result<()> { + pub fn select_head(&mut self, new_branch: BlockChain) -> Result<()> { let executed_block = new_branch.head_block(); let main_total_difficulty = self.main.get_total_difficulty()?; let branch_total_difficulty = new_branch.get_total_difficulty()?; let parent_is_main_head = self.is_main_head(&executed_block.header().parent_hash()); if branch_total_difficulty > main_total_difficulty { - let (enacted_count, enacted_blocks, retracted_count, retracted_blocks) = if !parent_is_main_head { + let (enacted_count, enacted_blocks, retracted_count, retracted_blocks) = + if !parent_is_main_head { self.find_ancestors_from_accumulator(&new_branch)? } else { (1, vec![executed_block.block.clone()], 0, vec![]) - }; + }; self.main = new_branch; self.do_new_head( @@ -438,7 +420,7 @@ where .storage .get_tips_by_block_id(executed_block.block.header().id()) .ok(); - self.broadcast_new_head(executed_block, dag_block_parents, next_tips); + self.broadcast_new_head(executed_block); Ok(()) } @@ -646,12 +628,7 @@ where Ok(blocks) } - fn broadcast_new_head( - &self, - block: ExecutedBlock, - dag_parents: Option>, - next_tips: Option>, - ) { + fn broadcast_new_head(&self, block: ExecutedBlock) { if let Some(metrics) = self.metrics.as_ref() { metrics .chain_select_head_total @@ -661,34 +638,25 @@ where if let Err(e) = self .bus - .broadcast(NewHeadBlock(Arc::new(block), dag_parents)) + .broadcast(NewHeadBlock(Arc::new(block))) { error!("Broadcast NewHeadBlock error: {:?}", e); } } - fn broadcast_new_branch( - &self, - block: ExecutedBlock, - ) { + fn broadcast_new_branch(&self, block: ExecutedBlock) { if let Some(metrics) = self.metrics.as_ref() { metrics .chain_select_head_total .with_label_values(&["new_branch"]) .inc() } - if let Err(e) = self - .bus - .broadcast(NewBranch(Arc::new(block))) - { + if let Err(e) = self.bus.broadcast(NewBranch(Arc::new(block))) { error!("Broadcast NewBranch error: {:?}", e); } } - fn switch_branch( - &mut self, - block: Block, - ) -> Result { + fn switch_branch(&mut self, block: Block) -> Result { let (block_info, fork) = self.find_or_fork( block.header(), dag_block_next_parent, @@ -727,7 +695,7 @@ where } (None, Some(mut branch)) => { // the block is not in the block, but the parent is - let result = branch.apply(block, None); + let result = branch.apply(block); let executed_block = result?; self.select_head(branch)?; Ok(ConnectOk::ExeConnectBranch(executed_block)) @@ -758,20 +726,14 @@ where self.switch_branch(block) } - fn apply_and_select_head( - &mut self, - block: Block, - ) -> Result { - let executed_block = self.main.apply(block, None)?; + fn apply_and_select_head(&mut self, block: Block) -> Result { + let executed_block = self.main.apply(block)?; let enacted_blocks = vec![executed_block.block().clone()]; self.do_new_head(executed_block.clone(), 1, enacted_blocks, 0, vec![])?; return Ok(ConnectOk::ExeConnectMain(executed_block)); } - fn add_to_dag( - &mut self, - header: &BlockHeader, - ) -> Result> { + fn add_to_dag(&mut self, header: &BlockHeader) -> Result> { let dag = self.dag.as_mut().expect("dag must be inited before using"); match dag .lock() @@ -787,13 +749,10 @@ where } } - fn connect_dag_inner( - &mut self, - block: Block, - ) -> Result { + fn connect_dag_inner(&mut self, block: Block) -> Result { let add_dag_result = async_std::task::block_on(self.flexidag_service.send(AddToDag { block_header: block.header().clone(), - }))??; + }))??; let selected_parent = self .storage .get_block_by_hash(add_dag_result.selected_parent)? @@ -802,7 +761,7 @@ where let mut transaction_parent = chain.status().head().id().clone(); for blue_hash in add_dag_result.mergeset_blues.mergeset_blues.iter() { if let Some(blue_block) = self.storage.get_block(blue_hash.to_owned())? { - match chain.apply(blue_block, Some(transaction_parent)) { + match chain.apply(blue_block) { Ok(executed_block) => transaction_parent = executed_block, Err(_) => warn!("failed to connect dag block: {:?}", e), } @@ -813,12 +772,22 @@ where } // select new head and update startup info(main but dag main) self.select_head_for_dag(chain)?; - Ok(ConnectOk::DagConnected) + Ok(ConnectOk::DagConnected(KTotalDifficulty { + head_block_id: self.main.status().head().id(), + total_difficulty: self.main.status().info().get_total_difficulty(), + })) } fn select_head_for_dag(&self, new_chain: BlockChain) -> Result<()> { - - + if new_chain.status().info.get_total_difficulty() + > self.main.status().info.get_total_difficulty() + { + let new_head_block = new_chain.head_block(); + self.update_startup_info(new_head_block.header())?; + self.main = new_chain; + self.broadcast_new_head(new_head_block); + } + Ok(()) } @@ -826,9 +795,13 @@ where if block_header.number() < self.storage.dag_fork_height(self.config.net().id().clone()) { Ok(()) } else { - self.flexidag_service.send( DumpTipsToAccumulator { + self.flexidag_service.send(DumpTipsToAccumulator { block_header, current_head_block_id: self.main.status().head().id().clone(), + k_total_difficulty: KTotalDifficulty { + head_block_id: self.main.status().info().id(), + total_difficulty: self.main.status().info().get_total_difficulty(), + }, }) } } @@ -838,6 +811,10 @@ where self.flexidag_service.send(UpdateDagTips { block_header, current_head_block_id: self.main.status().head().id().clone(), + k_total_difficulty: KTotalDifficulty { + head_block_id: self.main.status().head().id().clone(), + total_difficulty: self.main.status().info().get_total_difficulty(), + }, }) } else { Ok(()) // nothing to do @@ -860,7 +837,7 @@ where // let mut next_tips = Some(vec![]); let executed_block = self.connect_to_main(block)?.clone(); if let Some(block) = executed_block.block() { - self.broadcast_new_head(block.clone(), None, None); + self.broadcast_new_head(block.clone()); } return Ok(executed_block); } diff --git a/sync/src/sync.rs b/sync/src/sync.rs index 6d8622b1f0..b6bd70bbda 100644 --- a/sync/src/sync.rs +++ b/sync/src/sync.rs @@ -5,7 +5,7 @@ use crate::block_connector::BlockConnectorService; use crate::sync_metrics::SyncMetrics; use crate::tasks::{full_sync_task, sync_dag_full_task, AncestorEvent, SyncFetcher}; use crate::verified_rpc_client::{RpcVerifyError, VerifiedRpcClient}; -use anyhow::{format_err, Result, Ok}; +use anyhow::{format_err, Ok, Result}; use futures::executor::block_on; use futures::FutureExt; use futures_timer::Delay; @@ -17,8 +17,8 @@ use starcoin_chain_api::{ChainReader, ChainWriter}; use starcoin_config::NodeConfig; use starcoin_consensus::BlockDAG; use starcoin_executor::VMMetrics; -use starcoin_flexidag::{FlexidagService, flexidag_service}; use starcoin_flexidag::flexidag_service::GetDagAccumulatorInfo; +use starcoin_flexidag::{flexidag_service, FlexidagService}; use starcoin_logger::prelude::*; use starcoin_network::NetworkServiceRef; use starcoin_network::PeerEvent; @@ -100,23 +100,19 @@ impl SyncService { // let genesis = storage // .get_genesis()? // .ok_or_else(|| format_err!("Can not find genesis hash in storage."))?; - let dag_accumulator_info = - match storage.get_dag_accumulator_info()? { - Some(info) => Some(info), - None => { - warn!( - "Can not find dag accumulator info by head block id: {}, use genesis info.", - head_block_info.block_id(), - ); - None - } - }; + let dag_accumulator_info = match storage.get_dag_accumulator_info()? { + Some(info) => Some(info), + None => { + warn!( + "Can not find dag accumulator info by head block id: {}, use genesis info.", + head_block_info.block_id(), + ); + None + } + }; Ok(Self { sync_status: SyncStatus::new( - ChainStatus::new( - head_block.header.clone(), - head_block_info, - ), + ChainStatus::new(head_block.header.clone(), head_block_info), dag_accumulator_info, ), stage: SyncStage::NotStart, @@ -266,41 +262,43 @@ impl SyncService { network.clone(), )); - let op_local_dag_accumulator_info = self.flexidag_service.send(GetDagAccumulatorInfo).await??; + let op_local_dag_accumulator_info = + self.flexidag_service.send(GetDagAccumulatorInfo).await??; if let Some(local_dag_accumulator_info) = op_local_dag_accumulator_info { - let dag_sync_futs = rpc_client.get_dag_targets()?.into_iter().fold(Ok(vec![]), |mut futs, target_accumulator_infos| { - let (fut, task_handle, task_event_handle) = sync_dag_full_task( - local_dag_accumulator_info, - target_accumulator_info, - rpc_client.clone(), - dag_accumulator_store, - dag_accumulator_snapshot, - storage.clone(), - config.net().time_service(), - vm_metrics.clone(), - connector_service.clone(), - network.clone(), - skip_pow_verify, - dag.clone(), - block_chain_service.clone(), - config.net().id().clone(), - )?; - self_ref.notify(SyncBeginEvent { - target, - task_handle, - task_event_handle, - peer_selector, - })?; - if let Some(sync_task_total) = sync_task_total.as_ref() { - sync_task_total.with_label_values(&["start"]).inc(); - } - futs.and_then(|v| { - v.push(fut) - }) - })?.into_iter().fold(Ok(vec![]), |chain, fut| { - Ok(vec![fut.await?]) - })?; + let dag_sync_futs = rpc_client + .get_dag_targets()? + .into_iter() + .fold(Ok(vec![]), |mut futs, target_accumulator_infos| { + let (fut, task_handle, task_event_handle) = sync_dag_full_task( + local_dag_accumulator_info, + target_accumulator_info, + rpc_client.clone(), + dag_accumulator_store, + dag_accumulator_snapshot, + storage.clone(), + config.net().time_service(), + vm_metrics.clone(), + connector_service.clone(), + network.clone(), + skip_pow_verify, + dag.clone(), + block_chain_service.clone(), + config.net().id().clone(), + )?; + self_ref.notify(SyncBeginEvent { + target, + task_handle, + task_event_handle, + peer_selector, + })?; + if let Some(sync_task_total) = sync_task_total.as_ref() { + sync_task_total.with_label_values(&["start"]).inc(); + } + futs.and_then(|v| v.push(fut)) + })? + .into_iter() + .fold(Ok(vec![]), |chain, fut| Ok(vec![fut.await?]))?; assert!(dag_sync_futs.len() <= 1); if dag_sync_futs.len() == 1 { Ok(Some(dag_sync_futs[0])) diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index a1f95580a8..81469760d6 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -8,12 +8,14 @@ use futures::future::BoxFuture; use futures::FutureExt; use network_api::PeerId; use network_api::PeerProvider; +use starcoin_accumulator::accumulator_info::AccumulatorInfo; use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_chain::{verifier::BasicVerifier, BlockChain}; use starcoin_chain_api::{ChainReader, ChainWriter, ConnectBlockError, ExecutedBlock}; -use starcoin_config::{G_CRATE_VERSION, Connect}; +use starcoin_config::{Connect, G_CRATE_VERSION}; use starcoin_consensus::BlockDAG; use starcoin_crypto::HashValue; +use starcoin_flexidag::flexidag_service::{AddToDag, GetDagTips, ForkDagAccumulator}; use starcoin_flexidag::FlexidagService; use starcoin_logger::prelude::*; use starcoin_service_registry::ServiceRef; @@ -32,10 +34,13 @@ pub struct SyncBlockData { pub(crate) block: Block, pub(crate) info: Option, pub(crate) peer_id: Option, - pub(crate) accumulator_root: Option, // the block belongs to this accumulator leaf - pub(crate) count_in_leaf: u64, // the number of the block in the accumulator leaf + pub(crate) accumulator_root: Option, // the block belongs to this dag accumulator leaf + pub(crate) count_in_leaf: u64, // the count of the block in the dag accumulator leaf + pub(crate) dag_accumulator_index: Option, // the index of the accumulator leaf which the block belogs to } + + impl SyncBlockData { pub fn new( block: Block, @@ -43,8 +48,7 @@ impl SyncBlockData { peer_id: Option, accumulator_root: Option, count_in_leaf: u64, - dag_block_headers: Option>, - dag_transaction_header: Option, + dag_acccumulator_index: Option, ) -> Self { Self { block, @@ -52,30 +56,15 @@ impl SyncBlockData { peer_id, accumulator_root, count_in_leaf, + dag_accumulator_index, } } } #[allow(clippy::from_over_into)] -impl - Into<( - Block, - Option, - Option, - )> for SyncBlockData -{ - fn into( - self, - ) -> ( - Block, - Option, - Option, - ) { - ( - self.block, - self.info, - self.peer_id, - ) +impl Into<(Block, Option, Option)> for SyncBlockData { + fn into(self) -> (Block, Option, Option) { + (self.block, self.info, self.peer_id) } } @@ -163,7 +152,7 @@ impl TaskState for BlockSyncTask { .fold(result_map, |mut result_map, (block, peer_id, _, _)| { result_map.insert( block.id(), - SyncBlockData::new(block, None, peer_id, None, 1, None, None), + SyncBlockData::new(block, None, peer_id, None, 1, None), ); result_map }) @@ -185,7 +174,7 @@ impl TaskState for BlockSyncTask { .await? .into_iter() .map(|(block, peer_id, _, _)| { - SyncBlockData::new(block, None, peer_id, None, 1, None, None) + SyncBlockData::new(block, None, peer_id, None, 1, None) }) .collect()) } @@ -221,7 +210,7 @@ impl TaskState for BlockSyncTask { pub struct BlockCollector { //node's current block info current_block_info: BlockInfo, - target: Option, + target: Option, // single chain use only // the block chain init by ancestor chain: BlockChain, event_handle: H, @@ -231,6 +220,7 @@ pub struct BlockCollector { dag_block_pool: Vec, target_accumulator_root: HashValue, flexidag_service: ServiceRef, + new_dag_accumulator_info: Option, } impl BlockCollector @@ -264,9 +254,14 @@ where dag_block_pool: Vec::new(), target_accumulator_root, flexidag_service, + new_dag_accumulator_info: None, } } + pub fn check_if_became_dag(&self) -> Result { + Ok(async_std::task::block_on(self.flexidag_service.send(GetDagTips))??.is_some()) + } + #[cfg(test)] pub fn apply_block_for_test( &mut self, @@ -283,7 +278,6 @@ where block_info: BlockInfo, action: BlockConnectAction, state: CollectorState, - dag_parents: Option>, ) -> Result { let total_difficulty = block_info.get_total_difficulty(); @@ -309,7 +303,6 @@ where // second, construct the block connect event. let block_connect_event = BlockConnectedEvent { block, - dag_parents, feedback: sender, action, }; @@ -342,11 +335,7 @@ where Ok(state) } - fn apply_block( - &mut self, - block: Block, - peer_id: Option, - ) -> Result<()> { + fn apply_block(&mut self, block: Block, peer_id: Option) -> Result<()> { if let Some((_failed_block, pre_peer_id, err, version)) = self .chain .get_storage() @@ -414,7 +403,8 @@ where fn broadcast_dag_chain_block( &mut self, - broadcast_blocks: Vec<(Block, BlockInfo, Option>, BlockConnectAction)>, + broadcast_blocks: Vec<(Block, BlockInfo, BlockConnectAction)>, + start_index: u64, ) -> Result { let state = if self.last_accumulator_root == self.target_accumulator_root { CollectorState::Enough @@ -422,29 +412,11 @@ where CollectorState::Need }; - let last_index = broadcast_blocks.len() - 1; - broadcast_blocks.into_iter().enumerate().for_each( - |(index, (block, block_info, dag_parents, action))| { - if last_index == index && state == CollectorState::Enough { - let _ = self.notify_connected_block( - block, - block_info, - action, - CollectorState::Enough, - dag_parents, - ); - } else { - let _ = self.notify_connected_block( - block, - block_info, - action, - CollectorState::Need, - dag_parents, - ); - } - }, - ); - + self.new_dag_accumulator_info = Some(async_std::task::block_on(self.flexidag_service.send(ForkDagAccumulator { + new_blocks: broadcast_blocks.into_iter().map(|(block, _, _)| block.id()).collect(), + dag_accumulator_index: start_index, + block_header_id: self.chain.head_block().id(), + }))??); return Ok(state); } @@ -481,20 +453,24 @@ where Ok(CollectorState::Need) }; - self.notify_connected_block(block, block_info, action, state?, None) + let result = self.notify_connected_block(block, block_info, action, state?); + match result { + Ok(state) => {} + Err(e) => { + error!("notify connected block error: {:?}", e); + Err(e) + } + } } - fn collect_dag_item( - &mut self, - item: SyncBlockData, - ) -> Result<()> { + fn collect_dag_item(&mut self, item: SyncBlockData) -> Result<()> { let (block, block_info, peer_id) = item.into(); let block_id = block.id(); let timestamp = block.header().timestamp(); let add_dag_result = async_std::task::block_on(self.flexidag_service.send(AddToDag { block_header: block.header().clone(), - }))??; + }))??; let selected_parent = self .storage .get_block_by_hash(add_dag_result.selected_parent)? @@ -528,85 +504,24 @@ where return match block_info { Some(block_info) => { - //If block_info exists, it means that this block was already executed and try connect in the previous sync, but the sync task was interrupted. + //If block_info exists, it means that this block was already executed and + // try connect in the previous sync, but the sync task was interrupted. //So, we just need to update chain and continue self.chain.connect(ExecutedBlock { block: block.clone(), block_info: block_info.clone(), })?; let block_info = self.chain.status().info; - Ok(( - block, - block_info, - BlockConnectAction::ConnectExecutedBlock, - )) + Ok((block, block_info, BlockConnectAction::ConnectExecutedBlock)) } None => { self.apply_block(block.clone(), peer_id)?; self.chain.time_service().adjust(timestamp); let block_info = self.chain.status().info; - Ok(( - block, - block_info, - BlockConnectAction::ConnectNewBlock, - )) + Ok((block, block_info, BlockConnectAction::ConnectNewBlock)) } }; } - - // fn process_received_block(&self, item: SyncBlockData, next_tips: &mut Option>) -> Result { - - // let (block, block_info, parent_hash, action) = self.collect_item(item, next_tips)?; - // ///////// - // // let (block, block_info, peer_id) = item.into(); - // // let timestamp = block.header().timestamp(); - // // let (block_info, action) = match block_info { - // // Some(block_info) => { - // // //If block_info exists, it means that this block was already executed and try connect in the previous sync, but the sync task was interrupted. - // // //So, we just need to update chain and continue - // // self.chain.connect(ExecutedBlock { - // // block: block.clone(), - // // block_info: block_info.clone(), - // // })?; - // // (block_info, BlockConnectAction::ConnectExecutedBlock) - // // } - // // None => { - // // self.apply_block(block.clone(), peer_id)?; - // // self.chain.time_service().adjust(timestamp); - // // ( - // // self.chain.status().info, - // // BlockConnectAction::ConnectNewBlock, - // // ) - // // } - // // }; - - // //verify target - // let state: Result = - // if block_info.block_accumulator_info.num_leaves - // == self.target.block_info.block_accumulator_info.num_leaves - // { - // if block_info != self.target.block_info { - // Err(TaskError::BreakError( - // RpcVerifyError::new_with_peers( - // self.target.peers.clone(), - // format!( - // "Verify target error, expect target: {:?}, collect target block_info:{:?}", - // self.target.block_info, - // block_info - // ), - // ) - // .into(), - // ) - // .into()) - // } else { - // Ok(CollectorState::Enough) - // } - // } else { - // Ok(CollectorState::Need) - // }; - - // self.notify_connected_block(block, block_info, action, state?, parent_hash) - // } } impl TaskResultCollector for BlockCollector @@ -627,14 +542,6 @@ where return Ok(CollectorState::Need); } else { process_block_pool = std::mem::take(&mut self.dag_block_pool); - - self.chain.status().tips_hash = Some( - process_block_pool - .iter() - .clone() - .map(|item| item.block.header().id()) - .collect(), - ); } } else { // it is a single chain @@ -664,11 +571,16 @@ where ); let (block, block_info, _, action) = block_to_broadcast.pop().unwrap(); // self.check_if_sync_complete(block_info) - self.broadcast_single_chain_block(block, block_info, action) - } - None => { - self.broadcast_dag_chain_block(block_to_broadcast) + match self.broadcast_single_chain_block(block, block_info, action) { + Ok(_) => { + if self.check_if_became_dag()? { + Ok(CollectorState::Enough) + } + } + Err(e) => Err(e), + } } + None => self.broadcast_dag_chain_block(block_to_broadcast, item.dag_accumulator_index), } } diff --git a/sync/src/tasks/mod.rs b/sync/src/tasks/mod.rs index cf92b552fb..36fea6074b 100644 --- a/sync/src/tasks/mod.rs +++ b/sync/src/tasks/mod.rs @@ -40,9 +40,12 @@ use stream_task::{ pub trait SyncFetcher: PeerOperator + BlockIdFetcher + BlockFetcher + BlockInfoFetcher { fn get_dag_targets(&self) -> Result> { - Ok(self.peer_selector().peer_infos().into_iter().map(|peer_info| { - peer_info.chain_info().dag_accumulator_info().clone() - }).collect()); + Ok(self + .peer_selector() + .peer_infos() + .into_iter() + .map(|peer_info| peer_info.chain_info().dag_accumulator_info().clone()) + .collect()); } fn get_best_target( @@ -304,14 +307,7 @@ pub trait BlockFetcher: Send + Sync { fn fetch_blocks( &self, block_ids: Vec, - ) -> BoxFuture< - Result< - Vec<( - Block, - Option, - )>, - >, - >; + ) -> BoxFuture)>>>; } impl BlockFetcher for Arc @@ -321,15 +317,7 @@ where fn fetch_blocks( &self, block_ids: Vec, - ) -> BoxFuture< - '_, - Result< - Vec<( - Block, - Option, - )>, - >, - > { + ) -> BoxFuture<'_, Result)>>> { BlockFetcher::fetch_blocks(self.as_ref(), block_ids) } } @@ -338,15 +326,7 @@ impl BlockFetcher for VerifiedRpcClient { fn fetch_blocks( &self, block_ids: Vec, - ) -> BoxFuture< - '_, - Result< - Vec<( - Block, - Option, - )>, - >, - > { + ) -> BoxFuture<'_, Result)>>> { self.get_blocks(block_ids.clone()) .and_then(|blocks| async move { let results = block_ids @@ -422,7 +402,7 @@ impl BlockLocalStore for Arc { let block_info = self.get_block_info(id)?; Ok(Some(SyncBlockData::new( - block, block_info, None, None, 1, None, None, + block, block_info, None, None, 1, None, ))) } None => Ok(None), @@ -440,7 +420,6 @@ pub enum BlockConnectAction { #[derive(Clone, Debug)] pub struct BlockConnectedEvent { pub block: Block, - pub dag_parents: Option>, pub feedback: Option>, pub action: BlockConnectAction, } diff --git a/sync/src/tasks/sync_dag_block_task.rs b/sync/src/tasks/sync_dag_block_task.rs index 4e31565e0f..2428898926 100644 --- a/sync/src/tasks/sync_dag_block_task.rs +++ b/sync/src/tasks/sync_dag_block_task.rs @@ -66,8 +66,6 @@ impl SyncDagBlockTask { block_id: block_id.clone(), block: None, peer_id: None, - dag_parents: vec![], - dag_transaction_header: None, }); }); @@ -76,15 +74,7 @@ impl SyncDagBlockTask { .fetch_blocks(absent_block) .await? .iter() - .map(|(block, peer_info)| { - ( - block.header().id(), - ( - block.clone(), - peer_info.clone(), - ), - ) - }) + .map(|(block, peer_info)| (block.header().id(), (block.clone(), peer_info.clone()))) .collect::>(); // should return the block in order @@ -116,6 +106,7 @@ impl SyncDagBlockTask { peer_id: item.peer_id, accumulator_root: Some(snapshot.accumulator_info.get_accumulator_root().clone()), count_in_leaf: snapshot.child_hashes.len() as u64, + dag_accumulator_index: Some(index), }) .collect()) } diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index e0382bad68..88895a69c4 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -766,7 +766,7 @@ impl MockLocalBlockStore { ); self.store.lock().unwrap().insert( block.id(), - SyncBlockData::new(block.clone(), Some(block_info), None, None, 1, None, None), + SyncBlockData::new(block.clone(), Some(block_info), None, None, 1, None), ); } } diff --git a/sync/src/verified_rpc_client.rs b/sync/src/verified_rpc_client.rs index 7272f12e7b..265fdba26c 100644 --- a/sync/src/verified_rpc_client.rs +++ b/sync/src/verified_rpc_client.rs @@ -381,14 +381,7 @@ impl VerifiedRpcClient { pub async fn get_blocks( &self, ids: Vec, - ) -> Result< - Vec< - Option<( - Block, - Option, - )>, - >, - > { + ) -> Result)>>> { let peer_id = self.select_a_peer()?; let start_time = Instant::now(); let blocks = self.client.get_blocks(peer_id.clone(), ids.clone()).await?; diff --git a/test-helper/src/network.rs b/test-helper/src/network.rs index 42320fb360..c295f01dd8 100644 --- a/test-helper/src/network.rs +++ b/test-helper/src/network.rs @@ -196,7 +196,7 @@ impl ServiceFactory for MockNetworkServiceFactory { let dag_tips = storage.get_tips_by_block_id(head_block_hash)?; let chain_status = ChainStatus::new(head_block_header.clone(), head_block_info, Some(dag_tips)); - let dag_accumulator_info = storage.get_dag_accumulator_info(head_block_hash)?; + let dag_accumulator_info = storage.get_dag_accumulator_info()?; let chain_state_info = ChainInfo::new( config.net().chain_id(), genesis_hash, diff --git a/types/src/block.rs b/types/src/block.rs index 53301e93c9..a8cbb0bf33 100644 --- a/types/src/block.rs +++ b/types/src/block.rs @@ -51,8 +51,8 @@ impl std::fmt::Display for BlockHeaderExtra { impl<'de> Deserialize<'de> for BlockHeaderExtra { fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, + where + D: Deserializer<'de>, { if deserializer.is_human_readable() { let s = ::deserialize(deserializer)?; @@ -79,8 +79,8 @@ impl<'de> Deserialize<'de> for BlockHeaderExtra { impl Serialize for BlockHeaderExtra { fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, + where + S: Serializer, { if serializer.is_human_readable() { format!("0x{}", hex::encode(self.0)).serialize(serializer) @@ -91,7 +91,7 @@ impl Serialize for BlockHeaderExtra { } #[derive( -Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, JsonSchema, + Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, JsonSchema, )] pub struct BlockIdAndNumber { pub id: HashValue, @@ -159,7 +159,6 @@ pub struct BlockHeader { /// Parents hash. #[serde(skip_serializing_if = "Option::is_none")] parents_hash: ParentsHash, - } impl BlockHeader { @@ -371,8 +370,8 @@ impl BlockHeader { impl<'de> Deserialize<'de> for BlockHeader { fn deserialize(deserializer: D) -> Result>::Error> - where - D: Deserializer<'de>, + where + D: Deserializer<'de>, { #[derive(Deserialize)] #[serde(rename = "BlockHeader")] @@ -391,7 +390,7 @@ impl<'de> Deserialize<'de> for BlockHeader { chain_id: ChainId, nonce: u32, extra: BlockHeaderExtra, - parents_hash:ParentsHash, + parents_hash: ParentsHash, } let header_data = BlockHeaderData::deserialize(deserializer)?; @@ -410,7 +409,7 @@ impl<'de> Deserialize<'de> for BlockHeader { header_data.chain_id, header_data.nonce, header_data.extra, - header_data.parents_hash + header_data.parents_hash, ); Ok(block_header) } @@ -507,7 +506,7 @@ pub struct RawBlockHeader { /// The chain id pub chain_id: ChainId, /// parents hash - pub parents_hash:ParentsHash, + pub parents_hash: ParentsHash, } #[derive(Default)] @@ -610,7 +609,7 @@ impl BlockHeaderBuilder { } #[derive( -Default, Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, CryptoHasher, CryptoHash, + Default, Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, CryptoHasher, CryptoHash, )] pub struct BlockBody { /// The transactions in this block. @@ -680,8 +679,8 @@ pub struct Block { impl Block { pub fn new(header: BlockHeader, body: B) -> Self - where - B: Into, + where + B: Into, { Block { header, @@ -797,7 +796,7 @@ impl Sample for Block { /// `BlockInfo` is the object we store in the storage. It consists of the /// block as well as the execution result of this block. #[derive( -Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, CryptoHasher, CryptoHash, JsonSchema, + Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize, CryptoHasher, CryptoHash, JsonSchema, )] pub struct BlockInfo { /// Block id @@ -1014,7 +1013,7 @@ impl BlockTemplate { body_hash: self.body_hash, difficulty: self.difficulty, chain_id: self.chain_id, - parents_hash: self.parents_hash.clone() + parents_hash: self.parents_hash.clone(), } } @@ -1061,7 +1060,7 @@ impl BlockTemplate { self.chain_id, nonce, extra, - self.parents_hash + self.parents_hash, ) } } @@ -1074,10 +1073,7 @@ pub struct ExecutedBlock { impl ExecutedBlock { pub fn new(block: Block, block_info: BlockInfo) -> Self { - ExecutedBlock { - block, - block_info, - } + ExecutedBlock { block, block_info } } pub fn total_difficulty(&self) -> U256 { diff --git a/types/src/dag_block.rs b/types/src/dag_block.rs index bc089a92e5..672b728850 100644 --- a/types/src/dag_block.rs +++ b/types/src/dag_block.rs @@ -945,3 +945,20 @@ impl EpochUncleSummary { } } } +#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] +pub struct KTotalDifficulty { + pub head_block_id: HashValue, + pub total_difficulty: U256, +} + +impl Ord for KTotalDifficulty { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.total_difficulty.cmp(&other.total_difficulty) + } +} + +impl PartialOrd for KTotalDifficulty { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} diff --git a/types/src/header.rs b/types/src/header.rs index 644f1bb064..a93ddcde36 100644 --- a/types/src/header.rs +++ b/types/src/header.rs @@ -21,7 +21,9 @@ pub struct DagHeader { impl DagHeader { pub fn new(block_header: BlockHeader) -> Self { Self { - parents_hash: block_header.parents_hash().expect("dag block must have parents hash"), + parents_hash: block_header + .parents_hash() + .expect("dag block must have parents hash"), block_header, } } diff --git a/types/src/startup_info.rs b/types/src/startup_info.rs index fc731a580f..4e08226411 100644 --- a/types/src/startup_info.rs +++ b/types/src/startup_info.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::block::{BlockHeader, BlockInfo, BlockNumber}; +use crate::dag_block::KTotalDifficulty; use anyhow::Result; use bcs_ext::{BCSCodec, Sample}; use schemars::JsonSchema; @@ -11,6 +12,7 @@ use starcoin_accumulator::MerkleAccumulator; use starcoin_crypto::HashValue; use starcoin_uint::U256; use starcoin_vm_types::genesis_config::ChainId; +use std::collections::BTreeSet; use std::convert::{TryFrom, TryInto}; use std::fmt; use std::fmt::Formatter; @@ -23,6 +25,7 @@ pub struct ChainInfo { genesis_hash: HashValue, status: ChainStatus, flexi_dag_accumulator_info: Option, + k_total_difficulties: Option>, } impl ChainInfo { @@ -31,12 +34,14 @@ impl ChainInfo { genesis_hash: HashValue, status: ChainStatus, flexi_dag_accumulator_info: Option, + k_total_difficulties: Option>, ) -> Self { Self { chain_id, genesis_hash, status, flexi_dag_accumulator_info, + k_total_difficulties, } } @@ -75,6 +80,10 @@ impl ChainInfo { self.status.info.get_total_difficulty() } + pub fn k_total_difficulties(&self) -> &Option> { + &self.k_total_difficulties + } + pub fn into_inner(self) -> (ChainId, HashValue, ChainStatus) { (self.chain_id, self.genesis_hash, self.status) } @@ -90,6 +99,7 @@ impl ChainInfo { rand::random::(), rand::random::(), )), + k_total_difficulties: Some(BTreeSet::new()), } } } @@ -101,6 +111,7 @@ impl std::default::Default for ChainInfo { genesis_hash: HashValue::default(), status: ChainStatus::sample(), flexi_dag_accumulator_info: Some(AccumulatorInfo::default()), + k_total_difficulties: Some(BTreeSet::new()), } } } @@ -126,10 +137,7 @@ pub struct ChainStatus { impl ChainStatus { pub fn new(head: BlockHeader, info: BlockInfo) -> Self { - Self { - head, - info, - } + Self { head, info } } pub fn random() -> Self { @@ -219,7 +227,7 @@ pub struct StartupInfo { pub main: HashValue, /// dag accumulator info hash - pub dag_main: Option + pub dag_main: Option, } impl fmt::Display for StartupInfo { @@ -233,17 +241,14 @@ impl fmt::Display for StartupInfo { impl StartupInfo { pub fn new(main: HashValue) -> Self { - Self { + Self { main, dag_main: None, - } + } } pub fn new_with_dag(main: HashValue, dag_main: Option) -> Self { - Self { - main, - dag_main, - } + Self { main, dag_main } } pub fn update_main(&mut self, new_head: HashValue) { @@ -261,8 +266,6 @@ impl StartupInfo { pub fn get_dag_main(&self) -> Option { self.dag_main } - - } impl TryFrom> for StartupInfo { diff --git a/types/src/system_events.rs b/types/src/system_events.rs index 6221778ab4..0fdc0c8899 100644 --- a/types/src/system_events.rs +++ b/types/src/system_events.rs @@ -10,7 +10,7 @@ use starcoin_crypto::HashValue; use starcoin_vm_types::genesis_config::ConsensusStrategy; use std::sync::Arc; #[derive(Clone, Debug)] -pub struct NewHeadBlock(pub Arc, pub Option>); +pub struct NewHeadBlock(pub Arc); /// may be uncle block #[derive(Clone, Debug)]