diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs
index 5a9ec9422e..eb9838bc95 100644
--- a/bin/tempo-bench/src/cmd/max_tps.rs
+++ b/bin/tempo-bench/src/cmd/max_tps.rs
@@ -3,14 +3,12 @@ mod erc20;
 
 use alloy_consensus::Transaction;
 use itertools::Itertools;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use reth_tracing::{
     RethTracer, Tracer,
     tracing::{debug, error, info},
 };
 use tempo_alloy::{
-    TempoNetwork, fillers::ExpiringNonceFiller, primitives::TempoTxEnvelope,
-    provider::ext::TempoProviderBuilderExt,
+    TempoNetwork, fillers::ExpiringNonceFiller, provider::ext::TempoProviderBuilderExt,
 };
 
 use alloy::{
@@ -30,14 +28,14 @@ use alloy::{
     transports::http::reqwest::Url,
 };
 use clap::Parser;
-use eyre::{Context, OptionExt, ensure};
+use eyre::{Context, OptionExt};
 use futures::{
-    FutureExt, StreamExt, TryStreamExt,
+    FutureExt, Stream, StreamExt, TryStreamExt,
     future::BoxFuture,
     stream::{self},
 };
 use governor::{Quota, RateLimiter, state::StreamRateLimitExt};
-use indicatif::{ParallelProgressIterator, ProgressBar, ProgressIterator};
+use indicatif::{ProgressBar, ProgressIterator};
 use rand::{random_range, seq::IndexedRandom};
 use rlimit::Resource;
 use serde::Serialize;
@@ -69,7 +67,7 @@ use tempo_precompiles::{
 };
 use tokio::{
     select,
-    time::{Sleep, interval, sleep},
+    time::{interval, sleep},
 };
 use tokio_util::sync::CancellationToken;
 
@@ -190,15 +188,6 @@ pub struct MaxTpsArgs {
     /// nonce management.
     #[arg(long)]
     use_standard_nonces: bool,
-
-    /// Batch size for signing transactions when using expiring nonces.
-    ///
-    /// Since expiring nonces have a 25-second validity window, transactions must be signed
-    /// in batches close to when they're sent. This controls how many transactions to
-    /// generate/sign before sending. Default is 15 seconds worth of transactions at the
-    /// target TPS (e.g., 75,000 at 5,000 TPS).
-    #[arg(long)]
-    expiring_batch_secs: Option<u64>,
 }
 
 impl MaxTpsArgs {
@@ -292,7 +281,6 @@ impl MaxTpsArgs {
         self,
         signer_provider_manager: SignerProviderManager,
     ) -> eyre::Result<()> {
-        let accounts = self.accounts.get();
         let signer_providers = signer_provider_manager.signer_providers();
 
         if self.clear_txpool {
@@ -397,11 +385,14 @@ impl MaxTpsArgs {
             None
         };
 
+        let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces;
+        let expiry_secs = if use_expiring_nonces {
+            Some(ExpiringNonceFiller::DEFAULT_EXPIRY_SECS)
+        } else {
+            None
+        };
+
         let gen_input = GenerateTransactionsInput {
-            total_txs,
-            accounts,
-            signer_provider_manager: signer_provider_manager.clone(),
-            max_concurrent_requests: self.max_concurrent_requests,
             tip20_weight,
             place_order_weight,
             swap_weight,
@@ -410,124 +401,96 @@ impl MaxTpsArgs {
             user_tokens,
             erc20_tokens,
             recipients,
-            tx_id: Arc::new(AtomicUsize::new(0)),
+            expiry_secs,
         };
 
-        // For expiring nonces, we need to generate/sign/send in batches to avoid
-        // transactions expiring before they're sent. We pipeline batch generation
-        // with sending to avoid gaps that would cause empty blocks.
-        let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces;
-        let mut pending_txs = if use_expiring_nonces {
-            let batch_secs = self.expiring_batch_secs.unwrap_or(15);
-            let batch_size = self.tps * batch_secs;
-            let num_batches = total_txs.div_ceil(batch_size);
-
-            info!(
-                total_txs,
-                batch_size,
-                num_batches,
-                batch_secs,
-                "Generating and sending transactions in batches (expiring nonces, pipelined)"
-            );
-
-            let tx_counter = Arc::new(AtomicUsize::new(0));
-            let success_counter = Arc::new(AtomicUsize::new(0));
-            let failed_counter = Arc::new(AtomicUsize::new(0));
-            let tx_counter_clone = tx_counter.clone();
-            let success_counter_clone = success_counter.clone();
-            let failed_counter_clone = failed_counter.clone();
-            let target_count = total_txs as usize;
-            let token = CancellationToken::new();
-            let token_clone = token.clone();
-
-            // Start TPS monitor
-            tokio::spawn(async move {
-                monitor_tps(tx_counter_clone, target_count, token_clone).await;
-            });
-
-            let mut all_pending_txs = VecDeque::new();
-            let start_time = std::time::Instant::now();
-            let total_duration = Duration::from_secs(self.duration);
-
-            // Generate first batch before starting the send loop
-            let first_batch_size = batch_size.min(total_txs);
-            let batch_input = GenerateTransactionsInput {
-                total_txs: first_batch_size,
-                ..gen_input.clone()
-            };
-            let mut current_batch = Some(
-                generate_transactions(batch_input)
+        info!(total_txs, "Generating and sending transactions");
+
+        let counters = TransactionCounters::default();
+        let target_count = total_txs as usize;
+        let cancel_token = CancellationToken::new();
+
+        // Start TPS monitor
+        tokio::spawn(monitor_tps(
+            counters.clone(),
+            target_count,
+            cancel_token.clone(),
+        ));
+
+        let rate_limiter =
+            RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap()));
+
+        let mut pending_txs =
+            generate_transactions(signer_provider_manager.clone(), gen_input, counters.clone())
+                .buffer_unordered(self.max_concurrent_requests)
+                .filter_map(|result| async {
+                    match result {
+                        Ok(bytes) => Some(bytes),
+                        Err(e) => {
+                            debug!(?e, "Transaction generation failed");
+                            None
+                        }
+                    }
+                })
+                .boxed()
+                .ratelimit_stream(&rate_limiter)
+                .zip(stream::repeat_with(|| {
+                    signer_provider_manager.random_unsigned_provider()
+                }))
+                .map(|(bytes, provider)| async move {
+                    tokio::time::timeout(
+                        Duration::from_secs(1),
+                        provider.send_raw_transaction(&bytes),
+                    )
                     .await
-                    .context("Failed to generate first batch")?,
-            );
-
-            for batch_idx in 0..num_batches {
-                let elapsed = start_time.elapsed();
-                let remaining_duration = total_duration.saturating_sub(elapsed);
-                let batch_duration = Duration::from_secs(batch_secs).min(remaining_duration);
+                })
+                .buffer_unordered(self.max_concurrent_requests)
+                .filter_map({
+                    let counters = counters.clone();
+                    move |result| {
+                        let counters = counters.clone();
+                        async move {
+                            match result {
+                                Ok(Ok(pending_tx)) => {
+                                    counters.sent.fetch_add(1, Ordering::Relaxed);
+                                    counters.success.fetch_add(1, Ordering::Relaxed);
+                                    Some(pending_tx)
+                                }
+                                Ok(Err(err)) => {
+                                    counters.sent.fetch_add(1, Ordering::Relaxed);
+                                    counters.failed.fetch_add(1, Ordering::Relaxed);
+                                    debug!(?err, "Failed to send transaction");
+                                    None
+                                }
+                                Err(_) => {
+                                    counters.sent.fetch_add(1, Ordering::Relaxed);
+                                    counters.failed.fetch_add(1, Ordering::Relaxed);
+                                    debug!("Transaction sending timed out");
+                                    None
+                                }
+                            }
+                        }
+                    }
+                })
+                .take_until(sleep(Duration::from_secs(self.duration)))
+                .collect::<VecDeque<_>>()
+                .await;
 
-                if batch_duration.is_zero() {
-                    info!(batch_idx, "Time expired, stopping");
-                    break;
-                }
+        cancel_token.cancel();
 
-                let batch_to_send = current_batch.take().expect("batch should exist");
-
-                // Send current batch while generating next batch in parallel
-                let send_fut = send_transactions_with_counters(
-                    batch_to_send,
-                    signer_provider_manager.clone(),
-                    self.max_concurrent_requests,
-                    self.tps,
-                    sleep(batch_duration),
-                    tx_counter.clone(),
-                    success_counter.clone(),
-                    failed_counter.clone(),
-                );
-
-                if batch_idx + 1 < num_batches {
-                    let remaining_txs = total_txs - ((batch_idx + 1) * batch_size);
-                    let next_batch_size = remaining_txs.min(batch_size);
-                    let batch_input = GenerateTransactionsInput {
-                        total_txs: next_batch_size,
-                        ..gen_input.clone()
-                    };
-                    // Run send and generate concurrently
-                    let (batch_pending, next) =
-                        tokio::join!(send_fut, generate_transactions(batch_input));
-                    all_pending_txs.extend(batch_pending);
-                    current_batch = Some(next.context("Failed to generate next batch")?);
-                } else {
-                    // Last batch - just send
-                    let batch_pending = send_fut.await;
-                    all_pending_txs.extend(batch_pending);
-                }
-            }
-
-            token.cancel();
-            info!(
-                success = success_counter_clone.load(Ordering::Relaxed),
-                failed = failed_counter_clone.load(Ordering::Relaxed),
-                timeout = 0,
-                "Finished sending transactions"
-            );
+        info!(
+            tip20_transfers = counters.tip20_transfers.load(Ordering::Relaxed),
+            swaps = counters.swaps.load(Ordering::Relaxed),
+            orders = counters.orders.load(Ordering::Relaxed),
+            erc20_transfers = counters.erc20_transfers.load(Ordering::Relaxed),
+            "Generated transactions",
+        );
 
-            all_pending_txs
-        } else {
-            let transactions = generate_transactions(gen_input)
-                .await
-                .context("Failed to generate transactions")?;
-
-            // Send transactions
-            send_transactions(
-                transactions,
-                signer_provider_manager.clone(),
-                self.max_concurrent_requests,
-                self.tps,
-                sleep(Duration::from_secs(self.duration)),
-            )
-            .await
-        };
+        info!(
+            success = counters.success.load(Ordering::Relaxed),
+            failed = counters.failed.load(Ordering::Relaxed),
+            "Finished sending transactions"
+        );
 
         let end_block_number = provider.get_block_number().await?;
 
@@ -624,141 +587,42 @@ impl MnemonicArg {
     }
 }
 
-/// Awaits pending transactions with up to `tps` per second and `max_concurrent_requests` simultaneous in-flight requests. Stops when `deadline` future resolves.
-async fn send_transactions<P: Provider<TempoNetwork> + 'static>(
-    transactions: Vec<Vec<u8>>,
-    signer_provider_manager: SignerProviderManager<P>,
-    max_concurrent_requests: usize,
-    tps: u64,
-    deadline: Sleep,
-) -> VecDeque<PendingTransactionBuilder<TempoNetwork>> {
-    info!(
-        transactions = transactions.len(),
-        max_concurrent_requests, tps, "Sending transactions"
-    );
-
-    // Create shared transaction counter and monitoring
-    let tx_counter = Arc::new(AtomicUsize::new(0));
-
-    // Spawn monitoring task for TPS reporting
-    let cancel = CancellationToken::new();
-    let _drop_guard = cancel.clone().drop_guard();
-    tokio::spawn(monitor_tps(
-        tx_counter.clone(),
-        transactions.len(),
-        cancel.clone(),
-    ));
-
-    // Create a rate limiter
-    let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(tps as u32).unwrap()));
-
-    let failed = Arc::new(AtomicUsize::new(0));
-    let timeout = Arc::new(AtomicUsize::new(0));
-    let transactions = stream::iter(transactions)
-        .ratelimit_stream(&rate_limiter)
-        .zip(stream::repeat_with(|| {
-            signer_provider_manager.random_unsigned_provider()
-        }))
-        .map(|(bytes, provider)| async move {
-            tokio::time::timeout(
-                Duration::from_secs(1),
-                provider.send_raw_transaction(&bytes),
-            )
-            .await
-        })
-        .buffer_unordered(max_concurrent_requests)
-        .filter_map(|result| async {
-            match result {
-                Ok(Ok(pending_tx)) => {
-                    tx_counter.fetch_add(1, Ordering::Relaxed);
-                    Some(pending_tx)
-                }
-                Ok(Err(err)) => {
-                    failed.fetch_add(1, Ordering::Relaxed);
-                    debug!(?err, "Failed to send transaction");
-                    None
-                }
-                Err(_) => {
-                    timeout.fetch_add(1, Ordering::Relaxed);
-                    debug!("Transaction sending timed out");
-                    None
-                }
-            }
-        })
-        .take_until(deadline)
-        .collect()
-        .await;
-
-    info!(
-        success = tx_counter.load(Ordering::Relaxed),
-        failed = failed.load(Ordering::Relaxed),
-        timeout = timeout.load(Ordering::Relaxed),
-        "Finished sending transactions"
-    );
-
-    transactions
+#[derive(Clone, Default)]
+struct TransactionCounters {
+    /// Per-type generation counters
+    tip20_transfers: Arc<AtomicUsize>,
+    swaps: Arc<AtomicUsize>,
+    orders: Arc<AtomicUsize>,
+    erc20_transfers: Arc<AtomicUsize>,
+    /// Sending counters
+    sent: Arc<AtomicUsize>,
+    success: Arc<AtomicUsize>,
+    failed: Arc<AtomicUsize>,
 }
 
-/// Same as `send_transactions` but uses external counters for batch mode.
-#[allow(clippy::too_many_arguments)]
-async fn send_transactions_with_counters<P: Provider<TempoNetwork> + 'static>(
-    transactions: Vec<Vec<u8>>,
-    signer_provider_manager: SignerProviderManager<P>,
-    max_concurrent_requests: usize,
-    tps: u64,
-    deadline: Sleep,
-    tx_counter: Arc<AtomicUsize>,
-    success_counter: Arc<AtomicUsize>,
-    failed_counter: Arc<AtomicUsize>,
-) -> VecDeque<PendingTransactionBuilder<TempoNetwork>> {
-    // Create a rate limiter
-    let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(tps as u32).unwrap()));
-
-    stream::iter(transactions)
-        .ratelimit_stream(&rate_limiter)
-        .zip(stream::repeat_with(|| {
-            signer_provider_manager.random_unsigned_provider()
-        }))
-        .map(|(bytes, provider)| async move {
-            tokio::time::timeout(
-                Duration::from_secs(1),
-                provider.send_raw_transaction(&bytes),
-            )
-            .await
-        })
-        .buffer_unordered(max_concurrent_requests)
-        .filter_map(|result| async {
-            match result {
-                Ok(Ok(pending_tx)) => {
-                    tx_counter.fetch_add(1, Ordering::Relaxed);
-                    success_counter.fetch_add(1, Ordering::Relaxed);
-                    Some(pending_tx)
-                }
-                Ok(Err(err)) => {
-                    failed_counter.fetch_add(1, Ordering::Relaxed);
-                    debug!(?err, "Failed to send transaction");
-                    None
-                }
-                Err(_) => {
-                    failed_counter.fetch_add(1, Ordering::Relaxed);
-                    debug!("Transaction sending timed out");
-                    None
-                }
-            }
-        })
-        .take_until(deadline)
-        .collect()
-        .await
+#[derive(Clone)]
+struct GenerateTransactionsInput {
+    tip20_weight: u64,
+    place_order_weight: u64,
+    swap_weight: u64,
+    erc20_weight: u64,
+    quote_token: Address,
+    user_tokens: Vec<Address>,
+    erc20_tokens: Vec<Address>,
+    /// When set, transfers go to these existing addresses instead of `Address::random()`.
+    recipients: Option<Vec<Address>>,
+    /// When `Some`, sets `valid_before` on each transaction right before signing
+    /// to keep it fresh for expiring nonces.
+    expiry_secs: Option<u64>,
 }
 
-async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
-    input: GenerateTransactionsInput<P>,
-) -> eyre::Result<Vec<Vec<u8>>> {
+/// Returns an infinite stream of futures, each generating, signing, and encoding one transaction.
+fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
+    signer_provider_manager: SignerProviderManager<P>,
+    input: GenerateTransactionsInput,
+    counters: TransactionCounters,
+) -> impl Stream<Item = impl Future<Output = eyre::Result<Vec<u8>>>> {
     let GenerateTransactionsInput {
-        total_txs,
-        accounts,
-        signer_provider_manager,
-        max_concurrent_requests,
         tip20_weight,
         place_order_weight,
         swap_weight,
@@ -767,37 +631,30 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
         user_tokens,
         erc20_tokens,
         recipients,
-        tx_id,
+        expiry_secs,
     } = input;
 
-    let txs_per_sender = total_txs / accounts;
-    ensure!(
-        txs_per_sender > 0,
-        "txs per sender is 0, increase tps or decrease senders"
-    );
-
-    info!(transactions = total_txs, "Generating transactions");
-
     const TX_TYPES: usize = 4;
     // Weights for random sampling for each transaction type
     let tx_weights: [u64; TX_TYPES] = [tip20_weight, swap_weight, place_order_weight, erc20_weight];
     // Cached gas estimates for each transaction type
     let gas_estimates: [Arc<OnceLock<(u128, u128, u64)>>; TX_TYPES] = Default::default();
-
-    // Counters for number of transactions of each type
-    let tip20_transfers = Arc::new(AtomicUsize::new(0));
-    let swaps = Arc::new(AtomicUsize::new(0));
-    let orders = Arc::new(AtomicUsize::new(0));
-    let erc20_transfers = Arc::new(AtomicUsize::new(0));
-    // `tx_id` is shared across batches via `GenerateTransactionsInput` to ensure
-    // unique tx hashes across all batches when using expiring nonces.
-
-    let builders = ProgressBar::new(total_txs)
-        .wrap_stream(stream::iter(
-            std::iter::repeat_with(|| signer_provider_manager.random_unsigned_provider())
-                .take(total_txs as usize),
-        ))
-        .map(async |provider| {
+    // Global tx counter used to bump priority fee, ensuring unique tx hashes
+    // when using expiring nonces (which share nonce=0).
+    let tx_id = Arc::new(AtomicUsize::new(0));
+
+    stream::repeat_with(move || {
+        let signer_provider_manager = signer_provider_manager.clone();
+        let gas_estimates = gas_estimates.clone();
+        let tx_id = tx_id.clone();
+        let recipients = recipients.clone();
+        let user_tokens = user_tokens.clone();
+        let erc20_tokens = erc20_tokens.clone();
+        let counters = counters.clone();
+
+        async move {
+            let provider = signer_provider_manager.random_unsigned_provider();
+            let signer = signer_provider_manager.random_signer();
             let token = user_tokens.choose(&mut rand::rng()).copied().unwrap();
 
             // TODO: can be improved with an enum per transaction type
@@ -815,7 +672,7 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
             let mut tx = match tx_index {
                 0 => {
-                    tip20_transfers.fetch_add(1, Ordering::Relaxed);
+                    counters.tip20_transfers.fetch_add(1, Ordering::Relaxed);
 
                     let token = ITIP20Instance::new(token, provider.clone());
 
                     // Transfer minimum possible amount
@@ -824,7 +681,7 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
                         .into_transaction_request()
                 }
                 1 => {
-                    swaps.fetch_add(1, Ordering::Relaxed);
+                    counters.swaps.fetch_add(1, Ordering::Relaxed);
 
                     let exchange =
                         IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone());
@@ -834,7 +691,7 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
                        .into_transaction_request()
                 }
                 2 => {
-                    orders.fetch_add(1, Ordering::Relaxed);
+                    counters.orders.fetch_add(1, Ordering::Relaxed);
 
                     let exchange =
                         IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone());
@@ -847,7 +704,7 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
                        .into_transaction_request()
                 }
                 3 => {
-                    erc20_transfers.fetch_add(1, Ordering::Relaxed);
+                    counters.erc20_transfers.fetch_add(1, Ordering::Relaxed);
 
                     let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap();
                     let token = erc20::MockERC20::new(token_address, provider.clone());
@@ -860,7 +717,6 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
             };
 
             // Get a random signer and set it as the sender of the transaction.
-            let signer = signer_provider_manager.random_signer();
             tx.inner.set_from(signer.address());
 
             let gas = &gas_estimates[tx_index];
@@ -876,12 +732,12 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
             // Fill the rest of transaction. In case we already filled the gas fields,
             // it will only fill the chain ID and nonce that are efficiently cached inside
             // the fillers.
-            let tx = provider.fill(tx).await?;
+            let filled = provider.fill(tx).await?;
 
             // If we never filled the gas fields for that transaction type, cache the estimated
             // gas.
             if gas.get().is_none() {
-                let _ = gas.set(match &tx {
+                let _ = gas.set(match &filled {
                     SendableTx::Builder(builder) => (
                         builder
                             .max_fee_per_gas()
@@ -903,7 +759,7 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
                 });
             }
 
-            let mut req = tx.try_into_request()?;
+            let mut req = filled.try_into_request()?;
 
             // Bump priority fee by a unique counter to ensure unique tx hashes
             // when using expiring nonces (which share nonce=0).
@@ -915,34 +771,21 @@ async fn generate_transactions<P: Provider<TempoNetwork> + 'static>(
                 req.inner.set_max_fee_per_gas(fee + id);
             }
 
-            eyre::Ok((req, signer))
-        })
-        .buffer_unordered(max_concurrent_requests)
-        .try_collect::<Vec<_>>()
-        .await?;
-    info!(
-        transactions = builders.len(),
-        tip20_transfers = tip20_transfers.load(Ordering::Relaxed),
-        swaps = swaps.load(Ordering::Relaxed),
-        orders = orders.load(Ordering::Relaxed),
-        erc20_transfers = erc20_transfers.load(Ordering::Relaxed),
-        "Generated transactions",
-    );
-
-    info!(transactions = builders.len(), "Signing transactions");
-    // Sign transactions in parallel using signers directly, so it doesn't require async
-    let transactions = builders
-        .into_par_iter()
-        .progress()
-        .map(|(tx, signer)| -> eyre::Result<TempoTxEnvelope> {
-            let mut tx = tx.build_unsigned()?;
-            let sig = signer.sign_transaction_sync(tx.as_dyn_signable_mut())?;
-            Ok(tx.into_envelope(sig))
-        })
-        .map(|result| result.map(|tx| tx.encoded_2718()))
-        .collect::<eyre::Result<Vec<_>>>()?;
+            // Set valid_before right before signing to keep it fresh (expiring nonces only)
+            if let Some(expiry_secs) = expiry_secs {
+                let now = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .map(|d| d.as_secs())
+                    .unwrap_or(0);
+                req.set_valid_before(now + expiry_secs);
+            }
 
-    Ok(transactions)
+            // Sign and encode
+            let mut unsigned = req.build_unsigned()?;
+            let sig = signer.sign_transaction_sync(unsigned.as_dyn_signable_mut())?;
+            eyre::Ok(unsigned.into_envelope(sig).encoded_2718())
+        }
+    })
 }
 
 /// Funds accounts from the faucet using `temp_fundAddress` RPC.
@@ -1122,14 +965,14 @@ pub async fn generate_report(
     Ok(())
 }
 
-async fn monitor_tps(tx_counter: Arc<AtomicUsize>, target_count: usize, token: CancellationToken) {
+async fn monitor_tps(counters: TransactionCounters, target_count: usize, token: CancellationToken) {
     let mut last_count = 0;
     let mut ticker = interval(Duration::from_secs(1));
 
     loop {
         select! {
             _ = ticker.tick() => {
-                let current_count = tx_counter.load(Ordering::Relaxed);
+                let current_count = counters.sent.load(Ordering::Relaxed);
                 let tps = current_count - last_count;
                 last_count = current_count;
 
@@ -1194,23 +1037,3 @@ async fn assert_receipt(receipt: R) -> eyre::Result<()> {
     );
     Ok(())
 }
-
-#[derive(Clone)]
-struct GenerateTransactionsInput<P: Provider<TempoNetwork>> {
-    total_txs: u64,
-    accounts: u64,
-    signer_provider_manager: SignerProviderManager<P>,
-    max_concurrent_requests: usize,
-    tip20_weight: u64,
-    place_order_weight: u64,
-    swap_weight: u64,
-    erc20_weight: u64,
-    quote_token: Address,
-    user_tokens: Vec<Address>,
-    erc20_tokens: Vec<Address>,
-    /// When set, transfers go to these existing addresses instead of `Address::random()`.
-    recipients: Option<Vec<Address>>,
-    /// Shared counter used to bump priority fee, ensuring unique tx hashes across batches
-    /// when using expiring nonces (which share nonce=0).
-    tx_id: Arc<AtomicUsize>,
-}
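
Note on the overall shape of the new send path: the patch replaces generate-then-send batches with one lazy pipeline, an infinite stream of per-transaction futures that is driven with bounded concurrency and cut off by a deadline. A minimal, self-contained sketch of that shape using only `futures` and `tokio`; the patch additionally rate-limits the stream with `governor` and sends through the Tempo provider, and the item type and timings below are placeholders, not the tool's real work items.

    use std::time::Duration;

    use futures::{StreamExt, stream};
    use tokio::time::sleep;

    #[tokio::main]
    async fn main() {
        let max_in_flight = 16;

        // An infinite stream of "do one unit of work" futures; nothing runs until polled.
        let completed: Vec<u64> = stream::repeat_with(|| async {
            // Stand-in for generating, signing, and sending one transaction.
            sleep(Duration::from_millis(10)).await;
            1u64
        })
        // Drive up to `max_in_flight` of these futures at the same time.
        .buffer_unordered(max_in_flight)
        // Stop pulling new work once the deadline future resolves.
        .take_until(sleep(Duration::from_secs(1)))
        .collect()
        .await;

        println!("completed {} work items before the deadline", completed.len());
    }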
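The patch keeps relying on the fee-bump trick to make hashes unique: with expiring nonces every transaction carries nonce 0, so two otherwise identical transfers would hash identically, and the code folds a per-transaction counter into the fee fields to disambiguate them. A simplified sketch of that idea; `FeeFields` and `bump_fees` are hypothetical stand-ins for the filled transaction request, not types from the codebase.

    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    // Hypothetical stand-in for the fee fields of a filled transaction request.
    struct FeeFields {
        max_fee_per_gas: u128,
        max_priority_fee_per_gas: u128,
    }

    fn bump_fees(fees: &mut FeeFields, tx_id: &Arc<AtomicUsize>) {
        // Each transaction draws a unique id; adding it to the fees changes the signed
        // payload, so the tx hash stays unique even though the nonce is always 0.
        let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128;
        fees.max_priority_fee_per_gas += id;
        fees.max_fee_per_gas += id;
    }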
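Because signing now happens lazily inside the stream, the expiry window can be stamped immediately before signing instead of ahead of a whole batch. A sketch of just the timestamp computation, assuming `valid_before` is expressed as seconds since the Unix epoch, as the patch's `unwrap_or(0)` fallback suggests.

    use std::time::{SystemTime, UNIX_EPOCH};

    /// `valid_before` for a transaction signed "now", assuming a seconds-since-epoch field.
    fn valid_before(expiry_secs: u64) -> u64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        now + expiry_secs
    }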