From bc64d0a50512ba9a7a883c2cdc08a5d842bf20f9 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Fri, 14 Jun 2024 16:18:06 -0500 Subject: [PATCH] fix(pool): fix race condition in paymaster tracking --- crates/pool/src/mempool/paymaster.rs | 136 +++++++----------- crates/pool/src/mempool/uo_pool.rs | 199 ++++++++++++++++++++------- crates/rpc/src/eth/error.rs | 3 +- 3 files changed, 197 insertions(+), 141 deletions(-) diff --git a/crates/pool/src/mempool/paymaster.rs b/crates/pool/src/mempool/paymaster.rs index 92baf007d..af2a8a483 100644 --- a/crates/pool/src/mempool/paymaster.rs +++ b/crates/pool/src/mempool/paymaster.rs @@ -26,7 +26,7 @@ use rundler_types::{ use rundler_utils::cache::LruMap; use super::MempoolResult; -use crate::chain::{BalanceUpdate, MinedOp}; +use crate::chain::MinedOp; /// Keeps track of current and pending paymaster balances #[derive(Debug)] @@ -94,28 +94,6 @@ where Ok(stake_status) } - pub(crate) fn update_paymaster_balances_after_update<'a>( - &self, - entity_balance_updates: impl Iterator, - unmined_entity_balance_updates: impl Iterator, - ) { - for balance_update in entity_balance_updates { - self.state.write().update_paymaster_balance_from_event( - balance_update.address, - balance_update.amount, - balance_update.is_addition, - ) - } - - for unmined_balance_update in unmined_entity_balance_updates { - self.state.write().update_paymaster_balance_from_event( - unmined_balance_update.address, - unmined_balance_update.amount, - !unmined_balance_update.is_addition, - ) - } - } - pub(crate) async fn paymaster_balance( &self, paymaster: Address, @@ -174,7 +152,20 @@ where self.state.write().set_tracking(tracking_enabled); } - pub(crate) async fn reset_confimed_balances(&self) -> MempoolResult<()> { + pub(crate) async fn reset_confirmed_balances_for( + &self, + addresses: &[Address], + ) -> MempoolResult<()> { + let balances = self.entry_point.get_balances(addresses.to_vec()).await?; + + self.state + .write() + .set_confimed_balances(addresses, &balances); + + Ok(()) + } + + pub(crate) async fn reset_confirmed_balances(&self) -> MempoolResult<()> { let paymaster_addresses = self.paymaster_addresses(); let balances = self @@ -194,6 +185,7 @@ where .write() .update_paymaster_balance_from_mined_op(mined_op); } + pub(crate) fn remove_operation(&self, id: &UserOperationId) { self.state.write().remove_operation(id); } @@ -324,21 +316,6 @@ impl PaymasterTrackerInner { keys } - fn update_paymaster_balance_from_event( - &mut self, - paymaster: Address, - amount: U256, - should_add: bool, - ) { - if let Some(paymaster_balance) = self.paymaster_balances.get(&paymaster) { - if should_add { - paymaster_balance.confirmed = paymaster_balance.confirmed.saturating_add(amount); - } else { - paymaster_balance.confirmed = paymaster_balance.confirmed.saturating_sub(amount); - } - } - } - fn paymaster_metadata(&self, paymaster: Address) -> Option { if let Some(paymaster_balance) = self.paymaster_balances.peek(&paymaster) { return Some(PaymasterMetadata { @@ -530,7 +507,7 @@ mod tests { }; use super::*; - use crate::{chain::BalanceUpdate, mempool::paymaster::PaymasterTracker}; + use crate::mempool::paymaster::PaymasterTracker; fn demo_pool_op(uo: UserOperation) -> PoolOperation { PoolOperation { @@ -609,55 +586,6 @@ mod tests { assert!(res.is_err()); } - #[tokio::test] - async fn test_update_balance() { - let paymaster_tracker = new_paymaster_tracker(); - - let paymaster = Address::random(); - let pending_op_cost = U256::from(100); - let confirmed_balance = U256::from(1000); - - paymaster_tracker.add_new_paymaster(paymaster, confirmed_balance, pending_op_cost); - - let deposit = BalanceUpdate { - address: paymaster, - entrypoint: Address::random(), - amount: 100.into(), - is_addition: true, - }; - - // deposit - paymaster_tracker - .update_paymaster_balances_after_update(vec![&deposit].into_iter(), vec![].into_iter()); - - let balance = paymaster_tracker - .paymaster_balance(paymaster) - .await - .unwrap(); - - assert_eq!(balance.confirmed_balance, 1100.into()); - - let withdrawal = BalanceUpdate { - address: paymaster, - entrypoint: Address::random(), - amount: 50.into(), - is_addition: false, - }; - - // withdrawal - paymaster_tracker.update_paymaster_balances_after_update( - vec![&withdrawal].into_iter(), - vec![].into_iter(), - ); - - let balance = paymaster_tracker - .paymaster_balance(paymaster) - .await - .unwrap(); - - assert_eq!(balance.confirmed_balance, 1050.into()); - } - #[tokio::test] async fn new_uo_not_enough_balance_tracking_disabled() { let paymaster_tracker = new_paymaster_tracker(); @@ -731,7 +659,35 @@ mod tests { assert_eq!(balance_0.confirmed_balance, 1000.into()); - let _ = paymaster_tracker.reset_confimed_balances().await; + let _ = paymaster_tracker.reset_confirmed_balances().await; + + let balance_0 = paymaster_tracker + .paymaster_balance(paymaster_0) + .await + .unwrap(); + + assert_eq!(balance_0.confirmed_balance, 50.into()); + } + + #[tokio::test] + async fn test_reset_balances_for() { + let paymaster_tracker = new_paymaster_tracker(); + + let paymaster_0 = Address::random(); + let paymaster_0_confimed = 1000.into(); + + paymaster_tracker.add_new_paymaster(paymaster_0, paymaster_0_confimed, 0.into()); + + let balance_0 = paymaster_tracker + .paymaster_balance(paymaster_0) + .await + .unwrap(); + + assert_eq!(balance_0.confirmed_balance, 1000.into()); + + let _ = paymaster_tracker + .reset_confirmed_balances_for(&[paymaster_0]) + .await; let balance_0 = paymaster_tracker .paymaster_balance(paymaster_0) diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index 5b1ad0bba..8a31e05e8 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -151,15 +151,24 @@ where .iter() .filter(|op| op.entry_point == self.config.entry_point); - let entity_balance_updates = update - .entity_balance_updates - .iter() - .filter(|u| u.entrypoint == self.config.entry_point); + let entity_balance_updates = update.entity_balance_updates.iter().filter_map(|u| { + if u.entrypoint == self.config.entry_point { + Some(u.address) + } else { + None + } + }); let unmined_entity_balance_updates = update .unmined_entity_balance_updates .iter() - .filter(|u| u.entrypoint == self.config.entry_point); + .filter_map(|u| { + if u.entrypoint == self.config.entry_point { + Some(u.address) + } else { + None + } + }); let unmined_ops = deduped_ops .unmined_ops @@ -168,15 +177,6 @@ where let mut mined_op_count = 0; let mut unmined_op_count = 0; - if update.reorg_larger_than_history { - let _ = self.reset_confirmed_paymaster_balances().await; - } - - self.paymaster.update_paymaster_balances_after_update( - entity_balance_updates, - unmined_entity_balance_updates, - ); - for op in mined_ops { if op.entry_point != self.config.entry_point { continue; @@ -199,6 +199,7 @@ where mined_op_count += 1; } } + for op in unmined_ops { if op.entry_point != self.config.entry_point { continue; @@ -220,21 +221,43 @@ where let _ = self.paymaster.add_or_update_balance(&po).await; } } + + // Update paymaster balances AFTER updating the pool to reset confirmed balances if needed. + if update.reorg_larger_than_history { + if let Err(e) = self.reset_confirmed_paymaster_balances().await { + tracing::error!("Failed to reset confirmed paymaster balances: {:?}", e); + } + } else { + let addresses = entity_balance_updates + .chain(unmined_entity_balance_updates) + .unique() + .collect::>(); + if !addresses.is_empty() { + if let Err(e) = self + .paymaster + .reset_confirmed_balances_for(&addresses) + .await + { + tracing::error!("Failed to reset confirmed paymaster balances: {:?}", e); + } + } + } + if mined_op_count > 0 { info!( - "{mined_op_count} op(s) mined on entry point {:?} when advancing to block with number {}, hash {:?}.", - self.config.entry_point, - update.latest_block_number, - update.latest_block_hash, - ); + "{mined_op_count} op(s) mined on entry point {:?} when advancing to block with number {}, hash {:?}.", + self.config.entry_point, + update.latest_block_number, + update.latest_block_hash, + ); } if unmined_op_count > 0 { info!( - "{unmined_op_count} op(s) unmined in reorg on entry point {:?} when advancing to block with number {}, hash {:?}.", - self.config.entry_point, - update.latest_block_number, - update.latest_block_hash, - ); + "{unmined_op_count} op(s) unmined in reorg on entry point {:?} when advancing to block with number {}, hash {:?}.", + self.config.entry_point, + update.latest_block_number, + update.latest_block_hash, + ); } UoPoolMetrics::update_ops_seen( mined_op_count as isize - unmined_op_count as isize, @@ -323,7 +346,7 @@ where } async fn reset_confirmed_paymaster_balances(&self) -> MempoolResult<()> { - self.paymaster.reset_confimed_balances().await + self.paymaster.reset_confirmed_balances().await } async fn get_stake_status(&self, address: Address) -> MempoolResult { @@ -680,6 +703,7 @@ mod tests { use std::collections::HashMap; use ethers::types::{Bytes, H160}; + use mockall::Sequence; use rundler_provider::{DepositInfo, MockEntryPointV0_6}; use rundler_sim::{ MockPrechecker, MockSimulator, PrecheckError, PrecheckSettings, SimulationError, @@ -764,11 +788,25 @@ mod tests { #[tokio::test] async fn chain_update_mine() { let paymaster = Address::random(); - let (pool, uos) = create_pool_insert_ops(vec![ - create_op(Address::random(), 0, 3, None), - create_op(Address::random(), 0, 2, None), - create_op(Address::random(), 0, 1, Some(paymaster)), - ]) + + let mut entrypoint = MockEntryPointV0_6::new(); + // initial balance + entrypoint + .expect_balance_of() + .returning(|_, _| Ok(U256::from(1000))); + // after updates + entrypoint + .expect_get_balances() + .returning(|_| Ok(vec![1110.into()])); + + let (pool, uos) = create_pool_with_entrypoint_insert_ops( + vec![ + create_op(Address::random(), 0, 3, None), + create_op(Address::random(), 0, 2, None), + create_op(Address::random(), 0, 1, Some(paymaster)), + ], + entrypoint, + ) .await; check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); @@ -819,7 +857,7 @@ mod tests { create_op(Address::random(), 0, 1, Some(paymaster)), ]; - // add pending max cost of 30 for each uo + // add pending max cost of 50 for each uo for op in &mut ops { let uo: &mut UserOperation = op.op.as_mut(); uo.call_gas_limit = 10.into(); @@ -828,7 +866,29 @@ mod tests { uo.max_fee_per_gas = 1.into(); } - let (pool, uos) = create_pool_insert_ops(ops).await; + let mut entrypoint = MockEntryPointV0_6::new(); + // initial balance, pending = 850 + entrypoint + .expect_balance_of() + .returning(|_, _| Ok(U256::from(1000))); + // after updates + let mut seq = Sequence::new(); + // one UO mined with actual cost of 10, unmine deposit of 10, mine deposit 100 + // confirmed = 1000 - 10 - 10 + 100 = 1080. Pending = 1080 - 50*2 = 980 + entrypoint + .expect_get_balances() + .once() + .in_sequence(&mut seq) + .returning(|_| Ok(vec![1080.into()])); + // Unmine UO of 10, unmine deposit of 100 + // confirmed = 1080 + 10 - 100 = 990. Pending = 990 - 50*3 = 840 + entrypoint + .expect_get_balances() + .once() + .in_sequence(&mut seq) + .returning(|_| Ok(vec![990.into()])); + + let (pool, uos) = create_pool_with_entrypoint_insert_ops(ops, entrypoint).await; let metadata = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); assert_eq!(metadata.pending_balance, 850.into()); @@ -872,7 +932,7 @@ mod tests { ); let metadata = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); - assert_eq!(metadata.pending_balance, 1000.into()); + assert_eq!(metadata.pending_balance, 980.into()); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -901,7 +961,7 @@ mod tests { .await; let metadata = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); - assert_eq!(metadata.pending_balance, 850.into()); + assert_eq!(metadata.pending_balance, 840.into()); check_ops(pool.best_operations(3, 0).unwrap(), uos); } @@ -1105,8 +1165,13 @@ mod tests { uo.pre_verification_gas = 1000.into(); uo.max_fee_per_gas = 1.into(); + let mut entrypoint = MockEntryPointV0_6::new(); + entrypoint + .expect_balance_of() + .returning(|_, _| Ok(U256::from(1000))); + let uo = op.op.clone(); - let pool = create_pool(vec![op]); + let pool = create_pool_with_entry_point(vec![op], entrypoint); let ret = pool .add_operation(OperationOrigin::Local, uo.clone()) @@ -1204,7 +1269,17 @@ mod tests { #[tokio::test] async fn test_stake_status_not_staked() { - let mut pool = create_pool(vec![]); + let mut entrypoint = MockEntryPointV0_6::new(); + entrypoint.expect_get_deposit_info().returning(|_| { + Ok(DepositInfo { + deposit: 1000.into(), + staked: true, + stake: 10000, + unstake_delay_sec: 100, + withdraw_time: 10, + }) + }); + let mut pool = create_pool_with_entry_point(vec![], entrypoint); pool.config.sim_settings.min_stake_value = 10001; pool.config.sim_settings.min_unstake_delay = 101; @@ -1225,7 +1300,11 @@ mod tests { uo.pre_verification_gas = 10.into(); uo.max_fee_per_gas = 1.into(); - let pool = create_pool(vec![op.clone()]); + let mut entrypoint = MockEntryPointV0_6::new(); + entrypoint + .expect_balance_of() + .returning(|_, _| Ok(U256::from(1000))); + let pool = create_pool_with_entry_point(vec![op.clone()], entrypoint); let _ = pool .add_operation(OperationOrigin::Local, op.op.clone()) @@ -1399,6 +1478,19 @@ mod tests { impl Prechecker, impl Simulator, impl EntryPoint, + > { + let entrypoint = MockEntryPointV0_6::new(); + create_pool_with_entry_point(ops, entrypoint) + } + + fn create_pool_with_entry_point( + ops: Vec, + entrypoint: MockEntryPointV0_6, + ) -> UoPool< + UserOperation, + impl Prechecker, + impl Simulator, + impl EntryPoint, > { let args = PoolConfig { entry_point: Address::random(), @@ -1423,20 +1515,7 @@ mod tests { let mut simulator = MockSimulator::new(); let mut prechecker = MockPrechecker::new(); - let mut entrypoint = MockEntryPointV0_6::new(); - entrypoint.expect_get_deposit_info().returning(|_| { - Ok(DepositInfo { - deposit: 1000.into(), - staked: true, - stake: 10000, - unstake_delay_sec: 100, - withdraw_time: 10, - }) - }); - entrypoint - .expect_balance_of() - .returning(|_, _| Ok(U256::from(1000))); let paymaster = PaymasterTracker::new( entrypoint, PaymasterConfig::new( @@ -1509,6 +1588,26 @@ mod tests { ) } + async fn create_pool_with_entrypoint_insert_ops( + ops: Vec, + entrypoint: MockEntryPointV0_6, + ) -> ( + UoPool< + UserOperation, + impl Prechecker, + impl Simulator, + impl EntryPoint, + >, + Vec, + ) { + let uos = ops.iter().map(|op| op.op.clone()).collect::>(); + let pool = create_pool_with_entry_point(ops, entrypoint); + for op in &uos { + let _ = pool.add_operation(OperationOrigin::Local, op.clone()).await; + } + (pool, uos) + } + async fn create_pool_insert_ops( ops: Vec, ) -> ( diff --git a/crates/rpc/src/eth/error.rs b/crates/rpc/src/eth/error.rs index 6e0858a7c..9bce5e653 100644 --- a/crates/rpc/src/eth/error.rs +++ b/crates/rpc/src/eth/error.rs @@ -42,6 +42,7 @@ const THROTTLED_OR_BANNED_CODE: i32 = -32504; const STAKE_TOO_LOW_CODE: i32 = -32505; const UNSUPORTED_AGGREGATOR_CODE: i32 = -32506; const SIGNATURE_CHECK_FAILED_CODE: i32 = -32507; +const PAYMASTER_DEPOSIT_TOO_LOW: i32 = -32508; const EXECUTION_REVERTED: i32 = -32521; pub(crate) type EthResult = Result; @@ -383,6 +384,7 @@ impl From for ErrorObjectOwned { EthRpcError::PaymasterValidationRejected(data) => { rpc_err_with_data(PAYMASTER_VALIDATION_REJECTED_CODE, msg, data) } + EthRpcError::PaymasterBalanceTooLow(_, _) => rpc_err(PAYMASTER_DEPOSIT_TOO_LOW, msg), EthRpcError::OpcodeViolation(_, _) | EthRpcError::OpcodeViolationMap(_) | EthRpcError::OutOfGas(_) @@ -390,7 +392,6 @@ impl From for ErrorObjectOwned { | EthRpcError::MultipleRolesViolation(_) | EthRpcError::UnstakedPaymasterContext | EthRpcError::SenderAddressUsedAsAlternateEntity(_) - | EthRpcError::PaymasterBalanceTooLow(_, _) | EthRpcError::AssociatedStorageIsAlternateSender | EthRpcError::AssociatedStorageDuringDeploy(_, _, _) | EthRpcError::InvalidStorageAccess(_, _, _) => rpc_err(OPCODE_VIOLATION_CODE, msg),