From e9c331ce0bf3dd69d6811f1d5c6e25a4b3e1c5c6 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Tue, 30 Dec 2025 14:06:39 +0800 Subject: [PATCH 01/10] fix --- src/actors/monitor/mod.rs | 3 +++ src/actors/monitor/monitor_actor.rs | 9 +++++++++ src/actors/monitor/txn_tracker.rs | 29 +++++++++++++++++++++++++-- src/actors/producer/producer_actor.rs | 5 +++++ 4 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/actors/monitor/mod.rs b/src/actors/monitor/mod.rs index 5fd8f9d..caa0160 100644 --- a/src/actors/monitor/mod.rs +++ b/src/actors/monitor/mod.rs @@ -71,6 +71,7 @@ pub struct PlanCompleted { pub plan_id: PlanId, } + #[derive(Message)] #[rtype(result = "()")] pub struct PlanFailed { @@ -78,6 +79,8 @@ pub struct PlanFailed { pub reason: String, } + + /// Message to retry a timed-out transaction #[derive(Message, Clone)] #[rtype(result = "()")] diff --git a/src/actors/monitor/monitor_actor.rs b/src/actors/monitor/monitor_actor.rs index 4171765..006e305 100644 --- a/src/actors/monitor/monitor_actor.rs +++ b/src/actors/monitor/monitor_actor.rs @@ -251,3 +251,12 @@ impl Handler for Monitor { .update_producer_stats(msg.ready_accounts, msg.sending_txns); } } + +impl Handler for Monitor { + type Result = (); + + fn handle(&mut self, msg: PlanFailed, _ctx: &mut Self::Context) { + tracing::warn!("Plan {} failed: {}", msg.plan_id, msg.reason); + self.txn_tracker.mark_plan_failed(msg.plan_id); + } +} diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs index 800fc00..a8ff766 100644 --- a/src/actors/monitor/txn_tracker.rs +++ b/src/actors/monitor/txn_tracker.rs @@ -56,6 +56,7 @@ pub struct TxnTracker { total_resolved_transactions: u64, total_failed_submissions: u64, total_failed_executions: u64, + total_failed_production_plans: u64, last_completed_plan: Option<(PlanId, PlanTracker)>, producer_ready_accounts: u64, producer_sending_txns: u64, @@ -80,6 +81,8 @@ struct PlanTracker { failed_executions: u64, plan_produced: bool, + /// Indicates if the plan failed during production (e.g. consumer error) + plan_failed_production: bool, plan_name: String, } @@ -164,6 +167,7 @@ impl TxnTracker { total_resolved_transactions: 0, total_failed_submissions: 0, total_failed_executions: 0, + total_failed_production_plans: 0, last_completed_plan: None, producer_ready_accounts: 0, producer_sending_txns: 0, @@ -228,12 +232,23 @@ impl TxnTracker { consumed_transactions: 0, failed_submissions: 0, failed_executions: 0, + plan_produced: false, + plan_failed_production: false, plan_name, }; self.plan_trackers.insert(plan_id, tracker); } + pub fn mark_plan_failed(&mut self, plan_id: PlanId) { + if let Some(tracker) = self.plan_trackers.get_mut(&plan_id) { + tracker.plan_failed_production = true; + // Also mark as produced so it doesn't count as "Not Produced" (pending) + tracker.plan_produced = true; + self.total_failed_production_plans += 1; + } + } + /// Handle transaction submission result pub fn handle_submission_result(&mut self, msg: &UpdateSubmissionResult) { let plan_id = &msg.metadata.plan_id; @@ -631,7 +646,17 @@ impl TxnTracker { let mut timed_out_txns = 0u64; for (_plan_id, tracker) in &self.plan_trackers { - if tracker.plan_produced { + if tracker.plan_failed_production { + // Counted separately or as completed failed? + // Let's count it as completed (failed) for the "Completed Plans" metric if we consider it "Done" + // But for clarity, let's keep it separate or part of "Not Produced" logic? + // The requirement is to explain "Not Produced". 
+ // If we set plan_produced=true in mark_plan_failed, it lands here. + + // If failed production, it contributes to "Prod Failures" but we should decide if it's "Completed". + // Since it will never produce more txns, it is effectively completed (failed). + completed_plans += 1; + } else if tracker.plan_produced { produced_plans += 1; if tracker.resolved_transactions as usize >= tracker.produce_transactions { completed_plans += 1; @@ -696,7 +721,7 @@ impl TxnTracker { Cell::new("Produced Plans"), Cell::new(&format_large_number(produced_plans)), Cell::new("Not Produced"), - Cell::new(&format_large_number(not_produced_plans)), + Cell::new(&format!("{}/{}F", format_large_number(not_produced_plans), format_large_number(self.total_failed_production_plans))), ]); // Row 5: Completed plans and in progress plans diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index bdb92e6..c08116b 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -202,6 +202,11 @@ impl Producer { plan.name(), e ); + + monitor_addr.do_send(PlanFailed { + plan_id: plan_id.clone(), + reason: format!("Consumer send error: {}", e), + }); return Err(anyhow::anyhow!( "Failed to send transaction to Consumer: {}", e From a66e5f4e03e50f311375d6a3e5bf13ba02917d81 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Wed, 31 Dec 2025 10:48:39 +0800 Subject: [PATCH 02/10] feat: Implement mempool-driven nonce correction by analyzing txpool content and add O(1) account ID lookup. --- src/actors/monitor/mempool_tracker.rs | 92 +++++++++++++++++++++++++-- src/actors/monitor/mod.rs | 14 ++++ src/actors/monitor/monitor_actor.rs | 32 +++++++++- src/actors/producer/producer_actor.rs | 40 +++++++++++- src/eth/eth_cli.rs | 29 +++++++++ src/eth/mod.rs | 1 + src/util/gen_account.rs | 24 +++++-- 7 files changed, 218 insertions(+), 14 deletions(-) diff --git a/src/actors/monitor/mempool_tracker.rs b/src/actors/monitor/mempool_tracker.rs index 5f81671..71c9566 100644 --- a/src/actors/monitor/mempool_tracker.rs +++ b/src/actors/monitor/mempool_tracker.rs @@ -4,16 +4,41 @@ use actix::Addr; use crate::{ actors::{PauseProducer, Producer, ResumeProducer}, - eth::{EthHttpCli, MempoolStatus}, + eth::{EthHttpCli, MempoolStatus, TxPoolContent}, }; +use super::NonceCorrectionInfo; + +/// Action to take after analyzing mempool status +#[derive(Debug)] +pub enum MempoolAction { + /// No action needed + None, + /// Pause producer due to mempool being full + Pause, + /// Resume producer + Resume, + /// Need to fetch txpool_content for nonce correction + NeedsNonceCorrection, +} + pub(crate) struct MempoolTracker { max_pool_size: usize, + /// Threshold for queued/pending ratio to trigger nonce correction + queued_ratio_threshold: f64, + /// Cooldown to avoid frequent txpool_content calls + last_correction_check: std::time::Instant, + correction_cooldown: std::time::Duration, } impl MempoolTracker { pub fn new(max_pool_size: usize) -> Self { - Self { max_pool_size } + Self { + max_pool_size, + queued_ratio_threshold: 1.0, + last_correction_check: std::time::Instant::now(), + correction_cooldown: std::time::Duration::from_secs(30), + } } pub async fn get_pool_status( @@ -28,22 +53,77 @@ impl MempoolTracker { } pub fn process_pool_status( - &self, + &mut self, status: Vec>, producer_addr: &Addr, - ) -> Result<(u64, u64), anyhow::Error> { - let _ = producer_addr; + ) -> Result<(u64, u64, MempoolAction), anyhow::Error> { let mut total_pending = 0; let mut total_queued = 0; for status in 
status.into_iter().flatten() { total_pending += status.pending; total_queued += status.queued; } + + let mut action = MempoolAction::None; + + // Check for mempool size limits if total_pending + total_queued > self.max_pool_size { producer_addr.do_send(PauseProducer); + action = MempoolAction::Pause; } else if total_pending + total_queued < self.max_pool_size / 2 { producer_addr.do_send(ResumeProducer); + action = MempoolAction::Resume; + } + + // Check for high queued ratio (indicates nonce gaps) + if total_pending > 0 { + let ratio = total_queued as f64 / total_pending as f64; + if ratio > self.queued_ratio_threshold { + // Check cooldown + if self.last_correction_check.elapsed() > self.correction_cooldown { + self.last_correction_check = std::time::Instant::now(); + tracing::warn!( + "High queued/pending ratio detected: {:.2} (queued={}, pending={}), triggering nonce correction", + ratio, + total_queued, + total_pending + ); + action = MempoolAction::NeedsNonceCorrection; + } + } } - Ok((total_pending as u64, total_queued as u64)) + + Ok((total_pending as u64, total_queued as u64, action)) + } + + /// Extract accounts with nonce gaps from txpool_content + /// For each account in queued, the minimum nonce is what they're waiting for + pub fn extract_nonce_corrections(content: &TxPoolContent) -> Vec { + let mut corrections = Vec::new(); + + for (address, nonces) in &content.queued { + // Find the minimum nonce in queued for this address + if let Some(min_nonce) = nonces.keys().filter_map(|s| s.parse::().ok()).min() { + // Check if this account has pending transactions + let has_pending = content.pending.contains_key(address); + + if !has_pending && min_nonce > 0 { + // Account has queued txns but no pending, meaning nonce gap exists + // The expected nonce should be min_nonce (the one they're waiting for) + corrections.push(NonceCorrectionInfo { + account: *address, + expected_nonce: min_nonce, + }); + } + } + } + + tracing::info!( + "Extracted {} nonce corrections from txpool_content", + corrections.len() + ); + + corrections } } + diff --git a/src/actors/monitor/mod.rs b/src/actors/monitor/mod.rs index caa0160..05d8e37 100644 --- a/src/actors/monitor/mod.rs +++ b/src/actors/monitor/mod.rs @@ -89,4 +89,18 @@ pub struct RetryTxn { pub metadata: Arc, } +/// Information for correcting account nonce +#[derive(Debug, Clone)] +pub struct NonceCorrectionInfo { + pub account: Address, + pub expected_nonce: u64, +} + +/// Message to correct nonces based on txpool_content analysis +#[derive(Message)] +#[rtype(result = "()")] +pub struct CorrectNonces { + pub corrections: Vec, +} + pub use monitor_actor::Monitor; diff --git a/src/actors/monitor/monitor_actor.rs b/src/actors/monitor/monitor_actor.rs index 006e305..0ab842e 100644 --- a/src/actors/monitor/monitor_actor.rs +++ b/src/actors/monitor/monitor_actor.rs @@ -13,9 +13,10 @@ use crate::eth::EthHttpCli; use crate::txn_plan::PlanId; use super::txn_tracker::{BackpressureAction, PlanStatus, TxnTracker}; +use super::mempool_tracker::MempoolAction; use super::{ PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer, - ReportProducerStats, RetryTxn, Tick, UpdateSubmissionResult, + ReportProducerStats, RetryTxn, Tick, UpdateSubmissionResult, CorrectNonces, }; use crate::actors::{PauseProducer, ResumeProducer}; @@ -92,11 +93,36 @@ impl Actor for Monitor { ctx.spawn( async move { MempoolTracker::get_pool_status(&client_clone).await } .into_actor(act) - .map(|res, act, _ctx| { + .map(|res, act, ctx| { if let Some(producer_addr) = 
&act.producer_addr { match act.mempool_tracker.process_pool_status(res, producer_addr) { - Ok((pending, queued)) => { + Ok((pending, queued, action)) => { act.txn_tracker.update_mempool_stats(pending, queued); + + // Handle nonce correction if needed + if matches!(action, MempoolAction::NeedsNonceCorrection) { + let clients = act.clients.clone(); + let producer = producer_addr.clone(); + ctx.spawn( + async move { + // Get the first client to fetch txpool_content + if let Some(client) = clients.values().next() { + match client.get_txpool_content().await { + Ok(content) => { + let corrections = MempoolTracker::extract_nonce_corrections(&content); + if !corrections.is_empty() { + producer.do_send(super::CorrectNonces { corrections }); + } + } + Err(e) => { + error!("Failed to get txpool_content: {}", e); + } + } + } + } + .into_actor(act), + ); + } } Err(e) => { error!("Failed to process pool status: {}", e); diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index c08116b..ad711f0 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -9,7 +9,7 @@ use crate::actors::consumer::Consumer; use crate::actors::monitor::monitor_actor::{PlanProduced, ProduceTxns}; use crate::actors::monitor::{ Monitor, PlanCompleted, PlanFailed, RegisterPlan, RegisterProducer, ReportProducerStats, - SubmissionResult, UpdateSubmissionResult, + SubmissionResult, UpdateSubmissionResult, CorrectNonces, }; use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer}; use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan}; @@ -510,3 +510,41 @@ impl Handler for Producer { self.trigger_next_plan_if_needed(ctx); } } + +/// Handler for correcting nonces based on txpool_content analysis +impl Handler for Producer { + type Result = (); + + fn handle(&mut self, msg: CorrectNonces, _ctx: &mut Self::Context) { + tracing::info!( + "Received {} nonce corrections from txpool_content analysis", + msg.corrections.len() + ); + + for correction in msg.corrections { + // Find the account_id from the address + if let Some(account_id) = self.account_generator.find_account_id_by_address(&correction.account) { + tracing::debug!( + "Correcting nonce for account {:?} to {}", + correction.account, + correction.expected_nonce + ); + // Update nonce cache + self.nonce_cache.insert(account_id, correction.expected_nonce as u32); + // Unlock the account with the correct nonce + self.address_pool.unlock_correct_nonce(account_id, correction.expected_nonce as u32); + } else { + tracing::warn!( + "Could not find account_id for address {:?}", + correction.account + ); + } + } + + // Update ready accounts count + self.stats.ready_accounts.store( + self.address_pool.ready_len() as u64, + Ordering::Relaxed, + ); + } +} diff --git a/src/eth/eth_cli.rs b/src/eth/eth_cli.rs index 8fa681c..b836119 100644 --- a/src/eth/eth_cli.rs +++ b/src/eth/eth_cli.rs @@ -63,6 +63,15 @@ where } } +/// Response from txpool_content RPC call +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct TxPoolContent { + #[serde(default)] + pub pending: HashMap>, + #[serde(default)] + pub queued: HashMap>, +} + /// Ethereum transaction sender, providing reliable communication with nodes #[derive(Clone)] pub struct EthHttpCli { @@ -491,4 +500,24 @@ impl EthHttpCli { self.retry_with_backoff(|| async { self.inner[0].get_account(address).await }) .await } + + /// Get full txpool content (WARNING: can be large, use sparingly) + /// This is used for nonce correction 
when queued/pending ratio is too high + pub async fn get_txpool_content(&self) -> Result { + let start = Instant::now(); + + let result = self + .retry_with_backoff(|| async { + let result: TxPoolContent = self.inner[0] + .raw_request::<(), TxPoolContent>("txpool_content".into(), ()) + .await?; + Ok(result) + }) + .await; + + self.update_metrics("txpool_content", result.is_ok(), start.elapsed()) + .await; + + result.with_context(|| "Failed to get txpool content") + } } diff --git a/src/eth/mod.rs b/src/eth/mod.rs index de6e47f..6022917 100644 --- a/src/eth/mod.rs +++ b/src/eth/mod.rs @@ -4,3 +4,4 @@ pub use txn_builder::*; pub use eth_cli::EthHttpCli; pub use eth_cli::MempoolStatus; +pub use eth_cli::TxPoolContent; diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 9c5382f..8b653e8 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -61,6 +61,7 @@ impl AccountSignerCache { pub struct AccountGenerator { accout_signers: AccountSignerCache, accout_addresses: Vec
, + address_to_id: HashMap, faucet_accout: PrivateKeySigner, faucet_accout_id: AccountId, init_nonces: Vec>, @@ -73,6 +74,7 @@ impl AccountGenerator { Self { accout_signers: AccountSignerCache::new(CACHE_SIZE), accout_addresses: Vec::new(), + address_to_id: HashMap::new(), faucet_accout, faucet_accout_id: AccountId(u32::MAX), init_nonces: Vec::new(), @@ -82,6 +84,7 @@ impl AccountGenerator { pub fn to_manager(mut self) -> AccountManager { self.accout_addresses.shrink_to_fit(); self.init_nonces.shrink_to_fit(); + self.address_to_id.shrink_to_fit(); Arc::new(self) } @@ -130,11 +133,13 @@ impl AccountGenerator { let res = self.gen_deterministic_accounts(begin_index, end_index); self.accout_addresses.reserve_exact(res.len()); self.init_nonces.reserve(res.len()); - self.accout_addresses - .extend(res.iter().map(|signer| signer.address())); + self.address_to_id.reserve(res.len()); for (i, signer) in res.iter().enumerate() { - self.accout_signers - .save_signer(signer.clone(), AccountId(i as u32)); + let addr = signer.address(); + let account_id = AccountId((begin_index + i as u64) as u32); + self.accout_addresses.push(addr); + self.address_to_id.insert(addr, account_id); + self.accout_signers.save_signer(signer.clone(), AccountId(i as u32)); } self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); @@ -167,6 +172,17 @@ impl AccountGenerator { } } +impl AccountGenerator { + /// Find account ID by address using O(1) hashmap lookup + pub fn find_account_id_by_address(&self, address: &Address) -> Option { + // Check if it's the faucet account + if *address == self.faucet_accout.address() { + return Some(self.faucet_accout_id); + } + self.address_to_id.get(address).copied() + } +} + #[cfg(test)] mod tests { use super::*; From ee06ebe2d243827a8e93737c1e61f04ae0ca8f75 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Wed, 31 Dec 2025 11:39:11 +0800 Subject: [PATCH 03/10] disable save --- src/actors/consumer/actor.rs | 2 +- src/actors/monitor/txn_tracker.rs | 12 +++++------ src/eth/eth_cli.rs | 35 +++++++++++++------------------ src/main.rs | 4 ++-- 4 files changed, 23 insertions(+), 30 deletions(-) diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs index 37ccea0..fc71e82 100644 --- a/src/actors/consumer/actor.rs +++ b/src/actors/consumer/actor.rs @@ -293,7 +293,7 @@ impl Consumer { .provider(&url) .await .unwrap() - .get_txn_count(metadata.from_account.as_ref().clone()) + .get_pending_txn_count(metadata.from_account.as_ref().clone()) .await { // If on-chain nonce is greater than our attempted nonce, our transaction is indeed outdated diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs index a8ff766..df36dc0 100644 --- a/src/actors/monitor/txn_tracker.rs +++ b/src/actors/monitor/txn_tracker.rs @@ -353,7 +353,7 @@ impl TxnTracker { impl std::future::Future< Output = ( PendingTxInfo, - Result, + Result, Result, anyhow::Error>, ), >, @@ -394,7 +394,7 @@ impl TxnTracker { let task = async move { let result = client.get_transaction_receipt(task_info.tx_hash).await; let account = client - .get_account(*task_info.metadata.from_account.as_ref()) + .get_latest_txn_count(task_info.metadata.from_account.as_ref()) .await; tracing::debug!( "checked tx_hash={:?} result={:?}", @@ -416,7 +416,7 @@ impl TxnTracker { &mut self, results: Vec<( PendingTxInfo, - Result, + Result, Result, anyhow::Error>, )>, ) -> Vec { @@ -425,7 +425,7 @@ impl TxnTracker { let mut retry_queue = Vec::new(); // 1. 
Categorize results - for (info, account, result) in results { + for (info, account_nonce, result) in results { match result { Ok(Some(receipt)) => { // Transaction successfully confirmed @@ -434,8 +434,8 @@ impl TxnTracker { } Ok(None) => { // Transaction still pending - if let Ok(account) = account { - if account.nonce > info.metadata.nonce { + if let Ok(account_nonce) = account_nonce { + if account_nonce > info.metadata.nonce { successful_txns.push((info, true)); } } else { diff --git a/src/eth/eth_cli.rs b/src/eth/eth_cli.rs index b836119..da12c83 100644 --- a/src/eth/eth_cli.rs +++ b/src/eth/eth_cli.rs @@ -162,9 +162,9 @@ impl EthHttpCli { self.chain_id } - pub async fn get_txn_count(&self, address: Address) -> Result { + pub async fn get_pending_txn_count(&self, address: Address) -> Result { tokio::time::timeout(Duration::from_secs(10), async { - let nonce = self.inner[0].get_transaction_count(address).await?; + let nonce = self.inner[0].get_transaction_count(address).pending().await?; Ok(nonce) }) .await? @@ -188,22 +188,6 @@ impl EthHttpCli { result.with_context(|| "Failed to verify connection to Ethereum node") } - /// Get account transaction count (nonce) - #[allow(unused)] - pub async fn get_transaction_count(&self, address: Address) -> Result { - let start = Instant::now(); - - let result = self - .retry_with_backoff(|| async { self.inner[0].get_transaction_count(address).await }) - .await; - - self.update_metrics("eth_getTransactionCount", result.is_ok(), start.elapsed()) - .await; - - result - .with_context(|| format!("Failed to get transaction count for address: {:?}", address)) - } - /// Get account balance #[allow(unused)] pub async fn get_balance(&self, address: &Address) -> Result { @@ -496,11 +480,20 @@ impl EthHttpCli { result.with_context(|| format!("Failed to get transaction receipt for hash: {:?}", tx_hash)) } - pub async fn get_account(&self, address: Address) -> Result { - self.retry_with_backoff(|| async { self.inner[0].get_account(address).await }) - .await + pub async fn get_latest_txn_count(&self, address: &Address) -> Result { + tokio::time::timeout(Duration::from_secs(10), async { + let nonce = self.inner[0].get_transaction_count(*address).latest().await?; + Ok(nonce) + }) + .await? 
} + + // pub async fn get_account(&self, address: Address) -> Result { + // self.retry_with_backoff(|| async { self.inner[0].get_account(address).await }) + // .await + // } + /// Get full txpool content (WARNING: can be large, use sparingly) /// This is used for nonce correction when queued/pending ratio is too high pub async fn get_txpool_content(&self) -> Result { diff --git a/src/main.rs b/src/main.rs index 764f6b5..defc0c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -250,7 +250,7 @@ async fn get_init_nonce_map( let faucet_address = faucet_signer.address(); init_nonce_map.insert( faucet_address, - eth_client.get_txn_count(faucet_address).await.unwrap(), + eth_client.get_pending_txn_count(faucet_address).await.unwrap(), ); Arc::new(init_nonce_map) } @@ -535,7 +535,7 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc Date: Thu, 1 Jan 2026 16:48:25 +0800 Subject: [PATCH 04/10] fix --- src/util/gen_account.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 8b653e8..1fdc02a 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -139,7 +139,7 @@ impl AccountGenerator { let account_id = AccountId((begin_index + i as u64) as u32); self.accout_addresses.push(addr); self.address_to_id.insert(addr, account_id); - self.accout_signers.save_signer(signer.clone(), AccountId(i as u32)); + self.accout_signers.save_signer(signer.clone(), account_id); } self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); From e83c34fdf669d6121eec405030e4419a0e2d9882 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 1 Jan 2026 22:07:40 +0800 Subject: [PATCH 05/10] fmt --- bench_config.template | 6 +- src/actors/monitor/mempool_tracker.rs | 53 +++++++------ src/actors/monitor/monitor_actor.rs | 56 +++++++++++-- src/actors/monitor/txn_tracker.rs | 110 +++++++++++++++++++------- src/config/bench_config.rs | 33 ++++++++ src/eth/eth_cli.rs | 2 +- src/main.rs | 1 + src/txn_plan/faucet_plan.rs | 2 +- 8 files changed, 202 insertions(+), 61 deletions(-) diff --git a/bench_config.template b/bench_config.template index 22aa587..7493f73 100644 --- a/bench_config.template +++ b/bench_config.template @@ -35,4 +35,8 @@ num_senders = 1500 # Maximum capacity of the transaction pool inside Consumer max_pool_size = 100000 # Duration of the benchmark in seconds. Set to 0 to run indefinitely. -duration_secs = 60 \ No newline at end of file +# Duration of the benchmark in seconds. Set to 0 to run indefinitely. 
+duration_secs = 60 +# Sampling strategy: integer (transaction count) or "full" (check all pending) +# Default is 10 +sampling = 20 \ No newline at end of file diff --git a/src/actors/monitor/mempool_tracker.rs b/src/actors/monitor/mempool_tracker.rs index 71c9566..13414f5 100644 --- a/src/actors/monitor/mempool_tracker.rs +++ b/src/actors/monitor/mempool_tracker.rs @@ -7,7 +7,7 @@ use crate::{ eth::{EthHttpCli, MempoolStatus, TxPoolContent}, }; -use super::NonceCorrectionInfo; + /// Action to take after analyzing mempool status #[derive(Debug)] @@ -75,10 +75,17 @@ impl MempoolTracker { action = MempoolAction::Resume; } - // Check for high queued ratio (indicates nonce gaps) - if total_pending > 0 { - let ratio = total_queued as f64 / total_pending as f64; - if ratio > self.queued_ratio_threshold { + // Check for high queued transactions (indicates nonce gaps) + if total_queued > 0 { + // Calculate ratio, handling division by zero if pending is 0 + let ratio = if total_pending > 0 { + total_queued as f64 / total_pending as f64 + } else { + f64::INFINITY + }; + + // Trigger if ratio is high OR if we have significant queued transactions with low pending + if ratio > self.queued_ratio_threshold || (total_queued > 100 && total_pending < 10) { // Check cooldown if self.last_correction_check.elapsed() > self.correction_cooldown { self.last_correction_check = std::time::Instant::now(); @@ -96,34 +103,32 @@ impl MempoolTracker { Ok((total_pending as u64, total_queued as u64, action)) } - /// Extract accounts with nonce gaps from txpool_content - /// For each account in queued, the minimum nonce is what they're waiting for - pub fn extract_nonce_corrections(content: &TxPoolContent) -> Vec { - let mut corrections = Vec::new(); + /// Identify accounts with nonce gaps from txpool_content + /// Returns list of addresses that need correction + pub fn identify_problematic_accounts(content: &TxPoolContent) -> Vec { + let mut problematic_accounts = Vec::new(); for (address, nonces) in &content.queued { - // Find the minimum nonce in queued for this address - if let Some(min_nonce) = nonces.keys().filter_map(|s| s.parse::().ok()).min() { - // Check if this account has pending transactions - let has_pending = content.pending.contains_key(address); - - if !has_pending && min_nonce > 0 { - // Account has queued txns but no pending, meaning nonce gap exists - // The expected nonce should be min_nonce (the one they're waiting for) - corrections.push(NonceCorrectionInfo { - account: *address, - expected_nonce: min_nonce, - }); + // Identify accounts that have queued transactions. + // Presence in 'queued' implies a gap or issue. + // We check if they also have pending transactions. + // If they have NO pending transactions but HAVE queued, strictly implies a gap. 
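+            // Addresses flagged here are returned to the monitor, which looks up each
+            // account's pending nonce and forwards a CorrectNonces message to the producer.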
+ let has_pending = content.pending.contains_key(address); + + // Check if there are any valid nonces in queued + if nonces.keys().any(|s| s.parse::().is_ok()) { + if !has_pending { + problematic_accounts.push(*address); } } } tracing::info!( - "Extracted {} nonce corrections from txpool_content", - corrections.len() + "Identified {} accounts with likely nonce gaps", + problematic_accounts.len() ); - corrections + problematic_accounts } } diff --git a/src/actors/monitor/monitor_actor.rs b/src/actors/monitor/monitor_actor.rs index 0ab842e..42b41d8 100644 --- a/src/actors/monitor/monitor_actor.rs +++ b/src/actors/monitor/monitor_actor.rs @@ -20,6 +20,8 @@ use super::{ }; use crate::actors::{PauseProducer, ResumeProducer}; +use crate::config::SamplingPolicy; + #[derive(Message)] #[rtype(result = "()")] struct LogStats; @@ -37,14 +39,16 @@ pub struct Monitor { } impl Monitor { + pub fn new_with_clients( clients: Vec>, max_pool_size: usize, + sampling_policy: SamplingPolicy, ) -> Self { Self { producer_addr: None, consumer_addr: None, - txn_tracker: TxnTracker::new(clients.clone()), + txn_tracker: TxnTracker::new(clients.clone(), sampling_policy), mempool_tracker: MempoolTracker::new(max_pool_size), clients: Arc::new( clients @@ -109,9 +113,26 @@ impl Actor for Monitor { if let Some(client) = clients.values().next() { match client.get_txpool_content().await { Ok(content) => { - let corrections = MempoolTracker::extract_nonce_corrections(&content); - if !corrections.is_empty() { - producer.do_send(super::CorrectNonces { corrections }); + let accounts = MempoolTracker::identify_problematic_accounts(&content); + if !accounts.is_empty() { + let mut corrections = Vec::new(); + for account in accounts { + match client.get_pending_txn_count(account).await { + Ok(nonce) => { + corrections.push(super::NonceCorrectionInfo { + account, + expected_nonce: nonce, + }); + } + Err(e) => { + error!("Failed to get nonce for account {:?}: {}", account, e); + } + } + } + if !corrections.is_empty() { + info!("Sending {} nonce corrections to producer", corrections.len()); + producer.do_send(CorrectNonces { corrections }); + } } } Err(e) => { @@ -173,7 +194,32 @@ impl Handler for Monitor { if let Some(producer_addr) = &self.producer_addr { producer_addr.do_send(msg.clone()); } - self.txn_tracker.handle_submission_result(&msg); + + match msg.result.as_ref() { + crate::actors::monitor::SubmissionResult::ErrorWithRetry => { + // If the transaction failed submission, retry it endlessly to prevent nonce gaps + // and premature plan completion. Do NOT tell TxnTracker about the failure yet. + tracing::warn!( + "Transaction failed submission (ErrorWithRetry). Retrying via Consumer. 
plan_id={}, tx_hash={:?}", + msg.metadata.plan_id, + msg.metadata.txn_id + ); + + if let Some(consumer) = &self.consumer_addr { + consumer.do_send(RetryTxn { + signed_bytes: msg.signed_bytes.clone(), + metadata: msg.metadata.clone(), + }); + } else { + tracing::error!("Cannot retry transaction, no consumer address: {:?}", msg.metadata.txn_id); + // Fallback to tracker if no consumer (will mark as failed) + self.txn_tracker.handle_submission_result(&msg); + } + } + _ => { + self.txn_tracker.handle_submission_result(&msg); + } + } } } diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs index df36dc0..42236b4 100644 --- a/src/actors/monitor/txn_tracker.rs +++ b/src/actors/monitor/txn_tracker.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; -use alloy::consensus::Account; + use alloy::primitives::TxHash; use comfy_table::{presets::UTF8_FULL, Cell, Table}; use tracing::{debug, error, warn}; @@ -12,9 +12,11 @@ use crate::actors::monitor::SubmissionResult; use crate::eth::EthHttpCli; use crate::txn_plan::{PlanId, TxnMetadata}; +use crate::config::SamplingPolicy; + use super::UpdateSubmissionResult; -const SAMPLING_SIZE: usize = 10; // Define sampling size + const TXN_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes timeout const TPS_WINDOW: Duration = Duration::from_secs(17); @@ -64,6 +66,10 @@ pub struct TxnTracker { mempool_queued: u64, /// Track if producer was paused due to pending txn limit producer_paused_by_pending: bool, + /// Transactions currently being checked (to prevent overlapping checks) + inflight_checks: HashSet, + /// Sampling configuration + sampling_policy: SamplingPolicy, } /// Tracking status of a single transaction plan @@ -150,7 +156,8 @@ pub struct RetryTxnInfo { impl TxnTracker { /// Create new transaction tracker - pub fn new(clients: Vec>) -> Self { + /// Create new transaction tracker + pub fn new(clients: Vec>, sampling_policy: SamplingPolicy) -> Self { let mut client_map = HashMap::new(); for client in clients { let rpc_url = client.rpc().as_ref().clone(); @@ -174,6 +181,8 @@ impl TxnTracker { mempool_pending: 0, mempool_queued: 0, producer_paused_by_pending: false, + inflight_checks: HashSet::new(), + sampling_policy, } } @@ -223,21 +232,32 @@ impl TxnTracker { } } - /// Register new plan (no changes) + /// Register new plan (or update existing one if retried) pub fn register_plan(&mut self, plan_id: PlanId, plan_name: String) { - debug!("Plan registered: plan_id={}", plan_id); - let tracker = PlanTracker { - produce_transactions: 0, - resolved_transactions: 0, - consumed_transactions: 0, - failed_submissions: 0, - failed_executions: 0, - - plan_produced: false, - plan_failed_production: false, - plan_name, - }; - self.plan_trackers.insert(plan_id, tracker); + if let Some(tracker) = self.plan_trackers.get_mut(&plan_id) { + debug!( + "Plan already registered (likely retry): plan_id={}. Resetting flags.", + plan_id + ); + // Result of retry logic: we are producing more transactions for this plan. + // Reset flags to keep plan open until the new attempt finishes. 
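+            // produce_transactions is left untouched, so counts from the earlier
+            // attempt keep accumulating on the same tracker.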
+ tracker.plan_produced = false; + tracker.plan_failed_production = false; + } else { + debug!("Plan registered: plan_id={}", plan_id); + let tracker = PlanTracker { + produce_transactions: 0, + resolved_transactions: 0, + consumed_transactions: 0, + failed_submissions: 0, + failed_executions: 0, + + plan_produced: false, + plan_failed_production: false, + plan_name, + }; + self.plan_trackers.insert(plan_id, tracker); + } } pub fn mark_plan_failed(&mut self, plan_id: PlanId) { @@ -253,7 +273,7 @@ impl TxnTracker { pub fn handle_submission_result(&mut self, msg: &UpdateSubmissionResult) { let plan_id = &msg.metadata.plan_id; if !self.plan_trackers.contains_key(plan_id) { - warn!("Plan not found: plan_id={}", plan_id); + warn!("Plan not found: plan_id={}, tx_hash={:?}, result={:?}", plan_id, msg.result, msg.result.as_ref()); return; } let plan_tracker = self.plan_trackers.get_mut(plan_id).unwrap(); @@ -305,6 +325,7 @@ impl TxnTracker { tracker.resolved_transactions += 1; tracker.failed_submissions += 1; self.total_failed_submissions += 1; + warn!("Incrementing failed_submissions for plan {}: resolved={}, failed={}", plan_id, tracker.resolved_transactions, tracker.failed_submissions); self.resolved_txn_timestamps.push_back(Instant::now()); self.total_resolved_transactions += 1; } @@ -336,6 +357,14 @@ impl TxnTracker { } if let PlanStatus::Completed = status { if let Some(completed_tracker) = self.plan_trackers.remove(plan_id) { + warn!("Removing completed plan {}: produced={}, resolved={}, consumed={}, failed_sub={}, failed_exec={}", + plan_id, + completed_tracker.produce_transactions, + completed_tracker.resolved_transactions, + completed_tracker.consumed_transactions, + completed_tracker.failed_submissions, + completed_tracker.failed_executions + ); self.last_completed_plan = Some((plan_id.clone(), completed_tracker)); } } @@ -367,25 +396,40 @@ impl TxnTracker { let mut tasks = Vec::new(); // --- Core sampling logic --- - if total_pending <= SAMPLING_SIZE { - // If total is less than sampling size, check all - samples.extend(self.pending_txns.iter().cloned()); + // Filter out transactions that are already being checked + let candidates: Vec<_> = self.pending_txns + .iter() + .filter(|info| !self.inflight_checks.contains(&info.tx_hash)) + .cloned() + .collect(); + + if candidates.is_empty() { + return Vec::new(); + } + + if self.sampling_policy.is_full() || candidates.len() <= self.sampling_policy.size() { + // Check all candidates + samples.extend(candidates); } else { - // Select samples at fixed intervals in the queue - // For example, with 1000 txns and 10 samples, take one every 100 - let step = total_pending / SAMPLING_SIZE; - for i in 0..SAMPLING_SIZE { + // Select samples at fixed intervals from candidates + let total_candidates = candidates.len(); + let step = total_candidates / self.sampling_policy.size(); + for i in 0..self.sampling_policy.size() { let index = i * step; - if let Some(txn_info) = self.pending_txns.iter().nth(index) { + if let Some(txn_info) = candidates.get(index) { samples.insert(txn_info.clone()); } } - // Always include the oldest one as it's most critical - if let Some(oldest) = self.pending_txns.iter().next() { + // Always include the oldest one + if let Some(oldest) = candidates.first() { samples.insert(oldest.clone()); } } + for pending_info in &samples { + self.inflight_checks.insert(pending_info.tx_hash); + } + for pending_info in samples { if let Some(client) = self.clients.get(&pending_info.rpc_url) { let client = client.clone(); @@ -426,6 +470,9 @@ impl 
TxnTracker { // 1. Categorize results for (info, account_nonce, result) in results { + // Always remove from inflight_checks as we have a result (or error) + self.inflight_checks.remove(&info.tx_hash); + match result { Ok(Some(receipt)) => { // Transaction successfully confirmed @@ -466,7 +513,8 @@ impl TxnTracker { .collect::>(); // 2. If there are successful transactions, calculate median time and clean up - if !successful_txns.is_empty() { + // Only use heuristic batch cleaning in Partial mode. In Full mode, we track every txn explicitly. + if !self.sampling_policy.is_full() && !successful_txns.is_empty() { // Create a temporary TxnInfo for BTreeSet split_off // TxHash is not important as sorting is mainly based on time let split_info = successful_txns[successful_txns.len() - 1].0.clone(); @@ -553,6 +601,9 @@ impl TxnTracker { } for info in to_process { + // Also remove from inflight checks if we are retrying them (removing from pending) + // This covers cases where we retry transactions that might be currently inflight + self.inflight_checks.remove(&info.tx_hash); self.pending_txns.remove(&info); if info.submit_time.elapsed() > TXN_TIMEOUT { @@ -584,6 +635,7 @@ impl TxnTracker { "Transaction completely timed out: plan_id={}, tx_hash={:?}", info.metadata.plan_id, info.tx_hash ); + self.inflight_checks.remove(&info.tx_hash); // Clean up inflight if removing self.pending_txns.remove(&info); if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) { plan_tracker.resolved_transactions += 1; diff --git a/src/config/bench_config.rs b/src/config/bench_config.rs index 65d5f99..b08c5b9 100644 --- a/src/config/bench_config.rs +++ b/src/config/bench_config.rs @@ -73,8 +73,41 @@ pub struct PerformanceConfig { pub max_pool_size: usize, /// Duration of the benchmark in seconds pub duration_secs: u64, + /// Sampling configuration: "full" or integer size (default: 10) + #[serde(default)] + pub sampling: SamplingPolicy, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum SamplingPolicy { + Full(String), + Partial(usize), +} + +impl Default for SamplingPolicy { + fn default() -> Self { + SamplingPolicy::Partial(10) + } } +impl SamplingPolicy { + pub fn is_full(&self) -> bool { + match self { + SamplingPolicy::Full(s) => s.eq_ignore_ascii_case("full"), + _ => false, + } + } + + pub fn size(&self) -> usize { + match self { + SamplingPolicy::Partial(n) => *n, + _ => 10, // Default fallback if needed, though is_full should be checked first + } + } +} + + impl BenchConfig { /// Load configuration from TOML file pub fn load>(path: P) -> Result { diff --git a/src/eth/eth_cli.rs b/src/eth/eth_cli.rs index da12c83..f5a6e42 100644 --- a/src/eth/eth_cli.rs +++ b/src/eth/eth_cli.rs @@ -1,5 +1,5 @@ use alloy::{ - consensus::{Account, TxEnvelope}, + consensus::TxEnvelope, eips::Encodable2718, network::Ethereum, primitives::{Address, TxHash, U256}, diff --git a/src/main.rs b/src/main.rs index defc0c2..c3ee9e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -377,6 +377,7 @@ async fn start_bench() -> Result<()> { let monitor = Monitor::new_with_clients( eth_clients.clone(), benchmark_config.performance.max_pool_size, + benchmark_config.performance.sampling, ) .start(); diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index fd1424b..3ee2ee7 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -23,7 +23,7 @@ use uuid::Uuid; use super::TxnIter; -const DEFAULT_CONCURRENCY_LIMIT: usize = 256; +const DEFAULT_CONCURRENCY_LIMIT: 
usize = 64; pub struct LevelFaucetPlan { id: PlanId, From 2ea92c6cf4dd6c95afc493e2d18575ff399756ae Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 1 Jan 2026 22:25:44 +0800 Subject: [PATCH 06/10] fmt --- src/actors/monitor/txn_tracker.rs | 67 ++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs index 42236b4..7619fd7 100644 --- a/src/actors/monitor/txn_tracker.rs +++ b/src/actors/monitor/txn_tracker.rs @@ -91,6 +91,9 @@ struct PlanTracker { plan_failed_production: bool, plan_name: String, + + /// Set of transaction hashes that have been resolved to avoid double counting + resolved_hashes: HashSet, } /// Detailed information of in-flight transactions @@ -226,9 +229,14 @@ impl TxnTracker { } } - pub fn handle_plan_produced(&mut self, plan_id: PlanId, _count: usize) { + pub fn handle_plan_produced(&mut self, plan_id: PlanId, count: usize) { if let Some(tracker) = self.plan_trackers.get_mut(&plan_id) { tracker.plan_produced = true; + // Force sync produced transactions count to catch up any lagging messages + if tracker.produce_transactions != count { + warn!("PlanProduced sync: plan {} count adjusted from {} to source-of-truth {}", plan_id, tracker.produce_transactions, count); + tracker.produce_transactions = count; + } } } @@ -255,6 +263,7 @@ impl TxnTracker { plan_produced: false, plan_failed_production: false, plan_name, + resolved_hashes: HashSet::new(), }; self.plan_trackers.insert(plan_id, tracker); } @@ -538,9 +547,11 @@ impl TxnTracker { if let Some(plan_tracker) = self.plan_trackers.get_mut(&cleared_info.metadata.plan_id) { - plan_tracker.resolved_transactions += 1; - self.resolved_txn_timestamps.push_back(Instant::now()); - self.total_resolved_transactions += 1; + if plan_tracker.resolved_hashes.insert(cleared_info.tx_hash) { + plan_tracker.resolved_transactions += 1; + self.resolved_txn_timestamps.push_back(Instant::now()); + self.total_resolved_transactions += 1; + } } } @@ -557,16 +568,20 @@ impl TxnTracker { } if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) { - plan_tracker.resolved_transactions += 1; - self.resolved_txn_timestamps.push_back(Instant::now()); - self.total_resolved_transactions += 1; - if !receipt_status { - plan_tracker.failed_executions += 1; - self.total_failed_executions += 1; - warn!( - "Transaction reverted: plan_id={}, tx_hash={:?}", - info.metadata.plan_id, info.tx_hash - ); + if plan_tracker.resolved_hashes.insert(info.tx_hash) { + plan_tracker.resolved_transactions += 1; + self.resolved_txn_timestamps.push_back(Instant::now()); + self.total_resolved_transactions += 1; + if !receipt_status { + plan_tracker.failed_executions += 1; + self.total_failed_executions += 1; + warn!( + "Transaction reverted: plan_id={}, tx_hash={:?}", + info.metadata.plan_id, info.tx_hash + ); + } + } else { + debug!("Duplicate resolution skipped: plan_id={}, tx_hash={:?}", info.metadata.plan_id, info.tx_hash); } } } @@ -613,11 +628,13 @@ impl TxnTracker { info.metadata.plan_id, info.tx_hash ); if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) { - plan_tracker.resolved_transactions += 1; - plan_tracker.failed_executions += 1; - self.total_failed_executions += 1; - self.resolved_txn_timestamps.push_back(Instant::now()); - self.total_resolved_transactions += 1; + if plan_tracker.resolved_hashes.insert(info.tx_hash) { + plan_tracker.resolved_transactions += 1; + plan_tracker.failed_executions += 1; + 
self.total_failed_executions += 1; + self.resolved_txn_timestamps.push_back(Instant::now()); + self.total_resolved_transactions += 1; + } } } else { // Queue for retry (and remove from pending to avoid duplicate retry) @@ -638,11 +655,13 @@ impl TxnTracker { self.inflight_checks.remove(&info.tx_hash); // Clean up inflight if removing self.pending_txns.remove(&info); if let Some(plan_tracker) = self.plan_trackers.get_mut(&info.metadata.plan_id) { - plan_tracker.resolved_transactions += 1; - plan_tracker.failed_executions += 1; - self.total_failed_executions += 1; - self.resolved_txn_timestamps.push_back(Instant::now()); - self.total_resolved_transactions += 1; + if plan_tracker.resolved_hashes.insert(info.tx_hash) { + plan_tracker.resolved_transactions += 1; + plan_tracker.failed_executions += 1; + self.total_failed_executions += 1; + self.resolved_txn_timestamps.push_back(Instant::now()); + self.total_resolved_transactions += 1; + } } } // If not timed out, they stay in pending_txns (already there from earlier) From 1a17bf4f550b081607426820500ff629815420a0 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 1 Jan 2026 23:50:32 +0800 Subject: [PATCH 07/10] feat: Configure `reqwest::Client` with specific pool and timeout settings and integrate `RpcClient` for `RootProvider` connection. --- src/eth/eth_cli.rs | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/src/eth/eth_cli.rs b/src/eth/eth_cli.rs index f5a6e42..dc6dfa2 100644 --- a/src/eth/eth_cli.rs +++ b/src/eth/eth_cli.rs @@ -4,7 +4,8 @@ use alloy::{ network::Ethereum, primitives::{Address, TxHash, U256}, providers::{Provider, ProviderBuilder, RootProvider}, - rpc::types::TransactionReceipt, + rpc::{client::RpcClient, types::TransactionReceipt}, + transports::http::Http, }; use anyhow::{Context as AnyhowContext, Result}; use comfy_table::{presets::UTF8_FULL, Cell, Table}; @@ -120,25 +121,20 @@ impl EthHttpCli { Url::parse(rpc_url).with_context(|| format!("Failed to parse RPC URL: {}", rpc_url))?; let mut inner = Vec::new(); for _ in 0..1 { - // let client = reqwest::Client::builder() - // // .pool_idle_timeout(Duration::from_secs(120)) - // // .pool_max_idle_per_host(2000) - // // .connect_timeout(Duration::from_secs(10)) - // // .timeout(Duration::from_secs(5)) - // // .tcp_keepalive(Duration::from_secs(30)) - // // .tcp_nodelay(true) - // // .http2_prior_knowledge() - // // .http2_adaptive_window(true) - // // .http2_keep_alive_timeout(Duration::from_secs(10)) - // // .no_gzip() - // // .no_brotli() - // // .no_deflate() - // // .no_zstd() - // .build() - // .unwrap(); - - let provider: RootProvider = - ProviderBuilder::default().connect_http(url.clone()); + let client = reqwest::Client::builder() + .pool_idle_timeout(Duration::from_secs(10)) // Shorter idle, high TPS rarely idles + .pool_max_idle_per_host(2000) // Large pool for 1500 concurrent senders + .connect_timeout(Duration::from_secs(5)) // Fast fail on connection issues + .timeout(Duration::from_secs(60)) // Reasonable global timeout to prevent stuck requests + .tcp_keepalive(Duration::from_secs(30)) + .tcp_nodelay(true) // Disable Nagle's algorithm for low latency + .http2_prior_knowledge() // Use HTTP/2 if server supports, better multiplexing + .build() + .unwrap(); + + let http = Http::with_client(client, url.clone()); + let rpc_client = RpcClient::new(http, true); + let provider: RootProvider = ProviderBuilder::default().connect_client(rpc_client); inner.push(Arc::new(provider)); } From 
bff586bd4434a0a0d466445ca2ec3ad7167cf70a Mon Sep 17 00:00:00 2001 From: keanji-x Date: Fri, 2 Jan 2026 00:45:59 +0800 Subject: [PATCH 08/10] fmt --- src/actors/consumer/actor.rs | 69 ++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs index fc71e82..0bbe733 100644 --- a/src/actors/consumer/actor.rs +++ b/src/actors/consumer/actor.rs @@ -288,44 +288,37 @@ impl Consumer { if error_string.contains("nonce too low") || error_string.contains("invalid nonce") { - // Check current on-chain nonce to determine final state - if let Ok(next_nonce) = dispatcher - .provider(&url) - .await - .unwrap() - .get_pending_txn_count(metadata.from_account.as_ref().clone()) - .await - { - // If on-chain nonce is greater than our attempted nonce, our transaction is indeed outdated - if next_nonce > metadata.nonce { - // Try to find the hash of the transaction using our nonce - let actual_nonce = metadata.nonce; - let from_account = metadata.from_account.clone(); - // Can't find hash, but can provide correct nonce - monitor_addr.do_send(UpdateSubmissionResult { - metadata, - result: Arc::new(SubmissionResult::NonceTooLow { - tx_hash: keccak256(&signed_txn.bytes), - expect_nonce: next_nonce, - actual_nonce, - from_account, - }), - rpc_url: url, - send_time: Instant::now(), - signed_bytes: Arc::new(signed_txn.bytes.clone()), - }); - } - } else { - // Failed to get nonce, can only mark as retryable error - warn!("Failed to get nonce for txn {:?}: {}", metadata.txn_id, e); - monitor_addr.do_send(UpdateSubmissionResult { - metadata, - result: Arc::new(SubmissionResult::ErrorWithRetry), - rpc_url: "unknown".to_string(), - send_time: Instant::now(), - signed_bytes: Arc::new(signed_txn.bytes.clone()), - }); - } + // The RPC already told us "nonce too low" - trust this directly. + // Try to get current nonce for better logging, but don't fail if we can't. 
+ let (expect_nonce, actual_nonce, from_account) = + if let Ok(next_nonce) = dispatcher + .provider(&url) + .await + .unwrap() + .get_pending_txn_count(metadata.from_account.as_ref().clone()) + .await + { + (next_nonce, metadata.nonce, metadata.from_account.clone()) + } else { + // Failed to get nonce, but RPC already said "nonce too low" + // Use 0 as placeholder - the important thing is NOT to retry + warn!("Nonce too low but failed to get current nonce for {:?}, treating as resolved", metadata.txn_id); + (0, metadata.nonce, metadata.from_account.clone()) + }; + + monitor_addr.do_send(UpdateSubmissionResult { + metadata, + result: Arc::new(SubmissionResult::NonceTooLow { + tx_hash: keccak256(&signed_txn.bytes), + expect_nonce, + actual_nonce, + from_account, + }), + rpc_url: url, + send_time: Instant::now(), + signed_bytes: Arc::new(signed_txn.bytes.clone()), + }); + // After encountering Nonce error, should stop retrying and return regardless transactions_sending.fetch_sub(1, Ordering::Relaxed); return; From 2df3ddf57d3bef715c98a803e61c4c72e02b104e Mon Sep 17 00:00:00 2001 From: keanji-x Date: Fri, 2 Jan 2026 00:46:59 +0800 Subject: [PATCH 09/10] fmt --- src/actors/consumer/actor.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs index 0bbe733..6fb8ac7 100644 --- a/src/actors/consumer/actor.rs +++ b/src/actors/consumer/actor.rs @@ -283,6 +283,36 @@ impl Consumer { return; } + // --- Handle permanent errors that should NOT be retried --- + // These errors will never succeed no matter how many times we retry + if error_string.contains("insufficient funds") + || error_string.contains("insufficient balance") + || error_string.contains("gas limit exceeded") + || error_string.contains("exceeds block gas limit") + || error_string.contains("intrinsic gas too low") + { + error!( + "Permanent error for txn {:?}: {}, marking as failed (no retry)", + metadata.txn_id, e + ); + // Treat as NonceTooLow with placeholder values - this will mark as resolved + // without infinite retry. The transaction is dropped. 
+ monitor_addr.do_send(UpdateSubmissionResult { + metadata, + result: Arc::new(SubmissionResult::NonceTooLow { + tx_hash: keccak256(&signed_txn.bytes), + expect_nonce: 0, // placeholder + actual_nonce: 0, // placeholder + from_account: Arc::new(alloy::primitives::Address::ZERO), + }), + rpc_url: url, + send_time: Instant::now(), + signed_bytes: Arc::new(signed_txn.bytes.clone()), + }); + transactions_sending.fetch_sub(1, Ordering::Relaxed); + return; + } + // --- Requirement 2: If it's a Nonce related error --- // "nonce too low" or "invalid nonce" suggests the on-chain nonce may have advanced if error_string.contains("nonce too low") From a161f2752f255f5cfae17ad8483670a4fa41a548 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Sun, 4 Jan 2026 14:30:20 +0800 Subject: [PATCH 10/10] fix price precision --- src/eth/eth_cli.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/eth/eth_cli.rs b/src/eth/eth_cli.rs index dc6dfa2..50fa25a 100644 --- a/src/eth/eth_cli.rs +++ b/src/eth/eth_cli.rs @@ -124,11 +124,8 @@ impl EthHttpCli { let client = reqwest::Client::builder() .pool_idle_timeout(Duration::from_secs(10)) // Shorter idle, high TPS rarely idles .pool_max_idle_per_host(2000) // Large pool for 1500 concurrent senders - .connect_timeout(Duration::from_secs(5)) // Fast fail on connection issues - .timeout(Duration::from_secs(60)) // Reasonable global timeout to prevent stuck requests .tcp_keepalive(Duration::from_secs(30)) .tcp_nodelay(true) // Disable Nagle's algorithm for low latency - .http2_prior_knowledge() // Use HTTP/2 if server supports, better multiplexing .build() .unwrap();