diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs index 8c15511..d7d0576 100644 --- a/src/actors/consumer/actor.rs +++ b/src/actors/consumer/actor.rs @@ -242,6 +242,23 @@ impl Consumer { Err((e, url)) => { let error_string = e.to_string().to_lowercase(); + // --- Handle "already known" error --- + // This means the transaction is already in the node's mempool + if error_string.contains("already known") || error_string.contains("already imported") { + let tx_hash = keccak256(&signed_txn.bytes); + debug!("Transaction already known by node: {:?}, treating as success", tx_hash); + monitor_addr.do_send(UpdateSubmissionResult { + metadata, + result: Arc::new(SubmissionResult::Success(tx_hash)), + rpc_url: url, + send_time: Instant::now(), + }); + + transactions_sending.fetch_sub(1, Ordering::Relaxed); + transactions_sent.fetch_add(1, Ordering::Relaxed); + return; + } + // --- Requirement 3: If it's an "underpriced" error --- // This error means the transaction was accepted by the node but gas is insufficient. We can calculate the hash and treat it as successfully submitted. if error_string.contains("underpriced") { diff --git a/src/actors/monitor/mempool_tracker.rs b/src/actors/monitor/mempool_tracker.rs index 1f35199..5f81671 100644 --- a/src/actors/monitor/mempool_tracker.rs +++ b/src/actors/monitor/mempool_tracker.rs @@ -31,7 +31,7 @@ impl MempoolTracker { &self, status: Vec>, producer_addr: &Addr, - ) -> Result<(), anyhow::Error> { + ) -> Result<(u64, u64), anyhow::Error> { let _ = producer_addr; let mut total_pending = 0; let mut total_queued = 0; @@ -44,6 +44,6 @@ impl MempoolTracker { } else if total_pending + total_queued < self.max_pool_size / 2 { producer_addr.do_send(ResumeProducer); } - Ok(()) + Ok((total_pending as u64, total_queued as u64)) } } diff --git a/src/actors/monitor/mod.rs b/src/actors/monitor/mod.rs index d660c68..f9a425e 100644 --- a/src/actors/monitor/mod.rs +++ b/src/actors/monitor/mod.rs @@ -56,6 +56,13 @@ pub struct UpdateSubmissionResult { pub struct Tick; // Monitor Messages +#[derive(Message)] +#[rtype(result = "()")] +pub struct ReportProducerStats { + pub ready_accounts: u64, + pub sending_txns: u64, +} + #[derive(Message)] #[rtype(result = "()")] pub struct PlanCompleted { diff --git a/src/actors/monitor/monitor_actor.rs b/src/actors/monitor/monitor_actor.rs index c9b0e69..a888712 100644 --- a/src/actors/monitor/monitor_actor.rs +++ b/src/actors/monitor/monitor_actor.rs @@ -14,8 +14,8 @@ use crate::txn_plan::PlanId; use super::txn_tracker::{PlanStatus, TxnTracker}; use super::{ - PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer, Tick, - UpdateSubmissionResult, + PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer, + ReportProducerStats, Tick, UpdateSubmissionResult, }; #[derive(Message)] @@ -93,10 +93,13 @@ impl Actor for Monitor { .into_actor(act) .map(|res, act, _ctx| { if let Some(producer_addr) = &act.producer_addr { - if let Err(e) = - act.mempool_tracker.process_pool_status(res, producer_addr) - { - error!("Failed to process pool status: {}", e); + match act.mempool_tracker.process_pool_status(res, producer_addr) { + Ok((pending, queued)) => { + act.txn_tracker.update_mempool_stats(pending, queued); + } + Err(e) => { + error!("Failed to process pool status: {}", e); + } } } }), @@ -212,3 +215,12 @@ impl Handler for Monitor { .handle_plan_produced(msg.plan_id, msg.count); } } + +impl Handler for Monitor { + type Result = (); + + fn handle(&mut self, msg: ReportProducerStats, _ctx: &mut Self::Context) { + self.txn_tracker + .update_producer_stats(msg.ready_accounts, msg.sending_txns); + } +} diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs index 5d70430..8bafead 100644 --- a/src/actors/monitor/txn_tracker.rs +++ b/src/actors/monitor/txn_tracker.rs @@ -49,6 +49,10 @@ pub struct TxnTracker { total_failed_submissions: u64, total_failed_executions: u64, last_completed_plan: Option<(PlanId, PlanTracker)>, + producer_ready_accounts: u64, + producer_sending_txns: u64, + mempool_pending: u64, + mempool_queued: u64, } /// Tracking status of a single transaction plan @@ -134,9 +138,23 @@ impl TxnTracker { total_failed_submissions: 0, total_failed_executions: 0, last_completed_plan: None, + producer_ready_accounts: 0, + producer_sending_txns: 0, + mempool_pending: 0, + mempool_queued: 0, } } + pub fn update_producer_stats(&mut self, ready_accounts: u64, sending_txns: u64) { + self.producer_ready_accounts = ready_accounts; + self.producer_sending_txns = sending_txns; + } + + pub fn update_mempool_stats(&mut self, pending: u64, queued: u64) { + self.mempool_pending = pending; + self.mempool_queued = queued; + } + pub fn handler_produce_txns(&mut self, plan_id: PlanId, count: usize) { if let Some(tracker) = self.plan_trackers.get_mut(&plan_id) { tracker.produce_transactions += count; @@ -478,7 +496,7 @@ impl TxnTracker { let tps = self.resolved_txn_timestamps.len() as f64 / TPS_WINDOW.as_secs_f64(); // Calculate latency stats - let (avg_latency, min_latency, max_latency) = if !self.latencies.is_empty() { + let (avg_latency, _min_latency, _max_latency) = if !self.latencies.is_empty() { let sum: Duration = self.latencies.iter().sum(); let avg = sum / self.latencies.len() as u32; let min = *self.latencies.iter().min().unwrap(); @@ -660,16 +678,33 @@ impl TxnTracker { .add_attribute(Attribute::Bold) .fg(Color::Blue), Cell::new(&format!( - "TPS:{:.1} | Latency(avg/min/max): {:.1}s/{:.1}s/{:.1}s", + "TPS:{:.1} | Lat: {:.1}s | Pool:{}/{}", tps, avg_latency.as_secs_f64(), - min_latency.as_secs_f64(), - max_latency.as_secs_f64() + format_large_number(self.mempool_pending), + format_large_number(self.mempool_queued) )) .add_attribute(Attribute::Bold) .fg(Color::Magenta), ]); + table.add_row(vec![ + Cell::new("SYSTEM") + .add_attribute(Attribute::Bold) + .fg(Color::Yellow), + Cell::new(""), // Progress placeholder + Cell::new(""), // Success% placeholder + Cell::new(""), // SendFail placeholder + Cell::new(""), // ExecFail placeholder + Cell::new(&format!( + "Ready Accounts: {} | Processing: {}", + format_large_number(self.producer_ready_accounts), + format_large_number(self.producer_sending_txns) + )) + .add_attribute(Attribute::Bold) + .fg(Color::Yellow), + ]); + println!("{}", table); } } diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 6f38d30..a4dedef 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -8,8 +8,8 @@ use std::time::Duration; use crate::actors::consumer::Consumer; use crate::actors::monitor::monitor_actor::{PlanProduced, ProduceTxns}; use crate::actors::monitor::{ - Monitor, PlanCompleted, PlanFailed, RegisterPlan, RegisterProducer, SubmissionResult, - UpdateSubmissionResult, + Monitor, PlanCompleted, PlanFailed, RegisterPlan, RegisterProducer, ReportProducerStats, + SubmissionResult, UpdateSubmissionResult, }; use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer}; use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan}; @@ -190,15 +190,6 @@ impl Producer { Some(nonce) => *nonce, None => 0, }; - if next_nonce > signed_txn.metadata.nonce as u32 { - tracing::debug!( - "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", - signed_txn.metadata.from_account, - next_nonce, - signed_txn.metadata.nonce - ); - continue; - } if let Err(e) = consumer_addr.send(signed_txn).await { // If sending to the consumer fails, we abort the entire plan. tracing::error!( @@ -248,7 +239,11 @@ impl Actor for Producer { } .into_actor(self) .wait(ctx); - ctx.run_interval(Duration::from_secs(5), |act, _ctx| { + ctx.run_interval(Duration::from_secs(1), |act, _ctx| { + act.monitor_addr.do_send(ReportProducerStats { + ready_accounts: act.stats.ready_accounts.load(Ordering::Relaxed), + sending_txns: act.stats.sending_txns.load(Ordering::Relaxed), + }); tracing::debug!("Producer stats: plans_num={}, sending_txns={}, ready_accounts={}, success_plans_num={}, failed_plans_num={}, success_txns={}, failed_txns={}", act.stats.remain_plans_num, act.stats.sending_txns.load(Ordering::Relaxed), act.stats.ready_accounts.load(Ordering::Relaxed), act.stats.success_plans_num, act.stats.failed_plans_num, act.stats.success_txns, act.stats.failed_txns); }); } @@ -406,7 +401,12 @@ impl Handler for Producer { fn handle(&mut self, msg: UpdateSubmissionResult, _ctx: &mut Self::Context) -> Self::Result { let address_pool = self.address_pool.clone(); - self.stats.sending_txns.fetch_sub(1, Ordering::Relaxed); + self.stats + .sending_txns + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { + Some(val.saturating_sub(1)) + }) + .ok(); match msg.result.as_ref() { SubmissionResult::Success(_) => { self.stats.success_txns += 1; diff --git a/src/main.rs b/src/main.rs index a669429..6cde07b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -425,11 +425,11 @@ async fn start_bench() -> Result<()> { async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc) { tracing::info!("Initializing nonce..."); - + // Collect all accounts first to get total count let accounts: Vec<_> = accout_generator.accouts_nonce_iter().collect(); let total_accounts = accounts.len() as u64; - + // Create progress bar let pb = ProgressBar::new(total_accounts); pb.set_style( @@ -438,10 +438,10 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc-"), ); - + let pb = Arc::new(pb); let start_time = Instant::now(); - + let tasks = accounts.into_iter().map(|(account, nonce)| { let client = eth_client.clone(); let addr = account.clone(); @@ -455,7 +455,7 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc { tracing::error!("Failed to get nonce for address: {}: {}", addr, e); - pb.inc(1); + panic!("Failed to get nonce for address: {}", addr); } } } @@ -465,7 +465,7 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc>() .await; - + pb.finish_with_message("Done"); let elapsed = start_time.elapsed(); let rate = total_accounts as f64 / elapsed.as_secs_f64(); diff --git a/src/txn_plan/constructor/faucet.rs b/src/txn_plan/constructor/faucet.rs index b6f16ef..ea24694 100644 --- a/src/txn_plan/constructor/faucet.rs +++ b/src/txn_plan/constructor/faucet.rs @@ -196,8 +196,13 @@ impl FaucetTreePlanBuilder { ) -> Box { let senders = self.get_senders_for_level(level); let is_final_level = level == self.total_levels.saturating_sub(1); + + // Generate descriptive plan name + let token_name = self.txn_builder.token_name(); + let plan_name = format!("Level{}Faucet{}Plan", level, token_name); let plan = LevelFaucetPlan::new( + plan_name, chain_id, level, account_init_nonce, diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index f6df74d..fd1424b 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -27,6 +27,7 @@ const DEFAULT_CONCURRENCY_LIMIT: usize = 256; pub struct LevelFaucetPlan { id: PlanId, + name: String, account_init_nonce: Arc>, execution_mode: PlanExecutionMode, chain_id: u64, @@ -47,6 +48,7 @@ pub struct LevelFaucetPlan { impl LevelFaucetPlan { #[allow(clippy::too_many_arguments)] pub fn new( + name: String, chain_id: u64, level: usize, account_init_nonce: Arc>, @@ -67,6 +69,7 @@ impl LevelFaucetPlan { }; Self { id, + name, account_init_nonce, execution_mode, chain_id, @@ -101,7 +104,7 @@ impl TxnPlan for LevelFaucetPlan { } fn name(&self) -> &str { - "LevelFaucetPlan" + &self.name } fn build_txns( @@ -156,6 +159,9 @@ impl TxnPlan for LevelFaucetPlan { let init_nonce = account_init_nonce .get(&sender_signer.address()) .unwrap_or(&0); + // Skip transaction if it was already executed (recovery mode). + // In normal mode, init_nonce is 0 for all accounts, so nothing is skipped. + // In recovery mode, init_nonce is the on-chain nonce, so we skip if init_nonce > nonce. if *init_nonce > nonce && init_nonce != &0 { continue; } diff --git a/src/txn_plan/faucet_txn_builder.rs b/src/txn_plan/faucet_txn_builder.rs index 849dcc2..0e75fa3 100644 --- a/src/txn_plan/faucet_txn_builder.rs +++ b/src/txn_plan/faucet_txn_builder.rs @@ -24,6 +24,9 @@ pub trait FaucetTxnBuilder: Send + Sync { nonce: u64, chain_id: u64, ) -> TransactionRequest; + + /// Returns the name of the token type (e.g., "Eth", "Token") + fn token_name(&self) -> &str; } /// A `FaucetTxnBuilder` for native Ethereum (ETH) transfers. @@ -46,6 +49,10 @@ impl FaucetTxnBuilder for EthFaucetTxnBuilder { .with_max_fee_per_gas(10_000_000_000) .with_gas_limit(21_000) // Standard gas for ETH transfer } + + fn token_name(&self) -> &str { + "Eth" + } } /// A `FaucetTxnBuilder` for ERC20 token transfers. @@ -80,4 +87,8 @@ impl FaucetTxnBuilder for Erc20FaucetTxnBuilder { .with_max_fee_per_gas(10_000_000_000) .with_gas_limit(60_000) // A reasonable default for ERC20 transfers } + + fn token_name(&self) -> &str { + "Token" + } } diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 6a43b35..9c5382f 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -166,3 +166,109 @@ impl AccountGenerator { accounts } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::keccak256; + + #[test] + fn test_compute_account_address() { + // Test computing address for specific account IDs + let test_ids = vec![0, 1, 100, 1000, 10000, 100000, 1000000]; + + println!("\n=== Account ID to Address Mapping ==="); + for id in test_ids { + let private_key_bytes = keccak256((id as u64).to_le_bytes()); + let signer = PrivateKeySigner::from_slice(private_key_bytes.as_slice()).unwrap(); + let address = signer.address(); + + println!("ID {}: Address = {:?}", id, address); + println!(" Private Key = 0x{}", hex::encode(private_key_bytes)); + } + } + + #[test] + fn test_find_account_by_address() { + // Find account ID for a specific address + let target_address = "0x3ece3a612e4e8849a3eaf093c61683b1370f3418"; + + println!("\n=== Searching for address {} ===", target_address); + + // Search final recipients in batches + println!("Searching final recipients (0-999999) in batches..."); + let batch_size = 10000; + for batch_start in (0..1000000).step_by(batch_size) { + let batch_end = (batch_start + batch_size).min(1000000); + + for id in batch_start..batch_end { + let private_key_bytes = keccak256((id as u64).to_le_bytes()); + let signer = PrivateKeySigner::from_slice(private_key_bytes.as_slice()).unwrap(); + let address = format!("{:?}", signer.address()).to_lowercase(); + + if address == target_address.to_lowercase() { + println!("FOUND! Account ID = {}", id); + println!("This is a final recipient account"); + println!("Private Key = 0x{}", hex::encode(private_key_bytes)); + + // Calculate parent node + let degree = 10; + let parent_index_in_level5 = id / degree; + let parent_id = 1011110 + parent_index_in_level5; + + println!("\nParent node calculation:"); + println!(" Parent is in Level 5"); + println!(" Parent index in Level 5: {}", parent_index_in_level5); + println!(" Parent account ID: {}", parent_id); + + let parent_pk = keccak256((parent_id as u64).to_le_bytes()); + let parent_signer = PrivateKeySigner::from_slice(parent_pk.as_slice()).unwrap(); + println!(" Parent address: {:?}", parent_signer.address()); + + return; + } + } + + if batch_start % 100000 == 0 { + println!(" Checked up to ID {}...", batch_end); + } + } + + println!("Address not found in final recipients"); + } + + #[test] + fn test_faucet_tree_structure() { + // Print the faucet tree structure with actual addresses + println!("\n=== Faucet Tree Structure ==="); + + let degree = 10; + let total_accounts = 1000000; + + // Faucet account (using the one from bench_config.toml) + let faucet_pk = "5c173b12be434289682782ac6f7e7bf73a6fa5a20d507e318a4bdb039b1a5f6e"; + let faucet_bytes = hex::decode(faucet_pk).unwrap(); + let faucet_signer = PrivateKeySigner::from_slice(&faucet_bytes).unwrap(); + println!("Faucet: {:?}", faucet_signer.address()); + + // Level 0 (first 10 intermediate accounts) + println!("\nLevel 0 (indices 1000000-1000009):"); + for i in 0..3 { + let id = 1000000 + i; + let pk = keccak256((id as u64).to_le_bytes()); + let signer = PrivateKeySigner::from_slice(pk.as_slice()).unwrap(); + println!(" ID {}: {:?}", id, signer.address()); + } + println!(" ..."); + + // Level 1 (sample) + println!("\nLevel 1 (indices 1000010-1000109, showing first 3):"); + for i in 0..3 { + let id = 1000010 + i; + let pk = keccak256((id as u64).to_le_bytes()); + let signer = PrivateKeySigner::from_slice(pk.as_slice()).unwrap(); + println!(" ID {}: {:?}", id, signer.address()); + } + println!(" ..."); + } +}