From ff2d4d2de5f34513a2b00b67f14f365607a8cee0 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Fri, 19 Dec 2025 10:54:37 +0800 Subject: [PATCH 1/4] fix --- bench_config.template | 2 ++ src/config/bench_config.rs | 11 ++++++++ src/main.rs | 26 ++++++++++++++----- src/txn_plan/addr_pool/mod.rs | 1 - .../addr_pool/weighted_address_pool.rs | 7 +++-- 5 files changed, 36 insertions(+), 11 deletions(-) diff --git a/bench_config.template b/bench_config.template index 65cf948..22aa587 100644 --- a/bench_config.template +++ b/bench_config.template @@ -8,6 +8,8 @@ nodes = [ ] num_tokens=2 enable_swap_token = false +# Address pool type: "random" (default) or "weighted" (hot/normal/long-tail distribution) +address_pool_type = "random" # Faucet and deployer account configuration [faucet] # Private key (example, please replace with real private key, the follows is a example of reth) diff --git a/src/config/bench_config.rs b/src/config/bench_config.rs index f6f24ed..270a5f4 100644 --- a/src/config/bench_config.rs +++ b/src/config/bench_config.rs @@ -3,6 +3,15 @@ use anyhow::{Context, Result}; use serde::{Deserialize, Deserializer, Serialize}; use std::path::Path; +/// Address pool type selection +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum AddressPoolType { + #[default] + Random, + Weighted, +} + /// Complete configuration structure #[derive(Debug, Clone, Deserialize, Serialize)] pub struct BenchConfig { @@ -14,6 +23,8 @@ pub struct BenchConfig { pub num_tokens: usize, pub target_tps: u64, pub enable_swap_token: bool, + #[serde(default)] + pub address_pool_type: AddressPoolType, } /// Node and chain configuration diff --git a/src/main.rs b/src/main.rs index 1e20475..431342a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -365,12 +365,26 @@ async fn start_bench() -> Result<()> { let account_manager = accout_generator.to_manager(); - let address_pool: Arc = Arc::new( - txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new( - account_ids.clone(), - account_manager.clone(), - ), - ); + let address_pool: Arc = match benchmark_config.address_pool_type { + config::AddressPoolType::Random => { + info!("Using RandomAddressPool"); + Arc::new( + txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new( + account_ids.clone(), + account_manager.clone(), + ), + ) + } + config::AddressPoolType::Weighted => { + info!("Using WeightedAddressPool"); + Arc::new( + txn_plan::addr_pool::weighted_address_pool::WeightedAddressPool::new( + account_ids.clone(), + account_manager.clone(), + ), + ) + } + }; // Use the same client instances for Consumer to share metrics let eth_providers: Vec = eth_clients diff --git a/src/txn_plan/addr_pool/mod.rs b/src/txn_plan/addr_pool/mod.rs index 115c3bb..a0cefc0 100644 --- a/src/txn_plan/addr_pool/mod.rs +++ b/src/txn_plan/addr_pool/mod.rs @@ -1,7 +1,6 @@ use crate::util::gen_account::AccountId; pub mod managed_address_pool; -#[allow(unused)] pub mod weighted_address_pool; #[async_trait::async_trait] diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index 3211cf4..c737883 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -1,12 +1,10 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; -use alloy::primitives::Address; use parking_lot::Mutex; use rand::seq::SliceRandom; -use tokio::sync::RwLock; use super::AddressPool; -use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; +use crate::util::gen_account::{AccountId, AccountManager}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] enum AccountCategory { @@ -29,6 +27,7 @@ struct Inner { pub struct WeightedAddressPool { inner: Mutex, + #[allow(unused)] account_generator: AccountManager, } From d8154157332b557ebb520c6635929061e12d2d43 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Sun, 21 Dec 2025 13:33:58 +0800 Subject: [PATCH 2/4] split dp --- src/actors/monitor/txn_tracker.rs | 4 ++-- src/actors/producer/producer_actor.rs | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/actors/monitor/txn_tracker.rs b/src/actors/monitor/txn_tracker.rs index 6b0aeee..4faf2be 100644 --- a/src/actors/monitor/txn_tracker.rs +++ b/src/actors/monitor/txn_tracker.rs @@ -19,8 +19,8 @@ const TXN_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes timeout const TPS_WINDOW: Duration = Duration::from_secs(17); // Backpressure configuration -const MAX_PENDING_TXNS: usize = 100_000; -const BACKPRESSURE_RESUME_THRESHOLD: usize = 80_000; // 80% of max +const MAX_PENDING_TXNS: usize = 200_000; +const BACKPRESSURE_RESUME_THRESHOLD: usize = 160_000; // 80% of max // Retry configuration const RETRY_TIMEOUT: Duration = Duration::from_secs(120); // Retry if stuck for 120s diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 605785a..116d6f7 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -311,13 +311,18 @@ impl Handler for Producer { None } .into_actor(self) - .map(|maybe_plan, act, _ctx| { + .map(|maybe_plan, act, ctx| { // This block runs after the async block is complete, safely on the actor's context. if let Some(plan) = maybe_plan { act.stats.remain_plans_num += 1; // CRITICAL: If the plan was not ready, push it back to the *front* of the queue // to ensure strict sequential execution. No other plan can run before it. act.plan_queue.push_front(plan); + // Re-trigger plan execution after a short delay to avoid busy-looping. + // This ensures the plan is retried when accounts become ready. + ctx.run_later(Duration::from_millis(100), |act, ctx| { + act.trigger_next_plan_if_needed(ctx); + }); } }), ) From 303083b3d642abe4d1140608a560ad3e22570445 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Sun, 21 Dec 2025 13:54:25 +0800 Subject: [PATCH 3/4] split dp --- src/actors/producer/producer_actor.rs | 67 +++++++++++++++++---------- 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 116d6f7..f8e72a8 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -1,6 +1,6 @@ use actix::prelude::*; use dashmap::DashMap; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -74,10 +74,14 @@ pub struct Producer { account_generator: AccountManager, /// A queue of plans waiting to be executed. Plans are processed in FIFO order. + /// Only this queue is subject to max_queue_size limit. plan_queue: VecDeque>, - /// Tracks pending plans to respond to the original requester upon completion. - pending_plans: HashMap>>, - /// The maximum number of plans allowed in the queue. + /// Plans that have finished producing transactions but are awaiting completion confirmation. + /// These do NOT count towards the queue size limit. + awaiting_completion: HashSet, + /// Global responder management across all plan states. + plan_responders: HashMap>>, + /// The maximum number of plans allowed in the queue (only applies to plan_queue). max_queue_size: usize, } @@ -112,7 +116,8 @@ impl Producer { monitor_addr, consumer_addr, plan_queue: VecDeque::new(), - pending_plans: HashMap::new(), + awaiting_completion: HashSet::new(), + plan_responders: HashMap::new(), max_queue_size: 10, }) } @@ -282,7 +287,7 @@ impl Handler for Producer { if !is_ready { // If not ready, return the plan so it can be put back at the front of the queue. tracing::debug!("Plan '{}' is not ready, re-queuing at front.", plan.name()); - return Some(plan); + return Err(plan); } // If ready, execute the plan. @@ -305,24 +310,31 @@ impl Handler for Producer { plan_id, reason: e.to_string(), }); + return Ok(None); } - // Return None as the plan has been consumed (either successfully started or failed). - None + // Return the plan_id for adding to awaiting_completion + Ok(Some(plan_id)) } .into_actor(self) - .map(|maybe_plan, act, ctx| { - // This block runs after the async block is complete, safely on the actor's context. - if let Some(plan) = maybe_plan { - act.stats.remain_plans_num += 1; - // CRITICAL: If the plan was not ready, push it back to the *front* of the queue - // to ensure strict sequential execution. No other plan can run before it. - act.plan_queue.push_front(plan); - // Re-trigger plan execution after a short delay to avoid busy-looping. - // This ensures the plan is retried when accounts become ready. - ctx.run_later(Duration::from_millis(100), |act, ctx| { - act.trigger_next_plan_if_needed(ctx); - }); + .map(|result, act, ctx| { + match result { + Err(plan) => { + // Plan was not ready, push it back to the front of the queue + act.stats.remain_plans_num += 1; + act.plan_queue.push_front(plan); + // Re-trigger plan execution after a short delay to avoid busy-looping. + ctx.run_later(Duration::from_millis(100), |act, ctx| { + act.trigger_next_plan_if_needed(ctx); + }); + } + Ok(Some(plan_id)) => { + // Plan executed successfully, move to awaiting_completion + act.awaiting_completion.insert(plan_id); + } + Ok(None) => { + // Plan failed, already handled via PlanFailed message + } } }), ) @@ -334,7 +346,8 @@ impl Handler for Producer { type Result = ResponseFuture>; fn handle(&mut self, msg: RegisterTxnPlan, ctx: &mut Self::Context) -> Self::Result { - if self.pending_plans.len() >= self.max_queue_size { + // Only limit the queue size for plans waiting to be executed + if self.plan_queue.len() >= self.max_queue_size { return Box::pin(async { Err(anyhow::anyhow!("Producer plan queue is full")) }); } @@ -348,7 +361,8 @@ impl Handler for Producer { // Add the plan to the back of the queue. self.plan_queue.push_back(msg.plan); - self.pending_plans.insert(plan_id, msg.responder); + // Store the responder globally (will respond when plan completes) + self.plan_responders.insert(plan_id, msg.responder); // A new plan has been added, so we attempt to trigger execution. // This is done synchronously within the handler to avoid race conditions. @@ -369,7 +383,9 @@ impl Handler for Producer { self.plan_queue.len() ); self.stats.success_plans_num += 1; - if let Some(responder) = self.pending_plans.remove(&msg.plan_id) { + // Remove from awaiting_completion and respond to the original requester + self.awaiting_completion.remove(&msg.plan_id); + if let Some(responder) = self.plan_responders.remove(&msg.plan_id) { let _ = responder.send(Ok(())); } @@ -390,8 +406,9 @@ impl Handler for Producer { self.plan_queue.len() ); self.stats.failed_plans_num += 1; - - if let Some(responder) = self.pending_plans.remove(&msg.plan_id) { + // Remove from awaiting_completion and respond with error + self.awaiting_completion.remove(&msg.plan_id); + if let Some(responder) = self.plan_responders.remove(&msg.plan_id) { let _ = responder.send(Err(anyhow::anyhow!("Plan failed: {}", msg.reason))); } From 275a9dd59d1f19c93ae3e2f6f613b114f7d232f9 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Mon, 22 Dec 2025 10:19:01 +0800 Subject: [PATCH 4/4] split dp --- src/actors/producer/producer_actor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index f8e72a8..bdb92e6 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -331,6 +331,8 @@ impl Handler for Producer { Ok(Some(plan_id)) => { // Plan executed successfully, move to awaiting_completion act.awaiting_completion.insert(plan_id); + // Trigger execution of the next plan in the queue + act.trigger_next_plan_if_needed(ctx); } Ok(None) => { // Plan failed, already handled via PlanFailed message