Fix unneeded rescan when importing transactions in chunk #2035

Merged · 6 commits · Oct 22, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ As a minor extension, we have adopted a slightly different versioning convention

- Support for Prometheus metrics endpoint in aggregator

- Fix an issue that caused unnecessary re-scan of the Cardano chain when importing transactions.

- Crates versions:

| Crate | Version |
4 changes: 2 additions & 2 deletions Cargo.lock

Generated file; diff not rendered.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.5.87"
version = "0.5.88"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
200 changes: 198 additions & 2 deletions mithril-aggregator/src/services/cardano_transactions_importer.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use slog::{debug, Logger};
use tokio::{runtime::Handle, task};
use tokio::{runtime::Handle, sync::Mutex, task};

use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks};
use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory};
@@ -56,6 +56,7 @@ pub trait TransactionStore: Send + Sync {
pub struct CardanoTransactionsImporter {
    block_scanner: Arc<dyn BlockScanner>,
    transaction_store: Arc<dyn TransactionStore>,
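    /// Chain point reached by the previous block scan, if any, reused as the
    /// starting point of the next import to avoid rescanning the chain.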
    latest_polled_chain_point: Arc<Mutex<Option<ChainPoint>>>,
    logger: Logger,
}

@@ -69,12 +69,23 @@ impl CardanoTransactionsImporter {
        Self {
            block_scanner,
            transaction_store,
            latest_polled_chain_point: Arc::new(Mutex::new(None)),
            logger: logger.new_with_component_name::<Self>(),
        }
    }

    async fn start_chain_point(&self) -> StdResult<Option<ChainPoint>> {
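        // Prefer the chain point reached by the previous scan; when nothing has been
        // polled yet in this process, fall back to the highest stored beacon.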
        let last_scanned_chain_point = self.latest_polled_chain_point.lock().await.clone();

        if last_scanned_chain_point.is_none() {
            self.transaction_store.get_highest_beacon().await
        } else {
            Ok(last_scanned_chain_point)
        }
    }

    async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
        let from = self.transaction_store.get_highest_beacon().await?;
        let from = self.start_chain_point().await?;
        self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon)
            .await
    }
@@ -120,6 +132,7 @@ impl CardanoTransactionsImporter {
                }
            }
        }
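        // Remember where the streamer stopped so that the next import resumes from
        // this point instead of rescanning already-polled blocks.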
        *self.latest_polled_chain_point.lock().await = streamer.latest_polled_chain_point();

        Ok(())
    }
@@ -781,6 +794,189 @@ mod tests {
        assert_eq!(cold_imported_transactions, warm_imported_transactions);
    }

    mod transactions_import_start_point {
        use super::*;

        async fn importer_with_highest_stored_transaction_and_last_polled_chain_point(
            highest_stored_transaction: Option<CardanoTransaction>,
            last_polled_chain_point: Option<ChainPoint>,
        ) -> CardanoTransactionsImporter {
            let connection = cardano_tx_db_connection().unwrap();
            let repository = Arc::new(CardanoTransactionRepository::new(Arc::new(
                SqliteConnectionPool::build_from_connection(connection),
            )));

            if let Some(transaction) = highest_stored_transaction {
                repository
                    .store_transactions(vec![transaction])
                    .await
                    .unwrap();
            }

            CardanoTransactionsImporter {
                latest_polled_chain_point: Arc::new(Mutex::new(last_polled_chain_point)),
                ..CardanoTransactionsImporter::new_for_test(
                    Arc::new(DumbBlockScanner::new()),
                    repository,
                )
            }
        }

        #[tokio::test]
        async fn cloning_keep_last_polled_chain_point() {
            let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point(
                None,
                Some(ChainPoint::new(
                    SlotNumber(15),
                    BlockNumber(10),
                    "block_hash-1",
                )),
            )
            .await;

            let cloned_importer = importer.clone();
            let start_point = cloned_importer.start_chain_point().await.unwrap();
            assert_eq!(
                Some(ChainPoint::new(
                    SlotNumber(15),
                    BlockNumber(10),
                    "block_hash-1"
                )),
                start_point
            );
        }

        #[tokio::test]
        async fn none_if_nothing_stored_nor_scanned() {
            let importer =
                importer_with_highest_stored_transaction_and_last_polled_chain_point(None, None)
                    .await;

            let start_point = importer.start_chain_point().await.unwrap();
            assert_eq!(None, start_point);
        }

        #[tokio::test]
        async fn start_at_last_stored_chain_point_if_nothing_scanned() {
            let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point(
                Some(CardanoTransaction::new(
                    "tx_hash-2",
                    BlockNumber(20),
                    SlotNumber(25),
                    "block_hash-2",
                )),
                None,
            )
            .await;

            let start_point = importer.start_chain_point().await.unwrap();
            assert_eq!(
                Some(ChainPoint::new(
                    SlotNumber(25),
                    BlockNumber(20),
                    "block_hash-2"
                )),
                start_point
            );
        }

        #[tokio::test]
        async fn start_at_last_scanned_chain_point_when_nothing_stored() {
            let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point(
                None,
                Some(ChainPoint::new(
                    SlotNumber(15),
                    BlockNumber(10),
                    "block_hash-1",
                )),
            )
            .await;

            let start_point = importer.start_chain_point().await.unwrap();
            assert_eq!(
                Some(ChainPoint::new(
                    SlotNumber(15),
                    BlockNumber(10),
                    "block_hash-1"
                )),
                start_point
            );
        }

        #[tokio::test]
        async fn start_at_last_scanned_chain_point_even_if_something_stored() {
            let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point(
                Some(CardanoTransaction::new(
                    "tx_hash-2",
                    BlockNumber(20),
                    SlotNumber(25),
                    "block_hash-2",
                )),
                Some(ChainPoint::new(
                    SlotNumber(15),
                    BlockNumber(10),
                    "block_hash-1",
                )),
            )
            .await;

            let start_point = importer.start_chain_point().await.unwrap();
            assert_eq!(
                Some(ChainPoint::new(
                    SlotNumber(15),
                    BlockNumber(10),
                    "block_hash-1"
                )),
                start_point
            );
        }

        #[tokio::test]
        async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() {
            let connection = cardano_tx_db_connection().unwrap();
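            // The scanner yields a single block without transactions while reporting
            // that it polled the chain up to a later point (block 20).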
            let importer = CardanoTransactionsImporter {
                latest_polled_chain_point: Arc::new(Mutex::new(None)),
                ..CardanoTransactionsImporter::new_for_test(
                    Arc::new(
                        DumbBlockScanner::new()
                            .forwards(vec![vec![ScannedBlock::new(
                                "block_hash-1",
                                BlockNumber(10),
                                SlotNumber(15),
                                Vec::<&str>::new(),
                            )]])
                            .latest_polled_chain_point(Some(ChainPoint::new(
                                SlotNumber(25),
                                BlockNumber(20),
                                "block_hash-2",
                            ))),
                    ),
                    Arc::new(CardanoTransactionRepository::new(Arc::new(
                        SqliteConnectionPool::build_from_connection(connection),
                    ))),
                )
            };

            let start_point_before_import = importer.start_chain_point().await.unwrap();
            assert_eq!(None, start_point_before_import);

            importer
                .import_transactions(BlockNumber(1000))
                .await
                .unwrap();

            let start_point_after_import = importer.start_chain_point().await.unwrap();
            assert_eq!(
                Some(ChainPoint::new(
                    SlotNumber(25),
                    BlockNumber(20),
                    "block_hash-2"
                )),
                start_point_after_import
            );
        }
    }

    #[tokio::test]
    async fn when_rollbackward_should_remove_transactions() {
        let connection = cardano_tx_db_connection().unwrap();
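In case the diff above is hard to follow out of context, here is a minimal, self-contained sketch of the idea behind the change; `ChainPoint`, `Importer`, and both methods below are simplified stand-ins rather than the crate's actual API. The importer caches the chain point where the previous scan stopped and starts the next chunked import from it, falling back to the highest stored beacon only when nothing has been polled yet in this process.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

/// Simplified stand-in for the real `ChainPoint` type.
#[derive(Clone, Debug, PartialEq)]
struct ChainPoint(u64);

struct Importer {
    /// Where the previous scan stopped, shared across clones of the importer.
    latest_polled_chain_point: Arc<Mutex<Option<ChainPoint>>>,
}

impl Importer {
    /// Start from the cached point when one exists, otherwise fall back to the store.
    async fn start_chain_point(&self, stored_highest: Option<ChainPoint>) -> Option<ChainPoint> {
        let cached = self.latest_polled_chain_point.lock().await.clone();
        cached.or(stored_highest)
    }

    /// After scanning a chunk, remember where the scanner stopped polling.
    async fn record_polled_point(&self, point: Option<ChainPoint>) {
        *self.latest_polled_chain_point.lock().await = point;
    }
}

#[tokio::main]
async fn main() {
    let importer = Importer {
        latest_polled_chain_point: Arc::new(Mutex::new(None)),
    };
    let stored_highest = Some(ChainPoint(10));

    // First chunk: nothing cached yet, so fall back to the store's highest beacon.
    assert_eq!(
        importer.start_chain_point(stored_highest.clone()).await,
        Some(ChainPoint(10))
    );

    // The scanner polled up to block 20, even though no new transactions were stored.
    importer.record_polled_point(Some(ChainPoint(20))).await;

    // The next chunk resumes from block 20: no rescan of the already-polled range.
    assert_eq!(
        importer.start_chain_point(stored_highest).await,
        Some(ChainPoint(20))
    );
}
```

In the real implementation, the cached point is written at the end of `parse_and_store_transactions_not_imported_yet` and read by `start_chain_point`, as shown in the diff above.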
2 changes: 1 addition & 1 deletion mithril-common/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "mithril-common"
version = "0.4.72"
version = "0.4.73"
description = "Common types, interfaces, and utilities for Mithril nodes."
authors = { workspace = true }
edition = { workspace = true }