Skip to content

Commit

Permalink
add finish sync and select the dag accumulator (#3989)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang authored Nov 10, 2023
1 parent 5a44f52 commit 9287b51
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 25 deletions.
9 changes: 5 additions & 4 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,15 @@ impl BlockChain {

impl ChainReader for BlockChain {
fn info(&self) -> ChainInfo {
let (dag_accumulator, k_total_difficulties) = self.storage.get_lastest_snapshot()?.map(|snapshot| {
(Some(snapshot.accumulator_info), Some(snapshot.k_total_difficulties))
}).unwrap_or((None, None));
ChainInfo::new(
self.status.head.header().chain_id(),
self.genesis_hash,
self.status.status.clone(),
self.storage.get_dag_accumulator_info().expect(&format!(
"the dag accumulator info cannot be found by id: {}",
self.status.head.header().id()
)),
dag_accumulator,
k_total_difficulties,
)
}

Expand Down
1 change: 1 addition & 0 deletions cmd/peer-watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub fn build_lighting_network(
chain_info.genesis_hash(),
chain_info.status().clone(),
chain_info.dag_accumulator_info().clone(),
chain_info.k_total_difficulties().clone(),
);
build_network_worker(
network_config,
Expand Down
39 changes: 34 additions & 5 deletions flexidag/src/flexidag_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use anyhow::{anyhow, bail, Error, Ok, Result};
use starcoin_accumulator::{accumulator_info::AccumulatorInfo, Accumulator, MerkleAccumulator};
use starcoin_accumulator::{accumulator_info::AccumulatorInfo, Accumulator, MerkleAccumulator, node::AccumulatorStoreType};
use starcoin_config::{NodeConfig, TimeService};
use starcoin_consensus::{dag::types::ghostdata::GhostdagData, BlockDAG};
use starcoin_crypto::HashValue;
Expand All @@ -14,7 +14,7 @@ use starcoin_service_registry::{
use starcoin_storage::{
flexi_dag::{KTotalDifficulty, SyncFlexiDagSnapshot, SyncFlexiDagSnapshotHasher},
storage::CodecKVStore,
BlockStore, Storage, SyncFlexiDagStore, block_info::BlockInfoStore,
BlockStore, Storage, SyncFlexiDagStore, block_info::BlockInfoStore, Store,
};
use starcoin_types::{block::BlockHeader, header::DagHeader, startup_info};

Expand Down Expand Up @@ -127,6 +127,15 @@ impl ServiceRequest for ForkDagAccumulator {
type Response = anyhow::Result<AccumulatorInfo>;
}

#[derive(Debug, Clone)]
pub struct FinishSync {
pub dag_accumulator_info: AccumulatorInfo,
}

impl ServiceRequest for FinishSync {
type Response = anyhow::Result<()>;
}

pub struct TipInfo {
tips: Option<Vec<HashValue>>, // some is for dag or the state of the chain is still in old version
k_total_difficulties: BTreeSet<KTotalDifficulty>,
Expand Down Expand Up @@ -536,8 +545,28 @@ impl ServiceHandler<Self, ForkDagAccumulator> for FlexidagService {
} else {
self.merge_from_small_dag(msg)
}


// append the ForkDagAccumulator.new_blocks and the fetched blocks above into the forked dag accumulator
}
}

impl ServiceHandler<Self, FinishSync> for FlexidagService {
fn handle(
&mut self,
msg: FinishSync,
_ctx: &mut ServiceContext<FlexidagService>,
) -> Result<()> {
let dag_accumulator = self.dag_accumulator.ok_or_else(|| anyhow!("the dag_accumulator is none when sync finish"))?;
let local_info = dag_accumulator.get_info();
if msg.dag_accumulator_info.get_num_leaves() < local_info.get_num_leaves() {
let mut new_dag_accumulator = MerkleAccumulator::new_with_info(msg.dag_accumulator_info, self.storage.get_accumulator_store(AccumulatorStoreType::SyncDag));
for index in msg.dag_accumulator_info.get_num_leaves()..local_info.get_num_leaves() {
let key = dag_accumulator.get_leaf(index)?.ok_or_else(|| anyhow!("the dag_accumulator leaf is none when sync finish"))?;
new_dag_accumulator.append(&[key])?;
}
self.dag_accumulator = Some(new_dag_accumulator);
Ok(())
} else {
self.dag_accumulator = Some(MerkleAccumulator::new_with_info(msg.dag_accumulator_info, self.storage.get_accumulator_store(AccumulatorStoreType::SyncDag)));
Ok(())
}
}
}
4 changes: 4 additions & 0 deletions network/api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn test_peer_selector() {
HashValue::zero(),
mock_chain_status(100.into()),
None,
None,
),
vec![],
vec![],
Expand All @@ -53,6 +54,7 @@ fn test_peer_selector() {
HashValue::zero(),
mock_chain_status(99.into()),
None,
None,
),
vec![],
vec![],
Expand All @@ -65,6 +67,7 @@ fn test_peer_selector() {
HashValue::zero(),
mock_chain_status(100.into()),
None,
None,
),
vec![],
vec![],
Expand All @@ -77,6 +80,7 @@ fn test_peer_selector() {
HashValue::zero(),
mock_chain_status(1.into()),
None,
None,
),
vec![],
vec![],
Expand Down
1 change: 1 addition & 0 deletions network/tests/network_service_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fn build_test_network_services(num: usize) -> Vec<NetworkComponent> {
HashValue::random(),
ChainStatus::random(),
None,
None,
);
for _index in 0..num {
let mut boot_nodes = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion rpc/server/src/module/chain_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
let fut = async move {
let chain_status = service.main_status().await?;
//TODO get chain info from chain service.
Ok(ChainInfo::new(chain_id, genesis_hash, chain_status, None).into())
Ok(ChainInfo::new(chain_id, genesis_hash, chain_status, None, None).into())
};
Box::pin(fut.boxed().map_err(map_err))
}
Expand Down
17 changes: 13 additions & 4 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ use crate::{
transaction::TransactionStorage,
transaction_info::{TransactionInfoHashStorage, TransactionInfoStorage},
};
use anyhow::{bail, format_err, Error, Ok, Result};
use anyhow::{anyhow, bail, format_err, Error, Ok, Result};
use flexi_dag::{SyncFlexiDagSnapshot, SyncFlexiDagSnapshotStorage, SyncFlexiDagStorage};
use network_p2p_types::peer_id::PeerId;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use once_cell::sync::Lazy;
use starcoin_accumulator::{
accumulator_info::{self, AccumulatorInfo},
node::AccumulatorStoreType,
AccumulatorTreeStore,
AccumulatorTreeStore, MerkleAccumulator, Accumulator,
};
use starcoin_config::ChainNetworkID;
use starcoin_crypto::HashValue;
Expand Down Expand Up @@ -339,6 +339,7 @@ pub trait SyncFlexiDagStore {
fn get_dag_accumulator_info(&self) -> Result<Option<AccumulatorInfo>>;
fn get_tips_by_block_id(&self, block_id: HashValue) -> Result<Vec<HashValue>>;
fn dag_fork_height(&self, id: ChainNetworkID) -> BlockNumber;
fn get_lastest_snapshot(&self) -> Result<Option<SyncFlexiDagSnapshot>>;
}

// TODO: remove Arc<dyn Store>, we can clone Storage directly.
Expand Down Expand Up @@ -456,12 +457,13 @@ impl BlockStore for Storage {
let head_block_info = self.get_block_info(head_block.id())?.ok_or_else(|| {
format_err!("Startup block info {:?} should exist", startup_info.main)
})?;

let snapshot = self.get_lastest_snapshot()?.ok_or_else(error || anyhow!("latest snapshot is none"))?;
let chain_info = ChainInfo::new(
head_block.chain_id(),
genesis_hash,
ChainStatus::new(head_block.clone(), head_block_info),
self.get_dag_accumulator_info()?,
Some(snapshot.accumulator_info),
Some(snapshot.k_total_difficulties),
);
Ok(Some(chain_info))
}
Expand Down Expand Up @@ -674,6 +676,13 @@ impl SyncFlexiDagStore for Storage {
self.flexi_dag_storage.get_snapshot_storage()
}

fn get_lastest_snapshot(&self) -> Result<Option<SyncFlexiDagSnapshot>> {
let info = self.get_dag_accumulator_info()?.ok_or_else(error || anyhow!("dag startup info is none"))?;
let merkle_tree = MerkleAccumulator::new_with_info(info, storage.get_accumulator_store(AccumulatorStoreType::SyncDag));
let key = merkle_tree.get_leaf(merkle_tree.num_leaves() - 1)?.ok_or_else(errors || anyhow!("faile to get the key since it is none"))?;
self.query_by_hash(key)
}

fn get_dag_accumulator_info(&self) -> Result<Option<AccumulatorInfo>> {
let startup_info = self.get_startup_info()?;
if startup_info.is_none() {
Expand Down
4 changes: 2 additions & 2 deletions sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl SyncService {

if let Some(local_dag_accumulator_info) = op_local_dag_accumulator_info {
let dag_sync_futs = rpc_client
.get_dag_targets()?
.get_dag_targets(current_block_info.get_total_difficulty(), local_dag_accumulator_info.get_num_leaves())?
.into_iter()
.fold(Ok(vec![]), |mut futs, target_accumulator_infos| {
let (fut, task_handle, task_event_handle) = sync_dag_full_task(
Expand Down Expand Up @@ -681,7 +681,7 @@ impl EventHandler<Self, SyncDoneEvent> for SyncService {

impl EventHandler<Self, NewHeadBlock> for SyncService {
fn handle_event(&mut self, msg: NewHeadBlock, ctx: &mut ServiceContext<Self>) {
let NewHeadBlock(block, tips_hash) = msg;
let NewHeadBlock(block) = msg;
if self.sync_status.update_chain_status(ChainStatus::new(
block.header().clone(),
block.block_info.clone(),
Expand Down
7 changes: 6 additions & 1 deletion sync/src/tasks/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use starcoin_chain_api::{ChainReader, ChainWriter, ConnectBlockError, ExecutedBl
use starcoin_config::{Connect, G_CRATE_VERSION};
use starcoin_consensus::BlockDAG;
use starcoin_crypto::HashValue;
use starcoin_flexidag::flexidag_service::{AddToDag, GetDagTips, ForkDagAccumulator};
use starcoin_flexidag::flexidag_service::{AddToDag, GetDagTips, ForkDagAccumulator, FinishSync};
use starcoin_flexidag::FlexidagService;
use starcoin_logger::prelude::*;
use starcoin_service_registry::ServiceRef;
Expand Down Expand Up @@ -417,6 +417,11 @@ where
dag_accumulator_index: start_index,
block_header_id: self.chain.head_block().id(),
}))??);
if state == State::Enough {
async_std::task::block_on(self.flexidag_service.send(FinishSync {
dag_accumulator_info: self.new_dag_accumulator_info.clone(),
}))??
}
return Ok(state);
}

Expand Down
18 changes: 16 additions & 2 deletions sync/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,26 @@ use stream_task::{
};

pub trait SyncFetcher: PeerOperator + BlockIdFetcher + BlockFetcher + BlockInfoFetcher {
fn get_dag_targets(&self) -> Result<Vec<AccumulatorInfo>> {
fn get_dag_targets(&self, total_difficulty: U256, local_dag_accumulator_leaf_num: u64) -> Result<Vec<AccumulatorInfo>> {
Ok(self
.peer_selector()
.peer_infos()
.into_iter()
.map(|peer_info| peer_info.chain_info().dag_accumulator_info().clone())
.filter(|peer_info| {
match (peer_info.chain_info().dag_accumulator_info(), peer_info.chain_info().k_total_difficulties()) {
(Some(info), Some(k)) => {
k.first() <= total_difficulty || info.get_num_leaves() > local_dag_accumulator_leaf_num
}
(NOne, None) => false,
_ => {
warn!("dag accumulator is inconsistent with k total difficulty");
false
}
}
})
.map(|peer_info| {
peer_info.chain_info().dag_accumulator_info().clone()
})
.collect());
}

Expand Down
7 changes: 5 additions & 2 deletions test-helper/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,16 @@ impl ServiceFactory<NetworkActorService> for MockNetworkServiceFactory {
.ok_or_else(|| format_err!("can't get block info by hash {}", head_block_hash))?;
let dag_tips = storage.get_tips_by_block_id(head_block_hash)?;
let chain_status =
ChainStatus::new(head_block_header.clone(), head_block_info, Some(dag_tips));
let dag_accumulator_info = storage.get_dag_accumulator_info()?;
ChainStatus::new(head_block_header.clone(), head_block_info);
let (dag_accumulator_info, k_total_difficulties) = storage.get_lastest_snapshot()?.map(|snapshot| {
(Some(snapshot.accumulator_info), Some(snapshot.k_total_difficulties))
}).unwrap_or((None, None));
let chain_state_info = ChainInfo::new(
config.net().chain_id(),
genesis_hash,
chain_status.clone(),
dag_accumulator_info.clone(),
k_total_difficulties,
);
let actor_service =
NetworkActorService::new(config, chain_state_info, rpc, peer_message_handle.clone())?;
Expand Down
4 changes: 1 addition & 3 deletions types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,9 +845,7 @@ impl BlockInfo {
&self.block_id
}

pub fn transaction_parent(&self) -> Option<HashValue> {
self.transaction_parent.clone()
}

}

impl Sample for BlockInfo {
Expand Down
3 changes: 2 additions & 1 deletion vm/starcoin-transactional-test-harness/src/fork_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ForkBlockChain {
self.number_hash_map
.insert(self.current_number, block.header().id());
self.status = Some(ChainStatusWithBlock {
status: ChainStatus::new(block.header().clone(), block_info.clone(), None),
status: ChainStatus::new(block.header().clone(), block_info.clone()),
head: block.clone(),
});
self.storage.save_block_info(block_info)?;
Expand Down Expand Up @@ -199,6 +199,7 @@ impl ChainApi for MockChainApi {
HashValue::random(),
status.status,
None,
None,
))),
None => match client {
Some(client) => client.info().await.map_err(|e| anyhow!("{}", e)),
Expand Down

0 comments on commit 9287b51

Please sign in to comment.