Skip to content

Commit

Permalink
Merge pull request input-output-hk#1672 from input-output-hk/djo/1657…
Browse files Browse the repository at this point in the history
…/consistent_transactions_sort

Consistent transactions sort, use block number then transaction hash instead of row id
  • Loading branch information
Alenar committed May 13, 2024
2 parents 936ffe2 + 24d5c95 commit 94a103c
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 134 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.5.5"
version = "0.5.6"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand All @@ -13,6 +13,10 @@ repository = { workspace = true }
name = "cardano_transactions_import"
harness = false

[[bench]]
name = "cardano_transactions_get"
harness = false

[dependencies]
anyhow = "1.0.79"
async-trait = "0.1.77"
Expand Down
95 changes: 95 additions & 0 deletions mithril-aggregator/benches/cardano_transactions_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use sqlite::ConnectionThreadSafe;

use mithril_aggregator::{
database::repository::CardanoTransactionRepository, services::TransactionStore,
};
use mithril_common::{entities::CardanoTransaction, test_utils::TempDir};
use mithril_persistence::sqlite::ConnectionBuilder;

fn cardano_tx_db_connection(db_file_name: &str) -> ConnectionThreadSafe {
let db_path =
TempDir::create("aggregator_benches", "bench_get_transactions").join(db_file_name);

if db_path.exists() {
std::fs::remove_file(db_path.clone()).unwrap();
}

ConnectionBuilder::open_file(&db_path)
.with_migrations(
mithril_aggregator::database::cardano_transaction_migration::get_migrations(),
)
.build()
.unwrap()
}

fn generate_transactions(nb_transactions: usize) -> Vec<CardanoTransaction> {
// Note: we irrealistically generate transactions where each are on a different block.
// This is to trick the repository `get_transactions_in_range` method to read the expected number
// of transactions.
(0..nb_transactions)
.map(|i| {
CardanoTransaction::new(
format!("tx_hash-{}", i),
i as u64,
i as u64 * 100,
format!("block_hash-{}", i),
i as u64 + 1,
)
})
.collect()
}

async fn init_db(nb_transaction_in_db: usize) -> CardanoTransactionRepository {
println!("Generating a db with {nb_transaction_in_db} transactions, one per block ...");
let transactions = generate_transactions(nb_transaction_in_db);
let connection = Arc::new(cardano_tx_db_connection(&format!(
"cardano_tx-{nb_transaction_in_db}.db",
)));
let repository = CardanoTransactionRepository::new(connection);
repository.store_transactions(transactions).await.unwrap();

repository
}

fn run_bench(c: &mut Criterion, nb_transaction_in_db: usize) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let repository = runtime.block_on(async { init_db(nb_transaction_in_db).await });

let mut group = c.benchmark_group(format!(
"Get transactions - {nb_transaction_in_db} tx in db"
));
for max_block_number in [100, 10_000, 100_000, 1_000_000] {
group.bench_with_input(
BenchmarkId::from_parameter(format!(
"get_transactions_in_range(0..{max_block_number})"
)),
&max_block_number,
|b, &max_block_number| {
b.to_async(&runtime).iter(|| async {
let _transactions = repository
.get_transactions_in_range(0..max_block_number)
.await
.unwrap();
});
},
);
}
group.finish();
}

fn bench_get_transactions(c: &mut Criterion) {
// Two rounds of benchmarks: one with 1M transactions in the db, and one with 10M transactions.
// Each time the number of transactions to read is 100, 10_000, 100_000, 1_000_000.
run_bench(c, 1_000_000);
run_bench(c, 10_000_000);
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(20);
targets = bench_get_transactions
}
criterion_main!(benches);
14 changes: 14 additions & 0 deletions mithril-aggregator/src/database/cardano_transaction_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ create table block_range_root (
merkle_root text not null,
primary key (start, end)
);
"#,
),
// Migration 6
// Add composite index on `block_number/transaction_hash` column of `cardano_tx` table
// Truncate `block_range_root` table after changing the order of retrieval of the transactions
SqlMigration::new(
6,
r#"
create index block_number_transaction_hash_index on cardano_tx(block_number, transaction_hash);
-- remove all data from the block_range_root table since the order used to create them has changed
delete from block_range_root;
vacuum;
"#,
),
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use std::ops::Range;
use sqlite::Value;

use mithril_common::entities::{BlockNumber, BlockRange, TransactionHash};
#[cfg(test)]
use mithril_persistence::sqlite::GetAllCondition;
use mithril_persistence::sqlite::{
Provider, SourceAlias, SqLiteEntity, SqliteConnection, WhereCondition,
};

#[cfg(test)]
use mithril_persistence::sqlite::GetAllCondition;

use crate::database::record::CardanoTransactionRecord;

/// Simple queries to retrieve [CardanoTransaction] from the sqlite database.
Expand Down Expand Up @@ -87,7 +86,7 @@ impl<'client> Provider<'client> for GetCardanoTransactionProvider<'client> {
let aliases = SourceAlias::new(&[("{:cardano_tx:}", "cardano_tx")]);
let projection = Self::Entity::get_projection().expand(aliases);

format!("select {projection} from cardano_tx where {condition} order by rowid")
format!("select {projection} from cardano_tx where {condition} order by block_number, transaction_hash")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ use crate::database::provider::{
use crate::database::record::{BlockRangeRootRecord, CardanoTransactionRecord};
use crate::services::{TransactionStore, TransactionsRetriever};

#[cfg(test)]
use mithril_persistence::sqlite::GetAllProvider;

/// ## Cardano transaction repository
///
/// This is a business oriented layer to perform actions on the database through
Expand Down Expand Up @@ -177,12 +174,21 @@ impl CardanoTransactionRepository {
}
}

#[cfg(test)]
pub(crate) async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
let provider = GetCardanoTransactionProvider::new(&self.connection);
let records = provider.get_all()?;
/// Retrieve all the Block Range Roots in database up to the given end block number excluded.
pub async fn retrieve_block_range_roots_up_to(
&self,
end_block_number: BlockNumber,
) -> StdResult<Box<dyn Iterator<Item = (BlockRange, MKTreeNode)>>> {
let provider = GetBlockRangeRootProvider::new(&self.connection);
let filters = provider.get_up_to_block_number_condition(end_block_number);
let block_range_roots = provider.find(filters)?;
let iterator = block_range_roots
.into_iter()
.map(|record| -> (BlockRange, MKTreeNode) { record.into() })
.collect::<Vec<_>>() // TODO: remove this collect when we should ba able return the iterator directly
.into_iter();

Ok(records.map(|record| record.into()).collect())
Ok(Box::new(iterator))
}
}

Expand All @@ -195,6 +201,13 @@ pub mod test_extensions {
use super::*;

impl CardanoTransactionRepository {
pub async fn get_all(&self) -> StdResult<Vec<CardanoTransaction>> {
let provider = GetCardanoTransactionProvider::new(&self.connection);
let records = provider.get_all()?;

Ok(records.map(|record| record.into()).collect())
}

pub fn get_all_block_range_root(&self) -> StdResult<Vec<BlockRangeRootRecord>> {
let provider = GetBlockRangeRootProvider::new(&self.connection);
let records = provider.get_all()?;
Expand Down Expand Up @@ -233,14 +246,6 @@ impl TransactionStore for CardanoTransactionRepository {
}
}

async fn get_up_to(&self, beacon: ImmutableFileNumber) -> StdResult<Vec<CardanoTransaction>> {
self.get_transactions_up_to(beacon).await.map(|v| {
v.into_iter()
.map(|record| record.into())
.collect::<Vec<CardanoTransaction>>()
})
}

async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()> {
const DB_TRANSACTION_SIZE: usize = 100000;
for transactions_in_db_transaction_chunk in transactions.chunks(DB_TRANSACTION_SIZE) {
Expand Down Expand Up @@ -348,16 +353,8 @@ impl BlockRangeRootRetriever for CardanoTransactionRepository {
.get_highest_block_number_for_immutable_number(up_to_beacon)
.await?
.unwrap_or(0);
let provider = GetBlockRangeRootProvider::new(&self.connection);
let filters = provider.get_up_to_block_number_condition(block_number);
let block_range_roots = provider.find(filters)?;
let block_range_roots = block_range_roots
.into_iter()
.map(|record| -> (BlockRange, MKTreeNode) { record.into() })
.collect::<Vec<_>>() // TODO: remove this collect when we should ba able return the iterator directly
.into_iter();

Ok(Box::new(block_range_roots))
self.retrieve_block_range_roots_up_to(block_number).await
}
}

Expand Down Expand Up @@ -815,4 +812,73 @@ mod tests {
record
);
}

#[tokio::test]
async fn repository_retrieve_block_range_roots_up_to() {
let connection = Arc::new(cardano_tx_db_connection().unwrap());
let repository = CardanoTransactionRepository::new(connection);
let block_range_roots = vec![
(
BlockRange::from_block_number(15),
MKTreeNode::from_hex("AAAA").unwrap(),
),
(
BlockRange::from_block_number(30),
MKTreeNode::from_hex("BBBB").unwrap(),
),
(
BlockRange::from_block_number(45),
MKTreeNode::from_hex("CCCC").unwrap(),
),
];
repository
.store_block_range_roots(block_range_roots.clone())
.await
.unwrap();

// Retrieve with a block far higher than the highest block range - should return all
{
let retrieved_block_ranges = repository
.retrieve_block_range_roots_up_to(1000)
.await
.unwrap();
assert_eq!(
block_range_roots,
retrieved_block_ranges.collect::<Vec<_>>()
);
}
// Retrieve with a block bellow than the smallest block range - should return none
{
let retrieved_block_ranges = repository
.retrieve_block_range_roots_up_to(2)
.await
.unwrap();
assert_eq!(
Vec::<(BlockRange, MKTreeNode)>::new(),
retrieved_block_ranges.collect::<Vec<_>>()
);
}
// The given block is matched to the end (excluded) - should return the first of the three
{
let retrieved_block_ranges = repository
.retrieve_block_range_roots_up_to(45)
.await
.unwrap();
assert_eq!(
vec![block_range_roots[0].clone()],
retrieved_block_ranges.collect::<Vec<_>>()
);
}
// Right after the end of the second block range - should return first two of the three
{
let retrieved_block_ranges = repository
.retrieve_block_range_roots_up_to(46)
.await
.unwrap();
assert_eq!(
block_range_roots[0..=1].to_vec(),
retrieved_block_ranges.collect::<Vec<_>>()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ pub trait TransactionStore: Send + Sync {
/// Get the highest known transaction beacon
async fn get_highest_beacon(&self) -> StdResult<Option<ImmutableFileNumber>>;

/// Get stored transactions up to the given beacon
async fn get_up_to(
&self,
immutable_file_number: ImmutableFileNumber,
) -> StdResult<Vec<CardanoTransaction>>;

/// Store list of transactions
async fn store_transactions(&self, transactions: Vec<CardanoTransaction>) -> StdResult<()>;

Expand Down
2 changes: 1 addition & 1 deletion mithril-signer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-signer"
version = "0.2.133"
version = "0.2.134"
description = "A Mithril Signer"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
Loading

0 comments on commit 94a103c

Please sign in to comment.