From 7c9c85b2a3bbef588bfafac0a7a69eddc181079b Mon Sep 17 00:00:00 2001 From: dancoombs Date: Mon, 24 Jun 2024 12:47:19 -0500 Subject: [PATCH] fix(builder): fix builder state machine replacement tracking --- bin/rundler/src/cli/builder.rs | 26 ++- crates/builder/src/bundle_proposer.rs | 49 +++-- crates/builder/src/bundle_sender.rs | 256 +++++++++++++++++++--- crates/builder/src/task.rs | 9 +- crates/builder/src/transaction_tracker.rs | 89 ++++++-- crates/pool/src/mempool/pool.rs | 56 ++--- docs/architecture/builder.md | 74 ++++++- docs/cli.md | 6 +- 8 files changed, 436 insertions(+), 129 deletions(-) diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 3ad87f04e..333c86dbe 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -226,16 +226,25 @@ pub struct BuilderArgs { )] replacement_fee_percent_increase: u64, - /// Maximum number of times to increase gas fees when retrying a transaction + /// Maximum number of times to increase gas fees when retrying a cancellation transaction /// before giving up. #[arg( - long = "builder.max_fee_increases", - name = "builder.max_fee_increases", - env = "BUILDER_MAX_FEE_INCREASES", - // Seven increases of 10% is roughly 2x the initial fees. - default_value = "7" + long = "builder.max_cancellation_fee_increases", + name = "builder.max_cancellation_fee_increases", + env = "BUILDER_MAX_CANCELLATION_FEE_INCREASES", + default_value = "15" )] - max_fee_increases: u64, + max_cancellation_fee_increases: u64, + + /// The maximum number of blocks to wait in a replacement underpriced state before issuing + /// a cancellation transaction. + #[arg( + long = "builder.max_replacement_underpriced_blocks", + name = "builder.max_replacement_underpriced_blocks", + env = "BUILDER_MAX_REPLACEMENT_UNDERPRICED_BLOCKS", + default_value = "20" + )] + max_replacement_underpriced_blocks: u64, /// The index offset to apply to the builder index #[arg( @@ -350,7 +359,8 @@ impl BuilderArgs { sim_settings: common.try_into()?, max_blocks_to_wait_for_mine: self.max_blocks_to_wait_for_mine, replacement_fee_percent_increase: self.replacement_fee_percent_increase, - max_fee_increases: self.max_fee_increases, + max_cancellation_fee_increases: self.max_cancellation_fee_increases, + max_replacement_underpriced_blocks: self.max_replacement_underpriced_blocks, remote_address, }) } diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index 17999cfcc..ea4def832 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -104,18 +104,35 @@ pub(crate) trait BundleProposer: Send + Sync + 'static { &mut self, min_fees: Option, is_replacement: bool, - ) -> anyhow::Result>; + ) -> BundleProposerResult>; /// Gets the current gas fees /// /// If `min_fees` is `Some`, the proposer will ensure the gas fees returned are at least `min_fees`. - async fn estimate_gas_fees(&self, min_fees: Option) - -> anyhow::Result<(GasFees, U256)>; + async fn estimate_gas_fees( + &self, + min_fees: Option, + ) -> BundleProposerResult<(GasFees, U256)>; /// Notifies the proposer that a condition was not met during the last bundle proposal fn notify_condition_not_met(&mut self); } +pub(crate) type BundleProposerResult = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum BundleProposerError { + #[error("No operations initially")] + NoOperationsInitially, + #[error("No operations after fee filtering")] + NoOperationsAfterFeeFilter, + #[error(transparent)] + ProviderError(#[from] rundler_provider::ProviderError), + /// All other errors + #[error(transparent)] + Other(#[from] anyhow::Error), +} + #[derive(Debug)] pub(crate) struct BundleProposerImpl { builder_index: u64, @@ -155,8 +172,11 @@ where async fn estimate_gas_fees( &self, required_fees: Option, - ) -> anyhow::Result<(GasFees, U256)> { - self.fee_estimator.required_bundle_fees(required_fees).await + ) -> BundleProposerResult<(GasFees, U256)> { + Ok(self + .fee_estimator + .required_bundle_fees(required_fees) + .await?) } fn notify_condition_not_met(&mut self) { @@ -167,16 +187,16 @@ where &mut self, required_fees: Option, is_replacement: bool, - ) -> anyhow::Result> { + ) -> BundleProposerResult> { let (ops, (block_hash, _), (bundle_fees, base_fee)) = try_join!( self.get_ops_from_pool(), self.provider .get_latest_block_hash_and_number() - .map_err(anyhow::Error::from), + .map_err(BundleProposerError::from), self.estimate_gas_fees(required_fees) )?; if ops.is_empty() { - return Ok(Bundle::default()); + return Err(BundleProposerError::NoOperationsInitially); } tracing::debug!("Starting bundle proposal with {} ops", ops.len()); @@ -206,7 +226,7 @@ where tracing::debug!("Bundle proposal after fee limit had {} ops", ops.len()); if ops.is_empty() { - return Ok(Bundle::default()); + return Err(BundleProposerError::NoOperationsAfterFeeFilter); } // (2) Limit the amount of operations for simulation @@ -686,7 +706,7 @@ where async fn estimate_gas_rejecting_failed_ops( &self, context: &mut ProposalContext, - ) -> anyhow::Result> { + ) -> BundleProposerResult> { // sum up the gas needed for all the ops in the bundle // and apply an overhead multiplier let gas = math::increase_by_percent( @@ -731,7 +751,7 @@ where } } - async fn get_ops_from_pool(&self) -> anyhow::Result> { + async fn get_ops_from_pool(&self) -> BundleProposerResult> { // Use builder's index as the shard index to ensure that two builders don't // attempt to bundle the same operations. // @@ -754,7 +774,7 @@ where &self, addresses: impl IntoIterator, block_hash: H256, - ) -> anyhow::Result> { + ) -> BundleProposerResult> { let futures = addresses.into_iter().map(|address| async move { let deposit = self .entry_point @@ -1294,8 +1314,9 @@ impl ProposalContext { } SimulationViolation::UnintendedRevertWithMessage(entity_type, message, address) => { match &message[..4] { - // do not penalize an entity for invalid account nonces, which can occur without malicious intent from the sender - "AA25" => {} + // do not penalize an entity for invalid account nonces or already deployed senders, + // which can occur without malicious intent from the sender or factory + "AA10" | "AA25" => {} _ => { if let Some(entity_address) = address { self.add_entity_update( diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 4c088dbb0..e3e77a319 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -25,7 +25,7 @@ use rundler_types::{ builder::BundlingMode, chain::ChainSpec, pool::{NewHead, Pool}, - EntityUpdate, GasFees, UserOperation, + EntityUpdate, UserOperation, }; use rundler_utils::emit::WithEntryPoint; use tokio::{ @@ -35,7 +35,7 @@ use tokio::{ use tracing::{debug, error, info, instrument, warn}; use crate::{ - bundle_proposer::BundleProposer, + bundle_proposer::{Bundle, BundleProposer, BundleProposerError}, emit::{BuilderEvent, BundleTxDetails}, transaction_tracker::{TrackerUpdate, TransactionTracker, TransactionTrackerError}, }; @@ -47,7 +47,8 @@ pub(crate) trait BundleSender: Send + Sync + 'static { #[derive(Debug)] pub(crate) struct Settings { - pub(crate) max_fee_increases: u64, + pub(crate) max_replacement_underpriced_blocks: u64, + pub(crate) max_cancellation_fee_increases: u64, pub(crate) max_blocks_to_wait_for_mine: u64, } @@ -102,8 +103,12 @@ pub enum SendBundleResult { enum SendBundleAttemptResult { // The bundle was successfully sent Success, - // The bundle was empty - NoOperations, + // There are no operations available to bundle + NoOperationsInitially, + // There were no operations after the fee was increased + NoOperationsAfterFeeFilter, + // There were no operations after the bundle was simulated + NoOperationsAfterSimulation, // Replacement Underpriced ReplacementUnderpriced, // Condition not met @@ -234,11 +239,31 @@ where block_number + self.settings.max_blocks_to_wait_for_mine, ))); } - Ok(SendBundleAttemptResult::NoOperations) => { - debug!("No operations to bundle"); - if inner.fee_increase_count > 0 { + Ok(SendBundleAttemptResult::NoOperationsInitially) => { + debug!("No operations available initially"); + state.complete(Some(SendBundleResult::NoOperationsInitially)); + } + Ok(SendBundleAttemptResult::NoOperationsAfterSimulation) => { + debug!("No operations available after simulation"); + state.complete(Some(SendBundleResult::NoOperationsInitially)); + } + Ok(SendBundleAttemptResult::NoOperationsAfterFeeFilter) => { + debug!("No operations to bundle after fee filtering"); + if let Some(underpriced_info) = inner.underpriced_info { + // If we are here, there are UOs in the pool that may be correctly priced, but are being blocked by an underpriced replacement + // after a fee increase. If we repeatedly get into this state, initiate a cancellation. + if block_number - underpriced_info.since_block + >= self.settings.max_replacement_underpriced_blocks + { + warn!("No operations available, but last replacement underpriced, moving to cancelling state. Round: {}. Since block {}. Current block {}. Max underpriced blocks: {}", underpriced_info.rounds, underpriced_info.since_block, block_number, self.settings.max_replacement_underpriced_blocks); + state.update(InnerState::Cancelling(inner.to_cancelling())); + } else { + info!("No operations available, but last replacement underpriced, starting over and waiting for next trigger. Round: {}. Since block {}. Current block {}", underpriced_info.rounds, underpriced_info.since_block, block_number); + state.update_and_reset(InnerState::Building(inner.underpriced_round())); + } + } else if inner.fee_increase_count > 0 { warn!( - "Abandoning bundle after fee increases {}, no operations available", + "Abandoning bundle after {} fee increases, no operations available after fee increase", inner.fee_increase_count ); self.metrics.increment_bundle_txns_abandoned(); @@ -247,7 +272,7 @@ where // If the node we are using still has the transaction in the mempool, its // possible we will get a `ReplacementUnderpriced` on the next iteration // and will start a cancellation. - state.reset(); + state.abandon(); } else { debug!("No operations available, waiting for next trigger"); state.complete(Some(SendBundleResult::NoOperationsInitially)); @@ -259,13 +284,15 @@ where state.reset(); } Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { - info!("Replacement transaction underpriced, entering cancellation loop"); - state.update(InnerState::Cancelling(inner.to_cancelling())); + info!("Replacement transaction underpriced, marking as underpriced. Num fee increases {:?}", inner.fee_increase_count); + state.update(InnerState::Building( + inner.replacement_underpriced(block_number), + )); } Ok(SendBundleAttemptResult::ConditionNotMet) => { info!("Condition not met, notifying proposer and starting new bundle attempt"); self.proposer.notify_condition_not_met(); - state.reset(); + state.update(InnerState::Building(inner.retry())); } Err(error) => { error!("Bundle send error {error:?}"); @@ -352,7 +379,10 @@ where state: &mut SenderMachineState, inner: CancellingState, ) -> anyhow::Result<()> { - info!("Cancelling last transaction"); + info!( + "Cancelling last transaction, attempt {}", + inner.fee_increase_count + ); let (estimated_fees, _) = self .proposer @@ -381,7 +411,19 @@ where } Err(TransactionTrackerError::ReplacementUnderpriced) => { info!("Replacement transaction underpriced during cancellation, trying again"); - state.update(InnerState::Cancelling(inner.to_self())); + if inner.fee_increase_count >= self.settings.max_cancellation_fee_increases { + // abandon the cancellation + warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); + self.metrics.increment_cancellations_abandoned(); + state.reset(); + } else { + // Increase fees again + info!( + "Cancellation increasing fees, attempt: {}", + inner.fee_increase_count + 1 + ); + state.update(InnerState::Cancelling(inner.to_self())); + } } Err(TransactionTrackerError::NonceTooLow) => { // reset the transaction tracker and try again @@ -407,10 +449,19 @@ where // check for transaction update if let Some(update) = tracker_update { match update { - TrackerUpdate::Mined { .. } => { + TrackerUpdate::Mined { + gas_used, + gas_price, + .. + } => { // mined - info!("Cancellation transaction mined"); + let fee = gas_used.zip(gas_price).map(|(used, price)| used * price); + info!("Cancellation transaction mined. Price (wei) {fee:?}"); self.metrics.increment_cancellation_txns_mined(); + if let Some(fee) = fee { + self.metrics + .increment_cancellation_txns_total_fee(fee.as_u64()); + }; } TrackerUpdate::LatestTxDropped { .. } => { // If a cancellation gets dropped, move to bundling state as there is no @@ -425,9 +476,10 @@ where } state.reset(); } else if state.block_number() >= inner.until { - if inner.fee_increase_count >= self.settings.max_fee_increases { + if inner.fee_increase_count >= self.settings.max_cancellation_fee_increases { // abandon the cancellation warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); + self.metrics.increment_cancellations_abandoned(); state.reset(); } else { // start replacement, don't wait for trigger @@ -456,10 +508,22 @@ where ) -> anyhow::Result { let (nonce, required_fees) = state.transaction_tracker.get_nonce_and_required_fees()?; - let Some(bundle_tx) = self - .get_bundle_tx(nonce, required_fees, fee_increase_count > 0) - .await? - else { + let bundle = match self + .proposer + .make_bundle(required_fees, fee_increase_count > 0) + .await + { + Ok(bundle) => bundle, + Err(BundleProposerError::NoOperationsInitially) => { + return Ok(SendBundleAttemptResult::NoOperationsInitially); + } + Err(BundleProposerError::NoOperationsAfterFeeFilter) => { + return Ok(SendBundleAttemptResult::NoOperationsAfterFeeFilter); + } + Err(e) => bail!("Failed to make bundle: {e:?}"), + }; + + let Some(bundle_tx) = self.get_bundle_tx(nonce, bundle).await? else { self.emit(BuilderEvent::formed_bundle( self.builder_index, None, @@ -467,7 +531,7 @@ where fee_increase_count, required_fees, )); - return Ok(SendBundleAttemptResult::NoOperations); + return Ok(SendBundleAttemptResult::NoOperationsAfterSimulation); }; let BundleTx { tx, @@ -525,15 +589,8 @@ where async fn get_bundle_tx( &mut self, nonce: U256, - required_fees: Option, - is_replacement: bool, + bundle: Bundle, ) -> anyhow::Result> { - let bundle = self - .proposer - .make_bundle(required_fees, is_replacement) - .await - .context("proposer should create bundle for builder")?; - let remove_ops_future = async { if bundle.rejected_ops.is_empty() { return; @@ -649,10 +706,21 @@ impl SenderMachineState { let building_state = BuildingState { wait_for_trigger: false, fee_increase_count: 0, + underpriced_info: None, }; self.inner = InnerState::Building(building_state); } + fn update_and_reset(&mut self, inner: InnerState) { + self.update(inner); + self.requires_reset = true; + } + + fn abandon(&mut self) { + self.transaction_tracker.abandon(); + self.inner = InnerState::new(); + } + fn complete(&mut self, result: Option) { if let Some(result) = result { if let Some(r) = self.send_bundle_response.take() { @@ -715,6 +783,7 @@ impl InnerState { InnerState::Building(BuildingState { wait_for_trigger: true, fee_increase_count: 0, + underpriced_info: None, }) } } @@ -723,9 +792,17 @@ impl InnerState { struct BuildingState { wait_for_trigger: bool, fee_increase_count: u64, + underpriced_info: Option, +} + +#[derive(Debug, Clone, Copy)] +struct UnderpricedInfo { + since_block: u64, + rounds: u64, } impl BuildingState { + // Transition to pending state fn to_pending(self, until: u64) -> PendingState { PendingState { until, @@ -733,11 +810,56 @@ impl BuildingState { } } + // Transition to cancelling state fn to_cancelling(self) -> CancellingState { CancellingState { fee_increase_count: 0, } } + + // Retry the build + fn retry(mut self) -> Self { + self.wait_for_trigger = false; + self + } + + // Mark a replacement as underpriced + // + // The next state will NOT wait for a trigger. Use this when fees should be increased and a new bundler + // should be attempted immediately. + fn replacement_underpriced(self, block_number: u64) -> Self { + let ui = if let Some(underpriced_info) = self.underpriced_info { + underpriced_info + } else { + UnderpricedInfo { + since_block: block_number, + rounds: 1, + } + }; + + BuildingState { + wait_for_trigger: false, + fee_increase_count: self.fee_increase_count + 1, + underpriced_info: Some(ui), + } + } + + // Finalize an underpriced round. + // + // This will clear out the number of fee increases and increment the number of underpriced rounds. + // Use this when we are in an underpriced state, but there are no longer any UOs available to bundle. + fn underpriced_round(self) -> Self { + let mut underpriced_info = self + .underpriced_info + .expect("underpriced_info must be Some when calling underpriced_round"); + underpriced_info.rounds += 1; + + BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + underpriced_info: Some(underpriced_info), + } + } } #[derive(Debug, Clone, Copy)] @@ -751,6 +873,7 @@ impl PendingState { BuildingState { wait_for_trigger: false, fee_increase_count: self.fee_increase_count + 1, + underpriced_info: None, } } } @@ -1025,6 +1148,14 @@ impl BuilderMetrics { metrics::counter!("builder_cancellation_txns_mined", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } + fn increment_cancellation_txns_total_fee(&self, fee: u64) { + metrics::counter!("builder_cancellation_txns_total_fee", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(fee); + } + + fn increment_cancellations_abandoned(&self) { + metrics::counter!("builder_cancellations_abandoned", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); + } + fn increment_soft_cancellations(&self) { metrics::counter!("builder_soft_cancellations", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } @@ -1044,7 +1175,7 @@ mod tests { use mockall::Sequence; use rundler_provider::MockEntryPointV0_6; use rundler_types::{ - chain::ChainSpec, pool::MockPool, v0_6::UserOperation, UserOpsPerAggregator, + chain::ChainSpec, pool::MockPool, v0_6::UserOperation, GasFees, UserOpsPerAggregator, }; use tokio::sync::{broadcast, mpsc}; @@ -1197,6 +1328,7 @@ mod tests { nonce: U256::zero(), gas_limit: None, gas_used: None, + gas_price: None, tx_hash: H256::zero(), attempt_number: 0, })) @@ -1231,6 +1363,7 @@ mod tests { InnerState::Building(BuildingState { wait_for_trigger: true, fee_increase_count: 0, + underpriced_info: None, }) )); } @@ -1284,6 +1417,60 @@ mod tests { InnerState::Building(BuildingState { wait_for_trigger: false, fee_increase_count: 1, + underpriced_info: None, + }) + )); + } + + #[tokio::test] + async fn test_transition_to_cancel() { + let Mocks { + mut mock_proposer, + mock_entry_point, + mut mock_tracker, + mut mock_trigger, + } = new_mocks(); + + let mut seq = Sequence::new(); + add_trigger_no_update_last_block(&mut mock_trigger, &mut mock_tracker, &mut seq, 3); + + // zero nonce + mock_tracker + .expect_get_nonce_and_required_fees() + .returning(|| Ok((U256::zero(), None))); + + // fee filter error + mock_proposer + .expect_make_bundle() + .times(1) + .returning(|_, _| { + Box::pin(async { Err(BundleProposerError::NoOperationsAfterFeeFilter) }) + }); + + let mut sender = new_sender(mock_proposer, mock_entry_point); + + // start in underpriced meta-state + let mut state = SenderMachineState { + trigger: mock_trigger, + transaction_tracker: mock_tracker, + send_bundle_response: None, + inner: InnerState::Building(BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + underpriced_info: Some(UnderpricedInfo { + since_block: 0, + rounds: 1, + }), + }), + requires_reset: false, + }; + + // step state, block number should trigger move to cancellation + sender.step_state(&mut state).await.unwrap(); + assert!(matches!( + state.inner, + InnerState::Cancelling(CancellingState { + fee_increase_count: 0, }) )); } @@ -1432,6 +1619,7 @@ mod tests { inner: InnerState::Building(BuildingState { wait_for_trigger: true, fee_increase_count: 0, + underpriced_info: None, }), requires_reset: false, }; @@ -1446,6 +1634,7 @@ mod tests { InnerState::Building(BuildingState { wait_for_trigger: false, fee_increase_count: 0, + underpriced_info: None, }) )); } @@ -1491,8 +1680,9 @@ mod tests { MockTransactionTracker::new(), MockPool::new(), Settings { - max_fee_increases: 3, + max_cancellation_fee_increases: 3, max_blocks_to_wait_for_mine: 3, + max_replacement_underpriced_blocks: 3, }, broadcast::channel(1000).0, ) diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index 933cc35a0..6d4ffe5eb 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -91,8 +91,10 @@ pub struct Args { pub max_blocks_to_wait_for_mine: u64, /// Percentage to increase the fees by when replacing a bundle transaction pub replacement_fee_percent_increase: u64, - /// Maximum number of times to increase the fees when replacing a bundle transaction - pub max_fee_increases: u64, + /// Maximum number of times to increase the fee when cancelling a transaction + pub max_cancellation_fee_increases: u64, + /// Maximum amount of blocks to spend in a replacement underpriced state before moving to cancel + pub max_replacement_underpriced_blocks: u64, /// Address to bind the remote builder server to, if any. If none, no server is starter. pub remote_address: Option, /// Entry points to start builders for @@ -456,7 +458,8 @@ where .await?; let builder_settings = bundle_sender::Settings { - max_fee_increases: self.args.max_fee_increases, + max_replacement_underpriced_blocks: self.args.max_replacement_underpriced_blocks, + max_cancellation_fee_increases: self.args.max_cancellation_fee_increases, max_blocks_to_wait_for_mine: self.args.max_blocks_to_wait_for_mine, }; diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index 4514951f6..fcc5a2c3a 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -50,7 +50,7 @@ pub(crate) trait TransactionTracker: Send + Sync + 'static { expected_stroage: &ExpectedStorage, ) -> TransactionTrackerResult; - /// Cancel the latest transaction in the tracker. + /// Cancel the abandoned transaction in the tracker. /// /// Returns: An option containing the hash of the transaction that was used to cancel. If the option /// is empty, then either no transaction was cancelled or the cancellation was a "soft-cancel." @@ -71,6 +71,9 @@ pub(crate) trait TransactionTracker: Send + Sync + 'static { /// Resets the tracker to its initial state async fn reset(&mut self); + + /// Abandons the current transaction + fn abandon(&mut self); } /// Errors that can occur while using a `TransactionTracker`. @@ -99,6 +102,7 @@ pub(crate) enum TrackerUpdate { attempt_number: u64, gas_limit: Option, gas_used: Option, + gas_price: Option, }, LatestTxDropped { nonce: U256, @@ -121,6 +125,7 @@ where nonce: U256, transactions: Vec, has_dropped: bool, + has_abandoned: bool, attempt_count: u64, } @@ -159,6 +164,7 @@ where nonce, transactions: vec![], has_dropped: false, + has_abandoned: false, attempt_count: 0, }) } @@ -168,6 +174,7 @@ where self.transactions.clear(); self.has_dropped = false; self.attempt_count = 0; + self.has_abandoned = false; self.update_metrics(); } @@ -214,7 +221,7 @@ where async fn get_mined_tx_gas_info( &self, tx_hash: H256, - ) -> anyhow::Result<(Option, Option)> { + ) -> anyhow::Result<(Option, Option, Option)> { let (tx, tx_receipt) = tokio::try_join!( self.provider.get_transaction(tx_hash), self.provider.get_transaction_receipt(tx_hash), @@ -223,14 +230,14 @@ where warn!("failed to fetch transaction data for tx: {}", tx_hash); None }); - let gas_used = match tx_receipt { - Some(r) => r.gas_used, + let (gas_used, gas_price) = match tx_receipt { + Some(r) => (r.gas_used, r.effective_gas_price), None => { warn!("failed to fetch transaction receipt for tx: {}", tx_hash); - None + (None, None) } }; - Ok((gas_limit, gas_used)) + Ok((gas_limit, gas_used, gas_price)) } } @@ -241,7 +248,7 @@ where T: TransactionSender, { fn get_nonce_and_required_fees(&self) -> TransactionTrackerResult<(U256, Option)> { - let gas_fees = if self.has_dropped { + let gas_fees = if self.has_dropped || self.has_abandoned { None } else { self.transactions.last().map(|tx| { @@ -259,20 +266,45 @@ where ) -> TransactionTrackerResult { self.validate_transaction(&tx)?; let gas_fees = GasFees::from(&tx); - let sent_tx = self.sender.send_transaction(tx, expected_storage).await?; info!( - "Sent transaction {:?} nonce: {:?}", - sent_tx.tx_hash, sent_tx.nonce + "Sending transaction with nonce: {:?} gas fees: {:?}", + self.nonce, gas_fees ); - self.transactions.push(PendingTransaction { - tx_hash: sent_tx.tx_hash, - gas_fees, - attempt_number: self.attempt_count, - }); - self.has_dropped = false; - self.attempt_count += 1; - self.update_metrics(); - Ok(sent_tx.tx_hash) + let sent_tx = self.sender.send_transaction(tx, expected_storage).await; + + match sent_tx { + Ok(sent_tx) => { + info!( + "Sent transaction {:?} nonce: {:?}", + sent_tx.tx_hash, sent_tx.nonce + ); + self.transactions.push(PendingTransaction { + tx_hash: sent_tx.tx_hash, + gas_fees, + attempt_number: self.attempt_count, + }); + self.has_dropped = false; + self.has_abandoned = false; + self.attempt_count += 1; + self.update_metrics(); + Ok(sent_tx.tx_hash) + } + Err(e) if matches!(e, TxSenderError::ReplacementUnderpriced) => { + info!("Replacement underpriced: nonce: {:?}", self.nonce); + // still store this as a pending transaction so that we can continue to increase fees. + self.transactions.push(PendingTransaction { + tx_hash: H256::zero(), + gas_fees, + attempt_number: self.attempt_count, + }); + self.has_dropped = false; + self.has_abandoned = false; + self.attempt_count += 1; + self.update_metrics(); + Err(e.into()) + } + Err(e) => Err(e.into()), + } } async fn cancel_transaction( @@ -309,7 +341,10 @@ where return Ok(None); } - info!("Sent cancellation tx {:?}", cancel_info.tx_hash); + info!( + "Sent cancellation tx {:?} fees: {:?}", + cancel_info.tx_hash, gas_fees + ); self.transactions.push(PendingTransaction { tx_hash: cancel_info.tx_hash, @@ -342,7 +377,8 @@ where .context("tracker should check transaction status when the nonce changes")?; info!("Status of tx {:?}: {:?}", tx.tx_hash, status); if let TxStatus::Mined { block_number } = status { - let (gas_limit, gas_used) = self.get_mined_tx_gas_info(tx.tx_hash).await?; + let (gas_limit, gas_used, gas_price) = + self.get_mined_tx_gas_info(tx.tx_hash).await?; out = TrackerUpdate::Mined { tx_hash: tx.tx_hash, nonce: self.nonce, @@ -350,6 +386,7 @@ where attempt_number: tx.attempt_number, gas_limit, gas_used, + gas_price, }; break; } @@ -378,7 +415,8 @@ where TxStatus::Mined { block_number } => { let nonce = self.nonce; self.set_nonce_and_clear_state(nonce + 1); - let (gas_limit, gas_used) = self.get_mined_tx_gas_info(last_tx.tx_hash).await?; + let (gas_limit, gas_used, gas_price) = + self.get_mined_tx_gas_info(last_tx.tx_hash).await?; Some(TrackerUpdate::Mined { tx_hash: last_tx.tx_hash, nonce, @@ -386,6 +424,7 @@ where attempt_number: last_tx.attempt_number, gas_limit, gas_used, + gas_price, }) } TxStatus::Dropped => { @@ -399,6 +438,12 @@ where let nonce = self.get_external_nonce().await.unwrap_or(self.nonce); self.set_nonce_and_clear_state(nonce); } + + fn abandon(&mut self) { + self.has_abandoned = true; + self.attempt_count = 0; + // remember the transaction in case we need to cancel it + } } impl From for TransactionTrackerError { diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 817cd9afe..a5f2eb022 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -28,7 +28,7 @@ use rundler_types::{ Entity, EntityType, GasFees, Timestamp, UserOperation, UserOperationId, UserOperationVariant, }; use rundler_utils::math; -use tracing::info; +use tracing::{info, warn}; use super::{entity_tracker::EntityCounter, size::SizeTracker, MempoolResult, PoolConfig}; use crate::chain::MinedOp; @@ -67,6 +67,8 @@ pub(crate) struct PoolInner { by_id: HashMap, /// Best operations, sorted by gas price best: BTreeSet, + /// Time to mine info + time_to_mine: HashMap, /// Removed operations, temporarily kept around in case their blocks are /// reorged away. Stored along with the block number at which it was /// removed. @@ -95,6 +97,7 @@ impl PoolInner { by_hash: HashMap::new(), by_id: HashMap::new(), best: BTreeSet::new(), + time_to_mine: HashMap::new(), mined_at_block_number_by_hash: HashMap::new(), mined_hashes_with_block_numbers: BTreeSet::new(), count_by_address: HashMap::new(), @@ -172,6 +175,7 @@ impl PoolInner { let block_delta_time = sys_block_time - self.prev_sys_block_time; let block_delta_height = block_number - self.prev_block_number; + let candidate_gas_price = base_fee + candidate_gas_fees.max_priority_fee_per_gas; let mut expired = Vec::new(); let mut num_candidates = 0; @@ -180,12 +184,15 @@ impl PoolInner { expired.push((*hash, op.po.valid_time_range.valid_until)); } - num_candidates += if op.update_time_to_mine( - block_delta_time, - block_delta_height, - candidate_gas_fees, - base_fee, - ) { + let uo_gas_price = cmp::min( + op.uo().max_fee_per_gas(), + op.uo().max_priority_fee_per_gas() + base_fee, + ); + + num_candidates += if uo_gas_price >= candidate_gas_price { + if let Some(ttm) = self.time_to_mine.get_mut(hash) { + ttm.increase(block_delta_time, block_delta_height); + } 1 } else { 0 @@ -282,7 +289,11 @@ impl PoolInner { block_number: u64, ) -> Option> { let tx_in_pool = self.by_id.get(&mined_op.id())?; - PoolMetrics::record_time_to_mine(&tx_in_pool.time_to_mine, mined_op.entry_point); + if let Some(time_to_mine) = self.time_to_mine.remove(&mined_op.hash) { + PoolMetrics::record_time_to_mine(&time_to_mine, mined_op.entry_point); + } else { + warn!("Could not find time to mine for {:?}", mined_op.hash); + } let hash = tx_in_pool .uo() @@ -420,7 +431,6 @@ impl PoolInner { let pool_op = OrderedPoolOperation { po: op, submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()), - time_to_mine: TimeToMineInfo::new(), }; // update counts @@ -439,6 +449,7 @@ impl PoolInner { self.by_hash.insert(hash, pool_op.clone()); self.by_id.insert(pool_op.uo().id(), pool_op.clone()); self.best.insert(pool_op); + self.time_to_mine.insert(hash, TimeToMineInfo::new()); // TODO(danc): This silently drops UOs from the pool without reporting let removed = self @@ -461,6 +472,7 @@ impl PoolInner { let id = &op.po.uo.id(); self.by_id.remove(id); self.best.remove(&op); + self.time_to_mine.remove(&hash); if let Some(block_number) = block_number { self.cache_size += op.mem_size(); @@ -525,7 +537,6 @@ impl PoolInner { struct OrderedPoolOperation { po: Arc, submission_id: u64, - time_to_mine: TimeToMineInfo, } impl OrderedPoolOperation { @@ -536,28 +547,6 @@ impl OrderedPoolOperation { fn mem_size(&self) -> usize { std::mem::size_of::() + self.po.mem_size() } - - fn update_time_to_mine( - &mut self, - block_delta_time: Duration, - block_delta_height: u64, - candidate_gas_fees: GasFees, - base_fee: U256, - ) -> bool { - let candidate_gas_price = base_fee + candidate_gas_fees.max_priority_fee_per_gas; - let uo_gas_price = cmp::min( - self.uo().max_fee_per_gas(), - self.uo().max_priority_fee_per_gas() + base_fee, - ); - - if uo_gas_price >= candidate_gas_price { - self.time_to_mine - .increase(block_delta_time, block_delta_height); - true - } else { - false - } - } } impl Eq for OrderedPoolOperation {} @@ -1012,7 +1001,6 @@ mod tests { OrderedPoolOperation { po: Arc::new(po1), submission_id: 0, - time_to_mine: TimeToMineInfo::new() } .mem_size() ); @@ -1053,7 +1041,6 @@ mod tests { OrderedPoolOperation { po: Arc::new(po2), submission_id: 0, - time_to_mine: TimeToMineInfo::new(), } .mem_size() ); @@ -1129,7 +1116,6 @@ mod tests { OrderedPoolOperation { po: Arc::new(create_op(Address::random(), 1, 1)), submission_id: 1, - time_to_mine: TimeToMineInfo::new(), } .mem_size() } diff --git a/docs/architecture/builder.md b/docs/architecture/builder.md index 1ac0df6db..767770880 100644 --- a/docs/architecture/builder.md +++ b/docs/architecture/builder.md @@ -59,25 +59,14 @@ When using AWS KMS for signing Rundler requires the use of Redis to perform key To ensure that no two signers in a bundler system attempt to use the same key, causing nonce collisions, this key leasing system is used to lease a key in a CLI configured list to a single signer at a time. ## Transaction Senders - The builder supports multiple sender implementations to support bundle transaction submission to different types of APIs. -- **Raw**: Send the bundle as an `eth_sendRawTransaction` via a standard ETH JSON-RPC. - -- **Conditional**: Send the bundle as an `eth_sendRawTransactionConditional` to an interface that supports the [conditional transaction RPC](https://notes.ethereum.org/@yoav/SkaX2lS9j). +- **Raw**: Send the bundle as an `eth_sendRawTransaction` via a standard ETH JSON-RPC. If conditional RPC is enabled it will send the bundle as an `eth_sendRawTransactionConditional` to an interface that supports the [conditional transaction RPC](https://notes.ethereum.org/@yoav/SkaX2lS9j). - **Flashbots**: Submit bundles via the [Flashbots Protect](https://docs.flashbots.net/) RPC endpoint, only supported on Ethereum Mainnet. - **Bloxroute**: Submit bundles via Bloxroute's [Polygon Private Transaction](https://docs.bloxroute.com/apis/frontrunning-protection/polygon_private_tx) endpoint. Only supported on polygon. -## Transaction Tracking - -After the bundle transaction is sent, the sender tracks its status via the transaction tracker module. This module tracks to see if a transaction is pending, dropped, or mined. - -If after a configured amount of blocks the transaction is still pending, the sender will attempt to re-estimate gas fees and will submit a new bundle that replaces the old bundle. - -If dropped or mined, the sender will restart the process. - ## N-Senders Rundler has the ability to run N bundle sender state machines in parallel, each configured with their own distinct signer/account for bundle submission. @@ -85,3 +74,64 @@ Rundler has the ability to run N bundle sender state machines in parallel, each In order for bundle proposers to avoid attempting to bundle the same UO, the sender is configured with a mempool shard index that is added to the request to the pool. This shard index is used by the pool to always return a disjoint set of UOs to each sender. N-senders can be useful to increase bundler gas throughput. + +## Sender State Machine + +The bundle sender is implemented as an finite state machine to continuously submit bundle transactions onchain. The state machine runs as long as the builder process is running. + +### States + +**`Building`** + +In the building state the sender is waiting for a trigger. Once triggered, the sender will query the mempool for available user operations. Those user operations are then filtered by the current fees, total gas limit, and simulation results. If before/after the filtering there are no candidate user operations, the sender will wait for another trigger. If there are candidate user operations, a bundle transaction is submitted. If a cancellation is required, the sender will transfer to the cancelling state. + +**`Pending`** + +In the pending state the builder is waiting for a bundle transaction to be mined. It will wait in this state for up to `max_blocks_to_wait_for_mine` blocks. If mined, dropped, or timed out (abandoned) the sender will transition back to the building state with the appropriate metadata captured. + +**`Cancelling`** + +In the cancelling state the builder creates a cancellation operation. The shape of this operation depends on the type of transaction sender being used. If a "hard" cancellation operation is submitted the sender will submit a cancellation transaction and transition to the cancel pending state. If a "soft" cancellation operation is submitted it will transition back to the building state immediately. + +**`CancelPending`** + +In the cancel pending state the builder is waiting for a cancellation transaction to be mined. It will wait in this state for up to `max_blocks_to_wait_for_mine` blocks. If mined, the sender will transition back to the building state. If dropped or timed out (abandoned), the sender will transition back to the cancelling state. If the sender has already performed `max_cancellation_fee_increases`, and the transaction has been abandoned, it will transition back to the building state and reset internal state. + +### Triggers + +While in the building state the sender is waiting for a trigger. There are 3 types of triggers: + +* New block (building mode: auto): Trigger bundle building when a new block is mined. +* Time (building mode: auto): Trigger bundle building after `bundle_max_send_interval_millis` (chain spec) has elapsed without a bundle attempt. +* Manual call (building mode: manual): Trigger bundle building on a call to `debug_bundler_sendBundleNow`. + +### Cancellations + +Cancellations occur in a specific scenario: there are user operations available that pay more than the estimated gas price, but when the sender submits the bundle transaction it receives a "replacement underpriced" error. If after increasing the fee the user operations are priced out, we are in an "underpriced" meta-state. + +The first time the sender encounters this state it will capture the block number and attempt to create another bundle, resetting the fees. During subsequent encounters the builder will compare that block number to latest, if the difference is more than `max_replacement_underpriced_blocks`, the builder will move to a cancellation state. + +The goal of the cancellation state is to remove the pending transaction from the mempool that is blocking the bundle submission, and to do so while spending the least amount of gas. There are two types of cancellations: "hard" and "soft." A "hard" cancellation requires a transaction to be sent onchain. This is typically an empty transaction to minimize costs. A "soft" cancellation does not require a transaction and is simply an RPC interaction. + +### Diagram + +```mermaid +--- +title: Bundle Sender State Machine (Simplified) +--- +stateDiagram-v2 + Building: Building + Pending + Cancelling + CancelPending + + [*] --> Building + Building --> Building : No operations + Building --> Pending : Bundle submitted + Pending --> Building : Bundle mined/dropped/abandoned + Building --> Cancelling : Cancel triggered + Cancelling --> CancelPending: Hard cancellation submitted + Cancelling --> Building : Soft cancellation completed + CancelPending --> Cancelling: Cancellation dropped/abandoned + CancelPending --> Building: Cancellation mined/aborted +``` diff --git a/docs/cli.md b/docs/cli.md index ff26b0216..05d9f056a 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -187,8 +187,10 @@ List of command line options for configuring the Builder. - env: *BUILDER_MAX_BLOCKS_TO_WAIT_FOR_MINE* - `--builder.replacement_fee_percent_increase`: Percentage amount to increase gas fees when retrying a transaction after it failed to mine (default: `10`) - env: *BUILDER_REPLACEMENT_FEE_PERCENT_INCREASE* -- `--builder.max_fee_increases`: Maximum number of fee increases to attempt (Seven increases of 10% is roughly 2x the initial fees) (default: `7`) - - env: *BUILDER_MAX_FEE_INCREASES* +- `--builder.max_cancellation_fee_increases`: Maximum number of cancellation fee increases to attempt (default: `15`) + - env: *BUILDER_MAX_CANCELLATION_FEE_INCREASES* +- `--builder.max_replacement_underpriced_blocks`: The maximum number of blocks to wait in a replacement underpriced state before issuing a cancellation transaction (default: `20`) + - env: *BUILDER_MAX_REPLACEMENT_UNDERPRICED_BLOCKS* - `--builder.sender`: Choice of what sender type to use for transaction submission. (default: `raw`, options: `raw`, `flashbots`, `polygon_bloxroute`) - env: *BUILDER_SENDER* - `--builder.submit_url`: Only used if builder.sender == "raw." If present, the URL of the ETH provider that will be used to send transactions. Defaults to the value of `node_http`.