From a8dca0000b48f4aa22e20e363e0c5330bb3f3286 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 20 Feb 2026 16:49:15 +0000 Subject: [PATCH 1/7] refactor(bench): switch to real-time tx generation with bounded buffer Replace batch-based generation/signing with continuous generation in a producer task. Transactions are generated, signed, and encoded one at a time with valid_before set immediately before signing. A bounded channel (size = sending concurrency) provides backpressure, keeping the buffer small and valid_before timestamps fresh. This fixes the 'valid_before too close to current time' errors that occurred when transactions sat in pre-signed batches waiting to be sent. Changes: - Remove batch-based pipeline (generate N, sign N, send N) - Add generate_transactions_to_channel: producer that continuously generates 1 tx at a time - Consumer drains channel with rate limiting and concurrent sending - Set valid_before right before signing (keeps it fresh) - Remove expiring_batch_secs CLI flag (no longer needed) Co-Authored-By: Claude Opus 4.6 --- bin/tempo-bench/src/cmd/max_tps.rs | 332 ++++++++++++++++++----------- 1 file changed, 207 insertions(+), 125 deletions(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs index 5a9ec9422e..5586c6dd3b 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -191,14 +191,6 @@ pub struct MaxTpsArgs { #[arg(long)] use_standard_nonces: bool, - /// Batch size for signing transactions when using expiring nonces. - /// - /// Since expiring nonces have a 25-second validity window, transactions must be signed - /// in batches close to when they're sent. This controls how many transactions to - /// generate/sign before sending. Default is 15 seconds worth of transactions at the - /// target TPS (e.g., 75,000 at 5,000 TPS). 
- #[arg(long)] - expiring_batch_secs: Option<u64>, } impl MaxTpsArgs { @@ -418,16 +410,14 @@ impl MaxTpsArgs { // with sending to avoid gaps that would cause empty blocks. let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces; let mut pending_txs = if use_expiring_nonces { - let batch_secs = self.expiring_batch_secs.unwrap_or(15); - let batch_size = self.tps * batch_secs; - let num_batches = total_txs.div_ceil(batch_size); + let expiry_secs = ExpiringNonceFiller::DEFAULT_EXPIRY_SECS; + let buffer_size = self.max_concurrent_requests; info!( total_txs, - batch_size, - num_batches, - batch_secs, - "Generating and sending transactions in batches (expiring nonces, pipelined)" + buffer_size, + expiry_secs, + "Generating and sending transactions continuously (expiring nonces)" ); let tx_counter = Arc::new(AtomicUsize::new(0)); @@ -437,78 +427,87 @@ impl MaxTpsArgs { let success_counter_clone = success_counter.clone(); let failed_counter_clone = failed_counter.clone(); let target_count = total_txs as usize; - let token = CancellationToken::new(); - let token_clone = token.clone(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); // Start TPS monitor tokio::spawn(async move { - monitor_tps(tx_counter_clone, target_count, token_clone).await; + monitor_tps(tx_counter_clone, target_count, cancel_token_clone).await; }); - let mut all_pending_txs = VecDeque::new(); - let start_time = std::time::Instant::now(); - let total_duration = Duration::from_secs(self.duration); + // Channel: producer generates txs continuously, consumer sends them. + // Buffer size = sending concurrency so there's always a tx ready to send. 
+ let (tx_sender, tx_receiver) = + tokio::sync::mpsc::channel::<Vec<u8>>(buffer_size); + + // Spawn producer: generates, signs, and encodes transactions continuously + let producer_spm = signer_provider_manager.clone(); + let producer = tokio::spawn(async move { + if let Err(e) = generate_transactions_to_channel( + tx_sender, + producer_spm, + gen_input, + expiry_secs, + ) + .await + { + error!(?e, "Transaction producer failed"); + } + }); - // Generate first batch before starting the send loop - let first_batch_size = batch_size.min(total_txs); - let batch_input = GenerateTransactionsInput { - total_txs: first_batch_size, - ..gen_input.clone() - }; - let mut current_batch = Some( - generate_transactions(batch_input) - .await - .context("Failed to generate first batch")?, + // Consumer: drain channel, rate-limit, send with concurrency + let rate_limiter = RateLimiter::direct( + Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap()), ); - for batch_idx in 0..num_batches { - let elapsed = start_time.elapsed(); - let remaining_duration = total_duration.saturating_sub(elapsed); - let batch_duration = Duration::from_secs(batch_secs).min(remaining_duration); - - if batch_duration.is_zero() { - info!(batch_idx, "Time expired, stopping"); - break; + let all_pending_txs = stream::unfold(tx_receiver, |mut rx| async move { + rx.recv().await.map(|item| (item, rx)) + }) + .boxed() + .ratelimit_stream(&rate_limiter) + .zip(stream::repeat_with(|| { + signer_provider_manager.random_unsigned_provider() + })) + .map(|(bytes, provider)| { + async move { + tokio::time::timeout( + Duration::from_secs(1), + provider.send_raw_transaction(&bytes), + ) + .await } - - let batch_to_send = current_batch.take().expect("batch should exist"); - - // Send current batch while generating next batch in parallel - let send_fut = send_transactions_with_counters( - batch_to_send, - signer_provider_manager.clone(), - self.max_concurrent_requests, - self.tps, - sleep(batch_duration), - tx_counter.clone(), 
- success_counter.clone(), - failed_counter.clone(), - ); - - if batch_idx + 1 < num_batches { - let remaining_txs = total_txs - ((batch_idx + 1) * batch_size); - let next_batch_size = remaining_txs.min(batch_size); - let batch_input = GenerateTransactionsInput { - total_txs: next_batch_size, - ..gen_input.clone() - }; - // Run send and generate concurrently - let (batch_pending, next) = - tokio::join!(send_fut, generate_transactions(batch_input)); - all_pending_txs.extend(batch_pending); - current_batch = Some(next.context("Failed to generate next batch")?); - } else { - // Last batch - just send - let batch_pending = send_fut.await; - all_pending_txs.extend(batch_pending); + }) + .buffer_unordered(self.max_concurrent_requests) + .filter_map(|result| async { + match result { + Ok(Ok(pending_tx)) => { + tx_counter.fetch_add(1, Ordering::Relaxed); + success_counter.fetch_add(1, Ordering::Relaxed); + Some(pending_tx) + } + Ok(Err(err)) => { + failed_counter.fetch_add(1, Ordering::Relaxed); + debug!(?err, "Failed to send transaction"); + None + } + Err(_) => { + failed_counter.fetch_add(1, Ordering::Relaxed); + debug!("Transaction sending timed out"); + None + } } - } + }) + .take_until(sleep(Duration::from_secs(self.duration))) + .collect::>() + .await; + + // Stop the producer and TPS monitor + producer.abort(); + cancel_token.cancel(); - token.cancel(); info!( success = success_counter_clone.load(Ordering::Relaxed), failed = failed_counter_clone.load(Ordering::Relaxed), - timeout = 0, "Finished sending transactions" ); @@ -699,58 +698,6 @@ async fn send_transactions + 'static>( transactions } -/// Same as `send_transactions` but uses external counters for batch mode. 
-#[allow(clippy::too_many_arguments)] -async fn send_transactions_with_counters + 'static>( - transactions: Vec>, - signer_provider_manager: SignerProviderManager, - max_concurrent_requests: usize, - tps: u64, - deadline: Sleep, - tx_counter: Arc, - success_counter: Arc, - failed_counter: Arc, -) -> VecDeque> { - // Create a rate limiter - let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(tps as u32).unwrap())); - - stream::iter(transactions) - .ratelimit_stream(&rate_limiter) - .zip(stream::repeat_with(|| { - signer_provider_manager.random_unsigned_provider() - })) - .map(|(bytes, provider)| async move { - tokio::time::timeout( - Duration::from_secs(1), - provider.send_raw_transaction(&bytes), - ) - .await - }) - .buffer_unordered(max_concurrent_requests) - .filter_map(|result| async { - match result { - Ok(Ok(pending_tx)) => { - tx_counter.fetch_add(1, Ordering::Relaxed); - success_counter.fetch_add(1, Ordering::Relaxed); - Some(pending_tx) - } - Ok(Err(err)) => { - failed_counter.fetch_add(1, Ordering::Relaxed); - debug!(?err, "Failed to send transaction"); - None - } - Err(_) => { - failed_counter.fetch_add(1, Ordering::Relaxed); - debug!("Transaction sending timed out"); - None - } - } - }) - .take_until(deadline) - .collect() - .await -} - async fn generate_transactions + 'static>( input: GenerateTransactionsInput, ) -> eyre::Result>> { @@ -945,6 +892,141 @@ async fn generate_transactions + 'static>( Ok(transactions) } +/// Continuously generates, signs, and encodes transactions, sending them to a channel. +/// The channel provides backpressure: when the buffer is full, generation pauses. +/// This keeps `valid_before` fresh since each transaction is signed just before sending. 
+async fn generate_transactions_to_channel + 'static>( + tx_sender: tokio::sync::mpsc::Sender>, + signer_provider_manager: SignerProviderManager, + input: GenerateTransactionsInput, + expiry_secs: u64, +) -> eyre::Result<()> { + let GenerateTransactionsInput { + tip20_weight, + place_order_weight, + swap_weight, + erc20_weight, + quote_token, + user_tokens, + erc20_tokens, + recipients, + tx_id, + .. + } = input; + + const TX_TYPES: usize = 4; + let tx_weights: [u64; TX_TYPES] = + [tip20_weight, swap_weight, place_order_weight, erc20_weight]; + let gas_estimates: [Arc>; TX_TYPES] = Default::default(); + + loop { + let provider = signer_provider_manager.random_unsigned_provider(); + let signer = signer_provider_manager.random_signer(); + let token = user_tokens.choose(&mut rand::rng()).copied().unwrap(); + + let tx_index = tx_weights + .iter() + .enumerate() + .collect::>() + .choose_weighted(&mut rand::rng(), |(_, weight)| *weight)? + .0; + + let recipient = match &recipients { + Some(addrs) => *addrs.choose(&mut rand::rng()).unwrap(), + None => Address::random(), + }; + + let mut tx = match tx_index { + 0 => { + let token = ITIP20Instance::new(token, provider.clone()); + token.transfer(recipient, U256::ONE).into_transaction_request() + } + 1 => { + let exchange = + IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); + exchange + .quoteSwapExactAmountIn(token, quote_token, 1) + .into_transaction_request() + } + 2 => { + let exchange = + IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); + let tick = rand::random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING) + * TICK_SPACING; + exchange + .place(token, MIN_ORDER_AMOUNT, true, tick) + .into_transaction_request() + } + 3 => { + let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap(); + let token = erc20::MockERC20::new(token_address, provider.clone()); + token.transfer(recipient, U256::ONE).into_transaction_request() + } + _ => unreachable!("Only 
{TX_TYPES} transaction types are supported"), + }; + + tx.inner.set_from(signer.address()); + + let gas = &gas_estimates[tx_index]; + if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() { + tx.inner.set_max_fee_per_gas(*max_fee_per_gas); + tx.inner.set_max_priority_fee_per_gas(*max_priority_fee_per_gas); + tx.inner.set_gas_limit(*gas_limit); + } + + let filled = provider.fill(tx).await?; + + if gas.get().is_none() { + let _ = gas.set(match &filled { + SendableTx::Builder(builder) => ( + builder.max_fee_per_gas().ok_or_eyre("max fee per gas should be filled")?, + builder + .max_priority_fee_per_gas() + .ok_or_eyre("max priority fee per gas should be filled")?, + builder.gas_limit().ok_or_eyre("gas limit should be filled")?, + ), + SendableTx::Envelope(envelope) => ( + envelope.max_fee_per_gas(), + envelope + .max_priority_fee_per_gas() + .ok_or_eyre("max priority fee per gas should be filled")?, + envelope.gas_limit(), + ), + }); + } + + let mut req = filled.try_into_request()?; + + // Bump priority fee to ensure unique tx hashes (expiring nonces share nonce=0) + let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128; + if let Some(fee) = req.max_priority_fee_per_gas() { + req.inner.set_max_priority_fee_per_gas(fee + id); + } + if let Some(fee) = req.max_fee_per_gas() { + req.inner.set_max_fee_per_gas(fee + id); + } + + // Set valid_before right before signing to keep it fresh + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + req.set_valid_before(now + expiry_secs); + + // Sign and encode + let mut unsigned = req.build_unsigned()?; + let sig = signer.sign_transaction_sync(unsigned.as_dyn_signable_mut())?; + let bytes = unsigned.into_envelope(sig).encoded_2718(); + + // Send to channel; returns Err if receiver is dropped (consumer stopped) + if tx_sender.send(bytes).await.is_err() { + break; + } + } + + Ok(()) +} + /// Funds accounts from the faucet using 
`temp_fundAddress` RPC. async fn fund_accounts( provider: &DynProvider, From 5f815d954c725c04ebf2adb881db25da245f1dd8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 20 Feb 2026 18:01:14 +0000 Subject: [PATCH 2/7] unify generate_transactions and generate_transactions_to_channel as a stream --- bin/tempo-bench/src/cmd/max_tps.rs | 490 ++++++----------------------- 1 file changed, 88 insertions(+), 402 deletions(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs index 5586c6dd3b..ef7fbdc28b 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -3,14 +3,12 @@ mod erc20; use alloy_consensus::Transaction; use itertools::Itertools; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; use reth_tracing::{ RethTracer, Tracer, tracing::{debug, error, info}, }; use tempo_alloy::{ - TempoNetwork, fillers::ExpiringNonceFiller, primitives::TempoTxEnvelope, - provider::ext::TempoProviderBuilderExt, + TempoNetwork, fillers::ExpiringNonceFiller, provider::ext::TempoProviderBuilderExt, }; use alloy::{ @@ -30,14 +28,14 @@ use alloy::{ transports::http::reqwest::Url, }; use clap::Parser; -use eyre::{Context, OptionExt, ensure}; +use eyre::{Context, OptionExt}; use futures::{ - FutureExt, StreamExt, TryStreamExt, + FutureExt, Stream, StreamExt, TryStreamExt, future::BoxFuture, stream::{self}, }; use governor::{Quota, RateLimiter, state::StreamRateLimitExt}; -use indicatif::{ParallelProgressIterator, ProgressBar, ProgressIterator}; +use indicatif::{ProgressBar, ProgressIterator}; use rand::{random_range, seq::IndexedRandom}; use rlimit::Resource; use serde::Serialize; @@ -69,7 +67,7 @@ use tempo_precompiles::{ }; use tokio::{ select, - time::{Sleep, interval, sleep}, + time::{interval, sleep}, }; use tokio_util::sync::CancellationToken; @@ -190,7 +188,6 @@ pub struct MaxTpsArgs { /// nonce management. 
#[arg(long)] use_standard_nonces: bool, - } impl MaxTpsArgs { @@ -284,7 +281,6 @@ impl MaxTpsArgs { self, signer_provider_manager: SignerProviderManager, ) -> eyre::Result<()> { - let accounts = self.accounts.get(); let signer_providers = signer_provider_manager.signer_providers(); if self.clear_txpool { @@ -389,11 +385,14 @@ impl MaxTpsArgs { None }; + let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces; + let expiry_secs = if use_expiring_nonces { + Some(ExpiringNonceFiller::DEFAULT_EXPIRY_SECS) + } else { + None + }; + let gen_input = GenerateTransactionsInput { - total_txs, - accounts, - signer_provider_manager: signer_provider_manager.clone(), - max_concurrent_requests: self.max_concurrent_requests, tip20_weight, place_order_weight, swap_weight, @@ -402,80 +401,51 @@ impl MaxTpsArgs { user_tokens, erc20_tokens, recipients, - tx_id: Arc::new(AtomicUsize::new(0)), + expiry_secs, }; - // For expiring nonces, we need to generate/sign/send in batches to avoid - // transactions expiring before they're sent. We pipeline batch generation - // with sending to avoid gaps that would cause empty blocks. 
- let use_expiring_nonces = !self.use_2d_nonces && !self.use_standard_nonces; - let mut pending_txs = if use_expiring_nonces { - let expiry_secs = ExpiringNonceFiller::DEFAULT_EXPIRY_SECS; - let buffer_size = self.max_concurrent_requests; - - info!( - total_txs, - buffer_size, - expiry_secs, - "Generating and sending transactions continuously (expiring nonces)" - ); + info!(total_txs, "Generating and sending transactions"); + + let tx_counter = Arc::new(AtomicUsize::new(0)); + let success_counter = Arc::new(AtomicUsize::new(0)); + let failed_counter = Arc::new(AtomicUsize::new(0)); + let tx_counter_clone = tx_counter.clone(); + let success_counter_clone = success_counter.clone(); + let failed_counter_clone = failed_counter.clone(); + let target_count = total_txs as usize; + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + + // Start TPS monitor + tokio::spawn(async move { + monitor_tps(tx_counter_clone, target_count, cancel_token_clone).await; + }); - let tx_counter = Arc::new(AtomicUsize::new(0)); - let success_counter = Arc::new(AtomicUsize::new(0)); - let failed_counter = Arc::new(AtomicUsize::new(0)); - let tx_counter_clone = tx_counter.clone(); - let success_counter_clone = success_counter.clone(); - let failed_counter_clone = failed_counter.clone(); - let target_count = total_txs as usize; - let cancel_token = CancellationToken::new(); - let cancel_token_clone = cancel_token.clone(); - - // Start TPS monitor - tokio::spawn(async move { - monitor_tps(tx_counter_clone, target_count, cancel_token_clone).await; - }); + let rate_limiter = + RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap())); - // Channel: producer generates txs continuously, consumer sends them. - // Buffer size = sending concurrency so there's always a tx ready to send. 
- let (tx_sender, tx_receiver) = - tokio::sync::mpsc::channel::>(buffer_size); - - // Spawn producer: generates, signs, and encodes transactions continuously - let producer_spm = signer_provider_manager.clone(); - let producer = tokio::spawn(async move { - if let Err(e) = generate_transactions_to_channel( - tx_sender, - producer_spm, - gen_input, - expiry_secs, - ) - .await - { - error!(?e, "Transaction producer failed"); + let mut pending_txs = generate_transactions(signer_provider_manager.clone(), gen_input) + .buffer_unordered(self.max_concurrent_requests) + .filter_map(|result| async { + match result { + Ok(bytes) => Some(bytes), + Err(e) => { + debug!(?e, "Transaction generation failed"); + None + } } - }); - - // Consumer: drain channel, rate-limit, send with concurrency - let rate_limiter = RateLimiter::direct( - Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap()), - ); - - let all_pending_txs = stream::unfold(tx_receiver, |mut rx| async move { - rx.recv().await.map(|item| (item, rx)) }) .boxed() .ratelimit_stream(&rate_limiter) .zip(stream::repeat_with(|| { signer_provider_manager.random_unsigned_provider() })) - .map(|(bytes, provider)| { - async move { - tokio::time::timeout( - Duration::from_secs(1), - provider.send_raw_transaction(&bytes), - ) - .await - } + .map(|(bytes, provider)| async move { + tokio::time::timeout( + Duration::from_secs(1), + provider.send_raw_transaction(&bytes), + ) + .await }) .buffer_unordered(self.max_concurrent_requests) .filter_map(|result| async { @@ -501,32 +471,13 @@ impl MaxTpsArgs { .collect::>() .await; - // Stop the producer and TPS monitor - producer.abort(); - cancel_token.cancel(); + cancel_token.cancel(); - info!( - success = success_counter_clone.load(Ordering::Relaxed), - failed = failed_counter_clone.load(Ordering::Relaxed), - "Finished sending transactions" - ); - - all_pending_txs - } else { - let transactions = generate_transactions(gen_input) - .await - .context("Failed to generate transactions")?; 
- - // Send transactions - send_transactions( - transactions, - signer_provider_manager.clone(), - self.max_concurrent_requests, - self.tps, - sleep(Duration::from_secs(self.duration)), - ) - .await - }; + info!( + success = success_counter_clone.load(Ordering::Relaxed), + failed = failed_counter_clone.load(Ordering::Relaxed), + "Finished sending transactions" + ); let end_block_number = provider.get_block_number().await?; @@ -623,89 +574,12 @@ impl MnemonicArg { } } -/// Awaits pending transactions with up to `tps` per second and `max_concurrent_requests` simultaneous in-flight requests. Stops when `deadline` future resolves. -async fn send_transactions + 'static>( - transactions: Vec>, +/// Returns an infinite stream of futures, each generating, signing, and encoding one transaction. +fn generate_transactions + 'static>( signer_provider_manager: SignerProviderManager, - max_concurrent_requests: usize, - tps: u64, - deadline: Sleep, -) -> VecDeque> { - info!( - transactions = transactions.len(), - max_concurrent_requests, tps, "Sending transactions" - ); - - // Create shared transaction counter and monitoring - let tx_counter = Arc::new(AtomicUsize::new(0)); - - // Spawn monitoring task for TPS reporting - let cancel = CancellationToken::new(); - let _drop_guard = cancel.clone().drop_guard(); - tokio::spawn(monitor_tps( - tx_counter.clone(), - transactions.len(), - cancel.clone(), - )); - - // Create a rate limiter - let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(tps as u32).unwrap())); - - let failed = Arc::new(AtomicUsize::new(0)); - let timeout = Arc::new(AtomicUsize::new(0)); - let transactions = stream::iter(transactions) - .ratelimit_stream(&rate_limiter) - .zip(stream::repeat_with(|| { - signer_provider_manager.random_unsigned_provider() - })) - .map(|(bytes, provider)| async move { - tokio::time::timeout( - Duration::from_secs(1), - provider.send_raw_transaction(&bytes), - ) - .await - }) - 
.buffer_unordered(max_concurrent_requests) - .filter_map(|result| async { - match result { - Ok(Ok(pending_tx)) => { - tx_counter.fetch_add(1, Ordering::Relaxed); - Some(pending_tx) - } - Ok(Err(err)) => { - failed.fetch_add(1, Ordering::Relaxed); - debug!(?err, "Failed to send transaction"); - None - } - Err(_) => { - timeout.fetch_add(1, Ordering::Relaxed); - debug!("Transaction sending timed out"); - None - } - } - }) - .take_until(deadline) - .collect() - .await; - - info!( - success = tx_counter.load(Ordering::Relaxed), - failed = failed.load(Ordering::Relaxed), - timeout = timeout.load(Ordering::Relaxed), - "Finished sending transactions" - ); - - transactions -} - -async fn generate_transactions + 'static>( - input: GenerateTransactionsInput, -) -> eyre::Result>> { + input: GenerateTransactionsInput, +) -> impl Stream>>> { let GenerateTransactionsInput { - total_txs, - accounts, - signer_provider_manager, - max_concurrent_requests, tip20_weight, place_order_weight, swap_weight, @@ -714,40 +588,26 @@ async fn generate_transactions + 'static>( user_tokens, erc20_tokens, recipients, - tx_id, + expiry_secs, } = input; - let txs_per_sender = total_txs / accounts; - ensure!( - txs_per_sender > 0, - "txs per sender is 0, increase tps or decrease senders" - ); - - info!(transactions = total_txs, "Generating transactions"); - const TX_TYPES: usize = 4; - // Weights for random sampling for each transaction type let tx_weights: [u64; TX_TYPES] = [tip20_weight, swap_weight, place_order_weight, erc20_weight]; - // Cached gas estimates for each transaction type let gas_estimates: [Arc>; TX_TYPES] = Default::default(); - // Counters for number of transactions of each type - let tip20_transfers = Arc::new(AtomicUsize::new(0)); - let swaps = Arc::new(AtomicUsize::new(0)); - let orders = Arc::new(AtomicUsize::new(0)); - let erc20_transfers = Arc::new(AtomicUsize::new(0)); - // `tx_id` is shared across batches via `GenerateTransactionsInput` to ensure - // unique tx hashes 
across all batches when using expiring nonces. - - let builders = ProgressBar::new(total_txs) - .wrap_stream(stream::iter( - std::iter::repeat_with(|| signer_provider_manager.random_unsigned_provider()) - .take(total_txs as usize), - )) - .map(async |provider| { + stream::repeat_with(move || { + let signer_provider_manager = signer_provider_manager.clone(); + let gas_estimates = gas_estimates.clone(); + let tx_id = Arc::new(AtomicUsize::new(0)); + let recipients = recipients.clone(); + let user_tokens = user_tokens.clone(); + let erc20_tokens = erc20_tokens.clone(); + + async move { + let provider = signer_provider_manager.random_unsigned_provider(); + let signer = signer_provider_manager.random_signer(); let token = user_tokens.choose(&mut rand::rng()).copied().unwrap(); - // TODO: can be improved with an enum per transaction type let tx_index = tx_weights .iter() .enumerate() @@ -762,30 +622,21 @@ async fn generate_transactions + 'static>( let mut tx = match tx_index { 0 => { - tip20_transfers.fetch_add(1, Ordering::Relaxed); let token = ITIP20Instance::new(token, provider.clone()); - - // Transfer minimum possible amount token .transfer(recipient, U256::ONE) .into_transaction_request() } 1 => { - swaps.fetch_add(1, Ordering::Relaxed); let exchange = IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); - - // Swap minimum possible amount exchange .quoteSwapExactAmountIn(token, quote_token, 1) .into_transaction_request() } 2 => { - orders.fetch_add(1, Ordering::Relaxed); let exchange = IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); - - // Place an order at a random tick that's a multiple of `TICK_SPACING` let tick = rand::random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING) * TICK_SPACING; @@ -794,11 +645,8 @@ async fn generate_transactions + 'static>( .into_transaction_request() } 3 => { - erc20_transfers.fetch_add(1, Ordering::Relaxed); let token_address = erc20_tokens.choose(&mut 
rand::rng()).copied().unwrap(); let token = erc20::MockERC20::new(token_address, provider.clone()); - - // Transfer minimum possible amount token .transfer(recipient, U256::ONE) .into_transaction_request() @@ -806,13 +654,9 @@ async fn generate_transactions + 'static>( _ => unreachable!("Only {TX_TYPES} transaction types are supported"), }; - // Get a random signer and set it as the sender of the transaction. - let signer = signer_provider_manager.random_signer(); tx.inner.set_from(signer.address()); let gas = &gas_estimates[tx_index]; - // If we already filled the gas fields once for that transaction type, use it. - // This will skip the gas filler. if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() { tx.inner.set_max_fee_per_gas(*max_fee_per_gas); tx.inner @@ -820,15 +664,10 @@ async fn generate_transactions + 'static>( tx.inner.set_gas_limit(*gas_limit); } - // Fill the rest of transaction. In case we already filled the gas fields, - // it will only fill the chain ID and nonce that are efficiently cached inside - // the fillers. - let tx = provider.fill(tx).await?; + let filled = provider.fill(tx).await?; - // If we never filled the gas fields for that transaction type, cache the estimated - // gas. if gas.get().is_none() { - let _ = gas.set(match &tx { + let _ = gas.set(match &filled { SendableTx::Builder(builder) => ( builder .max_fee_per_gas() @@ -850,10 +689,9 @@ async fn generate_transactions + 'static>( }); } - let mut req = tx.try_into_request()?; + let mut req = filled.try_into_request()?; - // Bump priority fee by a unique counter to ensure unique tx hashes - // when using expiring nonces (which share nonce=0). 
+ // Bump priority fee to ensure unique tx hashes (expiring nonces share nonce=0) let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128; if let Some(fee) = req.max_priority_fee_per_gas() { req.inner.set_max_priority_fee_per_gas(fee + id); @@ -862,169 +700,21 @@ async fn generate_transactions + 'static>( req.inner.set_max_fee_per_gas(fee + id); } - eyre::Ok((req, signer)) - }) - .buffer_unordered(max_concurrent_requests) - .try_collect::>() - .await?; - info!( - transactions = builders.len(), - tip20_transfers = tip20_transfers.load(Ordering::Relaxed), - swaps = swaps.load(Ordering::Relaxed), - orders = orders.load(Ordering::Relaxed), - erc20_transfers = erc20_transfers.load(Ordering::Relaxed), - "Generated transactions", - ); - - info!(transactions = builders.len(), "Signing transactions"); - // Sign transactions in parallel using signers directly, so it doesn't require async - let transactions = builders - .into_par_iter() - .progress() - .map(|(tx, signer)| -> eyre::Result { - let mut tx = tx.build_unsigned()?; - let sig = signer.sign_transaction_sync(tx.as_dyn_signable_mut())?; - Ok(tx.into_envelope(sig)) - }) - .map(|result| result.map(|tx| tx.encoded_2718())) - .collect::>>()?; - - Ok(transactions) -} - -/// Continuously generates, signs, and encodes transactions, sending them to a channel. -/// The channel provides backpressure: when the buffer is full, generation pauses. -/// This keeps `valid_before` fresh since each transaction is signed just before sending. -async fn generate_transactions_to_channel + 'static>( - tx_sender: tokio::sync::mpsc::Sender>, - signer_provider_manager: SignerProviderManager, - input: GenerateTransactionsInput, - expiry_secs: u64, -) -> eyre::Result<()> { - let GenerateTransactionsInput { - tip20_weight, - place_order_weight, - swap_weight, - erc20_weight, - quote_token, - user_tokens, - erc20_tokens, - recipients, - tx_id, - .. 
- } = input; - - const TX_TYPES: usize = 4; - let tx_weights: [u64; TX_TYPES] = - [tip20_weight, swap_weight, place_order_weight, erc20_weight]; - let gas_estimates: [Arc>; TX_TYPES] = Default::default(); - - loop { - let provider = signer_provider_manager.random_unsigned_provider(); - let signer = signer_provider_manager.random_signer(); - let token = user_tokens.choose(&mut rand::rng()).copied().unwrap(); - - let tx_index = tx_weights - .iter() - .enumerate() - .collect::>() - .choose_weighted(&mut rand::rng(), |(_, weight)| *weight)? - .0; - - let recipient = match &recipients { - Some(addrs) => *addrs.choose(&mut rand::rng()).unwrap(), - None => Address::random(), - }; - - let mut tx = match tx_index { - 0 => { - let token = ITIP20Instance::new(token, provider.clone()); - token.transfer(recipient, U256::ONE).into_transaction_request() - } - 1 => { - let exchange = - IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); - exchange - .quoteSwapExactAmountIn(token, quote_token, 1) - .into_transaction_request() - } - 2 => { - let exchange = - IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); - let tick = rand::random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING) - * TICK_SPACING; - exchange - .place(token, MIN_ORDER_AMOUNT, true, tick) - .into_transaction_request() - } - 3 => { - let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap(); - let token = erc20::MockERC20::new(token_address, provider.clone()); - token.transfer(recipient, U256::ONE).into_transaction_request() + // Set valid_before right before signing to keep it fresh (expiring nonces only) + if let Some(expiry_secs) = expiry_secs { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + req.set_valid_before(now + expiry_secs); } - _ => unreachable!("Only {TX_TYPES} transaction types are supported"), - }; - - tx.inner.set_from(signer.address()); - let gas = 
&gas_estimates[tx_index]; - if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() { - tx.inner.set_max_fee_per_gas(*max_fee_per_gas); - tx.inner.set_max_priority_fee_per_gas(*max_priority_fee_per_gas); - tx.inner.set_gas_limit(*gas_limit); + // Sign and encode + let mut unsigned = req.build_unsigned()?; + let sig = signer.sign_transaction_sync(unsigned.as_dyn_signable_mut())?; + eyre::Ok(unsigned.into_envelope(sig).encoded_2718()) } - - let filled = provider.fill(tx).await?; - - if gas.get().is_none() { - let _ = gas.set(match &filled { - SendableTx::Builder(builder) => ( - builder.max_fee_per_gas().ok_or_eyre("max fee per gas should be filled")?, - builder - .max_priority_fee_per_gas() - .ok_or_eyre("max priority fee per gas should be filled")?, - builder.gas_limit().ok_or_eyre("gas limit should be filled")?, - ), - SendableTx::Envelope(envelope) => ( - envelope.max_fee_per_gas(), - envelope - .max_priority_fee_per_gas() - .ok_or_eyre("max priority fee per gas should be filled")?, - envelope.gas_limit(), - ), - }); - } - - let mut req = filled.try_into_request()?; - - // Bump priority fee to ensure unique tx hashes (expiring nonces share nonce=0) - let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128; - if let Some(fee) = req.max_priority_fee_per_gas() { - req.inner.set_max_priority_fee_per_gas(fee + id); - } - if let Some(fee) = req.max_fee_per_gas() { - req.inner.set_max_fee_per_gas(fee + id); - } - - // Set valid_before right before signing to keep it fresh - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); - req.set_valid_before(now + expiry_secs); - - // Sign and encode - let mut unsigned = req.build_unsigned()?; - let sig = signer.sign_transaction_sync(unsigned.as_dyn_signable_mut())?; - let bytes = unsigned.into_envelope(sig).encoded_2718(); - - // Send to channel; returns Err if receiver is dropped (consumer stopped) - if tx_sender.send(bytes).await.is_err() 
{ - break; - } - } - - Ok(()) + }) } /// Funds accounts from the faucet using `temp_fundAddress` RPC. @@ -1278,11 +968,7 @@ async fn assert_receipt(receipt: R) -> eyre::Result<()> { } #[derive(Clone)] -struct GenerateTransactionsInput> { - total_txs: u64, - accounts: u64, - signer_provider_manager: SignerProviderManager, - max_concurrent_requests: usize, +struct GenerateTransactionsInput { tip20_weight: u64, place_order_weight: u64, swap_weight: u64, @@ -1292,7 +978,7 @@ struct GenerateTransactionsInput> { erc20_tokens: Vec
, /// When set, transfers go to these existing addresses instead of `Address::random()`. recipients: Option>, - /// Shared counter used to bump priority fee, ensuring unique tx hashes across batches - /// when using expiring nonces (which share nonce=0). - tx_id: Arc, + /// When `Some`, sets `valid_before` on each transaction right before signing + /// to keep it fresh for expiring nonces. + expiry_secs: Option, } From 4d2c1265a656efc7650643c97aba067960ca8461 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 20 Feb 2026 18:08:27 +0000 Subject: [PATCH 3/7] feat(bench): restore comments and add transaction type counters to generate_transactions Pass TransactionCounters struct into generate_transactions to track per-type counts (tip20_transfers, swaps, orders, erc20_transfers) and log them after the stream is consumed. Restore descriptive comments from the pre-refactor code. Co-Authored-By: Claude Opus 4.6 --- bin/tempo-bench/src/cmd/max_tps.rs | 169 ++++++++++++++++++----------- 1 file changed, 108 insertions(+), 61 deletions(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs index ef7fbdc28b..3a62874478 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -406,6 +406,7 @@ impl MaxTpsArgs { info!(total_txs, "Generating and sending transactions"); + let tx_counters = TransactionCounters::default(); let tx_counter = Arc::new(AtomicUsize::new(0)); let success_counter = Arc::new(AtomicUsize::new(0)); let failed_counter = Arc::new(AtomicUsize::new(0)); @@ -424,55 +425,67 @@ impl MaxTpsArgs { let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap())); - let mut pending_txs = generate_transactions(signer_provider_manager.clone(), gen_input) - .buffer_unordered(self.max_concurrent_requests) - .filter_map(|result| async { - match result { - Ok(bytes) => Some(bytes), - Err(e) => { - debug!(?e, "Transaction generation failed"); - None - } + let mut 
pending_txs = generate_transactions( + signer_provider_manager.clone(), + gen_input, + tx_counters.clone(), + ) + .buffer_unordered(self.max_concurrent_requests) + .filter_map(|result| async { + match result { + Ok(bytes) => Some(bytes), + Err(e) => { + debug!(?e, "Transaction generation failed"); + None } - }) - .boxed() - .ratelimit_stream(&rate_limiter) - .zip(stream::repeat_with(|| { - signer_provider_manager.random_unsigned_provider() - })) - .map(|(bytes, provider)| async move { - tokio::time::timeout( - Duration::from_secs(1), - provider.send_raw_transaction(&bytes), - ) - .await - }) - .buffer_unordered(self.max_concurrent_requests) - .filter_map(|result| async { - match result { - Ok(Ok(pending_tx)) => { - tx_counter.fetch_add(1, Ordering::Relaxed); - success_counter.fetch_add(1, Ordering::Relaxed); - Some(pending_tx) - } - Ok(Err(err)) => { - failed_counter.fetch_add(1, Ordering::Relaxed); - debug!(?err, "Failed to send transaction"); - None - } - Err(_) => { - failed_counter.fetch_add(1, Ordering::Relaxed); - debug!("Transaction sending timed out"); - None - } + } + }) + .boxed() + .ratelimit_stream(&rate_limiter) + .zip(stream::repeat_with(|| { + signer_provider_manager.random_unsigned_provider() + })) + .map(|(bytes, provider)| async move { + tokio::time::timeout( + Duration::from_secs(1), + provider.send_raw_transaction(&bytes), + ) + .await + }) + .buffer_unordered(self.max_concurrent_requests) + .filter_map(|result| async { + match result { + Ok(Ok(pending_tx)) => { + tx_counter.fetch_add(1, Ordering::Relaxed); + success_counter.fetch_add(1, Ordering::Relaxed); + Some(pending_tx) } - }) - .take_until(sleep(Duration::from_secs(self.duration))) - .collect::>() - .await; + Ok(Err(err)) => { + failed_counter.fetch_add(1, Ordering::Relaxed); + debug!(?err, "Failed to send transaction"); + None + } + Err(_) => { + failed_counter.fetch_add(1, Ordering::Relaxed); + debug!("Transaction sending timed out"); + None + } + } + }) + 
.take_until(sleep(Duration::from_secs(self.duration))) + .collect::>() + .await; cancel_token.cancel(); + info!( + tip20_transfers = tx_counters.tip20_transfers.load(Ordering::Relaxed), + swaps = tx_counters.swaps.load(Ordering::Relaxed), + orders = tx_counters.orders.load(Ordering::Relaxed), + erc20_transfers = tx_counters.erc20_transfers.load(Ordering::Relaxed), + "Generated transactions", + ); + info!( success = success_counter_clone.load(Ordering::Relaxed), failed = failed_counter_clone.load(Ordering::Relaxed), @@ -574,10 +587,35 @@ impl MnemonicArg { } } +#[derive(Clone, Default)] +struct TransactionCounters { + tip20_transfers: Arc, + swaps: Arc, + orders: Arc, + erc20_transfers: Arc, +} + +#[derive(Clone)] +struct GenerateTransactionsInput { + tip20_weight: u64, + place_order_weight: u64, + swap_weight: u64, + erc20_weight: u64, + quote_token: Address, + user_tokens: Vec
, + erc20_tokens: Vec
, + /// When set, transfers go to these existing addresses instead of `Address::random()`. + recipients: Option>, + /// When `Some`, sets `valid_before` on each transaction right before signing + /// to keep it fresh for expiring nonces. + expiry_secs: Option, +} + /// Returns an infinite stream of futures, each generating, signing, and encoding one transaction. fn generate_transactions + 'static>( signer_provider_manager: SignerProviderManager, input: GenerateTransactionsInput, + counters: TransactionCounters, ) -> impl Stream>>> { let GenerateTransactionsInput { tip20_weight, @@ -592,7 +630,9 @@ fn generate_transactions + 'static>( } = input; const TX_TYPES: usize = 4; + // Weights for random sampling for each transaction type let tx_weights: [u64; TX_TYPES] = [tip20_weight, swap_weight, place_order_weight, erc20_weight]; + // Cached gas estimates for each transaction type let gas_estimates: [Arc>; TX_TYPES] = Default::default(); stream::repeat_with(move || { @@ -602,12 +642,14 @@ fn generate_transactions + 'static>( let recipients = recipients.clone(); let user_tokens = user_tokens.clone(); let erc20_tokens = erc20_tokens.clone(); + let counters = counters.clone(); async move { let provider = signer_provider_manager.random_unsigned_provider(); let signer = signer_provider_manager.random_signer(); let token = user_tokens.choose(&mut rand::rng()).copied().unwrap(); + // TODO: can be improved with an enum per transaction type let tx_index = tx_weights .iter() .enumerate() @@ -622,21 +664,30 @@ fn generate_transactions + 'static>( let mut tx = match tx_index { 0 => { + counters.tip20_transfers.fetch_add(1, Ordering::Relaxed); let token = ITIP20Instance::new(token, provider.clone()); + + // Transfer minimum possible amount token .transfer(recipient, U256::ONE) .into_transaction_request() } 1 => { + counters.swaps.fetch_add(1, Ordering::Relaxed); let exchange = IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); + + // Swap minimum possible amount 
exchange .quoteSwapExactAmountIn(token, quote_token, 1) .into_transaction_request() } 2 => { + counters.orders.fetch_add(1, Ordering::Relaxed); let exchange = IStablecoinDEXInstance::new(STABLECOIN_DEX_ADDRESS, provider.clone()); + + // Place an order at a random tick that's a multiple of `TICK_SPACING` let tick = rand::random_range(MIN_TICK / TICK_SPACING..=MAX_TICK / TICK_SPACING) * TICK_SPACING; @@ -645,8 +696,11 @@ fn generate_transactions + 'static>( .into_transaction_request() } 3 => { + counters.erc20_transfers.fetch_add(1, Ordering::Relaxed); let token_address = erc20_tokens.choose(&mut rand::rng()).copied().unwrap(); let token = erc20::MockERC20::new(token_address, provider.clone()); + + // Transfer minimum possible amount token .transfer(recipient, U256::ONE) .into_transaction_request() @@ -654,9 +708,12 @@ fn generate_transactions + 'static>( _ => unreachable!("Only {TX_TYPES} transaction types are supported"), }; + // Get a random signer and set it as the sender of the transaction. tx.inner.set_from(signer.address()); let gas = &gas_estimates[tx_index]; + // If we already filled the gas fields once for that transaction type, use it. + // This will skip the gas filler. if let Some((max_fee_per_gas, max_priority_fee_per_gas, gas_limit)) = gas.get() { tx.inner.set_max_fee_per_gas(*max_fee_per_gas); tx.inner @@ -664,8 +721,13 @@ fn generate_transactions + 'static>( tx.inner.set_gas_limit(*gas_limit); } + // Fill the rest of transaction. In case we already filled the gas fields, + // it will only fill the chain ID and nonce that are efficiently cached inside + // the fillers. let filled = provider.fill(tx).await?; + // If we never filled the gas fields for that transaction type, cache the estimated + // gas. 
if gas.get().is_none() { let _ = gas.set(match &filled { SendableTx::Builder(builder) => ( @@ -691,7 +753,8 @@ fn generate_transactions + 'static>( let mut req = filled.try_into_request()?; - // Bump priority fee to ensure unique tx hashes (expiring nonces share nonce=0) + // Bump priority fee by a unique counter to ensure unique tx hashes + // when using expiring nonces (which share nonce=0). let id = tx_id.fetch_add(1, Ordering::Relaxed) as u128; if let Some(fee) = req.max_priority_fee_per_gas() { req.inner.set_max_priority_fee_per_gas(fee + id); @@ -966,19 +1029,3 @@ async fn assert_receipt(receipt: R) -> eyre::Result<()> { ); Ok(()) } - -#[derive(Clone)] -struct GenerateTransactionsInput { - tip20_weight: u64, - place_order_weight: u64, - swap_weight: u64, - erc20_weight: u64, - quote_token: Address, - user_tokens: Vec
, - erc20_tokens: Vec
, - /// When set, transfers go to these existing addresses instead of `Address::random()`. - recipients: Option>, - /// When `Some`, sets `valid_before` on each transaction right before signing - /// to keep it fresh for expiring nonces. - expiry_secs: Option, -} From b2d3fab9c3a18cbe7518f47923ce19658cf27666 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Sat, 21 Feb 2026 00:03:56 +0000 Subject: [PATCH 4/7] fix(bench): share tx_id counter across stream iterations to prevent duplicate tx hashes The tx_id counter was being created inside the repeat_with closure, resetting to 0 on every iteration. This caused all transactions to get the same priority fee bump (id=0), producing duplicate tx hashes and "already known" / "tx hash already seen" rejections. Co-Authored-By: Claude Opus 4.6 --- bin/tempo-bench/src/cmd/max_tps.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs index 3a62874478..9442084b68 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -634,11 +634,14 @@ fn generate_transactions + 'static>( let tx_weights: [u64; TX_TYPES] = [tip20_weight, swap_weight, place_order_weight, erc20_weight]; // Cached gas estimates for each transaction type let gas_estimates: [Arc>; TX_TYPES] = Default::default(); + // Global tx counter used to bump priority fee, ensuring unique tx hashes + // when using expiring nonces (which share nonce=0). 
+ let tx_id = Arc::new(AtomicUsize::new(0)); stream::repeat_with(move || { let signer_provider_manager = signer_provider_manager.clone(); let gas_estimates = gas_estimates.clone(); - let tx_id = Arc::new(AtomicUsize::new(0)); + let tx_id = tx_id.clone(); let recipients = recipients.clone(); let user_tokens = user_tokens.clone(); let erc20_tokens = erc20_tokens.clone(); From 9482605e196cbad47e9559c5f2c3536bd3e577d3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Feb 2026 10:59:48 +0000 Subject: [PATCH 5/7] refactor(bench): unify counters into TransactionCounters struct Fold the loose tx_counter, success_counter, and failed_counter into the existing TransactionCounters struct as sent/success/failed fields. Update monitor_tps to take &TransactionCounters instead of Arc. Co-Authored-By: Claude Opus 4.6 --- bin/tempo-bench/src/cmd/max_tps.rs | 82 +++++++++++++++++------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs index 9442084b68..2e140df0e7 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -406,21 +406,18 @@ impl MaxTpsArgs { info!(total_txs, "Generating and sending transactions"); - let tx_counters = TransactionCounters::default(); - let tx_counter = Arc::new(AtomicUsize::new(0)); - let success_counter = Arc::new(AtomicUsize::new(0)); - let failed_counter = Arc::new(AtomicUsize::new(0)); - let tx_counter_clone = tx_counter.clone(); - let success_counter_clone = success_counter.clone(); - let failed_counter_clone = failed_counter.clone(); + let counters = TransactionCounters::default(); let target_count = total_txs as usize; let cancel_token = CancellationToken::new(); - let cancel_token_clone = cancel_token.clone(); // Start TPS monitor - tokio::spawn(async move { - monitor_tps(tx_counter_clone, target_count, cancel_token_clone).await; - }); + { + let counters = counters.clone(); + let cancel_token = 
cancel_token.clone(); + tokio::spawn(async move { + monitor_tps(&counters, target_count, cancel_token).await; + }); + } let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap())); @@ -428,7 +425,7 @@ impl MaxTpsArgs { let mut pending_txs = generate_transactions( signer_provider_manager.clone(), gen_input, - tx_counters.clone(), + counters.clone(), ) .buffer_unordered(self.max_concurrent_requests) .filter_map(|result| async { @@ -453,22 +450,30 @@ impl MaxTpsArgs { .await }) .buffer_unordered(self.max_concurrent_requests) - .filter_map(|result| async { - match result { - Ok(Ok(pending_tx)) => { - tx_counter.fetch_add(1, Ordering::Relaxed); - success_counter.fetch_add(1, Ordering::Relaxed); - Some(pending_tx) - } - Ok(Err(err)) => { - failed_counter.fetch_add(1, Ordering::Relaxed); - debug!(?err, "Failed to send transaction"); - None - } - Err(_) => { - failed_counter.fetch_add(1, Ordering::Relaxed); - debug!("Transaction sending timed out"); - None + .filter_map({ + let counters = counters.clone(); + move |result| { + let counters = counters.clone(); + async move { + match result { + Ok(Ok(pending_tx)) => { + counters.sent.fetch_add(1, Ordering::Relaxed); + counters.success.fetch_add(1, Ordering::Relaxed); + Some(pending_tx) + } + Ok(Err(err)) => { + counters.sent.fetch_add(1, Ordering::Relaxed); + counters.failed.fetch_add(1, Ordering::Relaxed); + debug!(?err, "Failed to send transaction"); + None + } + Err(_) => { + counters.sent.fetch_add(1, Ordering::Relaxed); + counters.failed.fetch_add(1, Ordering::Relaxed); + debug!("Transaction sending timed out"); + None + } + } } } }) @@ -479,16 +484,16 @@ impl MaxTpsArgs { cancel_token.cancel(); info!( - tip20_transfers = tx_counters.tip20_transfers.load(Ordering::Relaxed), - swaps = tx_counters.swaps.load(Ordering::Relaxed), - orders = tx_counters.orders.load(Ordering::Relaxed), - erc20_transfers = tx_counters.erc20_transfers.load(Ordering::Relaxed), + tip20_transfers = 
counters.tip20_transfers.load(Ordering::Relaxed), + swaps = counters.swaps.load(Ordering::Relaxed), + orders = counters.orders.load(Ordering::Relaxed), + erc20_transfers = counters.erc20_transfers.load(Ordering::Relaxed), "Generated transactions", ); info!( - success = success_counter_clone.load(Ordering::Relaxed), - failed = failed_counter_clone.load(Ordering::Relaxed), + success = counters.success.load(Ordering::Relaxed), + failed = counters.failed.load(Ordering::Relaxed), "Finished sending transactions" ); @@ -589,10 +594,15 @@ impl MnemonicArg { #[derive(Clone, Default)] struct TransactionCounters { + /// Per-type generation counters tip20_transfers: Arc, swaps: Arc, orders: Arc, erc20_transfers: Arc, + /// Sending counters + sent: Arc, + success: Arc, + failed: Arc, } #[derive(Clone)] @@ -960,14 +970,14 @@ pub async fn generate_report( Ok(()) } -async fn monitor_tps(tx_counter: Arc, target_count: usize, token: CancellationToken) { +async fn monitor_tps(counters: &TransactionCounters, target_count: usize, token: CancellationToken) { let mut last_count = 0; let mut ticker = interval(Duration::from_secs(1)); loop { select! 
{ _ = ticker.tick() => { - let current_count = tx_counter.load(Ordering::Relaxed); + let current_count = counters.sent.load(Ordering::Relaxed); let tps = current_count - last_count; last_count = current_count; From 44bc4bb8beebabcfe50084d0fa8c21aa1cc25ca4 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Feb 2026 11:02:59 +0000 Subject: [PATCH 6/7] refactor(bench): pass TransactionCounters by value to monitor_tps Co-Authored-By: Claude Opus 4.6 --- bin/tempo-bench/src/cmd/max_tps.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs b/bin/tempo-bench/src/cmd/max_tps.rs index 2e140df0e7..99e84e3b17 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -411,13 +411,11 @@ impl MaxTpsArgs { let cancel_token = CancellationToken::new(); // Start TPS monitor - { - let counters = counters.clone(); - let cancel_token = cancel_token.clone(); - tokio::spawn(async move { - monitor_tps(&counters, target_count, cancel_token).await; - }); - } + tokio::spawn(monitor_tps( + counters.clone(), + target_count, + cancel_token.clone(), + )); let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap())); @@ -970,7 +968,7 @@ pub async fn generate_report( Ok(()) } -async fn monitor_tps(counters: &TransactionCounters, target_count: usize, token: CancellationToken) { +async fn monitor_tps(counters: TransactionCounters, target_count: usize, token: CancellationToken) { let mut last_count = 0; let mut ticker = interval(Duration::from_secs(1)); From 3dcca334c182062ca790a86cc74d971d62c6f546 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Feb 2026 16:48:54 +0000 Subject: [PATCH 7/7] style(bench): apply cargo fmt formatting Co-Authored-By: Claude Opus 4.6 --- bin/tempo-bench/src/cmd/max_tps.rs | 103 ++++++++++++++--------------- 1 file changed, 50 insertions(+), 53 deletions(-) diff --git a/bin/tempo-bench/src/cmd/max_tps.rs 
b/bin/tempo-bench/src/cmd/max_tps.rs index 99e84e3b17..eb9838bc95 100644 --- a/bin/tempo-bench/src/cmd/max_tps.rs +++ b/bin/tempo-bench/src/cmd/max_tps.rs @@ -420,64 +420,61 @@ impl MaxTpsArgs { let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(self.tps as u32).unwrap())); - let mut pending_txs = generate_transactions( - signer_provider_manager.clone(), - gen_input, - counters.clone(), - ) - .buffer_unordered(self.max_concurrent_requests) - .filter_map(|result| async { - match result { - Ok(bytes) => Some(bytes), - Err(e) => { - debug!(?e, "Transaction generation failed"); - None - } - } - }) - .boxed() - .ratelimit_stream(&rate_limiter) - .zip(stream::repeat_with(|| { - signer_provider_manager.random_unsigned_provider() - })) - .map(|(bytes, provider)| async move { - tokio::time::timeout( - Duration::from_secs(1), - provider.send_raw_transaction(&bytes), - ) - .await - }) - .buffer_unordered(self.max_concurrent_requests) - .filter_map({ - let counters = counters.clone(); - move |result| { - let counters = counters.clone(); - async move { + let mut pending_txs = + generate_transactions(signer_provider_manager.clone(), gen_input, counters.clone()) + .buffer_unordered(self.max_concurrent_requests) + .filter_map(|result| async { match result { - Ok(Ok(pending_tx)) => { - counters.sent.fetch_add(1, Ordering::Relaxed); - counters.success.fetch_add(1, Ordering::Relaxed); - Some(pending_tx) - } - Ok(Err(err)) => { - counters.sent.fetch_add(1, Ordering::Relaxed); - counters.failed.fetch_add(1, Ordering::Relaxed); - debug!(?err, "Failed to send transaction"); + Ok(bytes) => Some(bytes), + Err(e) => { + debug!(?e, "Transaction generation failed"); None } - Err(_) => { - counters.sent.fetch_add(1, Ordering::Relaxed); - counters.failed.fetch_add(1, Ordering::Relaxed); - debug!("Transaction sending timed out"); - None + } + }) + .boxed() + .ratelimit_stream(&rate_limiter) + .zip(stream::repeat_with(|| { + signer_provider_manager.random_unsigned_provider() 
+ })) + .map(|(bytes, provider)| async move { + tokio::time::timeout( + Duration::from_secs(1), + provider.send_raw_transaction(&bytes), + ) + .await + }) + .buffer_unordered(self.max_concurrent_requests) + .filter_map({ + let counters = counters.clone(); + move |result| { + let counters = counters.clone(); + async move { + match result { + Ok(Ok(pending_tx)) => { + counters.sent.fetch_add(1, Ordering::Relaxed); + counters.success.fetch_add(1, Ordering::Relaxed); + Some(pending_tx) + } + Ok(Err(err)) => { + counters.sent.fetch_add(1, Ordering::Relaxed); + counters.failed.fetch_add(1, Ordering::Relaxed); + debug!(?err, "Failed to send transaction"); + None + } + Err(_) => { + counters.sent.fetch_add(1, Ordering::Relaxed); + counters.failed.fetch_add(1, Ordering::Relaxed); + debug!("Transaction sending timed out"); + None + } + } } } - } - } - }) - .take_until(sleep(Duration::from_secs(self.duration))) - .collect::>() - .await; + }) + .take_until(sleep(Duration::from_secs(self.duration))) + .collect::>() + .await; cancel_token.cancel();