From e9527c5f88cd65fc60825e0412d01e83c795d65c Mon Sep 17 00:00:00 2001 From: dancoombs Date: Wed, 12 Jun 2024 16:55:55 -0500 Subject: [PATCH 1/4] fix(builder): rework raw sender to support dropped/conditional/split-rpcs correctly --- bin/rundler/src/cli/builder.rs | 58 ++++++++-- crates/builder/src/lib.rs | 3 +- crates/builder/src/sender/conditional.rs | 130 ---------------------- crates/builder/src/sender/mod.rs | 62 ++++++++--- crates/builder/src/sender/raw.rs | 59 +++++++--- crates/builder/src/task.rs | 34 ++++-- crates/builder/src/transaction_tracker.rs | 78 ++++++------- docs/cli.md | 28 +++-- 8 files changed, 220 insertions(+), 232 deletions(-) delete mode 100644 crates/builder/src/sender/conditional.rs diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 2a5d73c0d..19e8d2225 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -17,8 +17,8 @@ use anyhow::Context; use clap::Args; use rundler_builder::{ self, BloxrouteSenderArgs, BuilderEvent, BuilderEventKind, BuilderTask, BuilderTaskArgs, - EntryPointBuilderSettings, FlashbotsSenderArgs, LocalBuilderBuilder, TransactionSenderArgs, - TransactionSenderKind, + EntryPointBuilderSettings, FlashbotsSenderArgs, LocalBuilderBuilder, RawSenderArgs, + TransactionSenderArgs, TransactionSenderKind, }; use rundler_pool::RemotePoolClient; use rundler_sim::{MempoolConfigs, PriorityFeeMode}; @@ -115,7 +115,7 @@ pub struct BuilderArgs { /// If present, the url of the ETH provider that will be used to send /// transactions. Defaults to the value of `node_http`. /// - /// Only used when BUILDER_SENDER is "raw" or "conditional" + /// Only used when BUILDER_SENDER is "raw" #[arg( long = "builder.submit_url", name = "builder.submit_url", @@ -123,6 +123,40 @@ pub struct BuilderArgs { )] pub submit_url: Option, + /// If present, the url of the ETH provider that will be used to check + /// transaction status. Else will use the node http for status. + /// + /// Only used when BUILDER_SENDER is "raw" + #[arg( + long = "builder.use_submit_for_status", + name = "builder.use_submit_for_status", + env = "BUILDER_USE_SUBMIT_FOR_STATUS", + default_value = "false" + )] + pub use_submit_for_status: bool, + + /// Use the conditional RPC endpoint for transaction submission. + /// + /// Only used when BUILDER_SENDER is "raw" + #[arg( + long = "builder.use_conditional_rpc", + name = "builder.use_conditional_rpc", + env = "BUILDER_USE_CONDITIONAL_RPC", + default_value = "false" + )] + pub use_conditional_rpc: bool, + + /// If the "dropped" status is unsupported by the status provider. + /// + /// Only used when BUILDER_SENDER is "raw" + #[arg( + long = "builder.dropped_status_unsupported", + name = "builder.dropped_status_unsupported", + env = "BUILDER_DROPPED_STATUS_UNSUPPORTED", + default_value = "false" + )] + pub dropped_status_unsupported: bool, + /// A list of builders to pass into the Flashbots Relay RPC. /// /// Only used when BUILDER_SENDER is "flashbots" @@ -216,7 +250,6 @@ impl BuilderArgs { .node_http .clone() .context("should have a node HTTP URL")?; - let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone()); let mempool_configs = match &common.mempool_config_path { Some(path) => get_json_config::(path, &common.aws_region) @@ -264,7 +297,7 @@ impl BuilderArgs { )); } - let sender_args = self.sender_args(&chain_spec)?; + let sender_args = self.sender_args(&chain_spec, &rpc_url)?; Ok(BuilderTaskArgs { entry_points, @@ -281,7 +314,6 @@ impl BuilderArgs { redis_lock_ttl_millis: self.redis_lock_ttl_millis, max_bundle_size: self.max_bundle_size, max_bundle_gas: common.max_bundle_gas, - submit_url, bundle_priority_fee_overhead_percent: common.bundle_priority_fee_overhead_percent, priority_fee_mode, sender_args, @@ -294,10 +326,18 @@ impl BuilderArgs { }) } - fn sender_args(&self, chain_spec: &ChainSpec) -> anyhow::Result { + fn sender_args( + &self, + chain_spec: &ChainSpec, + rpc_url: &str, + ) -> anyhow::Result { match self.sender_type { - TransactionSenderKind::Raw => Ok(TransactionSenderArgs::Raw), - TransactionSenderKind::Conditional => Ok(TransactionSenderArgs::Conditional), + TransactionSenderKind::Raw => Ok(TransactionSenderArgs::Raw(RawSenderArgs { + submit_url: self.submit_url.clone().unwrap_or_else(|| rpc_url.into()), + use_submit_for_status: self.use_submit_for_status, + dropped_status_supported: !self.dropped_status_unsupported, + use_conditional_rpc: self.use_conditional_rpc, + })), TransactionSenderKind::Flashbots => { if !chain_spec.flashbots_enabled { return Err(anyhow::anyhow!("Flashbots sender is not enabled for chain")); diff --git a/crates/builder/src/lib.rs b/crates/builder/src/lib.rs index 3363679ff..ceee246fc 100644 --- a/crates/builder/src/lib.rs +++ b/crates/builder/src/lib.rs @@ -27,7 +27,8 @@ pub use emit::{BuilderEvent, BuilderEventKind}; mod sender; pub use sender::{ - BloxrouteSenderArgs, FlashbotsSenderArgs, TransactionSenderArgs, TransactionSenderKind, + BloxrouteSenderArgs, FlashbotsSenderArgs, RawSenderArgs, TransactionSenderArgs, + TransactionSenderKind, }; mod server; diff --git a/crates/builder/src/sender/conditional.rs b/crates/builder/src/sender/conditional.rs deleted file mode 100644 index 5e1ca3e35..000000000 --- a/crates/builder/src/sender/conditional.rs +++ /dev/null @@ -1,130 +0,0 @@ -// This file is part of Rundler. -// -// Rundler is free software: you can redistribute it and/or modify it under the -// terms of the GNU Lesser General Public License as published by the Free Software -// Foundation, either version 3 of the License, or (at your option) any later version. -// -// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with Rundler. -// If not, see https://www.gnu.org/licenses/. - -use std::sync::Arc; - -use anyhow::Context; -use ethers::{ - middleware::SignerMiddleware, - providers::{JsonRpcClient, Middleware, PendingTransaction, Provider}, - types::{transaction::eip2718::TypedTransaction, Address, TransactionReceipt, H256, U256}, -}; -use ethers_signers::Signer; -use rundler_sim::ExpectedStorage; -use rundler_types::GasFees; -use serde_json::json; -use tonic::async_trait; - -use super::{ - create_hard_cancel_tx, fill_and_sign, CancelTxInfo, Result, SentTxInfo, TransactionSender, - TxStatus, -}; - -pub(crate) struct ConditionalTransactionSender -where - C: JsonRpcClient + 'static, - S: Signer + 'static, -{ - // The `SignerMiddleware` specifically needs to wrap a `Provider`, and not - // just any `Middleware`, because `.request()` is only on `Provider` and not - // on `Middleware`. - provider: SignerMiddleware>, S>, -} - -#[async_trait] -impl TransactionSender for ConditionalTransactionSender -where - C: JsonRpcClient + 'static, - S: Signer + 'static, -{ - async fn send_transaction( - &self, - tx: TypedTransaction, - expected_storage: &ExpectedStorage, - ) -> Result { - let (raw_tx, nonce) = fill_and_sign(&self.provider, tx).await?; - - let tx_hash = self - .provider - .provider() - .request( - "eth_sendRawTransactionConditional", - (raw_tx, json!({ "knownAccounts": expected_storage })), - ) - .await?; - - Ok(SentTxInfo { nonce, tx_hash }) - } - - async fn cancel_transaction( - &self, - _tx_hash: H256, - nonce: U256, - to: Address, - gas_fees: GasFees, - ) -> Result { - let tx = create_hard_cancel_tx(self.provider.address(), to, nonce, gas_fees); - - let (raw_tx, _) = fill_and_sign(&self.provider, tx).await?; - - let tx_hash = self - .provider - .provider() - .request("eth_sendRawTransaction", (raw_tx,)) - .await?; - - Ok(CancelTxInfo { - tx_hash, - soft_cancelled: false, - }) - } - - async fn get_transaction_status(&self, tx_hash: H256) -> Result { - let tx = self - .provider - .get_transaction(tx_hash) - .await - .context("provider should return transaction status")?; - Ok(match tx { - None => TxStatus::Dropped, - Some(tx) => match tx.block_number { - None => TxStatus::Pending, - Some(block_number) => TxStatus::Mined { - block_number: block_number.as_u64(), - }, - }, - }) - } - - async fn wait_until_mined(&self, tx_hash: H256) -> Result> { - Ok(PendingTransaction::new(tx_hash, self.provider.inner()) - .await - .context("should wait for transaction to be mined or dropped")?) - } - - fn address(&self) -> Address { - self.provider.address() - } -} - -impl ConditionalTransactionSender -where - C: JsonRpcClient + 'static, - S: Signer + 'static, -{ - pub(crate) fn new(provider: Arc>, signer: S) -> Self { - Self { - provider: SignerMiddleware::new(provider, signer), - } - } -} diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index df4214a9a..d2bb50fed 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -12,7 +12,6 @@ // If not, see https://www.gnu.org/licenses/. mod bloxroute; -mod conditional; mod flashbots; mod raw; use std::{sync::Arc, time::Duration}; @@ -20,7 +19,6 @@ use std::{sync::Arc, time::Duration}; use anyhow::Context; use async_trait::async_trait; pub(crate) use bloxroute::PolygonBloxrouteTransactionSender; -pub(crate) use conditional::ConditionalTransactionSender; use enum_dispatch::enum_dispatch; use ethers::{ prelude::SignerMiddleware, @@ -111,7 +109,6 @@ where FS: Signer + 'static, { Raw(RawTransactionSender), - Conditional(ConditionalTransactionSender), Flashbots(FlashbotsTransactionSender), PolygonBloxroute(PolygonBloxrouteTransactionSender), } @@ -122,8 +119,6 @@ where pub enum TransactionSenderKind { /// Raw transaction sender Raw, - /// Conditional transaction sender - Conditional, /// Flashbots transaction sender Flashbots, /// Bloxroute transaction sender @@ -134,15 +129,26 @@ pub enum TransactionSenderKind { #[derive(Debug, Clone)] pub enum TransactionSenderArgs { /// Raw transaction sender - Raw, - /// Conditional transaction sender - Conditional, + Raw(RawSenderArgs), /// Flashbots transaction sender Flashbots(FlashbotsSenderArgs), /// Bloxroute transaction sender Bloxroute(BloxrouteSenderArgs), } +/// Raw sender arguments +#[derive(Debug, Clone)] +pub struct RawSenderArgs { + /// Submit URL + pub submit_url: String, + /// Use submit for status + pub use_submit_for_status: bool, + /// If the "dropped" status is supported by the status provider + pub dropped_status_supported: bool, + /// If the sender should use the conditional endpoint + pub use_conditional_rpc: bool, +} + /// Bloxroute sender arguments #[derive(Debug, Clone)] pub struct BloxrouteSenderArgs { @@ -166,21 +172,47 @@ pub struct FlashbotsSenderArgs { impl TransactionSenderArgs { pub(crate) fn into_sender( self, - client: Arc>, + rpc_provider: Arc>, + submit_provider: Option>>, signer: S, eth_poll_interval: Duration, ) -> std::result::Result, SenderConstructorErrors> { let sender = match self { - Self::Raw => TransactionSenderEnum::Raw(RawTransactionSender::new(client, signer)), - Self::Conditional => TransactionSenderEnum::Conditional( - ConditionalTransactionSender::new(client, signer), - ), + Self::Raw(args) => { + if let Some(submit_provider) = submit_provider { + if args.use_submit_for_status { + TransactionSenderEnum::Raw(RawTransactionSender::new( + Arc::clone(&submit_provider), + submit_provider, + signer, + args.dropped_status_supported, + args.use_conditional_rpc, + )) + } else { + TransactionSenderEnum::Raw(RawTransactionSender::new( + rpc_provider, + submit_provider, + signer, + args.dropped_status_supported, + args.use_conditional_rpc, + )) + } + } else { + TransactionSenderEnum::Raw(RawTransactionSender::new( + Arc::clone(&rpc_provider), + rpc_provider, + signer, + args.dropped_status_supported, + args.use_conditional_rpc, + )) + } + } Self::Flashbots(args) => { let flashbots_signer = args.auth_key.parse().context("should parse auth key")?; TransactionSenderEnum::Flashbots(FlashbotsTransactionSender::new( - client, + rpc_provider, signer, flashbots_signer, args.builders, @@ -190,7 +222,7 @@ impl TransactionSenderArgs { } Self::Bloxroute(args) => { TransactionSenderEnum::PolygonBloxroute(PolygonBloxrouteTransactionSender::new( - client, + rpc_provider, signer, eth_poll_interval, &args.header, diff --git a/crates/builder/src/sender/raw.rs b/crates/builder/src/sender/raw.rs index 4495606e4..3e23f454f 100644 --- a/crates/builder/src/sender/raw.rs +++ b/crates/builder/src/sender/raw.rs @@ -23,6 +23,7 @@ use ethers::{ use ethers_signers::Signer; use rundler_sim::ExpectedStorage; use rundler_types::GasFees; +use serde_json::json; use super::{CancelTxInfo, Result}; use crate::sender::{ @@ -35,10 +36,13 @@ where C: JsonRpcClient + 'static, S: Signer + 'static, { + provider: Arc>, // The `SignerMiddleware` specifically needs to wrap a `Provider`, and not // just any `Middleware`, because `.request()` is only on `Provider` and not // on `Middleware`. - provider: SignerMiddleware>, S>, + submitter: SignerMiddleware>, S>, + dropped_status_supported: bool, + use_conditional_rpc: bool, } #[async_trait] @@ -50,15 +54,25 @@ where async fn send_transaction( &self, tx: TypedTransaction, - _expected_storage: &ExpectedStorage, + expected_storage: &ExpectedStorage, ) -> Result { - let (raw_tx, nonce) = fill_and_sign(&self.provider, tx).await?; + let (raw_tx, nonce) = fill_and_sign(&self.submitter, tx).await?; + + let tx_hash = if self.use_conditional_rpc { + self.submitter + .provider() + .request( + "eth_sendRawTransactionConditional", + (raw_tx, json!({ "knownAccounts": expected_storage })), + ) + .await? + } else { + self.submitter + .provider() + .request("eth_sendRawTransaction", (raw_tx,)) + .await? + }; - let tx_hash = self - .provider - .provider() - .request("eth_sendRawTransaction", (raw_tx,)) - .await?; Ok(SentTxInfo { nonce, tx_hash }) } @@ -69,12 +83,12 @@ where to: Address, gas_fees: GasFees, ) -> Result { - let tx = create_hard_cancel_tx(self.provider.address(), to, nonce, gas_fees); + let tx = create_hard_cancel_tx(self.submitter.address(), to, nonce, gas_fees); - let (raw_tx, _) = fill_and_sign(&self.provider, tx).await?; + let (raw_tx, _) = fill_and_sign(&self.submitter, tx).await?; let tx_hash = self - .provider + .submitter .provider() .request("eth_sendRawTransaction", (raw_tx,)) .await?; @@ -92,7 +106,13 @@ where .await .context("provider should return transaction status")?; Ok(match tx { - None => TxStatus::Dropped, + None => { + if self.dropped_status_supported { + TxStatus::Dropped + } else { + TxStatus::Pending + } + } Some(tx) => match tx.block_number { None => TxStatus::Pending, Some(block_number) => TxStatus::Mined { @@ -109,7 +129,7 @@ where } fn address(&self) -> Address { - self.provider.address() + self.submitter.address() } } @@ -118,9 +138,18 @@ where C: JsonRpcClient + 'static, S: Signer + 'static, { - pub(crate) fn new(provider: Arc>, signer: S) -> Self { + pub(crate) fn new( + provider: Arc>, + submitter: Arc>, + signer: S, + dropped_status_supported: bool, + use_conditional_rpc: bool, + ) -> Self { Self { - provider: SignerMiddleware::new(provider, signer), + provider, + submitter: SignerMiddleware::new(submitter, signer), + dropped_status_supported, + use_conditional_rpc, } } } diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index e934935e3..cb9700ce8 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -77,8 +77,6 @@ pub struct Args { pub max_bundle_size: u64, /// Maximum bundle size in gas limit pub max_bundle_gas: u64, - /// URL to submit bundles too - pub submit_url: String, /// Percentage to add to the network priority fee for the bundle priority fee pub bundle_priority_fee_overhead_percent: u64, /// Priority fee mode to use for operation priority fee minimums @@ -133,6 +131,11 @@ where async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { let provider = rundler_provider::new_provider(&self.args.rpc_url, Some(self.args.eth_poll_interval))?; + let submit_provider = if let TransactionSenderArgs::Raw(args) = &self.args.sender_args { + Some(rundler_provider::new_provider(&args.submit_url, None)?) + } else { + None + }; let ep_v0_6 = EthersEntryPointV0_6::new( self.args.chain_spec.entry_point_address_v0_6, @@ -154,14 +157,24 @@ where match ep.version { EntryPointVersion::V0_6 => { let (handles, actions) = self - .create_builders_v0_6(ep, Arc::clone(&provider), ep_v0_6.clone()) + .create_builders_v0_6( + ep, + Arc::clone(&provider), + submit_provider.clone(), + ep_v0_6.clone(), + ) .await?; sender_handles.extend(handles); bundle_sender_actions.extend(actions); } EntryPointVersion::V0_7 => { let (handles, actions) = self - .create_builders_v0_7(ep, Arc::clone(&provider), ep_v0_7.clone()) + .create_builders_v0_7( + ep, + Arc::clone(&provider), + submit_provider.clone(), + ep_v0_7.clone(), + ) .await?; sender_handles.extend(handles); bundle_sender_actions.extend(actions); @@ -246,6 +259,7 @@ where &self, ep: &EntryPointBuilderSettings, provider: Arc>, + submit_provider: Option>>, ep_v0_6: E, ) -> anyhow::Result<( Vec>>, @@ -263,6 +277,7 @@ where self.create_bundle_builder( i + ep.bundle_builder_index_offset, Arc::clone(&provider), + submit_provider.clone(), ep_v0_6.clone(), UnsafeSimulator::new( Arc::clone(&provider), @@ -275,6 +290,7 @@ where self.create_bundle_builder( i + ep.bundle_builder_index_offset, Arc::clone(&provider), + submit_provider.clone(), ep_v0_6.clone(), simulation::new_v0_6_simulator( Arc::clone(&provider), @@ -295,6 +311,7 @@ where &self, ep: &EntryPointBuilderSettings, provider: Arc>, + submit_provider: Option>>, ep_v0_7: E, ) -> anyhow::Result<( Vec>>, @@ -312,6 +329,7 @@ where self.create_bundle_builder( i + ep.bundle_builder_index_offset, Arc::clone(&provider), + submit_provider.clone(), ep_v0_7.clone(), UnsafeSimulator::new( Arc::clone(&provider), @@ -324,6 +342,7 @@ where self.create_bundle_builder( i + ep.bundle_builder_index_offset, Arc::clone(&provider), + submit_provider.clone(), ep_v0_7.clone(), simulation::new_v0_7_simulator( Arc::clone(&provider), @@ -344,6 +363,7 @@ where &self, index: u64, provider: Arc>, + submit_provider: Option>>, entry_point: E, simulator: S, ) -> anyhow::Result<( @@ -403,12 +423,8 @@ where bundle_priority_fee_overhead_percent: self.args.bundle_priority_fee_overhead_percent, }; - let submit_provider = rundler_provider::new_provider( - &self.args.submit_url, - Some(self.args.eth_poll_interval), - )?; - let transaction_sender = self.args.sender_args.clone().into_sender( + Arc::clone(&provider), submit_provider, signer, self.args.eth_poll_interval, diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index 775ec0905..a68798ee6 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -369,7 +369,7 @@ where .await .context("tracker should check for dropped transactions")?; Ok(match status { - TxStatus::Pending | TxStatus::Dropped => None, + TxStatus::Pending => None, TxStatus::Mined { block_number } => { let nonce = self.nonce; self.set_nonce_and_clear_state(nonce + 1); @@ -382,11 +382,11 @@ where gas_limit, gas_used, }) - } // TODO(#295): dropped status is often incorrect, for now just assume its still pending - // TxStatus::Dropped => { - // self.has_dropped = true; - // Some(TrackerUpdate::LatestTxDropped { nonce: self.nonce }) - // } + } + TxStatus::Dropped => { + self.has_dropped = true; + Some(TrackerUpdate::LatestTxDropped { nonce: self.nonce }) + } }) } @@ -513,50 +513,44 @@ mod tests { ); } - // TODO(#295): fix dropped status - // #[tokio::test] - // async fn test_nonce_and_fees_dropped() { - // let (mut sender, mut provider) = create_base_config(); - // sender.expect_address().return_const(Address::zero()); - - // sender - // .expect_get_transaction_status() - // .returning(move |_a| Box::pin(async { Ok(TxStatus::Dropped) })); + #[tokio::test] + async fn test_nonce_and_fees_dropped() { + let (mut sender, mut provider) = create_base_config(); + sender.expect_address().return_const(Address::zero()); - // sender.expect_send_transaction().returning(move |_a, _b| { - // Box::pin(async { - // Ok(SentTxInfo { - // nonce: U256::from(0), - // tx_hash: H256::zero(), - // }) - // }) - // }); + sender + .expect_get_transaction_status() + .returning(move |_a| Box::pin(async { Ok(TxStatus::Dropped) })); - // provider - // .expect_get_transaction_count() - // .returning(move |_a| Ok(U256::from(0))); + sender.expect_send_transaction().returning(move |_a, _b| { + Box::pin(async { + Ok(SentTxInfo { + nonce: U256::from(0), + tx_hash: H256::zero(), + }) + }) + }); - // provider - // .expect_get_block_number() - // .returning(move || Ok(1)) - // .times(1); + provider + .expect_get_transaction_count() + .returning(move |_a| Ok(U256::from(0))); - // let tracker = create_tracker(sender, provider).await; + let mut tracker = create_tracker(sender, provider).await; - // let tx = Eip1559TransactionRequest::new() - // .nonce(0) - // .gas(10000) - // .max_fee_per_gas(10000); - // let exp = ExpectedStorage::default(); + let tx = Eip1559TransactionRequest::new() + .nonce(0) + .gas(10000) + .max_fee_per_gas(10000); + let exp = ExpectedStorage::default(); - // // send dummy transaction - // let _sent = tracker.send_transaction(tx.into(), &exp).await; - // let _tracker_update = tracker.wait_for_update().await.unwrap(); + // send dummy transaction + let _sent = tracker.send_transaction(tx.into(), &exp).await; + let _tracker_update = tracker.check_for_update().await.unwrap(); - // let nonce_and_fees = tracker.get_nonce_and_required_fees().unwrap(); + let nonce_and_fees = tracker.get_nonce_and_required_fees().unwrap(); - // assert_eq!((U256::from(0), None), nonce_and_fees); - // } + assert_eq!((U256::from(0), None), nonce_and_fees); + } #[tokio::test] async fn test_send_transaction_without_nonce() { diff --git a/docs/cli.md b/docs/cli.md index 6249d3b02..2d260d548 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -178,22 +178,28 @@ List of command line options for configuring the Builder. - *Only required when AWS_KMS_KEY_IDS are provided* - `--builder.max_bundle_size`: Maximum number of ops to include in one bundle (default: `128`) - env: *BUILDER_MAX_BUNDLE_SIZE* -- `--builder.submit_url`: If present, the URL of the ETH provider that will be used to send transactions. Defaults to the value of `node_http`. - - env: *BUILDER_SUBMIT_URL* -- `--builder.sender`: Choice of what sender type to use for transaction submission. (default: `raw`, options: `raw`, `conditional`, `flashbots`, `polygon_bloxroute`) - - env: *BUILDER_SENDER* - `--builder.max_blocks_to_wait_for_mine`: After submitting a bundle transaction, the maximum number of blocks to wait for that transaction to mine before trying to resend with higher gas fees (default: `2`) - env: *BUILDER_MAX_BLOCKS_TO_WAIT_FOR_MINE* - `--builder.replacement_fee_percent_increase`: Percentage amount to increase gas fees when retrying a transaction after it failed to mine (default: `10`) - env: *BUILDER_REPLACEMENT_FEE_PERCENT_INCREASE* - `--builder.max_fee_increases`: Maximum number of fee increases to attempt (Seven increases of 10% is roughly 2x the initial fees) (default: `7`) - env: *BUILDER_MAX_FEE_INCREASES* -- `--builder.flashbots_relay_builders`: additional builders to send bundles to through the Flashbots relay RPC (comma-separated). List of builders that the Flashbots RPC supports can be found [here](https://docs.flashbots.net/flashbots-auction/advanced/rpc-endpoint#eth_sendprivatetransaction). (default: `flashbots`) -- `--builder.flashbots_relay_auth_key`: authorization key to use with the flashbots relay. See [here](https://docs.flashbots.net/flashbots-auction/advanced/rpc-endpoint#authentication) for more info. (default: None) +- `--builder.sender`: Choice of what sender type to use for transaction submission. (default: `raw`, options: `raw`, `flashbots`, `polygon_bloxroute`) + - env: *BUILDER_SENDER* +- `--builder.submit_url`: Only used if builder.sender == "raw." If present, the URL of the ETH provider that will be used to send transactions. Defaults to the value of `node_http`. + - env: *BUILDER_SUBMIT_URL* +- `--builder.use_submit_for_status`: Only used if builder.sender == "raw." Use the submit url to get the status of the bundle transaction. (default: `false`) + - env: *BUILDER_USE_SUBMIT_FOR_STATUS* +- `--builder.use_conditional_rpc`: Only used if builder.sender == "raw." Use `eth_sendRawTransactionConditional` when submitting. (default: `false`) + - env: *BUILDER_USE_CONDITIONAL_RPC* +- `--builder.dropped_status_unsupported`: Only used if builder.sender == "raw." If set, the builder will not process a dropped status. Use this if the URL that is being used for status (node_http or submit_url) does not support pending transactions, only those that are mined. (default: `false`) + - env: *BUILDER_DROPPED_STATUS_UNSUPPORTED* +- `--builder.flashbots_relay_builders`: Only used if builder.sender == "flashbots." Additional builders to send bundles to through the Flashbots relay RPC (comma-separated). List of builders that the Flashbots RPC supports can be found [here](https://docs.flashbots.net/flashbots-auction/advanced/rpc-endpoint#eth_sendprivatetransaction). (default: `flashbots`) - env: *BUILDER_FLASHBOTS_RELAY_BUILDERS* -- `--builder.bloxroute_auth_header`: If using the bloxroute transaction sender on Polygon, this is the auth header to supply with the requests. (default: None) +- `--builder.flashbots_relay_auth_key`: Only used/required if builder.sender == "flashbots." Authorization key to use with the flashbots relay. See [here](https://docs.flashbots.net/flashbots-auction/advanced/rpc-endpoint#authentication) for more info. (default: None) + - env: *BUILDER_FLASHBOTS_RELAY_AUTH_KEY* +- `--builder.bloxroute_auth_header`: Only used/required if builder.sender == "polygon_bloxroute." If using the bloxroute transaction sender on Polygon, this is the auth header to supply with the requests. (default: None) - env: `BUILDER_BLOXROUTE_AUTH_HEADER` - - *Only required when `--builder.sender=polygon_bloxroute`* - `--builder.index_offset`: If running multiple builder processes, this is the index offset to assign unique indexes to each bundle sender. (default: 0) - env: `BUILDER_INDEX_OFFSET` - `--builder.pool_url`: If running in distributed mode, the URL of the pool server to use. @@ -214,11 +220,11 @@ Here are some example commands to use the CLI: ```sh # Run the Node subcommand with custom options -$ ./rundler node --entry_points 0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789 --chain_id 1337 --max_verification_gas 10000000 +$ ./rundler node --chain_id 1337 --max_verification_gas 10000000 --disable_entry_point_v0_6 # Run the RPC subcommand with custom options and enable JSON logging. The builder and pool will need to be running before this starts. -$ ./rundler rpc --node_http http://localhost:8545 --log.json +$ ./rundler rpc --node_http http://localhost:8545 --log.json --disable_entry_point_v0_6 # Run the Pool subcommand with custom options and specify a mempool config file -$ ./rundler pool --max_simulate_handle_ops_gas 15000000 --mempool_config_path mempool.json --node_http http://localhost:8545 --chain_id 8453 +$ ./rundler pool --max_simulate_handle_ops_gas 15000000 --mempool_config_path mempool.json --node_http http://localhost:8545 --chain_id 8453 --disable_entry_point_v0_6 ``` From 17d37a7d34a927de718aba17fae52f64117364b4 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Thu, 13 Jun 2024 17:15:09 -0500 Subject: [PATCH 2/4] feat(builder): reject ops if condition not met after failure --- crates/builder/src/bundle_proposer.rs | 94 ++++++++++++++++++- crates/builder/src/bundle_sender.rs | 27 +++++- crates/builder/src/emit.rs | 11 +++ crates/builder/src/sender/mod.rs | 11 +++ crates/builder/src/transaction_tracker.rs | 3 + crates/provider/src/ethers/provider.rs | 46 ++++++++- crates/provider/src/traits/provider.rs | 7 ++ crates/types/build.rs | 1 + crates/types/contracts/foundry.toml | 2 +- .../contracts/src/utils/StorageLoader.sol | 17 ++++ 10 files changed, 209 insertions(+), 10 deletions(-) create mode 100644 crates/types/contracts/src/utils/StorageLoader.sol diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index d006a395f..dccf22a3a 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -46,7 +46,7 @@ use rundler_utils::{emit::WithEntryPoint, math}; use tokio::{sync::broadcast, try_join}; use tracing::{error, info, warn}; -use crate::emit::{BuilderEvent, OpRejectionReason, SkipReason}; +use crate::emit::{BuilderEvent, ConditionNotMetReason, OpRejectionReason, SkipReason}; /// Extra buffer percent to add on the bundle transaction gas estimate to be sure it will be enough const BUNDLE_TRANSACTION_GAS_OVERHEAD_PERCENT: u64 = 5; @@ -101,7 +101,7 @@ pub(crate) trait BundleProposer: Send + Sync + 'static { /// If `min_fees` is `Some`, the proposer will ensure the bundle has /// at least `min_fees`. async fn make_bundle( - &self, + &mut self, min_fees: Option, is_replacement: bool, ) -> anyhow::Result>; @@ -111,6 +111,9 @@ pub(crate) trait BundleProposer: Send + Sync + 'static { /// If `min_fees` is `Some`, the proposer will ensure the gas fees returned are at least `min_fees`. async fn estimate_gas_fees(&self, min_fees: Option) -> anyhow::Result<(GasFees, U256)>; + + /// Notifies the proposer that a condition was not met during the last bundle proposal + fn notify_condition_not_met(&mut self); } #[derive(Debug)] @@ -123,6 +126,7 @@ pub(crate) struct BundleProposerImpl { settings: Settings, fee_estimator: FeeEstimator

, event_sender: broadcast::Sender>, + condition_not_met_notified: bool, _uo_type: PhantomData, } @@ -155,8 +159,12 @@ where self.fee_estimator.required_bundle_fees(required_fees).await } + fn notify_condition_not_met(&mut self) { + self.condition_not_met_notified = true; + } + async fn make_bundle( - &self, + &mut self, required_fees: Option, is_replacement: bool, ) -> anyhow::Result> { @@ -238,6 +246,16 @@ where gas_estimate ); + // If recently notified that a bundle condition was not met, check each of + // the conditions again to ensure if they are met, rejecting OPs if they are not. + if self.condition_not_met_notified { + self.condition_not_met_notified = false; + self.check_conditions_met(&mut context).await?; + if context.is_empty() { + break; + } + } + let mut expected_storage = ExpectedStorage::default(); for op in context.iter_ops_with_simulations() { expected_storage.merge(&op.simulation.expected_storage)?; @@ -295,6 +313,7 @@ where ), settings, event_sender, + condition_not_met_notified: false, _uo_type: PhantomData, } } @@ -549,6 +568,73 @@ where context } + async fn check_conditions_met(&self, context: &mut ProposalContext) -> anyhow::Result<()> { + let futs = context + .iter_ops_with_simulations() + .enumerate() + .map(|(i, op)| async move { + self.check_op_conditions_met(&op.simulation.expected_storage) + .await + .map(|reason| (i, reason)) + }) + .collect::>(); + + let to_reject = future::join_all(futs).await.into_iter().flatten(); + + for (index, reason) in to_reject { + self.emit(BuilderEvent::rejected_op( + self.builder_index, + self.op_hash(&context.get_op_at(index)?.op), + OpRejectionReason::ConditionNotMet(reason), + )); + self.reject_index(context, index).await; + } + + Ok(()) + } + + async fn check_op_conditions_met( + &self, + expected_storage: &ExpectedStorage, + ) -> Option { + let futs = expected_storage + .0 + .iter() + .map(|(address, slots)| async move { + let storage = match self + .provider + .batch_get_storage_at(*address, slots.keys().copied().collect()) + .await + { + Ok(storage) => storage, + Err(e) => { + error!("Error getting storage for address {address:?} failing open: {e:?}"); + return None; + } + }; + + for ((slot, expected), actual) in slots.iter().zip(storage) { + if *expected != actual { + return Some(ConditionNotMetReason { + address: *address, + slot: *slot, + expected: *expected, + actual, + }); + } + } + None + }); + + let results = future::join_all(futs).await; + for result in results { + if result.is_some() { + return result; + } + } + None + } + async fn reject_index(&self, context: &mut ProposalContext, i: usize) { let changed_aggregator = context.reject_index(i); self.compute_aggregator_signatures(context, &changed_aggregator) @@ -2154,7 +2240,7 @@ mod tests { .expect_aggregate_signatures() .returning(move |address, _| Ok(signatures_by_aggregator[&address]().unwrap())); let (event_sender, _) = broadcast::channel(16); - let proposer = BundleProposerImpl::new( + let mut proposer = BundleProposerImpl::new( 0, pool_client, simulator, diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 8a7a37af2..d3a2f7be7 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -104,6 +104,8 @@ enum SendBundleAttemptResult { NoOperations, // Replacement Underpriced ReplacementUnderpriced, + // Condition not met + ConditionNotMet, // Nonce too low NonceTooLow, } @@ -255,6 +257,11 @@ where info!("Replacement transaction underpriced, entering cancellation loop"); state.update(InnerState::Cancelling(inner.to_cancelling())); } + Ok(SendBundleAttemptResult::ConditionNotMet) => { + info!("Condition not met, notifying proposer and starting new bundle attempt"); + self.proposer.notify_condition_not_met(); + state.reset(); + } Err(error) => { error!("Bundle send error {error:?}"); self.metrics.increment_bundle_txns_failed(); @@ -487,14 +494,20 @@ where Ok(SendBundleAttemptResult::Success) } Err(TransactionTrackerError::NonceTooLow) => { - warn!("Replacement transaction underpriced"); + self.metrics.increment_bundle_txn_nonce_too_low(); + warn!("Bundle attempt nonce too low"); Ok(SendBundleAttemptResult::NonceTooLow) } Err(TransactionTrackerError::ReplacementUnderpriced) => { self.metrics.increment_bundle_txn_replacement_underpriced(); - warn!("Replacement transaction underpriced"); + warn!("Bundle attempt replacement transaction underpriced"); Ok(SendBundleAttemptResult::ReplacementUnderpriced) } + Err(TransactionTrackerError::ConditionNotMet) => { + self.metrics.increment_bundle_txn_condition_not_met(); + warn!("Bundle attempt condition not met"); + Ok(SendBundleAttemptResult::ConditionNotMet) + } Err(e) => { error!("Failed to send bundle with unexpected error: {e:?}"); Err(e.into()) @@ -505,7 +518,7 @@ where /// Builds a bundle and returns some metadata and the transaction to send /// it, or `None` if there are no valid operations available. async fn get_bundle_tx( - &self, + &mut self, nonce: U256, required_fees: Option, is_replacement: bool, @@ -964,6 +977,14 @@ impl BuilderMetrics { metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } + fn increment_bundle_txn_nonce_too_low(&self) { + metrics::counter!("builder_bundle_nonce_too_low", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); + } + + fn increment_bundle_txn_condition_not_met(&self) { + metrics::counter!("builder_bundle_condition_not_met", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); + } + fn increment_cancellation_txns_sent(&self) { metrics::counter!("builder_cancellation_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } diff --git a/crates/builder/src/emit.rs b/crates/builder/src/emit.rs index e5c70cd36..66de76f01 100644 --- a/crates/builder/src/emit.rs +++ b/crates/builder/src/emit.rs @@ -196,6 +196,17 @@ pub enum OpRejectionReason { FailedRevalidation { error: SimulationError }, /// Operation reverted during bundle formation simulation with message FailedInBundle { message: Arc }, + /// Operation's storage slot condition was not met + ConditionNotMet(ConditionNotMetReason), +} + +/// Reason for a condition not being met +#[derive(Clone, Debug)] +pub struct ConditionNotMetReason { + pub address: Address, + pub slot: H256, + pub expected: H256, + pub actual: H256, } impl Display for BuilderEvent { diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index d2bb50fed..122640c78 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -66,6 +66,9 @@ pub(crate) enum TxSenderError { /// Nonce too low #[error("nonce too low")] NonceTooLow, + /// Conditional value not met + #[error("storage slot value condition not met")] + ConditionNotMet, /// Soft cancellation failed #[error("soft cancel failed")] SoftCancelFailed, @@ -292,6 +295,14 @@ impl From for TxSenderError { return TxSenderError::ReplacementUnderpriced; } else if e.message.contains("nonce too low") { return TxSenderError::NonceTooLow; + // Arbitrum conditional sender error message + // TODO push them to use a specific error code and to return the specific slot that is not met. + } else if e + .message + .to_lowercase() + .contains("storage slot value condition not met") + { + return TxSenderError::ConditionNotMet; } } TxSenderError::Other(value.into()) diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index a68798ee6..cc0fff660 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -77,6 +77,8 @@ pub(crate) enum TransactionTrackerError { NonceTooLow, #[error("replacement transaction underpriced")] ReplacementUnderpriced, + #[error("storage slot value condition not met")] + ConditionNotMet, /// All other errors #[error(transparent)] Other(#[from] anyhow::Error), @@ -403,6 +405,7 @@ impl From for TransactionTrackerError { TxSenderError::ReplacementUnderpriced => { TransactionTrackerError::ReplacementUnderpriced } + TxSenderError::ConditionNotMet => TransactionTrackerError::ConditionNotMet, TxSenderError::SoftCancelFailed => { TransactionTrackerError::Other(anyhow::anyhow!("soft cancel failed")) } diff --git a/crates/provider/src/ethers/provider.rs b/crates/provider/src/ethers/provider.rs index 739d7ad3f..baa757249 100644 --- a/crates/provider/src/ethers/provider.rs +++ b/crates/provider/src/ethers/provider.rs @@ -29,8 +29,9 @@ use ethers::{ }, }; use reqwest::Url; -use rundler_types::contracts::utils::get_gas_used::{ - GasUsedResult, GetGasUsed, GETGASUSED_DEPLOYED_BYTECODE, +use rundler_types::contracts::utils::{ + get_gas_used::{GasUsedResult, GetGasUsed, GETGASUSED_DEPLOYED_BYTECODE}, + storage_loader::STORAGELOADER_DEPLOYED_BYTECODE, }; use serde::{de::DeserializeOwned, Serialize}; @@ -207,6 +208,47 @@ impl Provider for EthersProvider { .await .context("should get gas used")?) } + + async fn batch_get_storage_at( + &self, + address: Address, + slots: Vec, + ) -> ProviderResult> { + let mut state_overrides = spoof::State::default(); + state_overrides + .account(address) + .code(STORAGELOADER_DEPLOYED_BYTECODE.clone()); + + let expected_ret_size = slots.len() * 32; + let slot_data = slots + .into_iter() + .flat_map(|slot| slot.to_fixed_bytes()) + .collect::>(); + + let tx: TypedTransaction = Eip1559TransactionRequest { + to: Some(address.into()), + data: Some(slot_data.into()), + ..Default::default() + } + .into(); + + let result_bytes = self + .call_raw(&tx) + .state(&state_overrides) + .await + .context("should call storage loader")?; + + if result_bytes.len() != expected_ret_size { + return Err(anyhow::anyhow!( + "expected {} bytes, got {}", + expected_ret_size, + result_bytes.len() + ) + .into()); + } + + Ok(result_bytes.chunks(32).map(H256::from_slice).collect()) + } } impl From for ProviderError { diff --git a/crates/provider/src/traits/provider.rs b/crates/provider/src/traits/provider.rs index 86a313c8e..0aac930b1 100644 --- a/crates/provider/src/traits/provider.rs +++ b/crates/provider/src/traits/provider.rs @@ -137,4 +137,11 @@ pub trait Provider: Send + Sync + Debug + 'static { data: Bytes, state_overrides: spoof::State, ) -> ProviderResult; + + /// Get the storage values at a given address and slots + async fn batch_get_storage_at( + &self, + address: Address, + slots: Vec, + ) -> ProviderResult>; } diff --git a/crates/types/build.rs b/crates/types/build.rs index 6bbb22fa4..f69d11246 100644 --- a/crates/types/build.rs +++ b/crates/types/build.rs @@ -88,6 +88,7 @@ fn generate_utils_bindings() -> Result<(), Box> { MultiAbigen::from_abigens([ abigen_of("utils", "GetCodeHashes")?, abigen_of("utils", "GetGasUsed")?, + abigen_of("utils", "StorageLoader")?, ]) .build()? .write_to_module("src/contracts/utils", false)?; diff --git a/crates/types/contracts/foundry.toml b/crates/types/contracts/foundry.toml index 0705faad6..ea3c24180 100644 --- a/crates/types/contracts/foundry.toml +++ b/crates/types/contracts/foundry.toml @@ -4,7 +4,7 @@ out = 'out' libs = ['lib'] test = 'test' cache_path = 'cache' -solc_version = '0.8.23' +solc_version = '0.8.26' remappings = [ 'forge-std/=lib/forge-std/src', diff --git a/crates/types/contracts/src/utils/StorageLoader.sol b/crates/types/contracts/src/utils/StorageLoader.sol new file mode 100644 index 000000000..1cdab1d1f --- /dev/null +++ b/crates/types/contracts/src/utils/StorageLoader.sol @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: UNLICENSED +pragma solidity ^0.8.25; + +contract StorageLoader { + fallback() external payable { + assembly { + let cursor := 0 + + for {} lt(cursor, calldatasize()) {cursor := add(cursor, 0x20)} { + let slot := calldataload(cursor) + mstore(cursor, sload(slot)) + } + + return(0, cursor) + } + } +} From e94744ad4541e05bb9898accf27aef90c44fb699 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Fri, 14 Jun 2024 10:40:34 -0500 Subject: [PATCH 3/4] feat(builder): allow for multiple private keys --- bin/rundler/src/cli/builder.rs | 47 ++++++++++++++++++++++++++------ crates/builder/src/sender/mod.rs | 36 +++++++++--------------- crates/builder/src/task.rs | 25 +++++++++++++---- docs/cli.md | 5 +++- 4 files changed, 74 insertions(+), 39 deletions(-) diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 19e8d2225..ecb1952e6 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -13,7 +13,7 @@ use std::{net::SocketAddr, time::Duration}; -use anyhow::Context; +use anyhow::{bail, Context}; use clap::Args; use rundler_builder::{ self, BloxrouteSenderArgs, BuilderEvent, BuilderEventKind, BuilderTask, BuilderTaskArgs, @@ -57,6 +57,10 @@ pub struct BuilderArgs { host: String, /// Private key to use for signing transactions + /// DEPRECATED: Use `builder.private_keys` instead + /// + /// If both `builder.private_key` and `builder.private_keys` are set, `builder.private_key` is appended + /// to `builder.private_keys`. Keys must be unique. #[arg( long = "builder.private_key", name = "builder.private_key", @@ -64,6 +68,17 @@ pub struct BuilderArgs { )] private_key: Option, + /// Private keys to use for signing transactions + /// + /// Cannot use both `builder.private_key` and `builder.aws_kms_key_ids` at the same time. + #[arg( + long = "builder.private_keys", + name = "builder.private_keys", + env = "BUILDER_PRIVATE_KEYS", + value_delimiter = ',' + )] + private_keys: Vec, + /// AWS KMS key IDs to use for signing transactions #[arg( long = "builder.aws_kms_key_ids", @@ -284,17 +299,31 @@ impl BuilderArgs { num_builders += common.num_builders_v0_7; } - if self.private_key.is_some() { - if num_builders > 1 { - return Err(anyhow::anyhow!( - "Cannot use a private key with multiple builders. You may need to disable one of the entry points." - )); + if (self.private_key.is_some() || !self.private_keys.is_empty()) + && !self.aws_kms_key_ids.is_empty() + { + bail!( + "Cannot use both builder.private_key(s) and builder.aws_kms_key_ids at the same time." + ); + } + + let mut private_keys = self.private_keys.clone(); + if self.private_key.is_some() || !self.private_keys.is_empty() { + if let Some(pk) = &self.private_key { + private_keys.push(pk.clone()); + } + + if num_builders > private_keys.len() as u64 { + bail!( + "Found {} private keys, but need {} keys for the number of builders. You may need to disable one of the entry points.", + private_keys.len(), num_builders + ); } } else if self.aws_kms_key_ids.len() < num_builders as usize { - return Err(anyhow::anyhow!( + bail!( "Not enough AWS KMS key IDs for the number of builders. Need {} keys, found {}. You may need to disable one of the entry points.", num_builders, self.aws_kms_key_ids.len() - )); + ); } let sender_args = self.sender_args(&chain_spec, &rpc_url)?; @@ -304,7 +333,7 @@ impl BuilderArgs { chain_spec, unsafe_mode: common.unsafe_mode, rpc_url, - private_key: self.private_key.clone(), + private_keys, aws_kms_key_ids: self.aws_kms_key_ids.clone(), aws_kms_region: common .aws_region diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index 122640c78..fcc0ed70d 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -183,33 +183,23 @@ impl TransactionSenderArgs { { let sender = match self { Self::Raw(args) => { - if let Some(submit_provider) = submit_provider { + let (provider, submitter) = if let Some(submit_provider) = submit_provider { if args.use_submit_for_status { - TransactionSenderEnum::Raw(RawTransactionSender::new( - Arc::clone(&submit_provider), - submit_provider, - signer, - args.dropped_status_supported, - args.use_conditional_rpc, - )) + (Arc::clone(&submit_provider), submit_provider) } else { - TransactionSenderEnum::Raw(RawTransactionSender::new( - rpc_provider, - submit_provider, - signer, - args.dropped_status_supported, - args.use_conditional_rpc, - )) + (rpc_provider, submit_provider) } } else { - TransactionSenderEnum::Raw(RawTransactionSender::new( - Arc::clone(&rpc_provider), - rpc_provider, - signer, - args.dropped_status_supported, - args.use_conditional_rpc, - )) - } + (Arc::clone(&rpc_provider), rpc_provider) + }; + + TransactionSenderEnum::Raw(RawTransactionSender::new( + provider, + submitter, + signer, + args.dropped_status_supported, + args.use_conditional_rpc, + )) } Self::Flashbots(args) => { let flashbots_signer = args.auth_key.parse().context("should parse auth key")?; diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index cb9700ce8..0ed558e36 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -62,8 +62,8 @@ pub struct Args { /// True if using unsafe mode pub unsafe_mode: bool, /// Private key to use for signing transactions - /// If not provided, AWS KMS will be used - pub private_key: Option, + /// If empty, AWS KMS will be used + pub private_keys: Vec, /// AWS KMS key ids to use for signing transactions /// Only used if private_key is not provided pub aws_kms_key_ids: Vec, @@ -152,6 +152,7 @@ where let mut sender_handles = vec![]; let mut bundle_sender_actions = vec![]; + let mut pk_iter = self.args.private_keys.clone().into_iter(); for ep in &self.args.entry_points { match ep.version { @@ -162,6 +163,7 @@ where Arc::clone(&provider), submit_provider.clone(), ep_v0_6.clone(), + &mut pk_iter, ) .await?; sender_handles.extend(handles); @@ -174,6 +176,7 @@ where Arc::clone(&provider), submit_provider.clone(), ep_v0_7.clone(), + &mut pk_iter, ) .await?; sender_handles.extend(handles); @@ -255,12 +258,13 @@ where Box::new(self) } - async fn create_builders_v0_6( + async fn create_builders_v0_6( &self, ep: &EntryPointBuilderSettings, provider: Arc>, submit_provider: Option>>, ep_v0_6: E, + pk_iter: &mut I, ) -> anyhow::Result<( Vec>>, Vec>, @@ -268,6 +272,7 @@ where where C: JsonRpcClient + 'static, E: EntryPointProvider + Clone, + I: Iterator, { info!("Mempool config for ep v0.6: {:?}", ep.mempool_configs); let mut sender_handles = vec![]; @@ -284,6 +289,7 @@ where ep_v0_6.clone(), self.args.sim_settings, ), + pk_iter, ) .await? } else { @@ -298,6 +304,7 @@ where self.args.sim_settings, ep.mempool_configs.clone(), ), + pk_iter, ) .await? }; @@ -307,12 +314,13 @@ where Ok((sender_handles, bundle_sender_actions)) } - async fn create_builders_v0_7( + async fn create_builders_v0_7( &self, ep: &EntryPointBuilderSettings, provider: Arc>, submit_provider: Option>>, ep_v0_7: E, + pk_iter: &mut I, ) -> anyhow::Result<( Vec>>, Vec>, @@ -320,6 +328,7 @@ where where C: JsonRpcClient + 'static, E: EntryPointProvider + Clone, + I: Iterator, { info!("Mempool config for ep v0.7: {:?}", ep.mempool_configs); let mut sender_handles = vec![]; @@ -336,6 +345,7 @@ where ep_v0_7.clone(), self.args.sim_settings, ), + pk_iter, ) .await? } else { @@ -350,6 +360,7 @@ where self.args.sim_settings, ep.mempool_configs.clone(), ), + pk_iter, ) .await? }; @@ -359,13 +370,14 @@ where Ok((sender_handles, bundle_sender_actions)) } - async fn create_bundle_builder( + async fn create_bundle_builder( &self, index: u64, provider: Arc>, submit_provider: Option>>, entry_point: E, simulator: S, + pk_iter: &mut I, ) -> anyhow::Result<( JoinHandle>, mpsc::Sender, @@ -376,10 +388,11 @@ where E: EntryPointProvider + Clone, S: Simulator, C: JsonRpcClient + 'static, + I: Iterator, { let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1); - let signer = if let Some(pk) = &self.args.private_key { + let signer = if let Some(pk) = pk_iter.next() { info!("Using local signer"); BundlerSigner::Local( LocalSigner::connect( diff --git a/docs/cli.md b/docs/cli.md index 2d260d548..9936c69b5 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -166,10 +166,13 @@ List of command line options for configuring the Builder. - *Only required when running in distributed mode* - `--builder.private_key`: Private key to use for signing transactions - env: *BUILDER_PRIVATE_KEY* - - *Always used if provided. If not provided builder.aws_kms_key_ids is used* + - **DEPRECATED**: Use `--builder.private_keys` instead. If both used this is added to the list. +- `--builder.private_keys`: Private keys to use for signing transactions, separated by `,` + - env: *BUILDER_PRIVATE_KEYS* - `--builder.aws_kms_key_ids`: AWS KMS key IDs to use for signing transactions (comma-separated) - env: *BUILDER_AWS_KMS_KEY_IDS* - *Only required if BUILDER_PRIVATE_KEY is not provided* + - *Cannot use `builder.private_keys` and `builder.aws_kms_key_ids` at the same time* - `--builder.redis_uri`: Redis URI to use for KMS leasing (default: `""`) - env: *BUILDER_REDIS_URI* - *Only required when AWS_KMS_KEY_IDS are provided* From 16f7006d3136c8ec2add06d62cd385b71cab3397 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Tue, 18 Jun 2024 22:03:00 -0500 Subject: [PATCH 4/4] chore(builder): add unit tests to bundle_sender --- crates/builder/src/bundle_proposer.rs | 111 +++- crates/builder/src/bundle_sender.rs | 697 +++++++++++++++++++--- crates/builder/src/transaction_tracker.rs | 3 + 3 files changed, 741 insertions(+), 70 deletions(-) diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index dccf22a3a..17999cfcc 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -91,8 +91,8 @@ impl Bundle { } } -#[cfg_attr(test, automock(type UO = rundler_types::v0_6::UserOperation;))] #[async_trait] +#[cfg_attr(test, automock(type UO = rundler_types::v0_6::UserOperation;))] pub(crate) trait BundleProposer: Send + Sync + 'static { type UO: UserOperation; @@ -1579,6 +1579,8 @@ mod tests { vec![], base_fee, max_priority_fee_per_gas, + false, + ExpectedStorage::default(), ) .await; assert_eq!( @@ -1613,6 +1615,8 @@ mod tests { vec![], base_fee, max_priority_fee_per_gas, + false, + ExpectedStorage::default(), ) .await; assert_eq!( @@ -1655,6 +1659,8 @@ mod tests { vec![], base_fee, max_priority_fee_per_gas, + false, + ExpectedStorage::default(), ) .await; assert_eq!( @@ -1746,6 +1752,8 @@ mod tests { vec![], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await; // Ops should be grouped by aggregator. Further, the `signature` field @@ -1834,6 +1842,8 @@ mod tests { vec![deposit, deposit, deposit], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await; @@ -1895,6 +1905,8 @@ mod tests { vec![deposit, deposit, deposit], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await; @@ -2024,6 +2036,8 @@ mod tests { vec![], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await; @@ -2056,6 +2070,8 @@ mod tests { vec![], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await; @@ -2122,6 +2138,8 @@ mod tests { vec![], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await; @@ -2138,6 +2156,76 @@ mod tests { ); } + #[tokio::test] + async fn test_condition_not_met_match() { + let op = default_op(); + + let mut expected_storage = ExpectedStorage::default(); + expected_storage.insert(address(1), U256::zero(), U256::zero()); + let actual_storage = expected_storage.clone(); + + let bundle = mock_make_bundle( + vec![MockOp { + op: op.clone(), + simulation_result: Box::new(move || { + Ok(SimulationResult { + expected_storage: expected_storage.clone(), + ..Default::default() + }) + }), + }], + vec![], + vec![HandleOpsOut::Success], + vec![], + U256::zero(), + U256::zero(), + true, + actual_storage, + ) + .await; + + assert_eq!( + bundle.ops_per_aggregator, + vec![UserOpsPerAggregator { + user_ops: vec![op], + ..Default::default() + }] + ); + } + + #[tokio::test] + async fn test_condition_not_met_mismatch() { + let op = default_op(); + + let mut expected_storage = ExpectedStorage::default(); + expected_storage.insert(address(1), U256::zero(), U256::zero()); + let mut actual_storage = ExpectedStorage::default(); + actual_storage.insert(address(1), U256::zero(), U256::from(1)); + + let bundle = mock_make_bundle( + vec![MockOp { + op: op.clone(), + simulation_result: Box::new(move || { + Ok(SimulationResult { + expected_storage: expected_storage.clone(), + ..Default::default() + }) + }), + }], + vec![], + vec![HandleOpsOut::Success], + vec![], + U256::zero(), + U256::zero(), + true, + actual_storage, + ) + .await; + + assert!(bundle.ops_per_aggregator.is_empty()); + assert_eq!(bundle.rejected_ops, vec![op]); + } + struct MockOp { op: UserOperation, simulation_result: Box Result + Send + Sync>, @@ -2156,10 +2244,13 @@ mod tests { vec![], U256::zero(), U256::zero(), + false, + ExpectedStorage::default(), ) .await } + #[allow(clippy::too_many_arguments)] async fn mock_make_bundle( mock_ops: Vec, mock_aggregators: Vec, @@ -2167,6 +2258,8 @@ mod tests { mock_paymaster_deposits: Vec, base_fee: U256, max_priority_fee_per_gas: U256, + notify_condition_not_met: bool, + actual_storage: ExpectedStorage, ) -> Bundle { let entry_point_address = address(123); let beneficiary = address(124); @@ -2226,6 +2319,7 @@ mod tests { .into_iter() .map(|agg| (agg.address, agg.signature)) .collect(); + let mut provider = MockProvider::new(); provider .expect_get_latest_block_hash_and_number() @@ -2236,6 +2330,16 @@ mod tests { provider .expect_get_max_priority_fee() .returning(move || Ok(max_priority_fee_per_gas)); + if notify_condition_not_met { + for (addr, slots) in actual_storage.0.into_iter() { + let values = slots.values().cloned().collect::>(); + provider + .expect_batch_get_storage_at() + .withf(move |a, s| *a == addr && s.iter().all(|slot| slots.contains_key(slot))) + .returning(move |_, _| Ok(values.clone())); + } + } + entry_point .expect_aggregate_signatures() .returning(move |address, _| Ok(signatures_by_aggregator[&address]().unwrap())); @@ -2256,6 +2360,11 @@ mod tests { }, event_sender, ); + + if notify_condition_not_met { + proposer.notify_condition_not_met(); + } + proposer .make_bundle(None, false) .await diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index d3a2f7be7..4c088dbb0 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -17,6 +17,8 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; use futures_util::StreamExt; +#[cfg(test)] +use mockall::automock; use rundler_provider::{BundleHandler, EntryPoint}; use rundler_sim::ExpectedStorage; use rundler_types::{ @@ -186,7 +188,10 @@ where } } - async fn step_state(&mut self, state: &mut SenderMachineState) -> anyhow::Result<()> { + async fn step_state( + &mut self, + state: &mut SenderMachineState, + ) -> anyhow::Result<()> { let tracker_update = state.wait_for_trigger().await?; match state.inner { @@ -210,9 +215,9 @@ where Ok(()) } - async fn handle_building_state( + async fn handle_building_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: BuildingState, ) -> anyhow::Result<()> { // send bundle @@ -273,9 +278,9 @@ where Ok(()) } - async fn handle_pending_state( + async fn handle_pending_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: PendingState, tracker_update: Option, ) -> anyhow::Result<()> { @@ -342,9 +347,9 @@ where Ok(()) } - async fn handle_cancelling_state( + async fn handle_cancelling_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: CancellingState, ) -> anyhow::Result<()> { info!("Cancelling last transaction"); @@ -393,9 +398,9 @@ where Ok(()) } - async fn handle_cancel_pending_state( + async fn handle_cancel_pending_state( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, inner: CancelPendingState, tracker_update: Option, ) -> anyhow::Result<()> { @@ -444,9 +449,9 @@ where /// - There are no ops available to bundle initially. /// - The gas fees are high enough that the bundle is empty because there /// are no ops that meet the fee requirements. - async fn send_bundle( + async fn send_bundle( &mut self, - state: &mut SenderMachineState, + state: &mut SenderMachineState, fee_increase_count: u64, ) -> anyhow::Result { let (nonce, required_fees) = state.transaction_tracker.get_nonce_and_required_fees()?; @@ -528,19 +533,31 @@ where .make_bundle(required_fees, is_replacement) .await .context("proposer should create bundle for builder")?; + let remove_ops_future = async { + if bundle.rejected_ops.is_empty() { + return; + } + let result = self.remove_ops_from_pool(&bundle.rejected_ops).await; if let Err(error) = result { error!("Failed to remove rejected ops from pool: {error}"); } }; + let update_entities_future = async { + if bundle.entity_updates.is_empty() { + return; + } + let result = self.update_entities_in_pool(&bundle.entity_updates).await; if let Err(error) = result { error!("Failed to update entities in pool: {error}"); } }; + join!(remove_ops_future, update_entities_future); + if bundle.is_empty() { if !bundle.rejected_ops.is_empty() || !bundle.entity_updates.is_empty() { info!( @@ -603,16 +620,16 @@ where } } -struct SenderMachineState { - trigger: BundleSenderTrigger, +struct SenderMachineState { + trigger: TRIG, transaction_tracker: T, send_bundle_response: Option>, inner: InnerState, requires_reset: bool, } -impl SenderMachineState { - fn new(trigger: BundleSenderTrigger, transaction_tracker: T) -> Self { +impl SenderMachineState { + fn new(trigger: TRIG, transaction_tracker: T) -> Self { Self { trigger, transaction_tracker, @@ -732,7 +749,7 @@ struct PendingState { impl PendingState { fn to_building(self) -> BuildingState { BuildingState { - wait_for_trigger: true, + wait_for_trigger: false, fee_increase_count: self.fee_increase_count + 1, } } @@ -771,6 +788,18 @@ impl CancelPendingState { } } +#[async_trait] +#[cfg_attr(test, automock)] +trait Trigger { + async fn wait_for_trigger( + &mut self, + ) -> anyhow::Result>>; + + async fn wait_for_block(&mut self) -> anyhow::Result; + + fn last_block(&self) -> &NewHead; +} + struct BundleSenderTrigger { bundling_mode: BundlingMode, block_rx: UnboundedReceiver, @@ -779,55 +808,8 @@ struct BundleSenderTrigger { last_block: NewHead, } -impl BundleSenderTrigger { - async fn new( - pool_client: &P, - bundle_action_receiver: mpsc::Receiver, - timer_interval: Duration, - ) -> anyhow::Result { - let block_rx = Self::start_block_stream(pool_client).await?; - - Ok(Self { - bundling_mode: BundlingMode::Auto, - block_rx, - bundle_action_receiver, - timer: tokio::time::interval(timer_interval), - last_block: NewHead { - block_hash: H256::zero(), - block_number: 0, - }, - }) - } - - async fn start_block_stream( - pool_client: &P, - ) -> anyhow::Result> { - let Ok(mut new_heads) = pool_client.subscribe_new_heads().await else { - error!("Failed to subscribe to new blocks"); - bail!("failed to subscribe to new blocks"); - }; - - let (tx, rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { - loop { - match new_heads.next().await { - Some(b) => { - if tx.send(b).is_err() { - error!("Failed to buffer new block for bundle sender"); - return; - } - } - None => { - error!("Block stream ended"); - return; - } - } - } - }); - - Ok(rx) - } - +#[async_trait] +impl Trigger for BundleSenderTrigger { async fn wait_for_trigger( &mut self, ) -> anyhow::Result>> { @@ -905,6 +887,60 @@ impl BundleSenderTrigger { Ok(self.last_block.clone()) } + fn last_block(&self) -> &NewHead { + &self.last_block + } +} + +impl BundleSenderTrigger { + async fn new( + pool_client: &P, + bundle_action_receiver: mpsc::Receiver, + timer_interval: Duration, + ) -> anyhow::Result { + let block_rx = Self::start_block_stream(pool_client).await?; + + Ok(Self { + bundling_mode: BundlingMode::Auto, + block_rx, + bundle_action_receiver, + timer: tokio::time::interval(timer_interval), + last_block: NewHead { + block_hash: H256::zero(), + block_number: 0, + }, + }) + } + + async fn start_block_stream( + pool_client: &P, + ) -> anyhow::Result> { + let Ok(mut new_heads) = pool_client.subscribe_new_heads().await else { + error!("Failed to subscribe to new blocks"); + bail!("failed to subscribe to new blocks"); + }; + + let (tx, rx) = mpsc::unbounded_channel(); + tokio::spawn(async move { + loop { + match new_heads.next().await { + Some(b) => { + if tx.send(b).is_err() { + error!("Failed to buffer new block for bundle sender"); + return; + } + } + None => { + error!("Block stream ended"); + return; + } + } + } + }); + + Ok(rx) + } + fn consume_blocks(&mut self) -> anyhow::Result<()> { // Consume any other blocks that may have been buffered up loop { @@ -922,10 +958,6 @@ impl BundleSenderTrigger { } } } - - fn last_block(&self) -> &NewHead { - &self.last_block - } } #[derive(Debug, Clone)] @@ -1005,3 +1037,530 @@ impl BuilderMetrics { metrics::counter!("builder_state_machine_errors", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } } + +#[cfg(test)] +mod tests { + use ethers::types::Bytes; + use mockall::Sequence; + use rundler_provider::MockEntryPointV0_6; + use rundler_types::{ + chain::ChainSpec, pool::MockPool, v0_6::UserOperation, UserOpsPerAggregator, + }; + use tokio::sync::{broadcast, mpsc}; + + use super::*; + use crate::{ + bundle_proposer::{Bundle, MockBundleProposer}, + bundle_sender::{BundleSenderImpl, MockTrigger}, + transaction_tracker::MockTransactionTracker, + }; + + #[tokio::test] + async fn test_empty_send() { + let Mocks { + mut mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + // block 0 + add_trigger_no_update_last_block( + &mut mock_trigger, + &mut mock_tracker, + &mut Sequence::new(), + 0, + ); + + // zero nonce + mock_tracker + .expect_get_nonce_and_required_fees() + .returning(|| Ok((U256::zero(), None))); + + // empty bundle + mock_proposer + .expect_make_bundle() + .times(1) + .returning(|_, _| Box::pin(async { Ok(Bundle::::default()) })); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in building state + let mut state = SenderMachineState::new(mock_trigger, mock_tracker); + + sender.step_state(&mut state).await.unwrap(); + + // empty bundle shouldn't move out of building state + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: true, + .. + }) + )); + } + + #[tokio::test] + async fn test_send() { + let Mocks { + mut mock_proposer, + mut mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + // block 0 + add_trigger_no_update_last_block( + &mut mock_trigger, + &mut mock_tracker, + &mut Sequence::new(), + 0, + ); + + // zero nonce + mock_tracker + .expect_get_nonce_and_required_fees() + .returning(|| Ok((U256::zero(), None))); + + // bundle with one op + mock_proposer + .expect_make_bundle() + .times(1) + .returning(|_, _| Box::pin(async { Ok(bundle()) })); + + // should create the bundle txn + mock_entry_point + .expect_get_send_bundle_transaction() + .returning(|_, _, _, _| TypedTransaction::default()); + + // should send the bundle txn + mock_tracker + .expect_send_transaction() + .returning(|_, _| Box::pin(async { Ok(H256::zero()) })); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in building state + let mut state = SenderMachineState::new(mock_trigger, mock_tracker); + + sender.step_state(&mut state).await.unwrap(); + + // end in the pending state + assert!(matches!( + state.inner, + InnerState::Pending(PendingState { + until: 3, // block 0 + wait 3 blocks + .. + }) + )); + } + + #[tokio::test] + async fn test_wait_for_mine_success() { + let Mocks { + mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + add_trigger_wait_for_block_last_block(&mut mock_trigger, &mut seq, 1); + mock_trigger + .expect_wait_for_block() + .once() + .in_sequence(&mut seq) + .returning(|| { + Box::pin(async { + Ok(NewHead { + block_number: 2, + block_hash: H256::zero(), + }) + }) + }); + // no call to last_block after mine + + let mut seq = Sequence::new(); + mock_tracker + .expect_check_for_update() + .once() + .in_sequence(&mut seq) + .returning(|| Box::pin(async { Ok(None) })); + mock_tracker + .expect_check_for_update() + .once() + .in_sequence(&mut seq) + .returning(|| { + Box::pin(async { + Ok(Some(TrackerUpdate::Mined { + block_number: 2, + nonce: U256::zero(), + gas_limit: None, + gas_used: None, + tx_hash: H256::zero(), + attempt_number: 0, + })) + }) + }); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in pending state + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Pending(PendingState { + until: 3, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + // first step has no update + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Pending(PendingState { until: 3, .. }) + )); + + // second step is mined and moves back to building + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + }) + )); + } + + #[tokio::test] + async fn test_wait_for_mine_timed_out() { + let Mocks { + mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + for i in 1..=3 { + add_trigger_wait_for_block_last_block(&mut mock_trigger, &mut seq, i); + } + + mock_tracker + .expect_check_for_update() + .times(3) + .returning(|| Box::pin(async { Ok(None) })); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in pending state + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Pending(PendingState { + until: 3, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + // first and second step has no update + for _ in 0..2 { + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Pending(PendingState { until: 3, .. }) + )); + } + + // third step times out and moves back to building with a fee increase + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: false, + fee_increase_count: 1, + }) + )); + } + + #[tokio::test] + async fn test_send_cancel() { + let Mocks { + mut mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + mock_proposer + .expect_estimate_gas_fees() + .once() + .returning(|_| Box::pin(async { Ok((GasFees::default(), U256::zero())) })); + + mock_tracker + .expect_cancel_transaction() + .once() + .returning(|_, _| Box::pin(async { Ok(Some(H256::zero())) })); + + mock_trigger.expect_last_block().return_const(NewHead { + block_number: 0, + block_hash: H256::zero(), + }); + + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Cancelling(CancellingState { + fee_increase_count: 0, + }), + requires_reset: false, + }; + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::CancelPending(CancelPendingState { + until: 3, + fee_increase_count: 0, + }) + )); + } + + #[tokio::test] + async fn test_resubmit_cancel() { + let Mocks { + mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + for i in 1..=3 { + add_trigger_wait_for_block_last_block(&mut mock_trigger, &mut seq, i); + } + + mock_tracker + .expect_check_for_update() + .times(3) + .returning(|| Box::pin(async { Ok(None) })); + + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::CancelPending(CancelPendingState { + until: 3, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + for _ in 0..2 { + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::CancelPending(CancelPendingState { + until: 3, + fee_increase_count: 0, + }) + )); + } + + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Cancelling(CancellingState { + fee_increase_count: 1, + }) + )); + } + + #[tokio::test] + async fn test_condition_not_met() { + let Mocks { + mut mock_proposer, + mut mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + add_trigger_no_update_last_block(&mut mock_trigger, &mut mock_tracker, &mut seq, 1); + + // zero nonce + mock_tracker + .expect_get_nonce_and_required_fees() + .returning(|| Ok((U256::zero(), None))); + + // bundle with one op + mock_proposer + .expect_make_bundle() + .times(1) + .returning(|_, _| Box::pin(async { Ok(bundle()) })); + + // should create the bundle txn + mock_entry_point + .expect_get_send_bundle_transaction() + .returning(|_, _, _, _| TypedTransaction::default()); + + // should send the bundle txn, returns condition not met + mock_tracker + .expect_send_transaction() + .returning(|_, _| Box::pin(async { Err(TransactionTrackerError::ConditionNotMet) })); + + // should notify proposer that condition was not met + mock_proposer + .expect_notify_condition_not_met() + .times(1) + .return_const(()); + + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Building(BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + }), + requires_reset: false, + }; + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + sender.step_state(&mut state).await.unwrap(); + + // end back in the building state without waiting for trigger + assert!(matches!( + state.inner, + InnerState::Building(BuildingState { + wait_for_trigger: false, + fee_increase_count: 0, + }) + )); + } + + struct Mocks { + mock_proposer: MockBundleProposer, + mock_entry_point: MockEntryPointV0_6, + mock_tracker: MockTransactionTracker, + mock_trigger: MockTrigger, + } + + fn new_mocks() -> Mocks { + let mut mock_entry_point = MockEntryPointV0_6::new(); + mock_entry_point + .expect_address() + .return_const(Address::default()); + + Mocks { + mock_proposer: MockBundleProposer::new(), + mock_entry_point, + mock_tracker: MockTransactionTracker::new(), + mock_trigger: MockTrigger::new(), + } + } + + fn new_sender( + mock_proposer: MockBundleProposer, + mock_entry_point: MockEntryPointV0_6, + ) -> BundleSenderImpl< + UserOperation, + MockBundleProposer, + MockEntryPointV0_6, + MockTransactionTracker, + MockPool, + > { + BundleSenderImpl::new( + 0, + mpsc::channel(1000).1, + ChainSpec::default(), + Address::default(), + mock_proposer, + mock_entry_point, + MockTransactionTracker::new(), + MockPool::new(), + Settings { + max_fee_increases: 3, + max_blocks_to_wait_for_mine: 3, + }, + broadcast::channel(1000).0, + ) + } + + fn add_trigger_no_update_last_block( + mock_trigger: &mut MockTrigger, + mock_tracker: &mut MockTransactionTracker, + seq: &mut Sequence, + block_number: u64, + ) { + mock_trigger + .expect_wait_for_trigger() + .once() + .in_sequence(seq) + .returning(move || Box::pin(async move { Ok(None) })); + mock_tracker + .expect_check_for_update() + .returning(|| Box::pin(async { Ok(None) })); + mock_trigger + .expect_last_block() + .once() + .in_sequence(seq) + .return_const(NewHead { + block_number, + block_hash: H256::zero(), + }); + } + + fn add_trigger_wait_for_block_last_block( + mock_trigger: &mut MockTrigger, + seq: &mut Sequence, + block_number: u64, + ) { + mock_trigger + .expect_wait_for_block() + .once() + .in_sequence(seq) + .returning(move || { + Box::pin(async move { + Ok(NewHead { + block_number, + block_hash: H256::zero(), + }) + }) + }); + mock_trigger + .expect_last_block() + .once() + .in_sequence(seq) + .return_const(NewHead { + block_number, + block_hash: H256::zero(), + }); + } + + fn bundle() -> Bundle { + Bundle { + gas_estimate: U256::from(100_000), + gas_fees: GasFees::default(), + expected_storage: Default::default(), + rejected_ops: vec![], + entity_updates: vec![], + ops_per_aggregator: vec![UserOpsPerAggregator { + aggregator: Address::zero(), + signature: Bytes::new(), + user_ops: vec![UserOperation::default()], + }], + } + } +} diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index cc0fff660..4514951f6 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use anyhow::{bail, Context}; use async_trait::async_trait; use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; +#[cfg(test)] +use mockall::automock; use rundler_provider::Provider; use rundler_sim::ExpectedStorage; use rundler_types::GasFees; @@ -33,6 +35,7 @@ use crate::sender::{TransactionSender, TxSenderError, TxStatus}; /// succeeded (potentially not the most recent one) or whether circumstances /// have changed so that it is worth making another attempt. #[async_trait] +#[cfg_attr(test, automock)] pub(crate) trait TransactionTracker: Send + Sync + 'static { /// Returns the current nonce and the required fees for the next transaction. fn get_nonce_and_required_fees(&self) -> TransactionTrackerResult<(U256, Option)>;