Skip to content

Commit

Permalink
fix fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang committed Dec 12, 2024
1 parent 93e722d commit 6259f80
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 288 deletions.
14 changes: 3 additions & 11 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,19 +659,11 @@ impl BlockChain {
self.storage.save_block_info(block_info.clone())?;

self.storage.save_table_infos(txn_table_infos)?;
let genesis_header = self
.storage
.get_block_header_by_hash(self.genesis_hash)?
.ok_or_else(|| format_err!("failed to get genesis because it is none"))?;
let result = match verified_block.ghostdata {
Some(trusted_ghostdata) => self.dag.commit_trusted_block(
header.to_owned(),
genesis_header.parent_hash(),
Arc::new(trusted_ghostdata),
),
None => self
Some(trusted_ghostdata) => self
.dag
.commit(header.to_owned(), genesis_header.parent_hash()),
.commit_trusted_block(header.to_owned(), Arc::new(trusted_ghostdata)),
None => self.dag.commit(header.to_owned()),
};
match result {
anyhow::Result::Ok(_) => info!("finish to commit dag block: {:?}", block_id),
Expand Down
139 changes: 63 additions & 76 deletions flexidag/src/blockdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::consensusdb::consenses_state::{
};
use crate::consensusdb::prelude::{FlexiDagStorageConfig, StoreError};
use crate::consensusdb::schemadb::{
GhostdagStoreReader, ReachabilityStore, StagingReachabilityStore, REINDEX_ROOT_KEY,
GhostdagStoreReader, ReachabilityStore, StagingReachabilityStore,
};
use crate::consensusdb::{
prelude::FlexiDagStorage,
Expand All @@ -15,14 +15,13 @@ use crate::consensusdb::{
},
};
use crate::ghostdag::protocol::GhostdagManager;
use crate::process_key_already_error;
use crate::prune::pruning_point_manager::PruningPointManagerT;
use crate::{process_key_already_error, reachability};
use anyhow::{bail, ensure, Ok};
use rocksdb::WriteBatch;
use starcoin_config::temp_dir;
use starcoin_crypto::{HashValue as Hash, HashValue};
use starcoin_logger::prelude::{debug, info, warn};
use starcoin_storage::batch::WriteBatchWithColumn;
use starcoin_types::block::BlockHeader;
use starcoin_types::{
blockhash::{BlockHashes, KType},
Expand Down Expand Up @@ -118,7 +117,7 @@ impl BlockDAG {
.write()
.insert(origin, BlockHashes::new(vec![]))?;

self.commit(genesis, origin)?;
self.commit(genesis)?;
self.save_dag_state(
genesis_id,
DagState {
Expand Down Expand Up @@ -154,7 +153,6 @@ impl BlockDAG {
pub fn commit_trusted_block(
&mut self,
header: BlockHeader,
origin: HashValue,
trusted_ghostdata: Arc<GhostdagData>,
) -> anyhow::Result<()> {
info!(
Expand Down Expand Up @@ -212,94 +210,76 @@ impl BlockDAG {
}
};

// Create a DB batch writer
let mut batch = WriteBatch::default();

// lock the dag data to write in batch
// the cache will be written at the same time
// when the batch is written before flush to the disk and
// if the writing process abort the starcoin process will/should restart.
let mut stage = StagingReachabilityStore::new(
self.storage.db.clone(),
self.storage.reachability_store.upgradable_read(),
);

// Store ghostdata
process_key_already_error(
self.storage
.ghost_dag_store
.insert(header.id(), ghostdata.clone()),
)?;
process_key_already_error(self.storage.ghost_dag_store.insert_batch(
&mut batch,
header.id(),
ghostdata.clone(),
))
.expect("failed to ghostdata in batch");

// Update reachability store
debug!(
"start to update reachability data for block: {:?}, number: {:?}",
header.id(),
header.number()
);
let reachability_store = self.storage.reachability_store.clone();

let mut merge_set = ghostdata
.unordered_mergeset_without_selected_parent()
.filter(|hash| self.storage.reachability_store.read().has(*hash).unwrap())
.collect::<Vec<_>>()
.into_iter();
let add_block_result = {
let mut reachability_writer = reachability_store.write();
inquirer::add_block(
reachability_writer.deref_mut(),
header.id(),
ghostdata.selected_parent,
&mut merge_set,
)
};
match add_block_result {
Result::Ok(_) => (),
Err(reachability::ReachabilityError::DataInconsistency) => {
let _future_covering_set = reachability_store
.read()
.get_future_covering_set(header.id())?;
info!(
"the key {:?} was already processed, original error message: {:?}",
header.id(),
reachability::ReachabilityError::DataInconsistency
);
}
Err(reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))) => {
if msg == *REINDEX_ROOT_KEY.to_string() {
info!(
"the key {:?} was already processed, original error message: {:?}",
header.id(),
reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(
REINDEX_ROOT_KEY.to_string()
))
);
info!("now set the reindex key to origin: {:?}", origin);
// self.storage.reachability_store.set_reindex_root(origin)?;
self.set_reindex_root(origin)?;
bail!(
"failed to add a block when committing, e: {:?}",
reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))
);
} else {
bail!(
"failed to add a block when committing, e: {:?}",
reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))
);
}
}
Err(reachability::ReachabilityError::StoreError(StoreError::InvalidInterval(_, _))) => {
self.set_reindex_root(origin)?;
bail!("failed to add a block when committing for invalid interval",);
}
Err(e) => {
bail!("failed to add a block when committing, e: {:?}", e);
}
}
process_key_already_error(
self.storage
.relations_store
.write()
.insert(header.id(), BlockHashes::new(parents)),
)?;
inquirer::add_block(
&mut stage,
header.id(),
ghostdata.selected_parent,
&mut merge_set,
)
.expect("failed to add reachability in stage batch");

process_key_already_error(self.storage.relations_store.write().insert_batch(
&mut batch,
header.id(),
BlockHashes::new(parents),
))
.expect("failed to insert relations in batch");

// Store header store
process_key_already_error(self.storage.header_store.insert(
header.id(),
Arc::new(header),
1,
))?;
))
.expect("failed to insert header in batch");

// the read lock will be updated to the write lock
// and then write the batch
// and then release the lock
stage
.commit(&mut batch)
.expect("failed to write the stage reachability in batch");

// write the data just one time
self.storage
.write_batch(batch)
.expect("failed to write dag data in batch");
Ok(())
}

pub fn commit(&mut self, header: BlockHeader, origin: HashValue) -> anyhow::Result<()> {
pub fn commit(&mut self, header: BlockHeader) -> anyhow::Result<()> {
info!(
"start to commit header: {:?}, number: {:?}",
header.id(),
Expand Down Expand Up @@ -329,7 +309,10 @@ impl BlockDAG {
// Create a DB batch writer
let mut batch = WriteBatch::default();

// lock the dag data to write in batch
// lock the dag data to write in batch, read lock.
// the cache will be written at the same time
// when the batch is written before flush to the disk and
// if the writing process abort the starcoin process will/should restart.
let mut stage = StagingReachabilityStore::new(
self.storage.db.clone(),
self.storage.reachability_store.upgradable_read(),
Expand Down Expand Up @@ -364,10 +347,6 @@ impl BlockDAG {
)
.expect("failed to add block in batch");

stage
.commit(&mut batch)
.expect("failed to write the stage reachability in batch");

process_key_already_error(self.storage.relations_store.write().insert_batch(
&mut batch,
header.id(),
Expand All @@ -384,6 +363,14 @@ impl BlockDAG {
))
.expect("failed to insert header in batch");

// the read lock will be updated to the write lock
// and then write the batch
// and then release the lock
stage
.commit(&mut batch)
.expect("failed to write the stage reachability in batch");

// write the data just one time
self.storage
.write_batch(batch)
.expect("failed to write dag data in batch");
Expand Down
7 changes: 2 additions & 5 deletions flexidag/src/consensusdb/consensus_reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use super::{
prelude::{BatchDbWriter, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError},
};
use starcoin_crypto::HashValue as Hash;
use starcoin_storage::{
batch::WriteBatchData,
storage::{InnerStore, RawDBStorage, WriteOp},
};
use starcoin_storage::storage::{InnerStore, RawDBStorage};

use crate::{
consensusdb::schema::{KeyCodec, ValueCodec},
Expand All @@ -15,7 +12,7 @@ use crate::{
};
use starcoin_types::blockhash::{self, BlockHashMap, BlockHashes};

use parking_lot::{RwLockUpgradableReadGuard, RwLockWriteGuard};
use parking_lot::RwLockUpgradableReadGuard;
use rocksdb::WriteBatch;
use std::{collections::hash_map::Entry::Vacant, sync::Arc};

Expand Down
2 changes: 1 addition & 1 deletion flexidag/src/consensusdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
},
};
use parking_lot::RwLock;
use rocksdb::{FlushOptions, WriteBatch, DB};
use rocksdb::WriteBatch;
use starcoin_config::{RocksdbConfig, StorageConfig};
pub(crate) use starcoin_storage::db_storage::DBStorage;
use starcoin_storage::storage::RawDBStorage;
Expand Down
Loading

0 comments on commit 6259f80

Please sign in to comment.