Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/actors/consumer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,23 @@ impl Consumer {
Err((e, url)) => {
let error_string = e.to_string().to_lowercase();

// --- Handle "already known" error ---
// This means the transaction is already in the node's mempool
if error_string.contains("already known") || error_string.contains("already imported") {
let tx_hash = keccak256(&signed_txn.bytes);
debug!("Transaction already known by node: {:?}, treating as success", tx_hash);
monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::Success(tx_hash)),
rpc_url: url,
send_time: Instant::now(),
});

transactions_sending.fetch_sub(1, Ordering::Relaxed);
transactions_sent.fetch_add(1, Ordering::Relaxed);
return;
}

// --- Requirement 3: If it's an "underpriced" error ---
// This error means the transaction was accepted by the node but gas is insufficient. We can calculate the hash and treat it as successfully submitted.
if error_string.contains("underpriced") {
Expand Down
4 changes: 2 additions & 2 deletions src/actors/monitor/mempool_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl MempoolTracker {
&self,
status: Vec<anyhow::Result<MempoolStatus>>,
producer_addr: &Addr<Producer>,
) -> Result<(), anyhow::Error> {
) -> Result<(u64, u64), anyhow::Error> {
let _ = producer_addr;
let mut total_pending = 0;
let mut total_queued = 0;
Expand All @@ -44,6 +44,6 @@ impl MempoolTracker {
} else if total_pending + total_queued < self.max_pool_size / 2 {
producer_addr.do_send(ResumeProducer);
}
Ok(())
Ok((total_pending as u64, total_queued as u64))
}
}
7 changes: 7 additions & 0 deletions src/actors/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ pub struct UpdateSubmissionResult {
pub struct Tick;

// Monitor Messages
#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProducerStats {
pub ready_accounts: u64,
pub sending_txns: u64,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct PlanCompleted {
Expand Down
24 changes: 18 additions & 6 deletions src/actors/monitor/monitor_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::txn_plan::PlanId;

use super::txn_tracker::{PlanStatus, TxnTracker};
use super::{
PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer, Tick,
UpdateSubmissionResult,
PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer,
ReportProducerStats, Tick, UpdateSubmissionResult,
};

#[derive(Message)]
Expand Down Expand Up @@ -93,10 +93,13 @@ impl Actor for Monitor {
.into_actor(act)
.map(|res, act, _ctx| {
if let Some(producer_addr) = &act.producer_addr {
if let Err(e) =
act.mempool_tracker.process_pool_status(res, producer_addr)
{
error!("Failed to process pool status: {}", e);
match act.mempool_tracker.process_pool_status(res, producer_addr) {
Ok((pending, queued)) => {
act.txn_tracker.update_mempool_stats(pending, queued);
}
Err(e) => {
error!("Failed to process pool status: {}", e);
}
}
}
}),
Expand Down Expand Up @@ -212,3 +215,12 @@ impl Handler<PlanProduced> for Monitor {
.handle_plan_produced(msg.plan_id, msg.count);
}
}

impl Handler<ReportProducerStats> for Monitor {
type Result = ();

fn handle(&mut self, msg: ReportProducerStats, _ctx: &mut Self::Context) {
self.txn_tracker
.update_producer_stats(msg.ready_accounts, msg.sending_txns);
}
}
43 changes: 39 additions & 4 deletions src/actors/monitor/txn_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ pub struct TxnTracker {
total_failed_submissions: u64,
total_failed_executions: u64,
last_completed_plan: Option<(PlanId, PlanTracker)>,
producer_ready_accounts: u64,
producer_sending_txns: u64,
mempool_pending: u64,
mempool_queued: u64,
}

/// Tracking status of a single transaction plan
Expand Down Expand Up @@ -134,9 +138,23 @@ impl TxnTracker {
total_failed_submissions: 0,
total_failed_executions: 0,
last_completed_plan: None,
producer_ready_accounts: 0,
producer_sending_txns: 0,
mempool_pending: 0,
mempool_queued: 0,
}
}

pub fn update_producer_stats(&mut self, ready_accounts: u64, sending_txns: u64) {
self.producer_ready_accounts = ready_accounts;
self.producer_sending_txns = sending_txns;
}

pub fn update_mempool_stats(&mut self, pending: u64, queued: u64) {
self.mempool_pending = pending;
self.mempool_queued = queued;
}

pub fn handler_produce_txns(&mut self, plan_id: PlanId, count: usize) {
if let Some(tracker) = self.plan_trackers.get_mut(&plan_id) {
tracker.produce_transactions += count;
Expand Down Expand Up @@ -478,7 +496,7 @@ impl TxnTracker {
let tps = self.resolved_txn_timestamps.len() as f64 / TPS_WINDOW.as_secs_f64();

// Calculate latency stats
let (avg_latency, min_latency, max_latency) = if !self.latencies.is_empty() {
let (avg_latency, _min_latency, _max_latency) = if !self.latencies.is_empty() {
let sum: Duration = self.latencies.iter().sum();
let avg = sum / self.latencies.len() as u32;
let min = *self.latencies.iter().min().unwrap();
Expand Down Expand Up @@ -660,16 +678,33 @@ impl TxnTracker {
.add_attribute(Attribute::Bold)
.fg(Color::Blue),
Cell::new(&format!(
"TPS:{:.1} | Latency(avg/min/max): {:.1}s/{:.1}s/{:.1}s",
"TPS:{:.1} | Lat: {:.1}s | Pool:{}/{}",
tps,
avg_latency.as_secs_f64(),
min_latency.as_secs_f64(),
max_latency.as_secs_f64()
format_large_number(self.mempool_pending),
format_large_number(self.mempool_queued)
))
.add_attribute(Attribute::Bold)
.fg(Color::Magenta),
]);

table.add_row(vec![
Cell::new("SYSTEM")
.add_attribute(Attribute::Bold)
.fg(Color::Yellow),
Cell::new(""), // Progress placeholder
Cell::new(""), // Success% placeholder
Cell::new(""), // SendFail placeholder
Cell::new(""), // ExecFail placeholder
Cell::new(&format!(
"Ready Accounts: {} | Processing: {}",
format_large_number(self.producer_ready_accounts),
format_large_number(self.producer_sending_txns)
))
.add_attribute(Attribute::Bold)
.fg(Color::Yellow),
]);

println!("{}", table);
}
}
26 changes: 13 additions & 13 deletions src/actors/producer/producer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::time::Duration;
use crate::actors::consumer::Consumer;
use crate::actors::monitor::monitor_actor::{PlanProduced, ProduceTxns};
use crate::actors::monitor::{
Monitor, PlanCompleted, PlanFailed, RegisterPlan, RegisterProducer, SubmissionResult,
UpdateSubmissionResult,
Monitor, PlanCompleted, PlanFailed, RegisterPlan, RegisterProducer, ReportProducerStats,
SubmissionResult, UpdateSubmissionResult,
};
use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer};
use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan};
Expand Down Expand Up @@ -190,15 +190,6 @@ impl Producer {
Some(nonce) => *nonce,
None => 0,
};
if next_nonce > signed_txn.metadata.nonce as u32 {
tracing::debug!(
"Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}",
signed_txn.metadata.from_account,
next_nonce,
signed_txn.metadata.nonce
);
continue;
}
if let Err(e) = consumer_addr.send(signed_txn).await {
// If sending to the consumer fails, we abort the entire plan.
tracing::error!(
Expand Down Expand Up @@ -248,7 +239,11 @@ impl Actor for Producer {
}
.into_actor(self)
.wait(ctx);
ctx.run_interval(Duration::from_secs(5), |act, _ctx| {
ctx.run_interval(Duration::from_secs(1), |act, _ctx| {
act.monitor_addr.do_send(ReportProducerStats {
ready_accounts: act.stats.ready_accounts.load(Ordering::Relaxed),
sending_txns: act.stats.sending_txns.load(Ordering::Relaxed),
});
tracing::debug!("Producer stats: plans_num={}, sending_txns={}, ready_accounts={}, success_plans_num={}, failed_plans_num={}, success_txns={}, failed_txns={}", act.stats.remain_plans_num, act.stats.sending_txns.load(Ordering::Relaxed), act.stats.ready_accounts.load(Ordering::Relaxed), act.stats.success_plans_num, act.stats.failed_plans_num, act.stats.success_txns, act.stats.failed_txns);
});
}
Expand Down Expand Up @@ -406,7 +401,12 @@ impl Handler<UpdateSubmissionResult> for Producer {

fn handle(&mut self, msg: UpdateSubmissionResult, _ctx: &mut Self::Context) -> Self::Result {
let address_pool = self.address_pool.clone();
self.stats.sending_txns.fetch_sub(1, Ordering::Relaxed);
self.stats
.sending_txns
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
Some(val.saturating_sub(1))
})
.ok();
match msg.result.as_ref() {
SubmissionResult::Success(_) => {
self.stats.success_txns += 1;
Expand Down
12 changes: 6 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,11 @@ async fn start_bench() -> Result<()> {

async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc<EthHttpCli>) {
tracing::info!("Initializing nonce...");

// Collect all accounts first to get total count
let accounts: Vec<_> = accout_generator.accouts_nonce_iter().collect();
let total_accounts = accounts.len() as u64;

// Create progress bar
let pb = ProgressBar::new(total_accounts);
pb.set_style(
Expand All @@ -438,10 +438,10 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc<Eth
.unwrap()
.progress_chars("#>-"),
);

let pb = Arc::new(pb);
let start_time = Instant::now();

let tasks = accounts.into_iter().map(|(account, nonce)| {
let client = eth_client.clone();
let addr = account.clone();
Expand All @@ -455,7 +455,7 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc<Eth
}
Err(e) => {
tracing::error!("Failed to get nonce for address: {}: {}", addr, e);
pb.inc(1);
panic!("Failed to get nonce for address: {}", addr);
}
}
}
Expand All @@ -465,7 +465,7 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc<Eth
.buffer_unordered(1024)
.collect::<Vec<_>>()
.await;

pb.finish_with_message("Done");
let elapsed = start_time.elapsed();
let rate = total_accounts as f64 / elapsed.as_secs_f64();
Expand Down
5 changes: 5 additions & 0 deletions src/txn_plan/constructor/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,13 @@ impl<T: FaucetTxnBuilder + 'static> FaucetTreePlanBuilder<T> {
) -> Box<dyn TxnPlan> {
let senders = self.get_senders_for_level(level);
let is_final_level = level == self.total_levels.saturating_sub(1);

// Generate descriptive plan name
let token_name = self.txn_builder.token_name();
let plan_name = format!("Level{}Faucet{}Plan", level, token_name);

let plan = LevelFaucetPlan::new(
plan_name,
chain_id,
level,
account_init_nonce,
Expand Down
8 changes: 7 additions & 1 deletion src/txn_plan/faucet_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const DEFAULT_CONCURRENCY_LIMIT: usize = 256;

pub struct LevelFaucetPlan<T: FaucetTxnBuilder> {
id: PlanId,
name: String,
account_init_nonce: Arc<HashMap<Address, u64>>,
execution_mode: PlanExecutionMode,
chain_id: u64,
Expand All @@ -47,6 +48,7 @@ pub struct LevelFaucetPlan<T: FaucetTxnBuilder> {
impl<T: FaucetTxnBuilder> LevelFaucetPlan<T> {
#[allow(clippy::too_many_arguments)]
pub fn new(
name: String,
chain_id: u64,
level: usize,
account_init_nonce: Arc<HashMap<Address, u64>>,
Expand All @@ -67,6 +69,7 @@ impl<T: FaucetTxnBuilder> LevelFaucetPlan<T> {
};
Self {
id,
name,
account_init_nonce,
execution_mode,
chain_id,
Expand Down Expand Up @@ -101,7 +104,7 @@ impl<T: FaucetTxnBuilder + 'static> TxnPlan for LevelFaucetPlan<T> {
}

fn name(&self) -> &str {
"LevelFaucetPlan"
&self.name
}

fn build_txns(
Expand Down Expand Up @@ -156,6 +159,9 @@ impl<T: FaucetTxnBuilder + 'static> TxnPlan for LevelFaucetPlan<T> {
let init_nonce = account_init_nonce
.get(&sender_signer.address())
.unwrap_or(&0);
// Skip transaction if it was already executed (recovery mode).
// In normal mode, init_nonce is 0 for all accounts, so nothing is skipped.
// In recovery mode, init_nonce is the on-chain nonce, so we skip if init_nonce > nonce.
if *init_nonce > nonce && init_nonce != &0 {
continue;
}
Expand Down
11 changes: 11 additions & 0 deletions src/txn_plan/faucet_txn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub trait FaucetTxnBuilder: Send + Sync {
nonce: u64,
chain_id: u64,
) -> TransactionRequest;

/// Returns the name of the token type (e.g., "Eth", "Token")
fn token_name(&self) -> &str;
}

/// A `FaucetTxnBuilder` for native Ethereum (ETH) transfers.
Expand All @@ -46,6 +49,10 @@ impl FaucetTxnBuilder for EthFaucetTxnBuilder {
.with_max_fee_per_gas(10_000_000_000)
.with_gas_limit(21_000) // Standard gas for ETH transfer
}

fn token_name(&self) -> &str {
"Eth"
}
}

/// A `FaucetTxnBuilder` for ERC20 token transfers.
Expand Down Expand Up @@ -80,4 +87,8 @@ impl FaucetTxnBuilder for Erc20FaucetTxnBuilder {
.with_max_fee_per_gas(10_000_000_000)
.with_gas_limit(60_000) // A reasonable default for ERC20 transfers
}

fn token_name(&self) -> &str {
"Token"
}
}
Loading
Loading