From 8c5daad1b0fb50227d70f25d7e90a71753b8b158 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 10:26:17 +0800 Subject: [PATCH 01/21] feat: use accout id rather than index --- src/actors/producer/producer_actor.rs | 71 ++++++++--- src/main.rs | 17 +-- .../addr_pool/managed_address_pool.rs | 80 ++++++------ src/txn_plan/addr_pool/mod.rs | 14 +- .../addr_pool/weighted_address_pool.rs | 120 ++++++++++-------- src/txn_plan/constructor/faucet.rs | 7 +- src/util/gen_account.rs | 21 ++- 7 files changed, 198 insertions(+), 132 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 2394c03..1591739 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -72,6 +72,8 @@ pub struct Producer { consumer_addr: Addr, nonce_cache: Arc, u32>>, + + account_generator: Arc>, /// A queue of plans waiting to be executed. Plans are processed in FIFO order. plan_queue: VecDeque>, @@ -89,14 +91,15 @@ impl Producer { account_generator: Arc>, ) -> Result { let nonce_cache = Arc::new(DashMap::new()); - let account_generator = account_generator.read().await; + let gen = account_generator.read().await; address_pool.clean_ready_accounts(); - for (account, nonce) in account_generator.accouts_nonce_iter() { - let address = Arc::new(account.address()); + for (account_id, nonce) in gen.account_ids_with_nonce() { + let address = Arc::new(gen.get_address_by_id(account_id)); let nonce = nonce.load(Ordering::Relaxed) as u32; nonce_cache.insert(address.clone(), nonce); - address_pool.unlock_correct_nonce(address.clone(), nonce); + address_pool.unlock_correct_nonce(account_id, nonce); } + drop(gen); Ok(Self { state: ProducerState::running(), stats: ProducerStats { @@ -110,6 +113,7 @@ impl Producer { }, address_pool, nonce_cache, + account_generator, monitor_addr, consumer_addr, plan_queue: VecDeque::new(), @@ -152,6 +156,7 @@ impl Producer { monitor_addr: Addr, consumer_addr: Addr, address_pool: Arc, + account_generator: Arc>, mut plan: Box, sending_txns: Arc, state: ProducerState, @@ -160,8 +165,23 @@ impl Producer { let plan_id = plan.id().clone(); // Fetch accounts and build transactions - let ready_accounts = + let ready_account_ids = address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len())); + + // Convert AccountId to (signer, address, nonce) for build_txns + let gen = account_generator.read().await; + let ready_accounts: Vec<_> = ready_account_ids + .into_iter() + .map(|(account_id, nonce)| { + ( + Arc::new(gen.get_signer_by_id(account_id).clone()), + Arc::new(gen.get_address_by_id(account_id)), + nonce, + ) + }) + .collect(); + drop(gen); + let iterator = plan.as_mut().build_txns(ready_accounts)?; // If the plan doesn't consume nonces, accounts can be used by other processes immediately. @@ -269,6 +289,7 @@ impl Handler for Producer { let plan = self.plan_queue.pop_front().unwrap(); self.stats.remain_plans_num -= 1; let address_pool = self.address_pool.clone(); + let account_generator = self.account_generator.clone(); let monitor_addr = self.monitor_addr.clone(); let consumer_addr = self.consumer_addr.clone(); let self_addr = ctx.address(); @@ -292,6 +313,7 @@ impl Handler for Producer { monitor_addr, consumer_addr, address_pool, + account_generator, plan, sending_txns, state, @@ -401,6 +423,7 @@ impl Handler for Producer { fn handle(&mut self, msg: UpdateSubmissionResult, _ctx: &mut Self::Context) -> Self::Result { let address_pool = self.address_pool.clone(); + let account_generator = self.account_generator.clone(); let account = msg.metadata.from_account.clone(); self.stats.sending_txns.fetch_sub(1, Ordering::Relaxed); match msg.result.as_ref() { @@ -419,22 +442,30 @@ impl Handler for Producer { let ready_accounts = self.stats.ready_accounts.clone(); Box::pin( async move { - match msg.result.as_ref() { - SubmissionResult::Success(_) => { - address_pool.unlock_next_nonce(account); - } - SubmissionResult::NonceTooLow { expect_nonce, .. } => { - tracing::debug!( - "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", - account, - expect_nonce, - msg.metadata.nonce - ); - address_pool.unlock_correct_nonce(account, *expect_nonce as u32); - } - SubmissionResult::ErrorWithRetry => { - address_pool.retry_current_nonce(account); + let gen = account_generator.read().await; + let account_id = gen.get_id_by_address(&account); + drop(gen); + + if let Some(account_id) = account_id { + match msg.result.as_ref() { + SubmissionResult::Success(_) => { + address_pool.unlock_next_nonce(account_id); + } + SubmissionResult::NonceTooLow { expect_nonce, .. } => { + tracing::debug!( + "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", + account, + expect_nonce, + msg.metadata.nonce + ); + address_pool.unlock_correct_nonce(account_id, *expect_nonce as u32); + } + SubmissionResult::ErrorWithRetry => { + address_pool.retry_current_nonce(account_id); + } } + } else { + tracing::warn!("Account {:?} not found in account generator", account); } ready_accounts.store(address_pool.ready_len() as u64, Ordering::Relaxed); } diff --git a/src/main.rs b/src/main.rs index 764b368..e731619 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader}, + io::{AsyncBufReadExt, BufReader as TokioBufReader}, sync::RwLock, }; use tracing::{info, Level}; @@ -266,17 +266,18 @@ async fn start_bench() -> Result<()> { }; let accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts); - let accounts = accout_generator + let account_ids = accout_generator .write() .await .gen_account(0, benchmark_config.accounts.num_accounts as u64) .unwrap(); - let account_addresses = Arc::new( - accounts + let account_addresses = Arc::new({ + let gen = accout_generator.read().await; + account_ids .iter() - .map(|(address, _)| address.clone()) - .collect::>(), - ); + .map(|&id| Arc::new(gen.get_address_by_id(id))) + .collect::>() + }); // Create EthHttpCli instance let eth_clients: Vec> = benchmark_config .nodes @@ -288,7 +289,7 @@ async fn start_bench() -> Result<()> { .collect(); let address_pool: Arc = Arc::new( - txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(accounts.clone()), + txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(account_ids.clone(), accout_generator.clone()), ); let chain_id = benchmark_config.nodes[0].chain_id; diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index a28fb71..89bca84 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -1,15 +1,18 @@ use std::{collections::HashMap, sync::Arc}; -use alloy::{primitives::Address, signers::local::PrivateKeySigner}; +use alloy::primitives::Address; use parking_lot::Mutex; +use tokio::sync::RwLock; use super::AddressPool; +use crate::util::gen_account::{AccountGenerator, AccountId}; struct Inner { - account_signers: HashMap, Arc>, - account_status: HashMap, u32>, - ready_accounts: Vec<(Arc, Arc
, u32)>, - all_account_addresses: Vec>, + account_status: HashMap, + ready_accounts: Vec<(AccountId, u32)>, + all_account_ids: Vec, + account_id_to_address: HashMap, + address_to_account_id: HashMap, } pub struct RandomAddressPool { @@ -18,27 +21,31 @@ pub struct RandomAddressPool { impl RandomAddressPool { #[allow(unused)] - pub fn new(account_signers: Vec<(Arc
, Arc)>) -> Self { + pub fn new(account_ids: Vec, account_generator: Arc>) -> Self { let mut account_status = HashMap::new(); let mut ready_accounts = Vec::new(); - let mut hashmap = HashMap::new(); - let all_account_addresses: Vec> = account_signers - .iter() - .map(|(address, _)| address.clone()) - .collect(); - for (addr, signer) in account_signers.iter() { + let mut account_id_to_address = HashMap::new(); + let mut address_to_account_id = HashMap::new(); + + // Build address mapping synchronously by blocking on the async read + let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); + for &account_id in account_ids.iter() { // assume all address start from nonce, this is correct beacause a nonce too low error will trigger correct nonce let nonce = 0; - hashmap.insert(addr.clone(), signer.clone()); - account_status.insert(addr.clone(), nonce); - ready_accounts.push((signer.clone(), addr.clone(), nonce)); + let address = gen.get_address_by_id(account_id); + account_status.insert(account_id, nonce); + ready_accounts.push((account_id, nonce)); + account_id_to_address.insert(account_id, address); + address_to_account_id.insert(address, account_id); } + drop(gen); let inner = Inner { - account_signers: hashmap, account_status, ready_accounts, - all_account_addresses, + all_account_ids: account_ids, + account_id_to_address, + address_to_account_id, }; Self { @@ -48,7 +55,7 @@ impl RandomAddressPool { } impl AddressPool for RandomAddressPool { - fn fetch_senders(&self, count: usize) -> Vec<(Arc, Arc
, u32)> { + fn fetch_senders(&self, count: usize) -> Vec<(AccountId, u32)> { let mut inner = self.inner.lock(); let len = inner.ready_accounts.len(); if count < len { @@ -63,51 +70,47 @@ impl AddressPool for RandomAddressPool { inner.ready_accounts.clear(); } - fn unlock_next_nonce(&self, account: Arc
) { + fn unlock_next_nonce(&self, account: AccountId) { let mut inner = self.inner.lock(); if let Some(status) = inner.account_status.get_mut(&account) { *status += 1; - let signer = inner.account_signers.get(&account).unwrap().clone(); let status = *inner.account_status.get(&account).unwrap(); - inner.ready_accounts.push((signer, account.clone(), status)); + inner.ready_accounts.push((account, status)); } } - fn unlock_correct_nonce(&self, account: Arc
, nonce: u32) { + fn unlock_correct_nonce(&self, account: AccountId, nonce: u32) { let mut inner = self.inner.lock(); if let Some(status) = inner.account_status.get_mut(&account) { *status = nonce; let status = *status; - let signer = inner.account_signers.get(&account).unwrap().clone(); - inner.ready_accounts.push((signer, account.clone(), status)); + inner.ready_accounts.push((account, status)); } } - fn retry_current_nonce(&self, account: Arc
) { + fn retry_current_nonce(&self, account: AccountId) { let mut inner = self.inner.lock(); if inner.account_status.get_mut(&account).is_some() { - let signer = inner.account_signers.get(&account).unwrap().clone(); let status = *inner.account_status.get(&account).unwrap(); - inner.ready_accounts.push((signer, account.clone(), status)); + inner.ready_accounts.push((account, status)); } } fn resume_all_accounts(&self) { let mut inner = self.inner.lock(); inner.ready_accounts = inner - .all_account_addresses + .all_account_ids .iter() - .map(|account| { - let signer = inner.account_signers.get(account).unwrap().clone(); - let status = *inner.account_status.get(account).unwrap(); - (signer, account.clone(), status) + .map(|&account_id| { + let status = *inner.account_status.get(&account_id).unwrap(); + (account_id, status) }) .collect(); } fn is_full_ready(&self) -> bool { let inner = self.inner.lock(); - inner.ready_accounts.len() == inner.all_account_addresses.len() + inner.ready_accounts.len() == inner.all_account_ids.len() } fn ready_len(&self) -> usize { @@ -115,16 +118,17 @@ impl AddressPool for RandomAddressPool { } fn len(&self) -> usize { - self.inner.lock().account_signers.len() + self.inner.lock().all_account_ids.len() } fn select_receiver(&self, excluded: &Address) -> Address { let inner = self.inner.lock(); + let excluded_id = inner.address_to_account_id.get(excluded); loop { - let idx = rand::random::() % inner.all_account_addresses.len(); - let to_address = inner.all_account_addresses[idx].clone(); - if to_address.as_ref() != excluded { - return *to_address; + let idx = rand::random::() % inner.all_account_ids.len(); + let account_id = inner.all_account_ids[idx]; + if Some(&account_id) != excluded_id { + return *inner.account_id_to_address.get(&account_id).unwrap(); } } } diff --git a/src/txn_plan/addr_pool/mod.rs b/src/txn_plan/addr_pool/mod.rs index e5aed4c..e607ef6 100644 --- a/src/txn_plan/addr_pool/mod.rs +++ b/src/txn_plan/addr_pool/mod.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; +use alloy::primitives::Address; -use alloy::{primitives::Address, signers::local::PrivateKeySigner}; +use crate::util::gen_account::AccountId; pub mod managed_address_pool; #[allow(unused)] @@ -9,16 +9,17 @@ pub mod weighted_address_pool; pub trait AddressPool: Send + Sync { /// Fetches a batch of ready sender accounts based on the internal sampling strategy. /// This operation should internally lock the accounts to prevent concurrent use. - fn fetch_senders(&self, count: usize) -> Vec<(Arc, Arc
, u32)>; + /// Returns (AccountId, nonce) pairs. + fn fetch_senders(&self, count: usize) -> Vec<(AccountId, u32)>; /// Unlocks an account after a successful transaction and increments its nonce. - fn unlock_next_nonce(&self, account: Arc
); + fn unlock_next_nonce(&self, account: AccountId); /// Unlocks an account and updates its nonce to a specific value. - fn unlock_correct_nonce(&self, account: Arc
, nonce: u32); + fn unlock_correct_nonce(&self, account: AccountId, nonce: u32); /// Makes an account available again for retry, using the same nonce. - fn retry_current_nonce(&self, account: Arc
); + fn retry_current_nonce(&self, account: AccountId); /// Resumes all accounts, making them available again. fn resume_all_accounts(&self); @@ -35,5 +36,6 @@ pub trait AddressPool: Send + Sync { fn len(&self) -> usize; /// Selects a receiver address based on the internal sampling strategy. + /// The excluded parameter is the address to exclude from selection. fn select_receiver(&self, excluded: &Address) -> Address; } diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index 01e4872..a7b1cb4 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -1,10 +1,12 @@ use std::{collections::HashMap, sync::Arc}; -use alloy::{primitives::Address, signers::local::PrivateKeySigner}; +use alloy::primitives::Address; use parking_lot::Mutex; use rand::seq::SliceRandom; +use tokio::sync::RwLock; use super::AddressPool; +use crate::util::gen_account::{AccountGenerator, AccountId}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] enum AccountCategory { @@ -15,15 +17,16 @@ enum AccountCategory { struct Inner { // Static data - account_signers: HashMap, Arc>, - account_categories: HashMap, AccountCategory>, - all_account_addresses: Vec>, + account_categories: HashMap, + all_account_ids: Vec, + account_id_to_address: HashMap, + address_to_account_id: HashMap, // Dynamic data - account_status: HashMap, u32>, - hot_ready_accounts: Vec<(Arc, Arc
, u32)>, - normal_ready_accounts: Vec<(Arc, Arc
, u32)>, - long_tail_ready_accounts: Vec<(Arc, Arc
, u32)>, + account_status: HashMap, + hot_ready_accounts: Vec<(AccountId, u32)>, + normal_ready_accounts: Vec<(AccountId, u32)>, + long_tail_ready_accounts: Vec<(AccountId, u32)>, } pub struct WeightedAddressPool { @@ -31,13 +34,12 @@ pub struct WeightedAddressPool { } impl WeightedAddressPool { - pub fn new(account_signers: HashMap, Arc>) -> Self { - let mut all_account_addresses: Vec> = - account_signers.keys().cloned().collect(); + pub fn new(account_ids: Vec, account_generator: Arc>) -> Self { + let mut all_account_ids = account_ids; // Shuffle for random distribution - all_account_addresses.shuffle(&mut rand::thread_rng()); + all_account_ids.shuffle(&mut rand::thread_rng()); - let total_accounts = all_account_addresses.len(); + let total_accounts = all_account_ids.len(); let hot_count = (total_accounts as f64 * 0.2).round() as usize; let normal_count = (total_accounts as f64 * 0.1).round() as usize; @@ -45,30 +47,40 @@ impl WeightedAddressPool { let mut hot_accounts = Vec::with_capacity(hot_count); let mut normal_accounts = Vec::with_capacity(normal_count); let mut long_tail_accounts = Vec::with_capacity(total_accounts - hot_count - normal_count); - - for (i, addr) in all_account_addresses.iter().enumerate() { + let mut account_id_to_address = HashMap::new(); + let mut address_to_account_id = HashMap::new(); + + // Build address mapping synchronously by blocking on the async read + let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); + + for (i, &account_id) in all_account_ids.iter().enumerate() { + let address = gen.get_address_by_id(account_id); + account_id_to_address.insert(account_id, address); + address_to_account_id.insert(address, account_id); + if i < hot_count { - account_categories.insert(addr.clone(), AccountCategory::Hot); - hot_accounts.push(addr.clone()); + account_categories.insert(account_id, AccountCategory::Hot); + hot_accounts.push(account_id); } else if i < hot_count + normal_count { - account_categories.insert(addr.clone(), AccountCategory::Normal); - normal_accounts.push(addr.clone()); + account_categories.insert(account_id, AccountCategory::Normal); + normal_accounts.push(account_id); } else { - account_categories.insert(addr.clone(), AccountCategory::LongTail); - long_tail_accounts.push(addr.clone()); + account_categories.insert(account_id, AccountCategory::LongTail); + long_tail_accounts.push(account_id); } } + drop(gen); let mut account_status = HashMap::new(); let mut hot_ready_accounts = Vec::new(); let mut normal_ready_accounts = Vec::new(); let mut long_tail_ready_accounts = Vec::new(); - for (addr, signer) in &account_signers { + for &account_id in &all_account_ids { let nonce = 0; - account_status.insert(addr.clone(), nonce); - let ready_tuple = (signer.clone(), addr.clone(), nonce); - match account_categories.get(addr).unwrap() { + account_status.insert(account_id, nonce); + let ready_tuple = (account_id, nonce); + match account_categories.get(&account_id).unwrap() { AccountCategory::Hot => hot_ready_accounts.push(ready_tuple), AccountCategory::Normal => normal_ready_accounts.push(ready_tuple), AccountCategory::LongTail => long_tail_ready_accounts.push(ready_tuple), @@ -76,9 +88,10 @@ impl WeightedAddressPool { } let inner = Inner { - account_signers, account_categories, - all_account_addresses, + all_account_ids, + account_id_to_address, + address_to_account_id, account_status, hot_ready_accounts, normal_ready_accounts, @@ -90,7 +103,7 @@ impl WeightedAddressPool { } } - fn unlock_account(&self, account: Arc
, nonce: Option) { + fn unlock_account(&self, account: AccountId, nonce: Option) { let mut inner = self.inner.lock(); if let Some(current_nonce) = inner.account_status.get_mut(&account) { let new_nonce = match nonce { @@ -99,8 +112,7 @@ impl WeightedAddressPool { }; *current_nonce = new_nonce; - let signer = inner.account_signers.get(&account).unwrap().clone(); - let ready_tuple = (signer, account.clone(), new_nonce); + let ready_tuple = (account, new_nonce); match inner.account_categories.get(&account).unwrap() { AccountCategory::Hot => inner.hot_ready_accounts.push(ready_tuple), @@ -112,7 +124,7 @@ impl WeightedAddressPool { } impl AddressPool for WeightedAddressPool { - fn fetch_senders(&self, count: usize) -> Vec<(Arc, Arc
, u32)> { + fn fetch_senders(&self, count: usize) -> Vec<(AccountId, u32)> { let mut inner = self.inner.lock(); let mut result = Vec::with_capacity(count); @@ -154,32 +166,32 @@ impl AddressPool for WeightedAddressPool { } fn clean_ready_accounts(&self) { - self.inner.lock().hot_ready_accounts.clear(); - self.inner.lock().normal_ready_accounts.clear(); - self.inner.lock().long_tail_ready_accounts.clear(); + let mut inner = self.inner.lock(); + inner.hot_ready_accounts.clear(); + inner.normal_ready_accounts.clear(); + inner.long_tail_ready_accounts.clear(); } - fn unlock_next_nonce(&self, account: Arc
) { + fn unlock_next_nonce(&self, account: AccountId) { self.unlock_account(account, None); } - fn unlock_correct_nonce(&self, account: Arc
, nonce: u32) { + fn unlock_correct_nonce(&self, account: AccountId, nonce: u32) { self.unlock_account(account, Some(nonce)); } - fn retry_current_nonce(&self, account: Arc
) { + fn retry_current_nonce(&self, account: AccountId) { let mut inner = self.inner.lock(); let maybe_data = if let Some(nonce) = inner.account_status.get(&account) { - let signer = inner.account_signers.get(&account).unwrap().clone(); let category = *inner.account_categories.get(&account).unwrap(); - Some((*nonce, signer, category)) + Some((*nonce, category)) } else { None }; - if let Some((nonce, signer, category)) = maybe_data { - let ready_tuple = (signer, account.clone(), nonce); + if let Some((nonce, category)) = maybe_data { + let ready_tuple = (account, nonce); match category { AccountCategory::Hot => inner.hot_ready_accounts.push(ready_tuple), AccountCategory::Normal => inner.normal_ready_accounts.push(ready_tuple), @@ -197,17 +209,16 @@ impl AddressPool for WeightedAddressPool { let mut normal_ready_accounts = Vec::new(); let mut long_tail_ready_accounts = Vec::new(); - for account in &inner.all_account_addresses { - let maybe_data = if let Some(nonce) = inner.account_status.get(account) { - let signer = inner.account_signers.get(account).unwrap().clone(); - let category = *inner.account_categories.get(account).unwrap(); - Some((*nonce, signer, category)) + for &account_id in &inner.all_account_ids { + let maybe_data = if let Some(nonce) = inner.account_status.get(&account_id) { + let category = *inner.account_categories.get(&account_id).unwrap(); + Some((*nonce, category)) } else { None }; - if let Some((nonce, signer, category)) = maybe_data { - let ready_tuple = (signer, account.clone(), nonce); + if let Some((nonce, category)) = maybe_data { + let ready_tuple = (account_id, nonce); match category { AccountCategory::Hot => hot_ready_accounts.push(ready_tuple), AccountCategory::Normal => normal_ready_accounts.push(ready_tuple), @@ -226,7 +237,7 @@ impl AddressPool for WeightedAddressPool { let ready_count = inner.hot_ready_accounts.len() + inner.normal_ready_accounts.len() + inner.long_tail_ready_accounts.len(); - ready_count == inner.all_account_addresses.len() + ready_count == inner.all_account_ids.len() } fn ready_len(&self) -> usize { @@ -237,16 +248,17 @@ impl AddressPool for WeightedAddressPool { } fn len(&self) -> usize { - self.inner.lock().account_signers.len() + self.inner.lock().all_account_ids.len() } fn select_receiver(&self, excluded: &Address) -> Address { let inner = self.inner.lock(); + let excluded_id = inner.address_to_account_id.get(excluded); loop { - let idx = rand::random::() % inner.all_account_addresses.len(); - let to_address = inner.all_account_addresses[idx].clone(); - if to_address.as_ref() != excluded { - return *to_address; + let idx = rand::random::() % inner.all_account_ids.len(); + let account_id = inner.all_account_ids[idx]; + if Some(&account_id) != excluded_id { + return *inner.account_id_to_address.get(&account_id).unwrap(); } } } diff --git a/src/txn_plan/constructor/faucet.rs b/src/txn_plan/constructor/faucet.rs index 8f6ea0d..db5ab72 100644 --- a/src/txn_plan/constructor/faucet.rs +++ b/src/txn_plan/constructor/faucet.rs @@ -116,15 +116,16 @@ impl FaucetTreePlanBuilder { let num_intermediate_levels = total_levels - 1; for level in 0..num_intermediate_levels { let num_accounts_at_level = degree.pow(level as u32 + 1); - let accounts = account_generator + let account_ids = account_generator .write() .await .gen_account(start_index as u64, num_accounts_at_level as u64) .unwrap(); + let gen = account_generator.read().await; account_levels.push( - accounts + account_ids .iter() - .map(|(_, signer)| signer.clone()) + .map(|&id| Arc::new(gen.get_signer_by_id(id).clone())) .collect::>(), ); start_index += num_accounts_at_level as usize; diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 49e76b3..0787988 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -32,6 +32,18 @@ impl AccountGenerator { })) } + pub fn get_signer_by_id(&self, id: AccountId) -> &PrivateKeySigner { + &self.accouts[id.0 as usize] + } + + pub fn get_address_by_id(&self, id: AccountId) -> Address { + self.accouts[id.0 as usize].address() + } + + pub fn get_id_by_address(&self, address: &Address) -> Option { + self.accout_to_id.get(address).copied() + } + pub fn init_nonce_map(&self) -> HashMap { let mut map = HashMap::new(); for (account, nonce) in self.accouts_nonce_iter() { @@ -44,11 +56,15 @@ impl AccountGenerator { self.accouts.iter().zip(self.init_nonces.iter().cloned()) } + pub fn account_ids_with_nonce(&self) -> impl Iterator)> + '_ { + (0..self.accouts.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) + } + pub fn gen_account( &mut self, start_index: u64, size: u64, - ) -> Result, Arc)>> { + ) -> Result> { let begin_index = self.accouts.len() as u64; let end_index = start_index + size; if begin_index < end_index { @@ -63,8 +79,7 @@ impl AccountGenerator { } let mut res = Vec::new(); for i in 0..size { - let signer = self.accouts[(start_index + i) as usize].clone(); - res.push((Arc::new(signer.address()), Arc::new(signer))); + res.push(AccountId((start_index + i) as u32)); } Ok(res) } From bec8609e8fc1247221369fd1591718e46cfbff92 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 10:42:25 +0800 Subject: [PATCH 02/21] feat: use accout id rather than index --- src/actors/producer/producer_actor.rs | 18 ++---------- src/txn_plan/faucet_plan.rs | 3 +- src/txn_plan/plan.rs | 41 ++++++++++++++++++++++----- src/txn_plan/traits.rs | 11 ++++--- 4 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 1591739..e37ded8 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -165,24 +165,10 @@ impl Producer { let plan_id = plan.id().clone(); // Fetch accounts and build transactions - let ready_account_ids = + let ready_accounts = address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len())); - // Convert AccountId to (signer, address, nonce) for build_txns - let gen = account_generator.read().await; - let ready_accounts: Vec<_> = ready_account_ids - .into_iter() - .map(|(account_id, nonce)| { - ( - Arc::new(gen.get_signer_by_id(account_id).clone()), - Arc::new(gen.get_address_by_id(account_id)), - nonce, - ) - }) - .collect(); - drop(gen); - - let iterator = plan.as_mut().build_txns(ready_accounts)?; + let iterator = plan.as_mut().build_txns(ready_accounts, account_generator)?; // If the plan doesn't consume nonces, accounts can be used by other processes immediately. if !iterator.consume_nonce { diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index f9e9208..082efc1 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -106,7 +106,8 @@ impl TxnPlan for LevelFaucetPlan { fn build_txns( &mut self, - _ready_accounts: Vec<(Arc, Arc
, u32)>, + _ready_accounts: Vec<(crate::util::gen_account::AccountId, u32)>, + _account_generator: Arc>, ) -> Result { let plan_id = self.id.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index 42e3092..3abde00 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -7,14 +7,15 @@ use crate::{ }, TxnIter, }, + util::gen_account::{AccountGenerator, AccountId}, }; use alloy::{ - consensus::transaction::SignerRecoverable, eips::Encodable2718, primitives::Address, - signers::local::PrivateKeySigner, + consensus::transaction::SignerRecoverable, eips::Encodable2718, }; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; use std::sync::Arc; +use tokio::sync::RwLock; use uuid::Uuid; /// Concurrency control parameters @@ -74,14 +75,30 @@ impl TxnPlan for ManyToOnePlan { fn build_txns( &mut self, - ready_accounts: Vec<(Arc, Arc
, u32)>, + ready_accounts: Vec<(AccountId, u32)>, + account_generator: Arc>, ) -> Result { let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); + + // Convert AccountId to (signer, address, nonce) tuples + let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); + let ready_accounts_with_signers: Vec<_> = ready_accounts + .into_iter() + .map(|(account_id, nonce)| { + ( + Arc::new(gen.get_signer_by_id(account_id).clone()), + Arc::new(gen.get_address_by_id(account_id)), + nonce, + ) + }) + .collect(); + drop(gen); + // 4. Create async stream, process in batches let handle = tokio::task::spawn_blocking(move || { - ready_accounts + ready_accounts_with_signers .chunks(1024) .map(|chunk| { chunk @@ -172,20 +189,30 @@ impl TxnPlan for OneToManyPlan { fn build_txns( &mut self, - ready_accounts: Vec<(Arc, Arc
, u32)>, + ready_accounts: Vec<(AccountId, u32)>, + account_generator: Arc>, ) -> Result { // 3. Parallelly build and sign transactions let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let chain_id = self.chain_id; let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); + + // Convert AccountId to addresses + let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); + let addresses: Vec<_> = ready_accounts + .into_iter() + .map(|(account_id, _nonce)| Arc::new(gen.get_address_by_id(account_id))) + .collect(); + drop(gen); + let handle = tokio::task::spawn_blocking(move || { - ready_accounts + addresses .chunks(1024) .map(|chunk| { chunk .into_par_iter() - .flat_map(|(_signer, address, _nonce)| { + .flat_map(|address| { // Build transaction request let txs = constructor.build_for_receiver(address, chain_id).unwrap(); txs.into_iter() diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index 0449dfa..8be0eb8 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -5,8 +5,11 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::Address; use alloy::rpc::types::TransactionRequest; use alloy::signers::local::PrivateKeySigner; +use tokio::sync::RwLock; use uuid::Uuid; +use crate::util::gen_account::{AccountGenerator, AccountId}; + #[derive(Debug, Clone, Message)] #[rtype(result = "anyhow::Result<()>")] pub struct SignedTxnWithMetadata { @@ -84,12 +87,12 @@ pub struct TxnIter { pub trait TxnPlan: Send + Sync { /// Builds and signs a vector of transactions based on the plan's logic. /// - /// This method should read from the `AccountManager` to get available accounts and their nonces, - /// but it should NOT mutate the manager. All state changes will be handled by the Producer - /// after the transactions are built. + /// This method receives ready account IDs with their nonces and uses the AccountGenerator + /// to retrieve the actual signers and addresses needed for transaction construction. fn build_txns( &mut self, - ready_accounts: Vec<(Arc, Arc
, u32)>, + ready_accounts: Vec<(AccountId, u32)>, + account_generator: Arc>, ) -> Result; /// Returns the unique identifier for this plan instance. From 8520d6fbc0c001696edf9f58cc0e891d9bef0a12 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 10:45:08 +0800 Subject: [PATCH 03/21] add txn --- .../addr_pool/managed_address_pool.rs | 22 ++++++----------- .../addr_pool/weighted_address_pool.rs | 24 ++++++------------- 2 files changed, 14 insertions(+), 32 deletions(-) diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index 89bca84..d6de224 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -11,12 +11,11 @@ struct Inner { account_status: HashMap, ready_accounts: Vec<(AccountId, u32)>, all_account_ids: Vec, - account_id_to_address: HashMap, - address_to_account_id: HashMap, } pub struct RandomAddressPool { inner: Mutex, + account_generator: Arc>, } impl RandomAddressPool { @@ -24,32 +23,23 @@ impl RandomAddressPool { pub fn new(account_ids: Vec, account_generator: Arc>) -> Self { let mut account_status = HashMap::new(); let mut ready_accounts = Vec::new(); - let mut account_id_to_address = HashMap::new(); - let mut address_to_account_id = HashMap::new(); - // Build address mapping synchronously by blocking on the async read - let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); for &account_id in account_ids.iter() { // assume all address start from nonce, this is correct beacause a nonce too low error will trigger correct nonce let nonce = 0; - let address = gen.get_address_by_id(account_id); account_status.insert(account_id, nonce); ready_accounts.push((account_id, nonce)); - account_id_to_address.insert(account_id, address); - address_to_account_id.insert(address, account_id); } - drop(gen); let inner = Inner { account_status, ready_accounts, all_account_ids: account_ids, - account_id_to_address, - address_to_account_id, }; Self { inner: Mutex::new(inner), + account_generator, } } } @@ -123,12 +113,14 @@ impl AddressPool for RandomAddressPool { fn select_receiver(&self, excluded: &Address) -> Address { let inner = self.inner.lock(); - let excluded_id = inner.address_to_account_id.get(excluded); + let gen = tokio::runtime::Handle::current().block_on(self.account_generator.read()); + + let excluded_id = gen.get_id_by_address(excluded); loop { let idx = rand::random::() % inner.all_account_ids.len(); let account_id = inner.all_account_ids[idx]; - if Some(&account_id) != excluded_id { - return *inner.account_id_to_address.get(&account_id).unwrap(); + if Some(account_id) != excluded_id { + return gen.get_address_by_id(account_id); } } } diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index a7b1cb4..a8558b9 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -19,8 +19,6 @@ struct Inner { // Static data account_categories: HashMap, all_account_ids: Vec, - account_id_to_address: HashMap, - address_to_account_id: HashMap, // Dynamic data account_status: HashMap, @@ -31,6 +29,7 @@ struct Inner { pub struct WeightedAddressPool { inner: Mutex, + account_generator: Arc>, } impl WeightedAddressPool { @@ -47,17 +46,8 @@ impl WeightedAddressPool { let mut hot_accounts = Vec::with_capacity(hot_count); let mut normal_accounts = Vec::with_capacity(normal_count); let mut long_tail_accounts = Vec::with_capacity(total_accounts - hot_count - normal_count); - let mut account_id_to_address = HashMap::new(); - let mut address_to_account_id = HashMap::new(); - - // Build address mapping synchronously by blocking on the async read - let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); for (i, &account_id) in all_account_ids.iter().enumerate() { - let address = gen.get_address_by_id(account_id); - account_id_to_address.insert(account_id, address); - address_to_account_id.insert(address, account_id); - if i < hot_count { account_categories.insert(account_id, AccountCategory::Hot); hot_accounts.push(account_id); @@ -69,7 +59,6 @@ impl WeightedAddressPool { long_tail_accounts.push(account_id); } } - drop(gen); let mut account_status = HashMap::new(); let mut hot_ready_accounts = Vec::new(); @@ -90,8 +79,6 @@ impl WeightedAddressPool { let inner = Inner { account_categories, all_account_ids, - account_id_to_address, - address_to_account_id, account_status, hot_ready_accounts, normal_ready_accounts, @@ -100,6 +87,7 @@ impl WeightedAddressPool { Self { inner: Mutex::new(inner), + account_generator, } } @@ -253,12 +241,14 @@ impl AddressPool for WeightedAddressPool { fn select_receiver(&self, excluded: &Address) -> Address { let inner = self.inner.lock(); - let excluded_id = inner.address_to_account_id.get(excluded); + let gen = tokio::runtime::Handle::current().block_on(self.account_generator.read()); + + let excluded_id = gen.get_id_by_address(excluded); loop { let idx = rand::random::() % inner.all_account_ids.len(); let account_id = inner.all_account_ids[idx]; - if Some(&account_id) != excluded_id { - return *inner.account_id_to_address.get(&account_id).unwrap(); + if Some(account_id) != excluded_id { + return gen.get_address_by_id(account_id); } } } From c42adc97c0e38362c40c90ed38c1a7d1271bbda7 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:00:31 +0800 Subject: [PATCH 04/21] add txn --- src/actors/producer/producer_actor.rs | 4 +-- src/main.rs | 4 +-- .../addr_pool/managed_address_pool.rs | 2 +- src/txn_plan/addr_pool/mod.rs | 3 +- src/txn_plan/faucet_plan.rs | 2 +- src/txn_plan/plan.rs | 14 ++++----- src/txn_plan/traits.rs | 2 +- src/util/gen_account.rs | 29 ++++++++++++++----- 8 files changed, 36 insertions(+), 24 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index e37ded8..fb8d9fa 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -167,8 +167,8 @@ impl Producer { // Fetch accounts and build transactions let ready_accounts = address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len())); - - let iterator = plan.as_mut().build_txns(ready_accounts, account_generator)?; + let account_generator = account_generator.read().await; + let iterator = plan.as_mut().build_txns(ready_accounts, &account_generator)?; // If the plan doesn't consume nonces, accounts can be used by other processes immediately. if !iterator.consume_nonce { diff --git a/src/main.rs b/src/main.rs index e731619..97e4fc7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -264,8 +264,8 @@ async fn start_bench() -> Result<()> { }); contract_config }; - - let accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts); + let private_key_signer = PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap(); + let accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts, PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap()); let account_ids = accout_generator .write() .await diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index d6de224..4fcf8cd 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -113,7 +113,7 @@ impl AddressPool for RandomAddressPool { fn select_receiver(&self, excluded: &Address) -> Address { let inner = self.inner.lock(); - let gen = tokio::runtime::Handle::current().block_on(self.account_generator.read()); + let gen = self.account_generator.read(); let excluded_id = gen.get_id_by_address(excluded); loop { diff --git a/src/txn_plan/addr_pool/mod.rs b/src/txn_plan/addr_pool/mod.rs index e607ef6..d476948 100644 --- a/src/txn_plan/addr_pool/mod.rs +++ b/src/txn_plan/addr_pool/mod.rs @@ -6,7 +6,8 @@ pub mod managed_address_pool; #[allow(unused)] pub mod weighted_address_pool; -pub trait AddressPool: Send + Sync { +#[async_trait::async_trait] +pub trait AddressPool: Send + Sync + 'static { /// Fetches a batch of ready sender accounts based on the internal sampling strategy. /// This operation should internally lock the accounts to prevent concurrent use. /// Returns (AccountId, nonce) pairs. diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index 082efc1..f0779db 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -107,7 +107,7 @@ impl TxnPlan for LevelFaucetPlan { fn build_txns( &mut self, _ready_accounts: Vec<(crate::util::gen_account::AccountId, u32)>, - _account_generator: Arc>, + _account_generator: &crate::util::gen_account::AccountGenerator, ) -> Result { let plan_id = self.id.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index 3abde00..ea8817c 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -76,25 +76,23 @@ impl TxnPlan for ManyToOnePlan { fn build_txns( &mut self, ready_accounts: Vec<(AccountId, u32)>, - account_generator: Arc>, + account_generator: &AccountGenerator, ) -> Result { let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); // Convert AccountId to (signer, address, nonce) tuples - let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); let ready_accounts_with_signers: Vec<_> = ready_accounts .into_iter() .map(|(account_id, nonce)| { ( - Arc::new(gen.get_signer_by_id(account_id).clone()), - Arc::new(gen.get_address_by_id(account_id)), + Arc::new(account_generator.get_signer_by_id(account_id).clone()), + Arc::new(account_generator.get_address_by_id(account_id)), nonce, ) }) .collect(); - drop(gen); // 4. Create async stream, process in batches let handle = tokio::task::spawn_blocking(move || { @@ -190,7 +188,7 @@ impl TxnPlan for OneToManyPlan { fn build_txns( &mut self, ready_accounts: Vec<(AccountId, u32)>, - account_generator: Arc>, + account_generator: &AccountGenerator, ) -> Result { // 3. Parallelly build and sign transactions let plan_id = self.id.clone(); @@ -199,12 +197,10 @@ impl TxnPlan for OneToManyPlan { let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); // Convert AccountId to addresses - let gen = tokio::runtime::Handle::current().block_on(account_generator.read()); let addresses: Vec<_> = ready_accounts .into_iter() - .map(|(account_id, _nonce)| Arc::new(gen.get_address_by_id(account_id))) + .map(|(account_id, _nonce)| Arc::new(account_generator.get_address_by_id(account_id))) .collect(); - drop(gen); let handle = tokio::task::spawn_blocking(move || { addresses diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index 8be0eb8..5f77c91 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -92,7 +92,7 @@ pub trait TxnPlan: Send + Sync { fn build_txns( &mut self, ready_accounts: Vec<(AccountId, u32)>, - account_generator: Arc>, + account_generator: &AccountGenerator, ) -> Result; /// Returns the unique identifier for this plan instance. diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 0787988..d85fb3d 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -1,9 +1,8 @@ use std::{ collections::HashMap, sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + Arc, atomic::{AtomicU64, Ordering} + }, u32, }; use alloy::{ @@ -19,29 +18,45 @@ pub struct AccountId(u32); pub struct AccountGenerator { accouts: Vec, + faucet_accout: PrivateKeySigner, + faucet_accout_id: AccountId, accout_to_id: HashMap, init_nonces: Vec>, } impl AccountGenerator { - pub fn with_capacity(capacity: usize) -> Arc> { + pub fn with_capacity(capacity: usize, faucet_accout: PrivateKeySigner) -> Arc> { Arc::new(RwLock::new(Self { accouts: Vec::with_capacity(capacity), + faucet_accout, + faucet_accout_id: AccountId(u32::MAX), accout_to_id: HashMap::with_capacity(capacity), init_nonces: Vec::with_capacity(capacity), })) } pub fn get_signer_by_id(&self, id: AccountId) -> &PrivateKeySigner { - &self.accouts[id.0 as usize] + if id == self.faucet_accout_id { + &self.faucet_accout + } else { + &self.accouts[id.0 as usize] + } } pub fn get_address_by_id(&self, id: AccountId) -> Address { - self.accouts[id.0 as usize].address() + if id == self.faucet_accout_id { + self.faucet_accout.address() + } else { + self.accouts[id.0 as usize].address() + } } pub fn get_id_by_address(&self, address: &Address) -> Option { - self.accout_to_id.get(address).copied() + if address == &self.faucet_accout.address() { + Some(self.faucet_accout_id) + } else { + self.accout_to_id.get(address).copied() + } } pub fn init_nonce_map(&self) -> HashMap { From 43b9b679e4349ec7647b44d94eccf70829cbecdf Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:15:35 +0800 Subject: [PATCH 05/21] add txn --- .../addr_pool/managed_address_pool.rs | 9 +-- src/txn_plan/addr_pool/mod.rs | 8 +-- .../addr_pool/weighted_address_pool.rs | 8 +-- src/txn_plan/constructor/approve.rs | 10 +-- src/txn_plan/constructor/distribute_token.rs | 10 +-- src/txn_plan/constructor/erc20_transfer.rs | 13 ++-- .../constructor/swap_token_2_token.rs | 12 ++-- src/txn_plan/plan.rs | 62 ++++++++----------- src/txn_plan/traits.rs | 7 ++- 9 files changed, 63 insertions(+), 76 deletions(-) diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index 4fcf8cd..5f96e05 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, sync::Arc}; -use alloy::primitives::Address; use parking_lot::Mutex; use tokio::sync::RwLock; @@ -111,16 +110,14 @@ impl AddressPool for RandomAddressPool { self.inner.lock().all_account_ids.len() } - fn select_receiver(&self, excluded: &Address) -> Address { + fn select_receiver(&self, excluded: AccountId) -> AccountId { let inner = self.inner.lock(); - let gen = self.account_generator.read(); - let excluded_id = gen.get_id_by_address(excluded); loop { let idx = rand::random::() % inner.all_account_ids.len(); let account_id = inner.all_account_ids[idx]; - if Some(account_id) != excluded_id { - return gen.get_address_by_id(account_id); + if account_id != excluded { + return account_id; } } } diff --git a/src/txn_plan/addr_pool/mod.rs b/src/txn_plan/addr_pool/mod.rs index d476948..115c3bb 100644 --- a/src/txn_plan/addr_pool/mod.rs +++ b/src/txn_plan/addr_pool/mod.rs @@ -1,5 +1,3 @@ -use alloy::primitives::Address; - use crate::util::gen_account::AccountId; pub mod managed_address_pool; @@ -36,7 +34,7 @@ pub trait AddressPool: Send + Sync + 'static { /// Returns the total number of accounts in the pool. fn len(&self) -> usize; - /// Selects a receiver address based on the internal sampling strategy. - /// The excluded parameter is the address to exclude from selection. - fn select_receiver(&self, excluded: &Address) -> Address; + /// Selects a receiver account ID based on the internal sampling strategy. + /// The excluded parameter is the account ID to exclude from selection. + fn select_receiver(&self, excluded: AccountId) -> AccountId; } diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index a8558b9..ff48c8f 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -239,16 +239,14 @@ impl AddressPool for WeightedAddressPool { self.inner.lock().all_account_ids.len() } - fn select_receiver(&self, excluded: &Address) -> Address { + fn select_receiver(&self, excluded: AccountId) -> AccountId { let inner = self.inner.lock(); - let gen = tokio::runtime::Handle::current().block_on(self.account_generator.read()); - let excluded_id = gen.get_id_by_address(excluded); loop { let idx = rand::random::() % inner.all_account_ids.len(); let account_id = inner.all_account_ids[idx]; - if Some(account_id) != excluded_id { - return gen.get_address_by_id(account_id); + if account_id != excluded { + return account_id; } } } diff --git a/src/txn_plan/constructor/approve.rs b/src/txn_plan/constructor/approve.rs index bff3b00..096e67a 100644 --- a/src/txn_plan/constructor/approve.rs +++ b/src/txn_plan/constructor/approve.rs @@ -8,7 +8,7 @@ use alloy::{ sol_types::SolCall, }; -use crate::{config::IERC20, txn_plan::traits::FromTxnConstructor}; +use crate::{config::IERC20, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId}}; /// ERC20 approve constructor /// Approve tokens for multiple accounts to a specified spender (e.g. Uniswap Router) @@ -32,8 +32,8 @@ impl ApproveTokenConstructor { impl FromTxnConstructor for ApproveTokenConstructor { fn build_for_sender( &self, - from_account: &Arc
, - _from_signer: &Arc, + from_account_id: AccountId, + account_generator: &AccountGenerator, nonce: u64, ) -> Result { let approve_call = IERC20::approveCall { @@ -43,10 +43,10 @@ impl FromTxnConstructor for ApproveTokenConstructor { let call_data = approve_call.abi_encode(); let call_data = Bytes::from(call_data); - + let from_address = account_generator.get_address_by_id(from_account_id); // create transaction request let tx_request = TransactionRequest::default() - .with_from(*from_account.as_ref()) + .with_from(from_address) .with_to(self.token_address) .with_input(call_data) .with_nonce(nonce) diff --git a/src/txn_plan/constructor/distribute_token.rs b/src/txn_plan/constructor/distribute_token.rs index 503c657..e44eb81 100644 --- a/src/txn_plan/constructor/distribute_token.rs +++ b/src/txn_plan/constructor/distribute_token.rs @@ -6,7 +6,7 @@ use alloy::{ signers::local::PrivateKeySigner, }; -use crate::{eth::TxnBuilder, txn_plan::traits::FromTxnConstructor}; +use crate::{eth::TxnBuilder, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId}}; /// Token distribute constructor /// Distribute tokens to accounts using ETH @@ -44,13 +44,13 @@ impl SwapEthToTokenConstructor { impl FromTxnConstructor for SwapEthToTokenConstructor { fn build_for_sender( &self, - _from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + account_generator: &AccountGenerator, nonce: u64, ) -> Result { // set transaction deadline (current time + 30 minutes) let deadline = U256::from(chrono::Utc::now().timestamp() + 1800); - + let from_address = account_generator.get_address_by_id(from_account_id); // build swap path: WETH -> Token let path = vec![self.weth_address, self.token_address]; @@ -59,7 +59,7 @@ impl FromTxnConstructor for SwapEthToTokenConstructor { self.router_address, self.amount_out_min, path, - from_signer.address(), + from_address, deadline, self.eth_amount_per_account, nonce, diff --git a/src/txn_plan/constructor/erc20_transfer.rs b/src/txn_plan/constructor/erc20_transfer.rs index 3415ce2..7c3b98a 100644 --- a/src/txn_plan/constructor/erc20_transfer.rs +++ b/src/txn_plan/constructor/erc20_transfer.rs @@ -1,6 +1,6 @@ use crate::{ config::IERC20, - txn_plan::{addr_pool::AddressPool, FromTxnConstructor}, + txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId}, }; use alloy::{ network::TransactionBuilder, @@ -39,13 +39,14 @@ impl Erc20TransferConstructor { impl FromTxnConstructor for Erc20TransferConstructor { fn build_for_sender( &self, - from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + account_generator: &AccountGenerator, nonce: u64, ) -> Result { // random select a receiver address, ensure not to self - let to_address = self.address_pool.select_receiver(from_account); - + let to_address = self.address_pool.select_receiver(from_account_id); + let to_address = account_generator.get_address_by_id(to_address); + let from_address = account_generator.get_address_by_id(from_account_id); // build ERC20 transfer call let transfer_call = IERC20::transferCall { to: to_address, @@ -58,7 +59,7 @@ impl FromTxnConstructor for Erc20TransferConstructor { let token_address = self.token_list[token_idx]; // create transaction request let tx_request = TransactionRequest::default() - .with_from(from_signer.address()) + .with_from(from_address) .with_to(token_address) .with_input(call_data) .with_nonce(nonce) diff --git a/src/txn_plan/constructor/swap_token_2_token.rs b/src/txn_plan/constructor/swap_token_2_token.rs index 1b0e147..e53cbd0 100644 --- a/src/txn_plan/constructor/swap_token_2_token.rs +++ b/src/txn_plan/constructor/swap_token_2_token.rs @@ -1,6 +1,6 @@ use crate::{ config::{IUniswapV2Router, LiquidityPair}, - txn_plan::{addr_pool::AddressPool, FromTxnConstructor}, + txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId}, }; use alloy::{ network::TransactionBuilder, @@ -40,11 +40,13 @@ impl SwapTokenToTokenConstructor { impl FromTxnConstructor for SwapTokenToTokenConstructor { fn build_for_sender( &self, - from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + account_generator: &AccountGenerator, nonce: u64, ) -> Result { - let to_address = self.address_pool.select_receiver(from_account); + let to_address = self.address_pool.select_receiver(from_account_id); + let to_address = account_generator.get_address_by_id(to_address); + let from_address = account_generator.get_address_by_id(from_account_id); let token_idx = rand::random::() % self.token_list.len(); let from_token = Address::from_str(&self.token_list[token_idx].token_a_address).unwrap(); let to_token = Address::from_str(&self.token_list[token_idx].token_b_address).unwrap(); @@ -65,7 +67,7 @@ impl FromTxnConstructor for SwapTokenToTokenConstructor { let call_data = swap_call.abi_encode(); let call_data = Bytes::from(call_data); let tx_request = TransactionRequest::default() - .with_from(from_signer.address()) + .with_from(from_address) .with_to(self.router_address) .with_input(call_data) .with_nonce(nonce) diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index ea8817c..f2c89f2 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -10,7 +10,7 @@ use crate::{ util::gen_account::{AccountGenerator, AccountId}, }; use alloy::{ - consensus::transaction::SignerRecoverable, eips::Encodable2718, + consensus::{TxEnvelope, transaction::SignerRecoverable}, eips::Encodable2718, }; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; @@ -81,42 +81,34 @@ impl TxnPlan for ManyToOnePlan { let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); - - // Convert AccountId to (signer, address, nonce) tuples - let ready_accounts_with_signers: Vec<_> = ready_accounts - .into_iter() - .map(|(account_id, nonce)| { - ( - Arc::new(account_generator.get_signer_by_id(account_id).clone()), - Arc::new(account_generator.get_address_by_id(account_id)), - nonce, - ) - }) - .collect(); + // 4. Create async stream, process in batches let handle = tokio::task::spawn_blocking(move || { - ready_accounts_with_signers + ready_accounts .chunks(1024) .map(|chunk| { chunk .into_par_iter() - .map(|(signer, address, nonce)| { - let tx_request = constructor - .build_for_sender(address, signer, *nonce as u64) - .unwrap(); - let metadata = Arc::new(TxnMetadata { - from_account: address.clone(), - nonce: *nonce as u64, - txn_id: Uuid::new_v4(), - plan_id: plan_id.clone(), - }); - let tx_envelope = - TxnBuilder::build_and_sign_transaction(tx_request, signer).unwrap(); - SignedTxnWithMetadata { - bytes: tx_envelope.encoded_2718(), - metadata, - } + .map(|(from_account_id, nonce)| { + todo!() + // let address = account_generator.get_address_by_id(*from_account_id); + // let signer = account_generator.get_signer_by_id(*from_account_id).clone(); + // let tx_request = constructor + // .build_for_sender(*from_account_id, account_generator, *nonce as u64) + // .unwrap(); + // let metadata = Arc::new(TxnMetadata { + // from_account: Arc::new(address), + // nonce: *nonce as u64, + // txn_id: Uuid::new_v4(), + // plan_id: plan_id.clone(), + // }); + // let tx_envelope = + // TxnBuilder::build_and_sign_transaction(tx_request, &signer).unwrap(); + // SignedTxnWithMetadata { + // bytes: tx_envelope.encoded_2718(), + // metadata, + // } }) .collect::>() }) @@ -197,10 +189,7 @@ impl TxnPlan for OneToManyPlan { let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); // Convert AccountId to addresses - let addresses: Vec<_> = ready_accounts - .into_iter() - .map(|(account_id, _nonce)| Arc::new(account_generator.get_address_by_id(account_id))) - .collect(); + let addresses = ready_accounts; let handle = tokio::task::spawn_blocking(move || { addresses @@ -208,9 +197,10 @@ impl TxnPlan for OneToManyPlan { .map(|chunk| { chunk .into_par_iter() - .flat_map(|address| { + .flat_map(|(to_account_id, _nonce)| { // Build transaction request - let txs = constructor.build_for_receiver(address, chain_id).unwrap(); + // let txs = constructor.build_for_receiver(*to_account_id, account_generator, chain_id).unwrap(); + let txs: Vec = todo!(); txs.into_iter() .map(|tx_envelope| { let metadata = Arc::new(TxnMetadata { diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index 5f77c91..088e2df 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -53,8 +53,8 @@ pub trait FromTxnConstructor: Send + Sync + 'static { /// The `to` address is usually a field of the constructor itself (e.g., fixed spender or router address). fn build_for_sender( &self, - from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + accout_generator: &AccountGenerator, nonce: u64, ) -> Result; @@ -66,7 +66,8 @@ pub trait ToTxnConstructor: Send + Sync + 'static { /// Build transaction based on receiver information. fn build_for_receiver( &self, - to: &Arc
, + to_account_id: AccountId, + account_generator: &AccountGenerator, chain_id: u64, ) -> Result, anyhow::Error>; From d76e71ec52514cf703d0c75856f45b0a51810c64 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:35:08 +0800 Subject: [PATCH 06/21] add txn --- src/actors/producer/producer_actor.rs | 21 ++-- src/main.rs | 104 ++++++++---------- .../addr_pool/managed_address_pool.rs | 6 +- .../addr_pool/weighted_address_pool.rs | 6 +- src/txn_plan/constructor/approve.rs | 4 +- src/txn_plan/constructor/distribute_token.rs | 4 +- src/txn_plan/constructor/erc20_transfer.rs | 4 +- src/txn_plan/constructor/faucet.rs | 7 +- .../constructor/swap_token_2_token.rs | 4 +- src/txn_plan/faucet_plan.rs | 4 +- src/txn_plan/plan.rs | 49 ++++----- src/txn_plan/plan_builder.rs | 2 +- src/txn_plan/traits.rs | 8 +- src/util/gen_account.rs | 12 +- 14 files changed, 108 insertions(+), 127 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index fb8d9fa..e7d4a9f 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -15,7 +15,7 @@ use crate::actors::monitor::{ }; use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer}; use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan}; -use crate::util::gen_account::AccountGenerator; +use crate::util::gen_account::{AccountGenerator, AccountManager}; use super::messages::RegisterTxnPlan; @@ -73,7 +73,7 @@ pub struct Producer { nonce_cache: Arc, u32>>, - account_generator: Arc>, + account_generator: AccountManager, /// A queue of plans waiting to be executed. Plans are processed in FIFO order. plan_queue: VecDeque>, @@ -88,18 +88,16 @@ impl Producer { address_pool: Arc, consumer_addr: Addr, monitor_addr: Addr, - account_generator: Arc>, + account_generator: AccountManager, ) -> Result { let nonce_cache = Arc::new(DashMap::new()); - let gen = account_generator.read().await; address_pool.clean_ready_accounts(); - for (account_id, nonce) in gen.account_ids_with_nonce() { - let address = Arc::new(gen.get_address_by_id(account_id)); + for (account_id, nonce) in account_generator.account_ids_with_nonce() { + let address = Arc::new(account_generator.get_address_by_id(account_id)); let nonce = nonce.load(Ordering::Relaxed) as u32; nonce_cache.insert(address.clone(), nonce); address_pool.unlock_correct_nonce(account_id, nonce); } - drop(gen); Ok(Self { state: ProducerState::running(), stats: ProducerStats { @@ -156,7 +154,7 @@ impl Producer { monitor_addr: Addr, consumer_addr: Addr, address_pool: Arc, - account_generator: Arc>, + account_generator: AccountManager, mut plan: Box, sending_txns: Arc, state: ProducerState, @@ -167,8 +165,7 @@ impl Producer { // Fetch accounts and build transactions let ready_accounts = address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len())); - let account_generator = account_generator.read().await; - let iterator = plan.as_mut().build_txns(ready_accounts, &account_generator)?; + let iterator = plan.as_mut().build_txns(ready_accounts, account_generator.clone())?; // If the plan doesn't consume nonces, accounts can be used by other processes immediately. if !iterator.consume_nonce { @@ -428,9 +425,7 @@ impl Handler for Producer { let ready_accounts = self.stats.ready_accounts.clone(); Box::pin( async move { - let gen = account_generator.read().await; - let account_id = gen.get_id_by_address(&account); - drop(gen); + let account_id = account_generator.get_id_by_address(&account); if let Some(account_id) = account_id { match msg.result.as_ref() { diff --git a/src/main.rs b/src/main.rs index 97e4fc7..6935de8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,16 +21,13 @@ use tokio::{ use tracing::{info, Level}; use crate::{ - actors::{consumer::Consumer, producer::Producer, Monitor, RegisterTxnPlan}, + actors::{Monitor, RegisterTxnPlan, consumer::Consumer, producer::Producer}, config::{BenchConfig, ContractConfig}, eth::EthHttpCli, txn_plan::{ - addr_pool::AddressPool, - constructor::FaucetTreePlanBuilder, - faucet_txn_builder::{Erc20FaucetTxnBuilder, EthFaucetTxnBuilder, FaucetTxnBuilder}, - PlanBuilder, TxnPlan, + PlanBuilder, TxnPlan, addr_pool::AddressPool, constructor::FaucetTreePlanBuilder, faucet_txn_builder::{Erc20FaucetTxnBuilder, EthFaucetTxnBuilder, FaucetTxnBuilder} }, - util::gen_account::AccountGenerator, + util::gen_account::{AccountGenerator, AccountManager}, }; #[derive(Parser, Debug)] @@ -212,11 +209,11 @@ fn run_command(command: &str) -> Result { } async fn get_init_nonce_map( - accout_generator: Arc>, + accout_generator: AccountManager, faucet_private_key: &str, eth_client: Arc, ) -> Arc> { - let mut init_nonce_map = accout_generator.read().await.init_nonce_map(); + let mut init_nonce_map = accout_generator.init_nonce_map(); let faucet_signer = PrivateKeySigner::from_str(faucet_private_key).unwrap(); let faucet_address = faucet_signer.address(); init_nonce_map.insert( @@ -264,18 +261,14 @@ async fn start_bench() -> Result<()> { }); contract_config }; - let private_key_signer = PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap(); - let accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts, PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap()); + let mut accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts, PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap()); let account_ids = accout_generator - .write() - .await .gen_account(0, benchmark_config.accounts.num_accounts as u64) .unwrap(); let account_addresses = Arc::new({ - let gen = accout_generator.read().await; account_ids .iter() - .map(|&id| Arc::new(gen.get_address_by_id(id))) + .map(|&id| Arc::new(accout_generator.get_address_by_id(id))) .collect::>() }); // Create EthHttpCli instance @@ -288,9 +281,7 @@ async fn start_bench() -> Result<()> { }) .collect(); - let address_pool: Arc = Arc::new( - txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(account_ids.clone(), accout_generator.clone()), - ); + let chain_id = benchmark_config.nodes[0].chain_id; @@ -306,32 +297,48 @@ async fn start_bench() -> Result<()> { U256::from(benchmark_config.num_tokens) * U256::from(21000) * U256::from(1000_000_000_000u64), - accout_generator.clone(), + &mut accout_generator, ) .await .unwrap(); if args.recover { - init_nonce(accout_generator.clone(), eth_clients[0].clone()).await; + init_nonce(&mut accout_generator, eth_clients[0].clone()).await; } let monitor = Monitor::new_with_clients( eth_clients.clone(), benchmark_config.performance.max_pool_size, ) .start(); - // let mut file = tokio::fs::File::create("accounts.txt").await.unwrap(); - // for (sign, nonce) in accout_generator.read().await.accouts_nonce_iter() { - // file.write( - // format!( - // "{}, {}, {}\n", - // hex::encode(sign.to_bytes()), - // sign.address().to_string(), - // nonce.load(Ordering::Relaxed), - // ) - // .as_bytes(), - // ) - // .await - // .unwrap(); - // } + + let tokens = contract_config.get_all_token(); + let mut tokens_plan = Vec::new(); + for token in &tokens { + start_nonce += benchmark_config.faucet.faucet_level as u64; + info!("distributing token: {}", token.address); + let token_address = Address::from_str(&token.address).unwrap(); + let faucet_token_balance = U256::from_str(&token.faucet_balance).unwrap(); + info!("balance of token: {}", faucet_token_balance); + let token_faucet_builder = PlanBuilder::create_faucet_tree_plan_builder( + benchmark_config.faucet.faucet_level as usize, + faucet_token_balance, + &benchmark_config.faucet.private_key, + start_nonce, + account_addresses.clone(), + Arc::new(Erc20FaucetTxnBuilder::new(token_address)), + U256::ZERO, + &mut accout_generator, + ) + .await + .unwrap(); + tokens_plan.push(token_faucet_builder); + } + + let account_manager = accout_generator.to_manager(); + + let address_pool: Arc = Arc::new( + txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(account_ids.clone(), account_manager.clone()), + ); + // Use the same client instances for Consumer to share metrics let eth_providers: Vec = eth_clients .iter() @@ -347,7 +354,7 @@ async fn start_bench() -> Result<()> { ) .start(); let init_nonce_map = get_init_nonce_map( - accout_generator.clone(), + account_manager.clone(), benchmark_config.faucet.private_key.as_str(), eth_clients[0].clone(), ) @@ -357,7 +364,7 @@ async fn start_bench() -> Result<()> { address_pool.clone(), consumer, monitor, - accout_generator.clone(), + account_manager.clone(), ) .await .unwrap() @@ -372,29 +379,9 @@ async fn start_bench() -> Result<()> { ) .await?; - let tokens = contract_config.get_all_token(); - - for token in &tokens { - start_nonce += benchmark_config.faucet.faucet_level as u64; - info!("distributing token: {}", token.address); - let token_address = Address::from_str(&token.address).unwrap(); - let faucet_token_balance = U256::from_str(&token.faucet_balance).unwrap(); - info!("balance of token: {}", faucet_token_balance); - let token_faucet_builder = PlanBuilder::create_faucet_tree_plan_builder( - benchmark_config.faucet.faucet_level as usize, - faucet_token_balance, - &benchmark_config.faucet.private_key, - start_nonce, - account_addresses.clone(), - Arc::new(Erc20FaucetTxnBuilder::new(token_address)), - U256::ZERO, - accout_generator.clone(), - ) - .await - .unwrap(); - + for (token_plan, token) in tokens_plan.into_iter().zip(tokens.iter()) { execute_faucet_distribution( - token_faucet_builder, + token_plan, chain_id, &producer, &format!("Token {}", token.symbol), @@ -432,9 +419,8 @@ async fn start_bench() -> Result<()> { Ok(()) } -async fn init_nonce(accout_generator: Arc>, eth_client: Arc) { +async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc) { tracing::info!("Initializing nonce..."); - let accout_generator = accout_generator.read().await; let tasks = accout_generator .accouts_nonce_iter() .map(|(account, nonce)| { diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index 5f96e05..a9beec9 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -4,7 +4,7 @@ use parking_lot::Mutex; use tokio::sync::RwLock; use super::AddressPool; -use crate::util::gen_account::{AccountGenerator, AccountId}; +use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; struct Inner { account_status: HashMap, @@ -14,12 +14,12 @@ struct Inner { pub struct RandomAddressPool { inner: Mutex, - account_generator: Arc>, + account_generator: AccountManager, } impl RandomAddressPool { #[allow(unused)] - pub fn new(account_ids: Vec, account_generator: Arc>) -> Self { + pub fn new(account_ids: Vec, account_generator: AccountManager) -> Self { let mut account_status = HashMap::new(); let mut ready_accounts = Vec::new(); diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index ff48c8f..60d1295 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -6,7 +6,7 @@ use rand::seq::SliceRandom; use tokio::sync::RwLock; use super::AddressPool; -use crate::util::gen_account::{AccountGenerator, AccountId}; +use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] enum AccountCategory { @@ -29,11 +29,11 @@ struct Inner { pub struct WeightedAddressPool { inner: Mutex, - account_generator: Arc>, + account_generator: AccountManager, } impl WeightedAddressPool { - pub fn new(account_ids: Vec, account_generator: Arc>) -> Self { + pub fn new(account_ids: Vec, account_generator: AccountManager) -> Self { let mut all_account_ids = account_ids; // Shuffle for random distribution all_account_ids.shuffle(&mut rand::thread_rng()); diff --git a/src/txn_plan/constructor/approve.rs b/src/txn_plan/constructor/approve.rs index 096e67a..c27715e 100644 --- a/src/txn_plan/constructor/approve.rs +++ b/src/txn_plan/constructor/approve.rs @@ -8,7 +8,7 @@ use alloy::{ sol_types::SolCall, }; -use crate::{config::IERC20, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId}}; +use crate::{config::IERC20, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId, AccountManager}}; /// ERC20 approve constructor /// Approve tokens for multiple accounts to a specified spender (e.g. Uniswap Router) @@ -33,7 +33,7 @@ impl FromTxnConstructor for ApproveTokenConstructor { fn build_for_sender( &self, from_account_id: AccountId, - account_generator: &AccountGenerator, + account_generator: AccountManager, nonce: u64, ) -> Result { let approve_call = IERC20::approveCall { diff --git a/src/txn_plan/constructor/distribute_token.rs b/src/txn_plan/constructor/distribute_token.rs index e44eb81..8b94bdf 100644 --- a/src/txn_plan/constructor/distribute_token.rs +++ b/src/txn_plan/constructor/distribute_token.rs @@ -6,7 +6,7 @@ use alloy::{ signers::local::PrivateKeySigner, }; -use crate::{eth::TxnBuilder, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId}}; +use crate::{eth::TxnBuilder, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId, AccountManager}}; /// Token distribute constructor /// Distribute tokens to accounts using ETH @@ -45,7 +45,7 @@ impl FromTxnConstructor for SwapEthToTokenConstructor { fn build_for_sender( &self, from_account_id: AccountId, - account_generator: &AccountGenerator, + account_generator: AccountManager, nonce: u64, ) -> Result { // set transaction deadline (current time + 30 minutes) diff --git a/src/txn_plan/constructor/erc20_transfer.rs b/src/txn_plan/constructor/erc20_transfer.rs index 7c3b98a..0aad4a3 100644 --- a/src/txn_plan/constructor/erc20_transfer.rs +++ b/src/txn_plan/constructor/erc20_transfer.rs @@ -1,6 +1,6 @@ use crate::{ config::IERC20, - txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId}, + txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId, AccountManager}, }; use alloy::{ network::TransactionBuilder, @@ -40,7 +40,7 @@ impl FromTxnConstructor for Erc20TransferConstructor { fn build_for_sender( &self, from_account_id: AccountId, - account_generator: &AccountGenerator, + account_generator: AccountManager, nonce: u64, ) -> Result { // random select a receiver address, ensure not to self diff --git a/src/txn_plan/constructor/faucet.rs b/src/txn_plan/constructor/faucet.rs index db5ab72..3ff57a3 100644 --- a/src/txn_plan/constructor/faucet.rs +++ b/src/txn_plan/constructor/faucet.rs @@ -44,7 +44,7 @@ impl FaucetTreePlanBuilder { final_recipients: Arc>>, txn_builder: Arc, remained_eth: U256, - account_generator: Arc>, + account_generator: &mut AccountGenerator, ) -> Self { let mut degree = faucet_level; let total_accounts = final_recipients.len(); @@ -117,15 +117,12 @@ impl FaucetTreePlanBuilder { for level in 0..num_intermediate_levels { let num_accounts_at_level = degree.pow(level as u32 + 1); let account_ids = account_generator - .write() - .await .gen_account(start_index as u64, num_accounts_at_level as u64) .unwrap(); - let gen = account_generator.read().await; account_levels.push( account_ids .iter() - .map(|&id| Arc::new(gen.get_signer_by_id(id).clone())) + .map(|&id| Arc::new(account_generator.get_signer_by_id(id).clone())) .collect::>(), ); start_index += num_accounts_at_level as usize; diff --git a/src/txn_plan/constructor/swap_token_2_token.rs b/src/txn_plan/constructor/swap_token_2_token.rs index e53cbd0..f419a70 100644 --- a/src/txn_plan/constructor/swap_token_2_token.rs +++ b/src/txn_plan/constructor/swap_token_2_token.rs @@ -1,6 +1,6 @@ use crate::{ config::{IUniswapV2Router, LiquidityPair}, - txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId}, + txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId, AccountManager}, }; use alloy::{ network::TransactionBuilder, @@ -41,7 +41,7 @@ impl FromTxnConstructor for SwapTokenToTokenConstructor { fn build_for_sender( &self, from_account_id: AccountId, - account_generator: &AccountGenerator, + account_generator: AccountManager, nonce: u64, ) -> Result { let to_address = self.address_pool.select_receiver(from_account_id); diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index f0779db..3944934 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -3,7 +3,7 @@ use crate::{ txn_plan::{ faucet_txn_builder::FaucetTxnBuilder, traits::{PlanExecutionMode, PlanId, SignedTxnWithMetadata, TxnMetadata, TxnPlan}, - }, + }, util::gen_account::AccountManager, }; use alloy::{ eips::Encodable2718, @@ -107,7 +107,7 @@ impl TxnPlan for LevelFaucetPlan { fn build_txns( &mut self, _ready_accounts: Vec<(crate::util::gen_account::AccountId, u32)>, - _account_generator: &crate::util::gen_account::AccountGenerator, + _account_generator: AccountManager, ) -> Result { let plan_id = self.id.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index f2c89f2..6400359 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -1,13 +1,12 @@ use crate::{ eth::TxnBuilder, txn_plan::{ - traits::{ + TxnIter, traits::{ FromTxnConstructor, PlanExecutionMode, PlanId, SignedTxnWithMetadata, ToTxnConstructor, TxnMetadata, TxnPlan, - }, - TxnIter, + } }, - util::gen_account::{AccountGenerator, AccountId}, + util::gen_account::{AccountGenerator, AccountId, AccountManager}, }; use alloy::{ consensus::{TxEnvelope, transaction::SignerRecoverable}, eips::Encodable2718, @@ -76,7 +75,7 @@ impl TxnPlan for ManyToOnePlan { fn build_txns( &mut self, ready_accounts: Vec<(AccountId, u32)>, - account_generator: &AccountGenerator, + account_generator: AccountManager, ) -> Result { let plan_id = self.id.clone(); let constructor = self.constructor.clone(); @@ -91,24 +90,23 @@ impl TxnPlan for ManyToOnePlan { chunk .into_par_iter() .map(|(from_account_id, nonce)| { - todo!() - // let address = account_generator.get_address_by_id(*from_account_id); - // let signer = account_generator.get_signer_by_id(*from_account_id).clone(); - // let tx_request = constructor - // .build_for_sender(*from_account_id, account_generator, *nonce as u64) - // .unwrap(); - // let metadata = Arc::new(TxnMetadata { - // from_account: Arc::new(address), - // nonce: *nonce as u64, - // txn_id: Uuid::new_v4(), - // plan_id: plan_id.clone(), - // }); - // let tx_envelope = - // TxnBuilder::build_and_sign_transaction(tx_request, &signer).unwrap(); - // SignedTxnWithMetadata { - // bytes: tx_envelope.encoded_2718(), - // metadata, - // } + let address = account_generator.get_address_by_id(*from_account_id); + let signer = account_generator.get_signer_by_id(*from_account_id).clone(); + let tx_request = constructor + .build_for_sender(*from_account_id, account_generator.clone(), *nonce as u64) + .unwrap(); + let metadata = Arc::new(TxnMetadata { + from_account: Arc::new(address), + nonce: *nonce as u64, + txn_id: Uuid::new_v4(), + plan_id: plan_id.clone(), + }); + let tx_envelope = + TxnBuilder::build_and_sign_transaction(tx_request, &signer).unwrap(); + SignedTxnWithMetadata { + bytes: tx_envelope.encoded_2718(), + metadata, + } }) .collect::>() }) @@ -180,7 +178,7 @@ impl TxnPlan for OneToManyPlan { fn build_txns( &mut self, ready_accounts: Vec<(AccountId, u32)>, - account_generator: &AccountGenerator, + account_generator: AccountManager, ) -> Result { // 3. Parallelly build and sign transactions let plan_id = self.id.clone(); @@ -199,8 +197,7 @@ impl TxnPlan for OneToManyPlan { .into_par_iter() .flat_map(|(to_account_id, _nonce)| { // Build transaction request - // let txs = constructor.build_for_receiver(*to_account_id, account_generator, chain_id).unwrap(); - let txs: Vec = todo!(); + let txs = constructor.build_for_receiver(*to_account_id, account_generator.clone(), chain_id).unwrap(); txs.into_iter() .map(|tx_envelope| { let metadata = Arc::new(TxnMetadata { diff --git a/src/txn_plan/plan_builder.rs b/src/txn_plan/plan_builder.rs index 25c2f7d..1f3893d 100644 --- a/src/txn_plan/plan_builder.rs +++ b/src/txn_plan/plan_builder.rs @@ -86,7 +86,7 @@ impl PlanBuilder { total_accounts: Arc>>, txn_builder: Arc, remained_eth: U256, - account_generator: Arc>, + account_generator: &mut AccountGenerator, ) -> Result>, anyhow::Error> { let faucet_signer = PrivateKeySigner::from_str(faucet_private_key)?; let constructor = FaucetTreePlanBuilder::new( diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index 088e2df..ecb1cf5 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -8,7 +8,7 @@ use alloy::signers::local::PrivateKeySigner; use tokio::sync::RwLock; use uuid::Uuid; -use crate::util::gen_account::{AccountGenerator, AccountId}; +use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; #[derive(Debug, Clone, Message)] #[rtype(result = "anyhow::Result<()>")] @@ -54,7 +54,7 @@ pub trait FromTxnConstructor: Send + Sync + 'static { fn build_for_sender( &self, from_account_id: AccountId, - accout_generator: &AccountGenerator, + accout_generator: AccountManager, nonce: u64, ) -> Result; @@ -67,7 +67,7 @@ pub trait ToTxnConstructor: Send + Sync + 'static { fn build_for_receiver( &self, to_account_id: AccountId, - account_generator: &AccountGenerator, + account_generator: AccountManager, chain_id: u64, ) -> Result, anyhow::Error>; @@ -93,7 +93,7 @@ pub trait TxnPlan: Send + Sync { fn build_txns( &mut self, ready_accounts: Vec<(AccountId, u32)>, - account_generator: &AccountGenerator, + account_generator: AccountManager, ) -> Result; /// Returns the unique identifier for this plan instance. diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index d85fb3d..f21f35b 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -24,15 +24,21 @@ pub struct AccountGenerator { init_nonces: Vec>, } +pub type AccountManager = Arc; + impl AccountGenerator { - pub fn with_capacity(capacity: usize, faucet_accout: PrivateKeySigner) -> Arc> { - Arc::new(RwLock::new(Self { + pub fn with_capacity(capacity: usize, faucet_accout: PrivateKeySigner) -> Self { + Self { accouts: Vec::with_capacity(capacity), faucet_accout, faucet_accout_id: AccountId(u32::MAX), accout_to_id: HashMap::with_capacity(capacity), init_nonces: Vec::with_capacity(capacity), - })) + } + } + + pub fn to_manager(self) -> AccountManager { + Arc::new(self) } pub fn get_signer_by_id(&self, id: AccountId) -> &PrivateKeySigner { From 1d0645110f71f3442d4b99a2c592a90b6fbac6f2 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:48:28 +0800 Subject: [PATCH 07/21] add txn --- src/actors/producer/producer_actor.rs | 9 +++-- src/main.rs | 26 +++++++------ .../addr_pool/managed_address_pool.rs | 9 ++--- .../addr_pool/weighted_address_pool.rs | 4 +- src/txn_plan/constructor/approve.rs | 9 +++-- src/txn_plan/constructor/distribute_token.rs | 9 +++-- src/txn_plan/constructor/erc20_transfer.rs | 4 +- src/txn_plan/constructor/faucet.rs | 1 - .../constructor/swap_token_2_token.rs | 4 +- src/txn_plan/faucet_plan.rs | 3 +- src/txn_plan/plan.rs | 39 ++++++++++++------- src/txn_plan/plan_builder.rs | 1 - src/txn_plan/traits.rs | 4 +- src/util/gen_account.rs | 13 +++---- 14 files changed, 72 insertions(+), 63 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index e7d4a9f..180e24a 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -5,7 +5,6 @@ use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; use crate::actors::consumer::Consumer; use crate::actors::monitor::monitor_actor::{PlanProduced, ProduceTxns}; @@ -15,7 +14,7 @@ use crate::actors::monitor::{ }; use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer}; use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan}; -use crate::util::gen_account::{AccountGenerator, AccountManager}; +use crate::util::gen_account::AccountManager; use super::messages::RegisterTxnPlan; @@ -72,7 +71,7 @@ pub struct Producer { consumer_addr: Addr, nonce_cache: Arc, u32>>, - + account_generator: AccountManager, /// A queue of plans waiting to be executed. Plans are processed in FIFO order. @@ -165,7 +164,9 @@ impl Producer { // Fetch accounts and build transactions let ready_accounts = address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len())); - let iterator = plan.as_mut().build_txns(ready_accounts, account_generator.clone())?; + let iterator = plan + .as_mut() + .build_txns(ready_accounts, account_generator.clone())?; // If the plan doesn't consume nonces, accounts can be used by other processes immediately. if !iterator.consume_nonce { diff --git a/src/main.rs b/src/main.rs index 6935de8..0ac9d3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,18 +14,18 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::{ - io::{AsyncBufReadExt, BufReader as TokioBufReader}, - sync::RwLock, -}; +use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader}; use tracing::{info, Level}; use crate::{ - actors::{Monitor, RegisterTxnPlan, consumer::Consumer, producer::Producer}, + actors::{consumer::Consumer, producer::Producer, Monitor, RegisterTxnPlan}, config::{BenchConfig, ContractConfig}, eth::EthHttpCli, txn_plan::{ - PlanBuilder, TxnPlan, addr_pool::AddressPool, constructor::FaucetTreePlanBuilder, faucet_txn_builder::{Erc20FaucetTxnBuilder, EthFaucetTxnBuilder, FaucetTxnBuilder} + addr_pool::AddressPool, + constructor::FaucetTreePlanBuilder, + faucet_txn_builder::{Erc20FaucetTxnBuilder, EthFaucetTxnBuilder, FaucetTxnBuilder}, + PlanBuilder, TxnPlan, }, util::gen_account::{AccountGenerator, AccountManager}, }; @@ -261,7 +261,10 @@ async fn start_bench() -> Result<()> { }); contract_config }; - let mut accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts, PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap()); + let mut accout_generator = AccountGenerator::with_capacity( + benchmark_config.accounts.num_accounts, + PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap(), + ); let account_ids = accout_generator .gen_account(0, benchmark_config.accounts.num_accounts as u64) .unwrap(); @@ -281,8 +284,6 @@ async fn start_bench() -> Result<()> { }) .collect(); - - let chain_id = benchmark_config.nodes[0].chain_id; info!("Initializing Faucet constructor..."); @@ -332,11 +333,14 @@ async fn start_bench() -> Result<()> { .unwrap(); tokens_plan.push(token_faucet_builder); } - + let account_manager = accout_generator.to_manager(); let address_pool: Arc = Arc::new( - txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(account_ids.clone(), account_manager.clone()), + txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new( + account_ids.clone(), + account_manager.clone(), + ), ); // Use the same client instances for Consumer to share metrics diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index a9beec9..c564e3d 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -1,10 +1,9 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use parking_lot::Mutex; -use tokio::sync::RwLock; use super::AddressPool; -use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; +use crate::util::gen_account::{AccountId, AccountManager}; struct Inner { account_status: HashMap, @@ -22,7 +21,7 @@ impl RandomAddressPool { pub fn new(account_ids: Vec, account_generator: AccountManager) -> Self { let mut account_status = HashMap::new(); let mut ready_accounts = Vec::new(); - + for &account_id in account_ids.iter() { // assume all address start from nonce, this is correct beacause a nonce too low error will trigger correct nonce let nonce = 0; @@ -112,7 +111,7 @@ impl AddressPool for RandomAddressPool { fn select_receiver(&self, excluded: AccountId) -> AccountId { let inner = self.inner.lock(); - + loop { let idx = rand::random::() % inner.all_account_ids.len(); let account_id = inner.all_account_ids[idx]; diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index 60d1295..3211cf4 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -46,7 +46,7 @@ impl WeightedAddressPool { let mut hot_accounts = Vec::with_capacity(hot_count); let mut normal_accounts = Vec::with_capacity(normal_count); let mut long_tail_accounts = Vec::with_capacity(total_accounts - hot_count - normal_count); - + for (i, &account_id) in all_account_ids.iter().enumerate() { if i < hot_count { account_categories.insert(account_id, AccountCategory::Hot); @@ -241,7 +241,7 @@ impl AddressPool for WeightedAddressPool { fn select_receiver(&self, excluded: AccountId) -> AccountId { let inner = self.inner.lock(); - + loop { let idx = rand::random::() % inner.all_account_ids.len(); let account_id = inner.all_account_ids[idx]; diff --git a/src/txn_plan/constructor/approve.rs b/src/txn_plan/constructor/approve.rs index c27715e..c972efa 100644 --- a/src/txn_plan/constructor/approve.rs +++ b/src/txn_plan/constructor/approve.rs @@ -1,14 +1,15 @@ -use std::sync::Arc; - use alloy::{ network::TransactionBuilder, primitives::{Address, Bytes, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, sol_types::SolCall, }; -use crate::{config::IERC20, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId, AccountManager}}; +use crate::{ + config::IERC20, + txn_plan::traits::FromTxnConstructor, + util::gen_account::{AccountId, AccountManager}, +}; /// ERC20 approve constructor /// Approve tokens for multiple accounts to a specified spender (e.g. Uniswap Router) diff --git a/src/txn_plan/constructor/distribute_token.rs b/src/txn_plan/constructor/distribute_token.rs index 8b94bdf..e891211 100644 --- a/src/txn_plan/constructor/distribute_token.rs +++ b/src/txn_plan/constructor/distribute_token.rs @@ -1,12 +1,13 @@ -use std::sync::Arc; - use alloy::{ primitives::{Address, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, }; -use crate::{eth::TxnBuilder, txn_plan::traits::FromTxnConstructor, util::gen_account::{AccountGenerator, AccountId, AccountManager}}; +use crate::{ + eth::TxnBuilder, + txn_plan::traits::FromTxnConstructor, + util::gen_account::{AccountId, AccountManager}, +}; /// Token distribute constructor /// Distribute tokens to accounts using ETH diff --git a/src/txn_plan/constructor/erc20_transfer.rs b/src/txn_plan/constructor/erc20_transfer.rs index 0aad4a3..ae18f17 100644 --- a/src/txn_plan/constructor/erc20_transfer.rs +++ b/src/txn_plan/constructor/erc20_transfer.rs @@ -1,12 +1,12 @@ use crate::{ config::IERC20, - txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId, AccountManager}, + txn_plan::{addr_pool::AddressPool, FromTxnConstructor}, + util::gen_account::{AccountId, AccountManager}, }; use alloy::{ network::TransactionBuilder, primitives::{Address, Bytes, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, sol_types::SolCall, }; use std::sync::Arc; diff --git a/src/txn_plan/constructor/faucet.rs b/src/txn_plan/constructor/faucet.rs index 3ff57a3..a93daaf 100644 --- a/src/txn_plan/constructor/faucet.rs +++ b/src/txn_plan/constructor/faucet.rs @@ -13,7 +13,6 @@ use std::{ marker::PhantomData, sync::{atomic::AtomicU64, Arc, Mutex}, }; -use tokio::sync::RwLock; use tracing::info; // Gas parameters must match the values used in the plan executor. diff --git a/src/txn_plan/constructor/swap_token_2_token.rs b/src/txn_plan/constructor/swap_token_2_token.rs index f419a70..8c4d5ff 100644 --- a/src/txn_plan/constructor/swap_token_2_token.rs +++ b/src/txn_plan/constructor/swap_token_2_token.rs @@ -1,12 +1,12 @@ use crate::{ config::{IUniswapV2Router, LiquidityPair}, - txn_plan::{FromTxnConstructor, addr_pool::AddressPool}, util::gen_account::{AccountGenerator, AccountId, AccountManager}, + txn_plan::{addr_pool::AddressPool, FromTxnConstructor}, + util::gen_account::{AccountId, AccountManager}, }; use alloy::{ network::TransactionBuilder, primitives::{Address, Bytes, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, sol_types::SolCall, }; use std::{str::FromStr, sync::Arc}; diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index 3944934..19ec12a 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -3,7 +3,8 @@ use crate::{ txn_plan::{ faucet_txn_builder::FaucetTxnBuilder, traits::{PlanExecutionMode, PlanId, SignedTxnWithMetadata, TxnMetadata, TxnPlan}, - }, util::gen_account::AccountManager, + }, + util::gen_account::AccountManager, }; use alloy::{ eips::Encodable2718, diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index 6400359..9ac4cea 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -1,20 +1,18 @@ use crate::{ eth::TxnBuilder, txn_plan::{ - TxnIter, traits::{ + traits::{ FromTxnConstructor, PlanExecutionMode, PlanId, SignedTxnWithMetadata, ToTxnConstructor, TxnMetadata, TxnPlan, - } + }, + TxnIter, }, - util::gen_account::{AccountGenerator, AccountId, AccountManager}, -}; -use alloy::{ - consensus::{TxEnvelope, transaction::SignerRecoverable}, eips::Encodable2718, + util::gen_account::{AccountId, AccountManager}, }; +use alloy::{consensus::transaction::SignerRecoverable, eips::Encodable2718}; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; use std::sync::Arc; -use tokio::sync::RwLock; use uuid::Uuid; /// Concurrency control parameters @@ -80,8 +78,7 @@ impl TxnPlan for ManyToOnePlan { let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); - - + // 4. Create async stream, process in batches let handle = tokio::task::spawn_blocking(move || { ready_accounts @@ -91,9 +88,14 @@ impl TxnPlan for ManyToOnePlan { .into_par_iter() .map(|(from_account_id, nonce)| { let address = account_generator.get_address_by_id(*from_account_id); - let signer = account_generator.get_signer_by_id(*from_account_id).clone(); + let signer = + account_generator.get_signer_by_id(*from_account_id).clone(); let tx_request = constructor - .build_for_sender(*from_account_id, account_generator.clone(), *nonce as u64) + .build_for_sender( + *from_account_id, + account_generator.clone(), + *nonce as u64, + ) .unwrap(); let metadata = Arc::new(TxnMetadata { from_account: Arc::new(address), @@ -102,7 +104,8 @@ impl TxnPlan for ManyToOnePlan { plan_id: plan_id.clone(), }); let tx_envelope = - TxnBuilder::build_and_sign_transaction(tx_request, &signer).unwrap(); + TxnBuilder::build_and_sign_transaction(tx_request, &signer) + .unwrap(); SignedTxnWithMetadata { bytes: tx_envelope.encoded_2718(), metadata, @@ -185,10 +188,10 @@ impl TxnPlan for OneToManyPlan { let constructor = self.constructor.clone(); let chain_id = self.chain_id; let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); - + // Convert AccountId to addresses let addresses = ready_accounts; - + let handle = tokio::task::spawn_blocking(move || { addresses .chunks(1024) @@ -197,7 +200,13 @@ impl TxnPlan for OneToManyPlan { .into_par_iter() .flat_map(|(to_account_id, _nonce)| { // Build transaction request - let txs = constructor.build_for_receiver(*to_account_id, account_generator.clone(), chain_id).unwrap(); + let txs = constructor + .build_for_receiver( + *to_account_id, + account_generator.clone(), + chain_id, + ) + .unwrap(); txs.into_iter() .map(|tx_envelope| { let metadata = Arc::new(TxnMetadata { diff --git a/src/txn_plan/plan_builder.rs b/src/txn_plan/plan_builder.rs index 1f3893d..a0e6f7c 100644 --- a/src/txn_plan/plan_builder.rs +++ b/src/txn_plan/plan_builder.rs @@ -4,7 +4,6 @@ use alloy::{ primitives::{Address, U256}, signers::local::PrivateKeySigner, }; -use tokio::sync::RwLock; use crate::{ config::LiquidityPair, diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index ecb1cf5..2543cb9 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -4,11 +4,9 @@ use actix::Message; use alloy::consensus::TxEnvelope; use alloy::primitives::Address; use alloy::rpc::types::TransactionRequest; -use alloy::signers::local::PrivateKeySigner; -use tokio::sync::RwLock; use uuid::Uuid; -use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; +use crate::util::gen_account::{AccountId, AccountManager}; #[derive(Debug, Clone, Message)] #[rtype(result = "anyhow::Result<()>")] diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index f21f35b..d16304b 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -1,8 +1,10 @@ use std::{ collections::HashMap, sync::{ - Arc, atomic::{AtomicU64, Ordering} - }, u32, + atomic::{AtomicU64, Ordering}, + Arc, + }, + u32, }; use alloy::{ @@ -11,7 +13,6 @@ use alloy::{ }; use anyhow::{Context, Result}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use tokio::sync::RwLock; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct AccountId(u32); @@ -81,11 +82,7 @@ impl AccountGenerator { (0..self.accouts.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) } - pub fn gen_account( - &mut self, - start_index: u64, - size: u64, - ) -> Result> { + pub fn gen_account(&mut self, start_index: u64, size: u64) -> Result> { let begin_index = self.accouts.len() as u64; let end_index = start_index + size; if begin_index < end_index { From 4a3a65e89b2f9a3a664b26a3c4c77808b98d8217 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:48:51 +0800 Subject: [PATCH 08/21] add txn --- src/actors/producer/producer_actor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 180e24a..16de2cd 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -427,7 +427,6 @@ impl Handler for Producer { Box::pin( async move { let account_id = account_generator.get_id_by_address(&account); - if let Some(account_id) = account_id { match msg.result.as_ref() { SubmissionResult::Success(_) => { From 90c28534f455d1f665cd1452caf8bef9be1ef596 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:49:44 +0800 Subject: [PATCH 09/21] add txn --- src/txn_plan/addr_pool/managed_address_pool.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index c564e3d..ce71530 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -13,6 +13,7 @@ struct Inner { pub struct RandomAddressPool { inner: Mutex, + #[allow(unused)] account_generator: AccountManager, } From 12f8360814ae618059deb1df5d2767576115f379 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:55:39 +0800 Subject: [PATCH 10/21] add txn --- src/actors/producer/producer_actor.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 16de2cd..79b70a4 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -1,5 +1,4 @@ use actix::prelude::*; -use alloy::primitives::Address; use dashmap::DashMap; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; @@ -14,7 +13,7 @@ use crate::actors::monitor::{ }; use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer}; use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan}; -use crate::util::gen_account::AccountManager; +use crate::util::gen_account::{AccountId, AccountManager}; use super::messages::RegisterTxnPlan; @@ -70,7 +69,7 @@ pub struct Producer { monitor_addr: Addr, consumer_addr: Addr, - nonce_cache: Arc, u32>>, + nonce_cache: Arc>, account_generator: AccountManager, @@ -92,9 +91,8 @@ impl Producer { let nonce_cache = Arc::new(DashMap::new()); address_pool.clean_ready_accounts(); for (account_id, nonce) in account_generator.account_ids_with_nonce() { - let address = Arc::new(account_generator.get_address_by_id(account_id)); let nonce = nonce.load(Ordering::Relaxed) as u32; - nonce_cache.insert(address.clone(), nonce); + nonce_cache.insert(account_id, nonce); address_pool.unlock_correct_nonce(account_id, nonce); } Ok(Self { @@ -157,7 +155,7 @@ impl Producer { mut plan: Box, sending_txns: Arc, state: ProducerState, - nonce_cache: Arc, u32>>, + nonce_cache: Arc>, ) -> Result<(), anyhow::Error> { let plan_id = plan.id().clone(); @@ -187,7 +185,8 @@ impl Producer { tracing::debug!("Producer is paused"); tokio::time::sleep(Duration::from_millis(500)).await; } - let next_nonce = match nonce_cache.get(signed_txn.metadata.from_account.as_ref()) { + let account_id = account_generator.get_id_by_address(&signed_txn.metadata.from_account).unwrap(); + let next_nonce = match nonce_cache.get(&account_id) { Some(nonce) => *nonce, None => 0, }; @@ -416,8 +415,9 @@ impl Handler for Producer { } SubmissionResult::NonceTooLow { expect_nonce, .. } => { self.stats.success_txns += 1; + let account_id = account_generator.get_id_by_address(&account).unwrap(); self.nonce_cache - .insert(account.clone(), *expect_nonce as u32); + .insert(account_id, *expect_nonce as u32); } SubmissionResult::ErrorWithRetry => { self.stats.failed_txns += 1; From 0c9184abd2f0acdb0344f0712093b07e031bd672 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:58:22 +0800 Subject: [PATCH 11/21] fix --- src/main.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0ac9d3e..cc5f615 100644 --- a/src/main.rs +++ b/src/main.rs @@ -452,8 +452,11 @@ static ALLOC: dhat::Alloc = dhat::Alloc; #[actix::main] async fn main() -> Result<()> { - #[cfg(feature = "dhat-heap")] - let _profiler = dhat::Profiler::new_heap(); + #[cfg(feature = "dhat-heap")] + let _profiler = { + println!("starting heap profiler..."); + dhat::Profiler::new_heap(); + }; let res = async { start_bench().await }; let ctrl_c = async { tokio::signal::ctrl_c() From 74cc21bcf5ddb453063b68a18891280f21bb20c9 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 16:59:40 +0800 Subject: [PATCH 12/21] add txn --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index cc5f615..fe17aa0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -455,7 +455,7 @@ async fn main() -> Result<()> { #[cfg(feature = "dhat-heap")] let _profiler = { println!("starting heap profiler..."); - dhat::Profiler::new_heap(); + dhat::Profiler::new_heap() }; let res = async { start_bench().await }; let ctrl_c = async { From 901f15b61a60ecbc251213f806faa8122902448c Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 17:20:22 +0800 Subject: [PATCH 13/21] add txn --- src/actors/consumer/actor.rs | 2 +- src/actors/producer/producer_actor.rs | 40 ++++++++++++--------------- src/txn_plan/constructor/faucet.rs | 22 ++++++--------- src/txn_plan/faucet_plan.rs | 19 +++++++------ src/txn_plan/plan.rs | 8 +++--- src/txn_plan/traits.rs | 4 ++- src/util/gen_account.rs | 22 ++++----------- 7 files changed, 52 insertions(+), 65 deletions(-) diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs index cb5c424..8c15511 100644 --- a/src/actors/consumer/actor.rs +++ b/src/actors/consumer/actor.rs @@ -268,7 +268,7 @@ impl Consumer { .provider(&url) .await .unwrap() - .get_txn_count(*metadata.from_account.as_ref()) + .get_txn_count(metadata.from_account.as_ref().clone()) .await { // If on-chain nonce is greater than our attempted nonce, our transaction is indeed outdated diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 79b70a4..2c9bdd4 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -185,7 +185,7 @@ impl Producer { tracing::debug!("Producer is paused"); tokio::time::sleep(Duration::from_millis(500)).await; } - let account_id = account_generator.get_id_by_address(&signed_txn.metadata.from_account).unwrap(); + let account_id = signed_txn.metadata.from_account_id; let next_nonce = match nonce_cache.get(&account_id) { Some(nonce) => *nonce, None => 0, @@ -415,7 +415,7 @@ impl Handler for Producer { } SubmissionResult::NonceTooLow { expect_nonce, .. } => { self.stats.success_txns += 1; - let account_id = account_generator.get_id_by_address(&account).unwrap(); + let account_id = msg.metadata.from_account_id; self.nonce_cache .insert(account_id, *expect_nonce as u32); } @@ -426,27 +426,23 @@ impl Handler for Producer { let ready_accounts = self.stats.ready_accounts.clone(); Box::pin( async move { - let account_id = account_generator.get_id_by_address(&account); - if let Some(account_id) = account_id { - match msg.result.as_ref() { - SubmissionResult::Success(_) => { - address_pool.unlock_next_nonce(account_id); - } - SubmissionResult::NonceTooLow { expect_nonce, .. } => { - tracing::debug!( - "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", - account, - expect_nonce, - msg.metadata.nonce - ); - address_pool.unlock_correct_nonce(account_id, *expect_nonce as u32); - } - SubmissionResult::ErrorWithRetry => { - address_pool.retry_current_nonce(account_id); - } + let account_id = msg.metadata.from_account_id; + match msg.result.as_ref() { + SubmissionResult::Success(_) => { + address_pool.unlock_next_nonce(account_id); + } + SubmissionResult::NonceTooLow { expect_nonce, .. } => { + tracing::debug!( + "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", + account, + expect_nonce, + msg.metadata.nonce + ); + address_pool.unlock_correct_nonce(account_id, *expect_nonce as u32); + } + SubmissionResult::ErrorWithRetry => { + address_pool.retry_current_nonce(account_id); } - } else { - tracing::warn!("Account {:?} not found in account generator", account); } ready_accounts.store(address_pool.ready_len() as u64, Ordering::Relaxed); } diff --git a/src/txn_plan/constructor/faucet.rs b/src/txn_plan/constructor/faucet.rs index a93daaf..b6f16ef 100644 --- a/src/txn_plan/constructor/faucet.rs +++ b/src/txn_plan/constructor/faucet.rs @@ -2,7 +2,7 @@ use crate::{ txn_plan::{ faucet_plan::LevelFaucetPlan, faucet_txn_builder::FaucetTxnBuilder, traits::TxnPlan, }, - util::gen_account::AccountGenerator, + util::gen_account::{AccountGenerator, AccountId}, }; use alloy::{ primitives::{Address, U256}, @@ -22,8 +22,8 @@ static NONCE_MAP: std::sync::OnceLock> std::sync::OnceLock::new(); pub struct FaucetTreePlanBuilder { - faucet: Arc, - account_levels: Vec>>, + faucet_id: AccountId, + account_levels: Vec>, final_recipients: Arc>>, amount_per_recipient: U256, nonce_map: Arc>>>, @@ -118,12 +118,7 @@ impl FaucetTreePlanBuilder { let account_ids = account_generator .gen_account(start_index as u64, num_accounts_at_level as u64) .unwrap(); - account_levels.push( - account_ids - .iter() - .map(|&id| Arc::new(account_generator.get_signer_by_id(id).clone())) - .collect::>(), - ); + account_levels.push(account_ids); start_index += num_accounts_at_level as usize; } } @@ -137,15 +132,16 @@ impl FaucetTreePlanBuilder { for level in &account_levels { for acc in level { + let address = account_generator.get_address_by_id(*acc); nonce_map - .entry(acc.address()) + .entry(address) .or_insert_with(|| Arc::new(AtomicU64::new(0))); } } } info!("FaucetTreePlanBuilder: balance={:?}, amount_per_recipient={:?}, intermediate_funding_amounts={:?}, accounts_levels={:?}, accounts_num={:?}", faucet_balance, amount_per_recipient, intermediate_funding_amounts, account_levels.len(), total_accounts); Self { - faucet: Arc::new(faucet), + faucet_id: account_generator.faucet_accout_id(), account_levels, final_recipients, amount_per_recipient, @@ -184,9 +180,9 @@ impl FaucetTreePlanBuilder { } } - fn get_senders_for_level(&self, level: usize) -> Vec> { + fn get_senders_for_level(&self, level: usize) -> Vec { if level == 0 { - vec![self.faucet.clone()] + vec![self.faucet_id] } else { self.account_levels[level - 1].clone() } diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index 19ec12a..9dca072 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -4,7 +4,7 @@ use crate::{ faucet_txn_builder::FaucetTxnBuilder, traits::{PlanExecutionMode, PlanId, SignedTxnWithMetadata, TxnMetadata, TxnPlan}, }, - util::gen_account::AccountManager, + util::gen_account::{AccountId, AccountManager}, }; use alloy::{ eips::Encodable2718, @@ -32,9 +32,9 @@ pub struct LevelFaucetPlan { execution_mode: PlanExecutionMode, chain_id: u64, level: usize, - senders: Vec>, + senders: Vec, final_recipients: Arc>>, - account_levels: Vec>>, + account_levels: Vec>, amount_per_recipient: U256, intermediate_funding_amounts: Vec, degree: usize, @@ -51,9 +51,9 @@ impl LevelFaucetPlan { chain_id: u64, level: usize, account_init_nonce: Arc>, - senders: Vec>, + senders: Vec, final_recipients: Arc>>, - account_levels: Vec>>, + account_levels: Vec>, amount_per_recipient: U256, intermediate_funding_amounts: Vec, degree: usize, @@ -108,7 +108,7 @@ impl TxnPlan for LevelFaucetPlan { fn build_txns( &mut self, _ready_accounts: Vec<(crate::util::gen_account::AccountId, u32)>, - _account_generator: AccountManager, + account_generator: AccountManager, ) -> Result { let plan_id = self.id.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); @@ -132,7 +132,8 @@ impl TxnPlan for LevelFaucetPlan { chunk .into_par_iter() .enumerate() - .for_each(|(sender_index, sender_signer)| { + .for_each(|(sender_index, sender_signer_id)| { + let sender_signer = account_generator.get_signer_by_id(*sender_signer_id); let start_index = (chunk_index * 1024 + sender_index) * degree; let end_index = (start_index + degree).min(final_recipients.len()); if end_index < start_index { @@ -144,7 +145,8 @@ impl TxnPlan for LevelFaucetPlan { let val = amount_per_recipient; (to, val) } else { - let to = account_levels[level][i].address(); + let to_id = account_levels[level][i]; + let to = account_generator.get_address_by_id(to_id); let val = intermediate_funding_amounts[level]; (Arc::new(to), val) }; @@ -173,6 +175,7 @@ impl TxnPlan for LevelFaucetPlan { let metadata = Arc::new(TxnMetadata { from_account: Arc::new(sender_signer.address()), nonce, + from_account_id: *sender_signer_id, txn_id: Uuid::new_v4(), plan_id: plan_id.clone(), }); diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index 9ac4cea..a8e314d 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -101,6 +101,7 @@ impl TxnPlan for ManyToOnePlan { from_account: Arc::new(address), nonce: *nonce as u64, txn_id: Uuid::new_v4(), + from_account_id: *from_account_id, plan_id: plan_id.clone(), }); let tx_envelope = @@ -208,11 +209,10 @@ impl TxnPlan for OneToManyPlan { ) .unwrap(); txs.into_iter() - .map(|tx_envelope| { + .map(|(from_account_id, tx_envelope)| { let metadata = Arc::new(TxnMetadata { - from_account: Arc::new( - tx_envelope.recover_signer_unchecked().unwrap(), - ), + from_account: Arc::new(account_generator.get_address_by_id(from_account_id)), + from_account_id: from_account_id, nonce: 0, txn_id: Uuid::new_v4(), plan_id: plan_id.clone(), diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index 2543cb9..38fc8a9 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -21,6 +21,7 @@ pub struct TxnMetadata { pub txn_id: Uuid, pub plan_id: PlanId, pub from_account: Arc
, + pub from_account_id: AccountId, pub nonce: u64, } @@ -62,12 +63,13 @@ pub trait FromTxnConstructor: Send + Sync + 'static { pub trait ToTxnConstructor: Send + Sync + 'static { /// Build transaction based on receiver information. + /// return the from account id and the transaction envelope fn build_for_receiver( &self, to_account_id: AccountId, account_generator: AccountManager, chain_id: u64, - ) -> Result, anyhow::Error>; + ) -> Result, anyhow::Error>; /// Provide transaction description. fn description(&self) -> &'static str; diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index d16304b..a4d4022 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -21,7 +21,6 @@ pub struct AccountGenerator { accouts: Vec, faucet_accout: PrivateKeySigner, faucet_accout_id: AccountId, - accout_to_id: HashMap, init_nonces: Vec>, } @@ -30,11 +29,10 @@ pub type AccountManager = Arc; impl AccountGenerator { pub fn with_capacity(capacity: usize, faucet_accout: PrivateKeySigner) -> Self { Self { - accouts: Vec::with_capacity(capacity), + accouts: Vec::new(), faucet_accout, faucet_accout_id: AccountId(u32::MAX), - accout_to_id: HashMap::with_capacity(capacity), - init_nonces: Vec::with_capacity(capacity), + init_nonces: Vec::new(), } } @@ -50,6 +48,10 @@ impl AccountGenerator { } } + pub fn faucet_accout_id(&self) -> AccountId { + self.faucet_accout_id + } + pub fn get_address_by_id(&self, id: AccountId) -> Address { if id == self.faucet_accout_id { self.faucet_accout.address() @@ -58,14 +60,6 @@ impl AccountGenerator { } } - pub fn get_id_by_address(&self, address: &Address) -> Option { - if address == &self.faucet_accout.address() { - Some(self.faucet_accout_id) - } else { - self.accout_to_id.get(address).copied() - } - } - pub fn init_nonce_map(&self) -> HashMap { let mut map = HashMap::new(); for (account, nonce) in self.accouts_nonce_iter() { @@ -90,10 +84,6 @@ impl AccountGenerator { self.accouts.extend(res); self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); - for i in begin_index..end_index { - self.accout_to_id - .insert(self.accouts[i as usize].address(), AccountId(i as u32)); - } } let mut res = Vec::new(); for i in 0..size { From 389df8a549ec73204e3538a1a2529431008b5dca Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 17:22:02 +0800 Subject: [PATCH 14/21] add txn --- src/actors/producer/producer_actor.rs | 5 ++--- src/main.rs | 2 +- src/txn_plan/faucet_plan.rs | 12 ++++++------ src/txn_plan/plan.rs | 4 +++- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 2c9bdd4..cea8d9f 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -416,8 +416,7 @@ impl Handler for Producer { SubmissionResult::NonceTooLow { expect_nonce, .. } => { self.stats.success_txns += 1; let account_id = msg.metadata.from_account_id; - self.nonce_cache - .insert(account_id, *expect_nonce as u32); + self.nonce_cache.insert(account_id, *expect_nonce as u32); } SubmissionResult::ErrorWithRetry => { self.stats.failed_txns += 1; @@ -426,7 +425,7 @@ impl Handler for Producer { let ready_accounts = self.stats.ready_accounts.clone(); Box::pin( async move { - let account_id = msg.metadata.from_account_id; + let account_id = msg.metadata.from_account_id; match msg.result.as_ref() { SubmissionResult::Success(_) => { address_pool.unlock_next_nonce(account_id); diff --git a/src/main.rs b/src/main.rs index fe17aa0..118c41f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -452,7 +452,7 @@ static ALLOC: dhat::Alloc = dhat::Alloc; #[actix::main] async fn main() -> Result<()> { - #[cfg(feature = "dhat-heap")] + #[cfg(feature = "dhat-heap")] let _profiler = { println!("starting heap profiler..."); dhat::Profiler::new_heap() diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index 9dca072..75740bc 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -129,11 +129,10 @@ impl TxnPlan for LevelFaucetPlan { .chunks(1024) .enumerate() .for_each(|(chunk_index, chunk)| { - chunk - .into_par_iter() - .enumerate() - .for_each(|(sender_index, sender_signer_id)| { - let sender_signer = account_generator.get_signer_by_id(*sender_signer_id); + chunk.into_par_iter().enumerate().for_each( + |(sender_index, sender_signer_id)| { + let sender_signer = + account_generator.get_signer_by_id(*sender_signer_id); let start_index = (chunk_index * 1024 + sender_index) * degree; let end_index = (start_index + degree).min(final_recipients.len()); if end_index < start_index { @@ -186,7 +185,8 @@ impl TxnPlan for LevelFaucetPlan { }) .unwrap(); } - }) + }, + ) }); drop(tx); }); diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index a8e314d..1724d83 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -211,7 +211,9 @@ impl TxnPlan for OneToManyPlan { txs.into_iter() .map(|(from_account_id, tx_envelope)| { let metadata = Arc::new(TxnMetadata { - from_account: Arc::new(account_generator.get_address_by_id(from_account_id)), + from_account: Arc::new( + account_generator.get_address_by_id(from_account_id), + ), from_account_id: from_account_id, nonce: 0, txn_id: Uuid::new_v4(), From 09b7fcded85c6a068fd61ff02a9b762f334b2360 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 17:22:59 +0800 Subject: [PATCH 15/21] add txn --- src/actors/producer/producer_actor.rs | 4 +--- src/txn_plan/faucet_plan.rs | 1 - src/txn_plan/plan.rs | 2 +- src/util/gen_account.rs | 2 +- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index cea8d9f..6f38d30 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -406,8 +406,6 @@ impl Handler for Producer { fn handle(&mut self, msg: UpdateSubmissionResult, _ctx: &mut Self::Context) -> Self::Result { let address_pool = self.address_pool.clone(); - let account_generator = self.account_generator.clone(); - let account = msg.metadata.from_account.clone(); self.stats.sending_txns.fetch_sub(1, Ordering::Relaxed); match msg.result.as_ref() { SubmissionResult::Success(_) => { @@ -433,7 +431,7 @@ impl Handler for Producer { SubmissionResult::NonceTooLow { expect_nonce, .. } => { tracing::debug!( "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", - account, + account_id, expect_nonce, msg.metadata.nonce ); diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index 75740bc..f6df74d 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -9,7 +9,6 @@ use crate::{ use alloy::{ eips::Encodable2718, primitives::{Address, U256}, - signers::local::PrivateKeySigner, }; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use std::{ diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index 1724d83..868b3c6 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -9,7 +9,7 @@ use crate::{ }, util::gen_account::{AccountId, AccountManager}, }; -use alloy::{consensus::transaction::SignerRecoverable, eips::Encodable2718}; +use alloy::eips::Encodable2718; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; use std::sync::Arc; diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index a4d4022..7bfa7a5 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -27,7 +27,7 @@ pub struct AccountGenerator { pub type AccountManager = Arc; impl AccountGenerator { - pub fn with_capacity(capacity: usize, faucet_accout: PrivateKeySigner) -> Self { + pub fn with_capacity(_capacity: usize, faucet_accout: PrivateKeySigner) -> Self { Self { accouts: Vec::new(), faucet_accout, From 36d73e4dd29d6b4aac8798dab72c5dda016fcf39 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 17:41:55 +0800 Subject: [PATCH 16/21] add txn --- src/main.rs | 2 +- src/util/gen_account.rs | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index 118c41f..57057e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -429,7 +429,7 @@ async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc, + accout_signers: Vec, + accout_addresses: Vec
, faucet_accout: PrivateKeySigner, faucet_accout_id: AccountId, init_nonces: Vec>, @@ -29,7 +30,8 @@ pub type AccountManager = Arc; impl AccountGenerator { pub fn with_capacity(_capacity: usize, faucet_accout: PrivateKeySigner) -> Self { Self { - accouts: Vec::new(), + accout_signers: Vec::new(), + accout_addresses: Vec::new(), faucet_accout, faucet_accout_id: AccountId(u32::MAX), init_nonces: Vec::new(), @@ -44,7 +46,7 @@ impl AccountGenerator { if id == self.faucet_accout_id { &self.faucet_accout } else { - &self.accouts[id.0 as usize] + &self.accout_signers[id.0 as usize] } } @@ -56,32 +58,33 @@ impl AccountGenerator { if id == self.faucet_accout_id { self.faucet_accout.address() } else { - self.accouts[id.0 as usize].address() + self.accout_signers[id.0 as usize].address() } } pub fn init_nonce_map(&self) -> HashMap { let mut map = HashMap::new(); for (account, nonce) in self.accouts_nonce_iter() { - map.insert(account.address(), nonce.load(Ordering::Relaxed)); + map.insert(account.clone(), nonce.load(Ordering::Relaxed)); } map } - pub fn accouts_nonce_iter(&self) -> impl Iterator)> { - self.accouts.iter().zip(self.init_nonces.iter().cloned()) + pub fn accouts_nonce_iter(&self) -> impl Iterator)> { + self.accout_addresses.iter().zip(self.init_nonces.iter().cloned()) } pub fn account_ids_with_nonce(&self) -> impl Iterator)> + '_ { - (0..self.accouts.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) + (0..self.accout_signers.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) } pub fn gen_account(&mut self, start_index: u64, size: u64) -> Result> { - let begin_index = self.accouts.len() as u64; + let begin_index = self.accout_signers.len() as u64; let end_index = start_index + size; if begin_index < end_index { let res = self.gen_deterministic_accounts(begin_index, end_index); - self.accouts.extend(res); + self.accout_addresses.extend(res.iter().map(|signer| signer.address())); + self.accout_signers.extend(res); self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); } From d9079a4a0b9a93d25b75c9562a70a0a13c7319d4 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 17:42:13 +0800 Subject: [PATCH 17/21] add txn --- src/util/gen_account.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 77fb805..16be6d1 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -58,7 +58,7 @@ impl AccountGenerator { if id == self.faucet_accout_id { self.faucet_accout.address() } else { - self.accout_signers[id.0 as usize].address() + self.accout_addresses[id.0 as usize] } } From 38afc34cf4514f6b492315cf07e16f22bde325b6 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 19:46:22 +0800 Subject: [PATCH 18/21] add txn --- src/main.rs | 1 - src/util/gen_account.rs | 12 +++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 57057e2..98e8cfa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -262,7 +262,6 @@ async fn start_bench() -> Result<()> { contract_config }; let mut accout_generator = AccountGenerator::with_capacity( - benchmark_config.accounts.num_accounts, PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap(), ); let account_ids = accout_generator diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 16be6d1..dfaac5a 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -28,7 +28,7 @@ pub struct AccountGenerator { pub type AccountManager = Arc; impl AccountGenerator { - pub fn with_capacity(_capacity: usize, faucet_accout: PrivateKeySigner) -> Self { + pub fn with_capacity(faucet_accout: PrivateKeySigner) -> Self { Self { accout_signers: Vec::new(), accout_addresses: Vec::new(), @@ -38,7 +38,10 @@ impl AccountGenerator { } } - pub fn to_manager(self) -> AccountManager { + pub fn to_manager(mut self) -> AccountManager { + self.accout_signers.shrink_to_fit(); + self.accout_addresses.shrink_to_fit(); + self.init_nonces.shrink_to_fit(); Arc::new(self) } @@ -83,12 +86,15 @@ impl AccountGenerator { let end_index = start_index + size; if begin_index < end_index { let res = self.gen_deterministic_accounts(begin_index, end_index); + self.accout_addresses.reserve_exact(res.len()); + self.accout_signers.reserve_exact(res.len()); + self.init_nonces.reserve(res.len()); self.accout_addresses.extend(res.iter().map(|signer| signer.address())); self.accout_signers.extend(res); self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); } - let mut res = Vec::new(); + let mut res = Vec::with_capacity(size as usize); for i in 0..size { res.push(AccountId((start_index + i) as u32)); } From a9fda264832097738c0a87f5824dd2f7d0ac92b9 Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 19:50:17 +0800 Subject: [PATCH 19/21] add txn --- src/util/gen_account.rs | 88 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index dfaac5a..478ca7d 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -42,9 +42,97 @@ impl AccountGenerator { self.accout_signers.shrink_to_fit(); self.accout_addresses.shrink_to_fit(); self.init_nonces.shrink_to_fit(); + + // 打印内存使用统计 + self.print_memory_summary(); + Arc::new(self) } + /// Calculate and print memory usage of AccountGenerator + pub fn print_memory_summary(&self) { + const GB: f64 = 1024.0 * 1024.0 * 1024.0; + + // Vec overhead (ptr, len, capacity) + let vec_overhead = std::mem::size_of::>() * 3; + + // accout_signers: Vec + // PrivateKeySigner contains 32-byte private key + signing_key structure + let signer_size = std::mem::size_of::(); + let signers_capacity = self.accout_signers.capacity() * signer_size; + let signers_used = self.accout_signers.len() * signer_size; + + // accout_addresses: Vec
+ // Address is 20 bytes + let address_size = std::mem::size_of::
(); + let addresses_capacity = self.accout_addresses.capacity() * address_size; + let addresses_used = self.accout_addresses.len() * address_size; + + // init_nonces: Vec> + // Arc pointer + reference count + let arc_size = std::mem::size_of::>(); + let atomic_u64_size = std::mem::size_of::(); + let nonces_capacity = self.init_nonces.capacity() * arc_size; + let nonces_used = self.init_nonces.len() * arc_size; + let nonces_heap = self.init_nonces.len() * atomic_u64_size; + + // faucet_accout: PrivateKeySigner + let faucet_size = signer_size; + + // faucet_accout_id: AccountId (u32) + let faucet_id_size = std::mem::size_of::(); + + // Total + let total_capacity = signers_capacity + addresses_capacity + nonces_capacity + + faucet_size + faucet_id_size + vec_overhead + nonces_heap; + let total_used = signers_used + addresses_used + nonces_used + + faucet_size + faucet_id_size + vec_overhead + nonces_heap; + + println!("╔══════════════════════════════════════════════════════════╗"); + println!("║ AccountGenerator Memory Usage Summary ║"); + println!("╠══════════════════════════════════════════════════════════╣"); + println!("║ Account Count: {:<43} ║", self.accout_signers.len()); + println!("╠══════════════════════════════════════════════════════════╣"); + println!("║ accout_signers (Vec): ║"); + println!("║ - Element size: {} bytes ║", signer_size); + println!("║ - Used: {:<10.6} GB ({} elements) ║", + signers_used as f64 / GB, self.accout_signers.len()); + println!("║ - Capacity: {:<10.6} GB ({} capacity) ║", + signers_capacity as f64 / GB, self.accout_signers.capacity()); + println!("╠══════════════════════════════════════════════════════════╣"); + println!("║ accout_addresses (Vec
): ║"); + println!("║ - Element size: {} bytes ║", address_size); + println!("║ - Used: {:<10.6} GB ({} elements) ║", + addresses_used as f64 / GB, self.accout_addresses.len()); + println!("║ - Capacity: {:<10.6} GB ({} capacity) ║", + addresses_capacity as f64 / GB, self.accout_addresses.capacity()); + println!("╠══════════════════════════════════════════════════════════╣"); + println!("║ init_nonces (Vec>): ║"); + println!("║ - Arc pointer size: {} bytes ║", arc_size); + println!("║ - Used: {:<10.6} GB ({} elements) ║", + nonces_used as f64 / GB, self.init_nonces.len()); + println!("║ - Capacity: {:<10.6} GB ({} capacity) ║", + nonces_capacity as f64 / GB, self.init_nonces.capacity()); + println!("║ - Heap AtomicU64: {:<10.6} GB ║", nonces_heap as f64 / GB); + println!("╠══════════════════════════════════════════════════════════╣"); + println!("║ faucet_accout (PrivateKeySigner): {:<24} ║", + format!("{:.9} GB", faucet_size as f64 / GB)); + println!("║ faucet_accout_id (AccountId/u32): {:<24} ║", + format!("{:.9} GB", faucet_id_size as f64 / GB)); + println!("║ Vec struct overhead: {:<35} ║", + format!("{:.9} GB", vec_overhead as f64 / GB)); + println!("╠══════════════════════════════════════════════════════════╣"); + println!("║ Total Memory Used: {:<39} ║", + format!("{:.6} GB", total_used as f64 / GB)); + println!("║ Total Memory Capacity: {:<35} ║", + format!("{:.6} GB", total_capacity as f64 / GB)); + println!("║ Wasted Space: {:<44} ║", + format!("{:.6} GB ({:.1}%)", + (total_capacity - total_used) as f64 / GB, + (total_capacity - total_used) as f64 / total_capacity as f64 * 100.0)); + println!("╚══════════════════════════════════════════════════════════╝"); + } + pub fn get_signer_by_id(&self, id: AccountId) -> &PrivateKeySigner { if id == self.faucet_accout_id { &self.faucet_accout From d2541489158cc3a2ac18e7ae726363da9b56fbde Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 20:33:14 +0800 Subject: [PATCH 20/21] add txn --- src/util/gen_account.rs | 149 ++++++++++++++-------------------------- 1 file changed, 51 insertions(+), 98 deletions(-) diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 478ca7d..a5d459f 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -17,8 +17,49 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct AccountId(u32); +const CACHE_SIZE: usize = 1024 * 1024; + +pub struct AccountSignerCache { + signers: Vec, + size: usize, +} + +impl AccountSignerCache { + pub(crate) fn new(size: usize) -> Self { + Self { + signers: Vec::with_capacity(size), + size, + } + } + + pub(crate) fn save_signer(&mut self, signer: PrivateKeySigner, account_id: AccountId) { + if account_id.0 as usize >= self.size { + return; + } + if account_id.0 as usize == self.signers.len() { + self.signers.push(signer); + } else { + self.signers[account_id.0 as usize] = signer; + } + } + + pub(crate) fn get_signer(&self, index: usize) -> PrivateKeySigner { + if index >= self.signers.len() { + return Self::compute_signer(AccountId(index as u32)); + } + self.signers[index].clone() + } + + fn compute_signer(id: AccountId) -> PrivateKeySigner { + let private_key_bytes = keccak256((id.0 as u64).to_le_bytes()); + PrivateKeySigner::from_slice(private_key_bytes.as_slice()) + .context("Failed to create deterministic signer") + .unwrap() + } +} + pub struct AccountGenerator { - accout_signers: Vec, + accout_signers: AccountSignerCache, accout_addresses: Vec
, faucet_accout: PrivateKeySigner, faucet_accout_id: AccountId, @@ -30,7 +71,7 @@ pub type AccountManager = Arc; impl AccountGenerator { pub fn with_capacity(faucet_accout: PrivateKeySigner) -> Self { Self { - accout_signers: Vec::new(), + accout_signers: AccountSignerCache::new(CACHE_SIZE), accout_addresses: Vec::new(), faucet_accout, faucet_accout_id: AccountId(u32::MAX), @@ -39,105 +80,16 @@ impl AccountGenerator { } pub fn to_manager(mut self) -> AccountManager { - self.accout_signers.shrink_to_fit(); self.accout_addresses.shrink_to_fit(); self.init_nonces.shrink_to_fit(); - - // 打印内存使用统计 - self.print_memory_summary(); - Arc::new(self) } - /// Calculate and print memory usage of AccountGenerator - pub fn print_memory_summary(&self) { - const GB: f64 = 1024.0 * 1024.0 * 1024.0; - - // Vec overhead (ptr, len, capacity) - let vec_overhead = std::mem::size_of::>() * 3; - - // accout_signers: Vec - // PrivateKeySigner contains 32-byte private key + signing_key structure - let signer_size = std::mem::size_of::(); - let signers_capacity = self.accout_signers.capacity() * signer_size; - let signers_used = self.accout_signers.len() * signer_size; - - // accout_addresses: Vec
- // Address is 20 bytes - let address_size = std::mem::size_of::
(); - let addresses_capacity = self.accout_addresses.capacity() * address_size; - let addresses_used = self.accout_addresses.len() * address_size; - - // init_nonces: Vec> - // Arc pointer + reference count - let arc_size = std::mem::size_of::>(); - let atomic_u64_size = std::mem::size_of::(); - let nonces_capacity = self.init_nonces.capacity() * arc_size; - let nonces_used = self.init_nonces.len() * arc_size; - let nonces_heap = self.init_nonces.len() * atomic_u64_size; - - // faucet_accout: PrivateKeySigner - let faucet_size = signer_size; - - // faucet_accout_id: AccountId (u32) - let faucet_id_size = std::mem::size_of::(); - - // Total - let total_capacity = signers_capacity + addresses_capacity + nonces_capacity + - faucet_size + faucet_id_size + vec_overhead + nonces_heap; - let total_used = signers_used + addresses_used + nonces_used + - faucet_size + faucet_id_size + vec_overhead + nonces_heap; - - println!("╔══════════════════════════════════════════════════════════╗"); - println!("║ AccountGenerator Memory Usage Summary ║"); - println!("╠══════════════════════════════════════════════════════════╣"); - println!("║ Account Count: {:<43} ║", self.accout_signers.len()); - println!("╠══════════════════════════════════════════════════════════╣"); - println!("║ accout_signers (Vec): ║"); - println!("║ - Element size: {} bytes ║", signer_size); - println!("║ - Used: {:<10.6} GB ({} elements) ║", - signers_used as f64 / GB, self.accout_signers.len()); - println!("║ - Capacity: {:<10.6} GB ({} capacity) ║", - signers_capacity as f64 / GB, self.accout_signers.capacity()); - println!("╠══════════════════════════════════════════════════════════╣"); - println!("║ accout_addresses (Vec
): ║"); - println!("║ - Element size: {} bytes ║", address_size); - println!("║ - Used: {:<10.6} GB ({} elements) ║", - addresses_used as f64 / GB, self.accout_addresses.len()); - println!("║ - Capacity: {:<10.6} GB ({} capacity) ║", - addresses_capacity as f64 / GB, self.accout_addresses.capacity()); - println!("╠══════════════════════════════════════════════════════════╣"); - println!("║ init_nonces (Vec>): ║"); - println!("║ - Arc pointer size: {} bytes ║", arc_size); - println!("║ - Used: {:<10.6} GB ({} elements) ║", - nonces_used as f64 / GB, self.init_nonces.len()); - println!("║ - Capacity: {:<10.6} GB ({} capacity) ║", - nonces_capacity as f64 / GB, self.init_nonces.capacity()); - println!("║ - Heap AtomicU64: {:<10.6} GB ║", nonces_heap as f64 / GB); - println!("╠══════════════════════════════════════════════════════════╣"); - println!("║ faucet_accout (PrivateKeySigner): {:<24} ║", - format!("{:.9} GB", faucet_size as f64 / GB)); - println!("║ faucet_accout_id (AccountId/u32): {:<24} ║", - format!("{:.9} GB", faucet_id_size as f64 / GB)); - println!("║ Vec struct overhead: {:<35} ║", - format!("{:.9} GB", vec_overhead as f64 / GB)); - println!("╠══════════════════════════════════════════════════════════╣"); - println!("║ Total Memory Used: {:<39} ║", - format!("{:.6} GB", total_used as f64 / GB)); - println!("║ Total Memory Capacity: {:<35} ║", - format!("{:.6} GB", total_capacity as f64 / GB)); - println!("║ Wasted Space: {:<44} ║", - format!("{:.6} GB ({:.1}%)", - (total_capacity - total_used) as f64 / GB, - (total_capacity - total_used) as f64 / total_capacity as f64 * 100.0)); - println!("╚══════════════════════════════════════════════════════════╝"); - } - - pub fn get_signer_by_id(&self, id: AccountId) -> &PrivateKeySigner { + pub fn get_signer_by_id(&self, id: AccountId) -> PrivateKeySigner { if id == self.faucet_accout_id { - &self.faucet_accout + self.faucet_accout.clone() } else { - &self.accout_signers[id.0 as usize] + self.accout_signers.get_signer(id.0 as usize) } } @@ -166,19 +118,20 @@ impl AccountGenerator { } pub fn account_ids_with_nonce(&self) -> impl Iterator)> + '_ { - (0..self.accout_signers.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) + (0..self.accout_addresses.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) } pub fn gen_account(&mut self, start_index: u64, size: u64) -> Result> { - let begin_index = self.accout_signers.len() as u64; + let begin_index = self.accout_addresses.len() as u64; let end_index = start_index + size; if begin_index < end_index { let res = self.gen_deterministic_accounts(begin_index, end_index); self.accout_addresses.reserve_exact(res.len()); - self.accout_signers.reserve_exact(res.len()); self.init_nonces.reserve(res.len()); self.accout_addresses.extend(res.iter().map(|signer| signer.address())); - self.accout_signers.extend(res); + for (i, signer) in res.iter().enumerate() { + self.accout_signers.save_signer(signer.clone(), AccountId(i as u32)); + } self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); } From fce0438b1be9e1a0035512313b2980e8dc12f48c Mon Sep 17 00:00:00 2001 From: keanji-x Date: Thu, 4 Dec 2025 20:33:26 +0800 Subject: [PATCH 21/21] fmt --- src/util/gen_account.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index a5d459f..6a43b35 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -24,7 +24,7 @@ pub struct AccountSignerCache { size: usize, } -impl AccountSignerCache { +impl AccountSignerCache { pub(crate) fn new(size: usize) -> Self { Self { signers: Vec::with_capacity(size), @@ -114,7 +114,9 @@ impl AccountGenerator { } pub fn accouts_nonce_iter(&self) -> impl Iterator)> { - self.accout_addresses.iter().zip(self.init_nonces.iter().cloned()) + self.accout_addresses + .iter() + .zip(self.init_nonces.iter().cloned()) } pub fn account_ids_with_nonce(&self) -> impl Iterator)> + '_ { @@ -128,9 +130,11 @@ impl AccountGenerator { let res = self.gen_deterministic_accounts(begin_index, end_index); self.accout_addresses.reserve_exact(res.len()); self.init_nonces.reserve(res.len()); - self.accout_addresses.extend(res.iter().map(|signer| signer.address())); + self.accout_addresses + .extend(res.iter().map(|signer| signer.address())); for (i, signer) in res.iter().enumerate() { - self.accout_signers.save_signer(signer.clone(), AccountId(i as u32)); + self.accout_signers + .save_signer(signer.clone(), AccountId(i as u32)); } self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0))));