From a908eefbe8b4916b956ed586dd86f3cf091b3062 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Tue, 11 Jun 2024 11:56:55 -0500 Subject: [PATCH 1/2] feat(builder): cancel transaction when replacement underpriced --- crates/builder/src/bundle_proposer.rs | 21 +- crates/builder/src/bundle_sender.rs | 276 +++++++++++++++------- crates/builder/src/sender/bloxroute.rs | 30 ++- crates/builder/src/sender/conditional.rs | 31 ++- crates/builder/src/sender/flashbots.rs | 99 ++++++-- crates/builder/src/sender/mod.rs | 40 +++- crates/builder/src/sender/raw.rs | 32 ++- crates/builder/src/transaction_tracker.rs | 63 ++++- 8 files changed, 480 insertions(+), 112 deletions(-) diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index 49375116b..d006a395f 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -96,11 +96,21 @@ impl Bundle { pub(crate) trait BundleProposer: Send + Sync + 'static { type UO: UserOperation; + /// Constructs the next bundle + /// + /// If `min_fees` is `Some`, the proposer will ensure the bundle has + /// at least `min_fees`. async fn make_bundle( &self, - required_fees: Option, + min_fees: Option, is_replacement: bool, ) -> anyhow::Result>; + + /// Gets the current gas fees + /// + /// If `min_fees` is `Some`, the proposer will ensure the gas fees returned are at least `min_fees`. + async fn estimate_gas_fees(&self, min_fees: Option) + -> anyhow::Result<(GasFees, U256)>; } #[derive(Debug)] @@ -138,6 +148,13 @@ where { type UO = UO; + async fn estimate_gas_fees( + &self, + required_fees: Option, + ) -> anyhow::Result<(GasFees, U256)> { + self.fee_estimator.required_bundle_fees(required_fees).await + } + async fn make_bundle( &self, required_fees: Option, @@ -148,7 +165,7 @@ where self.provider .get_latest_block_hash_and_number() .map_err(anyhow::Error::from), - self.fee_estimator.required_bundle_fees(required_fees) + self.estimate_gas_fees(required_fees) )?; if ops.is_empty() { return Ok(Bundle::default()); diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index cae080929..67375f0c5 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -132,6 +132,12 @@ where // Waiting for a bundle to be mined // (wait_until_block, fee_increase_count) Pending(u64, u64), + // Cancelling the last transaction + // (fee_increase_count) + Cancelling(u64), + // Waiting for a cancellation transaction to be mined + // (wait_until_block, fee_increase_count) + CancelPending(u64, u64), } // initial state @@ -181,39 +187,40 @@ where debug!("No operations in bundle"); if fee_increase_count > 0 { - // TODO(danc): when abandoning we tend to get "stuck" where we have a pending bundle - // transaction that isn't landing. We should move to a new state where we track the - // pending transaction and "cancel" it if it doesn't land after a certain number of blocks. - - warn!("Abandoning bundle after fee increases {fee_increase_count}, no operations available, waiting for next trigger"); + warn!("Abandoning bundle after fee increases {fee_increase_count}, no operations available"); BuilderMetrics::increment_bundle_txns_abandoned( self.builder_index, self.entry_point.address(), ); - self.transaction_tracker.reset().await; send_bundle_result = Some(SendBundleResult::NoOperationsAfterFeeIncreases { attempt_number: fee_increase_count, - }) + }); + + // abandon the bundle by resetting the tracker and starting a new bundle process + // If the node we are using still has the transaction in the mempool, its + // possible we will get a `ReplacementUnderpriced` on the next iteration + // and will start a cancellation. + self.transaction_tracker.reset().await; + state = State::Building(true, 0); } else { debug!("No operations available, waiting for next trigger"); send_bundle_result = Some(SendBundleResult::NoOperationsInitially); + state = State::Building(true, 0); } - - state = State::Building(true, 0); } Ok(SendBundleAttemptResult::NonceTooLow) => { // reset the transaction tracker and try again + info!("Nonce too low, starting new bundle attempt"); self.transaction_tracker.reset().await; state = State::Building(true, 0); } Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { - // TODO(danc): handle replacement underpriced - // move to the cancellation state - - // for now: reset the transaction tracker and try again + info!( + "Replacement transaction underpriced, entering cancellation loop" + ); self.transaction_tracker.reset().await; - state = State::Building(true, 0); + state = State::Cancelling(0); } Err(error) => { error!("Bundle send error {error:?}"); @@ -236,11 +243,26 @@ where TrackerUpdate::Mined { block_number, attempt_number, + gas_limit, + gas_used, tx_hash, + nonce, .. } => { // mined! info!("Bundle transaction mined"); + BuilderMetrics::process_bundle_txn_success( + self.builder_index, + self.entry_point.address(), + gas_limit, + gas_used, + ); + self.emit(BuilderEvent::transaction_mined( + self.builder_index, + tx_hash, + nonce.low_u64(), + block_number, + )); send_bundle_result = Some(SendBundleResult::Success { block_number, attempt_number, @@ -248,46 +270,156 @@ where }); state = State::Building(true, 0); } - TrackerUpdate::LatestTxDropped { .. } => { + TrackerUpdate::LatestTxDropped { nonce } => { // try again, don't wait for trigger, re-estimate fees info!("Latest transaction dropped, starting new bundle attempt"); + self.emit(BuilderEvent::latest_transaction_dropped( + self.builder_index, + nonce.low_u64(), + )); + BuilderMetrics::increment_bundle_txns_dropped( + self.builder_index, + self.entry_point.address(), + ); // force reset the transaction tracker self.transaction_tracker.reset().await; - state = State::Building(true, 0); } - TrackerUpdate::NonceUsedForOtherTx { .. } => { + TrackerUpdate::NonceUsedForOtherTx { nonce } => { // try again, don't wait for trigger, re-estimate fees info!("Nonce used externally, starting new bundle attempt"); + self.emit(BuilderEvent::nonce_used_for_other_transaction( + self.builder_index, + nonce.low_u64(), + )); + BuilderMetrics::increment_bundle_txns_nonce_used( + self.builder_index, + self.entry_point.address(), + ); state = State::Building(true, 0); } } } else if sender_trigger.last_block().block_number >= until { - if fee_increase_count >= self.settings.max_fee_increases { - // TODO(danc): same "stuck" issue here on abandonment - // this abandon is likely to lead to "transaction underpriced" errors - // Instead, move to cancellation state. + // start replacement, don't wait for trigger. Continue + // to attempt until there are no longer any UOs priced high enough + // to bundle. + info!( + "Not mined after {} blocks, increasing fees, attempt: {}", + self.settings.max_blocks_to_wait_for_mine, + fee_increase_count + 1 + ); + BuilderMetrics::increment_bundle_txn_fee_increases( + self.builder_index, + self.entry_point.address(), + ); + state = State::Building(false, fee_increase_count + 1); + } + } + State::Cancelling(fee_increase_count) => { + // cancel the transaction + info!("Cancelling last transaction"); + + let (estimated_fees, _) = self + .proposer + .estimate_gas_fees(None) + .await + .unwrap_or_default(); + + let cancel_res = self + .transaction_tracker + .cancel_transaction(self.entry_point.address(), estimated_fees) + .await; + + match cancel_res { + Ok(Some(_)) => { + info!("Cancellation transaction sent, waiting for confirmation"); + BuilderMetrics::increment_cancellation_txns_sent( + self.builder_index, + self.entry_point.address(), + ); + + state = State::CancelPending( + sender_trigger.last_block.block_number + + self.settings.max_blocks_to_wait_for_mine, + fee_increase_count, + ); + } + Ok(None) => { + info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); + BuilderMetrics::increment_soft_cancellations( + self.builder_index, + self.entry_point.address(), + ); - warn!("Abandoning bundle after max fee increases {fee_increase_count}"); - BuilderMetrics::increment_bundle_txns_abandoned( + state = State::Building(true, 0); + } + Err(TransactionTrackerError::ReplacementUnderpriced) => { + info!("Replacement transaction underpriced during cancellation, trying again"); + state = State::Cancelling(fee_increase_count + 1); + } + Err(TransactionTrackerError::NonceTooLow) => { + // reset the transaction tracker and try again + info!("Nonce too low during cancellation, starting new bundle attempt"); + self.transaction_tracker.reset().await; + state = State::Building(true, 0); + } + Err(e) => { + error!("Failed to cancel transaction, moving back to building state: {e:#?}"); + BuilderMetrics::increment_cancellation_txns_failed( self.builder_index, self.entry_point.address(), ); + state = State::Building(true, 0); + } + } + } + State::CancelPending(until, fee_increase_count) => { + sender_trigger.wait_for_block().await?; + + // check for transaction update + if let Some(update) = self.check_for_transaction_update().await { + match update { + TrackerUpdate::Mined { .. } => { + // mined + info!("Cancellation transaction mined"); + BuilderMetrics::increment_cancellation_txns_mined( + self.builder_index, + self.entry_point.address(), + ); + } + TrackerUpdate::LatestTxDropped { .. } => { + // If a cancellation gets dropped, move to bundling state as there is no + // longer a pending transaction + info!( + "Cancellation transaction dropped, starting new bundle attempt" + ); + // force reset the transaction tracker + self.transaction_tracker.reset().await; + } + TrackerUpdate::NonceUsedForOtherTx { .. } => { + // If a nonce is used externally, move to bundling state as there is no longer + // a pending transaction + info!("Nonce used externally while cancelling, starting new bundle attempt"); + } + } + + state = State::Building(true, 0); + } else if sender_trigger.last_block().block_number >= until { + if fee_increase_count >= self.settings.max_fee_increases { + // abandon the cancellation + warn!("Abandoning cancellation after max fee increases {fee_increase_count}, starting new bundle attempt"); + // force reset the transaction tracker self.transaction_tracker.reset().await; state = State::Building(true, 0); } else { // start replacement, don't wait for trigger info!( - "Not mined after {} blocks, increasing fees, attempt: {}", + "Cancellation not mined after {} blocks, increasing fees, attempt: {}", self.settings.max_blocks_to_wait_for_mine, fee_increase_count + 1 ); - BuilderMetrics::increment_bundle_txn_fee_increases( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(false, fee_increase_count + 1); + state = State::Cancelling(fee_increase_count + 1); } } } @@ -351,60 +483,25 @@ where } }; - // process update before returning match update { TrackerUpdate::Mined { tx_hash, block_number, attempt_number, - gas_limit, - gas_used, nonce, .. } => { - BuilderMetrics::increment_bundle_txns_success( - self.builder_index, - self.entry_point.address(), - ); - BuilderMetrics::set_bundle_gas_stats( - gas_limit, - gas_used, - self.builder_index, - self.entry_point.address(), - ); if attempt_number == 0 { - info!("Bundle with hash {tx_hash:?} landed in block {block_number}"); + info!("Transaction with hash {tx_hash:?}, nonce {nonce:?}, landed in block {block_number}"); } else { - info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); + info!("Transaction with hash {tx_hash:?}, nonce {nonce:?}, landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); } - self.emit(BuilderEvent::transaction_mined( - self.builder_index, - tx_hash, - nonce.low_u64(), - block_number, - )); } TrackerUpdate::LatestTxDropped { nonce } => { - self.emit(BuilderEvent::latest_transaction_dropped( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_dropped( - self.builder_index, - self.entry_point.address(), - ); - info!("Previous transaction dropped by sender"); + info!("Previous transaction dropped by sender. Nonce: {nonce:?}"); } TrackerUpdate::NonceUsedForOtherTx { nonce } => { - self.emit(BuilderEvent::nonce_used_for_other_transaction( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_nonce_used( - self.builder_index, - self.entry_point.address(), - ); - info!("Nonce used by external transaction"); + info!("Nonce used by external transaction. Nonce: {nonce:?}"); } }; @@ -697,7 +794,8 @@ impl BundleSenderTrigger { } async fn wait_for_block(&mut self) -> anyhow::Result { - self.block_rx + self.last_block = self + .block_rx .recv() .await .ok_or_else(|| anyhow::anyhow!("Block stream closed"))?; @@ -736,8 +834,20 @@ impl BuilderMetrics { .increment(1); } - fn increment_bundle_txns_success(builder_index: u64, entry_point: Address) { + fn process_bundle_txn_success( + builder_index: u64, + entry_point: Address, + gas_limit: Option, + gas_used: Option, + ) { metrics::counter!("builder_bundle_txns_success", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + + if let Some(limit) = gas_limit { + metrics::counter!("builder_bundle_gas_limit", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(limit.as_u64()); + } + if let Some(used) = gas_used { + metrics::counter!("builder_bundle_gas_used", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(used.as_u64()); + } } fn increment_bundle_txns_dropped(builder_index: u64, entry_point: Address) { @@ -766,17 +876,19 @@ impl BuilderMetrics { metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); } - fn set_bundle_gas_stats( - gas_limit: Option, - gas_used: Option, - builder_index: u64, - entry_point: Address, - ) { - if let Some(limit) = gas_limit { - metrics::counter!("builder_bundle_gas_limit", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(limit.as_u64()); - } - if let Some(used) = gas_used { - metrics::counter!("builder_bundle_gas_used", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(used.as_u64()); - } + fn increment_cancellation_txns_sent(builder_index: u64, entry_point: Address) { + metrics::counter!("builder_cancellation_txns_sent", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + } + + fn increment_cancellation_txns_mined(builder_index: u64, entry_point: Address) { + metrics::counter!("builder_cancellation_txns_mined", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + } + + fn increment_soft_cancellations(builder_index: u64, entry_point: Address) { + metrics::counter!("builder_soft_cancellations", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + } + + fn increment_cancellation_txns_failed(builder_index: u64, entry_point: Address) { + metrics::counter!("builder_cancellation_txns_failed", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); } } diff --git a/crates/builder/src/sender/bloxroute.rs b/crates/builder/src/sender/bloxroute.rs index b23ea7a92..66a8807b3 100644 --- a/crates/builder/src/sender/bloxroute.rs +++ b/crates/builder/src/sender/bloxroute.rs @@ -19,6 +19,7 @@ use ethers::{ providers::{JsonRpcClient, Middleware, Provider}, types::{ transaction::eip2718::TypedTransaction, Address, Bytes, TransactionReceipt, TxHash, H256, + U256, }, utils::hex, }; @@ -28,12 +29,16 @@ use jsonrpsee::{ http_client::{transport::HttpBackend, HeaderMap, HeaderValue, HttpClient, HttpClientBuilder}, }; use rundler_sim::ExpectedStorage; +use rundler_types::GasFees; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use tokio::time; use tonic::async_trait; -use super::{fill_and_sign, Result, SentTxInfo, TransactionSender, TxStatus}; +use super::{ + create_hard_cancel_tx, fill_and_sign, CancelTxInfo, Result, SentTxInfo, TransactionSender, + TxStatus, +}; pub(crate) struct PolygonBloxrouteTransactionSender where @@ -62,6 +67,29 @@ where Ok(SentTxInfo { nonce, tx_hash }) } + async fn cancel_transaction( + &self, + _tx_hash: H256, + nonce: U256, + to: Address, + gas_fees: GasFees, + ) -> Result { + let tx = create_hard_cancel_tx(self.provider.address(), to, nonce, gas_fees); + + let (raw_tx, _) = fill_and_sign(&self.provider, tx).await?; + + let tx_hash = self + .provider + .provider() + .request("eth_sendRawTransaction", (raw_tx,)) + .await?; + + Ok(CancelTxInfo { + tx_hash, + soft_cancelled: false, + }) + } + async fn get_transaction_status(&self, tx_hash: H256) -> Result { let tx = self .provider diff --git a/crates/builder/src/sender/conditional.rs b/crates/builder/src/sender/conditional.rs index e02f78a68..5e1ca3e35 100644 --- a/crates/builder/src/sender/conditional.rs +++ b/crates/builder/src/sender/conditional.rs @@ -17,14 +17,18 @@ use anyhow::Context; use ethers::{ middleware::SignerMiddleware, providers::{JsonRpcClient, Middleware, PendingTransaction, Provider}, - types::{transaction::eip2718::TypedTransaction, Address, TransactionReceipt, H256}, + types::{transaction::eip2718::TypedTransaction, Address, TransactionReceipt, H256, U256}, }; use ethers_signers::Signer; use rundler_sim::ExpectedStorage; +use rundler_types::GasFees; use serde_json::json; use tonic::async_trait; -use super::{fill_and_sign, Result, SentTxInfo, TransactionSender, TxStatus}; +use super::{ + create_hard_cancel_tx, fill_and_sign, CancelTxInfo, Result, SentTxInfo, TransactionSender, + TxStatus, +}; pub(crate) struct ConditionalTransactionSender where @@ -62,6 +66,29 @@ where Ok(SentTxInfo { nonce, tx_hash }) } + async fn cancel_transaction( + &self, + _tx_hash: H256, + nonce: U256, + to: Address, + gas_fees: GasFees, + ) -> Result { + let tx = create_hard_cancel_tx(self.provider.address(), to, nonce, gas_fees); + + let (raw_tx, _) = fill_and_sign(&self.provider, tx).await?; + + let tx_hash = self + .provider + .provider() + .request("eth_sendRawTransaction", (raw_tx,)) + .await?; + + Ok(CancelTxInfo { + tx_hash, + soft_cancelled: false, + }) + } + async fn get_transaction_status(&self, tx_hash: H256) -> Result { let tx = self .provider diff --git a/crates/builder/src/sender/flashbots.rs b/crates/builder/src/sender/flashbots.rs index bad8c6bf3..eecaf1144 100644 --- a/crates/builder/src/sender/flashbots.rs +++ b/crates/builder/src/sender/flashbots.rs @@ -26,8 +26,7 @@ use ethers::{ middleware::SignerMiddleware, providers::{interval, JsonRpcClient, Middleware, Provider}, types::{ - transaction::eip2718::TypedTransaction, Address, Bytes, TransactionReceipt, TxHash, H256, - U256, U64, + transaction::eip2718::TypedTransaction, Address, Bytes, TransactionReceipt, H256, U256, U64, }, utils, }; @@ -37,8 +36,9 @@ use futures_util::{Stream, StreamExt, TryFutureExt}; use pin_project::pin_project; use reqwest::{ header::{HeaderMap, HeaderValue, CONTENT_TYPE}, - Client, + Client, Response, }; +use rundler_types::GasFees; use serde::{de, Deserialize, Serialize}; use serde_json::{json, Value}; use tonic::async_trait; @@ -46,6 +46,7 @@ use tonic::async_trait; use super::{ fill_and_sign, ExpectedStorage, Result, SentTxInfo, TransactionSender, TxSenderError, TxStatus, }; +use crate::sender::CancelTxInfo; #[derive(Debug)] pub(crate) struct FlashbotsTransactionSender { @@ -75,6 +76,28 @@ where Ok(SentTxInfo { nonce, tx_hash }) } + async fn cancel_transaction( + &self, + tx_hash: H256, + _nonce: U256, + _to: Address, + _gas_fees: GasFees, + ) -> Result { + let success = self + .flashbots_client + .cancel_private_transaction(tx_hash) + .await?; + + if !success { + return Err(TxSenderError::SoftCancelFailed); + } + + Ok(CancelTxInfo { + tx_hash: H256::zero(), + soft_cancelled: true, + }) + } + async fn get_transaction_status(&self, tx_hash: H256) -> Result { let status = self.flashbots_client.status(tx_hash).await?; Ok(match status.status { @@ -181,13 +204,31 @@ struct Refund { #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] -struct FlashbotsPrivateTransaction { +struct FlashbotsSendPrivateTransactionRequest { tx: Bytes, #[serde(skip_serializing_if = "Option::is_none")] max_block_number: Option, preferences: Preferences, } +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct FlashbotsSendPrivateTransactionResponse { + result: H256, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +struct FlashbotsCancelPrivateTransactionRequest { + tx_hash: H256, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct FlashbotsCancelPrivateTransactionResponse { + result: bool, +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] #[allow(dead_code)] @@ -277,7 +318,7 @@ where "jsonrpc": "2.0", "method": "eth_sendPrivateTransaction", "params": [ - FlashbotsPrivateTransaction { + FlashbotsSendPrivateTransactionRequest { tx: raw_tx, max_block_number: None, preferences, @@ -285,6 +326,37 @@ where "id": 1 }); + let response = self.sign_send_request(body).await?; + + let parsed_response = response + .json::() + .await + .map_err(|e| anyhow!("failed to deserialize Flashbots response: {:?}", e))?; + + Ok(parsed_response.result) + } + + async fn cancel_private_transaction(&self, tx_hash: H256) -> anyhow::Result { + let body = json!({ + "jsonrpc": "2.0", + "method": "eth_cancelPrivateTransaction", + "params": [ + FlashbotsCancelPrivateTransactionRequest { tx_hash } + ], + "id": 1 + }); + + let response = self.sign_send_request(body).await?; + + let parsed_response = response + .json::() + .await + .map_err(|e| anyhow!("failed to deserialize Flashbots response: {:?}", e))?; + + Ok(parsed_response.result) + } + + async fn sign_send_request(&self, body: Value) -> anyhow::Result { let signature = self .signer .sign_message(format!( @@ -302,29 +374,16 @@ where headers.insert("x-flashbots-signature", header_val); // Send the request - let response = self - .http_client + self.http_client .post(&self.relay_url) .headers(headers) .body(body.to_string()) .send() .await - .map_err(|e| anyhow!("failed to send transaction to Flashbots: {:?}", e))?; - - let parsed_response = response - .json::() - .await - .map_err(|e| anyhow!("failed to deserialize Flashbots response: {:?}", e))?; - - Ok(parsed_response.result) + .map_err(|e| anyhow!("failed to send request to Flashbots: {:?}", e)) } } -#[derive(Deserialize, Debug)] -struct FlashbotsResponse { - result: TxHash, -} - type PinBoxFut<'a, T> = Pin> + Send + 'a>>; enum PendingFlashbotsTxState<'a> { diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index a83d0a296..df4214a9a 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -26,7 +26,8 @@ use ethers::{ prelude::SignerMiddleware, providers::{JsonRpcClient, Middleware, Provider, ProviderError}, types::{ - transaction::eip2718::TypedTransaction, Address, Bytes, TransactionReceipt, H256, U256, + transaction::eip2718::TypedTransaction, Address, Bytes, Eip1559TransactionRequest, + TransactionReceipt, H256, U256, }, }; use ethers_signers::{LocalWallet, Signer}; @@ -35,12 +36,22 @@ pub(crate) use flashbots::FlashbotsTransactionSender; use mockall::automock; pub(crate) use raw::RawTransactionSender; use rundler_sim::ExpectedStorage; +use rundler_types::GasFees; + #[derive(Debug)] pub(crate) struct SentTxInfo { pub(crate) nonce: U256, pub(crate) tx_hash: H256, } +#[derive(Debug)] +pub(crate) struct CancelTxInfo { + pub(crate) tx_hash: H256, + // True if the transaction was soft-cancelled. Soft-cancellation is when the RPC endpoint + // accepts the cancel without an onchain transaction. + pub(crate) soft_cancelled: bool, +} + #[derive(Debug)] pub(crate) enum TxStatus { Pending, @@ -57,6 +68,9 @@ pub(crate) enum TxSenderError { /// Nonce too low #[error("nonce too low")] NonceTooLow, + /// Soft cancellation failed + #[error("soft cancel failed")] + SoftCancelFailed, /// All other errors #[error(transparent)] Other(#[from] anyhow::Error), @@ -74,6 +88,14 @@ pub(crate) trait TransactionSender: Send + Sync + 'static { expected_storage: &ExpectedStorage, ) -> Result; + async fn cancel_transaction( + &self, + tx_hash: H256, + nonce: U256, + to: Address, + gas_fees: GasFees, + ) -> Result; + async fn get_transaction_status(&self, tx_hash: H256) -> Result; async fn wait_until_mined(&self, tx_hash: H256) -> Result>; @@ -213,6 +235,22 @@ where Ok((tx.rlp_signed(&signature), nonce)) } +fn create_hard_cancel_tx( + from: Address, + to: Address, + nonce: U256, + gas_fees: GasFees, +) -> TypedTransaction { + Eip1559TransactionRequest::new() + .from(from) + .to(to) + .nonce(nonce) + .max_fee_per_gas(gas_fees.max_fee_per_gas) + .max_priority_fee_per_gas(gas_fees.max_priority_fee_per_gas) + .data(Bytes::new()) + .into() +} + impl From for TxSenderError { fn from(value: ProviderError) -> Self { match &value { diff --git a/crates/builder/src/sender/raw.rs b/crates/builder/src/sender/raw.rs index afaacab95..4495606e4 100644 --- a/crates/builder/src/sender/raw.rs +++ b/crates/builder/src/sender/raw.rs @@ -18,13 +18,16 @@ use async_trait::async_trait; use ethers::{ middleware::SignerMiddleware, providers::{JsonRpcClient, Middleware, PendingTransaction, Provider}, - types::{transaction::eip2718::TypedTransaction, Address, TransactionReceipt, H256}, + types::{transaction::eip2718::TypedTransaction, Address, TransactionReceipt, H256, U256}, }; use ethers_signers::Signer; use rundler_sim::ExpectedStorage; +use rundler_types::GasFees; -use super::Result; -use crate::sender::{fill_and_sign, SentTxInfo, TransactionSender, TxStatus}; +use super::{CancelTxInfo, Result}; +use crate::sender::{ + create_hard_cancel_tx, fill_and_sign, SentTxInfo, TransactionSender, TxStatus, +}; #[derive(Debug)] pub(crate) struct RawTransactionSender @@ -59,6 +62,29 @@ where Ok(SentTxInfo { nonce, tx_hash }) } + async fn cancel_transaction( + &self, + _tx_hash: H256, + nonce: U256, + to: Address, + gas_fees: GasFees, + ) -> Result { + let tx = create_hard_cancel_tx(self.provider.address(), to, nonce, gas_fees); + + let (raw_tx, _) = fill_and_sign(&self.provider, tx).await?; + + let tx_hash = self + .provider + .provider() + .request("eth_sendRawTransaction", (raw_tx,)) + .await?; + + Ok(CancelTxInfo { + tx_hash, + soft_cancelled: false, + }) + } + async fn get_transaction_status(&self, tx_hash: H256) -> Result { let tx = self .provider diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index b5a452615..775ec0905 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use anyhow::{bail, Context}; use async_trait::async_trait; -use ethers::types::{transaction::eip2718::TypedTransaction, H256, U256}; +use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256, U256}; use rundler_provider::Provider; use rundler_sim::ExpectedStorage; use rundler_types::GasFees; @@ -47,6 +47,16 @@ pub(crate) trait TransactionTracker: Send + Sync + 'static { expected_stroage: &ExpectedStorage, ) -> TransactionTrackerResult; + /// Cancel the latest transaction in the tracker. + /// + /// Returns: An option containing the hash of the transaction that was used to cancel. If the option + /// is empty, then either no transaction was cancelled or the cancellation was a "soft-cancel." + async fn cancel_transaction( + &mut self, + to: Address, + estimated_fees: GasFees, + ) -> TransactionTrackerResult>; + /// Checks: /// 1. One of our transactions mines (not necessarily the one just sent). /// 2. All our send transactions have dropped. @@ -260,6 +270,54 @@ where Ok(sent_tx.tx_hash) } + async fn cancel_transaction( + &mut self, + to: Address, + estimated_fees: GasFees, + ) -> TransactionTrackerResult> { + let (tx_hash, gas_fees) = match self.transactions.last() { + Some(tx) => { + let increased_fees = tx + .gas_fees + .increase_by_percent(self.settings.replacement_fee_percent_increase); + let gas_fees = GasFees { + max_fee_per_gas: increased_fees + .max_fee_per_gas + .max(estimated_fees.max_fee_per_gas), + max_priority_fee_per_gas: increased_fees + .max_priority_fee_per_gas + .max(estimated_fees.max_priority_fee_per_gas), + }; + (tx.tx_hash, gas_fees) + } + None => (H256::zero(), estimated_fees), + }; + + let cancel_info = self + .sender + .cancel_transaction(tx_hash, self.nonce, to, gas_fees) + .await?; + + if cancel_info.soft_cancelled { + // If the transaction was soft-cancelled. Reset internal state. + self.reset().await; + return Ok(None); + } + + info!("Sent cancellation tx {:?}", cancel_info.tx_hash); + + self.transactions.push(PendingTransaction { + tx_hash: cancel_info.tx_hash, + gas_fees, + attempt_number: self.attempt_count, + }); + + self.has_dropped = false; + self.attempt_count += 1; + self.update_metrics(); + Ok(Some(cancel_info.tx_hash)) + } + async fn check_for_update(&mut self) -> TransactionTrackerResult> { let external_nonce = self.get_external_nonce().await?; if self.nonce < external_nonce { @@ -345,6 +403,9 @@ impl From for TransactionTrackerError { TxSenderError::ReplacementUnderpriced => { TransactionTrackerError::ReplacementUnderpriced } + TxSenderError::SoftCancelFailed => { + TransactionTrackerError::Other(anyhow::anyhow!("soft cancel failed")) + } TxSenderError::Other(e) => TransactionTrackerError::Other(e), } } From 43fa7ad21db2f95ad2fd98db51a8451e249942c1 Mon Sep 17 00:00:00 2001 From: dancoombs Date: Tue, 11 Jun 2024 14:59:56 -0500 Subject: [PATCH 2/2] refactor(builder): large refactor of bundle sender state machine and metrics --- crates/builder/src/bundle_sender.rs | 832 +++++++++++++++------------- crates/builder/src/server/local.rs | 3 - 2 files changed, 462 insertions(+), 373 deletions(-) diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 67375f0c5..8a7a37af2 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -57,10 +57,11 @@ pub(crate) struct BundleSenderImpl { beneficiary: Address, proposer: P, entry_point: E, - transaction_tracker: T, + transaction_tracker: Option, pool: C, settings: Settings, event_sender: broadcast::Sender>, + metrics: BuilderMetrics, _uo_type: PhantomData, } @@ -91,9 +92,6 @@ pub enum SendBundleResult { tx_hash: H256, }, NoOperationsInitially, - NoOperationsAfterFeeIncreases { - attempt_number: u64, - }, StalledAtMaxFeeIncreases, Error(anyhow::Error), } @@ -124,314 +122,23 @@ where /// next one. #[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))] async fn send_bundles_in_loop(mut self) -> anyhow::Result<()> { - // State of the sender loop - enum State { - // Building a bundle, optionally waiting for a trigger to send it - // (wait_for_trigger, fee_increase_count) - Building(bool, u64), - // Waiting for a bundle to be mined - // (wait_until_block, fee_increase_count) - Pending(u64, u64), - // Cancelling the last transaction - // (fee_increase_count) - Cancelling(u64), - // Waiting for a cancellation transaction to be mined - // (wait_until_block, fee_increase_count) - CancelPending(u64, u64), - } - - // initial state - let mut state = State::Building(true, 0); - - // response to manual caller - let mut send_bundle_response = None; - let mut send_bundle_result = None; - // trigger for sending bundles - let mut sender_trigger = BundleSenderTrigger::new( + let sender_trigger = BundleSenderTrigger::new( &self.pool, self.bundle_action_receiver.take().unwrap(), Duration::from_millis(self.chain_spec.bundle_max_send_interval_millis), ) .await?; - loop { - match state { - State::Building(wait_for_trigger, fee_increase_count) => { - if wait_for_trigger { - send_bundle_response = sender_trigger.wait_for_trigger().await?; - - // process any nonce updates, ignore result - self.check_for_transaction_update().await; - } - - // send bundle - debug!( - "Building bundle on block {}", - sender_trigger.last_block.block_number - ); - let result = self.send_bundle(fee_increase_count).await; - - // handle result - match result { - Ok(SendBundleAttemptResult::Success) => { - // sent the bundle - info!("Bundle sent successfully"); - state = State::Pending( - sender_trigger.last_block.block_number - + self.settings.max_blocks_to_wait_for_mine, - 0, - ); - } - Ok(SendBundleAttemptResult::NoOperations) => { - debug!("No operations in bundle"); - - if fee_increase_count > 0 { - warn!("Abandoning bundle after fee increases {fee_increase_count}, no operations available"); - BuilderMetrics::increment_bundle_txns_abandoned( - self.builder_index, - self.entry_point.address(), - ); - send_bundle_result = - Some(SendBundleResult::NoOperationsAfterFeeIncreases { - attempt_number: fee_increase_count, - }); - - // abandon the bundle by resetting the tracker and starting a new bundle process - // If the node we are using still has the transaction in the mempool, its - // possible we will get a `ReplacementUnderpriced` on the next iteration - // and will start a cancellation. - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } else { - debug!("No operations available, waiting for next trigger"); - send_bundle_result = Some(SendBundleResult::NoOperationsInitially); - state = State::Building(true, 0); - } - } - Ok(SendBundleAttemptResult::NonceTooLow) => { - // reset the transaction tracker and try again - info!("Nonce too low, starting new bundle attempt"); - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } - Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { - info!( - "Replacement transaction underpriced, entering cancellation loop" - ); - self.transaction_tracker.reset().await; - state = State::Cancelling(0); - } - Err(error) => { - error!("Bundle send error {error:?}"); - BuilderMetrics::increment_bundle_txns_failed( - self.builder_index, - self.entry_point.address(), - ); - self.transaction_tracker.reset().await; - send_bundle_result = Some(SendBundleResult::Error(error)); - state = State::Building(true, 0); - } - } - } - State::Pending(until, fee_increase_count) => { - sender_trigger.wait_for_block().await?; - - // check for transaction update - if let Some(update) = self.check_for_transaction_update().await { - match update { - TrackerUpdate::Mined { - block_number, - attempt_number, - gas_limit, - gas_used, - tx_hash, - nonce, - .. - } => { - // mined! - info!("Bundle transaction mined"); - BuilderMetrics::process_bundle_txn_success( - self.builder_index, - self.entry_point.address(), - gas_limit, - gas_used, - ); - self.emit(BuilderEvent::transaction_mined( - self.builder_index, - tx_hash, - nonce.low_u64(), - block_number, - )); - send_bundle_result = Some(SendBundleResult::Success { - block_number, - attempt_number, - tx_hash, - }); - state = State::Building(true, 0); - } - TrackerUpdate::LatestTxDropped { nonce } => { - // try again, don't wait for trigger, re-estimate fees - info!("Latest transaction dropped, starting new bundle attempt"); - self.emit(BuilderEvent::latest_transaction_dropped( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_dropped( - self.builder_index, - self.entry_point.address(), - ); - - // force reset the transaction tracker - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } - TrackerUpdate::NonceUsedForOtherTx { nonce } => { - // try again, don't wait for trigger, re-estimate fees - info!("Nonce used externally, starting new bundle attempt"); - self.emit(BuilderEvent::nonce_used_for_other_transaction( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_nonce_used( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(true, 0); - } - } - } else if sender_trigger.last_block().block_number >= until { - // start replacement, don't wait for trigger. Continue - // to attempt until there are no longer any UOs priced high enough - // to bundle. - info!( - "Not mined after {} blocks, increasing fees, attempt: {}", - self.settings.max_blocks_to_wait_for_mine, - fee_increase_count + 1 - ); - BuilderMetrics::increment_bundle_txn_fee_increases( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(false, fee_increase_count + 1); - } - } - State::Cancelling(fee_increase_count) => { - // cancel the transaction - info!("Cancelling last transaction"); - - let (estimated_fees, _) = self - .proposer - .estimate_gas_fees(None) - .await - .unwrap_or_default(); - - let cancel_res = self - .transaction_tracker - .cancel_transaction(self.entry_point.address(), estimated_fees) - .await; - - match cancel_res { - Ok(Some(_)) => { - info!("Cancellation transaction sent, waiting for confirmation"); - BuilderMetrics::increment_cancellation_txns_sent( - self.builder_index, - self.entry_point.address(), - ); - - state = State::CancelPending( - sender_trigger.last_block.block_number - + self.settings.max_blocks_to_wait_for_mine, - fee_increase_count, - ); - } - Ok(None) => { - info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); - BuilderMetrics::increment_soft_cancellations( - self.builder_index, - self.entry_point.address(), - ); - - state = State::Building(true, 0); - } - Err(TransactionTrackerError::ReplacementUnderpriced) => { - info!("Replacement transaction underpriced during cancellation, trying again"); - state = State::Cancelling(fee_increase_count + 1); - } - Err(TransactionTrackerError::NonceTooLow) => { - // reset the transaction tracker and try again - info!("Nonce too low during cancellation, starting new bundle attempt"); - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } - Err(e) => { - error!("Failed to cancel transaction, moving back to building state: {e:#?}"); - BuilderMetrics::increment_cancellation_txns_failed( - self.builder_index, - self.entry_point.address(), - ); - state = State::Building(true, 0); - } - } - } - State::CancelPending(until, fee_increase_count) => { - sender_trigger.wait_for_block().await?; - - // check for transaction update - if let Some(update) = self.check_for_transaction_update().await { - match update { - TrackerUpdate::Mined { .. } => { - // mined - info!("Cancellation transaction mined"); - BuilderMetrics::increment_cancellation_txns_mined( - self.builder_index, - self.entry_point.address(), - ); - } - TrackerUpdate::LatestTxDropped { .. } => { - // If a cancellation gets dropped, move to bundling state as there is no - // longer a pending transaction - info!( - "Cancellation transaction dropped, starting new bundle attempt" - ); - // force reset the transaction tracker - self.transaction_tracker.reset().await; - } - TrackerUpdate::NonceUsedForOtherTx { .. } => { - // If a nonce is used externally, move to bundling state as there is no longer - // a pending transaction - info!("Nonce used externally while cancelling, starting new bundle attempt"); - } - } - - state = State::Building(true, 0); - } else if sender_trigger.last_block().block_number >= until { - if fee_increase_count >= self.settings.max_fee_increases { - // abandon the cancellation - warn!("Abandoning cancellation after max fee increases {fee_increase_count}, starting new bundle attempt"); - // force reset the transaction tracker - self.transaction_tracker.reset().await; - state = State::Building(true, 0); - } else { - // start replacement, don't wait for trigger - info!( - "Cancellation not mined after {} blocks, increasing fees, attempt: {}", - self.settings.max_blocks_to_wait_for_mine, - fee_increase_count + 1 - ); - state = State::Cancelling(fee_increase_count + 1); - } - } - } - } + // initial state + let mut state = + SenderMachineState::new(sender_trigger, self.transaction_tracker.take().unwrap()); - // send result to manual caller - if let Some(res) = send_bundle_result.take() { - if let Some(r) = send_bundle_response.take() { - if r.send(res).is_err() { - error!("Failed to send bundle result to manual caller"); - } - } + loop { + if let Err(e) = self.step_state(&mut state).await { + error!("Error in bundle sender loop: {e:#?}"); + self.metrics.increment_state_machine_errors(); + state.reset(); } } } @@ -464,48 +171,264 @@ where chain_spec, beneficiary, proposer, - entry_point, - transaction_tracker, + transaction_tracker: Some(transaction_tracker), pool, settings, event_sender, + metrics: BuilderMetrics { + builder_index, + entry_point: entry_point.address(), + }, + entry_point, _uo_type: PhantomData, } } - async fn check_for_transaction_update(&mut self) -> Option { - let update = self.transaction_tracker.check_for_update().await; - let update = match update { - Ok(update) => update?, - Err(error) => { - error!("Failed to check for transaction updates: {error:#?}"); - return None; + async fn step_state(&mut self, state: &mut SenderMachineState) -> anyhow::Result<()> { + let tracker_update = state.wait_for_trigger().await?; + + match state.inner { + InnerState::Building(building_state) => { + self.handle_building_state(state, building_state).await?; } - }; + InnerState::Pending(pending_state) => { + self.handle_pending_state(state, pending_state, tracker_update) + .await?; + } + InnerState::Cancelling(cancelling_state) => { + self.handle_cancelling_state(state, cancelling_state) + .await?; + } + InnerState::CancelPending(cancel_pending_state) => { + self.handle_cancel_pending_state(state, cancel_pending_state, tracker_update) + .await?; + } + } - match update { - TrackerUpdate::Mined { - tx_hash, - block_number, - attempt_number, - nonce, - .. - } => { - if attempt_number == 0 { - info!("Transaction with hash {tx_hash:?}, nonce {nonce:?}, landed in block {block_number}"); + Ok(()) + } + + async fn handle_building_state( + &mut self, + state: &mut SenderMachineState, + inner: BuildingState, + ) -> anyhow::Result<()> { + // send bundle + let block_number = state.block_number(); + debug!("Building bundle on block {}", block_number); + let result = self.send_bundle(state, inner.fee_increase_count).await; + + // handle result + match result { + Ok(SendBundleAttemptResult::Success) => { + // sent the bundle + info!("Bundle sent successfully"); + state.update(InnerState::Pending(inner.to_pending( + block_number + self.settings.max_blocks_to_wait_for_mine, + ))); + } + Ok(SendBundleAttemptResult::NoOperations) => { + debug!("No operations to bundle"); + if inner.fee_increase_count > 0 { + warn!( + "Abandoning bundle after fee increases {}, no operations available", + inner.fee_increase_count + ); + self.metrics.increment_bundle_txns_abandoned(); + + // abandon the bundle by starting a new bundle process + // If the node we are using still has the transaction in the mempool, its + // possible we will get a `ReplacementUnderpriced` on the next iteration + // and will start a cancellation. + state.reset(); } else { - info!("Transaction with hash {tx_hash:?}, nonce {nonce:?}, landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); + debug!("No operations available, waiting for next trigger"); + state.complete(Some(SendBundleResult::NoOperationsInitially)); } } - TrackerUpdate::LatestTxDropped { nonce } => { - info!("Previous transaction dropped by sender. Nonce: {nonce:?}"); + Ok(SendBundleAttemptResult::NonceTooLow) => { + // reset the transaction tracker and try again + info!("Nonce too low, starting new bundle attempt"); + state.reset(); } - TrackerUpdate::NonceUsedForOtherTx { nonce } => { - info!("Nonce used by external transaction. Nonce: {nonce:?}"); + Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { + info!("Replacement transaction underpriced, entering cancellation loop"); + state.update(InnerState::Cancelling(inner.to_cancelling())); } - }; + Err(error) => { + error!("Bundle send error {error:?}"); + self.metrics.increment_bundle_txns_failed(); + let send_bundle_result = Some(SendBundleResult::Error(error)); + state.complete(send_bundle_result); + } + } + + Ok(()) + } - Some(update) + async fn handle_pending_state( + &mut self, + state: &mut SenderMachineState, + inner: PendingState, + tracker_update: Option, + ) -> anyhow::Result<()> { + if let Some(update) = tracker_update { + match update { + TrackerUpdate::Mined { + block_number, + attempt_number, + gas_limit, + gas_used, + tx_hash, + nonce, + .. + } => { + info!("Bundle transaction mined"); + self.metrics.process_bundle_txn_success(gas_limit, gas_used); + self.emit(BuilderEvent::transaction_mined( + self.builder_index, + tx_hash, + nonce.low_u64(), + block_number, + )); + let send_bundle_result = Some(SendBundleResult::Success { + block_number, + attempt_number, + tx_hash, + }); + state.complete(send_bundle_result); + } + TrackerUpdate::LatestTxDropped { nonce } => { + // try again, don't wait for trigger, re-estimate fees + info!("Latest transaction dropped, starting new bundle attempt"); + self.emit(BuilderEvent::latest_transaction_dropped( + self.builder_index, + nonce.low_u64(), + )); + self.metrics.increment_bundle_txns_dropped(); + state.reset(); + } + TrackerUpdate::NonceUsedForOtherTx { nonce } => { + // try again, don't wait for trigger, re-estimate fees + info!("Nonce used externally, starting new bundle attempt"); + self.emit(BuilderEvent::nonce_used_for_other_transaction( + self.builder_index, + nonce.low_u64(), + )); + self.metrics.increment_bundle_txns_nonce_used(); + state.reset(); + } + } + } else if state.block_number() >= inner.until { + // start replacement, don't wait for trigger. Continue + // to attempt until there are no longer any UOs priced high enough + // to bundle. + info!( + "Not mined after {} blocks, increasing fees, attempt: {}", + self.settings.max_blocks_to_wait_for_mine, + inner.fee_increase_count + 1 + ); + self.metrics.increment_bundle_txn_fee_increases(); + state.update(InnerState::Building(inner.to_building())) + } + + Ok(()) + } + + async fn handle_cancelling_state( + &mut self, + state: &mut SenderMachineState, + inner: CancellingState, + ) -> anyhow::Result<()> { + info!("Cancelling last transaction"); + + let (estimated_fees, _) = self + .proposer + .estimate_gas_fees(None) + .await + .unwrap_or_default(); + + let cancel_res = state + .transaction_tracker + .cancel_transaction(self.entry_point.address(), estimated_fees) + .await; + + match cancel_res { + Ok(Some(_)) => { + info!("Cancellation transaction sent, waiting for confirmation"); + self.metrics.increment_cancellation_txns_sent(); + + state.update(InnerState::CancelPending(inner.to_cancel_pending( + state.block_number() + self.settings.max_blocks_to_wait_for_mine, + ))); + } + Ok(None) => { + info!("Soft cancellation or no transaction to cancel, starting new bundle attempt"); + self.metrics.increment_soft_cancellations(); + state.reset(); + } + Err(TransactionTrackerError::ReplacementUnderpriced) => { + info!("Replacement transaction underpriced during cancellation, trying again"); + state.update(InnerState::Cancelling(inner.to_self())); + } + Err(TransactionTrackerError::NonceTooLow) => { + // reset the transaction tracker and try again + info!("Nonce too low during cancellation, starting new bundle attempt"); + state.reset(); + } + Err(e) => { + error!("Failed to cancel transaction, moving back to building state: {e:#?}"); + self.metrics.increment_cancellation_txns_failed(); + state.reset(); + } + } + + Ok(()) + } + + async fn handle_cancel_pending_state( + &mut self, + state: &mut SenderMachineState, + inner: CancelPendingState, + tracker_update: Option, + ) -> anyhow::Result<()> { + // check for transaction update + if let Some(update) = tracker_update { + match update { + TrackerUpdate::Mined { .. } => { + // mined + info!("Cancellation transaction mined"); + self.metrics.increment_cancellation_txns_mined(); + } + TrackerUpdate::LatestTxDropped { .. } => { + // If a cancellation gets dropped, move to bundling state as there is no + // longer a pending transaction + info!("Cancellation transaction dropped, starting new bundle attempt"); + } + TrackerUpdate::NonceUsedForOtherTx { .. } => { + // If a nonce is used externally, move to bundling state as there is no longer + // a pending transaction + info!("Nonce used externally while cancelling, starting new bundle attempt"); + } + } + state.reset(); + } else if state.block_number() >= inner.until { + if inner.fee_increase_count >= self.settings.max_fee_increases { + // abandon the cancellation + warn!("Abandoning cancellation after max fee increases {}, starting new bundle attempt", inner.fee_increase_count); + state.reset(); + } else { + // start replacement, don't wait for trigger + info!( + "Cancellation not mined after {} blocks, increasing fees, attempt: {}", + self.settings.max_blocks_to_wait_for_mine, + inner.fee_increase_count + 1 + ); + state.update(InnerState::Cancelling(inner.to_cancelling())); + } + } + + Ok(()) } /// Constructs a bundle and sends it to the entry point as a transaction. @@ -516,9 +439,10 @@ where /// are no ops that meet the fee requirements. async fn send_bundle( &mut self, + state: &mut SenderMachineState, fee_increase_count: u64, ) -> anyhow::Result { - let (nonce, required_fees) = self.transaction_tracker.get_nonce_and_required_fees()?; + let (nonce, required_fees) = state.transaction_tracker.get_nonce_and_required_fees()?; let Some(bundle_tx) = self .get_bundle_tx(nonce, required_fees, fee_increase_count > 0) @@ -539,9 +463,9 @@ where op_hashes, } = bundle_tx; - BuilderMetrics::increment_bundle_txns_sent(self.builder_index, self.entry_point.address()); + self.metrics.increment_bundle_txns_sent(); - let send_result = self + let send_result = state .transaction_tracker .send_transaction(tx.clone(), &expected_storage) .await; @@ -567,10 +491,7 @@ where Ok(SendBundleAttemptResult::NonceTooLow) } Err(TransactionTrackerError::ReplacementUnderpriced) => { - BuilderMetrics::increment_bundle_txn_replacement_underpriced( - self.builder_index, - self.entry_point.address(), - ); + self.metrics.increment_bundle_txn_replacement_underpriced(); warn!("Replacement transaction underpriced"); Ok(SendBundleAttemptResult::ReplacementUnderpriced) } @@ -669,6 +590,174 @@ where } } +struct SenderMachineState { + trigger: BundleSenderTrigger, + transaction_tracker: T, + send_bundle_response: Option>, + inner: InnerState, + requires_reset: bool, +} + +impl SenderMachineState { + fn new(trigger: BundleSenderTrigger, transaction_tracker: T) -> Self { + Self { + trigger, + transaction_tracker, + send_bundle_response: None, + inner: InnerState::new(), + requires_reset: false, + } + } + + fn update(&mut self, inner: InnerState) { + self.inner = inner; + } + + // resets the state machine to the initial state, doesn't wait for next trigger + fn reset(&mut self) { + self.requires_reset = true; + let building_state = BuildingState { + wait_for_trigger: false, + fee_increase_count: 0, + }; + self.inner = InnerState::Building(building_state); + } + + fn complete(&mut self, result: Option) { + if let Some(result) = result { + if let Some(r) = self.send_bundle_response.take() { + if r.send(result).is_err() { + error!("Failed to send bundle result to manual caller"); + } + } + } + self.inner = InnerState::new(); + } + + async fn wait_for_trigger(&mut self) -> anyhow::Result> { + if self.requires_reset { + self.transaction_tracker.reset().await; + self.requires_reset = false; + } + + match &self.inner { + InnerState::Building(s) => { + if !s.wait_for_trigger { + return Ok(None); + } + + self.send_bundle_response = self.trigger.wait_for_trigger().await?; + self.transaction_tracker + .check_for_update() + .await + .map_err(|e| anyhow::anyhow!("transaction tracker update error {e:?}")) + } + InnerState::Pending(..) | InnerState::CancelPending(..) => { + self.trigger.wait_for_block().await?; + self.transaction_tracker + .check_for_update() + .await + .map_err(|e| anyhow::anyhow!("transaction tracker update error {e:?}")) + } + InnerState::Cancelling(..) => Ok(None), + } + } + + fn block_number(&self) -> u64 { + self.trigger.last_block().block_number + } +} + +// State of the sender loop +enum InnerState { + // Building a bundle, optionally waiting for a trigger to send it + Building(BuildingState), + // Waiting for a bundle to be mined + Pending(PendingState), + // Cancelling the last transaction + Cancelling(CancellingState), + // Waiting for a cancellation transaction to be mined + CancelPending(CancelPendingState), +} + +impl InnerState { + fn new() -> Self { + InnerState::Building(BuildingState { + wait_for_trigger: true, + fee_increase_count: 0, + }) + } +} + +#[derive(Debug, Clone, Copy)] +struct BuildingState { + wait_for_trigger: bool, + fee_increase_count: u64, +} + +impl BuildingState { + fn to_pending(self, until: u64) -> PendingState { + PendingState { + until, + fee_increase_count: self.fee_increase_count, + } + } + + fn to_cancelling(self) -> CancellingState { + CancellingState { + fee_increase_count: 0, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct PendingState { + until: u64, + fee_increase_count: u64, +} + +impl PendingState { + fn to_building(self) -> BuildingState { + BuildingState { + wait_for_trigger: true, + fee_increase_count: self.fee_increase_count + 1, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct CancellingState { + fee_increase_count: u64, +} + +impl CancellingState { + fn to_self(mut self) -> Self { + self.fee_increase_count += 1; + self + } + + fn to_cancel_pending(self, until: u64) -> CancelPendingState { + CancelPendingState { + until, + fee_increase_count: self.fee_increase_count, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct CancelPendingState { + until: u64, + fee_increase_count: u64, +} + +impl CancelPendingState { + fn to_cancelling(self) -> CancellingState { + CancellingState { + fee_increase_count: self.fee_increase_count + 1, + } + } +} + struct BundleSenderTrigger { bundling_mode: BundlingMode, block_rx: UnboundedReceiver, @@ -826,69 +915,72 @@ impl BundleSenderTrigger { } } -struct BuilderMetrics {} +#[derive(Debug, Clone)] +struct BuilderMetrics { + builder_index: u64, + entry_point: Address, +} impl BuilderMetrics { - fn increment_bundle_txns_sent(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_sent", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()) + fn increment_bundle_txns_sent(&self) { + metrics::counter!("builder_bundle_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()) .increment(1); } - fn process_bundle_txn_success( - builder_index: u64, - entry_point: Address, - gas_limit: Option, - gas_used: Option, - ) { - metrics::counter!("builder_bundle_txns_success", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn process_bundle_txn_success(&self, gas_limit: Option, gas_used: Option) { + metrics::counter!("builder_bundle_txns_success", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); if let Some(limit) = gas_limit { - metrics::counter!("builder_bundle_gas_limit", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(limit.as_u64()); + metrics::counter!("builder_bundle_gas_limit", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(limit.as_u64()); } if let Some(used) = gas_used { - metrics::counter!("builder_bundle_gas_used", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(used.as_u64()); + metrics::counter!("builder_bundle_gas_used", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(used.as_u64()); } } - fn increment_bundle_txns_dropped(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_dropped", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txns_dropped(&self) { + metrics::counter!("builder_bundle_txns_dropped", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } // used when we decide to stop trying a transaction - fn increment_bundle_txns_abandoned(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_abandoned", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txns_abandoned(&self) { + metrics::counter!("builder_bundle_txns_abandoned", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } // used when sending a transaction fails - fn increment_bundle_txns_failed(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_failed", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txns_failed(&self) { + metrics::counter!("builder_bundle_txns_failed", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); + } + + fn increment_bundle_txns_nonce_used(&self) { + metrics::counter!("builder_bundle_txns_nonce_used", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_bundle_txns_nonce_used(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_txns_nonce_used", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txn_fee_increases(&self) { + metrics::counter!("builder_bundle_fee_increases", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_bundle_txn_fee_increases(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_fee_increases", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_bundle_txn_replacement_underpriced(&self) { + metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_bundle_txn_replacement_underpriced(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_cancellation_txns_sent(&self) { + metrics::counter!("builder_cancellation_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_cancellation_txns_sent(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_cancellation_txns_sent", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_cancellation_txns_mined(&self) { + metrics::counter!("builder_cancellation_txns_mined", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_cancellation_txns_mined(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_cancellation_txns_mined", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_soft_cancellations(&self) { + metrics::counter!("builder_soft_cancellations", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_soft_cancellations(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_soft_cancellations", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_cancellation_txns_failed(&self) { + metrics::counter!("builder_cancellation_txns_failed", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } - fn increment_cancellation_txns_failed(builder_index: u64, entry_point: Address) { - metrics::counter!("builder_cancellation_txns_failed", "entry_point" => entry_point.to_string(), "builder_index" => builder_index.to_string()).increment(1); + fn increment_state_machine_errors(&self) { + metrics::counter!("builder_state_machine_errors", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1); } } diff --git a/crates/builder/src/server/local.rs b/crates/builder/src/server/local.rs index 76bc0cf87..fa6758ab3 100644 --- a/crates/builder/src/server/local.rs +++ b/crates/builder/src/server/local.rs @@ -186,9 +186,6 @@ impl LocalBuilderServerRunner { SendBundleResult::NoOperationsInitially => { Err(anyhow::anyhow!("no ops to send").into()) }, - SendBundleResult::NoOperationsAfterFeeIncreases { .. } => { - Err(anyhow::anyhow!("bundle initially had operations, but after increasing gas fees it was empty").into()) - }, SendBundleResult::StalledAtMaxFeeIncreases => Err(anyhow::anyhow!("stalled at max fee increases").into()), SendBundleResult::Error(e) => Err(anyhow::anyhow!("send bundle error: {e:?}").into()), }