Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bench_config.template
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ num_senders = 1500
# Maximum capacity of the transaction pool inside Consumer
max_pool_size = 100000
# Duration of the benchmark in seconds. Set to 0 to run indefinitely.
duration_secs = 60
# Duration of the benchmark in seconds. Set to 0 to run indefinitely.
duration_secs = 60
# Sampling strategy: integer (transaction count) or "full" (check all pending)
# Default is 10
sampling = 20
99 changes: 61 additions & 38 deletions src/actors/consumer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,49 +283,72 @@ impl Consumer {
return;
}

// --- Handle permanent errors that should NOT be retried ---
// These errors will never succeed no matter how many times we retry
if error_string.contains("insufficient funds")
|| error_string.contains("insufficient balance")
|| error_string.contains("gas limit exceeded")
|| error_string.contains("exceeds block gas limit")
|| error_string.contains("intrinsic gas too low")
{
error!(
"Permanent error for txn {:?}: {}, marking as failed (no retry)",
metadata.txn_id, e
);
// Treat as NonceTooLow with placeholder values - this will mark as resolved
// without infinite retry. The transaction is dropped.
monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::NonceTooLow {
tx_hash: keccak256(&signed_txn.bytes),
expect_nonce: 0, // placeholder
actual_nonce: 0, // placeholder
from_account: Arc::new(alloy::primitives::Address::ZERO),
}),
rpc_url: url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});
transactions_sending.fetch_sub(1, Ordering::Relaxed);
return;
}

// --- Requirement 2: If it's a Nonce related error ---
// "nonce too low" or "invalid nonce" suggests the on-chain nonce may have advanced
if error_string.contains("nonce too low")
|| error_string.contains("invalid nonce")
{
// Check current on-chain nonce to determine final state
if let Ok(next_nonce) = dispatcher
.provider(&url)
.await
.unwrap()
.get_txn_count(metadata.from_account.as_ref().clone())
.await
{
// If on-chain nonce is greater than our attempted nonce, our transaction is indeed outdated
if next_nonce > metadata.nonce {
// Try to find the hash of the transaction using our nonce
let actual_nonce = metadata.nonce;
let from_account = metadata.from_account.clone();
// Can't find hash, but can provide correct nonce
monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::NonceTooLow {
tx_hash: keccak256(&signed_txn.bytes),
expect_nonce: next_nonce,
actual_nonce,
from_account,
}),
rpc_url: url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});
}
} else {
// Failed to get nonce, can only mark as retryable error
warn!("Failed to get nonce for txn {:?}: {}", metadata.txn_id, e);
monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::ErrorWithRetry),
rpc_url: "unknown".to_string(),
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});
}
// The RPC already told us "nonce too low" - trust this directly.
// Try to get current nonce for better logging, but don't fail if we can't.
let (expect_nonce, actual_nonce, from_account) =
if let Ok(next_nonce) = dispatcher
.provider(&url)
.await
.unwrap()
.get_pending_txn_count(metadata.from_account.as_ref().clone())
.await
{
(next_nonce, metadata.nonce, metadata.from_account.clone())
} else {
// Failed to get nonce, but RPC already said "nonce too low"
// Use 0 as placeholder - the important thing is NOT to retry
warn!("Nonce too low but failed to get current nonce for {:?}, treating as resolved", metadata.txn_id);
(0, metadata.nonce, metadata.from_account.clone())
};

monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::NonceTooLow {
tx_hash: keccak256(&signed_txn.bytes),
expect_nonce,
actual_nonce,
from_account,
}),
rpc_url: url,
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});

// After encountering Nonce error, should stop retrying and return regardless
transactions_sending.fetch_sub(1, Ordering::Relaxed);
return;
Expand Down
97 changes: 91 additions & 6 deletions src/actors/monitor/mempool_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,41 @@ use actix::Addr;

use crate::{
actors::{PauseProducer, Producer, ResumeProducer},
eth::{EthHttpCli, MempoolStatus},
eth::{EthHttpCli, MempoolStatus, TxPoolContent},
};



/// Action to take after analyzing mempool status
#[derive(Debug)]
pub enum MempoolAction {
/// No action needed
None,
/// Pause producer due to mempool being full
Pause,
/// Resume producer
Resume,
/// Need to fetch txpool_content for nonce correction
NeedsNonceCorrection,
}

pub(crate) struct MempoolTracker {
max_pool_size: usize,
/// Threshold for queued/pending ratio to trigger nonce correction
queued_ratio_threshold: f64,
/// Cooldown to avoid frequent txpool_content calls
last_correction_check: std::time::Instant,
correction_cooldown: std::time::Duration,
}

impl MempoolTracker {
pub fn new(max_pool_size: usize) -> Self {
Self { max_pool_size }
Self {
max_pool_size,
queued_ratio_threshold: 1.0,
last_correction_check: std::time::Instant::now(),
correction_cooldown: std::time::Duration::from_secs(30),
}
}

pub async fn get_pool_status(
Expand All @@ -28,22 +53,82 @@ impl MempoolTracker {
}

pub fn process_pool_status(
&self,
&mut self,
status: Vec<anyhow::Result<MempoolStatus>>,
producer_addr: &Addr<Producer>,
) -> Result<(u64, u64), anyhow::Error> {
let _ = producer_addr;
) -> Result<(u64, u64, MempoolAction), anyhow::Error> {
let mut total_pending = 0;
let mut total_queued = 0;
for status in status.into_iter().flatten() {
total_pending += status.pending;
total_queued += status.queued;
}

let mut action = MempoolAction::None;

// Check for mempool size limits
if total_pending + total_queued > self.max_pool_size {
producer_addr.do_send(PauseProducer);
action = MempoolAction::Pause;
} else if total_pending + total_queued < self.max_pool_size / 2 {
producer_addr.do_send(ResumeProducer);
action = MempoolAction::Resume;
}

// Check for high queued transactions (indicates nonce gaps)
if total_queued > 0 {
// Calculate ratio, handling division by zero if pending is 0
let ratio = if total_pending > 0 {
total_queued as f64 / total_pending as f64
} else {
f64::INFINITY
};

// Trigger if ratio is high OR if we have significant queued transactions with low pending
if ratio > self.queued_ratio_threshold || (total_queued > 100 && total_pending < 10) {
// Check cooldown
if self.last_correction_check.elapsed() > self.correction_cooldown {
self.last_correction_check = std::time::Instant::now();
tracing::warn!(
"High queued/pending ratio detected: {:.2} (queued={}, pending={}), triggering nonce correction",
ratio,
total_queued,
total_pending
);
action = MempoolAction::NeedsNonceCorrection;
}
}
}
Ok((total_pending as u64, total_queued as u64))

Ok((total_pending as u64, total_queued as u64, action))
}

/// Identify accounts with nonce gaps from txpool_content
/// Returns list of addresses that need correction
pub fn identify_problematic_accounts(content: &TxPoolContent) -> Vec<alloy::primitives::Address> {
let mut problematic_accounts = Vec::new();

for (address, nonces) in &content.queued {
// Identify accounts that have queued transactions.
// Presence in 'queued' implies a gap or issue.
// We check if they also have pending transactions.
// If they have NO pending transactions but HAVE queued, strictly implies a gap.
let has_pending = content.pending.contains_key(address);

// Check if there are any valid nonces in queued
if nonces.keys().any(|s| s.parse::<u64>().is_ok()) {
if !has_pending {
problematic_accounts.push(*address);
}
}
}

tracing::info!(
"Identified {} accounts with likely nonce gaps",
problematic_accounts.len()
);

problematic_accounts
}
}

17 changes: 17 additions & 0 deletions src/actors/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ pub struct PlanCompleted {
pub plan_id: PlanId,
}


#[derive(Message)]
#[rtype(result = "()")]
pub struct PlanFailed {
pub plan_id: PlanId,
pub reason: String,
}



/// Message to retry a timed-out transaction
#[derive(Message, Clone)]
#[rtype(result = "()")]
Expand All @@ -86,4 +89,18 @@ pub struct RetryTxn {
pub metadata: Arc<TxnMetadata>,
}

/// Information for correcting account nonce
#[derive(Debug, Clone)]
pub struct NonceCorrectionInfo {
pub account: Address,
pub expected_nonce: u64,
}

/// Message to correct nonces based on txpool_content analysis
#[derive(Message)]
#[rtype(result = "()")]
pub struct CorrectNonces {
pub corrections: Vec<NonceCorrectionInfo>,
}

pub use monitor_actor::Monitor;
Loading
Loading