From ff5ed52dbad9f8e00e0230f90454b50d1c48a43f Mon Sep 17 00:00:00 2001 From: Lukasz Gasior Date: Wed, 14 Feb 2024 13:06:47 +0100 Subject: [PATCH 1/3] Lock the mempool for the whole duration of add_if_committable_internal --- .../state-manager/src/mempool/mempool_manager.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core-rust/state-manager/src/mempool/mempool_manager.rs b/core-rust/state-manager/src/mempool/mempool_manager.rs index d2de8a0289..c486cfa667 100644 --- a/core-rust/state-manager/src/mempool/mempool_manager.rs +++ b/core-rust/state-manager/src/mempool/mempool_manager.rs @@ -280,12 +280,12 @@ impl MempoolManager { } }; + // We need to lock the mempool to prevent concurrent threads updating it while + // we attempt to insert a transaction (both mempool sync and Core API use this method). + let mut locked_mempool = self.mempool.write(); + // STEP 2 - Check if transaction is already in mempool to avoid transaction execution. - if self - .mempool - .read() - .contains_transaction(&prepared.notarized_transaction_hash()) - { + if locked_mempool.contains_transaction(&prepared.notarized_transaction_hash()) { return Err(MempoolAddError::Duplicate( prepared.notarized_transaction_hash(), )); @@ -315,7 +315,7 @@ impl MempoolManager { validated, raw: raw_transaction, }); - match self.mempool.write().add_transaction( + match locked_mempool.add_transaction( mempool_transaction.clone(), source, Instant::now(), From 800493174c8cb7d2658e675aa7a9ff0dc3a4aee7 Mon Sep 17 00:00:00 2001 From: Lukasz Gasior Date: Wed, 14 Feb 2024 14:18:44 +0100 Subject: [PATCH 2/3] Revert excessive mempool locking; add another contains_transaction check --- .../src/mempool/mempool_manager.rs | 12 ++-- .../src/mempool/priority_mempool.rs | 56 ++++++++++++------- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/core-rust/state-manager/src/mempool/mempool_manager.rs b/core-rust/state-manager/src/mempool/mempool_manager.rs index c486cfa667..5b36e0d993 100644 --- a/core-rust/state-manager/src/mempool/mempool_manager.rs +++ b/core-rust/state-manager/src/mempool/mempool_manager.rs @@ -280,12 +280,12 @@ impl MempoolManager { } }; - // We need to lock the mempool to prevent concurrent threads updating it while - // we attempt to insert a transaction (both mempool sync and Core API use this method). - let mut locked_mempool = self.mempool.write(); - // STEP 2 - Check if transaction is already in mempool to avoid transaction execution. - if locked_mempool.contains_transaction(&prepared.notarized_transaction_hash()) { + if self + .mempool + .read() + .contains_transaction(&prepared.notarized_transaction_hash()) + { return Err(MempoolAddError::Duplicate( prepared.notarized_transaction_hash(), )); @@ -315,7 +315,7 @@ impl MempoolManager { validated, raw: raw_transaction, }); - match locked_mempool.add_transaction( + match self.mempool.write().add_transaction_if_not_present( mempool_transaction.clone(), source, Instant::now(), diff --git a/core-rust/state-manager/src/mempool/priority_mempool.rs b/core-rust/state-manager/src/mempool/priority_mempool.rs index 6e53c6cc27..10a1781309 100644 --- a/core-rust/state-manager/src/mempool/priority_mempool.rs +++ b/core-rust/state-manager/src/mempool/priority_mempool.rs @@ -250,17 +250,22 @@ impl PriorityMempool { } impl PriorityMempool { - /// ASSUMPTION: Mempool does not already contain the transaction (panics otherwise). /// Tries to add a new transaction into the mempool. /// Will return either a [`Vec`] of [`MempoolData`] that was evicted in order to fit the new transaction or an error /// if the mempool is full and the new transaction proposal priority is not better than what already exists. - pub fn add_transaction( + /// Returns an empty [`Vec`] if the transaction was already present in the mempool. + pub fn add_transaction_if_not_present( &mut self, transaction: Arc, source: MempoolAddSource, added_at: Instant, ) -> Result>, MempoolAddError> { let payload_hash = transaction.notarized_transaction_hash(); + + if self.contains_transaction(&payload_hash) { + return Ok(vec![]); + } + let intent_hash = transaction.intent_hash(); let transaction_size = transaction.raw.0.len() as u64; @@ -335,7 +340,8 @@ impl PriorityMempool { .insert(payload_hash, transaction_data.clone()) .is_some() { - panic!("Broken precondition: Transaction already inside mempool"); + // This should have been checked at the beginning of this method + panic!("Broken precondition: Transaction already inside mempool."); } // Add proposal priority index @@ -636,14 +642,18 @@ mod tests { assert_eq!(mp.remaining_transaction_count, 5); assert_eq!(mp.get_count(), 0); - mp.add_transaction(mt1.clone(), MempoolAddSource::CoreApi, Instant::now()) + mp.add_transaction_if_not_present(mt1.clone(), MempoolAddSource::CoreApi, Instant::now()) .unwrap(); assert_eq!(mp.remaining_transaction_count, 4); assert_eq!(mp.get_count(), 1); assert!(mp.contains_transaction(&mt1.notarized_transaction_hash())); - mp.add_transaction(mt2.clone(), MempoolAddSource::MempoolSync, Instant::now()) - .unwrap(); + mp.add_transaction_if_not_present( + mt2.clone(), + MempoolAddSource::MempoolSync, + Instant::now(), + ) + .unwrap(); assert_eq!(mp.remaining_transaction_count, 3); assert_eq!(mp.get_count(), 2); assert!(mp.contains_transaction(&mt1.notarized_transaction_hash())); @@ -687,7 +697,7 @@ mod tests { ®istry, ); assert!(mp - .add_transaction( + .add_transaction_if_not_present( intent_1_payload_1.clone(), MempoolAddSource::CoreApi, Instant::now() @@ -695,7 +705,7 @@ mod tests { .unwrap() .is_empty()); assert!(mp - .add_transaction( + .add_transaction_if_not_present( intent_1_payload_2.clone(), MempoolAddSource::CoreApi, Instant::now() @@ -703,7 +713,7 @@ mod tests { .unwrap() .is_empty()); assert!(mp - .add_transaction( + .add_transaction_if_not_present( intent_1_payload_3, MempoolAddSource::MempoolSync, Instant::now() @@ -711,7 +721,7 @@ mod tests { .unwrap() .is_empty()); assert!(mp - .add_transaction( + .add_transaction_if_not_present( intent_2_payload_1.clone(), MempoolAddSource::CoreApi, Instant::now() @@ -753,7 +763,7 @@ mod tests { 0 ); assert!(mp - .add_transaction( + .add_transaction_if_not_present( intent_2_payload_1, MempoolAddSource::MempoolSync, Instant::now() @@ -769,7 +779,7 @@ mod tests { ); assert!(mp - .add_transaction( + .add_transaction_if_not_present( intent_2_payload_2.clone(), MempoolAddSource::CoreApi, Instant::now() @@ -857,49 +867,53 @@ mod tests { ); assert!(mp - .add_transaction(mt4.clone(), MempoolAddSource::CoreApi, time_point[0]) + .add_transaction_if_not_present(mt4.clone(), MempoolAddSource::CoreApi, time_point[0]) .unwrap() .is_empty()); assert!(mp - .add_transaction(mt2.clone(), MempoolAddSource::CoreApi, time_point[1]) + .add_transaction_if_not_present(mt2.clone(), MempoolAddSource::CoreApi, time_point[1]) .unwrap() .is_empty()); assert!(mp - .add_transaction(mt3.clone(), MempoolAddSource::MempoolSync, time_point[0]) + .add_transaction_if_not_present( + mt3.clone(), + MempoolAddSource::MempoolSync, + time_point[0] + ) .unwrap() .is_empty()); assert!(mp - .add_transaction(mt1.clone(), MempoolAddSource::CoreApi, time_point[0]) + .add_transaction_if_not_present(mt1.clone(), MempoolAddSource::CoreApi, time_point[0]) .unwrap() .is_empty()); let evicted = mp - .add_transaction(mt5, MempoolAddSource::CoreApi, time_point[1]) + .add_transaction_if_not_present(mt5, MempoolAddSource::CoreApi, time_point[1]) .unwrap(); assert_eq!(evicted.len(), 1); assert_eq!(evicted[0].transaction, mt1); // mt2 should be evicted before mt3 because of lower time spent in the mempool let evicted = mp - .add_transaction(mt6, MempoolAddSource::CoreApi, time_point[1]) + .add_transaction_if_not_present(mt6, MempoolAddSource::CoreApi, time_point[1]) .unwrap(); assert_eq!(evicted.len(), 1); assert_eq!(evicted[0].transaction, mt2); let evicted = mp - .add_transaction(mt7, MempoolAddSource::CoreApi, time_point[1]) + .add_transaction_if_not_present(mt7, MempoolAddSource::CoreApi, time_point[1]) .unwrap(); assert_eq!(evicted.len(), 1); assert_eq!(evicted[0].transaction, mt3); let evicted = mp - .add_transaction(mt8, MempoolAddSource::CoreApi, time_point[1]) + .add_transaction_if_not_present(mt8, MempoolAddSource::CoreApi, time_point[1]) .unwrap(); assert_eq!(evicted.len(), 1); assert_eq!(evicted[0].transaction, mt4); assert!(mp - .add_transaction(mt9, MempoolAddSource::CoreApi, time_point[2]) + .add_transaction_if_not_present(mt9, MempoolAddSource::CoreApi, time_point[2]) .is_err()); } } From 3ca26f2ab06bdb0b6e90c6272b5d62010a71ef7e Mon Sep 17 00:00:00 2001 From: Lukasz Gasior Date: Wed, 14 Feb 2024 21:37:55 +0100 Subject: [PATCH 3/3] Add mempool test for duplicate txn insertion --- .../src/mempool/priority_mempool.rs | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core-rust/state-manager/src/mempool/priority_mempool.rs b/core-rust/state-manager/src/mempool/priority_mempool.rs index 10a1781309..a150c83820 100644 --- a/core-rust/state-manager/src/mempool/priority_mempool.rs +++ b/core-rust/state-manager/src/mempool/priority_mempool.rs @@ -916,4 +916,35 @@ mod tests { .add_transaction_if_not_present(mt9, MempoolAddSource::CoreApi, time_point[2]) .is_err()); } + + #[test] + fn test_duplicate_txn_not_inserted() { + let mempool_txn = create_fake_pending_transaction(1, 0, 10); + + let now = Instant::now(); + + let registry = Registry::new(); + + let mut mempool = PriorityMempool::new( + MempoolConfig { + max_transaction_count: 1, + max_total_transactions_size: 1024 * 1024, + }, + ®istry, + ); + + // Inserting the same transaction twice should be a non-panicking no-op + assert!(mempool + .add_transaction_if_not_present(mempool_txn.clone(), MempoolAddSource::CoreApi, now) + .unwrap() + .is_empty()); + assert!(mempool + .add_transaction_if_not_present( + mempool_txn.clone(), + MempoolAddSource::MempoolSync, + now + Duration::from_secs(1) + ) + .unwrap() + .is_empty()); + } }