2 changes: 2 additions & 0 deletions bench_config.template
@@ -8,6 +8,8 @@ nodes = [
]
num_tokens=2
enable_swap_token = false
# Address pool type: "random" (default) or "weighted" (hot/normal/long-tail distribution)
address_pool_type = "random"
# Faucet and deployer account configuration
[faucet]
# Private key (example, please replace with a real private key; the following is an example for reth)
4 changes: 2 additions & 2 deletions src/actors/monitor/txn_tracker.rs
@@ -19,8 +19,8 @@ const TXN_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes timeout
const TPS_WINDOW: Duration = Duration::from_secs(17);

// Backpressure configuration
const MAX_PENDING_TXNS: usize = 100_000;
const BACKPRESSURE_RESUME_THRESHOLD: usize = 80_000; // 80% of max
const MAX_PENDING_TXNS: usize = 200_000;
const BACKPRESSURE_RESUME_THRESHOLD: usize = 160_000; // 80% of max

// Retry configuration
const RETRY_TIMEOUT: Duration = Duration::from_secs(120); // Retry if stuck for 120s
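The backpressure ceiling doubles from 100,000 to 200,000 pending transactions while keeping the 80% resume ratio. A minimal sketch of the hysteresis these two constants imply; the tracker's actual pause/resume wiring is outside this hunk, so `should_pause` and its call sites are purely illustrative:

```rust
// Hypothetical helper illustrating the hysteresis band between the two
// constants; the real txn_tracker logic is not part of this hunk.
const MAX_PENDING_TXNS: usize = 200_000;
const BACKPRESSURE_RESUME_THRESHOLD: usize = 160_000; // 80% of max

fn should_pause(pending: usize, currently_paused: bool) -> bool {
    if currently_paused {
        // Stay paused until the backlog drains below the resume threshold.
        pending >= BACKPRESSURE_RESUME_THRESHOLD
    } else {
        // Only start applying backpressure once the hard ceiling is reached.
        pending >= MAX_PENDING_TXNS
    }
}

fn main() {
    assert!(!should_pause(150_000, false)); // below the ceiling: keep producing
    assert!(should_pause(200_000, false));  // ceiling hit: pause
    assert!(should_pause(170_000, true));   // still above 160_000: stay paused
    assert!(!should_pause(150_000, true));  // drained below threshold: resume
}
```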
64 changes: 44 additions & 20 deletions src/actors/producer/producer_actor.rs
@@ -1,6 +1,6 @@
use actix::prelude::*;
use dashmap::DashMap;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -74,10 +74,14 @@ pub struct Producer {
account_generator: AccountManager,

/// A queue of plans waiting to be executed. Plans are processed in FIFO order.
/// Only this queue is subject to the max_queue_size limit.
plan_queue: VecDeque<Box<dyn TxnPlan>>,
/// Tracks pending plans to respond to the original requester upon completion.
pending_plans: HashMap<PlanId, tokio::sync::oneshot::Sender<Result<(), anyhow::Error>>>,
/// The maximum number of plans allowed in the queue.
/// Plans that have finished producing transactions but are awaiting completion confirmation.
/// These do NOT count towards the queue size limit.
awaiting_completion: HashSet<PlanId>,
/// Global responder management across all plan states.
plan_responders: HashMap<PlanId, tokio::sync::oneshot::Sender<Result<(), anyhow::Error>>>,
/// The maximum number of plans allowed in the queue (only applies to plan_queue).
max_queue_size: usize,
}

@@ -112,7 +116,8 @@ impl Producer {
monitor_addr,
consumer_addr,
plan_queue: VecDeque::new(),
pending_plans: HashMap::new(),
awaiting_completion: HashSet::new(),
plan_responders: HashMap::new(),
max_queue_size: 10,
})
}
@@ -282,7 +287,7 @@ impl Handler<ExeFrontPlan> for Producer {
if !is_ready {
// If not ready, return the plan so it can be put back at the front of the queue.
tracing::debug!("Plan '{}' is not ready, re-queuing at front.", plan.name());
return Some(plan);
return Err(plan);
}

// If ready, execute the plan.
@@ -305,19 +310,33 @@
plan_id,
reason: e.to_string(),
});
return Ok(None);
}

// Return None as the plan has been consumed (either successfully started or failed).
None
// Return the plan_id so it can be added to awaiting_completion
Ok(Some(plan_id))
}
.into_actor(self)
.map(|maybe_plan, act, _ctx| {
// This block runs after the async block is complete, safely on the actor's context.
if let Some(plan) = maybe_plan {
act.stats.remain_plans_num += 1;
// CRITICAL: If the plan was not ready, push it back to the *front* of the queue
// to ensure strict sequential execution. No other plan can run before it.
act.plan_queue.push_front(plan);
.map(|result, act, ctx| {
match result {
Err(plan) => {
// Plan was not ready, push it back to the front of the queue
act.stats.remain_plans_num += 1;
act.plan_queue.push_front(plan);
// Re-trigger plan execution after a short delay to avoid busy-looping.
ctx.run_later(Duration::from_millis(100), |act, ctx| {
act.trigger_next_plan_if_needed(ctx);
});
}
Ok(Some(plan_id)) => {
// Plan executed successfully, move to awaiting_completion
act.awaiting_completion.insert(plan_id);
// Trigger execution of the next plan in the queue
act.trigger_next_plan_if_needed(ctx);
}
Ok(None) => {
// Plan failed, already handled via PlanFailed message
}
}
}),
)
@@ -329,7 +348,8 @@ impl Handler<RegisterTxnPlan> for Producer {
type Result = ResponseFuture<Result<(), anyhow::Error>>;

fn handle(&mut self, msg: RegisterTxnPlan, ctx: &mut Self::Context) -> Self::Result {
if self.pending_plans.len() >= self.max_queue_size {
// Only limit the queue size for plans waiting to be executed
if self.plan_queue.len() >= self.max_queue_size {
return Box::pin(async { Err(anyhow::anyhow!("Producer plan queue is full")) });
}

@@ -343,7 +363,8 @@

// Add the plan to the back of the queue.
self.plan_queue.push_back(msg.plan);
self.pending_plans.insert(plan_id, msg.responder);
// Store the responder globally (will respond when plan completes)
self.plan_responders.insert(plan_id, msg.responder);

// A new plan has been added, so we attempt to trigger execution.
// This is done synchronously within the handler to avoid race conditions.
@@ -364,7 +385,9 @@ impl Handler<PlanCompleted> for Producer {
self.plan_queue.len()
);
self.stats.success_plans_num += 1;
if let Some(responder) = self.pending_plans.remove(&msg.plan_id) {
// Remove from awaiting_completion and respond to the original requester
self.awaiting_completion.remove(&msg.plan_id);
if let Some(responder) = self.plan_responders.remove(&msg.plan_id) {
let _ = responder.send(Ok(()));
}

@@ -385,8 +408,9 @@ impl Handler<PlanFailed> for Producer {
self.plan_queue.len()
);
self.stats.failed_plans_num += 1;

if let Some(responder) = self.pending_plans.remove(&msg.plan_id) {
// Remove from awaiting_completion and respond with error
self.awaiting_completion.remove(&msg.plan_id);
if let Some(responder) = self.plan_responders.remove(&msg.plan_id) {
let _ = responder.send(Err(anyhow::anyhow!("Plan failed: {}", msg.reason)));
}

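Taken together, the producer now tracks a plan in three places: plan_queue while it waits to run, awaiting_completion once its transactions have been produced, and plan_responders for the whole lifetime so the original requester is answered exactly once. A minimal sketch of that bookkeeping outside actix; `Plan`, the String errors, and the std `mpsc` responders are stand-ins for the crate's real types (the actor uses `tokio::sync::oneshot`):

```rust
// Simplified model of the producer's plan bookkeeping; not the actor itself.
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::mpsc;

type PlanId = u64;

struct Plan {
    id: PlanId,
}

#[derive(Default)]
struct PlanBook {
    // Waiting to run; only this queue is bounded by max_queue_size.
    plan_queue: VecDeque<Plan>,
    // Finished producing transactions, waiting for PlanCompleted / PlanFailed.
    awaiting_completion: HashSet<PlanId>,
    // Responders live here across both states and are answered exactly once.
    plan_responders: HashMap<PlanId, mpsc::Sender<Result<(), String>>>,
}

impl PlanBook {
    fn register(
        &mut self,
        plan: Plan,
        responder: mpsc::Sender<Result<(), String>>,
        max_queue_size: usize,
    ) -> Result<(), String> {
        if self.plan_queue.len() >= max_queue_size {
            return Err("Producer plan queue is full".into());
        }
        self.plan_responders.insert(plan.id, responder);
        self.plan_queue.push_back(plan);
        Ok(())
    }

    fn start_front(&mut self) {
        // In the actor this is where the plan's transactions are produced.
        if let Some(plan) = self.plan_queue.pop_front() {
            self.awaiting_completion.insert(plan.id);
        }
    }

    fn finish(&mut self, id: PlanId, outcome: Result<(), String>) {
        self.awaiting_completion.remove(&id);
        if let Some(responder) = self.plan_responders.remove(&id) {
            let _ = responder.send(outcome);
        }
    }
}

fn main() {
    let mut book = PlanBook::default();
    let (tx, rx) = mpsc::channel();
    book.register(Plan { id: 1 }, tx, 10).unwrap();
    book.start_front();     // queue is empty again, plan 1 is awaiting completion
    book.finish(1, Ok(())); // responder is answered and removed
    assert!(rx.recv().unwrap().is_ok());
}
```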
11 changes: 11 additions & 0 deletions src/config/bench_config.rs
@@ -3,6 +3,15 @@ use anyhow::{Context, Result};
use serde::{Deserialize, Deserializer, Serialize};
use std::path::Path;

/// Address pool type selection
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum AddressPoolType {
#[default]
Random,
Weighted,
}

/// Complete configuration structure
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BenchConfig {
@@ -14,6 +23,8 @@ pub struct BenchConfig {
pub num_tokens: usize,
pub target_tps: u64,
pub enable_swap_token: bool,
#[serde(default)]
pub address_pool_type: AddressPoolType,
}

/// Node and chain configuration
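A minimal sketch of how the new field deserializes, assuming the config file is parsed with the `toml` crate: `rename_all = "lowercase"` maps the `"random"`/`"weighted"` strings onto the enum variants, and `#[serde(default)]` keeps older configs working by falling back to `Random` when the key is absent.

```rust
// Standalone sketch (serde + toml assumed as dependencies); the real
// BenchConfig has more fields, trimmed here to the new one.
use serde::Deserialize;

#[derive(Debug, Deserialize, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
enum AddressPoolType {
    #[default]
    Random,
    Weighted,
}

#[derive(Debug, Deserialize)]
struct PoolOnlyConfig {
    #[serde(default)]
    address_pool_type: AddressPoolType,
}

fn main() {
    let weighted: PoolOnlyConfig =
        toml::from_str(r#"address_pool_type = "weighted""#).unwrap();
    assert_eq!(weighted.address_pool_type, AddressPoolType::Weighted);

    // Older configs without the key keep the previous behaviour.
    let legacy: PoolOnlyConfig = toml::from_str("").unwrap();
    assert_eq!(legacy.address_pool_type, AddressPoolType::Random);
}
```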
26 changes: 20 additions & 6 deletions src/main.rs
@@ -365,12 +365,26 @@ async fn start_bench() -> Result<()> {

let account_manager = accout_generator.to_manager();

let address_pool: Arc<dyn AddressPool> = Arc::new(
txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(
account_ids.clone(),
account_manager.clone(),
),
);
let address_pool: Arc<dyn AddressPool> = match benchmark_config.address_pool_type {
config::AddressPoolType::Random => {
info!("Using RandomAddressPool");
Arc::new(
txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(
account_ids.clone(),
account_manager.clone(),
),
)
}
config::AddressPoolType::Weighted => {
info!("Using WeightedAddressPool");
Arc::new(
txn_plan::addr_pool::weighted_address_pool::WeightedAddressPool::new(
account_ids.clone(),
account_manager.clone(),
),
)
}
};

// Use the same client instances for Consumer to share metrics
let eth_providers: Vec<EthHttpCli> = eth_clients
1 change: 0 additions & 1 deletion src/txn_plan/addr_pool/mod.rs
@@ -1,7 +1,6 @@
use crate::util::gen_account::AccountId;

pub mod managed_address_pool;
#[allow(unused)]
pub mod weighted_address_pool;

#[async_trait::async_trait]
7 changes: 3 additions & 4 deletions src/txn_plan/addr_pool/weighted_address_pool.rs
@@ -1,12 +1,10 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use alloy::primitives::Address;
use parking_lot::Mutex;
use rand::seq::SliceRandom;
use tokio::sync::RwLock;

use super::AddressPool;
use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager};
use crate::util::gen_account::{AccountId, AccountManager};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum AccountCategory {
@@ -29,6 +27,7 @@ struct Inner {

pub struct WeightedAddressPool {
inner: Mutex<Inner>,
#[allow(unused)]
account_generator: AccountManager,
}

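WeightedAddressPool's selection logic is largely outside this diff; the template comment only promises a hot/normal/long-tail distribution. A minimal sketch of such a weighted draw, where the variant names, the 70/25/5 split, and `pick_category` are illustrative assumptions rather than the pool's actual implementation:

```rust
// Illustrative weighted pick over account categories; the real pool's
// weights and bucket assignment are not visible in this diff.
use rand::Rng;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum AccountCategory {
    Hot,      // assumed variant names; only the enum itself appears in the diff
    Normal,
    LongTail,
}

fn pick_category<R: Rng>(rng: &mut R) -> AccountCategory {
    // Assumed 70/25/5 split: most transfers target a small hot set.
    match rng.gen_range(0..100u32) {
        0..=69 => AccountCategory::Hot,
        70..=94 => AccountCategory::Normal,
        _ => AccountCategory::LongTail,
    }
}

fn main() {
    let mut rng = rand::thread_rng();
    let mut counts = std::collections::HashMap::new();
    for _ in 0..10_000 {
        *counts.entry(pick_category(&mut rng)).or_insert(0u32) += 1;
    }
    // Expect Hot to dominate and LongTail to be rare.
    println!("{counts:?}");
}
```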