diff --git a/chain/api/src/chain.rs b/chain/api/src/chain.rs index b34310e2d3..d315a7515b 100644 --- a/chain/api/src/chain.rs +++ b/chain/api/src/chain.rs @@ -114,7 +114,7 @@ pub trait ChainReader { uncles: &[BlockHeader], header: &BlockHeader, ) -> Result; - fn is_dag_ancestor_of(&self, ancestor: HashValue, descendants: Vec) -> Result; + fn is_dag_ancestor_of(&self, ancestor: HashValue, descendant: HashValue) -> Result; fn get_pruning_height(&self) -> BlockNumber; fn get_pruning_config(&self) -> (u64, u64); } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index bd4ecaf5ac..99886657f0 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -19,7 +19,6 @@ use starcoin_crypto::HashValue; use starcoin_dag::blockdag::{BlockDAG, MineNewDagBlockInfo}; use starcoin_dag::consensusdb::consenses_state::DagState; use starcoin_dag::consensusdb::prelude::StoreError; -use starcoin_dag::consensusdb::schemadb::GhostdagStoreReader; use starcoin_executor::VMMetrics; #[cfg(feature = "force-deploy")] use starcoin_force_upgrade::force_upgrade_management::get_force_upgrade_block_number; @@ -94,7 +93,7 @@ impl BlockChain { uncles: Option>, storage: Arc, vm_metrics: Option, - mut dag: BlockDAG, + dag: BlockDAG, ) -> Result { let block_info = storage .get_block_info(head_block.id())? @@ -133,10 +132,6 @@ impl BlockChain { vm_metrics, dag: dag.clone(), }; - let genesis_header = storage - .get_block_header_by_hash(genesis)? - .ok_or_else(|| format_err!("failed to get genesis because it is none"))?; - dag.set_reindex_root(genesis_header.parent_hash())?; watch(CHAIN_WATCH_NAME, "n1251"); match uncles { Some(data) => chain.uncles = data, @@ -659,19 +654,11 @@ impl BlockChain { self.storage.save_block_info(block_info.clone())?; self.storage.save_table_infos(txn_table_infos)?; - let genesis_header = self - .storage - .get_block_header_by_hash(self.genesis_hash)? - .ok_or_else(|| format_err!("failed to get genesis because it is none"))?; let result = match verified_block.ghostdata { - Some(trusted_ghostdata) => self.dag.commit_trusted_block( - header.to_owned(), - genesis_header.parent_hash(), - Arc::new(trusted_ghostdata), - ), - None => self + Some(trusted_ghostdata) => self .dag - .commit(header.to_owned(), genesis_header.parent_hash()), + .commit_trusted_block(header.to_owned(), Arc::new(trusted_ghostdata)), + None => self.dag.commit(header.to_owned()), }; match result { anyhow::Result::Ok(_) => info!("finish to commit dag block: {:?}", block_id), @@ -1385,41 +1372,11 @@ impl ChainReader for BlockChain { uncles: &[BlockHeader], header: &BlockHeader, ) -> Result { - let previous_header = self - .storage - .get_block_header_by_hash(header.parent_hash())? - .ok_or_else(|| format_err!("cannot find parent block header"))?; - let next_ghostdata = self.dag().verify_and_ghostdata(uncles, header)?; - let (pruning_depth, pruning_finality) = self.get_pruning_config(); - if self.status().head().pruning_point() != HashValue::zero() { - let previous_ghostdata = if previous_header.pruning_point() == HashValue::zero() { - let genesis = self - .storage - .get_genesis()? - .ok_or_else(|| format_err!("the genesis id is none!"))?; - self.dag().storage.ghost_dag_store.get_data(genesis)? - } else { - self.dag() - .storage - .ghost_dag_store - .get_data(previous_header.pruning_point())? - }; - - self.dag().verify_pruning_point( - previous_header.pruning_point(), - previous_ghostdata.as_ref(), - header.pruning_point(), - &next_ghostdata, - pruning_depth, - pruning_finality, - )?; - } - - Ok(next_ghostdata) + Ok(self.dag().verify_and_ghostdata(uncles, header)?) } - fn is_dag_ancestor_of(&self, ancestor: HashValue, descendants: Vec) -> Result { - self.dag().check_ancestor_of(ancestor, descendants) + fn is_dag_ancestor_of(&self, ancestor: HashValue, descendant: HashValue) -> Result { + self.dag().check_ancestor_of(ancestor, descendant) } fn get_pruning_height(&self) -> BlockNumber { @@ -1561,7 +1518,7 @@ impl BlockChain { let mut new_tips = vec![]; for hash in tips { - if !dag.check_ancestor_of(hash, vec![new_tip_block.id()])? { + if !dag.check_ancestor_of(hash, new_tip_block.id())? { new_tips.push(hash); } } diff --git a/chain/src/verifier/mod.rs b/chain/src/verifier/mod.rs index a4381b2522..8232093e72 100644 --- a/chain/src/verifier/mod.rs +++ b/chain/src/verifier/mod.rs @@ -392,7 +392,7 @@ impl BasicDagVerifier { parents_hash.iter().try_for_each(|parent_hash| { verify_block!( VerifyBlockField::Header, - current_chain.is_dag_ancestor_of(new_block_header.pruning_point(), vec![*parent_hash]).map_err(|e| { + current_chain.is_dag_ancestor_of(new_block_header.pruning_point(), *parent_hash).map_err(|e| { ConnectBlockError::VerifyBlockFailed( VerifyBlockField::Header, anyhow::anyhow!( diff --git a/flexidag/src/blockdag.rs b/flexidag/src/blockdag.rs index 8b5e2def2f..ce59648f47 100644 --- a/flexidag/src/blockdag.rs +++ b/flexidag/src/blockdag.rs @@ -4,7 +4,9 @@ use crate::consensusdb::consenses_state::{ DagState, DagStateReader, DagStateStore, ReachabilityView, }; use crate::consensusdb::prelude::{FlexiDagStorageConfig, StoreError}; -use crate::consensusdb::schemadb::{GhostdagStoreReader, ReachabilityStore, REINDEX_ROOT_KEY}; +use crate::consensusdb::schemadb::{ + GhostdagStoreReader, ReachabilityStore, StagingReachabilityStore, +}; use crate::consensusdb::{ prelude::FlexiDagStorage, schemadb::{ @@ -13,9 +15,11 @@ use crate::consensusdb::{ }, }; use crate::ghostdag::protocol::GhostdagManager; +use crate::process_key_already_error; use crate::prune::pruning_point_manager::PruningPointManagerT; -use crate::{process_key_already_error, reachability}; +use crate::reachability::ReachabilityError; use anyhow::{bail, ensure, Ok}; +use rocksdb::WriteBatch; use starcoin_config::temp_dir; use starcoin_crypto::{HashValue as Hash, HashValue}; use starcoin_logger::prelude::{debug, info, warn}; @@ -98,9 +102,15 @@ impl BlockDAG { Ok(self.storage.header_store.has(hash)?) } - pub fn check_ancestor_of(&self, ancestor: Hash, descendant: Vec) -> anyhow::Result { - self.ghostdag_manager - .check_ancestor_of(ancestor, descendant) + pub fn check_ancestor_of(&self, ancestor: Hash, descendant: Hash) -> anyhow::Result { + // self.ghostdag_manager + // .check_ancestor_of(ancestor, descendant) + inquirer::is_dag_ancestor_of( + &*self.storage.reachability_store.read(), + ancestor, + descendant, + ) + .map_err(|e| e.into()) } pub fn init_with_genesis(&mut self, genesis: BlockHeader) -> anyhow::Result { @@ -114,7 +124,7 @@ impl BlockDAG { .write() .insert(origin, BlockHashes::new(vec![]))?; - self.commit(genesis, origin)?; + self.commit(genesis)?; self.save_dag_state( genesis_id, DagState { @@ -150,7 +160,6 @@ impl BlockDAG { pub fn commit_trusted_block( &mut self, header: BlockHeader, - origin: HashValue, trusted_ghostdata: Arc, ) -> anyhow::Result<()> { info!( @@ -208,12 +217,49 @@ impl BlockDAG { } }; + if self.storage.reachability_store.read().get_reindex_root()? != header.pruning_point() + && header.pruning_point() != HashValue::zero() + && self + .storage + .reachability_store + .read() + .has(header.pruning_point())? + { + info!( + "try to hint virtual selected parent, root index: {:?}", + self.storage.reachability_store.read().get_reindex_root() + ); + inquirer::hint_virtual_selected_parent( + self.storage.reachability_store.write().deref_mut(), + header.pruning_point(), + )?; + info!( + "after hint virtual selected parent, root index: {:?}", + self.storage.reachability_store.read().get_reindex_root() + ); + } + + info!("start to commit via batch, header id: {:?}", header.id()); + + // Create a DB batch writer + let mut batch = WriteBatch::default(); + + // lock the dag data to write in batch + // the cache will be written at the same time + // when the batch is written before flush to the disk and + // if the writing process abort the starcoin process will/should restart. + let mut stage = StagingReachabilityStore::new( + self.storage.db.clone(), + self.storage.reachability_store.upgradable_read(), + ); + // Store ghostdata - process_key_already_error( - self.storage - .ghost_dag_store - .insert(header.id(), ghostdata.clone()), - )?; + process_key_already_error(self.storage.ghost_dag_store.insert_batch( + &mut batch, + header.id(), + ghostdata.clone(), + )) + .expect("failed to ghostdata in batch"); // Update reachability store debug!( @@ -221,81 +267,67 @@ impl BlockDAG { header.id(), header.number() ); - let reachability_store = self.storage.reachability_store.clone(); let mut merge_set = ghostdata .unordered_mergeset_without_selected_parent() .filter(|hash| self.storage.reachability_store.read().has(*hash).unwrap()) .collect::>() .into_iter(); - let add_block_result = { - let mut reachability_writer = reachability_store.write(); - inquirer::add_block( - reachability_writer.deref_mut(), - header.id(), - ghostdata.selected_parent, - &mut merge_set, - ) - }; - match add_block_result { - Result::Ok(_) => (), - Err(reachability::ReachabilityError::DataInconsistency) => { - let _future_covering_set = reachability_store - .read() - .get_future_covering_set(header.id())?; - info!( - "the key {:?} was already processed, original error message: {:?}", - header.id(), - reachability::ReachabilityError::DataInconsistency - ); - } - Err(reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))) => { - if msg == *REINDEX_ROOT_KEY.to_string() { - info!( + + match inquirer::add_block( + &mut stage, + header.id(), + ghostdata.selected_parent, + &mut merge_set, + ) { + std::result::Result::Ok(_) => {} + Err(e) => match e { + ReachabilityError::DataInconsistency => { + warn!( "the key {:?} was already processed, original error message: {:?}", header.id(), - reachability::ReachabilityError::StoreError(StoreError::KeyNotFound( - REINDEX_ROOT_KEY.to_string() - )) - ); - info!("now set the reindex key to origin: {:?}", origin); - // self.storage.reachability_store.set_reindex_root(origin)?; - self.set_reindex_root(origin)?; - bail!( - "failed to add a block when committing, e: {:?}", - reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg)) - ); - } else { - bail!( - "failed to add a block when committing, e: {:?}", - reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg)) + ReachabilityError::DataInconsistency ); } - } - Err(reachability::ReachabilityError::StoreError(StoreError::InvalidInterval(_, _))) => { - self.set_reindex_root(origin)?; - bail!("failed to add a block when committing for invalid interval",); - } - Err(e) => { - bail!("failed to add a block when committing, e: {:?}", e); - } + _ => { + panic!("failed to add block in batch for error: {:?}", e); + } + }, } - process_key_already_error( - self.storage - .relations_store - .write() - .insert(header.id(), BlockHashes::new(parents)), - )?; + + process_key_already_error(self.storage.relations_store.write().insert_batch( + &mut batch, + header.id(), + BlockHashes::new(parents), + )) + .expect("failed to insert relations in batch"); + // Store header store process_key_already_error(self.storage.header_store.insert( header.id(), - Arc::new(header), + Arc::new(header.clone()), 1, - ))?; + )) + .expect("failed to insert header in batch"); + + // the read lock will be updated to the write lock + // and then write the batch + // and then release the lock + stage + .commit(&mut batch) + .expect("failed to write the stage reachability in batch"); + + // write the data just one time + self.storage + .write_batch(batch) + .expect("failed to write dag data in batch"); + + info!("finish writing the batch, head id: {:?}", header.id()); + Ok(()) } - pub fn commit(&mut self, header: BlockHeader, origin: HashValue) -> anyhow::Result<()> { + pub fn commit(&mut self, header: BlockHeader) -> anyhow::Result<()> { info!( "start to commit header: {:?}, number: {:?}", header.id(), @@ -322,12 +354,49 @@ impl BlockDAG { Some(ghostdata) => ghostdata, }; + if self.storage.reachability_store.read().get_reindex_root()? != header.pruning_point() + && header.pruning_point() != HashValue::zero() + && self + .storage + .reachability_store + .read() + .has(header.pruning_point())? + { + info!( + "try to hint virtual selected parent, root index: {:?}", + self.storage.reachability_store.read().get_reindex_root() + ); + inquirer::hint_virtual_selected_parent( + self.storage.reachability_store.write().deref_mut(), + header.pruning_point(), + )?; + info!( + "after hint virtual selected parent, root index: {:?}", + self.storage.reachability_store.read().get_reindex_root() + ); + } + + info!("start to commit via batch, header id: {:?}", header.id()); + + // Create a DB batch writer + let mut batch = WriteBatch::default(); + + // lock the dag data to write in batch, read lock. + // the cache will be written at the same time + // when the batch is written before flush to the disk and + // if the writing process abort the starcoin process will/should restart. + let mut stage = StagingReachabilityStore::new( + self.storage.db.clone(), + self.storage.reachability_store.upgradable_read(), + ); + // Store ghostdata - process_key_already_error( - self.storage - .ghost_dag_store - .insert(header.id(), ghostdata.clone()), - )?; + process_key_already_error(self.storage.ghost_dag_store.insert_batch( + &mut batch, + header.id(), + ghostdata.clone(), + )) + .expect("failed to ghostdata in batch"); // Update reachability store debug!( @@ -335,78 +404,64 @@ impl BlockDAG { header.id(), header.number() ); - let reachability_store = self.storage.reachability_store.clone(); let mut merge_set = ghostdata .unordered_mergeset_without_selected_parent() .filter(|hash| self.storage.reachability_store.read().has(*hash).unwrap()) .collect::>() .into_iter(); - let add_block_result = { - let mut reachability_writer = reachability_store.write(); - inquirer::add_block( - reachability_writer.deref_mut(), - header.id(), - ghostdata.selected_parent, - &mut merge_set, - ) - }; - match add_block_result { - Result::Ok(_) => (), - Err(reachability::ReachabilityError::DataInconsistency) => { - let _future_covering_set = reachability_store - .read() - .get_future_covering_set(header.id())?; - info!( - "the key {:?} was already processed, original error message: {:?}", - header.id(), - reachability::ReachabilityError::DataInconsistency - ); - } - Err(reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))) => { - if msg == *REINDEX_ROOT_KEY.to_string() { + + match inquirer::add_block( + &mut stage, + header.id(), + ghostdata.selected_parent, + &mut merge_set, + ) { + std::result::Result::Ok(_) => {} + Err(e) => match e { + ReachabilityError::DataInconsistency => { info!( "the key {:?} was already processed, original error message: {:?}", header.id(), - reachability::ReachabilityError::StoreError(StoreError::KeyNotFound( - REINDEX_ROOT_KEY.to_string() - )) - ); - info!("now set the reindex key to origin: {:?}", origin); - // self.storage.reachability_store.set_reindex_root(origin)?; - self.set_reindex_root(origin)?; - bail!( - "failed to add a block when committing, e: {:?}", - reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg)) - ); - } else { - bail!( - "failed to add a block when committing, e: {:?}", - reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg)) + ReachabilityError::DataInconsistency ); } - } - Err(reachability::ReachabilityError::StoreError(StoreError::InvalidInterval(_, _))) => { - self.set_reindex_root(origin)?; - bail!("failed to add a block when committing for invalid interval",); - } - Err(e) => { - bail!("failed to add a block when committing, e: {:?}", e); - } + _ => { + panic!("failed to add block in batch for error: {:?}", e); + } + }, } - process_key_already_error( - self.storage - .relations_store - .write() - .insert(header.id(), BlockHashes::new(parents)), - )?; + process_key_already_error(self.storage.relations_store.write().insert_batch( + &mut batch, + header.id(), + BlockHashes::new(parents), + )) + .expect("failed to insert relations in batch"); + // Store header store - process_key_already_error(self.storage.header_store.insert( + process_key_already_error(self.storage.header_store.insert_batch( + &mut batch, header.id(), - Arc::new(header), + Arc::new(header.clone()), 1, - ))?; + )) + .expect("failed to insert header in batch"); + + // the read lock will be updated to the write lock + // and then write the batch + // and then release the lock + stage + .commit(&mut batch) + .expect("failed to write the stage reachability in batch"); + + // write the data just one time + self.storage + .write_batch(batch) + .expect("failed to write dag data in batch"); + + info!("finish writing the batch, head id: {:?}", header.id()); + Ok(()) } @@ -564,8 +619,12 @@ impl BlockDAG { blue_blocks: &[BlockHeader], header: &BlockHeader, ) -> Result { - self.ghost_dag_manager() - .verify_and_ghostdata(blue_blocks, header) + if header.pruning_point() != HashValue::zero() { + self.ghost_dag_manager().ghostdag(&header.parents()) + } else { + self.ghost_dag_manager() + .verify_and_ghostdata(blue_blocks, header) + } } pub fn check_upgrade(&self, main: &BlockHeader, genesis_id: HashValue) -> anyhow::Result<()> { // set the state with key 0 @@ -621,10 +680,15 @@ impl BlockDAG { ) -> anyhow::Result { let de = descendants .into_iter() - .filter(|descendant| { - self.check_ancestor_of(ancestor, vec![*descendant]) - .unwrap_or(false) - }) + .filter( + |descendant| match self.check_ancestor_of(ancestor, *descendant) { + std::result::Result::Ok(result) => result, + Err(e) => { + warn!("Error checking ancestor relationship: {:?}", e); + false + } + }, + ) .collect::>(); anyhow::Ok(ReachabilityView { ancestor, diff --git a/flexidag/src/consensusdb/access.rs b/flexidag/src/consensusdb/access.rs index 9d6a8ceedf..8244a3e9b3 100644 --- a/flexidag/src/consensusdb/access.rs +++ b/flexidag/src/consensusdb/access.rs @@ -33,6 +33,10 @@ where } } + pub fn clear_cache(&self) { + self.cache.remove_all(); + } + pub fn read_from_cache(&self, key: S::Key) -> Option { self.cache.get(&key) } @@ -108,6 +112,13 @@ where Ok(()) } + pub fn flush_cache(&self, data: &[(S::Key, S::Value)]) -> Result<(), StoreError> { + for (key, value) in data { + self.cache.insert(key.clone(), value.clone()); + } + Ok(()) + } + /// Write directly from an iterator and do not cache any data. NOTE: this action also clears the cache pub fn write_many_without_cache( &self, diff --git a/flexidag/src/consensusdb/consensus_ghostdag.rs b/flexidag/src/consensusdb/consensus_ghostdag.rs index a45d9473cd..6a70ff6e02 100644 --- a/flexidag/src/consensusdb/consensus_ghostdag.rs +++ b/flexidag/src/consensusdb/consensus_ghostdag.rs @@ -225,7 +225,7 @@ impl DbGhostdagStore { &self, batch: &mut WriteBatch, hash: Hash, - data: &Arc, + data: Arc, ) -> Result<(), StoreError> { if self.access.has(hash)? { return Err(StoreError::KeyAlreadyExists(hash.to_string())); diff --git a/flexidag/src/consensusdb/consensus_reachability.rs b/flexidag/src/consensusdb/consensus_reachability.rs index 55adbfb338..22cfa64336 100644 --- a/flexidag/src/consensusdb/consensus_reachability.rs +++ b/flexidag/src/consensusdb/consensus_reachability.rs @@ -3,7 +3,7 @@ use super::{ prelude::{BatchDbWriter, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError}, }; use starcoin_crypto::HashValue as Hash; -use starcoin_storage::storage::RawDBStorage; +use starcoin_storage::storage::{InnerStore, RawDBStorage}; use crate::{ consensusdb::schema::{KeyCodec, ValueCodec}, @@ -12,7 +12,7 @@ use crate::{ }; use starcoin_types::blockhash::{self, BlockHashMap, BlockHashes}; -use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard}; +use parking_lot::RwLockUpgradableReadGuard; use rocksdb::WriteBatch; use std::{collections::hash_map::Entry::Vacant, sync::Arc}; @@ -126,6 +126,13 @@ impl DbReachabilityStore { pub fn clone_with_new_cache(&self, cache_size: usize) -> Self { Self::new_with_prefix_end(Arc::clone(&self.db), cache_size) } + + pub fn batch_write(&self, batch: starcoin_storage::batch::WriteBatchWithColumn) { + self.db.write_batch_with_column(batch).unwrap(); + self.access.clear_cache(); + + // self.reindex_root.clear_cache().unwrap(); + } } impl ReachabilityStore for DbReachabilityStore { @@ -237,38 +244,40 @@ impl ReachabilityStoreReader for DbReachabilityStore { } pub struct StagingReachabilityStore<'a> { + db: Arc, store_read: RwLockUpgradableReadGuard<'a, DbReachabilityStore>, staging_writes: BlockHashMap, staging_reindex_root: Option, } impl<'a> StagingReachabilityStore<'a> { - pub fn new(store_read: RwLockUpgradableReadGuard<'a, DbReachabilityStore>) -> Self { + pub fn new( + db: Arc, + store_read: RwLockUpgradableReadGuard<'a, DbReachabilityStore>, + ) -> Self { Self { + db, store_read, staging_writes: BlockHashMap::new(), staging_reindex_root: None, } } - pub fn commit( - self, - batch: &mut WriteBatch, - ) -> Result, StoreError> { - let db = Arc::clone(&self.store_read.db); + pub fn commit(self, batch: &mut WriteBatch) -> Result<(), StoreError> { let mut store_write = RwLockUpgradableReadGuard::upgrade(self.store_read); for (k, v) in self.staging_writes { let data = Arc::new(v); store_write .access - .write(BatchDbWriter::new(batch, &db), k, data)? + .write(BatchDbWriter::new(batch, &self.db), k, data)? } + if let Some(root) = self.staging_reindex_root { store_write .reindex_root - .write(BatchDbWriter::new(batch, &db), &root)?; + .write(BatchDbWriter::new(batch, &self.db), &root)?; } - Ok(store_write) + Ok(()) } } diff --git a/flexidag/src/consensusdb/consensus_relations.rs b/flexidag/src/consensusdb/consensus_relations.rs index 036f628bfb..f9086c3a87 100644 --- a/flexidag/src/consensusdb/consensus_relations.rs +++ b/flexidag/src/consensusdb/consensus_relations.rs @@ -1,14 +1,16 @@ use super::schema::{KeyCodec, ValueCodec}; +use super::writer::BatchDbWriter; use super::{ db::DBStorage, - prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter, StoreError}, + prelude::{CachedDbAccess, StoreError}, }; use crate::define_schema; -use rocksdb::WriteBatch; use starcoin_crypto::HashValue as Hash; +use starcoin_storage::batch::{WriteBatch, WriteBatchData, WriteBatchWithColumn}; +use starcoin_storage::storage::{InnerStore, WriteOp}; use starcoin_types::blockhash::{BlockHashes, BlockLevel}; +use std::collections::HashMap; use std::sync::Arc; - /// Reader API for `RelationsStore`. pub trait RelationsStoreReader { fn get_parents(&self, hash: Hash) -> Result; @@ -93,7 +95,7 @@ impl DbRelationsStore { pub fn insert_batch( &mut self, - batch: &mut WriteBatch, + batch: &mut rocksdb::WriteBatch, hash: Hash, parents: BlockHashes, ) -> Result<(), StoreError> { @@ -147,35 +149,69 @@ impl RelationsStoreReader for DbRelationsStore { } impl RelationsStore for DbRelationsStore { - /// See `insert_batch` as well - /// TODO: use one function with DbWriter for both this function and insert_batch fn insert(&self, hash: Hash, parents: BlockHashes) -> Result<(), StoreError> { if self.has(hash)? { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } - // Insert a new entry for `hash` - self.parents_access - .write(DirectDbWriter::new(&self.db), hash, parents.clone())?; - - // The new hash has no children yet - self.children_access.write( - DirectDbWriter::new(&self.db), - hash, - BlockHashes::new(Vec::new()), - )?; + let mut parent_to_children = HashMap::new(); + parent_to_children.insert(hash, vec![]); - // Update `children` for each parent for parent in parents.iter().cloned() { - let mut children = (*self.get_children(parent)?).clone(); + let mut children = match self.get_children(parent) { + Ok(children) => (*children).clone(), + Err(e) => match e { + StoreError::KeyNotFound(_) => vec![], + _ => return std::result::Result::Err(e), + }, + }; children.push(hash); - self.children_access.write( - DirectDbWriter::new(&self.db), - parent, - BlockHashes::new(children), - )?; + parent_to_children.insert(parent, children); } + let batch = WriteBatchWithColumn { + data: vec![ + WriteBatchData { + column: PARENTS_CF.to_string(), + row_data: WriteBatch::new_with_rows(vec![( + hash.to_vec(), + WriteOp::Value( + > as ValueCodec>::encode_value(&parents)?, + ), + )]), + }, + WriteBatchData { + column: CHILDREN_CF.to_string(), + row_data: WriteBatch::new_with_rows( + parent_to_children + .iter() + .map(|(key, value)| { + std::result::Result::Ok(( + key.to_vec(), + WriteOp::Value(> as ValueCodec< + RelationChildren, + >>::encode_value( + &Arc::new(value.clone()) + )?), + )) + }) + .collect::, StoreError>>()?, + ), + }, + ], + }; + self.db + .write_batch_with_column(batch) + .map_err(|e| StoreError::DBIoError(format!("Failed to write batch when writing batch with column for the dag releationship: {:?}", e)))?; + + self.parents_access.flush_cache(&[(hash, parents)])?; + self.children_access.flush_cache( + &parent_to_children + .into_iter() + .map(|(key, value)| (key, BlockHashes::new(value))) + .collect::>(), + )?; + Ok(()) } } diff --git a/flexidag/src/consensusdb/db.rs b/flexidag/src/consensusdb/db.rs index 0590c99706..72632d11db 100644 --- a/flexidag/src/consensusdb/db.rs +++ b/flexidag/src/consensusdb/db.rs @@ -8,8 +8,10 @@ use super::{ }, }; use parking_lot::RwLock; +use rocksdb::WriteBatch; use starcoin_config::{RocksdbConfig, StorageConfig}; pub(crate) use starcoin_storage::db_storage::DBStorage; +use starcoin_storage::storage::RawDBStorage; use std::{path::Path, sync::Arc}; #[derive(Clone)] @@ -19,6 +21,7 @@ pub struct FlexiDagStorage { pub reachability_store: Arc>, pub relations_store: Arc>, pub state_store: Arc>, + pub(crate) db: Arc, } #[derive(Clone)] @@ -92,7 +95,20 @@ impl FlexiDagStorage { 1, config.cache_size, ))), - state_store: Arc::new(RwLock::new(DbDagStateStore::new(db, config.cache_size))), + state_store: Arc::new(RwLock::new(DbDagStateStore::new( + db.clone(), + config.cache_size, + ))), + db, + }) + } + + pub fn write_batch(&self, batch: WriteBatch) -> Result<(), StoreError> { + self.db.raw_write_batch_sync(batch).map_err(|e| { + StoreError::DBIoError(format!( + "failed to write in batch for dag data, error: {:?}", + e.to_string() + )) }) } } diff --git a/flexidag/src/consensusdb/item.rs b/flexidag/src/consensusdb/item.rs index e4a85426f9..4bd83e39b6 100644 --- a/flexidag/src/consensusdb/item.rs +++ b/flexidag/src/consensusdb/item.rs @@ -23,16 +23,16 @@ impl CachedDbItem { } pub fn read(&self) -> Result { - if let Some(item) = self.cached_item.read().clone() { - return Ok(item); - } + // if let Some(item) = self.cached_item.read().clone() { + // return Ok(item); + // } if let Some(slice) = self .db .raw_get_pinned_cf(S::COLUMN_FAMILY, &self.key.encode_key()?) .map_err(|_| StoreError::CFNotExist(S::COLUMN_FAMILY.to_string()))? { let item = S::Value::decode_value(&slice)?; - *self.cached_item.write() = Some(item.clone()); + // *self.cached_item.write() = Some(item.clone()); Ok(item) } else { Err(StoreError::KeyNotFound( @@ -55,6 +55,11 @@ where { Ok(()) } + pub fn clear_cache(&mut self) -> Result<(), StoreError> { + *self.cached_item.write() = None; + Ok(()) + } + pub fn update(&mut self, mut writer: impl DbWriter, op: F) -> Result where F: Fn(S::Value) -> S::Value, diff --git a/flexidag/src/ghostdag/protocol.rs b/flexidag/src/ghostdag/protocol.rs index 30567d473a..435b09b421 100644 --- a/flexidag/src/ghostdag/protocol.rs +++ b/flexidag/src/ghostdag/protocol.rs @@ -2,7 +2,7 @@ use super::util::Refs; use crate::consensusdb::schemadb::{GhostdagStoreReader, HeaderStoreReader, RelationsStoreReader}; use crate::reachability::reachability_service::ReachabilityService; use crate::types::{ghostdata::GhostdagData, ordering::*}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{ensure, Context, Result}; use parking_lot::RwLock; use starcoin_crypto::HashValue as Hash; use starcoin_logger::prelude::*; @@ -184,50 +184,19 @@ impl< let selected_parent = self.find_selected_parent(header.parents_hash().into_iter())?; // Initialize new GHOSTDAG block data with the selected parent let mut new_block_data = GhostdagData::new_with_selected_parent(selected_parent, self.k); - let ordered_mergeset = self.sort_blocks( + new_block_data.mergeset_blues = Arc::new( + vec![selected_parent] + .into_iter() + .chain(self.sort_blocks(blue_blocks.iter().map(|header| header.id()))?) + .collect(), + ); + new_block_data.mergeset_reds = Arc::new( header .parents_hash() .into_iter() - .filter(|header_id| *header_id != new_block_data.selected_parent) - .chain( - blue_blocks - .iter() - .filter(|header| header.id() != new_block_data.selected_parent) - .map(|header| header.id()), - ) - .collect::>() - .into_iter() - .collect::>(), - )?; - - for blue_candidate in ordered_mergeset.iter().cloned() { - let coloring = self.check_blue_candidate(&new_block_data, blue_candidate)?; - if let ColoringOutput::Blue(blue_anticone_size, blues_anticone_sizes) = coloring { - // No k-cluster violation found, we can now set the candidate block as blue - new_block_data.add_blue(blue_candidate, blue_anticone_size, &blues_anticone_sizes); - } else { - new_block_data.add_red(blue_candidate); - } - } - - if new_block_data - .mergeset_blues - .iter() - .skip(1) - .cloned() - .collect::>() - != blue_blocks - .iter() - .map(|header| header.id()) - .collect::>() - { - if header.number() < 10000000 { - // no bail before 10000000 - warn!("The data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::>(), new_block_data.mergeset_blues); - } else { - bail!("The data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::>(), new_block_data.mergeset_blues); - } - } + .filter(|header_id| !new_block_data.mergeset_blues.contains(header_id)) + .collect(), + ); let blue_score = self .ghostdag_store diff --git a/flexidag/src/prune/pruning_point_manager.rs b/flexidag/src/prune/pruning_point_manager.rs index 4e2ee9cf3e..a81597cf69 100644 --- a/flexidag/src/prune/pruning_point_manager.rs +++ b/flexidag/src/prune/pruning_point_manager.rs @@ -75,6 +75,11 @@ impl PruningPointManagerT { min_required_blue_score_for_next_pruning_point ); + debug!("previous_pruning_point: {:?}, previous_ghostdata: {:?}, next_ghostdata: {:?}, pruning_depth: {:?}, pruning_finality: {:?}", + previous_pruning_point, previous_ghostdata, next_ghostdata, + pruning_depth, pruning_finality, + ); + let mut latest_pruning_ghost_data = previous_ghostdata.to_compact(); if min_required_blue_score_for_next_pruning_point + pruning_depth <= next_ghostdata.blue_score diff --git a/flexidag/tests/tests.rs b/flexidag/tests/tests.rs index d3002c213c..ffc402708f 100644 --- a/flexidag/tests/tests.rs +++ b/flexidag/tests/tests.rs @@ -38,7 +38,7 @@ fn test_dag_commit() -> Result<()> { .build(); let mut parents_hash = vec![genesis.id()]; - let origin = dag.init_with_genesis(genesis.clone())?; + let _origin = dag.init_with_genesis(genesis.clone())?; for _ in 0..10 { let header_builder = BlockHeaderBuilder::random(); @@ -46,7 +46,7 @@ fn test_dag_commit() -> Result<()> { .with_parents_hash(parents_hash.clone()) .build(); parents_hash = vec![header.id()]; - dag.commit(header.to_owned(), origin)?; + dag.commit(header.to_owned())?; let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); println!("{:?},{:?}", header, ghostdata); } @@ -92,15 +92,15 @@ fn test_dag_1() -> Result<()> { let genesis_id = genesis.id(); let mut dag = BlockDAG::create_for_testing().unwrap(); let expect_selected_parented = [block5.id(), block3.id(), block3_1.id(), genesis_id]; - let origin = dag.init_with_genesis(genesis.clone()).unwrap(); - - dag.commit(block1, origin)?; - dag.commit(block2, origin)?; - dag.commit(block3_1, origin)?; - dag.commit(block3, origin)?; - dag.commit(block4, origin)?; - dag.commit(block5, origin)?; - dag.commit(block6, origin)?; + let _origin = dag.init_with_genesis(genesis.clone())?; + + dag.commit(block1)?; + dag.commit(block2)?; + dag.commit(block3_1)?; + dag.commit(block3)?; + dag.commit(block4)?; + dag.commit(block5)?; + dag.commit(block6)?; let mut count = 0; while latest_id != genesis_id && count < 4 { let ghostdata = dag @@ -130,9 +130,10 @@ async fn test_with_spawn() { .with_parents_hash(vec![genesis.id()]) .build(); let mut dag = BlockDAG::create_for_testing().unwrap(); - let real_origin = dag.init_with_genesis(genesis.clone()).unwrap(); - dag.commit(block1.clone(), real_origin).unwrap(); - dag.commit(block2.clone(), real_origin).unwrap(); + let _origin = dag.init_with_genesis(genesis.clone()).unwrap(); + + dag.commit(block1.clone()).unwrap(); + dag.commit(block2.clone()).unwrap(); let block3 = BlockHeaderBuilder::random() .with_difficulty(3.into()) .with_parents_hash(vec![block1.id(), block2.id()]) @@ -144,7 +145,7 @@ async fn test_with_spawn() { let handle = tokio::task::spawn_blocking(move || { let mut count = 10; loop { - match dag_clone.commit(block_clone.clone(), real_origin) { + match dag_clone.commit(block_clone.clone()) { std::result::Result::Ok(_) => break, Err(e) => { debug!("failed to commit error: {:?}, i: {:?}", e, i); @@ -261,8 +262,7 @@ fn test_dag_genesis_fork() { .with_parents_hash(parents_hash.clone()) .build(); parents_hash = vec![header.id()]; - dag.commit(header.to_owned(), genesis.parent_hash()) - .unwrap(); + dag.commit(header.to_owned()).unwrap(); let _ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); } @@ -285,8 +285,7 @@ fn test_dag_genesis_fork() { .with_parents_hash(old_parents_hash.clone()) .build(); old_parents_hash = vec![header.id()]; - dag.commit(header.to_owned(), genesis.parent_hash()) - .unwrap(); + dag.commit(header.to_owned()).unwrap(); let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); println!("add a old header: {:?}, tips: {:?}", header, ghostdata); } @@ -298,8 +297,7 @@ fn test_dag_genesis_fork() { .with_parents_hash(parents_hash.clone()) .build(); parents_hash = vec![header.id()]; - dag.commit(header.to_owned(), genesis.parent_hash()) - .unwrap(); + dag.commit(header.to_owned()).unwrap(); let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); println!("add a forked header: {:?}, tips: {:?}", header, ghostdata); } @@ -308,8 +306,7 @@ fn test_dag_genesis_fork() { parents_hash.append(&mut old_parents_hash); let header = header_builder.with_parents_hash(parents_hash).build(); // parents_hash = vec![header.id()]; - dag.commit(header.to_owned(), genesis.parent_hash()) - .unwrap(); + dag.commit(header.to_owned()).unwrap(); let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); println!("add a forked header: {:?}, tips: {:?}", header, ghostdata); } @@ -359,10 +356,10 @@ fn test_dag_multiple_commits() -> anyhow::Result<()> { .build(); parents_hash = vec![header.id()]; parent_hash = header.id(); - dag.commit(header.to_owned(), genesis.parent_hash())?; + dag.commit(header.to_owned())?; if header.number() == 6 { - dag.commit(header.to_owned(), genesis.parent_hash())?; - dag.commit(header.to_owned(), genesis.parent_hash())?; + dag.commit(header.to_owned())?; + dag.commit(header.to_owned())?; } let ghostdata = dag.ghostdata(&parents_hash).unwrap(); println!("add a header: {:?}, tips: {:?}", header, ghostdata); @@ -474,48 +471,66 @@ fn test_reachability_check_ancestor() -> anyhow::Result<()> { // origin.....target_parent-target.....parent-child // ancestor assert!( - dag.check_ancestor_of(target, vec![parent, child])?, + dag.check_ancestor_of(target, parent)?, + "failed to check target is the ancestor of its descendant" + ); + assert!( + dag.check_ancestor_of(target, child)?, "failed to check target is the ancestor of its descendant" ); assert!( - dag.check_ancestor_of(origin, vec![target, parent, child])?, + dag.check_ancestor_of(origin, target)?, + "failed to check origin is the parent of its child" + ); + assert!( + dag.check_ancestor_of(origin, parent)?, "failed to check origin is the parent of its child" ); assert!( - dag.check_ancestor_of(parent, vec![child])?, + dag.check_ancestor_of(origin, child)?, + "failed to check origin is the parent of its child" + ); + assert!( + dag.check_ancestor_of(parent, child)?, "failed to check target, parent is the parent of its child" ); assert!( - dag.check_ancestor_of(target_parent, vec![target])?, + dag.check_ancestor_of(target_parent, target)?, "failed to check target parent, parent is the parent of its child" ); // not ancestor assert!( - !dag.check_ancestor_of(child, vec![target])?, + !dag.check_ancestor_of(child, target)?, "failed to check child is not the ancestor of its descendant" ); assert!( - !dag.check_ancestor_of(parent, vec![target])?, + !dag.check_ancestor_of(parent, target)?, "failed to check child is not the ancestor of its descendant" ); assert!( - !dag.check_ancestor_of(child, vec![parent])?, + !dag.check_ancestor_of(child, parent)?, "failed to check target, child is the child of its parent" ); assert!( - !dag.check_ancestor_of(target, vec![target_parent])?, + !dag.check_ancestor_of(target, target_parent)?, "failed to check target is the child of its parent" ); assert!( - dag.check_ancestor_of(target, vec![Hash::random(), Hash::random(),]) - .is_err(), + dag.check_ancestor_of(target, Hash::random()).is_err(), "failed to check not the ancestor of descendants" ); assert!( - dag.check_ancestor_of(Hash::random(), vec![target, parent, child]) - .is_err(), + dag.check_ancestor_of(Hash::random(), target).is_err(), + "failed to check not the descendant of parents" + ); + assert!( + dag.check_ancestor_of(Hash::random(), parent).is_err(), + "failed to check not the descendant of parents" + ); + assert!( + dag.check_ancestor_of(Hash::random(), child).is_err(), "failed to check not the descendant of parents" ); @@ -596,12 +611,69 @@ fn test_reachability_not_ancestor() -> anyhow::Result<()> { hashes.push(child3); print_reachability_data(reachability_store.read().deref(), &hashes); - let result = dag.check_ancestor_of(child1, vec![child3]); + let result = dag.check_ancestor_of(child1, child3); println!("dag.check_ancestor_of() result = {:?}", result); Ok(()) } +#[test] +#[ignore = "maxmum data testing for dev"] +fn test_hint_virtaul_selected_parent() -> anyhow::Result<()> { + let dag = BlockDAG::create_for_testing().unwrap(); + let reachability_store = dag.storage.reachability_store.clone(); + + let origin = Hash::random(); + + inquirer::init_for_test( + reachability_store.write().deref_mut(), + origin, + // Interval::maximal(), + Interval::new(1, 100000), + )?; + + let mut next_parent = origin; + let _start = Instant::now(); + for _i in 0..5000 { + let child = Hash::random(); + inquirer::add_block( + reachability_store.write().deref_mut(), + child, + next_parent, + &mut vec![].into_iter(), + )?; + next_parent = child; + } + // println!("add 50000 blocks duration: {:?}", start.elapsed()); + + println!( + "before hint, root reindex = {:?}, origin interval = {:?} ", + reachability_store.read().get_reindex_root()?, + reachability_store.read().get_interval(origin) + ); + inquirer::hint_virtual_selected_parent(&mut *reachability_store.write(), next_parent)?; + println!( + "after hint, root reindex = {:?}, origin interval = {:?} ", + reachability_store.read().get_reindex_root()?, + reachability_store.read().get_interval(origin) + ); + + // let start = Instant::now(); + // for _i in 0..200000 { + // let child = Hash::random(); + // inquirer::add_block( + // reachability_store.write().deref_mut(), + // child, + // next_parent, + // &mut vec![].into_iter(), + // )?; + // next_parent = child; + // } + // println!("add 50000 blocks again duration: {:?}", start.elapsed()); + + Ok(()) +} + #[test] fn test_reachability_algorithm() -> anyhow::Result<()> { let dag = BlockDAG::create_for_testing().unwrap(); @@ -699,7 +771,7 @@ fn test_reachability_algorithm() -> anyhow::Result<()> { print_reachability_data(reachability_store.read().deref(), &hashes); assert!( - dag.check_ancestor_of(origin, vec![child5])?, + dag.check_ancestor_of(origin, child5)?, "child 5 must be origin's child" ); @@ -710,7 +782,6 @@ fn add_and_print_with_ghostdata( number: BlockNumber, parent: Hash, parents: Vec, - origin: Hash, dag: &mut BlockDAG, ghostdata: GhostdagData, ) -> anyhow::Result { @@ -721,7 +792,7 @@ fn add_and_print_with_ghostdata( .with_number(number) .build(); let start = Instant::now(); - dag.commit_trusted_block(header.to_owned(), origin, Arc::new(ghostdata))?; + dag.commit_trusted_block(header.to_owned(), Arc::new(ghostdata))?; let duration = start.elapsed(); println!( "commit header: {:?}, number: {:?}, duration: {:?}", @@ -741,7 +812,6 @@ fn add_and_print_with_pruning_point( number: BlockNumber, parent: Hash, parents: Vec, - origin: Hash, pruning_point: Hash, dag: &mut BlockDAG, ) -> anyhow::Result { @@ -753,7 +823,7 @@ fn add_and_print_with_pruning_point( .with_pruning_point(pruning_point) .build(); let start = Instant::now(); - dag.commit(header.to_owned(), origin)?; + dag.commit(header.to_owned())?; let duration = start.elapsed(); println!( "commit header: {:?}, number: {:?}, duration: {:?}", @@ -773,10 +843,9 @@ fn add_and_print( number: BlockNumber, parent: Hash, parents: Vec, - origin: Hash, dag: &mut BlockDAG, ) -> anyhow::Result { - add_and_print_with_pruning_point(number, parent, parents, origin, Hash::zero(), dag) + add_and_print_with_pruning_point(number, parent, parents, Hash::zero(), dag) } #[test] @@ -795,32 +864,18 @@ fn test_dag_mergeset() -> anyhow::Result<()> { let mut parents_hash = vec![genesis.id()]; let mut parent_hash = genesis.id(); - let mut header = add_and_print( - 2, - parent_hash, - parents_hash, - genesis.parent_hash(), - &mut dag, - )? - .id(); - let red = add_and_print(3, header, vec![header], genesis.parent_hash(), &mut dag)?.id(); + let mut header = add_and_print(2, parent_hash, parents_hash, &mut dag)?.id(); + let red = add_and_print(3, header, vec![header], &mut dag)?.id(); parents_hash = vec![genesis.id()]; parent_hash = genesis.id(); - header = add_and_print( - 2, - parent_hash, - parents_hash, - genesis.parent_hash(), - &mut dag, - )? - .id(); - header = add_and_print(3, header, vec![header], genesis.parent_hash(), &mut dag)?.id(); - header = add_and_print(4, header, vec![header], genesis.parent_hash(), &mut dag)?.id(); + header = add_and_print(2, parent_hash, parents_hash, &mut dag)?.id(); + header = add_and_print(3, header, vec![header], &mut dag)?.id(); + header = add_and_print(4, header, vec![header], &mut dag)?.id(); let blue = header; - header = add_and_print(5, blue, vec![blue, red], genesis.parent_hash(), &mut dag)?.id(); + header = add_and_print(5, blue, vec![blue, red], &mut dag)?.id(); let ghostdata = dag.ghostdata(&[header, red])?; println!( @@ -832,6 +887,7 @@ fn test_dag_mergeset() -> anyhow::Result<()> { } #[test] +#[ignore = "this is the large amount of data testing for performance, dev only"] fn test_big_data_commit() -> anyhow::Result<()> { // initialzie the dag firstly let mut dag = BlockDAG::create_for_testing().unwrap(); @@ -841,43 +897,43 @@ fn test_big_data_commit() -> anyhow::Result<()> { dag.init_with_genesis(genesis.clone()).unwrap(); - let count = 20000; + let count = 200000; // one let mut parent = genesis.clone(); for i in 0..count { - let new = add_and_print( - i + 1, - parent.id(), - vec![parent.id()], - genesis.parent_hash(), - &mut dag, - )?; - parent = new; - } - let last_one = parent; - - // two - let mut parent = genesis.clone(); - for i in 0..count { - let new = add_and_print( - i + 1, - parent.id(), - vec![parent.id()], - genesis.parent_hash(), - &mut dag, - )?; + let new = add_and_print(i + 1, parent.id(), vec![parent.id()], &mut dag)?; + if i % 50000 == 0 { + inquirer::hint_virtual_selected_parent( + &mut *dag.storage.reachability_store.write(), + new.id(), + )?; + } parent = new; } - let last_two = parent; - - let _new = add_and_print( - count + 1, - last_one.id(), - vec![last_one.id(), last_two.id()], - genesis.parent_hash(), - &mut dag, - )?; + // let last_one = parent; + + // // two + // let mut parent = genesis.clone(); + // for i in 0..count { + // let new = add_and_print( + // i + 1, + // parent.id(), + // vec![parent.id()], + // genesis.parent_hash(), + // &mut dag, + // )?; + // parent = new; + // } + // let last_two = parent; + + // let _new = add_and_print( + // count + 1, + // last_one.id(), + // vec![last_one.id(), last_two.id()], + // genesis.parent_hash(), + // &mut dag, + // )?; anyhow::Result::Ok(()) } @@ -896,69 +952,25 @@ fn test_prune() -> anyhow::Result<()> { dag.init_with_genesis(genesis.clone()).unwrap(); - let block1 = add_and_print( - 1, - genesis.id(), - vec![genesis.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block1 = add_and_print(1, genesis.id(), vec![genesis.id()], &mut dag)?; - let block_main_2 = add_and_print( - 2, - block1.id(), - vec![block1.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_main_3 = add_and_print( - 3, - block_main_2.id(), - vec![block_main_2.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_main_3_1 = add_and_print( - 3, - block_main_2.id(), - vec![block_main_2.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block_main_2 = add_and_print(2, block1.id(), vec![block1.id()], &mut dag)?; + let block_main_3 = add_and_print(3, block_main_2.id(), vec![block_main_2.id()], &mut dag)?; + let block_main_3_1 = add_and_print(3, block_main_2.id(), vec![block_main_2.id()], &mut dag)?; let block_main_4 = add_and_print( 4, block_main_3.id(), vec![block_main_3.id(), block_main_3_1.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_main_5 = add_and_print( - 5, - block_main_4.id(), - vec![block_main_4.id()], - genesis.parent_hash(), &mut dag, )?; + let block_main_5 = add_and_print(5, block_main_4.id(), vec![block_main_4.id()], &mut dag)?; - let block_red_2 = add_and_print( - 2, - block1.id(), - vec![block1.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_red_2_1 = add_and_print( - 2, - block1.id(), - vec![block1.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block_red_2 = add_and_print(2, block1.id(), vec![block1.id()], &mut dag)?; + let block_red_2_1 = add_and_print(2, block1.id(), vec![block1.id()], &mut dag)?; let block_red_3 = add_and_print( 3, block_red_2.id(), vec![block_red_2.id(), block_red_2_1.id()], - genesis.parent_hash(), &mut dag, )?; @@ -1024,22 +1036,9 @@ fn test_prune() -> anyhow::Result<()> { // test the pruning logic - let block_main_6 = add_and_print( - 6, - block_main_5.id(), - tips.clone(), - genesis.parent_hash(), - &mut dag, - )?; - let block_main_6_1 = - add_and_print(6, block_main_5.id(), tips, genesis.parent_hash(), &mut dag)?; - let block_fork = add_and_print( - 4, - block_red_3.id(), - vec![block_red_3.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block_main_6 = add_and_print(6, block_main_5.id(), tips.clone(), &mut dag)?; + let block_main_6_1 = add_and_print(6, block_main_5.id(), tips, &mut dag)?; + let block_fork = add_and_print(4, block_red_3.id(), vec![block_red_3.id()], &mut dag)?; dag.save_dag_state( genesis.id(), @@ -1081,69 +1080,25 @@ fn test_verification_blue_block() -> anyhow::Result<()> { dag.init_with_genesis(genesis.clone()).unwrap(); - let block1 = add_and_print( - 1, - genesis.id(), - vec![genesis.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block1 = add_and_print(1, genesis.id(), vec![genesis.id()], &mut dag)?; - let block_main_2 = add_and_print( - 2, - block1.id(), - vec![block1.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_main_3 = add_and_print( - 3, - block_main_2.id(), - vec![block_main_2.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_main_3_1 = add_and_print( - 3, - block_main_2.id(), - vec![block_main_2.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block_main_2 = add_and_print(2, block1.id(), vec![block1.id()], &mut dag)?; + let block_main_3 = add_and_print(3, block_main_2.id(), vec![block_main_2.id()], &mut dag)?; + let block_main_3_1 = add_and_print(3, block_main_2.id(), vec![block_main_2.id()], &mut dag)?; let block_main_4 = add_and_print( 4, block_main_3.id(), vec![block_main_3.id(), block_main_3_1.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_main_5 = add_and_print( - 5, - block_main_4.id(), - vec![block_main_4.id()], - genesis.parent_hash(), &mut dag, )?; + let block_main_5 = add_and_print(5, block_main_4.id(), vec![block_main_4.id()], &mut dag)?; - let block_red_2 = add_and_print( - 2, - block1.id(), - vec![block1.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_red_2_1 = add_and_print( - 2, - block1.id(), - vec![block1.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block_red_2 = add_and_print(2, block1.id(), vec![block1.id()], &mut dag)?; + let block_red_2_1 = add_and_print(2, block1.id(), vec![block1.id()], &mut dag)?; let block_red_3 = add_and_print( 3, block_red_2.id(), vec![block_red_2.id(), block_red_2_1.id()], - genesis.parent_hash(), &mut dag, )?; @@ -1215,7 +1170,6 @@ fn test_verification_blue_block() -> anyhow::Result<()> { 6, block_main_5.id(), vec![block_main_5.id(), block_red_3.id()], - genesis.parent_hash(), &mut dag, )?; assert_eq!( @@ -1240,25 +1194,12 @@ fn test_verification_blue_block() -> anyhow::Result<()> { 6, block_main_5.id(), vec![block_main_5.id(), block_red_3.id()], - genesis.parent_hash(), &mut dag, makeup_ghostdata.clone(), )?; - let block_from_normal = add_and_print( - 7, - normal_block.id(), - vec![normal_block.id()], - genesis.parent_hash(), - &mut dag, - )?; - let block_from_makeup = add_and_print( - 7, - makeup_block.id(), - vec![makeup_block.id()], - genesis.parent_hash(), - &mut dag, - )?; + let block_from_normal = add_and_print(7, normal_block.id(), vec![normal_block.id()], &mut dag)?; + let block_from_makeup = add_and_print(7, makeup_block.id(), vec![makeup_block.id()], &mut dag)?; let ghostdag_data_from_normal = dag .ghostdata_by_hash(block_from_normal.id())? @@ -1288,7 +1229,6 @@ fn test_verification_blue_block() -> anyhow::Result<()> { 8, together_mine.selected_parent, vec![block_from_normal.id(), block_from_makeup.id()], - genesis.parent_hash(), &mut dag, )?; let together_ghost_data = dag.storage.ghost_dag_store.get_data(mine_together.id())?; @@ -1300,7 +1240,6 @@ fn test_verification_blue_block() -> anyhow::Result<()> { 8, together_mine.selected_parent, vec![block_from_normal.id(), block_from_makeup.id()], - genesis.parent_hash(), &mut dag, )?; let together_ghost_data = dag.storage.ghost_dag_store.get_data(mine_together.id())?; diff --git a/storage/src/batch/mod.rs b/storage/src/batch/mod.rs index d07a2584b5..38dec1b754 100644 --- a/storage/src/batch/mod.rs +++ b/storage/src/batch/mod.rs @@ -56,3 +56,14 @@ where Ok(Self::new_with_rows(rows?)) } } + +#[derive(Debug, Default, Clone)] +pub struct WriteBatchData { + pub column: String, + pub row_data: WriteBatch, +} + +#[derive(Debug, Default, Clone)] +pub struct WriteBatchWithColumn { + pub data: Vec, +} diff --git a/storage/src/cache_storage/mod.rs b/storage/src/cache_storage/mod.rs index 6f6fc3e3ee..fea1d0ea61 100644 --- a/storage/src/cache_storage/mod.rs +++ b/storage/src/cache_storage/mod.rs @@ -1,7 +1,7 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use crate::batch::GWriteBatch; +use crate::batch::{GWriteBatch, WriteBatchWithColumn}; use crate::{ batch::WriteBatch, metrics::{record_metrics, StorageMetrics}, @@ -91,6 +91,32 @@ impl InnerStore for CacheStorage { }) } + fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { + let rows = batch + .data + .into_iter() + .flat_map(|data| { + data.row_data + .rows + .iter() + .cloned() + .map(|(k, v)| (compose_key(Some(&data.column), k), v)) + .collect::>() + }) + .collect(); + let batch = WriteBatch { rows }; + record_metrics( + "cache", + "write_batch_column_prefix", + "write_batch", + self.metrics.as_ref(), + ) + .call(|| { + self.write_batch_inner(batch); + Ok(()) + }) + } + fn get_len(&self) -> Result { Ok(self.cache.lock().len() as u64) } @@ -111,6 +137,10 @@ impl InnerStore for CacheStorage { self.write_batch(prefix_name, batch) } + fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> { + self.write_batch_with_column(batch) + } + fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>> { let composed_keys = keys .into_iter() diff --git a/storage/src/db_storage/mod.rs b/storage/src/db_storage/mod.rs index 1bc73a930b..0bb0b42d05 100644 --- a/storage/src/db_storage/mod.rs +++ b/storage/src/db_storage/mod.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - batch::WriteBatch, + batch::{WriteBatch, WriteBatchWithColumn}, errors::StorageInitError, metrics::{record_metrics, StorageMetrics}, storage::{ColumnFamilyName, InnerStore, KeyCodec, RawDBStorage, ValueCodec, WriteOp}, @@ -10,8 +10,8 @@ use crate::{ }; use anyhow::{ensure, format_err, Error, Result}; use rocksdb::{ - DBIterator, DBPinnableSlice, IteratorMode, Options, ReadOptions, WriteBatch as DBWriteBatch, - WriteOptions, DB, + DBIterator, DBPinnableSlice, FlushOptions, IteratorMode, Options, ReadOptions, + WriteBatch as DBWriteBatch, WriteOptions, DB, }; use starcoin_config::{check_open_fds_limit, RocksdbConfig}; use std::{collections::HashSet, iter, marker::PhantomData, path::Path}; @@ -98,6 +98,7 @@ impl DBStorage { Self::open_inner(&rocksdb_opts, path, column_families.clone())? }; check_open_fds_limit(rocksdb_config.max_open_files as u64 + RES_FDS)?; + Ok(Self { db, cfs: column_families, @@ -414,6 +415,56 @@ impl InnerStore for DBStorage { }) } + fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { + let mut db_batch = DBWriteBatch::default(); + batch.data.into_iter().for_each(|data| { + let cf_handle = self.get_cf_handle(&data.column); + for (key, write_op) in data.row_data.rows { + match write_op { + WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), + WriteOp::Deletion => db_batch.delete_cf(cf_handle, key), + }; + } + }); + record_metrics( + "db", + "write_batch_column", + "write_batch", + self.metrics.as_ref(), + ) + .call(|| { + self.db + .write_opt(db_batch, &Self::default_write_options())?; + let mut flush_options = FlushOptions::default(); + flush_options.set_wait(false); + self.db.flush_opt(&flush_options).unwrap(); + Ok(()) + }) + } + + fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> { + let mut db_batch = DBWriteBatch::default(); + batch.data.into_iter().for_each(|data| { + let cf_handle = self.get_cf_handle(&data.column); + for (key, write_op) in data.row_data.rows { + match write_op { + WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), + WriteOp::Deletion => db_batch.delete_cf(cf_handle, key), + }; + } + }); + record_metrics( + "db", + "write_batch_column", + "write_batch", + self.metrics.as_ref(), + ) + .call(|| { + self.db.write_opt(db_batch, &Self::sync_write_options())?; + Ok(()) + }) + } + fn get_len(&self) -> Result { unimplemented!() } @@ -493,4 +544,11 @@ impl RawDBStorage for DBStorage { self.db.write(batch)?; Ok(()) } + + fn raw_write_batch_sync(&self, batch: DBWriteBatch) -> Result<()> { + let mut opt = WriteOptions::default(); + opt.set_sync(true); + self.db.write_opt(batch, &opt)?; + Ok(()) + } } diff --git a/storage/src/storage.rs b/storage/src/storage.rs index ff2cf4b302..4c8c0743d3 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -3,6 +3,7 @@ pub use crate::batch::WriteBatch; use crate::{ + batch::WriteBatchWithColumn, cache_storage::CacheStorage, db_storage::{DBStorage, SchemaIterator}, upgrade::DBUpgrade, @@ -39,10 +40,12 @@ pub trait InnerStore: Send + Sync { fn contains_key(&self, prefix_name: &str, key: Vec) -> Result; fn remove(&self, prefix_name: &str, key: Vec) -> Result<()>; fn write_batch(&self, prefix_name: &str, batch: WriteBatch) -> Result<()>; + fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()>; fn get_len(&self) -> Result; fn keys(&self) -> Result>>; fn put_sync(&self, prefix_name: &str, key: Vec, value: Vec) -> Result<()>; fn write_batch_sync(&self, prefix_name: &str, batch: WriteBatch) -> Result<()>; + fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()>; fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>>; } @@ -54,6 +57,7 @@ pub trait RawDBStorage: Send + Sync { ) -> Result>; fn raw_write_batch(&self, batch: DBWriteBatch) -> Result<()>; + fn raw_write_batch_sync(&self, batch: DBWriteBatch) -> Result<()>; } ///Storage instance type define @@ -201,6 +205,18 @@ impl InnerStore for StorageInstance { }, } } + + fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { + match self { + Self::CACHE { cache } => cache.write_batch_with_column(batch), + Self::DB { db } => db.write_batch_with_column(batch), + Self::CacheAndDb { cache, db } => { + db.write_batch_with_column(batch.clone())?; + cache.write_batch_with_column(batch) + } + } + } + fn get_len(&self) -> Result { match self { Self::CACHE { cache } => cache.get_len(), @@ -240,6 +256,17 @@ impl InnerStore for StorageInstance { } } + fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> { + match self { + Self::CACHE { cache } => cache.write_batch_with_column_sync(batch), + Self::DB { db } => db.write_batch_with_column_sync(batch), + Self::CacheAndDb { cache, db } => { + db.write_batch_with_column_sync(batch.clone())?; + cache.write_batch_with_column_sync(batch) + } + } + } + fn multi_get(&self, prefix_name: &str, keys: Vec>) -> Result>>> { match self { Self::CACHE { cache } => cache.multi_get(prefix_name, keys), diff --git a/storage/src/tests/test_storage.rs b/storage/src/tests/test_storage.rs index 750cba656b..2edb6e7005 100644 --- a/storage/src/tests/test_storage.rs +++ b/storage/src/tests/test_storage.rs @@ -3,27 +3,29 @@ extern crate chrono; +use crate::batch::{WriteBatch, WriteBatchData, WriteBatchWithColumn}; use crate::block::{ - FailedBlock, OldBlockHeaderStorage, OldBlockInnerStorage, OldFailedBlockStorage, + DagSyncBlock, FailedBlock, OldBlockHeaderStorage, OldBlockInnerStorage, OldFailedBlockStorage, OldFailedBlockV2, }; use crate::cache_storage::CacheStorage; use crate::db_storage::DBStorage; -use crate::storage::{CodecKVStore, InnerStore, StorageInstance, ValueCodec}; +use crate::storage::{CodecKVStore, InnerStore, StorageInstance, ValueCodec, WriteOp}; use crate::table_info::TableInfoStore; use crate::transaction::LegacyTransactionStorage; use crate::transaction_info::{BlockTransactionInfo, OldTransactionInfoStorage}; use crate::{ - BlockInfoStore, BlockStore, BlockTransactionInfoStore, Storage, - StorageVersion, /*TableInfoStore,*/ - DEFAULT_PREFIX_NAME, TRANSACTION_INFO_PREFIX_NAME, TRANSACTION_INFO_PREFIX_NAME_V2, + BlockInfoStore, BlockStore, BlockTransactionInfoStore, Storage, StorageVersion, + BLOCK_PREFIX_NAME, DAG_SYNC_BLOCK_PREFIX_NAME, DEFAULT_PREFIX_NAME, + TRANSACTION_INFO_PREFIX_NAME, TRANSACTION_INFO_PREFIX_NAME_V2, }; -use anyhow::Result; +use anyhow::{Ok, Result}; +use bcs_ext::BCSCodec; use starcoin_accumulator::accumulator_info::AccumulatorInfo; use starcoin_config::RocksdbConfig; use starcoin_crypto::HashValue; use starcoin_logger::prelude::info; -use starcoin_types::block::{Block, BlockBody, BlockHeader, BlockInfo}; +use starcoin_types::block::{Block, BlockBody, BlockHeader, BlockHeaderBuilder, BlockInfo}; use starcoin_types::startup_info::SnapshotRange; use starcoin_types::transaction::{RichTransactionInfo, SignedUserTransaction, TransactionInfo}; use starcoin_types::vm_error::KeptVMStatus; @@ -601,3 +603,219 @@ fn test_table_info_storage() -> Result<()> { assert_eq!(vals, vals2); Ok(()) } + +fn run_write_batch(instance: StorageInstance) -> Result<()> { + let body = BlockBody::new_empty(); + + let block1 = Block::new( + BlockHeaderBuilder::new() + .with_body_hash(body.hash()) + .with_number(1) + .build(), + body.clone(), + ); + let block2 = Block::new( + BlockHeaderBuilder::new() + .with_body_hash(body.hash()) + .with_number(2) + .build(), + body.clone(), + ); + + let dag_block1 = DagSyncBlock { + block: Block::new( + BlockHeaderBuilder::new() + .with_body_hash(body.hash()) + .with_number(3) + .build(), + body.clone(), + ), + children: vec![Block::random().id(), Block::random().id()], + }; + + let dag_block2 = DagSyncBlock { + block: Block::new( + BlockHeaderBuilder::new() + .with_body_hash(body.hash()) + .with_number(4) + .build(), + body.clone(), + ), + children: vec![Block::random().id(), Block::random().id()], + }; + + let batch_with_columns = WriteBatchWithColumn { + data: vec![ + WriteBatchData { + column: BLOCK_PREFIX_NAME.to_string(), + row_data: WriteBatch::new_with_rows(vec![ + ( + block1.id().encode()?, + WriteOp::Value(block1.clone().encode()?), + ), + ( + block2.id().encode()?, + WriteOp::Value(block2.clone().encode()?), + ), + ]), + }, + WriteBatchData { + column: DAG_SYNC_BLOCK_PREFIX_NAME.to_string(), + row_data: WriteBatch::new_with_rows(vec![ + ( + dag_block1.block.id().encode()?, + WriteOp::Value(dag_block1.clone().encode()?), + ), + ( + dag_block2.block.id().encode()?, + WriteOp::Value(dag_block2.clone().encode()?), + ), + ]), + }, + ], + }; + + instance.write_batch_with_column(batch_with_columns)?; + + match instance { + StorageInstance::CACHE { cache } => { + let read_block1 = Block::decode( + &cache + .get(BLOCK_PREFIX_NAME, block1.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block1, block1); + + let read_block2 = Block::decode( + &cache + .get(BLOCK_PREFIX_NAME, block2.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block2, block2); + + let read_dag_block1 = DagSyncBlock::decode( + &cache + .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block1, dag_block1); + + let read_dag_block2 = DagSyncBlock::decode( + &cache + .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block2, dag_block2); + } + StorageInstance::DB { db } => { + let read_block1 = Block::decode( + &db.get(BLOCK_PREFIX_NAME, block1.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block1, block1); + + let read_block2 = Block::decode( + &db.get(BLOCK_PREFIX_NAME, block2.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block2, block2); + + let read_dag_block1 = DagSyncBlock::decode( + &db.get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block1, dag_block1); + + let read_dag_block2 = DagSyncBlock::decode( + &db.get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block2, dag_block2); + } + StorageInstance::CacheAndDb { cache, db } => { + let read_block1 = Block::decode( + &cache + .get(BLOCK_PREFIX_NAME, block1.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block1, block1); + + let read_block2 = Block::decode( + &cache + .get(BLOCK_PREFIX_NAME, block2.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block2, block2); + + let read_dag_block1 = DagSyncBlock::decode( + &cache + .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block1, dag_block1); + + let read_dag_block2 = DagSyncBlock::decode( + &cache + .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block2, dag_block2); + + let read_block1 = Block::decode( + &db.get(BLOCK_PREFIX_NAME, block1.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block1, block1); + + let read_block2 = Block::decode( + &db.get(BLOCK_PREFIX_NAME, block2.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block2, block2); + + let read_dag_block1 = DagSyncBlock::decode( + &db.get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block1, dag_block1); + + let read_dag_block2 = DagSyncBlock::decode( + &db.get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block2, dag_block2); + } + } + + Ok(()) +} + +#[test] +fn test_batch_write_for_cache_and_db() -> Result<()> { + let tmpdir = starcoin_config::temp_dir(); + let instance = StorageInstance::new_cache_and_db_instance( + CacheStorage::new(None), + DBStorage::new(tmpdir.path(), RocksdbConfig::default(), None)?, + ); + + run_write_batch(instance) +} + +#[test] +fn test_batch_write_for_db() -> Result<()> { + let tmpdir = starcoin_config::temp_dir(); + let instance = StorageInstance::new_db_instance(DBStorage::new( + tmpdir.path(), + RocksdbConfig::default(), + None, + )?); + + run_write_batch(instance) +} + +#[test] +fn test_batch_write_for_cache() -> Result<()> { + let instance = StorageInstance::new_cache_instance(); + + run_write_batch(instance) +} diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index c9ab7e207b..4569bdba7b 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -403,7 +403,7 @@ where if descendant == start { continue; } - if self.main.dag().check_ancestor_of(descendant, vec![start])? { + if self.main.dag().check_ancestor_of(descendant, start)? { continue; }