diff --git a/bench_config.template b/bench_config.template
index 65cf948..22aa587 100644
--- a/bench_config.template
+++ b/bench_config.template
@@ -8,6 +8,8 @@ nodes = [
 ]
 num_tokens=2
 enable_swap_token = false
+# Address pool type: "random" (default) or "weighted" (hot/normal/long-tail distribution)
+address_pool_type = "random"
 # Faucet and deployer account configuration
 [faucet]
 # Private key (example, please replace with real private key, the follows is a example of reth)
diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs
index 6b0aeee..4faf2be 100644
--- a/src/actors/monitor/txn_tracker.rs
+++ b/src/actors/monitor/txn_tracker.rs
@@ -19,8 +19,8 @@ const TXN_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes timeout
 const TPS_WINDOW: Duration = Duration::from_secs(17);
 
 // Backpressure configuration
-const MAX_PENDING_TXNS: usize = 100_000;
-const BACKPRESSURE_RESUME_THRESHOLD: usize = 80_000; // 80% of max
+const MAX_PENDING_TXNS: usize = 200_000;
+const BACKPRESSURE_RESUME_THRESHOLD: usize = 160_000; // 80% of max
 
 // Retry configuration
 const RETRY_TIMEOUT: Duration = Duration::from_secs(120); // Retry if stuck for 120s
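The txn_tracker change doubles the pending-transaction ceiling while keeping the resume threshold at 80% of the new maximum. The tracker's pause/resume logic is not part of this diff, so the following is only a minimal sketch of the hysteresis these two constants suggest, with the struct and method names invented for illustration:

```rust
// Minimal sketch of the hysteresis implied by the two constants; the real
// txn_tracker fields and method names are not shown in this diff and are
// invented here for illustration.
const MAX_PENDING_TXNS: usize = 200_000;
const BACKPRESSURE_RESUME_THRESHOLD: usize = 160_000; // 80% of max

struct BackpressureState {
    paused: bool,
}

impl BackpressureState {
    /// Returns whether producers should currently be paused.
    fn update(&mut self, pending: usize) -> bool {
        if pending >= MAX_PENDING_TXNS {
            // Hard ceiling reached: stop accepting new transactions.
            self.paused = true;
        } else if pending <= BACKPRESSURE_RESUME_THRESHOLD {
            // Resume only after the backlog drains below 80% of the ceiling,
            // so the system does not flap right at the limit.
            self.paused = false;
        }
        self.paused
    }
}

fn main() {
    let mut bp = BackpressureState { paused: false };
    assert!(bp.update(200_000));  // at the ceiling: pause
    assert!(bp.update(170_000));  // still above the resume threshold: stay paused
    assert!(!bp.update(150_000)); // drained below 160_000: resume
}
```

Keeping the resume threshold well below the ceiling means producers are only un-paused after the backlog has drained noticeably, which avoids flapping right at the limit.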
diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs
index 605785a..bdb92e6 100644
--- a/src/actors/producer/producer_actor.rs
+++ b/src/actors/producer/producer_actor.rs
@@ -1,6 +1,6 @@
 use actix::prelude::*;
 use dashmap::DashMap;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
@@ -74,10 +74,14 @@ pub struct Producer {
     account_generator: AccountManager,
 
     /// A queue of plans waiting to be executed. Plans are processed in FIFO order.
+    /// Only this queue is subject to max_queue_size limit.
     plan_queue: VecDeque>,
-    /// Tracks pending plans to respond to the original requester upon completion.
-    pending_plans: HashMap>>,
-    /// The maximum number of plans allowed in the queue.
+    /// Plans that have finished producing transactions but are awaiting completion confirmation.
+    /// These do NOT count towards the queue size limit.
+    awaiting_completion: HashSet,
+    /// Global responder management across all plan states.
+    plan_responders: HashMap>>,
+    /// The maximum number of plans allowed in the queue (only applies to plan_queue).
     max_queue_size: usize,
 }
 
@@ -112,7 +116,8 @@ impl Producer {
             monitor_addr,
             consumer_addr,
             plan_queue: VecDeque::new(),
-            pending_plans: HashMap::new(),
+            awaiting_completion: HashSet::new(),
+            plan_responders: HashMap::new(),
             max_queue_size: 10,
         })
     }
@@ -282,7 +287,7 @@ impl Handler for Producer {
                 if !is_ready {
                     // If not ready, return the plan so it can be put back at the front of the queue.
                     tracing::debug!("Plan '{}' is not ready, re-queuing at front.", plan.name());
-                    return Some(plan);
+                    return Err(plan);
                 }
 
                 // If ready, execute the plan.
@@ -305,19 +310,33 @@
                         plan_id,
                         reason: e.to_string(),
                     });
+                    return Ok(None);
                 }
-                // Return None as the plan has been consumed (either successfully started or failed).
-                None
+                // Return the plan_id for adding to awaiting_completion
+                Ok(Some(plan_id))
             }
             .into_actor(self)
-            .map(|maybe_plan, act, _ctx| {
-                // This block runs after the async block is complete, safely on the actor's context.
-                if let Some(plan) = maybe_plan {
-                    act.stats.remain_plans_num += 1;
-                    // CRITICAL: If the plan was not ready, push it back to the *front* of the queue
-                    // to ensure strict sequential execution. No other plan can run before it.
-                    act.plan_queue.push_front(plan);
+            .map(|result, act, ctx| {
+                match result {
+                    Err(plan) => {
+                        // Plan was not ready, push it back to the front of the queue
+                        act.stats.remain_plans_num += 1;
+                        act.plan_queue.push_front(plan);
+                        // Re-trigger plan execution after a short delay to avoid busy-looping.
+                        ctx.run_later(Duration::from_millis(100), |act, ctx| {
+                            act.trigger_next_plan_if_needed(ctx);
+                        });
+                    }
+                    Ok(Some(plan_id)) => {
+                        // Plan executed successfully, move to awaiting_completion
+                        act.awaiting_completion.insert(plan_id);
+                        // Trigger execution of the next plan in the queue
+                        act.trigger_next_plan_if_needed(ctx);
+                    }
+                    Ok(None) => {
+                        // Plan failed, already handled via PlanFailed message
+                    }
                 }
             }),
         )
     }
@@ -329,7 +348,8 @@ impl Handler for Producer {
     type Result = ResponseFuture>;
 
     fn handle(&mut self, msg: RegisterTxnPlan, ctx: &mut Self::Context) -> Self::Result {
-        if self.pending_plans.len() >= self.max_queue_size {
+        // Only limit the queue size for plans waiting to be executed
+        if self.plan_queue.len() >= self.max_queue_size {
             return Box::pin(async { Err(anyhow::anyhow!("Producer plan queue is full")) });
         }
@@ -343,7 +363,8 @@ impl Handler for Producer {
         // Add the plan to the back of the queue.
         self.plan_queue.push_back(msg.plan);
-        self.pending_plans.insert(plan_id, msg.responder);
+        // Store the responder globally (will respond when plan completes)
+        self.plan_responders.insert(plan_id, msg.responder);
 
         // A new plan has been added, so we attempt to trigger execution.
         // This is done synchronously within the handler to avoid race conditions.
@@ -364,7 +385,9 @@ impl Handler for Producer {
             self.plan_queue.len()
         );
         self.stats.success_plans_num += 1;
-        if let Some(responder) = self.pending_plans.remove(&msg.plan_id) {
+        // Remove from awaiting_completion and respond to the original requester
+        self.awaiting_completion.remove(&msg.plan_id);
+        if let Some(responder) = self.plan_responders.remove(&msg.plan_id) {
             let _ = responder.send(Ok(()));
         }
@@ -385,8 +408,9 @@ impl Handler for Producer {
             self.plan_queue.len()
        );
         self.stats.failed_plans_num += 1;
-
-        if let Some(responder) = self.pending_plans.remove(&msg.plan_id) {
+        // Remove from awaiting_completion and respond with error
+        self.awaiting_completion.remove(&msg.plan_id);
+        if let Some(responder) = self.plan_responders.remove(&msg.plan_id) {
             let _ = responder.send(Err(anyhow::anyhow!("Plan failed: {}", msg.reason)));
         }
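The producer refactor splits plan bookkeeping into two collections: `plan_queue` holds plans that have not started yet and is the only collection bounded by `max_queue_size`, `awaiting_completion` holds plans that have already produced their transactions, and `plan_responders` keeps each responder until the success or failure handler fires. The sketch below restates that lifecycle with stand-in types (u64 plan ids, String responders), since the real `TxnPlan` and responder types are elided in this diff:

```rust
// Stand-in sketch of the plan lifecycle after this change. Plan ids are plain
// u64 and responders are Strings because the real TxnPlan and responder types
// are not visible in the diff; only the bookkeeping is illustrated.
use std::collections::{HashMap, HashSet, VecDeque};

struct ProducerState {
    plan_queue: VecDeque<u64>,             // waiting to run; the only bounded set
    awaiting_completion: HashSet<u64>,     // produced, waiting for confirmation
    plan_responders: HashMap<u64, String>, // kept until completion or failure
    max_queue_size: usize,
}

impl ProducerState {
    fn register(&mut self, plan_id: u64, responder: String) -> Result<(), &'static str> {
        // The limit now applies only to plans that have not started yet.
        if self.plan_queue.len() >= self.max_queue_size {
            return Err("Producer plan queue is full");
        }
        self.plan_queue.push_back(plan_id);
        self.plan_responders.insert(plan_id, responder);
        Ok(())
    }

    fn start_next(&mut self) {
        // Once a plan has produced its transactions it stops occupying a
        // queue slot and merely waits for confirmation.
        if let Some(plan_id) = self.plan_queue.pop_front() {
            self.awaiting_completion.insert(plan_id);
        }
    }

    fn finish(&mut self, plan_id: u64) -> Option<String> {
        // On success or failure: drop the awaiting entry and hand back the
        // responder so the original requester can finally be answered.
        self.awaiting_completion.remove(&plan_id);
        self.plan_responders.remove(&plan_id)
    }
}

fn main() {
    let mut p = ProducerState {
        plan_queue: VecDeque::new(),
        awaiting_completion: HashSet::new(),
        plan_responders: HashMap::new(),
        max_queue_size: 10,
    };
    p.register(1, "responder-1".to_string()).unwrap();
    p.start_next();
    assert!(p.plan_queue.is_empty());            // slot freed for the next plan
    assert!(p.awaiting_completion.contains(&1)); // but still tracked until done
    assert_eq!(p.finish(1), Some("responder-1".to_string()));
}
```

The practical effect is that plans which are merely waiting for their transactions to be confirmed no longer consume queue slots, so RegisterTxnPlan is only rejected when genuinely unstarted work piles up.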
diff --git a/src/config/bench_config.rs b/src/config/bench_config.rs
index f6f24ed..270a5f4 100644
--- a/src/config/bench_config.rs
+++ b/src/config/bench_config.rs
@@ -3,6 +3,15 @@ use anyhow::{Context, Result};
 use serde::{Deserialize, Deserializer, Serialize};
 use std::path::Path;
 
+/// Address pool type selection
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+#[serde(rename_all = "lowercase")]
+pub enum AddressPoolType {
+    #[default]
+    Random,
+    Weighted,
+}
+
 /// Complete configuration structure
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct BenchConfig {
@@ -14,6 +23,8 @@ pub struct BenchConfig {
     pub num_tokens: usize,
     pub target_tps: u64,
     pub enable_swap_token: bool,
+    #[serde(default)]
+    pub address_pool_type: AddressPoolType,
 }
 
 /// Node and chain configuration
diff --git a/src/main.rs b/src/main.rs
index 1e20475..431342a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -365,12 +365,26 @@ async fn start_bench() -> Result<()> {
     let account_manager = accout_generator.to_manager();
 
-    let address_pool: Arc = Arc::new(
-        txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(
-            account_ids.clone(),
-            account_manager.clone(),
-        ),
-    );
+    let address_pool: Arc = match benchmark_config.address_pool_type {
+        config::AddressPoolType::Random => {
+            info!("Using RandomAddressPool");
+            Arc::new(
+                txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(
+                    account_ids.clone(),
+                    account_manager.clone(),
+                ),
+            )
+        }
+        config::AddressPoolType::Weighted => {
+            info!("Using WeightedAddressPool");
+            Arc::new(
+                txn_plan::addr_pool::weighted_address_pool::WeightedAddressPool::new(
+                    account_ids.clone(),
+                    account_manager.clone(),
+                ),
+            )
+        }
+    };
 
     // Use the same client instances for Consumer to share metrics
     let eth_providers: Vec = eth_clients
diff --git a/src/txn_plan/addr_pool/mod.rs b/src/txn_plan/addr_pool/mod.rs
index 115c3bb..a0cefc0 100644
--- a/src/txn_plan/addr_pool/mod.rs
+++ b/src/txn_plan/addr_pool/mod.rs
@@ -1,7 +1,6 @@
 use crate::util::gen_account::AccountId;
 
 pub mod managed_address_pool;
-#[allow(unused)]
 pub mod weighted_address_pool;
 
 #[async_trait::async_trait]
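For reference, the new `address_pool_type` field deserializes as follows: `#[serde(rename_all = "lowercase")]` maps the TOML strings "random" and "weighted" onto the enum variants, and `#[serde(default)]` keeps existing config files valid by falling back to `Random` when the key is omitted. A standalone sketch, assuming the `toml` crate and trimming away the other `BenchConfig` fields:

```rust
// Standalone round-trip check for the new field (assumes the `toml` crate;
// the other BenchConfig fields are trimmed away for brevity).
use serde::Deserialize;

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
enum AddressPoolType {
    #[default]
    Random,
    Weighted,
}

#[derive(Debug, Deserialize)]
struct Benchmark {
    #[serde(default)]
    address_pool_type: AddressPoolType,
}

fn main() {
    // Explicitly selecting the weighted pool, as bench_config.template documents.
    let cfg: Benchmark = toml::from_str(r#"address_pool_type = "weighted""#).unwrap();
    assert_eq!(cfg.address_pool_type, AddressPoolType::Weighted);

    // Omitting the key keeps old config files valid: the default is Random.
    let cfg: Benchmark = toml::from_str("").unwrap();
    assert_eq!(cfg.address_pool_type, AddressPoolType::Random);
}
```

Selecting the weighted pool in bench_config.template is then just `address_pool_type = "weighted"`, which main.rs maps to `WeightedAddressPool`.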
diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs
index 3211cf4..c737883 100644
--- a/src/txn_plan/addr_pool/weighted_address_pool.rs
+++ b/src/txn_plan/addr_pool/weighted_address_pool.rs
@@ -1,12 +1,10 @@
-use std::{collections::HashMap, sync::Arc};
+use std::collections::HashMap;
 
-use alloy::primitives::Address;
 use parking_lot::Mutex;
 use rand::seq::SliceRandom;
-use tokio::sync::RwLock;
 
 use super::AddressPool;
-use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager};
+use crate::util::gen_account::{AccountId, AccountManager};
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 enum AccountCategory {
@@ -29,6 +27,7 @@ struct Inner {
 
 pub struct WeightedAddressPool {
     inner: Mutex,
+    #[allow(unused)]
     account_generator: AccountManager,
 }
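The weighted pool's selection logic is not included in this diff; only the `AccountCategory` enum, the `rand::seq::SliceRandom` import, and the template's hot/normal/long-tail comment are visible. Purely as an illustration, a category split like that could be driven by `choose_weighted` from rand 0.8; the weights below are invented for the example and are not taken from the actual implementation:

```rust
// Illustration only: the real WeightedAddressPool internals are not shown in
// the diff. Assumes rand 0.8 (SliceRandom::choose_weighted, thread_rng);
// the category weights are made up for the example.
use rand::seq::SliceRandom;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum AccountCategory {
    Hot,      // small set of accounts receiving most of the traffic
    Normal,   // bulk of accounts with moderate traffic
    LongTail, // rarely touched accounts
}

fn pick_category(rng: &mut impl rand::Rng) -> AccountCategory {
    // Hypothetical weights: 70% hot, 25% normal, 5% long tail.
    let weighted = [
        (AccountCategory::Hot, 70u32),
        (AccountCategory::Normal, 25u32),
        (AccountCategory::LongTail, 5u32),
    ];
    weighted
        .choose_weighted(rng, |(_, w)| *w)
        .map(|(cat, _)| *cat)
        .expect("weights are non-zero")
}

fn main() {
    let mut rng = rand::thread_rng();
    let mut counts = std::collections::HashMap::new();
    for _ in 0..10_000 {
        *counts.entry(pick_category(&mut rng)).or_insert(0u32) += 1;
    }
    // Roughly a 70/25/5 split over the three categories.
    println!("{counts:?}");
}
```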