From bf0b886a07111aa5c41c5d77acdea192a90701f8 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Fri, 18 Oct 2024 17:06:50 +0200 Subject: [PATCH 1/6] refactor(common): use full chainpoint in `ChainBlockNextAction::RollBackward` This will allow to return the latest chain point known by the cardano block scanner. Else we only know it when there's a roll foward. --- .../chain_reader_block_streamer.rs | 16 +++++++++------- mithril-common/src/chain_reader/entity.rs | 6 +++--- .../src/chain_reader/fake_chain_reader.rs | 2 +- .../src/chain_reader/pallas_chain_reader.rs | 8 +++----- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs index 7a821052c48..fa0d411db74 100644 --- a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs +++ b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs @@ -52,9 +52,10 @@ impl BlockStreamer for ChainReaderBlockStreamer { } Some(BlockStreamerNextAction::ChainBlockNextAction( ChainBlockNextAction::RollBackward { - slot_number: rollback_slot_number, + chain_point: rollback_chain_point, }, )) => { + let rollback_slot_number = rollback_chain_point.slot_number; let index_rollback = roll_forwards .iter() .position(|block| block.slot_number == rollback_slot_number); @@ -139,8 +140,9 @@ impl ChainReaderBlockStreamer { } } Some(ChainBlockNextAction::RollBackward { - slot_number: rollback_slot_number, + chain_point: rollback_chain_point, }) => { + let rollback_slot_number = rollback_chain_point.slot_number; trace!( self.logger, "Received a RollBackward({rollback_slot_number:?})" @@ -150,7 +152,7 @@ impl ChainReaderBlockStreamer { } else { BlockStreamerNextAction::ChainBlockNextAction( ChainBlockNextAction::RollBackward { - slot_number: rollback_slot_number, + chain_point: rollback_chain_point, }, ) }; @@ -394,7 +396,7 @@ mod tests { async fn test_parse_expected_nothing_when_rollbackward_on_same_point() { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - slot_number: SlotNumber(100), + chain_point: ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-123"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -420,7 +422,7 @@ mod tests { { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - slot_number: SlotNumber(100), + chain_point: ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-123"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -473,7 +475,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - slot_number: SlotNumber(9), + chain_point: ChainPoint::new(SlotNumber(9), BlockNumber(90), "hash-9"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -518,7 +520,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - slot_number: SlotNumber(3), + chain_point: ChainPoint::new(SlotNumber(3), BlockNumber(30), "hash-3"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( diff --git a/mithril-common/src/chain_reader/entity.rs b/mithril-common/src/chain_reader/entity.rs index d750d93df09..0b391423b75 100644 --- a/mithril-common/src/chain_reader/entity.rs +++ b/mithril-common/src/chain_reader/entity.rs @@ -1,4 +1,4 @@ -use crate::{cardano_block_scanner::ScannedBlock, entities::SlotNumber}; +use crate::{cardano_block_scanner::ScannedBlock, entities::ChainPoint}; /// The action that indicates what to do next when scanning the chain #[derive(Debug, Clone, PartialEq)] @@ -10,7 +10,7 @@ pub enum ChainBlockNextAction { }, /// RollBackward event (we are on an incorrect fork, we need to get back a point to roll forward again) RollBackward { - /// The rollback slot number in the chain to read (as a new valid slot number to read from on the main chain, which has already been seen) - slot_number: SlotNumber, + /// The rollback chain point in the chain to read (as a new valid chain point to read from on the main chain, which has already been seen) + chain_point: ChainPoint, }, } diff --git a/mithril-common/src/chain_reader/fake_chain_reader.rs b/mithril-common/src/chain_reader/fake_chain_reader.rs index 5b3810936f6..6286f1ccb3b 100644 --- a/mithril-common/src/chain_reader/fake_chain_reader.rs +++ b/mithril-common/src/chain_reader/fake_chain_reader.rs @@ -71,7 +71,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - slot_number: build_chain_point(1).slot_number, + chain_point: build_chain_point(1), }, ]; diff --git a/mithril-common/src/chain_reader/pallas_chain_reader.rs b/mithril-common/src/chain_reader/pallas_chain_reader.rs index 6992f87fcf1..4116e410bbd 100644 --- a/mithril-common/src/chain_reader/pallas_chain_reader.rs +++ b/mithril-common/src/chain_reader/pallas_chain_reader.rs @@ -86,9 +86,7 @@ impl PallasChainReader { } NextResponse::RollBackward(rollback_point, _) => { let chain_point = ChainPoint::from(rollback_point); - Ok(Some(ChainBlockNextAction::RollBackward { - slot_number: chain_point.slot_number, - })) + Ok(Some(ChainBlockNextAction::RollBackward { chain_point })) } NextResponse::Await => Ok(None), } @@ -280,8 +278,8 @@ mod tests { let (_, client_res) = tokio::join!(server, client); let chain_block = client_res.expect("Client failed to get next chain block"); match chain_block { - ChainBlockNextAction::RollBackward { slot_number } => { - assert_eq!(slot_number, get_fake_chain_point_backwards().slot_number); + ChainBlockNextAction::RollBackward { chain_point } => { + assert_eq!(chain_point, get_fake_chain_point_backwards()); } _ => panic!("Unexpected chain block action"), } From c34b08d28965fc48a3817a58f547e1c8b863d25a Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Fri, 18 Oct 2024 18:11:04 +0200 Subject: [PATCH 2/6] feat(common): `BlockStreamer` can now return their latest polled chain point --- .../chain_reader_block_streamer.rs | 79 ++++++++++++++++++- .../dumb_block_scanner.rs | 52 +++++++++--- .../src/cardano_block_scanner/interface.rs | 3 + .../cardano_block_scanner/scanned_block.rs | 14 +++- .../signable_builder/cardano_transactions.rs | 2 +- 5 files changed, 135 insertions(+), 15 deletions(-) diff --git a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs index fa0d411db74..e71a35ec4c9 100644 --- a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs +++ b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs @@ -26,6 +26,7 @@ pub struct ChainReaderBlockStreamer { from: ChainPoint, until: BlockNumber, max_roll_forwards_per_poll: usize, + last_polled_chain_point: Option, logger: Logger, } @@ -42,6 +43,7 @@ impl BlockStreamer for ChainReaderBlockStreamer { Some(BlockStreamerNextAction::ChainBlockNextAction( ChainBlockNextAction::RollForward { parsed_block }, )) => { + self.last_polled_chain_point = Some(ChainPoint::from(&parsed_block)); let parsed_block_number = parsed_block.block_number; roll_forwards.push(parsed_block); if roll_forwards.len() >= self.max_roll_forwards_per_poll @@ -55,6 +57,7 @@ impl BlockStreamer for ChainReaderBlockStreamer { chain_point: rollback_chain_point, }, )) => { + self.last_polled_chain_point = Some(rollback_chain_point.clone()); let rollback_slot_number = rollback_chain_point.slot_number; let index_rollback = roll_forwards .iter() @@ -82,16 +85,20 @@ impl BlockStreamer for ChainReaderBlockStreamer { continue; } None => { - if roll_forwards.is_empty() { - return Ok(None); + return if roll_forwards.is_empty() { + Ok(None) } else { chain_scanned_blocks = ChainScannedBlocks::RollForwards(roll_forwards); - return Ok(Some(chain_scanned_blocks)); + Ok(Some(chain_scanned_blocks)) } } } } } + + fn latest_polled_chain_point(&self) -> Option { + self.last_polled_chain_point.clone() + } } impl ChainReaderBlockStreamer { @@ -113,6 +120,7 @@ impl ChainReaderBlockStreamer { from, until, max_roll_forwards_per_poll, + last_polled_chain_point: None, logger: logger.new_with_component_name::(), }) } @@ -211,6 +219,7 @@ mod tests { let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); + assert_eq!(None, block_streamer.latest_polled_chain_point()); let mut block_streamer = ChainReaderBlockStreamer::try_new( chain_reader, @@ -232,6 +241,14 @@ mod tests { )])), scanned_blocks ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new( + SlotNumber(100), + until_block_number, + "hash-2", + )) + ); } #[tokio::test] @@ -285,6 +302,11 @@ mod tests { let chain_reader_total_remaining_next_actions = chain_reader.lock().await.get_total_remaining_next_actions(); assert_eq!(1, chain_reader_total_remaining_next_actions); + + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(20), BlockNumber(2), "hash-2")) + ); } #[tokio::test] @@ -327,6 +349,10 @@ mod tests { ])), scanned_blocks, ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(20), BlockNumber(2), "hash-2")) + ); } #[tokio::test] @@ -376,6 +402,10 @@ mod tests { ])), scanned_blocks, ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(20), BlockNumber(2), "hash-2")) + ); let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!( @@ -387,9 +417,17 @@ mod tests { ),])), scanned_blocks, ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(30), BlockNumber(3), "hash-3")) + ); let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(30), BlockNumber(3), "hash-3")) + ); } #[tokio::test] @@ -415,6 +453,7 @@ mod tests { let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); + assert_eq!(block_streamer.latest_polled_chain_point(), None); } #[tokio::test] @@ -422,7 +461,7 @@ mod tests { { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - chain_point: ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-123"), + chain_point: ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-10"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -441,9 +480,17 @@ mod tests { Some(ChainScannedBlocks::RollBackward(SlotNumber(100))), scanned_blocks, ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-10")) + ); let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-10")) + ); } #[tokio::test] @@ -497,6 +544,10 @@ mod tests { ])), scanned_blocks, ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(9), BlockNumber(90), "hash-9",)) + ); } #[tokio::test] @@ -539,6 +590,10 @@ mod tests { Some(ChainScannedBlocks::RollBackward(SlotNumber(3))), scanned_blocks, ); + assert_eq!( + block_streamer.latest_polled_chain_point(), + Some(ChainPoint::new(SlotNumber(3), BlockNumber(30), "hash-3",)) + ); } #[tokio::test] @@ -558,4 +613,20 @@ mod tests { assert_eq!(scanned_blocks, None); } + + #[tokio::test] + async fn test_latest_polled_chain_point_is_none_if_nothing_was_polled() { + let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![]))); + let block_streamer = ChainReaderBlockStreamer::try_new( + chain_reader, + None, + BlockNumber(1), + MAX_ROLL_FORWARDS_PER_POLL, + TestLogger::stdout(), + ) + .await + .unwrap(); + + assert_eq!(block_streamer.latest_polled_chain_point(), None); + } } diff --git a/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs b/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs index 2fc516bd663..b156e1a2167 100644 --- a/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs +++ b/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs @@ -21,33 +21,43 @@ impl DumbBlockScanner { } } - /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of the + /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of /// its queue. pub fn forwards(self, blocks: Vec>) -> Self { self.add_forwards(blocks); self } - /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of the - /// its queue. + /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue. pub fn backward(self, chain_point: ChainPoint) -> Self { self.add_backward(chain_point); self } - /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of the + /// Set the latest polled chain point to return when [Self::latest_polled_chain_point] is called. + pub fn latest_polled_chain_point(self, chain_point: Option) -> Self { + self.set_latest_polled_chain_point(chain_point); + self + } + + /// Add to the inner streamer several [ChainScannedBlocks::RollForwards] responses at the end of /// its queue. pub fn add_forwards(&self, blocks: Vec>) { let mut streamer = self.streamer.write().unwrap(); *streamer = streamer.clone().forwards(blocks); } - /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of the - /// its queue. + /// Add to the inner streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue. pub fn add_backward(&self, chain_point: ChainPoint) { let mut streamer = self.streamer.write().unwrap(); *streamer = streamer.clone().rollback(chain_point); } + + /// Set the latest polled chain point to return when [Self::latest_polled_chain_point] is called. + pub fn set_latest_polled_chain_point(&self, chain_point: Option) { + let mut streamer = self.streamer.write().unwrap(); + *streamer = streamer.clone().set_latest_polled_chain_point(chain_point); + } } impl Default for DumbBlockScanner { @@ -72,6 +82,7 @@ impl BlockScanner for DumbBlockScanner { #[derive(Clone)] pub struct DumbBlockStreamer { streamer_responses: VecDeque, + latest_polled_chain_point: Option, } impl DumbBlockStreamer { @@ -79,10 +90,17 @@ impl DumbBlockStreamer { pub fn new() -> Self { Self { streamer_responses: VecDeque::new(), + latest_polled_chain_point: None, } } - /// Add to the streamer several [ChainScannedBlocks::RollForwards] responses at the end of the + /// Set the latest polled chain point to return when [Self::latest_polled_chain_point] is called + pub fn set_latest_polled_chain_point(mut self, chain_point: Option) -> Self { + self.latest_polled_chain_point = chain_point; + self + } + + /// Add to the streamer several [ChainScannedBlocks::RollForwards] responses at the end of /// its queue. pub fn forwards(mut self, blocks: Vec>) -> Self { let mut source: VecDeque<_> = blocks @@ -94,8 +112,7 @@ impl DumbBlockStreamer { self } - /// Add to the streamer a [ChainScannedBlocks::RollBackward] response at the end of the - /// its queue. + /// Add to the streamer a [ChainScannedBlocks::RollBackward] response at the end of its queue. pub fn rollback(mut self, chain_point: ChainPoint) -> Self { self.streamer_responses .push_back(ChainScannedBlocks::RollBackward(chain_point.slot_number)); @@ -114,6 +131,10 @@ impl BlockStreamer for DumbBlockStreamer { async fn poll_next(&mut self) -> StdResult> { Ok(self.streamer_responses.pop_front()) } + + fn latest_polled_chain_point(&self) -> Option { + self.latest_polled_chain_point.clone() + } } #[cfg(test)] @@ -287,4 +308,17 @@ mod tests { let blocks = streamer.poll_next().await.unwrap(); assert_eq!(blocks, None); } + + #[tokio::test] + async fn setting_last_polled_block() { + let mut streamer = DumbBlockStreamer::new().forwards(vec![]); + assert_eq!(streamer.latest_polled_chain_point(), None); + + let chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash"); + streamer = streamer.set_latest_polled_chain_point(Some(chain_point.clone())); + assert_eq!(streamer.latest_polled_chain_point(), Some(chain_point)); + + streamer = streamer.set_latest_polled_chain_point(None); + assert_eq!(streamer.latest_polled_chain_point(), None); + } } diff --git a/mithril-common/src/cardano_block_scanner/interface.rs b/mithril-common/src/cardano_block_scanner/interface.rs index 7bd44aeeed1..54268944cae 100644 --- a/mithril-common/src/cardano_block_scanner/interface.rs +++ b/mithril-common/src/cardano_block_scanner/interface.rs @@ -65,6 +65,9 @@ pub enum ChainScannedBlocks { pub trait BlockStreamer: Sync + Send { /// Stream the next available blocks async fn poll_next(&mut self) -> StdResult>; + + /// Get the latest polled chain point + fn latest_polled_chain_point(&self) -> Option; } cfg_test_tools! { diff --git a/mithril-common/src/cardano_block_scanner/scanned_block.rs b/mithril-common/src/cardano_block_scanner/scanned_block.rs index 41b287b3bba..8060cf2f7ee 100644 --- a/mithril-common/src/cardano_block_scanner/scanned_block.rs +++ b/mithril-common/src/cardano_block_scanner/scanned_block.rs @@ -1,6 +1,8 @@ use pallas_traverse::MultiEraBlock; -use crate::entities::{BlockHash, BlockNumber, CardanoTransaction, SlotNumber, TransactionHash}; +use crate::entities::{ + BlockHash, BlockNumber, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash, +}; /// A block scanned from a Cardano database #[derive(Debug, Clone, PartialEq)] @@ -67,3 +69,13 @@ impl ScannedBlock { .collect::>() } } + +impl From<&ScannedBlock> for ChainPoint { + fn from(scanned_block: &ScannedBlock) -> Self { + ChainPoint::new( + scanned_block.slot_number, + scanned_block.block_number, + scanned_block.block_hash.clone(), + ) + } +} diff --git a/mithril-common/src/signable_builder/cardano_transactions.rs b/mithril-common/src/signable_builder/cardano_transactions.rs index 7ab3f495074..dc918a2852d 100644 --- a/mithril-common/src/signable_builder/cardano_transactions.rs +++ b/mithril-common/src/signable_builder/cardano_transactions.rs @@ -17,7 +17,7 @@ use mockall::automock; #[cfg_attr(test, automock)] #[async_trait] pub trait TransactionsImporter: Send + Sync { - /// Returns all transactions up to the given beacon + /// Import all transactions up to the given beacon into the system async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()>; } From 5eca7332b0e26623682ed17bcf1287babb29664b Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Mon, 21 Oct 2024 18:24:37 +0200 Subject: [PATCH 3/6] fix(signer): avoid unnecessary rescan when importing transactions in chunk This fix the issue of importing when there's no transactions to store in a large range of blocks: * Instead of always using the database to get the start point the db is only used for the first chunk. * For subsequent call the last chain point polled by the streamer is used. --- .../cardano_transactions/importer/service.rs | 200 +++++++++++++++++- 1 file changed, 198 insertions(+), 2 deletions(-) diff --git a/mithril-signer/src/services/cardano_transactions/importer/service.rs b/mithril-signer/src/services/cardano_transactions/importer/service.rs index a15b16ec5bc..ccc2bd62705 100644 --- a/mithril-signer/src/services/cardano_transactions/importer/service.rs +++ b/mithril-signer/src/services/cardano_transactions/importer/service.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; use slog::{debug, Logger}; -use tokio::{runtime::Handle, task}; +use tokio::{runtime::Handle, sync::Mutex, task}; use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks}; use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory}; @@ -56,6 +56,7 @@ pub trait TransactionStore: Send + Sync { pub struct CardanoTransactionsImporter { block_scanner: Arc, transaction_store: Arc, + latest_polled_chain_point: Arc>>, logger: Logger, } @@ -69,12 +70,23 @@ impl CardanoTransactionsImporter { Self { block_scanner, transaction_store, + latest_polled_chain_point: Arc::new(Mutex::new(None)), logger: logger.new_with_component_name::(), } } + async fn start_chain_point(&self) -> StdResult> { + let last_scanned_chain_point = self.latest_polled_chain_point.lock().await.clone(); + + if last_scanned_chain_point.is_none() { + self.transaction_store.get_highest_beacon().await + } else { + Ok(last_scanned_chain_point) + } + } + async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> { - let from = self.transaction_store.get_highest_beacon().await?; + let from = self.start_chain_point().await?; self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon) .await } @@ -120,6 +132,7 @@ impl CardanoTransactionsImporter { } } } + *self.latest_polled_chain_point.lock().await = streamer.latest_polled_chain_point(); Ok(()) } @@ -781,6 +794,189 @@ mod tests { assert_eq!(cold_imported_transactions, warm_imported_transactions); } + mod transactions_import_start_point { + use super::*; + + async fn importer_with_highest_stored_transaction_and_last_polled_chain_point( + highest_stored_transaction: Option, + last_polled_chain_point: Option, + ) -> CardanoTransactionsImporter { + let connection = cardano_tx_db_connection().unwrap(); + let repository = Arc::new(CardanoTransactionRepository::new(Arc::new( + SqliteConnectionPool::build_from_connection(connection), + ))); + + if let Some(transaction) = highest_stored_transaction { + repository + .store_transactions(vec![transaction]) + .await + .unwrap(); + } + + CardanoTransactionsImporter { + latest_polled_chain_point: Arc::new(Mutex::new(last_polled_chain_point)), + ..CardanoTransactionsImporter::new_for_test( + Arc::new(DumbBlockScanner::new()), + repository, + ) + } + } + + #[tokio::test] + async fn cloning_keep_last_polled_chain_point() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + None, + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1", + )), + ) + .await; + + let cloned_importer = importer.clone(); + let start_point = cloned_importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1" + )), + start_point + ); + } + + #[tokio::test] + async fn none_if_nothing_stored_nor_scanned() { + let importer = + importer_with_highest_stored_transaction_and_last_polled_chain_point(None, None) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!(None, start_point); + } + + #[tokio::test] + async fn start_at_last_stored_chain_point_if_nothing_scanned() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + Some(CardanoTransaction::new( + "tx_hash-2", + BlockNumber(20), + SlotNumber(25), + "block_hash-2", + )), + None, + ) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + "block_hash-2" + )), + start_point + ); + } + + #[tokio::test] + async fn start_at_last_scanned_chain_point_when_nothing_stored() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + None, + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1", + )), + ) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1" + )), + start_point + ); + } + + #[tokio::test] + async fn start_at_last_scanned_chain_point_even_if_something_stored() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + Some(CardanoTransaction::new( + "tx_hash-2", + BlockNumber(20), + SlotNumber(25), + "block_hash-2", + )), + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1", + )), + ) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1" + )), + start_point + ); + } + + #[tokio::test] + async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() { + let connection = cardano_tx_db_connection().unwrap(); + let importer = CardanoTransactionsImporter { + latest_polled_chain_point: Arc::new(Mutex::new(None)), + ..CardanoTransactionsImporter::new_for_test( + Arc::new( + DumbBlockScanner::new() + .forwards(vec![vec![ScannedBlock::new( + "block_hash-1", + BlockNumber(10), + SlotNumber(15), + Vec::<&str>::new(), + )]]) + .latest_polled_chain_point(Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + "block_hash-2", + ))), + ), + Arc::new(CardanoTransactionRepository::new(Arc::new( + SqliteConnectionPool::build_from_connection(connection), + ))), + ) + }; + + let start_point_before_import = importer.start_chain_point().await.unwrap(); + assert_eq!(None, start_point_before_import); + + importer + .import_transactions(BlockNumber(1000)) + .await + .unwrap(); + + let start_point_after_import = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + "block_hash-2" + )), + start_point_after_import + ); + } + } + #[tokio::test] async fn when_rollbackward_should_remove_transactions() { let connection = cardano_tx_db_connection().unwrap(); From 3bab5a3fb88c769023c5f7f6eb17d8c34b83a4ac Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Mon, 21 Oct 2024 18:35:22 +0200 Subject: [PATCH 4/6] chore(aggregator): synchronise `cardano_transactions_importer` code with signer --- .../services/cardano_transactions_importer.rs | 200 +++++++++++++++++- 1 file changed, 198 insertions(+), 2 deletions(-) diff --git a/mithril-aggregator/src/services/cardano_transactions_importer.rs b/mithril-aggregator/src/services/cardano_transactions_importer.rs index a15b16ec5bc..ccc2bd62705 100644 --- a/mithril-aggregator/src/services/cardano_transactions_importer.rs +++ b/mithril-aggregator/src/services/cardano_transactions_importer.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; use slog::{debug, Logger}; -use tokio::{runtime::Handle, task}; +use tokio::{runtime::Handle, sync::Mutex, task}; use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks}; use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory}; @@ -56,6 +56,7 @@ pub trait TransactionStore: Send + Sync { pub struct CardanoTransactionsImporter { block_scanner: Arc, transaction_store: Arc, + latest_polled_chain_point: Arc>>, logger: Logger, } @@ -69,12 +70,23 @@ impl CardanoTransactionsImporter { Self { block_scanner, transaction_store, + latest_polled_chain_point: Arc::new(Mutex::new(None)), logger: logger.new_with_component_name::(), } } + async fn start_chain_point(&self) -> StdResult> { + let last_scanned_chain_point = self.latest_polled_chain_point.lock().await.clone(); + + if last_scanned_chain_point.is_none() { + self.transaction_store.get_highest_beacon().await + } else { + Ok(last_scanned_chain_point) + } + } + async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> { - let from = self.transaction_store.get_highest_beacon().await?; + let from = self.start_chain_point().await?; self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon) .await } @@ -120,6 +132,7 @@ impl CardanoTransactionsImporter { } } } + *self.latest_polled_chain_point.lock().await = streamer.latest_polled_chain_point(); Ok(()) } @@ -781,6 +794,189 @@ mod tests { assert_eq!(cold_imported_transactions, warm_imported_transactions); } + mod transactions_import_start_point { + use super::*; + + async fn importer_with_highest_stored_transaction_and_last_polled_chain_point( + highest_stored_transaction: Option, + last_polled_chain_point: Option, + ) -> CardanoTransactionsImporter { + let connection = cardano_tx_db_connection().unwrap(); + let repository = Arc::new(CardanoTransactionRepository::new(Arc::new( + SqliteConnectionPool::build_from_connection(connection), + ))); + + if let Some(transaction) = highest_stored_transaction { + repository + .store_transactions(vec![transaction]) + .await + .unwrap(); + } + + CardanoTransactionsImporter { + latest_polled_chain_point: Arc::new(Mutex::new(last_polled_chain_point)), + ..CardanoTransactionsImporter::new_for_test( + Arc::new(DumbBlockScanner::new()), + repository, + ) + } + } + + #[tokio::test] + async fn cloning_keep_last_polled_chain_point() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + None, + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1", + )), + ) + .await; + + let cloned_importer = importer.clone(); + let start_point = cloned_importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1" + )), + start_point + ); + } + + #[tokio::test] + async fn none_if_nothing_stored_nor_scanned() { + let importer = + importer_with_highest_stored_transaction_and_last_polled_chain_point(None, None) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!(None, start_point); + } + + #[tokio::test] + async fn start_at_last_stored_chain_point_if_nothing_scanned() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + Some(CardanoTransaction::new( + "tx_hash-2", + BlockNumber(20), + SlotNumber(25), + "block_hash-2", + )), + None, + ) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + "block_hash-2" + )), + start_point + ); + } + + #[tokio::test] + async fn start_at_last_scanned_chain_point_when_nothing_stored() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + None, + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1", + )), + ) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1" + )), + start_point + ); + } + + #[tokio::test] + async fn start_at_last_scanned_chain_point_even_if_something_stored() { + let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( + Some(CardanoTransaction::new( + "tx_hash-2", + BlockNumber(20), + SlotNumber(25), + "block_hash-2", + )), + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1", + )), + ) + .await; + + let start_point = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(15), + BlockNumber(10), + "block_hash-1" + )), + start_point + ); + } + + #[tokio::test] + async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() { + let connection = cardano_tx_db_connection().unwrap(); + let importer = CardanoTransactionsImporter { + latest_polled_chain_point: Arc::new(Mutex::new(None)), + ..CardanoTransactionsImporter::new_for_test( + Arc::new( + DumbBlockScanner::new() + .forwards(vec![vec![ScannedBlock::new( + "block_hash-1", + BlockNumber(10), + SlotNumber(15), + Vec::<&str>::new(), + )]]) + .latest_polled_chain_point(Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + "block_hash-2", + ))), + ), + Arc::new(CardanoTransactionRepository::new(Arc::new( + SqliteConnectionPool::build_from_connection(connection), + ))), + ) + }; + + let start_point_before_import = importer.start_chain_point().await.unwrap(); + assert_eq!(None, start_point_before_import); + + importer + .import_transactions(BlockNumber(1000)) + .await + .unwrap(); + + let start_point_after_import = importer.start_chain_point().await.unwrap(); + assert_eq!( + Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + "block_hash-2" + )), + start_point_after_import + ); + } + } + #[tokio::test] async fn when_rollbackward_should_remove_transactions() { let connection = cardano_tx_db_connection().unwrap(); From 906cdc10f7bb106702bf920ff89fdd117171ec8c Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:14:14 +0200 Subject: [PATCH 5/6] chore: upgrade crate versions * mithril-aggregator from `0.5.87` to `0.5.88` * mithril-common from `0.4.72` to `0.4.73` * mithril-signer from `0.2.202` to `0.2.203` --- Cargo.lock | 4 ++-- mithril-aggregator/Cargo.toml | 2 +- mithril-common/Cargo.toml | 2 +- mithril-signer/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be313dfb75e..cd076a95c18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3709,7 +3709,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.4.72" +version = "0.4.73" dependencies = [ "anyhow", "async-trait", @@ -3867,7 +3867,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.202" +version = "0.2.203" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 5a5dde8b0e0..c83c9ca8600 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.5.87" +version = "0.5.88" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 34b836aac66..9a2acf1927e 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.4.72" +version = "0.4.73" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index 0ffd65e490c..30b814b8e0f 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.202" +version = "0.2.203" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } From 84ca63fdb3711baca396d4952a6f6309ab28c365 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:30:36 +0200 Subject: [PATCH 6/6] chore: update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a48567088fa..6ba3effaa8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ As a minor extension, we have adopted a slightly different versioning convention - Support for Prometheus metrics endpoint in aggregator +- Fix an issue that caused unnecessary re-scan of the Cardano chain when importing transactions. + - Crates versions: | Crate | Version |