From fdab1756619c36a0d350e311148f67eb4265b620 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Tue, 18 Jun 2024 22:03:00 -0500 Subject: [PATCH] chore(builder): add unit tests to bundle_sender --- crates/builder/src/bundle_proposer.rs | 2 +- crates/builder/src/bundle_sender.rs | 625 +++++++++++++++++++--- crates/builder/src/transaction_tracker.rs | 3 + 3 files changed, 560 insertions(+), 70 deletions(-) diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index dccf22a3..5f97559c 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -91,8 +91,8 @@ impl Bundle { } } -#[cfg_attr(test, automock(type UO = rundler_types::v0_6::UserOperation;))] #[async_trait] +#[cfg_attr(test, automock(type UO = rundler_types::v0_6::UserOperation;))] pub(crate) trait BundleProposer: Send + Sync + 'static { type UO: UserOperation; diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index d3a2f7be..374e1530 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -17,6 +17,8 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; use futures_util::StreamExt; +#[cfg(test)] +use mockall::automock; use rundler_provider::{BundleHandler, EntryPoint}; use rundler_sim::ExpectedStorage; use rundler_types::{ @@ -186,7 +188,10 @@ where } } - async fn step_state(&mut self, state: &mut SenderMachineState) -> anyhow::Result<()> { + async fn step_state( + &mut self, + state: &mut SenderMachineState, + ) -> anyhow::Result<()> { let tracker_update = state.wait_for_trigger().await?; match state.inner { @@ -210,9 +215,9 @@ where Ok(()) } - async fn handle_building_state( + async fn handle_building_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: BuildingState, ) -> anyhow::Result<()> { // send bundle @@ -273,9 +278,9 @@ where Ok(()) } - async fn handle_pending_state( + async fn handle_pending_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: PendingState, tracker_update: Option, ) -> anyhow::Result<()> { @@ -342,9 +347,9 @@ where Ok(()) } - async fn handle_cancelling_state( + async fn handle_cancelling_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: CancellingState, ) -> anyhow::Result<()> { info!("Cancelling last transaction"); @@ -393,9 +398,9 @@ where Ok(()) } - async fn handle_cancel_pending_state( + async fn handle_cancel_pending_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: CancelPendingState, tracker_update: Option, ) -> anyhow::Result<()> { @@ -444,9 +449,9 @@ where /// - There are no ops available to bundle initially. /// - The gas fees are high enough that the bundle is empty because there /// are no ops that meet the fee requirements. - async fn send_bundle( + async fn send_bundle( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, fee_increase_count: u64, ) -> anyhow::Result { let (nonce, required_fees) = state.transaction_tracker.get_nonce_and_required_fees()?; @@ -528,19 +533,31 @@ where .make_bundle(required_fees, is_replacement) .await .context("proposer should create bundle for builder")?; + let remove_ops_future = async { + if bundle.rejected_ops.is_empty() { + return; + } + let result = self.remove_ops_from_pool(&bundle.rejected_ops).await; if let Err(error) = result { error!("Failed to remove rejected ops from pool: {error}"); } }; + let update_entities_future = async { + if bundle.entity_updates.is_empty() { + return; + } + let result = self.update_entities_in_pool(&bundle.entity_updates).await; if let Err(error) = result { error!("Failed to update entities in pool: {error}"); } }; + join!(remove_ops_future, update_entities_future); + if bundle.is_empty() { if !bundle.rejected_ops.is_empty() || !bundle.entity_updates.is_empty() { info!( @@ -603,16 +620,16 @@ where } } -struct SenderMachineState { - trigger: BundleSenderTrigger, +struct SenderMachineState { + trigger: TRIG, transaction_tracker: T, send_bundle_response: Option>, inner: InnerState, requires_reset: bool, } -impl SenderMachineState { - fn new(trigger: BundleSenderTrigger, transaction_tracker: T) -> Self { +impl SenderMachineState { + fn new(trigger: TRIG, transaction_tracker: T) -> Self { Self { trigger, transaction_tracker, @@ -732,7 +749,7 @@ struct PendingState { impl PendingState { fn to_building(self) -> BuildingState { BuildingState { - wait_for_trigger: true, + wait_for_trigger: false, fee_increase_count: self.fee_increase_count + 1, } } @@ -771,6 +788,18 @@ impl CancelPendingState { } } +#[async_trait] +#[cfg_attr(test, automock)] +trait Trigger { + async fn wait_for_trigger( + &mut self, + ) -> anyhow::Result>>; + + async fn wait_for_block(&mut self) -> anyhow::Result; + + fn last_block(&self) -> &NewHead; +} + struct BundleSenderTrigger { bundling_mode: BundlingMode, block_rx: UnboundedReceiver, @@ -779,55 +808,8 @@ struct BundleSenderTrigger { last_block: NewHead, } -impl BundleSenderTrigger { - async fn new( - pool_client: &P, - bundle_action_receiver: mpsc::Receiver, - timer_interval: Duration, - ) -> anyhow::Result { - let block_rx = Self::start_block_stream(pool_client).await?; - - Ok(Self { - bundling_mode: BundlingMode::Auto, - block_rx, - bundle_action_receiver, - timer: tokio::time::interval(timer_interval), - last_block: NewHead { - block_hash: H256::zero(), - block_number: 0, - }, - }) - } - - async fn start_block_stream( - pool_client: &P, - ) -> anyhow::Result> { - let Ok(mut new_heads) = pool_client.subscribe_new_heads().await else { - error!("Failed to subscribe to new blocks"); - bail!("failed to subscribe to new blocks"); - }; - - let (tx, rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { - loop { - match new_heads.next().await { - Some(b) => { - if tx.send(b).is_err() { - error!("Failed to buffer new block for bundle sender"); - return; - } - } - None => { - error!("Block stream ended"); - return; - } - } - } - }); - - Ok(rx) - } - +#[async_trait] +impl Trigger for BundleSenderTrigger { async fn wait_for_trigger( &mut self, ) -> anyhow::Result>> { @@ -905,6 +887,60 @@ impl BundleSenderTrigger { Ok(self.last_block.clone()) } + fn last_block(&self) -> &NewHead { + &self.last_block + } +} + +impl BundleSenderTrigger { + async fn new( + pool_client: &P, + bundle_action_receiver: mpsc::Receiver, + timer_interval: Duration, + ) -> anyhow::Result { + let block_rx = Self::start_block_stream(pool_client).await?; + + Ok(Self { + bundling_mode: BundlingMode::Auto, + block_rx, + bundle_action_receiver, + timer: tokio::time::interval(timer_interval), + last_block: NewHead { + block_hash: H256::zero(), + block_number: 0, + }, + }) + } + + async fn start_block_stream( + pool_client: &P, + ) -> anyhow::Result> { + let Ok(mut new_heads) = pool_client.subscribe_new_heads().await else { + error!("Failed to subscribe to new blocks"); + bail!("failed to subscribe to new blocks"); + }; + + let (tx, rx) = mpsc::unbounded_channel(); + tokio::spawn(async move { + loop { + match new_heads.next().await { + Some(b) => { + if tx.send(b).is_err() { + error!("Failed to buffer new block for bundle sender"); + return; + } + } + None => { + error!("Block stream ended"); + return; + } + } + } + }); + + Ok(rx) + } + fn consume_blocks(&mut self) -> anyhow::Result<()> { // Consume any other blocks that may have been buffered up loop { @@ -922,10 +958,6 @@ impl BundleSenderTrigger { } } } - - fn last_block(&self) -> &NewHead { - &self.last_block - } } #[derive(Debug, Clone)] @@ -1005,3 +1037,458 @@ impl BuilderMetrics { metrics::counter!("builder_state_machine_errors", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } } + +#[cfg(test)] +mod tests { + use ethers::types::Bytes; + use mockall::Sequence; + use rundler_provider::MockEntryPointV0_6; + use rundler_types::{ + chain::ChainSpec, pool::MockPool, v0_6::UserOperation, UserOpsPerAggregator, + }; + use tokio::sync::{broadcast, mpsc}; + + use super::*; + use crate::{ + bundle_proposer::{Bundle, MockBundleProposer}, + bundle_sender::{BundleSenderImpl, MockTrigger}, + transaction_tracker::MockTransactionTracker, + }; + + #[tokio::test] + async fn test_empty_send() { + let Mocks { + mut mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + // block 0 + add_trigger_no_update_last_block(&mut mock_trigger, &mut Sequence::new(), 0); + + // zero nonce + mock_tracker + .expect_check_for_update() + .returning(|| Box::pin(async { Ok(None) })); + mock_tracker + .expect_get_nonce_and_required_fees() + .returning(|| Ok((U256::zero(), None))); + + // empty bundle + mock_proposer + .expect_make_bundle() + .times(1) + .returning(|_, _| Box::pin(async { Ok(Bundle::::default()) })); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in building state + let mut state = SenderMachineState::new(mock_trigger, mock_tracker); + + sender.step_state(&mut state).await.unwrap(); + + // empty bundle shouldn't move out of building state + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: true, + .. + }) + )); + } + + #[tokio::test] + async fn test_send() { + let Mocks { + mut mock_proposer, + mut mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + // block 0 + add_trigger_no_update_last_block(&mut mock_trigger, &mut Sequence::new(), 0); + + // zero nonce + mock_tracker + .expect_check_for_update() + .returning(|| Box::pin(async { Ok(None) })); + mock_tracker + .expect_get_nonce_and_required_fees() + .returning(|| Ok((U256::zero(), None))); + + // bundle with one op + mock_proposer + .expect_make_bundle() + .times(1) + .returning(|_, _| { + Box::pin(async { + Ok(Bundle { + gas_estimate: U256::from(100_000), + gas_fees: GasFees::default(), + expected_storage: Default::default(), + rejected_ops: vec![], + entity_updates: vec![], + ops_per_aggregator: vec![UserOpsPerAggregator { + aggregator: Address::zero(), + signature: Bytes::new(), + user_ops: vec![UserOperation::default()], + }], + }) + }) + }); + + // should create the bundle txn + mock_entry_point + .expect_get_send_bundle_transaction() + .returning(|_, _, _, _| TypedTransaction::default()); + + // should send the bundle txn + mock_tracker + .expect_send_transaction() + .returning(|_, _| Box::pin(async { Ok(H256::zero()) })); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in building state + let mut state = SenderMachineState::new(mock_trigger, mock_tracker); + + sender.step_state(&mut state).await.unwrap(); + + // end in the pending state + assert!(matches!( + state.inner, + InnerState::Pending(PendingState { + until: 3, // block 0 + wait 3 blocks + .. + }) + )); + } + + #[tokio::test] + async fn test_wait_for_mine_success() { + let Mocks { + mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + add_trigger_wait_for_block_last_block(&mut mock_trigger, &mut seq, 1); + mock_trigger + .expect_wait_for_block() + .once() + .in_sequence(&mut seq) + .returning(|| { + Box::pin(async { + Ok(NewHead { + block_number: 2, + block_hash: H256::zero(), + }) + }) + }); + // no call to last_block after mine + + let mut seq = Sequence::new(); + mock_tracker + .expect_check_for_update() + .once() + .in_sequence(&mut seq) + .returning(|| Box::pin(async { Ok(None) })); + mock_tracker + .expect_check_for_update() + .once() + .in_sequence(&mut seq) + .returning(|| { + Box::pin(async { + Ok(Some(TrackerUpdate::Mined { + block_number: 2, + nonce: U256::zero(), + gas_limit: None, + gas_used: None, + tx_hash: H256::zero(), + attempt_number: 0, + })) + }) + }); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in pending state + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Pending(PendingState { + until: 3, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + // first step has no update + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Pending(PendingState { until: 3, .. }) + )); + + // second step is mined and moves back to building + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + }) + )); + } + + #[tokio::test] + async fn test_wait_for_mine_timed_out() { + let Mocks { + mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + for i in 1..=3 { + add_trigger_wait_for_block_last_block(&mut mock_trigger, &mut seq, i); + } + + mock_tracker + .expect_check_for_update() + .times(3) + .returning(|| Box::pin(async { Ok(None) })); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in pending state + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Pending(PendingState { + until: 3, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + // first and second step has no update + for _ in 0..2 { + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Pending(PendingState { until: 3, .. }) + )); + } + + // third step times out and moves back to building with a fee increase + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: false, + fee_increase_count: 1, + }) + )); + } + + #[tokio::test] + async fn test_send_cancel() { + let Mocks { + mut mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + mock_proposer + .expect_estimate_gas_fees() + .once() + .returning(|_| Box::pin(async { Ok((GasFees::default(), U256::zero())) })); + + mock_tracker + .expect_cancel_transaction() + .once() + .returning(|_, _| Box::pin(async { Ok(Some(H256::zero())) })); + + mock_trigger.expect_last_block().return_const(NewHead { + block_number: 0, + block_hash: H256::zero(), + }); + + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Cancelling(CancellingState { + fee_increase_count: 0, + }), + requires_reset: false, + }; + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::CancelPending(CancelPendingState { + until: 3, + fee_increase_count: 0, + }) + )); + } + + #[tokio::test] + async fn test_resubmit_cancel() { + let Mocks { + mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + for i in 1..=3 { + add_trigger_wait_for_block_last_block(&mut mock_trigger, &mut seq, i); + } + + mock_tracker + .expect_check_for_update() + .times(3) + .returning(|| Box::pin(async { Ok(None) })); + + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::CancelPending(CancelPendingState { + until: 3, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + for _ in 0..2 { + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::CancelPending(CancelPendingState { + until: 3, + fee_increase_count: 0, + }) + )); + } + + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Cancelling(CancellingState { + fee_increase_count: 1, + }) + )); + } + + struct Mocks { + mock_proposer: MockBundleProposer, + mock_entry_point: MockEntryPointV0_6, + mock_tracker: MockTransactionTracker, + mock_trigger: MockTrigger, + } + + fn new_mocks() -> Mocks { + let mut mock_entry_point = MockEntryPointV0_6::new(); + mock_entry_point + .expect_address() + .return_const(Address::default()); + + Mocks { + mock_proposer: MockBundleProposer::new(), + mock_entry_point, + mock_tracker: MockTransactionTracker::new(), + mock_trigger: MockTrigger::new(), + } + } + + fn new_sender( + mock_proposer: MockBundleProposer, + mock_entry_point: MockEntryPointV0_6, + ) -> BundleSenderImpl< + UserOperation, + MockBundleProposer, + MockEntryPointV0_6, + MockTransactionTracker, + MockPool, + > { + BundleSenderImpl::new( + 0, + mpsc::channel(1000).1, + ChainSpec::default(), + Address::default(), + mock_proposer, + mock_entry_point, + MockTransactionTracker::new(), + MockPool::new(), + Settings { + max_fee_increases: 3, + max_blocks_to_wait_for_mine: 3, + }, + broadcast::channel(1000).0, + ) + } + + fn add_trigger_no_update_last_block( + mock_trigger: &mut MockTrigger, + seq: &mut Sequence, + block_number: u64, + ) { + mock_trigger + .expect_wait_for_trigger() + .once() + .in_sequence(seq) + .returning(move || Box::pin(async move { Ok(None) })); + mock_trigger + .expect_last_block() + .once() + .in_sequence(seq) + .return_const(NewHead { + block_number, + block_hash: H256::zero(), + }); + } + + fn add_trigger_wait_for_block_last_block( + mock_trigger: &mut MockTrigger, + seq: &mut Sequence, + block_number: u64, + ) { + mock_trigger + .expect_wait_for_block() + .once() + .in_sequence(seq) + .returning(move || { + Box::pin(async move { + Ok(NewHead { + block_number, + block_hash: H256::zero(), + }) + }) + }); + mock_trigger + .expect_last_block() + .once() + .in_sequence(seq) + .return_const(NewHead { + block_number, + block_hash: H256::zero(), + }); + } +} diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index cc0fff66..4514951f 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use anyhow::{bail, Context}; use async_trait::async_trait; use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; +#[cfg(test)] +use mockall::automock; use rundler_provider::Provider; use rundler_sim::ExpectedStorage; use rundler_types::GasFees; @@ -33,6 +35,7 @@ use crate::sender::{TransactionSender, TxSenderError, TxStatus}; /// succeeded (potentially not the most recent one) or whether circumstances /// have changed so that it is worth making another attempt. #[async_trait] +#[cfg_attr(test, automock)] pub(crate) trait TransactionTracker: Send + Sync + 'static { /// Returns the current nonce and the required fees for the next transaction. fn get_nonce_and_required_fees(&self) -> TransactionTrackerResult<(U256, Option)>;