diff --git a/crates/builder/src/bundle_proposer.rs b/crates/builder/src/bundle_proposer.rs index 01b554fde..49375116b 100644 --- a/crates/builder/src/bundle_proposer.rs +++ b/crates/builder/src/bundle_proposer.rs @@ -150,6 +150,9 @@ where .map_err(anyhow::Error::from), self.fee_estimator.required_bundle_fees(required_fees) )?; + if ops.is_empty() { + return Ok(Bundle::default()); + } tracing::debug!("Starting bundle proposal with {} ops", ops.len()); @@ -177,6 +180,9 @@ where .collect::>(); tracing::debug!("Bundle proposal after fee limit had {} ops", ops.len()); + if ops.is_empty() { + return Ok(Bundle::default()); + } // (2) Limit the amount of operations for simulation let (ops, gas_limit) = self.limit_user_operations_for_simulation(ops); diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 892c17419..cae080929 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -20,19 +20,22 @@ use futures_util::StreamExt; use rundler_provider::{BundleHandler, EntryPoint}; use rundler_sim::ExpectedStorage; use rundler_types::{ - builder::BundlingMode, chain::ChainSpec, pool::Pool, EntityUpdate, GasFees, UserOperation, + builder::BundlingMode, + chain::ChainSpec, + pool::{NewHead, Pool}, + EntityUpdate, GasFees, UserOperation, }; use rundler_utils::emit::WithEntryPoint; use tokio::{ join, - sync::{broadcast, mpsc, oneshot}, + sync::{broadcast, mpsc, mpsc::UnboundedReceiver, oneshot}, }; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, warn}; use crate::{ bundle_proposer::BundleProposer, emit::{BuilderEvent, BundleTxDetails}, - transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker}, + transaction_tracker::{TrackerUpdate, TransactionTracker, TransactionTrackerError}, }; #[async_trait] @@ -42,14 +45,14 @@ pub(crate) trait BundleSender: Send + Sync + 'static { #[derive(Debug)] pub(crate) struct Settings { - pub(crate) replacement_fee_percent_increase: u64, pub(crate) max_fee_increases: u64, + pub(crate) max_blocks_to_wait_for_mine: u64, } #[derive(Debug)] pub(crate) struct BundleSenderImpl { builder_index: u64, - bundle_action_receiver: mpsc::Receiver, + bundle_action_receiver: Option>, chain_spec: ChainSpec, beneficiary: Address, proposer: P, @@ -77,6 +80,9 @@ pub struct SendBundleRequest { pub responder: oneshot::Sender, } +/// Response to a `SendBundleRequest` after +/// going through a full cycle of bundling, sending, +/// and waiting for the transaction to be mined. #[derive(Debug)] pub enum SendBundleResult { Success { @@ -86,13 +92,24 @@ pub enum SendBundleResult { }, NoOperationsInitially, NoOperationsAfterFeeIncreases { - initial_op_count: usize, attempt_number: u64, }, StalledAtMaxFeeIncreases, Error(anyhow::Error), } +// Internal result of attempting to send a bundle. +enum SendBundleAttemptResult { + // The bundle was successfully sent + Success, + // The bundle was empty + NoOperations, + // Replacement Underpriced + ReplacementUnderpriced, + // Nonce too low + NonceTooLow, +} + #[async_trait] impl BundleSender for BundleSenderImpl where @@ -107,139 +124,183 @@ where /// next one. #[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))] async fn send_bundles_in_loop(mut self) -> anyhow::Result<()> { - let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else { - error!("Failed to subscribe to new blocks"); - bail!("failed to subscribe to new blocks"); - }; + // State of the sender loop + enum State { + // Building a bundle, optionally waiting for a trigger to send it + // (wait_for_trigger, fee_increase_count) + Building(bool, u64), + // Waiting for a bundle to be mined + // (wait_until_block, fee_increase_count) + Pending(u64, u64), + } - // The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one. - // This task is used to consume the new heads and place them onto a channel that can be synchronously - // consumed until the latest block is reached. - let (tx, mut rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { - loop { - match new_heads.next().await { - Some(b) => { - if tx.send(b).is_err() { - error!("Failed to buffer new block for bundle sender"); - return; - } - } - None => { - error!("Block stream ended"); - return; - } - } - } - }); + // initial state + let mut state = State::Building(true, 0); + + // response to manual caller + let mut send_bundle_response = None; + let mut send_bundle_result = None; + + // trigger for sending bundles + let mut sender_trigger = BundleSenderTrigger::new( + &self.pool, + self.bundle_action_receiver.take().unwrap(), + Duration::from_millis(self.chain_spec.bundle_max_send_interval_millis), + ) + .await?; - let mut bundling_mode = BundlingMode::Auto; - let mut timer = tokio::time::interval(Duration::from_millis( - self.chain_spec.bundle_max_send_interval_millis, - )); loop { - let mut send_bundle_response: Option> = None; - let mut last_block = None; + match state { + State::Building(wait_for_trigger, fee_increase_count) => { + if wait_for_trigger { + send_bundle_response = sender_trigger.wait_for_trigger().await?; - // 3 triggers for loop logic: - // 1 - new block - // - If auto mode, send next bundle - // 2 - timer tick - // - If auto mode, send next bundle - // 3 - action recv - // - If change mode, change and restart loop - // - If send bundle and manual mode, send next bundle - last_block = tokio::select! { - b = rx.recv() => { - match bundling_mode { - BundlingMode::Manual => continue, - BundlingMode::Auto => b + // process any nonce updates, ignore result + self.check_for_transaction_update().await; } - }, - _ = timer.tick() => { - match bundling_mode { - BundlingMode::Manual => continue, - BundlingMode::Auto => Some(last_block.unwrap_or_default()) - } - }, - a = self.bundle_action_receiver.recv() => { - match a { - Some(BundleSenderAction::ChangeMode(mode)) => { - debug!("chainging bundling mode to {mode:?}"); - bundling_mode = mode; - continue; - }, - Some(BundleSenderAction::SendBundle(r)) => { - match bundling_mode { - BundlingMode::Manual => { - send_bundle_response = Some(r.responder); - Some(last_block.unwrap_or_default()) - }, - BundlingMode::Auto => { - error!("Received bundle send action while in auto mode, ignoring"); - continue; - } + + // send bundle + debug!( + "Building bundle on block {}", + sender_trigger.last_block.block_number + ); + let result = self.send_bundle(fee_increase_count).await; + + // handle result + match result { + Ok(SendBundleAttemptResult::Success) => { + // sent the bundle + info!("Bundle sent successfully"); + state = State::Pending( + sender_trigger.last_block.block_number + + self.settings.max_blocks_to_wait_for_mine, + 0, + ); + } + Ok(SendBundleAttemptResult::NoOperations) => { + debug!("No operations in bundle"); + + if fee_increase_count > 0 { + // TODO(danc): when abandoning we tend to get "stuck" where we have a pending bundle + // transaction that isn't landing. We should move to a new state where we track the + // pending transaction and "cancel" it if it doesn't land after a certain number of blocks. + + warn!("Abandoning bundle after fee increases {fee_increase_count}, no operations available, waiting for next trigger"); + BuilderMetrics::increment_bundle_txns_abandoned( + self.builder_index, + self.entry_point.address(), + ); + self.transaction_tracker.reset().await; + send_bundle_result = + Some(SendBundleResult::NoOperationsAfterFeeIncreases { + attempt_number: fee_increase_count, + }) + } else { + debug!("No operations available, waiting for next trigger"); + send_bundle_result = Some(SendBundleResult::NoOperationsInitially); } - }, - None => { - error!("Bundle action recv closed"); - bail!("Bundle action recv closed"); + + state = State::Building(true, 0); } - } - } - }; + Ok(SendBundleAttemptResult::NonceTooLow) => { + // reset the transaction tracker and try again + self.transaction_tracker.reset().await; + state = State::Building(true, 0); + } + Ok(SendBundleAttemptResult::ReplacementUnderpriced) => { + // TODO(danc): handle replacement underpriced + // move to the cancellation state - // Consume any other blocks that may have been buffered up - loop { - match rx.try_recv() { - Ok(b) => { - last_block = Some(b); - } - Err(mpsc::error::TryRecvError::Empty) => { - break; - } - Err(mpsc::error::TryRecvError::Disconnected) => { - error!("Block stream closed"); - bail!("Block stream closed"); + // for now: reset the transaction tracker and try again + self.transaction_tracker.reset().await; + state = State::Building(true, 0); + } + Err(error) => { + error!("Bundle send error {error:?}"); + BuilderMetrics::increment_bundle_txns_failed( + self.builder_index, + self.entry_point.address(), + ); + self.transaction_tracker.reset().await; + send_bundle_result = Some(SendBundleResult::Error(error)); + state = State::Building(true, 0); + } } } - } + State::Pending(until, fee_increase_count) => { + sender_trigger.wait_for_block().await?; + + // check for transaction update + if let Some(update) = self.check_for_transaction_update().await { + match update { + TrackerUpdate::Mined { + block_number, + attempt_number, + tx_hash, + .. + } => { + // mined! + info!("Bundle transaction mined"); + send_bundle_result = Some(SendBundleResult::Success { + block_number, + attempt_number, + tx_hash, + }); + state = State::Building(true, 0); + } + TrackerUpdate::LatestTxDropped { .. } => { + // try again, don't wait for trigger, re-estimate fees + info!("Latest transaction dropped, starting new bundle attempt"); - // Wait for new block. Block number doesn't matter as the pool will only notify of new blocks - // after the pool has updated its state. The bundle will be formed using the latest pool state - // and can land in the next block - self.check_for_and_log_transaction_update().await; - let result = self.send_bundle_with_increasing_gas_fees().await; - match &result { - SendBundleResult::Success { - block_number, - attempt_number, - tx_hash, - } => - if *attempt_number == 0 { - info!("Bundle with hash {tx_hash:?} landed in block {block_number}"); - } else { - info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); + // force reset the transaction tracker + self.transaction_tracker.reset().await; + + state = State::Building(true, 0); + } + TrackerUpdate::NonceUsedForOtherTx { .. } => { + // try again, don't wait for trigger, re-estimate fees + info!("Nonce used externally, starting new bundle attempt"); + state = State::Building(true, 0); + } + } + } else if sender_trigger.last_block().block_number >= until { + if fee_increase_count >= self.settings.max_fee_increases { + // TODO(danc): same "stuck" issue here on abandonment + // this abandon is likely to lead to "transaction underpriced" errors + // Instead, move to cancellation state. + + warn!("Abandoning bundle after max fee increases {fee_increase_count}"); + BuilderMetrics::increment_bundle_txns_abandoned( + self.builder_index, + self.entry_point.address(), + ); + self.transaction_tracker.reset().await; + state = State::Building(true, 0); + } else { + // start replacement, don't wait for trigger + info!( + "Not mined after {} blocks, increasing fees, attempt: {}", + self.settings.max_blocks_to_wait_for_mine, + fee_increase_count + 1 + ); + BuilderMetrics::increment_bundle_txn_fee_increases( + self.builder_index, + self.entry_point.address(), + ); + state = State::Building(false, fee_increase_count + 1); + } } - SendBundleResult::NoOperationsInitially => trace!("No ops to send at block {}", last_block.unwrap_or_default().block_number), - SendBundleResult::NoOperationsAfterFeeIncreases { - initial_op_count, - attempt_number, - } => info!("Bundle initially had {initial_op_count} operations, but after increasing gas fees {attempt_number} time(s) it was empty"), - SendBundleResult::StalledAtMaxFeeIncreases => warn!("Bundle failed to mine after {} fee increases", self.settings.max_fee_increases), - SendBundleResult::Error(error) => { - BuilderMetrics::increment_bundle_txns_failed(self.builder_index, self.entry_point.address()); - error!("Failed to send bundle. Will retry next block: {error:#?}"); } } - if let Some(t) = send_bundle_response.take() { - if t.send(result).is_err() { - error!("Failed to send bundle result to manual caller"); + // send result to manual caller + if let Some(res) = send_bundle_result.take() { + if let Some(r) = send_bundle_response.take() { + if r.send(res).is_err() { + error!("Failed to send bundle result to manual caller"); + } } } - - timer.reset(); } } } @@ -267,7 +328,7 @@ where ) -> Self { Self { builder_index, - bundle_action_receiver, + bundle_action_receiver: Some(bundle_action_receiver), chain_spec, beneficiary, proposer, @@ -280,18 +341,17 @@ where } } - async fn check_for_and_log_transaction_update(&self) { - let update = self.transaction_tracker.check_for_update_now().await; + async fn check_for_transaction_update(&mut self) -> Option { + let update = self.transaction_tracker.check_for_update().await; let update = match update { - Ok(update) => update, + Ok(update) => update?, Err(error) => { error!("Failed to check for transaction updates: {error:#?}"); - return; + return None; } }; - let Some(update) = update else { - return; - }; + + // process update before returning match update { TrackerUpdate::Mined { tx_hash, @@ -299,6 +359,7 @@ where attempt_number, gas_limit, gas_used, + nonce, .. } => { BuilderMetrics::increment_bundle_txns_success( @@ -316,8 +377,13 @@ where } else { info!("Bundle with hash {tx_hash:?} landed in block {block_number} after increasing gas fees {attempt_number} time(s)"); } + self.emit(BuilderEvent::transaction_mined( + self.builder_index, + tx_hash, + nonce.low_u64(), + block_number, + )); } - TrackerUpdate::StillPendingAfterWait => (), TrackerUpdate::LatestTxDropped { nonce } => { self.emit(BuilderEvent::latest_transaction_dropped( self.builder_index, @@ -338,189 +404,84 @@ where self.builder_index, self.entry_point.address(), ); - info!("Nonce used by external transaction") - } - TrackerUpdate::ReplacementUnderpriced => { - BuilderMetrics::increment_bundle_txn_replacement_underpriced( - self.builder_index, - self.entry_point.address(), - ); - info!("Replacement transaction underpriced") + info!("Nonce used by external transaction"); } }; + + Some(update) } - /// Constructs a bundle and sends it to the entry point as a transaction. If - /// the bundle fails to be mined after - /// `settings.max_blocks_to_wait_for_mine` blocks, increases the gas fees by - /// enough to send a replacement transaction, then constructs a new bundle - /// using the new, higher gas requirements. Continues to retry with higher - /// gas costs until one of the following happens: + /// Constructs a bundle and sends it to the entry point as a transaction. /// - /// 1. A transaction succeeds (not necessarily the most recent one) - /// 2. The gas fees are high enough that the bundle is empty because there + /// Returns empty if: + /// - There are no ops available to bundle initially. + /// - The gas fees are high enough that the bundle is empty because there /// are no ops that meet the fee requirements. - /// 3. The transaction has not succeeded after `settings.max_fee_increases` - /// replacements. - async fn send_bundle_with_increasing_gas_fees(&self) -> SendBundleResult { - let result = self.send_bundle_with_increasing_gas_fees_inner().await; - match result { - Ok(result) => result, - Err(error) => SendBundleResult::Error(error), - } - } + async fn send_bundle( + &mut self, + fee_increase_count: u64, + ) -> anyhow::Result { + let (nonce, required_fees) = self.transaction_tracker.get_nonce_and_required_fees()?; + + let Some(bundle_tx) = self + .get_bundle_tx(nonce, required_fees, fee_increase_count > 0) + .await? + else { + self.emit(BuilderEvent::formed_bundle( + self.builder_index, + None, + nonce.low_u64(), + fee_increase_count, + required_fees, + )); + return Ok(SendBundleAttemptResult::NoOperations); + }; + let BundleTx { + tx, + expected_storage, + op_hashes, + } = bundle_tx; + + BuilderMetrics::increment_bundle_txns_sent(self.builder_index, self.entry_point.address()); - /// Helper function returning `Result` to be able to use `?`. - async fn send_bundle_with_increasing_gas_fees_inner(&self) -> anyhow::Result { - let (nonce, mut required_fees) = self.transaction_tracker.get_nonce_and_required_fees()?; - let mut initial_op_count: Option = None; - let mut is_replacement = false; - - for fee_increase_count in 0..=self.settings.max_fee_increases { - let Some(bundle_tx) = self - .get_bundle_tx(nonce, required_fees, is_replacement) - .await? - else { + let send_result = self + .transaction_tracker + .send_transaction(tx.clone(), &expected_storage) + .await; + + match send_result { + Ok(tx_hash) => { self.emit(BuilderEvent::formed_bundle( self.builder_index, - None, + Some(BundleTxDetails { + tx_hash, + tx, + op_hashes: Arc::new(op_hashes), + }), nonce.low_u64(), fee_increase_count, required_fees, )); - return Ok(match initial_op_count { - Some(initial_op_count) => { - BuilderMetrics::increment_bundle_txns_abandoned( - self.builder_index, - self.entry_point.address(), - ); - SendBundleResult::NoOperationsAfterFeeIncreases { - initial_op_count, - attempt_number: fee_increase_count, - } - } - None => SendBundleResult::NoOperationsInitially, - }); - }; - let BundleTx { - tx, - expected_storage, - op_hashes, - } = bundle_tx; - if initial_op_count.is_none() { - initial_op_count = Some(op_hashes.len()); - } - let current_fees = GasFees::from(&tx); - - BuilderMetrics::increment_bundle_txns_sent( - self.builder_index, - self.entry_point.address(), - ); - let send_result = self - .transaction_tracker - .send_transaction(tx.clone(), &expected_storage) - .await?; - let update = match send_result { - SendResult::TrackerUpdate(update) => update, - SendResult::TxHash(tx_hash) => { - self.emit(BuilderEvent::formed_bundle( - self.builder_index, - Some(BundleTxDetails { - tx_hash, - tx, - op_hashes: Arc::new(op_hashes), - }), - nonce.low_u64(), - fee_increase_count, - required_fees, - )); - self.transaction_tracker.wait_for_update().await? - } - }; - match update { - TrackerUpdate::Mined { - tx_hash, - nonce, - block_number, - attempt_number, - gas_limit, - gas_used, - } => { - self.emit(BuilderEvent::transaction_mined( - self.builder_index, - tx_hash, - nonce.low_u64(), - block_number, - )); - BuilderMetrics::increment_bundle_txns_success( - self.builder_index, - self.entry_point.address(), - ); - BuilderMetrics::set_bundle_gas_stats( - gas_limit, - gas_used, - self.builder_index, - self.entry_point.address(), - ); - return Ok(SendBundleResult::Success { - block_number, - attempt_number, - tx_hash, - }); - } - TrackerUpdate::StillPendingAfterWait => { - info!("Transaction not mined for several blocks") - } - TrackerUpdate::LatestTxDropped { nonce } => { - self.emit(BuilderEvent::latest_transaction_dropped( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_dropped( - self.builder_index, - self.entry_point.address(), - ); - info!("Previous transaction dropped by sender"); - } - TrackerUpdate::NonceUsedForOtherTx { nonce } => { - self.emit(BuilderEvent::nonce_used_for_other_transaction( - self.builder_index, - nonce.low_u64(), - )); - BuilderMetrics::increment_bundle_txns_nonce_used( - self.builder_index, - self.entry_point.address(), - ); - bail!("nonce used by external transaction") - } - TrackerUpdate::ReplacementUnderpriced => { - BuilderMetrics::increment_bundle_txn_replacement_underpriced( - self.builder_index, - self.entry_point.address(), - ); - info!("Replacement transaction underpriced, increasing fees") - } - }; - info!( - "Bundle transaction failed to mine after {fee_increase_count} fee increases (maxFeePerGas: {}, maxPriorityFeePerGas: {}).", - current_fees.max_fee_per_gas, - current_fees.max_priority_fee_per_gas, - ); - BuilderMetrics::increment_bundle_txn_fee_increases( - self.builder_index, - self.entry_point.address(), - ); - required_fees = Some( - current_fees.increase_by_percent(self.settings.replacement_fee_percent_increase), - ); - is_replacement = true; + Ok(SendBundleAttemptResult::Success) + } + Err(TransactionTrackerError::NonceTooLow) => { + warn!("Replacement transaction underpriced"); + Ok(SendBundleAttemptResult::NonceTooLow) + } + Err(TransactionTrackerError::ReplacementUnderpriced) => { + BuilderMetrics::increment_bundle_txn_replacement_underpriced( + self.builder_index, + self.entry_point.address(), + ); + warn!("Replacement transaction underpriced"); + Ok(SendBundleAttemptResult::ReplacementUnderpriced) + } + Err(e) => { + error!("Failed to send bundle with unexpected error: {e:?}"); + Err(e.into()) + } } - BuilderMetrics::increment_bundle_txns_abandoned( - self.builder_index, - self.entry_point.address(), - ); - Ok(SendBundleResult::StalledAtMaxFeeIncreases) } /// Builds a bundle and returns some metadata and the transaction to send @@ -611,6 +572,162 @@ where } } +struct BundleSenderTrigger { + bundling_mode: BundlingMode, + block_rx: UnboundedReceiver, + bundle_action_receiver: mpsc::Receiver, + timer: tokio::time::Interval, + last_block: NewHead, +} + +impl BundleSenderTrigger { + async fn new( + pool_client: &P, + bundle_action_receiver: mpsc::Receiver, + timer_interval: Duration, + ) -> anyhow::Result { + let block_rx = Self::start_block_stream(pool_client).await?; + + Ok(Self { + bundling_mode: BundlingMode::Auto, + block_rx, + bundle_action_receiver, + timer: tokio::time::interval(timer_interval), + last_block: NewHead { + block_hash: H256::zero(), + block_number: 0, + }, + }) + } + + async fn start_block_stream( + pool_client: &P, + ) -> anyhow::Result> { + let Ok(mut new_heads) = pool_client.subscribe_new_heads().await else { + error!("Failed to subscribe to new blocks"); + bail!("failed to subscribe to new blocks"); + }; + + let (tx, rx) = mpsc::unbounded_channel(); + tokio::spawn(async move { + loop { + match new_heads.next().await { + Some(b) => { + if tx.send(b).is_err() { + error!("Failed to buffer new block for bundle sender"); + return; + } + } + None => { + error!("Block stream ended"); + return; + } + } + } + }); + + Ok(rx) + } + + async fn wait_for_trigger( + &mut self, + ) -> anyhow::Result>> { + let mut send_bundle_response: Option> = None; + + loop { + // 3 triggers for loop logic: + // 1 - new block + // - If auto mode, send next bundle + // 2 - timer tick + // - If auto mode, send next bundle + // 3 - action recv + // - If change mode, change and restart loop + // - If send bundle and manual mode, send next bundle + tokio::select! { + b = self.block_rx.recv() => { + let Some(b) = b else { + error!("Block stream closed"); + bail!("Block stream closed"); + }; + + self.last_block = b; + + match self.bundling_mode { + BundlingMode::Manual => continue, + BundlingMode::Auto => break, + } + }, + _ = self.timer.tick() => { + match self.bundling_mode { + BundlingMode::Manual => continue, + BundlingMode::Auto => break, + } + }, + a = self.bundle_action_receiver.recv() => { + match a { + Some(BundleSenderAction::ChangeMode(mode)) => { + debug!("changing bundling mode to {mode:?}"); + self.bundling_mode = mode; + continue; + }, + Some(BundleSenderAction::SendBundle(r)) => { + match self.bundling_mode { + BundlingMode::Manual => { + send_bundle_response = Some(r.responder); + break; + }, + BundlingMode::Auto => { + error!("Received bundle send action while in auto mode, ignoring"); + continue; + } + } + }, + None => { + error!("Bundle action recv closed"); + bail!("Bundle action recv closed"); + } + } + } + }; + } + + self.consume_blocks()?; + + Ok(send_bundle_response) + } + + async fn wait_for_block(&mut self) -> anyhow::Result { + self.block_rx + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("Block stream closed"))?; + self.consume_blocks()?; + Ok(self.last_block.clone()) + } + + fn consume_blocks(&mut self) -> anyhow::Result<()> { + // Consume any other blocks that may have been buffered up + loop { + match self.block_rx.try_recv() { + Ok(b) => { + self.last_block = b; + } + Err(mpsc::error::TryRecvError::Empty) => { + return Ok(()); + } + Err(mpsc::error::TryRecvError::Disconnected) => { + error!("Block stream closed"); + bail!("Block stream closed"); + } + } + } + } + + fn last_block(&self) -> &NewHead { + &self.last_block + } +} + struct BuilderMetrics {} impl BuilderMetrics { diff --git a/crates/builder/src/sender/mod.rs b/crates/builder/src/sender/mod.rs index e958e574b..a83d0a296 100644 --- a/crates/builder/src/sender/mod.rs +++ b/crates/builder/src/sender/mod.rs @@ -54,6 +54,9 @@ pub(crate) enum TxSenderError { /// Replacement transaction was underpriced #[error("replacement transaction underpriced")] ReplacementUnderpriced, + /// Nonce too low + #[error("nonce too low")] + NonceTooLow, /// All other errors #[error(transparent)] Other(#[from] anyhow::Error), @@ -217,6 +220,8 @@ impl From for TxSenderError { if let Some(e) = e.as_error_response() { if e.message.contains("replacement transaction underpriced") { return TxSenderError::ReplacementUnderpriced; + } else if e.message.contains("nonce too low") { + return TxSenderError::NonceTooLow; } } TxSenderError::Other(value.into()) diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index 91c41190b..e934935e3 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -415,8 +415,6 @@ where )?; let tracker_settings = transaction_tracker::Settings { - poll_interval: self.args.eth_poll_interval, - max_blocks_to_wait_for_mine: self.args.max_blocks_to_wait_for_mine, replacement_fee_percent_increase: self.args.replacement_fee_percent_increase, }; @@ -429,8 +427,8 @@ where .await?; let builder_settings = bundle_sender::Settings { - replacement_fee_percent_increase: self.args.replacement_fee_percent_increase, max_fee_increases: self.args.max_fee_increases, + max_blocks_to_wait_for_mine: self.args.max_blocks_to_wait_for_mine, }; let proposer = BundleProposerImpl::new( diff --git a/crates/builder/src/transaction_tracker.rs b/crates/builder/src/transaction_tracker.rs index 1dbe8f99d..b5a452615 100644 --- a/crates/builder/src/transaction_tracker.rs +++ b/crates/builder/src/transaction_tracker.rs @@ -11,7 +11,7 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use anyhow::{bail, Context}; use async_trait::async_trait; @@ -19,8 +19,7 @@ use ethers::types::{transaction::eip2718::TypedTransaction, H256, U256}; use rundler_provider::Provider; use rundler_sim::ExpectedStorage; use rundler_types::GasFees; -use tokio::time; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::sender::{TransactionSender, TxSenderError, TxStatus}; @@ -35,38 +34,46 @@ use crate::sender::{TransactionSender, TxSenderError, TxStatus}; /// have changed so that it is worth making another attempt. #[async_trait] pub(crate) trait TransactionTracker: Send + Sync + 'static { - fn get_nonce_and_required_fees(&self) -> anyhow::Result<(U256, Option)>; + /// Returns the current nonce and the required fees for the next transaction. + fn get_nonce_and_required_fees(&self) -> TransactionTrackerResult<(U256, Option)>; /// Sends the provided transaction and typically returns its transaction /// hash, but if the transaction failed to send because another transaction /// with the same nonce mined first, then returns information about that /// transaction instead. async fn send_transaction( - &self, + &mut self, tx: TypedTransaction, expected_stroage: &ExpectedStorage, - ) -> anyhow::Result; + ) -> TransactionTrackerResult; - /// Waits until one of the following occurs: - /// + /// Checks: /// 1. One of our transactions mines (not necessarily the one just sent). /// 2. All our send transactions have dropped. /// 3. Our nonce has changed but none of our transactions mined. This means /// that a transaction from our account other than one of the ones we are /// tracking has mined. This should not normally happen. /// 4. Several new blocks have passed. - async fn wait_for_update(&self) -> anyhow::Result; + async fn check_for_update(&mut self) -> TransactionTrackerResult>; - /// Like `wait_for_update`, except it returns immediately if there is no - /// update rather than waiting for several new blocks. - async fn check_for_update_now(&self) -> anyhow::Result>; + /// Resets the tracker to its initial state + async fn reset(&mut self); } -pub(crate) enum SendResult { - TxHash(H256), - TrackerUpdate(TrackerUpdate), +/// Errors that can occur while using a `TransactionTracker`. +#[derive(Debug, thiserror::Error)] +pub(crate) enum TransactionTrackerError { + #[error("nonce too low")] + NonceTooLow, + #[error("replacement transaction underpriced")] + ReplacementUnderpriced, + /// All other errors + #[error(transparent)] + Other(#[from] anyhow::Error), } +pub(crate) type TransactionTrackerResult = std::result::Result; + #[derive(Debug)] #[allow(dead_code)] pub(crate) enum TrackerUpdate { @@ -78,26 +85,16 @@ pub(crate) enum TrackerUpdate { gas_limit: Option, gas_used: Option, }, - StillPendingAfterWait, LatestTxDropped { nonce: U256, }, NonceUsedForOtherTx { nonce: U256, }, - ReplacementUnderpriced, } #[derive(Debug)] -pub(crate) struct TransactionTrackerImpl( - tokio::sync::Mutex>, -) -where - P: Provider, - T: TransactionSender; - -#[derive(Debug)] -struct TransactionTrackerImplInner +pub(crate) struct TransactionTrackerImpl where P: Provider, T: TransactionSender, @@ -114,8 +111,6 @@ where #[derive(Clone, Copy, Debug)] pub(crate) struct Settings { - pub(crate) poll_interval: Duration, - pub(crate) max_blocks_to_wait_for_mine: u64, pub(crate) replacement_fee_percent_increase: u64, } @@ -126,33 +121,6 @@ struct PendingTransaction { attempt_number: u64, } -#[async_trait] -impl TransactionTracker for TransactionTrackerImpl -where - P: Provider, - T: TransactionSender, -{ - fn get_nonce_and_required_fees(&self) -> anyhow::Result<(U256, Option)> { - Ok(self.inner()?.get_nonce_and_required_fees()) - } - - async fn send_transaction( - &self, - tx: TypedTransaction, - expected_storage: &ExpectedStorage, - ) -> anyhow::Result { - self.inner()?.send_transaction(tx, expected_storage).await - } - - async fn wait_for_update(&self) -> anyhow::Result { - self.inner()?.wait_for_update().await - } - - async fn check_for_update_now(&self) -> anyhow::Result> { - self.inner()?.check_for_update_now().await - } -} - impl TransactionTrackerImpl where P: Provider, @@ -163,31 +131,6 @@ where sender: T, settings: Settings, builder_index: u64, - ) -> anyhow::Result { - let inner = - TransactionTrackerImplInner::new(provider, sender, settings, builder_index).await?; - Ok(Self(tokio::sync::Mutex::new(inner))) - } - - fn inner( - &self, - ) -> anyhow::Result>> { - self.0 - .try_lock() - .context("tracker should not be called while waiting for a transaction") - } -} - -impl TransactionTrackerImplInner -where - P: Provider, - T: TransactionSender, -{ - async fn new( - provider: Arc

, - sender: T, - settings: Settings, - builder_index: u64, ) -> anyhow::Result { let nonce = provider .get_transaction_count(sender.address()) @@ -205,7 +148,84 @@ where }) } - fn get_nonce_and_required_fees(&self) -> (U256, Option) { + fn set_nonce_and_clear_state(&mut self, nonce: U256) { + self.nonce = nonce; + self.transactions.clear(); + self.has_dropped = false; + self.attempt_count = 0; + self.update_metrics(); + } + + async fn get_external_nonce(&self) -> anyhow::Result { + self.provider + .get_transaction_count(self.sender.address()) + .await + .context("tracker should load current nonce from provider") + } + + fn validate_transaction(&self, tx: &TypedTransaction) -> anyhow::Result<()> { + let Some(&nonce) = tx.nonce() else { + bail!("transaction given to tracker should have nonce set"); + }; + let gas_fees = GasFees::from(tx); + let (required_nonce, required_gas_fees) = self.get_nonce_and_required_fees()?; + if nonce != required_nonce { + bail!("tried to send transaction with nonce {nonce}, but should match tracker's nonce of {required_nonce}"); + } + if let Some(required_gas_fees) = required_gas_fees { + if gas_fees.max_fee_per_gas < required_gas_fees.max_fee_per_gas + || gas_fees.max_priority_fee_per_gas < required_gas_fees.max_priority_fee_per_gas + { + bail!("new transaction's gas fees should be at least the required fees") + } + } + Ok(()) + } + + fn update_metrics(&self) { + TransactionTrackerMetrics::set_num_pending_transactions( + self.builder_index, + self.transactions.len(), + ); + TransactionTrackerMetrics::set_nonce(self.builder_index, self.nonce); + TransactionTrackerMetrics::set_attempt_count(self.builder_index, self.attempt_count); + if let Some(tx) = self.transactions.last() { + TransactionTrackerMetrics::set_current_fees(self.builder_index, Some(tx.gas_fees)); + } else { + TransactionTrackerMetrics::set_current_fees(self.builder_index, None); + } + } + + async fn get_mined_tx_gas_info( + &self, + tx_hash: H256, + ) -> anyhow::Result<(Option, Option)> { + let (tx, tx_receipt) = tokio::try_join!( + self.provider.get_transaction(tx_hash), + self.provider.get_transaction_receipt(tx_hash), + )?; + let gas_limit = tx.map(|t| t.gas).or_else(|| { + warn!("failed to fetch transaction data for tx: {}", tx_hash); + None + }); + let gas_used = match tx_receipt { + Some(r) => r.gas_used, + None => { + warn!("failed to fetch transaction receipt for tx: {}", tx_hash); + None + } + }; + Ok((gas_limit, gas_used)) + } +} + +#[async_trait] +impl TransactionTracker for TransactionTrackerImpl +where + P: Provider, + T: TransactionSender, +{ + fn get_nonce_and_required_fees(&self) -> TransactionTrackerResult<(U256, Option)> { let gas_fees = if self.has_dropped { None } else { @@ -214,24 +234,17 @@ where .increase_by_percent(self.settings.replacement_fee_percent_increase) }) }; - (self.nonce, gas_fees) + Ok((self.nonce, gas_fees)) } async fn send_transaction( &mut self, tx: TypedTransaction, expected_storage: &ExpectedStorage, - ) -> anyhow::Result { + ) -> TransactionTrackerResult { self.validate_transaction(&tx)?; let gas_fees = GasFees::from(&tx); - let send_result = self.sender.send_transaction(tx, expected_storage).await; - let sent_tx = match send_result { - Ok(sent_tx) => sent_tx, - Err(error) => { - let tracker_update = self.handle_send_error(error).await?; - return Ok(SendResult::TrackerUpdate(tracker_update)); - } - }; + let sent_tx = self.sender.send_transaction(tx, expected_storage).await?; info!( "Sent transaction {:?} nonce: {:?}", sent_tx.tx_hash, sent_tx.nonce @@ -244,61 +257,18 @@ where self.has_dropped = false; self.attempt_count += 1; self.update_metrics(); - Ok(SendResult::TxHash(sent_tx.tx_hash)) + Ok(sent_tx.tx_hash) } - /// When we fail to send a transaction, it may be because another - /// transaction has mined before it could be sent, invalidating the nonce. - /// Thus, do one last check for an update before returning the error. - async fn handle_send_error(&mut self, error: TxSenderError) -> anyhow::Result { - match &error { - TxSenderError::ReplacementUnderpriced => { - return Ok(TrackerUpdate::ReplacementUnderpriced) - } - TxSenderError::Other(_error) => {} - } - - let update = self.check_for_update_now().await?; - let Some(update) = update else { - return Err(error.into()); - }; - match &update { - TrackerUpdate::StillPendingAfterWait | TrackerUpdate::LatestTxDropped { .. } => { - Err(error.into()) - } - _ => Ok(update), - } - } - - async fn wait_for_update(&mut self) -> anyhow::Result { - let start_block_number = self - .provider - .get_block_number() - .await - .context("tracker should get starting block when waiting for update")?; - let end_block_number = start_block_number + self.settings.max_blocks_to_wait_for_mine; - loop { - let update = self.check_for_update_now().await?; - if let Some(update) = update { - return Ok(update); - } - let current_block_number = self - .provider - .get_block_number() - .await - .context("tracker should get current block when polling for updates")?; - if end_block_number <= current_block_number { - return Ok(TrackerUpdate::StillPendingAfterWait); - } - time::sleep(self.settings.poll_interval).await; - } - } - - async fn check_for_update_now(&mut self) -> anyhow::Result> { + async fn check_for_update(&mut self) -> TransactionTrackerResult> { let external_nonce = self.get_external_nonce().await?; if self.nonce < external_nonce { // The nonce has changed. Check to see which of our transactions has // mined, if any. + debug!( + "Nonce has changed from {:?} to {:?}", + self.nonce, external_nonce + ); let mut out = TrackerUpdate::NonceUsedForOtherTx { nonce: self.nonce }; for tx in self.transactions.iter().rev() { @@ -307,6 +277,7 @@ where .get_transaction_status(tx.tx_hash) .await .context("tracker should check transaction status when the nonce changes")?; + info!("Status of tx {:?}: {:?}", tx.tx_hash, status); if let TxStatus::Mined { block_number } = status { let (gas_limit, gas_used) = self.get_mined_tx_gas_info(tx.tx_hash).await?; out = TrackerUpdate::Mined { @@ -361,74 +332,21 @@ where }) } - fn set_nonce_and_clear_state(&mut self, nonce: U256) { - self.nonce = nonce; - self.transactions.clear(); - self.has_dropped = false; - self.attempt_count = 0; - self.update_metrics(); - } - - async fn get_external_nonce(&self) -> anyhow::Result { - self.provider - .get_transaction_count(self.sender.address()) - .await - .context("tracker should load current nonce from provider") + async fn reset(&mut self) { + let nonce = self.get_external_nonce().await.unwrap_or(self.nonce); + self.set_nonce_and_clear_state(nonce); } +} - fn validate_transaction(&self, tx: &TypedTransaction) -> anyhow::Result<()> { - let Some(&nonce) = tx.nonce() else { - bail!("transaction given to tracker should have nonce set"); - }; - let gas_fees = GasFees::from(tx); - let (required_nonce, required_gas_fees) = self.get_nonce_and_required_fees(); - if nonce != required_nonce { - bail!("tried to send transaction with nonce {nonce}, but should match tracker's nonce of {required_nonce}"); - } - if let Some(required_gas_fees) = required_gas_fees { - if gas_fees.max_fee_per_gas < required_gas_fees.max_fee_per_gas - || gas_fees.max_priority_fee_per_gas < required_gas_fees.max_priority_fee_per_gas - { - bail!("new transaction's gas fees should be at least the required fees") +impl From for TransactionTrackerError { + fn from(value: TxSenderError) -> Self { + match value { + TxSenderError::NonceTooLow => TransactionTrackerError::NonceTooLow, + TxSenderError::ReplacementUnderpriced => { + TransactionTrackerError::ReplacementUnderpriced } + TxSenderError::Other(e) => TransactionTrackerError::Other(e), } - Ok(()) - } - - fn update_metrics(&self) { - TransactionTrackerMetrics::set_num_pending_transactions( - self.builder_index, - self.transactions.len(), - ); - TransactionTrackerMetrics::set_nonce(self.builder_index, self.nonce); - TransactionTrackerMetrics::set_attempt_count(self.builder_index, self.attempt_count); - if let Some(tx) = self.transactions.last() { - TransactionTrackerMetrics::set_current_fees(self.builder_index, Some(tx.gas_fees)); - } else { - TransactionTrackerMetrics::set_current_fees(self.builder_index, None); - } - } - - async fn get_mined_tx_gas_info( - &self, - tx_hash: H256, - ) -> anyhow::Result<(Option, Option)> { - let (tx, tx_receipt) = tokio::try_join!( - self.provider.get_transaction(tx_hash), - self.provider.get_transaction_receipt(tx_hash), - )?; - let gas_limit = tx.map(|t| t.gas).or_else(|| { - warn!("failed to fetch transaction data for tx: {}", tx_hash); - None - }); - let gas_used = match tx_receipt { - Some(r) => r.gas_used, - None => { - warn!("failed to fetch transaction receipt for tx: {}", tx_hash); - None - } - }; - Ok((gas_limit, gas_used)) } } @@ -482,8 +400,6 @@ mod tests { provider: MockProvider, ) -> TransactionTrackerImpl { let settings = Settings { - poll_interval: Duration::from_secs(0), - max_blocks_to_wait_for_mine: 3, replacement_fee_percent_increase: 5, }; @@ -512,7 +428,7 @@ mod tests { .expect_get_transaction_count() .returning(move |_a| Ok(U256::from(0))); - let tracker = create_tracker(sender, provider).await; + let mut tracker = create_tracker(sender, provider).await; let tx = Eip1559TransactionRequest::new() .nonce(0) @@ -598,7 +514,7 @@ mod tests { .expect_get_transaction_count() .returning(move |_a| Ok(U256::from(2))); - let tracker = create_tracker(sender, provider).await; + let mut tracker = create_tracker(sender, provider).await; let tx = Eip1559TransactionRequest::new(); let exp = ExpectedStorage::default(); @@ -625,7 +541,7 @@ mod tests { .expect_get_transaction_count() .returning(move |_a| Ok(U256::from(2))); - let tracker = create_tracker(sender, provider).await; + let mut tracker = create_tracker(sender, provider).await; let tx = Eip1559TransactionRequest::new().nonce(0); let exp = ExpectedStorage::default(); @@ -651,41 +567,11 @@ mod tests { .expect_get_transaction_count() .returning(move |_a| Ok(U256::from(0))); - let tracker = create_tracker(sender, provider).await; + let mut tracker = create_tracker(sender, provider).await; let tx = Eip1559TransactionRequest::new().nonce(0); let exp = ExpectedStorage::default(); - let sent_transaction = tracker.send_transaction(tx.into(), &exp).await.unwrap(); - - assert!(matches!(sent_transaction, SendResult::TxHash(..))); - } - - #[tokio::test] - async fn test_wait_for_update_still_pending() { - let (mut sender, mut provider) = create_base_config(); - sender.expect_address().return_const(Address::zero()); - - let mut s = Sequence::new(); - - provider - .expect_get_transaction_count() - .returning(move |_a| Ok(U256::from(0))); - - for block_number in 1..=4 { - provider - .expect_get_block_number() - .returning(move || Ok(block_number)) - .times(1) - .in_sequence(&mut s); - } - - let tracker = create_tracker(sender, provider).await; - let tracker_update = tracker.wait_for_update().await.unwrap(); - - assert!(matches!( - tracker_update, - TrackerUpdate::StillPendingAfterWait - )); + tracker.send_transaction(tx.into(), &exp).await.unwrap(); } // TODO(#295): fix dropped status @@ -727,7 +613,7 @@ mod tests { // } #[tokio::test] - async fn test_wait_for_update_nonce_used() { + async fn test_check_for_update_nonce_used() { let (mut sender, mut provider) = create_base_config(); sender.expect_address().return_const(Address::zero()); @@ -740,14 +626,9 @@ mod tests { .in_sequence(&mut provider_seq); } - provider - .expect_get_block_number() - .returning(move || Ok(1)) - .times(1); + let mut tracker = create_tracker(sender, provider).await; - let tracker = create_tracker(sender, provider).await; - - let tracker_update = tracker.wait_for_update().await.unwrap(); + let tracker_update = tracker.check_for_update().await.unwrap().unwrap(); assert!(matches!( tracker_update, @@ -756,7 +637,7 @@ mod tests { } #[tokio::test] - async fn test_wait_for_update_mined() { + async fn test_check_for_update_mined() { let (mut sender, mut provider) = create_base_config(); sender.expect_address().return_const(Address::zero()); sender @@ -776,11 +657,6 @@ mod tests { .expect_get_transaction_count() .returning(move |_a| Ok(U256::from(0))); - provider - .expect_get_block_number() - .returning(move || Ok(1)) - .times(1); - provider.expect_get_transaction().returning(|_: H256| { Ok(Some(Transaction { gas: U256::from(0), @@ -797,14 +673,14 @@ mod tests { })) }); - let tracker = create_tracker(sender, provider).await; + let mut tracker = create_tracker(sender, provider).await; let tx = Eip1559TransactionRequest::new().nonce(0); let exp = ExpectedStorage::default(); // send dummy transaction let _sent = tracker.send_transaction(tx.into(), &exp).await; - let tracker_update = tracker.wait_for_update().await.unwrap(); + let tracker_update = tracker.check_for_update().await.unwrap().unwrap(); assert!(matches!(tracker_update, TrackerUpdate::Mined { .. })); }