From 35021a4d5ba2698fdd69c063767cf5787f7d179f Mon Sep 17 00:00:00 2001 From: Jakub Krawczyk Date: Tue, 19 Sep 2023 10:36:37 +0200 Subject: [PATCH 1/3] Infra for declarative handling of RocksDB key/value encoding/decoding + type safety. --- core-rust/state-manager/src/store/mod.rs | 1 + core-rust/state-manager/src/store/rocks_db.rs | 1346 +++++++++-------- .../state-manager/src/store/typed_cf_api.rs | 417 +++++ 3 files changed, 1163 insertions(+), 601 deletions(-) create mode 100644 core-rust/state-manager/src/store/typed_cf_api.rs diff --git a/core-rust/state-manager/src/store/mod.rs b/core-rust/state-manager/src/store/mod.rs index c26332601b..a23a974a99 100644 --- a/core-rust/state-manager/src/store/mod.rs +++ b/core-rust/state-manager/src/store/mod.rs @@ -66,6 +66,7 @@ mod db; mod in_memory; mod rocks_db; pub mod traits; +mod typed_cf_api; pub use db::{DatabaseBackendConfig, StateManagerDatabase}; pub use in_memory::InMemoryStore; diff --git a/core-rust/state-manager/src/store/rocks_db.rs b/core-rust/state-manager/src/store/rocks_db.rs index feeca9f83f..a34317defb 100644 --- a/core-rust/state-manager/src/store/rocks_db.rs +++ b/core-rust/state-manager/src/store/rocks_db.rs @@ -69,19 +69,18 @@ use crate::store::traits::*; use crate::{ CommittedTransactionIdentifiers, LedgerProof, LedgerTransactionReceipt, LocalTransactionExecution, LocalTransactionReceipt, ReceiptTreeHash, StateVersion, - TransactionTreeHash, VersionedCommittedTransactionIdentifiers, VersionedLedgerProof, - VersionedLedgerTransactionReceipt, VersionedLocalTransactionExecution, + TransactionTreeHash, VersionedCommittedTransactionIdentifiers, + VersionedCommittedTransactionIdentifiersVersion, VersionedLedgerProof, + VersionedLedgerProofVersion, VersionedLedgerTransactionReceipt, + VersionedLedgerTransactionReceiptVersion, VersionedLocalTransactionExecution, + VersionedLocalTransactionExecutionVersion, }; use node_common::utils::IsAccountExt; use radix_engine::types::*; -use radix_engine_interface::data::scrypto::ScryptoDecode; use radix_engine_stores::hash_tree::tree_store::{ - encode_key, NodeKey, ReadableTreeStore, TreeNode, VersionedTreeNode, -}; -use rocksdb::{ - ColumnFamily, ColumnFamilyDescriptor, DBIteratorWithThreadMode, Direction, IteratorMode, - Options, WriteBatch, DB, + encode_key, NodeKey, ReadableTreeStore, TreeNode, VersionedTreeNode, VersionedTreeNodeVersion, }; +use rocksdb::{ColumnFamilyDescriptor, Direction, Options, WriteBatch, DB}; use transaction::model::*; use radix_engine_store_interface::interface::*; @@ -92,6 +91,11 @@ use tracing::{error, info}; use crate::accumulator_tree::storage::{ReadableAccuTreeStore, TreeSlice}; use crate::query::TransactionIdentifierLoader; +use crate::store::traits::scenario::{ + ExecutedGenesisScenario, ExecutedGenesisScenarioStore, ScenarioSequenceNumber, + VersionedExecutedGenesisScenario, VersionedExecutedGenesisScenarioVersion, +}; +use crate::store::typed_cf_api::*; use crate::transaction::{ LedgerTransactionHash, RawLedgerTransaction, TypedTransactionIdentifiers, }; @@ -174,14 +178,9 @@ enum RocksDBColumnFamily { AccountChangeStateVersions, /// Additional details of "Scenarios" (and their transactions) executed as part of Genesis, /// keyed by their sequence number (i.e. their index in the list of Scenarios to execute). - /// Schema: `ScenarioSequenceNumber.to_be_byte()` -> `scrypto_encode(VersionedExecutedGenesisScenario)` + /// Schema: `ScenarioSequenceNumber.to_be_bytes()` -> `scrypto_encode(VersionedExecutedGenesisScenario)` ScenarioSequenceNumberToExecutedGenesisScenario, } - -use crate::store::traits::scenario::{ - ExecutedGenesisScenario, ExecutedGenesisScenarioStore, ScenarioSequenceNumber, - VersionedExecutedGenesisScenario, -}; use RocksDBColumnFamily::*; const ALL_COLUMN_FAMILIES: [RocksDBColumnFamily; 19] = [ @@ -212,7 +211,7 @@ impl fmt::Display for RocksDBColumnFamily { } } -#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Debug)] +#[derive(Eq, PartialEq, Hash, PartialOrd, Ord, Clone, Debug)] enum ExtensionsDataKey { AccountChangeIndexLastProcessedStateVersion, AccountChangeIndexEnabled, @@ -234,9 +233,226 @@ impl fmt::Display for ExtensionsDataKey { } } -pub struct RocksDBStore { +/// An entry-point to typed column family APIs ([`TypedCfApi`]s) of all tables used by Node. +struct NodeColumnFamilies { db: DB, +} + +impl NodeColumnFamilies { + /// Returns an API scoped at [`StateVersionToRawLedgerTransactionBytes`] column family. + pub fn raw_ledger_transactions(&self) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToRawLedgerTransactionBytes, + StateVersionDbCodec::default(), + RawLedgerTransactionDbCodec::default(), + ) + } + + /// Returns an API scoped at [`StateVersionToCommittedTransactionIdentifiers`] column family. + pub fn committed_transaction_identifiers( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToCommittedTransactionIdentifiers, + StateVersionDbCodec::default(), + VersionedDbCodec::new( + SborDbCodec::::default(), + ), + ) + } + + /// Returns an API scoped at [`StateVersionToLedgerTransactionReceipt`] column family. + pub fn transaction_receipts(&self) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToLedgerTransactionReceipt, + StateVersionDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`StateVersionToLocalTransactionExecution`] column family. + pub fn local_transaction_executions( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToLocalTransactionExecution, + StateVersionDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`StateVersionToLedgerProof`] column family. + pub fn ledger_proofs(&self) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToLedgerProof, + StateVersionDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`EpochToLedgerProof`] column family. + pub fn epoch_ledger_proofs(&self) -> impl TypedCfApi { + self.create_with_codecs( + EpochToLedgerProof, + EpochDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`IntentHashToStateVersion`] column family. + pub fn intent_hashes(&self) -> impl TypedCfApi { + self.create_with_codecs( + IntentHashToStateVersion, + HashDbCodec::default(), + StateVersionDbCodec::default(), + ) + } + + /// Returns an API scoped at [`NotarizedTransactionHashToStateVersion`] column family. + pub fn notarized_transaction_hashes( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + NotarizedTransactionHashToStateVersion, + HashDbCodec::default(), + StateVersionDbCodec::default(), + ) + } + + /// Returns an API scoped at [`LedgerTransactionHashToStateVersion`] column family. + pub fn ledger_transaction_hashes( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + LedgerTransactionHashToStateVersion, + HashDbCodec::default(), + StateVersionDbCodec::default(), + ) + } + + /// Returns an API scoped at [`Substates`] column family. + pub fn substates(&self) -> impl TypedCfApi { + self.create_with_codecs( + Substates, + SubstateKeyDbCodec::default(), + DirectDbCodec::default(), + ) + } + + /// Returns an API scoped at [`NodeIdToSubstateNodeAncestryRecord`] column family. + pub fn substate_node_ancestry_records( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + NodeIdToSubstateNodeAncestryRecord, + NodeIdDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`VertexStore`] column family. + pub fn vertex_store(&self) -> impl TypedCfApi<(), VertexStoreBlob> { + self.create_with_codecs( + VertexStore, + PredefinedDbCodec::for_unit(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`NodeKeyToTreeNode`] column family. + pub fn hash_tree_nodes(&self) -> impl TypedCfApi { + self.create_with_codecs( + NodeKeyToTreeNode, + NodeKeyDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`StateVersionToStaleTreeParts`] column family. + pub fn stale_hash_tree_parts(&self) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToStaleTreeParts, + StateVersionDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`StateVersionToTransactionAccuTreeSlice`] column family. + pub fn transaction_accu_tree_slices( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToTransactionAccuTreeSlice, + StateVersionDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`StateVersionToReceiptAccuTreeSlice`] column family. + pub fn receipt_accu_tree_slices(&self) -> impl TypedCfApi { + self.create_with_codecs( + StateVersionToReceiptAccuTreeSlice, + StateVersionDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Returns an API scoped at [`ExtensionsDataKeyToCustomValue`] column family. + pub fn extension_data(&self) -> impl TypedCfApi> { + self.create_with_codecs( + ExtensionsDataKeyToCustomValue, + PredefinedDbCodec::new_from_string_representations(vec![ + ExtensionsDataKey::AccountChangeIndexEnabled, + ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion, + ExtensionsDataKey::LocalTransactionExecutionIndexEnabled, + ]), + DirectDbCodec::default(), + ) + } + + /// Returns an API scoped at [`AccountChangeStateVersions`] column family. + pub fn account_change_state_versions( + &self, + ) -> impl TypedCfApi<(GlobalAddress, StateVersion), ()> { + self.create_with_codecs( + AccountChangeStateVersions, + PrefixGlobalAddressDbCodec::new(StateVersionDbCodec::default()), + PredefinedDbCodec::for_unit(), + ) + } + + /// Returns an API scoped at [`ScenarioSequenceNumberToExecutedGenesisScenario`] column family. + pub fn executed_genesis_scenarios( + &self, + ) -> impl TypedCfApi { + self.create_with_codecs( + ScenarioSequenceNumberToExecutedGenesisScenario, + ScenarioSequenceNumberDbCodec::default(), + VersionedDbCodec::new(SborDbCodec::::default()), + ) + } + + /// Commits the given batch. + // TODO(next potential refactor): this method only exists here since this structure hides the + // `DB` under higher-level APIs, while our batch-handling logic still uses the low-level + // `WriteBatch` directly. This can be refactored e.g. by introducing a "batch guard". + pub fn commit_batch(&self, batch: WriteBatch) { + self.db.write(batch).expect("committing database batch"); + } + + fn create_with_codecs<'db, K: 'db, KC: DbCodec + 'db, V: 'db, VC: DbCodec + 'db>( + &'db self, + cf: RocksDBColumnFamily, + key_codec: KC, + value_codec: VC, + ) -> impl TypedCfApi<'db, K, V> { + CodecBasedCfApi::new(&self.db, &cf.to_string(), key_codec, value_codec) + } +} + +pub struct RocksDBStore { config: DatabaseFlags, + cfs: NodeColumnFamilies, } impl RocksDBStore { @@ -249,13 +465,14 @@ impl RocksDBStore { db_opts.create_missing_column_families(true); let column_families: Vec = ALL_COLUMN_FAMILIES - .into_iter() + .iter() .map(|cf| ColumnFamilyDescriptor::new(cf.to_string(), Options::default())) .collect(); let db = DB::open_cf_descriptors(&db_opts, root.as_path(), column_families).unwrap(); - let mut rocks_db_store = RocksDBStore { db, config }; + let cfs = NodeColumnFamilies { db }; + let mut rocks_db_store = RocksDBStore { config, cfs }; let current_database_config = rocks_db_store.read_flags_state(); rocks_db_store.config.validate(¤t_database_config)?; @@ -298,132 +515,72 @@ impl RocksDBStore { /* For user transactions we only need to check for duplicate intent hashes to know that user payload hash and ledger payload hash are also unique. */ - let maybe_existing_state_version = self - .db - .get_cf(self.cf_handle(&IntentHashToStateVersion), intent_hash) - .unwrap(); - + let maybe_existing_state_version = self.cfs.intent_hashes().get(intent_hash); if let Some(existing_state_version) = maybe_existing_state_version { panic!( "Attempted to save intent hash {:?} which already exists at state version {:?}", - intent_hash, - StateVersion::from_bytes(existing_state_version) + intent_hash, existing_state_version ); } - batch.put_cf( - self.cf_handle(&IntentHashToStateVersion), - intent_hash, - state_version.to_bytes(), - ); - - batch.put_cf( - self.cf_handle(&NotarizedTransactionHashToStateVersion), + self.cfs + .intent_hashes() + .put_with_batch(batch, intent_hash, &state_version); + self.cfs.notarized_transaction_hashes().put_with_batch( + batch, notarized_transaction_hash, - state_version.to_bytes(), + &state_version, ); } else { let maybe_existing_state_version = self - .db - .get_cf( - self.cf_handle(&LedgerTransactionHashToStateVersion), - ledger_transaction_hash, - ) - .unwrap(); - + .cfs + .ledger_transaction_hashes() + .get(&ledger_transaction_hash); if let Some(existing_state_version) = maybe_existing_state_version { panic!( "Attempted to save ledger transaction hash {:?} which already exists at state version {:?}", ledger_transaction_hash, - StateVersion::from_bytes(existing_state_version) + existing_state_version ); } } - batch.put_cf( - self.cf_handle(&LedgerTransactionHashToStateVersion), - ledger_transaction_hash, - state_version.to_bytes(), - ); - - batch.put_cf( - self.cf_handle(&StateVersionToRawLedgerTransactionBytes), - state_version.to_bytes(), - &raw.0, - ); - - batch.put_cf( - self.cf_handle(&StateVersionToCommittedTransactionIdentifiers), - state_version.to_bytes(), - scrypto_encode(&VersionedCommittedTransactionIdentifiers::new_latest( - identifiers, - )) - .unwrap(), + self.cfs.ledger_transaction_hashes().put_with_batch( + batch, + &ledger_transaction_hash, + &state_version, ); - - batch.put_cf( - self.cf_handle(&StateVersionToLedgerTransactionReceipt), - state_version.to_bytes(), - scrypto_encode(&VersionedLedgerTransactionReceipt::new_latest( - receipt.on_ledger, - )) - .unwrap(), + self.cfs + .raw_ledger_transactions() + .put_with_batch(batch, &state_version, &raw); + self.cfs.committed_transaction_identifiers().put_with_batch( + batch, + &state_version, + &identifiers, ); + self.cfs + .transaction_receipts() + .put_with_batch(batch, &state_version, &receipt.on_ledger); if self.is_local_transaction_execution_index_enabled() { - batch.put_cf( - self.cf_handle(&StateVersionToLocalTransactionExecution), - state_version.to_bytes(), - scrypto_encode(&VersionedLocalTransactionExecution::new_latest( - receipt.local_execution, - )) - .unwrap(), + self.cfs.local_transaction_executions().put_with_batch( + batch, + &state_version, + &receipt.local_execution, ); } } - - fn cf_handle(&self, cf: &RocksDBColumnFamily) -> &ColumnFamily { - self.db.cf_handle(&cf.to_string()).unwrap() - } - - fn get_first(&self, cf: &RocksDBColumnFamily) -> Option { - self.db - .iterator_cf(self.cf_handle(cf), IteratorMode::Start) - .map(|res| res.unwrap()) - .next() - .map(|(_, value)| scrypto_decode(value.as_ref()).unwrap()) - } - - fn get_last(&self, cf: &RocksDBColumnFamily) -> Option { - self.db - .iterator_cf(self.cf_handle(cf), IteratorMode::End) - .map(|res| res.unwrap()) - .next() - .map(|(_, value)| scrypto_decode(value.as_ref()).unwrap()) - } - - fn get_by_key(&self, cf: &RocksDBColumnFamily, key: &[u8]) -> Option { - self.db - .get_pinned_cf(self.cf_handle(cf), key) - .unwrap() - .map(|pinnable_slice| scrypto_decode(pinnable_slice.as_ref()).unwrap()) - } } impl ConfigurableDatabase for RocksDBStore { fn read_flags_state(&self) -> DatabaseFlagsState { - let account_change_index_enabled = self.get_by_key::( - &ExtensionsDataKeyToCustomValue, - ExtensionsDataKey::AccountChangeIndexEnabled - .to_string() - .as_bytes(), - ); - let local_transaction_execution_index_enabled = self.get_by_key::( - &ExtensionsDataKeyToCustomValue, - ExtensionsDataKey::LocalTransactionExecutionIndexEnabled - .to_string() - .as_bytes(), - ); + let extension_data_cf = self.cfs.extension_data(); + let account_change_index_enabled = extension_data_cf + .get(&ExtensionsDataKey::AccountChangeIndexEnabled) + .map(|bytes| scrypto_decode::(&bytes).unwrap()); + let local_transaction_execution_index_enabled = extension_data_cf + .get(&ExtensionsDataKey::LocalTransactionExecutionIndexEnabled) + .map(|bytes| scrypto_decode::(&bytes).unwrap()); DatabaseFlagsState { account_change_index_enabled, local_transaction_execution_index_enabled, @@ -432,23 +589,18 @@ impl ConfigurableDatabase for RocksDBStore { fn write_flags(&mut self, database_config: &DatabaseFlags) { let mut batch = WriteBatch::default(); - batch.put_cf( - self.cf_handle(&ExtensionsDataKeyToCustomValue), - ExtensionsDataKey::AccountChangeIndexEnabled - .to_string() - .as_bytes(), - scrypto_encode(&database_config.enable_account_change_index).unwrap(), + let extension_data_cf = self.cfs.extension_data(); + extension_data_cf.put_with_batch( + &mut batch, + &ExtensionsDataKey::AccountChangeIndexEnabled, + &scrypto_encode(&database_config.enable_account_change_index).unwrap(), ); - batch.put_cf( - self.cf_handle(&ExtensionsDataKeyToCustomValue), - ExtensionsDataKey::LocalTransactionExecutionIndexEnabled - .to_string() - .as_bytes(), - scrypto_encode(&database_config.enable_local_transaction_execution_index).unwrap(), + extension_data_cf.put_with_batch( + &mut batch, + &ExtensionsDataKey::LocalTransactionExecutionIndexEnabled, + &scrypto_encode(&database_config.enable_local_transaction_execution_index).unwrap(), ); - self.db - .write(batch) - .expect("DB error writing database config"); + self.cfs.commit_batch(batch); } fn is_account_change_index_enabled(&self) -> bool { @@ -493,25 +645,20 @@ impl CommitStore for RocksDBStore { panic!("Commit request contains duplicate ledger transaction hashes"); } - let next_epoch_number = commit_ledger_header - .next_epoch - .as_ref() - .map(|next_epoch| next_epoch.epoch.number()); - let encoded_proof = - scrypto_encode(&VersionedLedgerProof::new_latest(commit_bundle.proof)).unwrap(); - batch.put_cf( - self.cf_handle(&StateVersionToLedgerProof), - commit_state_version.to_bytes(), - &encoded_proof, + self.cfs.ledger_proofs().put_with_batch( + &mut batch, + &commit_state_version, + &commit_bundle.proof, ); - if let Some(next_epoch_number) = next_epoch_number { - batch.put_cf( - self.cf_handle(&EpochToLedgerProof), - next_epoch_number.to_be_bytes(), - &encoded_proof, + if let Some(next_epoch) = &commit_ledger_header.next_epoch { + self.cfs.epoch_ledger_proofs().put_with_batch( + &mut batch, + &next_epoch.epoch, + &commit_bundle.proof, ); } + let substates_cf = self.cfs.substates(); for (node_key, node_updates) in commit_bundle.substate_store_update.updates.node_updates { for (partition_num, partition_updates) in node_updates.partition_updates { let partition_key = DbPartitionKey { @@ -521,13 +668,17 @@ impl CommitStore for RocksDBStore { match partition_updates { PartitionDatabaseUpdates::Delta { substate_updates } => { for (sort_key, update) in substate_updates { - let key_bytes = encode_to_rocksdb_bytes(&partition_key, &sort_key); + let substate_key = (partition_key.clone(), sort_key); match update { - DatabaseUpdate::Set(value_bytes) => { - batch.put_cf(self.cf_handle(&Substates), key_bytes, value_bytes) + DatabaseUpdate::Set(substate_value) => { + substates_cf.put_with_batch( + &mut batch, + &substate_key, + &substate_value, + ); } DatabaseUpdate::Delete => { - batch.delete_cf(self.cf_handle(&Substates), key_bytes) + substates_cf.delete_with_batch(&mut batch, &substate_key); } } } @@ -535,16 +686,16 @@ impl CommitStore for RocksDBStore { PartitionDatabaseUpdates::Reset { new_substate_values, } => { - batch.delete_range_cf( - self.cf_handle(&Substates), - encode_to_rocksdb_bytes(&partition_key, &DbSortKey(vec![])), - encode_to_rocksdb_bytes(&partition_key.next(), &DbSortKey(vec![])), + substates_cf.delete_range_with_batch( + &mut batch, + &(partition_key.clone(), DbSortKey(vec![])), + &(partition_key.next(), DbSortKey(vec![])), ); - for (sort_key, value_bytes) in new_substate_values { - batch.put_cf( - self.cf_handle(&Substates), - encode_to_rocksdb_bytes(&partition_key, &sort_key), - value_bytes, + for (sort_key, substate_value) in new_substate_values { + substates_cf.put_with_batch( + &mut batch, + &(partition_key.clone(), sort_key), + &substate_value, ); } } @@ -553,197 +704,142 @@ impl CommitStore for RocksDBStore { } if let Some(vertex_store) = commit_bundle.vertex_store { - batch.put_cf( - self.cf_handle(&VertexStore), - [], - scrypto_encode(&VersionedVertexStoreBlob::new_latest(vertex_store)).unwrap(), - ); + self.cfs + .vertex_store() + .put_with_batch(&mut batch, &(), &vertex_store); } let state_hash_tree_update = commit_bundle.state_tree_update; for (key, node) in state_hash_tree_update.new_nodes { - batch.put_cf( - self.cf_handle(&NodeKeyToTreeNode), - encode_key(&key), - scrypto_encode(&VersionedTreeNode::new_latest(node)).unwrap(), - ); + self.cfs + .hash_tree_nodes() + .put_with_batch(&mut batch, &key, &node); } for (version, stale_parts) in state_hash_tree_update.stale_tree_parts_at_state_version { - batch.put_cf( - self.cf_handle(&StateVersionToStaleTreeParts), - version.to_bytes(), - scrypto_encode(&VersionedStaleTreeParts::new_latest(stale_parts)).unwrap(), - ) + self.cfs + .stale_hash_tree_parts() + .put_with_batch(&mut batch, &version, &stale_parts); } for (node_ids, record) in commit_bundle.new_substate_node_ancestry_records { - let encoded_record = - scrypto_encode(&VersionedSubstateNodeAncestryRecord::new_latest(record)).unwrap(); for node_id in node_ids { - batch.put_cf( - self.cf_handle(&NodeIdToSubstateNodeAncestryRecord), - node_id.0, - &encoded_record, - ); + self.cfs + .substate_node_ancestry_records() + .put_with_batch(&mut batch, &node_id, &record); } } - batch.put_cf( - self.cf_handle(&StateVersionToTransactionAccuTreeSlice), - commit_state_version.to_bytes(), - scrypto_encode(&VersionedTransactionAccuTreeSlice::new_latest( - commit_bundle.transaction_tree_slice, - )) - .unwrap(), + self.cfs.transaction_accu_tree_slices().put_with_batch( + &mut batch, + &commit_state_version, + &commit_bundle.transaction_tree_slice, ); - batch.put_cf( - self.cf_handle(&StateVersionToReceiptAccuTreeSlice), - commit_state_version.to_bytes(), - scrypto_encode(&VersionedReceiptAccuTreeSlice::new_latest( - commit_bundle.receipt_tree_slice, - )) - .unwrap(), + self.cfs.receipt_accu_tree_slices().put_with_batch( + &mut batch, + &commit_state_version, + &commit_bundle.receipt_tree_slice, ); - self.db.write(batch).expect("Commit failed"); + self.cfs.commit_batch(batch); } } impl ExecutedGenesisScenarioStore for RocksDBStore { fn put_scenario(&mut self, number: ScenarioSequenceNumber, scenario: ExecutedGenesisScenario) { - self.db - .put_cf( - self.cf_handle(&ScenarioSequenceNumberToExecutedGenesisScenario), - number.to_be_bytes(), - scrypto_encode(&VersionedExecutedGenesisScenario::new_latest(scenario)).unwrap(), - ) - .expect("Executed scenario write failed"); + self.cfs + .executed_genesis_scenarios() + .put(&number, &scenario); } fn list_all_scenarios(&self) -> Vec<(ScenarioSequenceNumber, ExecutedGenesisScenario)> { - self.db - .iterator_cf( - self.cf_handle(&ScenarioSequenceNumberToExecutedGenesisScenario), - IteratorMode::Start, - ) - .map(|result| result.unwrap()) - .map(|kv| { - ( - u32::from_be_bytes(kv.0.as_ref().try_into().unwrap()), - scrypto_decode::(kv.1.as_ref()) - .unwrap() - .into_latest(), - ) - }) + self.cfs + .executed_genesis_scenarios() + .iterate(Direction::Forward) .collect() } } -pub struct RocksDBCommittedTransactionBundleIterator<'a> { +pub struct RocksDBCommittedTransactionBundleIterator<'db> { state_version: StateVersion, - txns_iter: DBIteratorWithThreadMode<'a, DB>, - ledger_receipts_iter: DBIteratorWithThreadMode<'a, DB>, - local_executions_iter: DBIteratorWithThreadMode<'a, DB>, - identifiers_iter: DBIteratorWithThreadMode<'a, DB>, + txns_iter: Box + 'db>, + ledger_receipts_iter: Box + 'db>, + local_executions_iter: + Box + 'db>, + identifiers_iter: + Box + 'db>, } -impl<'a> RocksDBCommittedTransactionBundleIterator<'a> { - fn new(from_state_version: StateVersion, store: &'a RocksDBStore) -> Self { - let start_state_version_bytes = from_state_version.to_bytes(); +impl<'db> RocksDBCommittedTransactionBundleIterator<'db> { + fn new(from_state_version: StateVersion, store: &'db RocksDBStore) -> Self { Self { state_version: from_state_version, - txns_iter: store.db.iterator_cf( - store.cf_handle(&StateVersionToRawLedgerTransactionBytes), - IteratorMode::From(&start_state_version_bytes, Direction::Forward), - ), - ledger_receipts_iter: store.db.iterator_cf( - store.cf_handle(&StateVersionToLedgerTransactionReceipt), - IteratorMode::From(&start_state_version_bytes, Direction::Forward), - ), - local_executions_iter: store.db.iterator_cf( - store.cf_handle(&StateVersionToLocalTransactionExecution), - IteratorMode::From(&start_state_version_bytes, Direction::Forward), - ), - identifiers_iter: store.db.iterator_cf( - store.cf_handle(&StateVersionToCommittedTransactionIdentifiers), - IteratorMode::From(&start_state_version_bytes, Direction::Forward), - ), + txns_iter: store + .cfs + .raw_ledger_transactions() + .iterate_from(&from_state_version, Direction::Forward), + ledger_receipts_iter: store + .cfs + .transaction_receipts() + .iterate_from(&from_state_version, Direction::Forward), + local_executions_iter: store + .cfs + .local_transaction_executions() + .iterate_from(&from_state_version, Direction::Forward), + identifiers_iter: store + .cfs + .committed_transaction_identifiers() + .iterate_from(&from_state_version, Direction::Forward), } } } -impl Iterator for RocksDBCommittedTransactionBundleIterator<'_> { +impl<'db> Iterator for RocksDBCommittedTransactionBundleIterator<'db> { type Item = CommittedTransactionBundle; fn next(&mut self) -> Option { - match self.txns_iter.next() { - None => None, - Some(txn) => { - let txn_kv = txn.unwrap(); - - let ledger_receipt_kv = self - .ledger_receipts_iter - .next() - .expect("Missing ledger receipt") - .unwrap(); - let local_execution_kv = self - .local_executions_iter - .next() - .expect("Missing local transaction execution") - .unwrap(); - let identifiers_kv = self - .identifiers_iter - .next() - .expect("Missing txn hashes") - .unwrap(); - - let current_state_version = self.state_version; - for (other_key_description, other_key_bytes) in [ - ("transaction version", txn_kv.0), - ("ledger receipt version", ledger_receipt_kv.0), - ("local execution version", local_execution_kv.0), - ("identifiers version", identifiers_kv.0), - ] { - let other_row_version = StateVersion::from_bytes(other_key_bytes); - if other_row_version != current_state_version { - panic!("DB inconsistency! {other_key_description} ({other_row_version}) doesn't match expected state version ({current_state_version})"); - } - } - - let txn = RawLedgerTransaction(txn_kv.1.to_vec()); - let ledger_receipt = scrypto_decode::( - ledger_receipt_kv.1.as_ref(), - ) - .unwrap() - .into_latest(); - let local_execution = scrypto_decode::( - local_execution_kv.1.as_ref(), - ) - .unwrap() - .into_latest(); - let complete_receipt = LocalTransactionReceipt { - on_ledger: ledger_receipt, - local_execution, - }; - let identifiers = scrypto_decode::( - identifiers_kv.1.as_ref(), - ) - .unwrap() - .into_latest(); + let Some((txn_version, txn)) = self.txns_iter.next() else { + return None; + }; - self.state_version = self - .state_version - .next() - .expect("Invalid next state version!"); - - Some(CommittedTransactionBundle { - state_version: current_state_version, - raw: txn, - receipt: complete_receipt, - identifiers, - }) + let (ledger_receipt_version, ledger_receipt) = self + .ledger_receipts_iter + .next() + .expect("missing ledger receipt"); + let (local_execution_version, local_execution) = self + .local_executions_iter + .next() + .expect("missing local transaction execution"); + let (identifiers_version, identifiers) = self + .identifiers_iter + .next() + .expect("missing transaction identifiers"); + + let current_state_version = self.state_version; + for (other_row_description, other_row_version) in [ + ("transaction version", txn_version), + ("ledger receipt version", ledger_receipt_version), + ("local execution version", local_execution_version), + ("identifiers version", identifiers_version), + ] { + if other_row_version != current_state_version { + panic!("DB inconsistency! {other_row_description} ({other_row_version}) doesn't match expected state version ({current_state_version})"); } } + + self.state_version = self + .state_version + .next() + .expect("Invalid next state version!"); + + Some(CommittedTransactionBundle { + state_version: current_state_version, + raw: txn, + receipt: LocalTransactionReceipt { + on_ledger: ledger_receipt, + local_execution, + }, + identifiers, + }) } } @@ -768,52 +864,30 @@ impl QueryableTransactionStore for RocksDBStore { &self, state_version: StateVersion, ) -> Option { - self.db - .get_cf( - self.cf_handle(&StateVersionToRawLedgerTransactionBytes), - state_version.to_bytes(), - ) - .expect("DB error loading transaction") - .map(RawLedgerTransaction) + self.cfs.raw_ledger_transactions().get(&state_version) } fn get_committed_transaction_identifiers( &self, state_version: StateVersion, ) -> Option { - self.db - .get_cf( - self.cf_handle(&StateVersionToCommittedTransactionIdentifiers), - state_version.to_bytes(), - ) - .expect("DB error loading identifiers") - .map(|v| { - scrypto_decode::(&v) - .expect("Failed to decode identifiers") - .into_latest() - }) + self.cfs + .committed_transaction_identifiers() + .get(&state_version) } fn get_committed_ledger_transaction_receipt( &self, state_version: StateVersion, ) -> Option { - self.get_by_key::( - &StateVersionToLedgerTransactionReceipt, - &state_version.to_bytes(), - ) - .map(|versioned| versioned.into_latest()) + self.cfs.transaction_receipts().get(&state_version) } fn get_committed_local_transaction_execution( &self, state_version: StateVersion, ) -> Option { - self.get_by_key::( - &StateVersionToLocalTransactionExecution, - &state_version.to_bytes(), - ) - .map(|versioned| versioned.into_latest()) + self.cfs.local_transaction_executions().get(&state_version) } fn get_committed_local_transaction_receipt( @@ -842,11 +916,11 @@ impl QueryableTransactionStore for RocksDBStore { } impl TransactionIndex<&IntentHash> for RocksDBStore { - fn get_txn_state_version_by_identifier(&self, identifier: &IntentHash) -> Option { - self.db - .get_cf(self.cf_handle(&IntentHashToStateVersion), identifier) - .expect("DB error reading state version for intent hash") - .map(StateVersion::from_bytes) + fn get_txn_state_version_by_identifier( + &self, + intent_hash: &IntentHash, + ) -> Option { + self.cfs.intent_hashes().get(intent_hash) } } @@ -855,13 +929,9 @@ impl TransactionIndex<&NotarizedTransactionHash> for RocksDBStore { &self, notarized_transaction_hash: &NotarizedTransactionHash, ) -> Option { - self.db - .get_cf( - self.cf_handle(&NotarizedTransactionHashToStateVersion), - notarized_transaction_hash, - ) - .expect("DB error reading state version for notarized transaction hash") - .map(StateVersion::from_bytes) + self.cfs + .notarized_transaction_hashes() + .get(notarized_transaction_hash) } } @@ -870,13 +940,9 @@ impl TransactionIndex<&LedgerTransactionHash> for RocksDBStore { &self, ledger_transaction_hash: &LedgerTransactionHash, ) -> Option { - self.db - .get_cf( - self.cf_handle(&LedgerTransactionHashToStateVersion), - ledger_transaction_hash, - ) - .expect("DB error reading state version for ledger transaction hash") - .map(StateVersion::from_bytes) + self.cfs + .ledger_transaction_hashes() + .get(ledger_transaction_hash) } } @@ -884,49 +950,7 @@ impl TransactionIdentifierLoader for RocksDBStore { fn get_top_transaction_identifiers( &self, ) -> Option<(StateVersion, CommittedTransactionIdentifiers)> { - self.db - .iterator_cf( - self.cf_handle(&StateVersionToCommittedTransactionIdentifiers), - IteratorMode::End, - ) - .map(|res| res.unwrap()) - .next() - .map(|(key, value)| { - ( - StateVersion::from_bytes(key), - scrypto_decode::(&value) - .expect("Failed to decode identifiers") - .into_latest(), - ) - }) - } -} - -pub struct RocksDBProofIterator<'a> { - proofs_iter: DBIteratorWithThreadMode<'a, DB>, -} - -impl<'a> RocksDBProofIterator<'a> { - fn new(from_state_version: StateVersion, store: &'a RocksDBStore) -> Self { - let start_state_version_bytes = from_state_version.to_bytes(); - Self { - proofs_iter: store.db.iterator_cf( - store.cf_handle(&StateVersionToLedgerProof), - IteratorMode::From(&start_state_version_bytes, Direction::Forward), - ), - } - } -} - -impl Iterator for RocksDBProofIterator<'_> { - type Item = LedgerProof; - - fn next(&mut self) -> Option { - self.proofs_iter.next().map(|proof| { - scrypto_decode::(proof.unwrap().1.as_ref()) - .unwrap() - .into_latest() - }) + self.cfs.committed_transaction_identifiers().get_last() } } @@ -935,20 +959,20 @@ impl IterableProofStore for RocksDBStore { &self, from_state_version: StateVersion, ) -> Box + '_> { - Box::new(RocksDBProofIterator::new(from_state_version, self)) + Box::new( + self.cfs + .ledger_proofs() + .iterate_from(&from_state_version, Direction::Forward) + .map(|(_, proof)| proof), + ) } } impl QueryableProofStore for RocksDBStore { fn max_state_version(&self) -> StateVersion { - self.db - .iterator_cf( - self.cf_handle(&StateVersionToRawLedgerTransactionBytes), - IteratorMode::End, - ) - .next() - .map(|res| res.unwrap()) - .map(|(key, _)| StateVersion::from_bytes(key)) + self.cfs + .raw_ledger_transactions() + .get_last_key() .unwrap_or(StateVersion::pre_genesis()) } @@ -962,21 +986,14 @@ impl QueryableProofStore for RocksDBStore { let mut latest_usable_proof: Option = None; let mut txns = Vec::new(); - let mut proofs_iter = self.db.iterator_cf( - self.cf_handle(&StateVersionToLedgerProof), - IteratorMode::From( - &start_state_version_inclusive.to_bytes(), - Direction::Forward, - ), - ); - - let mut txns_iter = self.db.iterator_cf( - self.cf_handle(&StateVersionToRawLedgerTransactionBytes), - IteratorMode::From( - &start_state_version_inclusive.to_bytes(), - Direction::Forward, - ), - ); + let mut proofs_iter = self + .cfs + .ledger_proofs() + .iterate_from(&start_state_version_inclusive, Direction::Forward); + let mut txns_iter = self + .cfs + .raw_ledger_transactions() + .iterate_from(&start_state_version_inclusive, Direction::Forward); 'proof_loop: while payload_size_so_far <= max_payload_size_in_bytes && txns.len() <= (max_number_of_txns_if_more_than_one_proof as usize) @@ -986,14 +1003,7 @@ impl QueryableProofStore for RocksDBStore { // If they don't - (sadly) ignore this proof's txns read so far and break the loop // If we're out of proofs (or some txns are missing): also break the loop match proofs_iter.next() { - Some(next_proof_result) => { - let next_proof_kv = next_proof_result.unwrap(); - let next_proof_state_version = StateVersion::from_bytes(next_proof_kv.0); - let next_proof = - scrypto_decode::(next_proof_kv.1.as_ref()) - .unwrap() - .into_latest(); - + Some((next_proof_state_version, next_proof)) => { let mut payload_size_including_next_proof_txns = payload_size_so_far; let mut next_proof_txns = Vec::new(); @@ -1010,15 +1020,9 @@ impl QueryableProofStore for RocksDBStore { <= (max_number_of_txns_if_more_than_one_proof as usize)) { match txns_iter.next() { - Some(next_txn_result) => { - let next_txn_kv = next_txn_result.unwrap(); - let next_txn_state_version = - StateVersion::from_bytes(next_txn_kv.0); - let next_txn_payload = next_txn_kv.1.to_vec(); - - payload_size_including_next_proof_txns += - next_txn_payload.len() as u32; - next_proof_txns.push(RawLedgerTransaction(next_txn_payload)); + Some((next_txn_state_version, next_txn)) => { + payload_size_including_next_proof_txns += next_txn.0.len() as u32; + next_proof_txns.push(next_txn); if next_txn_state_version == next_proof_state_version { // We've reached the last txn under next_proof @@ -1068,28 +1072,23 @@ impl QueryableProofStore for RocksDBStore { } fn get_first_proof(&self) -> Option { - self.get_first::(&StateVersionToLedgerProof) - .map(|versioned| versioned.into_latest()) + self.cfs.ledger_proofs().get_first_value() } fn get_post_genesis_epoch_proof(&self) -> Option { - self.get_first::(&EpochToLedgerProof) - .map(|versioned| versioned.into_latest()) + self.cfs.epoch_ledger_proofs().get_first_value() } fn get_epoch_proof(&self, epoch: Epoch) -> Option { - self.get_by_key::(&EpochToLedgerProof, &epoch.number().to_be_bytes()) - .map(|versioned| versioned.into_latest()) + self.cfs.epoch_ledger_proofs().get(&epoch) } fn get_last_proof(&self) -> Option { - self.get_last::(&StateVersionToLedgerProof) - .map(|versioned| versioned.into_latest()) + self.cfs.ledger_proofs().get_last_value() } fn get_last_epoch_proof(&self) -> Option { - self.get_last::(&EpochToLedgerProof) - .map(|versioned| versioned.into_latest()) + self.cfs.epoch_ledger_proofs().get_last_value() } } @@ -1099,10 +1098,9 @@ impl SubstateDatabase for RocksDBStore { partition_key: &DbPartitionKey, sort_key: &DbSortKey, ) -> Option { - let encoded_key_bytes = encode_to_rocksdb_bytes(partition_key, sort_key); - self.db - .get_cf(self.cf_handle(&Substates), encoded_key_bytes) - .unwrap() + self.cfs + .substates() + .get(&(partition_key.clone(), sort_key.clone())) } fn list_entries( @@ -1110,22 +1108,16 @@ impl SubstateDatabase for RocksDBStore { partition_key: &DbPartitionKey, ) -> Box + '_> { let partition_key = partition_key.clone(); - let start = encode_to_rocksdb_bytes(&partition_key, &DbSortKey(vec![])); - let iter = self - .db - .iterator_cf( - self.cf_handle(&Substates), - IteratorMode::From(&start, Direction::Forward), - ) - .map(|kv| { - let (k, v) = kv.unwrap(); - let (partition_key, sort_key) = decode_from_rocksdb_bytes(k.as_ref()); - ((partition_key, sort_key), v) - }) - .take_while(move |((next_partition_key, _), _)| next_partition_key.eq(&partition_key)) - .map(|((_, sort_key), value)| (sort_key, value.as_ref().to_vec())); - - Box::new(iter) + Box::new( + self.cfs + .substates() + .iterate_from( + &(partition_key.clone(), DbSortKey(vec![])), + Direction::Forward, + ) + .take_while(move |((next_key, _), _)| next_key == &partition_key) + .map(|((_, sort_key), value)| (sort_key, value)), + ) } } @@ -1134,29 +1126,15 @@ impl SubstateNodeAncestryStore for RocksDBStore { &self, node_ids: impl IntoIterator, ) -> Vec> { - self.db - .multi_get_cf(node_ids.into_iter().map(|node_id| { - ( - self.cf_handle(&NodeIdToSubstateNodeAncestryRecord), - node_id.0, - ) - })) - .into_iter() - .map(|result| { - result.unwrap().map(|bytes| { - scrypto_decode::(&bytes) - .unwrap() - .into_latest() - }) - }) - .collect() + self.cfs + .substate_node_ancestry_records() + .get_many(Vec::from_iter(node_ids)) } } impl ReadableTreeStore for RocksDBStore { fn get_node(&self, key: &NodeKey) -> Option { - self.get_by_key::(&NodeKeyToTreeNode, &encode_key(key)) - .map(|versioned| versioned.into_latest()) + self.cfs.hash_tree_nodes().get(key) } } @@ -1165,46 +1143,31 @@ impl ReadableAccuTreeStore for RocksDBStore { &self, state_version: &StateVersion, ) -> Option> { - self.get_by_key::( - &StateVersionToTransactionAccuTreeSlice, - &state_version.to_bytes(), - ) - .map(|versioned| versioned.into_latest().0) + self.cfs + .transaction_accu_tree_slices() + .get(state_version) + .map(|slice| slice.0) } } impl ReadableAccuTreeStore for RocksDBStore { fn get_tree_slice(&self, state_version: &StateVersion) -> Option> { - self.get_by_key::( - &StateVersionToReceiptAccuTreeSlice, - &state_version.to_bytes(), - ) - .map(|versioned| versioned.into_latest().0) + self.cfs + .receipt_accu_tree_slices() + .get(state_version) + .map(|slice| slice.0) } } impl WriteableVertexStore for RocksDBStore { fn save_vertex_store(&mut self, blob: VertexStoreBlob) { - self.db - .put_cf( - self.cf_handle(&VertexStore), - [], - scrypto_encode(&VersionedVertexStoreBlob::new_latest(blob)).unwrap(), - ) - .unwrap(); + self.cfs.vertex_store().put(&(), &blob) } } impl RecoverableVertexStore for RocksDBStore { fn get_vertex_store(&self) -> Option { - self.db - .get_cf(self.cf_handle(&VertexStore), []) - .unwrap() - .map(|bytes| { - scrypto_decode::(&bytes) - .unwrap() - .into_latest() - }) + self.cfs.vertex_store().get(&()) } } @@ -1248,13 +1211,13 @@ impl RocksDBStore { .global_balance_summary .global_balance_changes .keys() + .filter(|address| address.is_account()) { - if !address.is_account() { - continue; - } - let mut key = address.to_vec(); - key.extend(state_version.to_bytes()); - batch.put_cf(self.cf_handle(&AccountChangeStateVersions), key, []); + self.cfs.account_change_state_versions().put_with_batch( + batch, + &(*address, state_version), + &(), + ); } } @@ -1270,12 +1233,10 @@ impl RocksDBStore { &transaction_bundle.receipt.local_execution, ); - batch.put_cf( - self.cf_handle(&ExtensionsDataKeyToCustomValue), - ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion - .to_string() - .as_bytes(), - state_version.to_bytes(), + self.cfs.extension_data().put_with_batch( + batch, + &ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion, + &state_version.to_bytes().to_vec(), ); } @@ -1284,11 +1245,10 @@ impl RocksDBStore { start_state_version_inclusive: StateVersion, limit: u64, ) -> StateVersion { - let start_state_version_bytes = start_state_version_inclusive.to_bytes(); - let mut executions_iter = self.db.iterator_cf( - self.cf_handle(&StateVersionToLocalTransactionExecution), - IteratorMode::From(&start_state_version_bytes, Direction::Forward), - ); + let mut executions_iter = self + .cfs + .local_transaction_executions() + .iterate_from(&start_state_version_inclusive, Direction::Forward); let mut batch = WriteBatch::default(); @@ -1296,11 +1256,7 @@ impl RocksDBStore { let mut index = 0; while index < limit { match executions_iter.next() { - Some(next_execution) => { - let next_execution_kv = next_execution.unwrap(); - let next_execution_state_version = - StateVersion::from_bytes(next_execution_kv.0); - + Some((next_execution_state_version, next_execution)) => { let expected_state_version = start_state_version_inclusive .relative(index) .expect("Invalid relative state version!"); @@ -1308,18 +1264,11 @@ impl RocksDBStore { panic!("DB inconsistency! Missing local transaction execution at state version {expected_state_version}"); } last_state_version = expected_state_version; - - let next_execution = scrypto_decode::( - next_execution_kv.1.as_ref(), - ) - .unwrap() - .into_latest(); self.batch_update_account_change_index_from_receipt( &mut batch, last_state_version, &next_execution, ); - index += 1; } None => { @@ -1328,17 +1277,12 @@ impl RocksDBStore { } } - batch.put_cf( - self.cf_handle(&ExtensionsDataKeyToCustomValue), - ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion - .to_string() - .as_bytes(), - last_state_version.to_bytes(), + self.cfs.extension_data().put_with_batch( + &mut batch, + &ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion, + &last_state_version.to_bytes().to_vec(), ); - - self.db - .write(batch) - .expect("Account change index build failed"); + self.cfs.commit_batch(batch); last_state_version } @@ -1346,14 +1290,9 @@ impl RocksDBStore { impl AccountChangeIndexExtension for RocksDBStore { fn account_change_index_last_processed_state_version(&self) -> StateVersion { - self.db - .get_pinned_cf( - self.cf_handle(&ExtensionsDataKeyToCustomValue), - ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion - .to_string() - .as_bytes(), - ) - .unwrap() + self.cfs + .extension_data() + .get(&ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion) .map(StateVersion::from_bytes) .unwrap_or(StateVersion::pre_genesis()) } @@ -1387,61 +1326,266 @@ impl AccountChangeIndexExtension for RocksDBStore { } } -pub struct RocksDBAccountChangeIndexIterator<'a> { - account_bytes: Vec, - account_change_iter: DBIteratorWithThreadMode<'a, DB>, +impl IterableAccountChangeIndex for RocksDBStore { + fn get_state_versions_for_account_iter( + &self, + account: GlobalAddress, + from_state_version: StateVersion, + ) -> Box + '_> { + Box::new( + self.cfs + .account_change_state_versions() + .iterate_from(&(account, from_state_version), Direction::Forward) + .take_while(move |((next_account, _), _)| next_account == &account) + .map(|((_, state_version), _)| state_version), + ) + } } -impl<'a> RocksDBAccountChangeIndexIterator<'a> { - fn new( - from_state_version: StateVersion, - account: GlobalAddress, - store: &'a RocksDBStore, - ) -> Self { - let mut key = account.to_vec(); - key.extend(from_state_version.to_bytes()); +// Concrete DB-level codecs of keys/values: + +#[derive(Clone, Default)] +struct StateVersionDbCodec {} + +impl DbCodec for StateVersionDbCodec { + fn encode(&self, value: &StateVersion) -> Vec { + value.to_bytes().to_vec() + } + + fn decode(&self, bytes: &[u8]) -> StateVersion { + StateVersion::from_bytes(bytes) + } +} + +#[derive(Clone, Default)] +struct EpochDbCodec {} + +impl DbCodec for EpochDbCodec { + fn encode(&self, value: &Epoch) -> Vec { + value.number().to_be_bytes().to_vec() + } + + fn decode(&self, bytes: &[u8]) -> Epoch { + Epoch::of(u64::from_be_bytes(copy_u8_array(bytes))) + } +} + +#[derive(Clone, Default)] +struct ScenarioSequenceNumberDbCodec {} + +impl DbCodec for ScenarioSequenceNumberDbCodec { + fn encode(&self, value: &ScenarioSequenceNumber) -> Vec { + value.to_be_bytes().to_vec() + } + + fn decode(&self, bytes: &[u8]) -> ScenarioSequenceNumber { + ScenarioSequenceNumber::from_be_bytes(copy_u8_array(bytes)) + } +} + +#[derive(Clone, Default)] +struct RawLedgerTransactionDbCodec {} + +impl DbCodec for RawLedgerTransactionDbCodec { + fn encode(&self, value: &RawLedgerTransaction) -> Vec { + value.0.to_vec() + } + + fn decode(&self, bytes: &[u8]) -> RawLedgerTransaction { + RawLedgerTransaction(bytes.to_vec()) + } +} + +struct HashDbCodec { + type_parameters_phantom: PhantomData, +} + +impl Default for HashDbCodec { + fn default() -> Self { Self { - account_bytes: account.to_vec(), - account_change_iter: store.db.iterator_cf( - store.cf_handle(&AccountChangeStateVersions), - IteratorMode::From(&key, Direction::Forward), - ), + type_parameters_phantom: PhantomData, } } } -impl Iterator for RocksDBAccountChangeIndexIterator<'_> { - type Item = StateVersion; - - fn next(&mut self) -> Option { - match self.account_change_iter.next() { - Some(entry) => { - let (key, _value) = entry.unwrap(); - let (address_bytes, state_version_bytes) = - key.split_at(key.len() - StateVersion::BYTE_LEN); - let state_version = StateVersion::from_bytes(state_version_bytes); - if address_bytes != self.account_bytes { - None - } else { - Some(state_version) - } - } - None => None, +impl Clone for HashDbCodec { + fn clone(&self) -> Self { + Self::default() + } +} + +impl DbCodec for HashDbCodec { + fn encode(&self, value: &T) -> Vec { + value.as_slice().to_vec() + } + + fn decode(&self, bytes: &[u8]) -> T { + T::from_bytes(copy_u8_array(bytes)) + } +} + +#[derive(Clone, Default)] +struct SubstateKeyDbCodec {} + +impl DbCodec for SubstateKeyDbCodec { + fn encode(&self, value: &DbSubstateKey) -> Vec { + let (partition_key, sort_key) = value; + encode_to_rocksdb_bytes(partition_key, sort_key) + } + + fn decode(&self, bytes: &[u8]) -> DbSubstateKey { + decode_from_rocksdb_bytes(bytes) + } +} + +#[derive(Clone, Default)] +struct NodeKeyDbCodec {} + +impl DbCodec for NodeKeyDbCodec { + fn encode(&self, value: &NodeKey) -> Vec { + encode_key(value) + } + + fn decode(&self, _bytes: &[u8]) -> NodeKey { + unimplemented!("no use-case for decoding hash tree's `NodeKey`s exists yet") + } +} + +struct PrefixGlobalAddressDbCodec> { + suffix_codec: SC, + type_parameters_phantom: PhantomData, +} + +impl> PrefixGlobalAddressDbCodec { + pub fn new(suffix_codec: SC) -> Self { + Self { + suffix_codec, + type_parameters_phantom: PhantomData, } } } -impl IterableAccountChangeIndex for RocksDBStore { - fn get_state_versions_for_account_iter( - &self, - account: GlobalAddress, - from_state_version: StateVersion, - ) -> Box + '_> { - Box::new(RocksDBAccountChangeIndexIterator::new( - from_state_version, - account, - self, - )) +impl> Clone for PrefixGlobalAddressDbCodec { + fn clone(&self) -> Self { + Self::new(self.suffix_codec.clone()) + } +} + +impl> DbCodec<(GlobalAddress, S)> for PrefixGlobalAddressDbCodec { + fn encode(&self, (global_address, suffix): &(GlobalAddress, S)) -> Vec { + let mut encoding = global_address.to_vec(); + encoding.extend_from_slice(self.suffix_codec.encode(suffix).as_slice()); + encoding + } + + fn decode(&self, bytes: &[u8]) -> (GlobalAddress, S) { + let global_address = GlobalAddress::new_or_panic(copy_u8_array(&bytes[..NodeId::LENGTH])); + let suffix = self.suffix_codec.decode(&bytes[NodeId::LENGTH..]); + (global_address, suffix) + } +} + +#[derive(Clone, Default)] +struct NodeIdDbCodec {} + +impl DbCodec for NodeIdDbCodec { + fn encode(&self, value: &NodeId) -> Vec { + value.0.to_vec() + } + + fn decode(&self, bytes: &[u8]) -> NodeId { + NodeId(copy_u8_array(bytes)) + } +} + +// TODO(requires Engine changes): move the `IsConcreteVersion` to the versioning macros on the +// Engine's side, in order to get rid of the following ~100 lines. + +impl IsConcreteVersion for SubstateNodeAncestryRecord { + type Versioned = VersionedSubstateNodeAncestryRecord; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for LedgerTransactionReceipt { + type Versioned = VersionedLedgerTransactionReceipt; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for CommittedTransactionIdentifiers { + type Versioned = VersionedCommittedTransactionIdentifiers; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for LocalTransactionExecution { + type Versioned = VersionedLocalTransactionExecution; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for ExecutedGenesisScenario { + type Versioned = VersionedExecutedGenesisScenario; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for LedgerProof { + type Versioned = VersionedLedgerProof; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for ReceiptAccuTreeSlice { + type Versioned = VersionedReceiptAccuTreeSlice; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for TransactionAccuTreeSlice { + type Versioned = VersionedTransactionAccuTreeSlice; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for TreeNode { + type Versioned = VersionedTreeNode; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for StaleTreeParts { + type Versioned = VersionedStaleTreeParts; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() + } +} + +impl IsConcreteVersion for VertexStoreBlob { + type Versioned = VersionedVertexStoreBlob; + + fn clone_into_versioned(&self) -> Self::Versioned { + self.clone().into_versioned() } } diff --git a/core-rust/state-manager/src/store/typed_cf_api.rs b/core-rust/state-manager/src/store/typed_cf_api.rs new file mode 100644 index 0000000000..49db3eeb1b --- /dev/null +++ b/core-rust/state-manager/src/store/typed_cf_api.rs @@ -0,0 +1,417 @@ +/* Copyright 2021 Radix Publishing Ltd incorporated in Jersey (Channel Islands). + * + * Licensed under the Radix License, Version 1.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at: + * + * radixfoundation.org/licenses/LICENSE-v1 + * + * The Licensor hereby grants permission for the Canonical version of the Work to be + * published, distributed and used under or by reference to the Licensor’s trademark + * Radix ® and use of any unregistered trade names, logos or get-up. + * + * The Licensor provides the Work (and each Contributor provides its Contributions) on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, + * including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, + * MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. + * + * Whilst the Work is capable of being deployed, used and adopted (instantiated) to create + * a distributed ledger it is your responsibility to test and validate the code, together + * with all logic and performance of that code under all foreseeable scenarios. + * + * The Licensor does not make or purport to make and hereby excludes liability for all + * and any representation, warranty or undertaking in any form whatsoever, whether express + * or implied, to any entity or person, including any representation, warranty or + * undertaking, as to the functionality security use, value or other characteristics of + * any distributed ledger nor in respect the functioning or value of any tokens which may + * be created stored or transferred using the Work. The Licensor does not warrant that the + * Work or any use of the Work complies with any law or regulation in any territory where + * it may be implemented or used or that it will be appropriate for any specific purpose. + * + * Neither the licensor nor any current or former employees, officers, directors, partners, + * trustees, representatives, agents, advisors, contractors, or volunteers of the Licensor + * shall be liable for any direct or indirect, special, incidental, consequential or other + * losses of any kind, in tort, contract or otherwise (including but not limited to loss + * of revenue, income or profits, or loss of use or data, or loss of reputation, or loss + * of any economic or other opportunity of whatsoever nature or howsoever arising), arising + * out of or in connection with (without limitation of any use, misuse, of any ledger system + * or use made or its functionality or any performance or operation of any code or protocol + * caused by bugs or programming or logic errors or otherwise); + * + * A. any offer, purchase, holding, use, sale, exchange or transmission of any + * cryptographic keys, tokens or assets created, exchanged, stored or arising from any + * interaction with the Work; + * + * B. any failure in a transmission or loss of any token or assets keys or other digital + * artefacts due to errors in transmission; + * + * C. bugs, hacks, logic errors or faults in the Work or any communication; + * + * D. system software or apparatus including but not limited to losses caused by errors + * in holding or transmitting tokens by any third-party; + * + * E. breaches or failure of security including hacker attacks, loss or disclosure of + * password, loss of private key, unauthorised use or misuse of such passwords or keys; + * + * F. any losses including loss of anticipated savings or other benefits resulting from + * use of the Work or any changes to the Work (however implemented). + * + * You are solely responsible for; testing, validating and evaluation of all operation + * logic, functionality, security and appropriateness of using the Work for any commercial + * or non-commercial purpose and for any reproduction or redistribution by You of the + * Work. You assume all risks associated with Your use of the Work and the exercise of + * permissions under this License. + */ + +use radix_engine::types::*; +use rocksdb::{ColumnFamily, Direction, IteratorMode, WriteBatch, DB}; + +/// A higher-level database access API scoped at a specific column family. +pub trait TypedCfApi<'db, K, V> { + /// Gets value by key. + fn get(&self, key: &K) -> Option; + + /// Gets multiple values by keys. + /// The order of returned values (or [`None`]s) matches the order of requested keys. + fn get_many(&self, keys: Vec<&K>) -> Vec> { + keys.into_iter().map(|key| self.get(key)).collect() + } + + /// Gets the entry of the least key _(according to the database's ordering)_. + fn get_first(&self) -> Option<(K, V)> { + self.iterate(Direction::Forward).next() + } + + /// Gets the least key _(according to the database's ordering)_. + fn get_first_key(&self) -> Option { + self.get_first().map(|(key, _)| key) + } + + /// Gets the value associated with the least key _(according to the database's ordering)_. + fn get_first_value(&self) -> Option { + self.get_first().map(|(_, value)| value) + } + + /// Gets the entry of the greatest key _(according to the database's ordering)_. + fn get_last(&self) -> Option<(K, V)> { + self.iterate(Direction::Reverse).next() + } + + /// Gets the greatest key _(according to the database's ordering)_. + fn get_last_key(&self) -> Option { + self.get_last().map(|(key, _)| key) + } + + /// Gets the value associated with the greatest key _(according to the database's ordering)_. + fn get_last_value(&self) -> Option { + self.get_last().map(|(_, value)| value) + } + + /// Returns an iterator traversing over (potentially) all the entries, in the requested + /// direction. + fn iterate(&self, direction: Direction) -> Box + 'db>; + + /// Returns an iterator starting at the given key (inclusive) and traversing over (potentially) + /// all the entries remaining in the requested direction. + fn iterate_from( + &self, + from: &K, + direction: Direction, + ) -> Box + 'db>; + + /// Upserts the new value at the given key. + fn put(&self, key: &K, value: &V); + + /// Adds the "upsert at key" operation to the given batch. + fn put_with_batch(&self, batch: &mut WriteBatch, key: &K, value: &V); + + /// Adds the "delete by key" operation to the given batch. + fn delete_with_batch(&self, batch: &mut WriteBatch, key: &K); + + /// Adds the "delete by key range" operation to the given batch. + /// Follows the classic convention of "from inclusive, to exclusive". + fn delete_range_with_batch(&self, batch: &mut WriteBatch, from_key: &K, to_key: &K); +} + +/// An encoder/decoder of a typed value. +/// +/// Design note: +/// There are reasons why this is a service-like business object (rather than requiring something +/// like `trait DbEncodable` to be implemented by types stored in the database): +/// - codecs are composable (e.g. `VersioningCodec::new(SborCodec::::new())`); +/// - the same type may have different encodings (e.g. when used for a key vs for a value). +pub trait DbCodec: Clone { + /// Encodes the value into bytes. + fn encode(&self, value: &T) -> Vec; + /// Decodes the bytes into value. + fn decode(&self, bytes: &[u8]) -> T; +} + +/// A [`DB`]-backed implementation of [`TypedCfApi`] using configured key and value codecs. +pub struct CodecBasedCfApi<'db, K, KC: DbCodec + 'db, V, VC: DbCodec + 'db> { + db: &'db DB, + cf: &'db ColumnFamily, + key_codec: KC, + value_codec: VC, + type_parameters_phantom: PhantomData<(K, V)>, +} + +impl<'db, K, KC: DbCodec + 'db, V, VC: DbCodec + 'db> CodecBasedCfApi<'db, K, KC, V, VC> { + /// Creates an instance for the given column family. + pub fn new(db: &'db DB, cf_name: &str, key_codec: KC, value_codec: VC) -> Self { + Self { + db, + cf: db.cf_handle(cf_name).unwrap(), + key_codec, + value_codec, + type_parameters_phantom: PhantomData, + } + } + + /// Returns an iterator based on the [`IteratorMode`] (which already contains encoded key). + /// + /// This is an internal shared implementation detail for different iteration flavors. + /// Implementation note: the key and value codecs are cloned, so that the iterator does not + /// have to reference this instance of [`CodecBasedCfApi`] (for borrow-checker's reasons). This + /// clone typically uses 0 bytes, though (in practice, iterators are stateless and have no + /// dependencies). + fn iterate_with_mode(&self, mode: IteratorMode) -> Box + 'db> { + let key_codec = self.key_codec.clone(); + let value_codec = self.value_codec.clone(); + Box::new(self.db.iterator_cf(self.cf, mode).map(move |result| { + let (key, value) = result.expect("starting iteration"); + ( + key_codec.decode(key.as_ref()), + value_codec.decode(value.as_ref()), + ) + })) + } +} + +impl<'db, K, KC: DbCodec + 'db, V, VC: DbCodec + 'db> TypedCfApi<'db, K, V> + for CodecBasedCfApi<'db, K, KC, V, VC> +{ + fn get(&self, key: &K) -> Option { + self.db + .get_pinned_cf(self.cf, self.key_codec.encode(key).as_slice()) + .expect("database get by key") + .map(|pinnable_slice| self.value_codec.decode(pinnable_slice.as_ref())) + } + + fn get_many(&self, keys: Vec<&K>) -> Vec> { + self.db + .multi_get_cf( + keys.into_iter() + .map(|key| (self.cf, self.key_codec.encode(key))), + ) + .into_iter() + .map(|result| { + result + .expect("multi get") + .map(|bytes| self.value_codec.decode(&bytes)) + }) + .collect() + } + + fn iterate(&self, direction: Direction) -> Box + 'db> { + self.iterate_with_mode(match direction { + Direction::Forward => IteratorMode::Start, + Direction::Reverse => IteratorMode::End, + }) + } + + fn iterate_from( + &self, + from: &K, + direction: Direction, + ) -> Box + 'db> { + self.iterate_with_mode(IteratorMode::From( + self.key_codec.encode(from).as_slice(), + direction, + )) + } + + fn put(&self, key: &K, value: &V) { + self.db + .put_cf( + self.cf, + self.key_codec.encode(key), + self.value_codec.encode(value), + ) + .expect("database put"); + } + + fn put_with_batch(&self, batch: &mut WriteBatch, key: &K, value: &V) { + batch.put_cf( + self.cf, + self.key_codec.encode(key), + self.value_codec.encode(value), + ); + } + + fn delete_with_batch(&self, batch: &mut WriteBatch, key: &K) { + batch.delete_cf(self.cf, self.key_codec.encode(key)); + } + + fn delete_range_with_batch(&self, batch: &mut WriteBatch, from_key: &K, to_key: &K) { + batch.delete_range_cf( + self.cf, + self.key_codec.encode(from_key), + self.key_codec.encode(to_key), + ); + } +} + +/// A trait for a type representing a specific version of some versioned type. +pub trait IsConcreteVersion { + /// The type of versioned wrapper. + type Versioned; + + /// Creates a versioned wrapper containing a copy of this instance. + fn clone_into_versioned(&self) -> Self::Versioned; +} + +/// A reusable versioning decorator for [`DbCodec`]s. +pub struct VersionedDbCodec< + T: IsConcreteVersion, + U: DbCodec, + VT: HasLatestVersion, +> { + underlying: U, + type_parameters_phantom: PhantomData, +} + +impl, U: DbCodec, VT: HasLatestVersion> + VersionedDbCodec +{ + /// Applies versioning for the given codec. + pub fn new(underlying: U) -> Self { + Self { + underlying, + type_parameters_phantom: PhantomData, + } + } +} + +impl, U: DbCodec, VT: HasLatestVersion> Clone + for VersionedDbCodec +{ + fn clone(&self) -> Self { + Self { + underlying: self.underlying.clone(), + type_parameters_phantom: PhantomData, + } + } +} + +impl, U: DbCodec, VT: HasLatestVersion> + DbCodec for VersionedDbCodec +{ + fn encode(&self, value: &T) -> Vec { + let versioned = value.clone_into_versioned(); + self.underlying.encode(&versioned) + } + + fn decode(&self, bytes: &[u8]) -> T { + let versioned = self.underlying.decode(bytes); + versioned.into_latest() + } +} + +/// A [`DbCodec]` for SBOR types. +pub struct SborDbCodec { + type_parameters_phantom: PhantomData, +} + +impl Default for SborDbCodec { + fn default() -> Self { + Self { + type_parameters_phantom: PhantomData, + } + } +} + +impl Clone for SborDbCodec { + fn clone(&self) -> Self { + Self::default() + } +} + +impl DbCodec for SborDbCodec { + fn encode(&self, value: &T) -> Vec { + scrypto_encode(value).unwrap() + } + + fn decode(&self, bytes: &[u8]) -> T { + scrypto_decode(bytes).unwrap() + } +} + +/// A [`DbCodec]` for byte arrays (`Vec`) that are supposed to be stored directly. +#[derive(Clone, Default)] +pub struct DirectDbCodec {} + +impl DbCodec> for DirectDbCodec { + fn encode(&self, value: &Vec) -> Vec { + value.clone() + } + + fn decode(&self, bytes: &[u8]) -> Vec { + bytes.to_vec() + } +} + +/// A [`DbCodec]` based on a predefined set of mappings. +#[derive(Clone, Default)] +pub struct PredefinedDbCodec { + encoding: NonIterMap>, + decoding: NonIterMap, T>, +} + +impl PredefinedDbCodec<()> { + /// Creates an instance capable of representing only a unit `()` (as an empty array). + /// This is useful e.g. for "single-row" column families (which do not need keys), or "key-only" + /// column families (which do not need values). + pub fn for_unit() -> Self { + Self::new(vec![((), vec![])]) + } +} + +impl PredefinedDbCodec { + /// Creates an instance from the given `(value, encoding)` mapping pairs. + pub fn new(mappings: impl IntoIterator)>) -> Self { + let mut encoding = NonIterMap::new(); + let mut decoding = NonIterMap::new(); + for (value, bytes) in mappings { + encoding.insert(value.clone(), bytes.clone()); + decoding.insert(bytes, value); + } + Self { encoding, decoding } + } +} + +impl PredefinedDbCodec { + /// Creates an instance mapping between the given values and their [`ToString`] representations. + pub fn new_from_string_representations(values: impl IntoIterator) -> Self { + Self::new( + values + .into_iter() + .map(|value| (value.clone(), value.to_string().into_bytes())), + ) + } +} + +impl DbCodec for PredefinedDbCodec { + fn encode(&self, value: &T) -> Vec { + self.encoding + .get(value) + .expect("value outside mappings") + .clone() + } + + fn decode(&self, bytes: &[u8]) -> T { + self.decoding + .get(bytes) + .expect("encoding outside mappings") + .clone() + } +} From f7cd0900ea430d1e5e570f7bb0763aa44873453a Mon Sep 17 00:00:00 2001 From: Jakub Krawczyk Date: Tue, 19 Sep 2023 14:22:31 +0200 Subject: [PATCH 2/3] Using existing versioning-macro-generated traits instead of implementing own. --- core-rust/state-manager/src/store/rocks_db.rs | 102 +----------------- .../state-manager/src/store/typed_cf_api.rs | 25 ++--- 2 files changed, 10 insertions(+), 117 deletions(-) diff --git a/core-rust/state-manager/src/store/rocks_db.rs b/core-rust/state-manager/src/store/rocks_db.rs index a34317defb..d18c0b77b5 100644 --- a/core-rust/state-manager/src/store/rocks_db.rs +++ b/core-rust/state-manager/src/store/rocks_db.rs @@ -69,16 +69,13 @@ use crate::store::traits::*; use crate::{ CommittedTransactionIdentifiers, LedgerProof, LedgerTransactionReceipt, LocalTransactionExecution, LocalTransactionReceipt, ReceiptTreeHash, StateVersion, - TransactionTreeHash, VersionedCommittedTransactionIdentifiers, - VersionedCommittedTransactionIdentifiersVersion, VersionedLedgerProof, - VersionedLedgerProofVersion, VersionedLedgerTransactionReceipt, - VersionedLedgerTransactionReceiptVersion, VersionedLocalTransactionExecution, - VersionedLocalTransactionExecutionVersion, + TransactionTreeHash, VersionedCommittedTransactionIdentifiers, VersionedLedgerProof, + VersionedLedgerTransactionReceipt, VersionedLocalTransactionExecution, }; use node_common::utils::IsAccountExt; use radix_engine::types::*; use radix_engine_stores::hash_tree::tree_store::{ - encode_key, NodeKey, ReadableTreeStore, TreeNode, VersionedTreeNode, VersionedTreeNodeVersion, + encode_key, NodeKey, ReadableTreeStore, TreeNode, VersionedTreeNode, }; use rocksdb::{ColumnFamilyDescriptor, Direction, Options, WriteBatch, DB}; use transaction::model::*; @@ -93,7 +90,7 @@ use crate::accumulator_tree::storage::{ReadableAccuTreeStore, TreeSlice}; use crate::query::TransactionIdentifierLoader; use crate::store::traits::scenario::{ ExecutedGenesisScenario, ExecutedGenesisScenarioStore, ScenarioSequenceNumber, - VersionedExecutedGenesisScenario, VersionedExecutedGenesisScenarioVersion, + VersionedExecutedGenesisScenario, }; use crate::store::typed_cf_api::*; use crate::transaction::{ @@ -1498,97 +1495,6 @@ impl DbCodec for NodeIdDbCodec { } } -// TODO(requires Engine changes): move the `IsConcreteVersion` to the versioning macros on the -// Engine's side, in order to get rid of the following ~100 lines. - -impl IsConcreteVersion for SubstateNodeAncestryRecord { - type Versioned = VersionedSubstateNodeAncestryRecord; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for LedgerTransactionReceipt { - type Versioned = VersionedLedgerTransactionReceipt; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for CommittedTransactionIdentifiers { - type Versioned = VersionedCommittedTransactionIdentifiers; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for LocalTransactionExecution { - type Versioned = VersionedLocalTransactionExecution; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for ExecutedGenesisScenario { - type Versioned = VersionedExecutedGenesisScenario; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for LedgerProof { - type Versioned = VersionedLedgerProof; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for ReceiptAccuTreeSlice { - type Versioned = VersionedReceiptAccuTreeSlice; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for TransactionAccuTreeSlice { - type Versioned = VersionedTransactionAccuTreeSlice; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for TreeNode { - type Versioned = VersionedTreeNode; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for StaleTreeParts { - type Versioned = VersionedStaleTreeParts; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - -impl IsConcreteVersion for VertexStoreBlob { - type Versioned = VersionedVertexStoreBlob; - - fn clone_into_versioned(&self) -> Self::Versioned { - self.clone().into_versioned() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/core-rust/state-manager/src/store/typed_cf_api.rs b/core-rust/state-manager/src/store/typed_cf_api.rs index 49db3eeb1b..cb45649acb 100644 --- a/core-rust/state-manager/src/store/typed_cf_api.rs +++ b/core-rust/state-manager/src/store/typed_cf_api.rs @@ -261,26 +261,13 @@ impl<'db, K, KC: DbCodec + 'db, V, VC: DbCodec + 'db> TypedCfApi<'db, K, V } } -/// A trait for a type representing a specific version of some versioned type. -pub trait IsConcreteVersion { - /// The type of versioned wrapper. - type Versioned; - - /// Creates a versioned wrapper containing a copy of this instance. - fn clone_into_versioned(&self) -> Self::Versioned; -} - /// A reusable versioning decorator for [`DbCodec`]s. -pub struct VersionedDbCodec< - T: IsConcreteVersion, - U: DbCodec, - VT: HasLatestVersion, -> { +pub struct VersionedDbCodec + Clone, U: DbCodec, VT: HasLatestVersion> { underlying: U, type_parameters_phantom: PhantomData, } -impl, U: DbCodec, VT: HasLatestVersion> +impl + Clone, U: DbCodec, VT: HasLatestVersion> VersionedDbCodec { /// Applies versioning for the given codec. @@ -292,7 +279,7 @@ impl, U: DbCodec, VT: HasLatestVersion< } } -impl, U: DbCodec, VT: HasLatestVersion> Clone +impl + Clone, U: DbCodec, VT: HasLatestVersion> Clone for VersionedDbCodec { fn clone(&self) -> Self { @@ -303,11 +290,11 @@ impl, U: DbCodec, VT: HasLatestVersion< } } -impl, U: DbCodec, VT: HasLatestVersion> - DbCodec for VersionedDbCodec +impl + Clone, U: DbCodec, VT: HasLatestVersion> DbCodec + for VersionedDbCodec { fn encode(&self, value: &T) -> Vec { - let versioned = value.clone_into_versioned(); + let versioned = value.clone().into(); self.underlying.encode(&versioned) } From f068aebf591907820372e8a89489785daf9407fb Mon Sep 17 00:00:00 2001 From: Jakub Krawczyk Date: Tue, 19 Sep 2023 17:37:55 +0200 Subject: [PATCH 3/3] Minor renames. --- core-rust/state-manager/src/store/rocks_db.rs | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/core-rust/state-manager/src/store/rocks_db.rs b/core-rust/state-manager/src/store/rocks_db.rs index e8d12bc52b..a268c50336 100644 --- a/core-rust/state-manager/src/store/rocks_db.rs +++ b/core-rust/state-manager/src/store/rocks_db.rs @@ -395,7 +395,7 @@ impl NodeColumnFamilies { } /// Returns an API scoped at [`NodeKeyToTreeNode`] column family. - pub fn hash_tree_nodes(&self) -> impl TypedCfApi { + pub fn state_hash_tree_nodes(&self) -> impl TypedCfApi { self.create_with_codecs( NodeKeyToTreeNode, NodeKeyDbCodec::default(), @@ -404,7 +404,7 @@ impl NodeColumnFamilies { } /// Returns an API scoped at [`StateVersionToStaleTreeParts`] column family. - pub fn stale_hash_tree_parts(&self) -> impl TypedCfApi { + pub fn stale_state_hash_tree_parts(&self) -> impl TypedCfApi { self.create_with_codecs( StateVersionToStaleTreeParts, StateVersionDbCodec::default(), @@ -433,7 +433,7 @@ impl NodeColumnFamilies { } /// Returns an API scoped at [`ExtensionsDataKeyToCustomValue`] column family. - pub fn extension_data(&self) -> impl TypedCfApi> { + pub fn extensions_data(&self) -> impl TypedCfApi> { self.create_with_codecs( ExtensionsDataKeyToCustomValue, PredefinedDbCodec::new_from_string_representations(vec![ @@ -609,7 +609,7 @@ impl RocksDBStore { impl ConfigurableDatabase for RocksDBStore { fn read_flags_state(&self) -> DatabaseFlagsState { - let extension_data_cf = self.cfs.extension_data(); + let extension_data_cf = self.cfs.extensions_data(); let account_change_index_enabled = extension_data_cf .get(&ExtensionsDataKey::AccountChangeIndexEnabled) .map(|bytes| scrypto_decode::(&bytes).unwrap()); @@ -624,7 +624,7 @@ impl ConfigurableDatabase for RocksDBStore { fn write_flags(&mut self, database_config: &DatabaseFlags) { let mut batch = WriteBatch::default(); - let extension_data_cf = self.cfs.extension_data(); + let extension_data_cf = self.cfs.extensions_data(); extension_data_cf.put_with_batch( &mut batch, &ExtensionsDataKey::AccountChangeIndexEnabled, @@ -747,13 +747,15 @@ impl CommitStore for RocksDBStore { let state_hash_tree_update = commit_bundle.state_tree_update; for (key, node) in state_hash_tree_update.new_nodes { self.cfs - .hash_tree_nodes() + .state_hash_tree_nodes() .put_with_batch(&mut batch, &key, &node); } for (version, stale_parts) in state_hash_tree_update.stale_tree_parts_at_state_version { - self.cfs - .stale_hash_tree_parts() - .put_with_batch(&mut batch, &version, &stale_parts); + self.cfs.stale_state_hash_tree_parts().put_with_batch( + &mut batch, + &version, + &stale_parts, + ); } for (node_ids, record) in commit_bundle.new_substate_node_ancestry_records { @@ -1169,7 +1171,7 @@ impl SubstateNodeAncestryStore for RocksDBStore { impl ReadableTreeStore for RocksDBStore { fn get_node(&self, key: &NodeKey) -> Option { - self.cfs.hash_tree_nodes().get(key) + self.cfs.state_hash_tree_nodes().get(key) } } @@ -1268,7 +1270,7 @@ impl RocksDBStore { &transaction_bundle.receipt.local_execution, ); - self.cfs.extension_data().put_with_batch( + self.cfs.extensions_data().put_with_batch( batch, &ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion, &state_version.to_bytes().to_vec(), @@ -1312,7 +1314,7 @@ impl RocksDBStore { } } - self.cfs.extension_data().put_with_batch( + self.cfs.extensions_data().put_with_batch( &mut batch, &ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion, &last_state_version.to_bytes().to_vec(), @@ -1326,7 +1328,7 @@ impl RocksDBStore { impl AccountChangeIndexExtension for RocksDBStore { fn account_change_index_last_processed_state_version(&self) -> StateVersion { self.cfs - .extension_data() + .extensions_data() .get(&ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion) .map(StateVersion::from_bytes) .unwrap_or(StateVersion::pre_genesis())