From 1ccd84793855a6b3e4c5f6b86eea79e7902ba116 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Tue, 11 Jun 2024 14:59:56 -0500 Subject: [PATCH] refactor(builder): large refactor of bundle sender state machine and metrics --- crates/builder/src/bundle_sender.rs | 832 +++++++++++++++------------- crates/builder/src/server/local.rs | 3 - 2 files changed, 462 insertions(+), 373 deletions(-) diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 67375f0c5..8a7a37af2 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -57,10 +57,11 @@ pub(crate) struct BundleSenderImpl { beneficiary: Address, proposer: P, entry_point: E, - transaction_tracker: T, + transaction_tracker: Option, pool: C, settings: Settings, event_sender: broadcast::Sender>, + metrics: BuilderMetrics, _uo_type: PhantomData, } @@ -91,9 +92,6 @@ pub enum SendBundleResult { tx_hash: H256, }, NoOperationsInitially, - NoOperationsAfterFeeIncreases { - attempt_number: u64, - }, StalledAtMaxFeeIncreases, Error(anyhow::Error), } @@ -124,314 +122,23 @@ where /// next one. #[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))] async fn send_bundles_in_loop(mut self) -> anyhow::Result<()> { - // State of the sender loop - enum State { - // Building a bundle, optionally waiting for a trigger to send it - // (wait_for_trigger, fee_increase_count) - Building(bool, u64), - // Waiting for a bundle to be mined - // (wait_until_block, fee_increase_count) - Pending(u64, u64), - // Cancelling the last transaction - // (fee_increase_count) - Cancelling(u64), - // Waiting for a cancellation transaction to be mined - // (wait_until_block, fee_increase_count) - CancelPending(u64, u64), - } - - // initial state - let mut state = State::Building(true, 0); - - // response to manual caller - let mut send_bundle_response = None; - let mut send_bundle_result = None; - // trigger for sending bundles - let mut sender_trigger = BundleSenderTrigger::new( + let sender_trigger = BundleSenderTrigger::new( &self.pool, self.bundle_action_receiver.take().unwrap(), Duration::from_millis(self.chain_spec.bundle_max_send_interval_millis), ) .await?; - loop { - match state { - State::Building(wait_for_trigger, fee_increase_count) => { - if wait_for_trigger { - send_bundle_response = sender_trigger.wait_for_trigger().await?; - - // process any nonce updates, ignore result - self.check_for_transaction_update().await; - } - - // send bundle - debug!( - "Building bundle on block {}", - sender_trigger.last_block.block_number - ); - let result = self.send_bundle(fee_increase_count).await; - - // handle result - match result { - Ok(SendBundleAttemptResult::Success) => { - // sent the bundle - info!("Bundle sent successfully"); - state = State::Pending( - sender_trigger.last_block.block_number - + self.settings.max_blocks_to_wait_for_mine, - 0, - ); - } - Ok(SendBundleAttemptResult::NoOperations) => { - debug!("No operations in bundle"); - - if fee_increase_count > 0 { - warn!("Abandoning bundle after fee increases {fee_increase_count}, no operations available"); - BuilderMetrics::increment_bundle_txns_abandoned( - self.builder_index, - self.entry_point.address(), - ); - send_bundle_result = - Some(SendBundleResult::NoOperationsAfterFeeIncreases { - attempt_number: fee_increase_count, - }); - - // abandon the bundle by resetting the tracker and starting a new bundle process - // If the node we are using still has the transaction in the mempool, its - // possible we will get a `ReplacementUnderpriced` on the next iteration - // and will start a cancellation. - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } else { - debug!("No operations available, waiting for next trigger"); - send_bundle_result = Some(SendBundleResult::NoOperationsInitially); - state = State::Building(true, 0); - } - } - Ok(SendBundleAttemptResult::NonceTooLow) => { - // reset the transaction tracker and try again - info!("Nonce too low, starting new bundle attempt"); - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } - Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { - info!( - "Replacement transaction underpriced, entering cancellation loop" - ); - self.transaction_tracker.reset().await; - state = State::Cancelling(0); - } - Err(error) => { - error!("Bundle send error {error:?}"); - BuilderMetrics::increment_bundle_txns_failed( - self.builder_index, - self.entry_point.address(), - ); - self.transaction_tracker.reset().await; - send_bundle_result = Some(SendBundleResult::Error(error)); - state = State::Building(true, 0); - } - } - } - State::Pending(until, fee_increase_count) => { - sender_trigger.wait_for_block().await?; - - // check for transaction update - if let Some(update) = self.check_for_transaction_update().await { - match update { - TrackerUpdate::Mined { - block_number, - attempt_number, - gas_limit, - gas_used, - tx_hash, - nonce, - .. - } => { - // mined! - info!("Bundle transaction mined"); - BuilderMetrics::process_bundle_txn_success( - self.builder_index, - self.entry_point.address(), - gas_limit, - gas_used, - ); - self.emit(BuilderEvent::transaction_mined( - self.builder_index, - tx_hash, - nonce.low_u64(), - block_number, - )); - send_bundle_result = Some(SendBundleResult::Success { - block_number, - attempt_number, - tx_hash, - }); - state = State::Building(true, 0); - } - TrackerUpdate::LatestTxDropped { nonce } => { - // try again, don't wait for trigger, re-estimate fees - info!("Latest transaction dropped, starting new bundle attempt"); - self.emit(BuilderEvent::latest_transaction_dropped( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_dropped( - self.builder_index, - self.entry_point.address(), - ); - - // force reset the transaction tracker - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } - TrackerUpdate::NonceUsedForOtherTx { nonce } => { - // try again, don't wait for trigger, re-estimate fees - info!("Nonce used externally, starting new bundle attempt"); - self.emit(BuilderEvent::nonce_used_for_other_transaction( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_nonce_used( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(true, 0); - } - } - } else if sender_trigger.last_block().block_number >= until { - // start replacement, don't wait for trigger. Continue - // to attempt until there are no longer any UOs priced high enough - // to bundle. - info!( - "Not mined after {} blocks, increasing fees, attempt: {}", - self.settings.max_blocks_to_wait_for_mine, - fee_increase_count + 1 - ); - BuilderMetrics::increment_bundle_txn_fee_increases( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(false, fee_increase_count + 1); - } - } - State::Cancelling(fee_increase_count) => { - // cancel the transaction - info!("Cancelling last transaction"); - - let (estimated_fees, _) = self - .proposer - .estimate_gas_fees(None) - .await - .unwrap_or_default(); - - let cancel_res = self - .transaction_tracker - .cancel_transaction(self.entry_point.address(), estimated_fees) - .await; - - match cancel_res { - Ok(Some(_)) => { - info!("Cancellation transaction sent, waiting for confirmation"); - BuilderMetrics::increment_cancellation_txns_sent( - self.builder_index, - self.entry_point.address(), - ); - - state = State::CancelPending( - sender_trigger.last_block.block_number - + self.settings.max_blocks_to_wait_for_mine, - fee_increase_count, - ); - } - Ok(None) => { - info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); - BuilderMetrics::increment_soft_cancellations( - self.builder_index, - self.entry_point.address(), - ); - - state = State::Building(true, 0); - } - Err(TransactionTrackerError::ReplacementUnderpriced) => { - info!("Replacement transaction underpriced during cancellation, trying again"); - state = State::Cancelling(fee_increase_count + 1); - } - Err(TransactionTrackerError::NonceTooLow) => { - // reset the transaction tracker and try again - info!("Nonce too low during cancellation, starting new bundle attempt"); - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } - Err(e) => { - error!("Failed to cancel transaction, moving back to building state: {e:#?}"); - BuilderMetrics::increment_cancellation_txns_failed( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(true, 0); - } - } - } - State::CancelPending(until, fee_increase_count) => { - sender_trigger.wait_for_block().await?; - - // check for transaction update - if let Some(update) = self.check_for_transaction_update().await { - match update { - TrackerUpdate::Mined { .. } => { - // mined - info!("Cancellation transaction mined"); - BuilderMetrics::increment_cancellation_txns_mined( - self.builder_index, - self.entry_point.address(), - ); - } - TrackerUpdate::LatestTxDropped { .. } => { - // If a cancellation gets dropped, move to bundling state as there is no - // longer a pending transaction - info!( - "Cancellation transaction dropped, starting new bundle attempt" - ); - // force reset the transaction tracker - self.transaction_tracker.reset().await; - } - TrackerUpdate::NonceUsedForOtherTx { .. } => { - // If a nonce is used externally, move to bundling state as there is no longer - // a pending transaction - info!("Nonce used externally while cancelling, starting new bundle attempt"); - } - } - - state = State::Building(true, 0); - } else if sender_trigger.last_block().block_number >= until { - if fee_increase_count >= self.settings.max_fee_increases { - // abandon the cancellation - warn!("Abandoning cancellation after max fee increases {fee_increase_count}, starting new bundle attempt"); - // force reset the transaction tracker - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } else { - // start replacement, don't wait for trigger - info!( - "Cancellation not mined after {} blocks, increasing fees, attempt: {}", - self.settings.max_blocks_to_wait_for_mine, - fee_increase_count + 1 - ); - state = State::Cancelling(fee_increase_count + 1); - } - } - } - } + // initial state + let mut state = + SenderMachineState::new(sender_trigger, self.transaction_tracker.take().unwrap()); - // send result to manual caller - if let Some(res) = send_bundle_result.take() { - if let Some(r) = send_bundle_response.take() { - if r.send(res).is_err() { - error!("Failed to send bundle result to manual caller"); - } - } + loop { + if let Err(e) = self.step_state(&mut state).await { + error!("Error in bundle sender loop: {e:#?}"); + self.metrics.increment_state_machine_errors(); + state.reset(); } } } @@ -464,48 +171,264 @@ where chain_spec, beneficiary, proposer, - entry_point, - transaction_tracker, + transaction_tracker: Some(transaction_tracker), pool, settings, event_sender, + metrics: BuilderMetrics { + builder_index, + entry_point: entry_point.address(), + }, + entry_point, _uo_type: PhantomData, } } - async fn check_for_transaction_update(&mut self) -> Option { - let update = self.transaction_tracker.check_for_update().await; - let update = match update { - Ok(update) => update?, - Err(error) => { - error!("Failed to check for transaction updates: {error:#?}"); - return None; + async fn step_state(&mut self, state: &mut SenderMachineState) -> anyhow::Result<()> { + let tracker_update = state.wait_for_trigger().await?; + + match state.inner { + InnerState::Building(building_state) => { + self.handle_building_state(state, building_state).await?; } - }; + InnerState::Pending(pending_state) => { + self.handle_pending_state(state, pending_state, tracker_update) + .await?; + } + InnerState::Cancelling(cancelling_state) => { + self.handle_cancelling_state(state, cancelling_state) + .await?; + } + InnerState::CancelPending(cancel_pending_state) => { + self.handle_cancel_pending_state(state, cancel_pending_state, tracker_update) + .await?; + } + } - match update { - TrackerUpdate::Mined { - tx_hash, - block_number, - attempt_number, - nonce, - .. - } => { - if attempt_number == 0 { - info!("Transaction with hash {tx_hash:?}, nonce {nonce:?}, landed in block {block_number}"); + Ok(()) + } + + async fn handle_building_state( + &mut self, + state: &mut SenderMachineState, + inner: BuildingState, + ) -> anyhow::Result<()> { + // send bundle + let block_number = state.block_number(); + debug!("Building bundle on block {}", block_number); + let result = self.send_bundle(state, inner.fee_increase_count).await; + + // handle result + match result { + Ok(SendBundleAttemptResult::Success) => { + // sent the bundle + info!("Bundle sent successfully"); + state.update(InnerState::Pending(inner.to_pending( + block_number + self.settings.max_blocks_to_wait_for_mine, + ))); + } + Ok(SendBundleAttemptResult::NoOperations) => { + debug!("No operations to bundle"); + if inner.fee_increase_count > 0 { + warn!( + "Abandoning bundle after fee increases {}, no operations available", + inner.fee_increase_count + ); + self.metrics.increment_bundle_txns_abandoned(); + + // abandon the bundle by starting a new bundle process + // If the node we are using still has the transaction in the mempool, its + // possible we will get a `ReplacementUnderpriced` on the next iteration + // and will start a cancellation. + state.reset(); } else { - info!("Transaction with hash {tx_hash:?}, nonce {nonce:?}, landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); + debug!("No operations available, waiting for next trigger"); + state.complete(Some(SendBundleResult::NoOperationsInitially)); } } - TrackerUpdate::LatestTxDropped { nonce } => { - info!("Previous transaction dropped by sender. Nonce: {nonce:?}"); + Ok(SendBundleAttemptResult::NonceTooLow) => { + // reset the transaction tracker and try again + info!("Nonce too low, starting new bundle attempt"); + state.reset(); } - TrackerUpdate::NonceUsedForOtherTx { nonce } => { - info!("Nonce used by external transaction. Nonce: {nonce:?}"); + Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { + info!("Replacement transaction underpriced, entering cancellation loop"); + state.update(InnerState::Cancelling(inner.to_cancelling())); } - }; + Err(error) => { + error!("Bundle send error {error:?}"); + self.metrics.increment_bundle_txns_failed(); + let send_bundle_result = Some(SendBundleResult::Error(error)); + state.complete(send_bundle_result); + } + } + + Ok(()) + } - Some(update) + async fn handle_pending_state( + &mut self, + state: &mut SenderMachineState, + inner: PendingState, + tracker_update: Option, + ) -> anyhow::Result<()> { + if let Some(update) = tracker_update { + match update { + TrackerUpdate::Mined { + block_number, + attempt_number, + gas_limit, + gas_used, + tx_hash, + nonce, + .. + } => { + info!("Bundle transaction mined"); + self.metrics.process_bundle_txn_success(gas_limit, gas_used); + self.emit(BuilderEvent::transaction_mined( + self.builder_index, + tx_hash, + nonce.low_u64(), + block_number, + )); + let send_bundle_result = Some(SendBundleResult::Success { + block_number, + attempt_number, + tx_hash, + }); + state.complete(send_bundle_result); + } + TrackerUpdate::LatestTxDropped { nonce } => { + // try again, don't wait for trigger, re-estimate fees + info!("Latest transaction dropped, starting new bundle attempt"); + self.emit(BuilderEvent::latest_transaction_dropped( + self.builder_index, + nonce.low_u64(), + )); + self.metrics.increment_bundle_txns_dropped(); + state.reset(); + } + TrackerUpdate::NonceUsedForOtherTx { nonce } => { + // try again, don't wait for trigger, re-estimate fees + info!("Nonce used externally, starting new bundle attempt"); + self.emit(BuilderEvent::nonce_used_for_other_transaction( + self.builder_index, + nonce.low_u64(), + )); + self.metrics.increment_bundle_txns_nonce_used(); + state.reset(); + } + } + } else if state.block_number() >= inner.until { + // start replacement, don't wait for trigger. Continue + // to attempt until there are no longer any UOs priced high enough + // to bundle. + info!( + "Not mined after {} blocks, increasing fees, attempt: {}", + self.settings.max_blocks_to_wait_for_mine, + inner.fee_increase_count + 1 + ); + self.metrics.increment_bundle_txn_fee_increases(); + state.update(InnerState::Building(inner.to_building())) + } + + Ok(()) + } + + async fn handle_cancelling_state( + &mut self, + state: &mut SenderMachineState, + inner: CancellingState, + ) -> anyhow::Result<()> { + info!("Cancelling last transaction"); + + let (estimated_fees, _) = self + .proposer + .estimate_gas_fees(None) + .await + .unwrap_or_default(); + + let cancel_res = state + .transaction_tracker + .cancel_transaction(self.entry_point.address(), estimated_fees) + .await; + + match cancel_res { + Ok(Some(_)) => { + info!("Cancellation transaction sent, waiting for confirmation"); + self.metrics.increment_cancellation_txns_sent(); + + state.update(InnerState::CancelPending(inner.to_cancel_pending( + state.block_number() + self.settings.max_blocks_to_wait_for_mine, + ))); + } + Ok(None) => { + info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); + self.metrics.increment_soft_cancellations(); + state.reset(); + } + Err(TransactionTrackerError::ReplacementUnderpriced) => { + info!("Replacement transaction underpriced during cancellation, trying again"); + state.update(InnerState::Cancelling(inner.to_self())); + } + Err(TransactionTrackerError::NonceTooLow) => { + // reset the transaction tracker and try again + info!("Nonce too low during cancellation, starting new bundle attempt"); + state.reset(); + } + Err(e) => { + error!("Failed to cancel transaction, moving back to building state: {e:#?}"); + self.metrics.increment_cancellation_txns_failed(); + state.reset(); + } + } + + Ok(()) + } + + async fn handle_cancel_pending_state( + &mut self, + state: &mut SenderMachineState, + inner: CancelPendingState, + tracker_update: Option, + ) -> anyhow::Result<()> { + // check for transaction update + if let Some(update) = tracker_update { + match update { + TrackerUpdate::Mined { .. } => { + // mined + info!("Cancellation transaction mined"); + self.metrics.increment_cancellation_txns_mined(); + } + TrackerUpdate::LatestTxDropped { .. } => { + // If a cancellation gets dropped, move to bundling state as there is no + // longer a pending transaction + info!("Cancellation transaction dropped, starting new bundle attempt"); + } + TrackerUpdate::NonceUsedForOtherTx { .. } => { + // If a nonce is used externally, move to bundling state as there is no longer + // a pending transaction + info!("Nonce used externally while cancelling, starting new bundle attempt"); + } + } + state.reset(); + } else if state.block_number() >= inner.until { + if inner.fee_increase_count >= self.settings.max_fee_increases { + // abandon the cancellation + warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); + state.reset(); + } else { + // start replacement, don't wait for trigger + info!( + "Cancellation not mined after {} blocks, increasing fees, attempt: {}", + self.settings.max_blocks_to_wait_for_mine, + inner.fee_increase_count + 1 + ); + state.update(InnerState::Cancelling(inner.to_cancelling())); + } + } + + Ok(()) } /// Constructs a bundle and sends it to the entry point as a transaction. @@ -516,9 +439,10 @@ where /// are no ops that meet the fee requirements. async fn send_bundle( &mut self, + state: &mut SenderMachineState, fee_increase_count: u64, ) -> anyhow::Result { - let (nonce, required_fees) = self.transaction_tracker.get_nonce_and_required_fees()?; + let (nonce, required_fees) = state.transaction_tracker.get_nonce_and_required_fees()?; let Some(bundle_tx) = self .get_bundle_tx(nonce, required_fees, fee_increase_count > 0) @@ -539,9 +463,9 @@ where op_hashes, } = bundle_tx; - BuilderMetrics::increment_bundle_txns_sent(self.builder_index, self.entry_point.address()); + self.metrics.increment_bundle_txns_sent(); - let send_result = self + let send_result = state .transaction_tracker .send_transaction(tx.clone(), &expected_storage) .await; @@ -567,10 +491,7 @@ where Ok(SendBundleAttemptResult::NonceTooLow) } Err(TransactionTrackerError::ReplacementUnderpriced) => { - BuilderMetrics::increment_bundle_txn_replacement_underpriced( - self.builder_index, - self.entry_point.address(), - ); + self.metrics.increment_bundle_txn_replacement_underpriced(); warn!("Replacement transaction underpriced"); Ok(SendBundleAttemptResult::ReplacementUnderpriced) } @@ -669,6 +590,174 @@ where } } +struct SenderMachineState { + trigger: BundleSenderTrigger, + transaction_tracker: T, + send_bundle_response: Option>, + inner: InnerState, + requires_reset: bool, +} + +impl SenderMachineState { + fn new(trigger: BundleSenderTrigger, transaction_tracker: T) -> Self { + Self { + trigger, + transaction_tracker, + send_bundle_response: None, + inner: InnerState::new(), + requires_reset: false, + } + } + + fn update(&mut self, inner: InnerState) { + self.inner = inner; + } + + // resets the state machine to the initial state, doesn't wait for next trigger + fn reset(&mut self) { + self.requires_reset = true; + let building_state = BuildingState { + wait_for_trigger: false, + fee_increase_count: 0, + }; + self.inner = InnerState::Building(building_state); + } + + fn complete(&mut self, result: Option) { + if let Some(result) = result { + if let Some(r) = self.send_bundle_response.take() { + if r.send(result).is_err() { + error!("Failed to send bundle result to manual caller"); + } + } + } + self.inner = InnerState::new(); + } + + async fn wait_for_trigger(&mut self) -> anyhow::Result> { + if self.requires_reset { + self.transaction_tracker.reset().await; + self.requires_reset = false; + } + + match &self.inner { + InnerState::Building(s) => { + if !s.wait_for_trigger { + return Ok(None); + } + + self.send_bundle_response = self.trigger.wait_for_trigger().await?; + self.transaction_tracker + .check_for_update() + .await + .map_err(|e| anyhow::anyhow!("transaction tracker update error {e:?}")) + } + InnerState::Pending(..) | InnerState::CancelPending(..) => { + self.trigger.wait_for_block().await?; + self.transaction_tracker + .check_for_update() + .await + .map_err(|e| anyhow::anyhow!("transaction tracker update error {e:?}")) + } + InnerState::Cancelling(..) => Ok(None), + } + } + + fn block_number(&self) -> u64 { + self.trigger.last_block().block_number + } +} + +// State of the sender loop +enum InnerState { + // Building a bundle, optionally waiting for a trigger to send it + Building(BuildingState), + // Waiting for a bundle to be mined + Pending(PendingState), + // Cancelling the last transaction + Cancelling(CancellingState), + // Waiting for a cancellation transaction to be mined + CancelPending(CancelPendingState), +} + +impl InnerState { + fn new() -> Self { + InnerState::Building(BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + }) + } +} + +#[derive(Debug, Clone, Copy)] +struct BuildingState { + wait_for_trigger: bool, + fee_increase_count: u64, +} + +impl BuildingState { + fn to_pending(self, until: u64) -> PendingState { + PendingState { + until, + fee_increase_count: self.fee_increase_count, + } + } + + fn to_cancelling(self) -> CancellingState { + CancellingState { + fee_increase_count: 0, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct PendingState { + until: u64, + fee_increase_count: u64, +} + +impl PendingState { + fn to_building(self) -> BuildingState { + BuildingState { + wait_for_trigger: true, + fee_increase_count: self.fee_increase_count + 1, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct CancellingState { + fee_increase_count: u64, +} + +impl CancellingState { + fn to_self(mut self) -> Self { + self.fee_increase_count += 1; + self + } + + fn to_cancel_pending(self, until: u64) -> CancelPendingState { + CancelPendingState { + until, + fee_increase_count: self.fee_increase_count, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct CancelPendingState { + until: u64, + fee_increase_count: u64, +} + +impl CancelPendingState { + fn to_cancelling(self) -> CancellingState { + CancellingState { + fee_increase_count: self.fee_increase_count + 1, + } + } +} + struct BundleSenderTrigger { bundling_mode: BundlingMode, block_rx: UnboundedReceiver, @@ -826,69 +915,72 @@ impl BundleSenderTrigger { } } -struct BuilderMetrics {} +#[derive(Debug, Clone)] +struct BuilderMetrics { + builder_index: u64, + entry_point: Address, +} impl BuilderMetrics { - fn increment_bundle_txns_sent(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_sent", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()) + fn increment_bundle_txns_sent(&self) { + metrics::counter!("builder_bundle_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()) .increment(1); } - fn process_bundle_txn_success( - builder_index: u64, - entry_point: Address, - gas_limit: Option, - gas_used: Option, - ) { - metrics::counter!("builder_bundle_txns_success", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn process_bundle_txn_success(&self, gas_limit: Option, gas_used: Option) { + metrics::counter!("builder_bundle_txns_success", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); if let Some(limit) = gas_limit { - metrics::counter!("builder_bundle_gas_limit", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(limit.as_u64()); + metrics::counter!("builder_bundle_gas_limit", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(limit.as_u64()); } if let Some(used) = gas_used { - metrics::counter!("builder_bundle_gas_used", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(used.as_u64()); + metrics::counter!("builder_bundle_gas_used", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(used.as_u64()); } } - fn increment_bundle_txns_dropped(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_dropped", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txns_dropped(&self) { + metrics::counter!("builder_bundle_txns_dropped", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } // used when we decide to stop trying a transaction - fn increment_bundle_txns_abandoned(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_abandoned", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txns_abandoned(&self) { + metrics::counter!("builder_bundle_txns_abandoned", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } // used when sending a transaction fails - fn increment_bundle_txns_failed(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_failed", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txns_failed(&self) { + metrics::counter!("builder_bundle_txns_failed", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); + } + + fn increment_bundle_txns_nonce_used(&self) { + metrics::counter!("builder_bundle_txns_nonce_used", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_bundle_txns_nonce_used(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_nonce_used", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txn_fee_increases(&self) { + metrics::counter!("builder_bundle_fee_increases", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_bundle_txn_fee_increases(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_fee_increases", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txn_replacement_underpriced(&self) { + metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_bundle_txn_replacement_underpriced(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_cancellation_txns_sent(&self) { + metrics::counter!("builder_cancellation_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_cancellation_txns_sent(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_cancellation_txns_sent", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_cancellation_txns_mined(&self) { + metrics::counter!("builder_cancellation_txns_mined", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_cancellation_txns_mined(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_cancellation_txns_mined", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_soft_cancellations(&self) { + metrics::counter!("builder_soft_cancellations", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_soft_cancellations(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_soft_cancellations", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_cancellation_txns_failed(&self) { + metrics::counter!("builder_cancellation_txns_failed", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_cancellation_txns_failed(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_cancellation_txns_failed", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_state_machine_errors(&self) { + metrics::counter!("builder_state_machine_errors", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } } diff --git a/crates/builder/src/server/local.rs b/crates/builder/src/server/local.rs index 76bc0cf87..fa6758ab3 100644 --- a/crates/builder/src/server/local.rs +++ b/crates/builder/src/server/local.rs @@ -186,9 +186,6 @@ impl LocalBuilderServerRunner { SendBundleResult::NoOperationsInitially => { Err(anyhow::anyhow!("no ops to send").into()) }, - SendBundleResult::NoOperationsAfterFeeIncreases { .. } => { - Err(anyhow::anyhow!("bundle initially had operations, but after increasing gas fees it was empty").into()) - }, SendBundleResult::StalledAtMaxFeeIncreases => Err(anyhow::anyhow!("stalled at max fee increases").into()), SendBundleResult::Error(e) => Err(anyhow::anyhow!("send bundle error: {e:?}").into()), }