diff --git a/src/actors/consumer/actor.rs b/src/actors/consumer/actor.rs index cb5c424..8c15511 100644 --- a/src/actors/consumer/actor.rs +++ b/src/actors/consumer/actor.rs @@ -268,7 +268,7 @@ impl Consumer { .provider(&url) .await .unwrap() - .get_txn_count(*metadata.from_account.as_ref()) + .get_txn_count(metadata.from_account.as_ref().clone()) .await { // If on-chain nonce is greater than our attempted nonce, our transaction is indeed outdated diff --git a/src/actors/producer/producer_actor.rs b/src/actors/producer/producer_actor.rs index 2394c03..6f38d30 100644 --- a/src/actors/producer/producer_actor.rs +++ b/src/actors/producer/producer_actor.rs @@ -1,11 +1,9 @@ use actix::prelude::*; -use alloy::primitives::Address; use dashmap::DashMap; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::RwLock; use crate::actors::consumer::Consumer; use crate::actors::monitor::monitor_actor::{PlanProduced, ProduceTxns}; @@ -15,7 +13,7 @@ use crate::actors::monitor::{ }; use crate::actors::{ExeFrontPlan, PauseProducer, ResumeProducer}; use crate::txn_plan::{addr_pool::AddressPool, PlanExecutionMode, PlanId, TxnPlan}; -use crate::util::gen_account::AccountGenerator; +use crate::util::gen_account::{AccountId, AccountManager}; use super::messages::RegisterTxnPlan; @@ -71,7 +69,9 @@ pub struct Producer { monitor_addr: Addr, consumer_addr: Addr, - nonce_cache: Arc, u32>>, + nonce_cache: Arc>, + + account_generator: AccountManager, /// A queue of plans waiting to be executed. Plans are processed in FIFO order. plan_queue: VecDeque>, @@ -86,16 +86,14 @@ impl Producer { address_pool: Arc, consumer_addr: Addr, monitor_addr: Addr, - account_generator: Arc>, + account_generator: AccountManager, ) -> Result { let nonce_cache = Arc::new(DashMap::new()); - let account_generator = account_generator.read().await; address_pool.clean_ready_accounts(); - for (account, nonce) in account_generator.accouts_nonce_iter() { - let address = Arc::new(account.address()); + for (account_id, nonce) in account_generator.account_ids_with_nonce() { let nonce = nonce.load(Ordering::Relaxed) as u32; - nonce_cache.insert(address.clone(), nonce); - address_pool.unlock_correct_nonce(address.clone(), nonce); + nonce_cache.insert(account_id, nonce); + address_pool.unlock_correct_nonce(account_id, nonce); } Ok(Self { state: ProducerState::running(), @@ -110,6 +108,7 @@ impl Producer { }, address_pool, nonce_cache, + account_generator, monitor_addr, consumer_addr, plan_queue: VecDeque::new(), @@ -152,17 +151,20 @@ impl Producer { monitor_addr: Addr, consumer_addr: Addr, address_pool: Arc, + account_generator: AccountManager, mut plan: Box, sending_txns: Arc, state: ProducerState, - nonce_cache: Arc, u32>>, + nonce_cache: Arc>, ) -> Result<(), anyhow::Error> { let plan_id = plan.id().clone(); // Fetch accounts and build transactions let ready_accounts = address_pool.fetch_senders(plan.size().unwrap_or_else(|| address_pool.len())); - let iterator = plan.as_mut().build_txns(ready_accounts)?; + let iterator = plan + .as_mut() + .build_txns(ready_accounts, account_generator.clone())?; // If the plan doesn't consume nonces, accounts can be used by other processes immediately. if !iterator.consume_nonce { @@ -183,7 +185,8 @@ impl Producer { tracing::debug!("Producer is paused"); tokio::time::sleep(Duration::from_millis(500)).await; } - let next_nonce = match nonce_cache.get(signed_txn.metadata.from_account.as_ref()) { + let account_id = signed_txn.metadata.from_account_id; + let next_nonce = match nonce_cache.get(&account_id) { Some(nonce) => *nonce, None => 0, }; @@ -269,6 +272,7 @@ impl Handler for Producer { let plan = self.plan_queue.pop_front().unwrap(); self.stats.remain_plans_num -= 1; let address_pool = self.address_pool.clone(); + let account_generator = self.account_generator.clone(); let monitor_addr = self.monitor_addr.clone(); let consumer_addr = self.consumer_addr.clone(); let self_addr = ctx.address(); @@ -292,6 +296,7 @@ impl Handler for Producer { monitor_addr, consumer_addr, address_pool, + account_generator, plan, sending_txns, state, @@ -401,7 +406,6 @@ impl Handler for Producer { fn handle(&mut self, msg: UpdateSubmissionResult, _ctx: &mut Self::Context) -> Self::Result { let address_pool = self.address_pool.clone(); - let account = msg.metadata.from_account.clone(); self.stats.sending_txns.fetch_sub(1, Ordering::Relaxed); match msg.result.as_ref() { SubmissionResult::Success(_) => { @@ -409,8 +413,8 @@ impl Handler for Producer { } SubmissionResult::NonceTooLow { expect_nonce, .. } => { self.stats.success_txns += 1; - self.nonce_cache - .insert(account.clone(), *expect_nonce as u32); + let account_id = msg.metadata.from_account_id; + self.nonce_cache.insert(account_id, *expect_nonce as u32); } SubmissionResult::ErrorWithRetry => { self.stats.failed_txns += 1; @@ -419,21 +423,22 @@ impl Handler for Producer { let ready_accounts = self.stats.ready_accounts.clone(); Box::pin( async move { + let account_id = msg.metadata.from_account_id; match msg.result.as_ref() { SubmissionResult::Success(_) => { - address_pool.unlock_next_nonce(account); + address_pool.unlock_next_nonce(account_id); } SubmissionResult::NonceTooLow { expect_nonce, .. } => { tracing::debug!( "Nonce too low for account {:?}, expect nonce: {}, actual nonce: {}", - account, + account_id, expect_nonce, msg.metadata.nonce ); - address_pool.unlock_correct_nonce(account, *expect_nonce as u32); + address_pool.unlock_correct_nonce(account_id, *expect_nonce as u32); } SubmissionResult::ErrorWithRetry => { - address_pool.retry_current_nonce(account); + address_pool.retry_current_nonce(account_id); } } ready_accounts.store(address_pool.ready_len() as u64, Ordering::Relaxed); diff --git a/src/main.rs b/src/main.rs index 764b368..98e8cfa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,10 +14,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader as TokioBufReader}, - sync::RwLock, -}; +use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader}; use tracing::{info, Level}; use crate::{ @@ -30,7 +27,7 @@ use crate::{ faucet_txn_builder::{Erc20FaucetTxnBuilder, EthFaucetTxnBuilder, FaucetTxnBuilder}, PlanBuilder, TxnPlan, }, - util::gen_account::AccountGenerator, + util::gen_account::{AccountGenerator, AccountManager}, }; #[derive(Parser, Debug)] @@ -212,11 +209,11 @@ fn run_command(command: &str) -> Result { } async fn get_init_nonce_map( - accout_generator: Arc>, + accout_generator: AccountManager, faucet_private_key: &str, eth_client: Arc, ) -> Arc> { - let mut init_nonce_map = accout_generator.read().await.init_nonce_map(); + let mut init_nonce_map = accout_generator.init_nonce_map(); let faucet_signer = PrivateKeySigner::from_str(faucet_private_key).unwrap(); let faucet_address = faucet_signer.address(); init_nonce_map.insert( @@ -264,19 +261,18 @@ async fn start_bench() -> Result<()> { }); contract_config }; - - let accout_generator = AccountGenerator::with_capacity(benchmark_config.accounts.num_accounts); - let accounts = accout_generator - .write() - .await + let mut accout_generator = AccountGenerator::with_capacity( + PrivateKeySigner::from_str(&benchmark_config.faucet.private_key).unwrap(), + ); + let account_ids = accout_generator .gen_account(0, benchmark_config.accounts.num_accounts as u64) .unwrap(); - let account_addresses = Arc::new( - accounts + let account_addresses = Arc::new({ + account_ids .iter() - .map(|(address, _)| address.clone()) - .collect::>(), - ); + .map(|&id| Arc::new(accout_generator.get_address_by_id(id))) + .collect::>() + }); // Create EthHttpCli instance let eth_clients: Vec> = benchmark_config .nodes @@ -287,10 +283,6 @@ async fn start_bench() -> Result<()> { }) .collect(); - let address_pool: Arc = Arc::new( - txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new(accounts.clone()), - ); - let chain_id = benchmark_config.nodes[0].chain_id; info!("Initializing Faucet constructor..."); @@ -305,32 +297,51 @@ async fn start_bench() -> Result<()> { U256::from(benchmark_config.num_tokens) * U256::from(21000) * U256::from(1000_000_000_000u64), - accout_generator.clone(), + &mut accout_generator, ) .await .unwrap(); if args.recover { - init_nonce(accout_generator.clone(), eth_clients[0].clone()).await; + init_nonce(&mut accout_generator, eth_clients[0].clone()).await; } let monitor = Monitor::new_with_clients( eth_clients.clone(), benchmark_config.performance.max_pool_size, ) .start(); - // let mut file = tokio::fs::File::create("accounts.txt").await.unwrap(); - // for (sign, nonce) in accout_generator.read().await.accouts_nonce_iter() { - // file.write( - // format!( - // "{}, {}, {}\n", - // hex::encode(sign.to_bytes()), - // sign.address().to_string(), - // nonce.load(Ordering::Relaxed), - // ) - // .as_bytes(), - // ) - // .await - // .unwrap(); - // } + + let tokens = contract_config.get_all_token(); + let mut tokens_plan = Vec::new(); + for token in &tokens { + start_nonce += benchmark_config.faucet.faucet_level as u64; + info!("distributing token: {}", token.address); + let token_address = Address::from_str(&token.address).unwrap(); + let faucet_token_balance = U256::from_str(&token.faucet_balance).unwrap(); + info!("balance of token: {}", faucet_token_balance); + let token_faucet_builder = PlanBuilder::create_faucet_tree_plan_builder( + benchmark_config.faucet.faucet_level as usize, + faucet_token_balance, + &benchmark_config.faucet.private_key, + start_nonce, + account_addresses.clone(), + Arc::new(Erc20FaucetTxnBuilder::new(token_address)), + U256::ZERO, + &mut accout_generator, + ) + .await + .unwrap(); + tokens_plan.push(token_faucet_builder); + } + + let account_manager = accout_generator.to_manager(); + + let address_pool: Arc = Arc::new( + txn_plan::addr_pool::managed_address_pool::RandomAddressPool::new( + account_ids.clone(), + account_manager.clone(), + ), + ); + // Use the same client instances for Consumer to share metrics let eth_providers: Vec = eth_clients .iter() @@ -346,7 +357,7 @@ async fn start_bench() -> Result<()> { ) .start(); let init_nonce_map = get_init_nonce_map( - accout_generator.clone(), + account_manager.clone(), benchmark_config.faucet.private_key.as_str(), eth_clients[0].clone(), ) @@ -356,7 +367,7 @@ async fn start_bench() -> Result<()> { address_pool.clone(), consumer, monitor, - accout_generator.clone(), + account_manager.clone(), ) .await .unwrap() @@ -371,29 +382,9 @@ async fn start_bench() -> Result<()> { ) .await?; - let tokens = contract_config.get_all_token(); - - for token in &tokens { - start_nonce += benchmark_config.faucet.faucet_level as u64; - info!("distributing token: {}", token.address); - let token_address = Address::from_str(&token.address).unwrap(); - let faucet_token_balance = U256::from_str(&token.faucet_balance).unwrap(); - info!("balance of token: {}", faucet_token_balance); - let token_faucet_builder = PlanBuilder::create_faucet_tree_plan_builder( - benchmark_config.faucet.faucet_level as usize, - faucet_token_balance, - &benchmark_config.faucet.private_key, - start_nonce, - account_addresses.clone(), - Arc::new(Erc20FaucetTxnBuilder::new(token_address)), - U256::ZERO, - accout_generator.clone(), - ) - .await - .unwrap(); - + for (token_plan, token) in tokens_plan.into_iter().zip(tokens.iter()) { execute_faucet_distribution( - token_faucet_builder, + token_plan, chain_id, &producer, &format!("Token {}", token.symbol), @@ -431,14 +422,13 @@ async fn start_bench() -> Result<()> { Ok(()) } -async fn init_nonce(accout_generator: Arc>, eth_client: Arc) { +async fn init_nonce(accout_generator: &mut AccountGenerator, eth_client: Arc) { tracing::info!("Initializing nonce..."); - let accout_generator = accout_generator.read().await; let tasks = accout_generator .accouts_nonce_iter() .map(|(account, nonce)| { let client = eth_client.clone(); - let addr = account.address(); + let addr = account.clone(); async move { let init_nonce = client.get_txn_count(addr).await; match init_nonce { @@ -462,7 +452,10 @@ static ALLOC: dhat::Alloc = dhat::Alloc; #[actix::main] async fn main() -> Result<()> { #[cfg(feature = "dhat-heap")] - let _profiler = dhat::Profiler::new_heap(); + let _profiler = { + println!("starting heap profiler..."); + dhat::Profiler::new_heap() + }; let res = async { start_bench().await }; let ctrl_c = async { tokio::signal::ctrl_c() diff --git a/src/txn_plan/addr_pool/managed_address_pool.rs b/src/txn_plan/addr_pool/managed_address_pool.rs index a28fb71..ce71530 100644 --- a/src/txn_plan/addr_pool/managed_address_pool.rs +++ b/src/txn_plan/addr_pool/managed_address_pool.rs @@ -1,54 +1,50 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; -use alloy::{primitives::Address, signers::local::PrivateKeySigner}; use parking_lot::Mutex; use super::AddressPool; +use crate::util::gen_account::{AccountId, AccountManager}; struct Inner { - account_signers: HashMap, Arc>, - account_status: HashMap, u32>, - ready_accounts: Vec<(Arc, Arc
, u32)>, - all_account_addresses: Vec>, + account_status: HashMap, + ready_accounts: Vec<(AccountId, u32)>, + all_account_ids: Vec, } pub struct RandomAddressPool { inner: Mutex, + #[allow(unused)] + account_generator: AccountManager, } impl RandomAddressPool { #[allow(unused)] - pub fn new(account_signers: Vec<(Arc
, Arc)>) -> Self { + pub fn new(account_ids: Vec, account_generator: AccountManager) -> Self { let mut account_status = HashMap::new(); let mut ready_accounts = Vec::new(); - let mut hashmap = HashMap::new(); - let all_account_addresses: Vec> = account_signers - .iter() - .map(|(address, _)| address.clone()) - .collect(); - for (addr, signer) in account_signers.iter() { + + for &account_id in account_ids.iter() { // assume all address start from nonce, this is correct beacause a nonce too low error will trigger correct nonce let nonce = 0; - hashmap.insert(addr.clone(), signer.clone()); - account_status.insert(addr.clone(), nonce); - ready_accounts.push((signer.clone(), addr.clone(), nonce)); + account_status.insert(account_id, nonce); + ready_accounts.push((account_id, nonce)); } let inner = Inner { - account_signers: hashmap, account_status, ready_accounts, - all_account_addresses, + all_account_ids: account_ids, }; Self { inner: Mutex::new(inner), + account_generator, } } } impl AddressPool for RandomAddressPool { - fn fetch_senders(&self, count: usize) -> Vec<(Arc, Arc
, u32)> { + fn fetch_senders(&self, count: usize) -> Vec<(AccountId, u32)> { let mut inner = self.inner.lock(); let len = inner.ready_accounts.len(); if count < len { @@ -63,51 +59,47 @@ impl AddressPool for RandomAddressPool { inner.ready_accounts.clear(); } - fn unlock_next_nonce(&self, account: Arc
) { + fn unlock_next_nonce(&self, account: AccountId) { let mut inner = self.inner.lock(); if let Some(status) = inner.account_status.get_mut(&account) { *status += 1; - let signer = inner.account_signers.get(&account).unwrap().clone(); let status = *inner.account_status.get(&account).unwrap(); - inner.ready_accounts.push((signer, account.clone(), status)); + inner.ready_accounts.push((account, status)); } } - fn unlock_correct_nonce(&self, account: Arc
, nonce: u32) { + fn unlock_correct_nonce(&self, account: AccountId, nonce: u32) { let mut inner = self.inner.lock(); if let Some(status) = inner.account_status.get_mut(&account) { *status = nonce; let status = *status; - let signer = inner.account_signers.get(&account).unwrap().clone(); - inner.ready_accounts.push((signer, account.clone(), status)); + inner.ready_accounts.push((account, status)); } } - fn retry_current_nonce(&self, account: Arc
) { + fn retry_current_nonce(&self, account: AccountId) { let mut inner = self.inner.lock(); if inner.account_status.get_mut(&account).is_some() { - let signer = inner.account_signers.get(&account).unwrap().clone(); let status = *inner.account_status.get(&account).unwrap(); - inner.ready_accounts.push((signer, account.clone(), status)); + inner.ready_accounts.push((account, status)); } } fn resume_all_accounts(&self) { let mut inner = self.inner.lock(); inner.ready_accounts = inner - .all_account_addresses + .all_account_ids .iter() - .map(|account| { - let signer = inner.account_signers.get(account).unwrap().clone(); - let status = *inner.account_status.get(account).unwrap(); - (signer, account.clone(), status) + .map(|&account_id| { + let status = *inner.account_status.get(&account_id).unwrap(); + (account_id, status) }) .collect(); } fn is_full_ready(&self) -> bool { let inner = self.inner.lock(); - inner.ready_accounts.len() == inner.all_account_addresses.len() + inner.ready_accounts.len() == inner.all_account_ids.len() } fn ready_len(&self) -> usize { @@ -115,16 +107,17 @@ impl AddressPool for RandomAddressPool { } fn len(&self) -> usize { - self.inner.lock().account_signers.len() + self.inner.lock().all_account_ids.len() } - fn select_receiver(&self, excluded: &Address) -> Address { + fn select_receiver(&self, excluded: AccountId) -> AccountId { let inner = self.inner.lock(); + loop { - let idx = rand::random::() % inner.all_account_addresses.len(); - let to_address = inner.all_account_addresses[idx].clone(); - if to_address.as_ref() != excluded { - return *to_address; + let idx = rand::random::() % inner.all_account_ids.len(); + let account_id = inner.all_account_ids[idx]; + if account_id != excluded { + return account_id; } } } diff --git a/src/txn_plan/addr_pool/mod.rs b/src/txn_plan/addr_pool/mod.rs index e5aed4c..115c3bb 100644 --- a/src/txn_plan/addr_pool/mod.rs +++ b/src/txn_plan/addr_pool/mod.rs @@ -1,24 +1,24 @@ -use std::sync::Arc; - -use alloy::{primitives::Address, signers::local::PrivateKeySigner}; +use crate::util::gen_account::AccountId; pub mod managed_address_pool; #[allow(unused)] pub mod weighted_address_pool; -pub trait AddressPool: Send + Sync { +#[async_trait::async_trait] +pub trait AddressPool: Send + Sync + 'static { /// Fetches a batch of ready sender accounts based on the internal sampling strategy. /// This operation should internally lock the accounts to prevent concurrent use. - fn fetch_senders(&self, count: usize) -> Vec<(Arc, Arc
, u32)>; + /// Returns (AccountId, nonce) pairs. + fn fetch_senders(&self, count: usize) -> Vec<(AccountId, u32)>; /// Unlocks an account after a successful transaction and increments its nonce. - fn unlock_next_nonce(&self, account: Arc
); + fn unlock_next_nonce(&self, account: AccountId); /// Unlocks an account and updates its nonce to a specific value. - fn unlock_correct_nonce(&self, account: Arc
, nonce: u32); + fn unlock_correct_nonce(&self, account: AccountId, nonce: u32); /// Makes an account available again for retry, using the same nonce. - fn retry_current_nonce(&self, account: Arc
); + fn retry_current_nonce(&self, account: AccountId); /// Resumes all accounts, making them available again. fn resume_all_accounts(&self); @@ -34,6 +34,7 @@ pub trait AddressPool: Send + Sync { /// Returns the total number of accounts in the pool. fn len(&self) -> usize; - /// Selects a receiver address based on the internal sampling strategy. - fn select_receiver(&self, excluded: &Address) -> Address; + /// Selects a receiver account ID based on the internal sampling strategy. + /// The excluded parameter is the account ID to exclude from selection. + fn select_receiver(&self, excluded: AccountId) -> AccountId; } diff --git a/src/txn_plan/addr_pool/weighted_address_pool.rs b/src/txn_plan/addr_pool/weighted_address_pool.rs index 01e4872..3211cf4 100644 --- a/src/txn_plan/addr_pool/weighted_address_pool.rs +++ b/src/txn_plan/addr_pool/weighted_address_pool.rs @@ -1,10 +1,12 @@ use std::{collections::HashMap, sync::Arc}; -use alloy::{primitives::Address, signers::local::PrivateKeySigner}; +use alloy::primitives::Address; use parking_lot::Mutex; use rand::seq::SliceRandom; +use tokio::sync::RwLock; use super::AddressPool; +use crate::util::gen_account::{AccountGenerator, AccountId, AccountManager}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] enum AccountCategory { @@ -15,29 +17,28 @@ enum AccountCategory { struct Inner { // Static data - account_signers: HashMap, Arc>, - account_categories: HashMap, AccountCategory>, - all_account_addresses: Vec>, + account_categories: HashMap, + all_account_ids: Vec, // Dynamic data - account_status: HashMap, u32>, - hot_ready_accounts: Vec<(Arc, Arc
, u32)>, - normal_ready_accounts: Vec<(Arc, Arc
, u32)>, - long_tail_ready_accounts: Vec<(Arc, Arc
, u32)>, + account_status: HashMap, + hot_ready_accounts: Vec<(AccountId, u32)>, + normal_ready_accounts: Vec<(AccountId, u32)>, + long_tail_ready_accounts: Vec<(AccountId, u32)>, } pub struct WeightedAddressPool { inner: Mutex, + account_generator: AccountManager, } impl WeightedAddressPool { - pub fn new(account_signers: HashMap, Arc>) -> Self { - let mut all_account_addresses: Vec> = - account_signers.keys().cloned().collect(); + pub fn new(account_ids: Vec, account_generator: AccountManager) -> Self { + let mut all_account_ids = account_ids; // Shuffle for random distribution - all_account_addresses.shuffle(&mut rand::thread_rng()); + all_account_ids.shuffle(&mut rand::thread_rng()); - let total_accounts = all_account_addresses.len(); + let total_accounts = all_account_ids.len(); let hot_count = (total_accounts as f64 * 0.2).round() as usize; let normal_count = (total_accounts as f64 * 0.1).round() as usize; @@ -46,16 +47,16 @@ impl WeightedAddressPool { let mut normal_accounts = Vec::with_capacity(normal_count); let mut long_tail_accounts = Vec::with_capacity(total_accounts - hot_count - normal_count); - for (i, addr) in all_account_addresses.iter().enumerate() { + for (i, &account_id) in all_account_ids.iter().enumerate() { if i < hot_count { - account_categories.insert(addr.clone(), AccountCategory::Hot); - hot_accounts.push(addr.clone()); + account_categories.insert(account_id, AccountCategory::Hot); + hot_accounts.push(account_id); } else if i < hot_count + normal_count { - account_categories.insert(addr.clone(), AccountCategory::Normal); - normal_accounts.push(addr.clone()); + account_categories.insert(account_id, AccountCategory::Normal); + normal_accounts.push(account_id); } else { - account_categories.insert(addr.clone(), AccountCategory::LongTail); - long_tail_accounts.push(addr.clone()); + account_categories.insert(account_id, AccountCategory::LongTail); + long_tail_accounts.push(account_id); } } @@ -64,11 +65,11 @@ impl WeightedAddressPool { let mut normal_ready_accounts = Vec::new(); let mut long_tail_ready_accounts = Vec::new(); - for (addr, signer) in &account_signers { + for &account_id in &all_account_ids { let nonce = 0; - account_status.insert(addr.clone(), nonce); - let ready_tuple = (signer.clone(), addr.clone(), nonce); - match account_categories.get(addr).unwrap() { + account_status.insert(account_id, nonce); + let ready_tuple = (account_id, nonce); + match account_categories.get(&account_id).unwrap() { AccountCategory::Hot => hot_ready_accounts.push(ready_tuple), AccountCategory::Normal => normal_ready_accounts.push(ready_tuple), AccountCategory::LongTail => long_tail_ready_accounts.push(ready_tuple), @@ -76,9 +77,8 @@ impl WeightedAddressPool { } let inner = Inner { - account_signers, account_categories, - all_account_addresses, + all_account_ids, account_status, hot_ready_accounts, normal_ready_accounts, @@ -87,10 +87,11 @@ impl WeightedAddressPool { Self { inner: Mutex::new(inner), + account_generator, } } - fn unlock_account(&self, account: Arc
, nonce: Option) { + fn unlock_account(&self, account: AccountId, nonce: Option) { let mut inner = self.inner.lock(); if let Some(current_nonce) = inner.account_status.get_mut(&account) { let new_nonce = match nonce { @@ -99,8 +100,7 @@ impl WeightedAddressPool { }; *current_nonce = new_nonce; - let signer = inner.account_signers.get(&account).unwrap().clone(); - let ready_tuple = (signer, account.clone(), new_nonce); + let ready_tuple = (account, new_nonce); match inner.account_categories.get(&account).unwrap() { AccountCategory::Hot => inner.hot_ready_accounts.push(ready_tuple), @@ -112,7 +112,7 @@ impl WeightedAddressPool { } impl AddressPool for WeightedAddressPool { - fn fetch_senders(&self, count: usize) -> Vec<(Arc, Arc
, u32)> { + fn fetch_senders(&self, count: usize) -> Vec<(AccountId, u32)> { let mut inner = self.inner.lock(); let mut result = Vec::with_capacity(count); @@ -154,32 +154,32 @@ impl AddressPool for WeightedAddressPool { } fn clean_ready_accounts(&self) { - self.inner.lock().hot_ready_accounts.clear(); - self.inner.lock().normal_ready_accounts.clear(); - self.inner.lock().long_tail_ready_accounts.clear(); + let mut inner = self.inner.lock(); + inner.hot_ready_accounts.clear(); + inner.normal_ready_accounts.clear(); + inner.long_tail_ready_accounts.clear(); } - fn unlock_next_nonce(&self, account: Arc
) { + fn unlock_next_nonce(&self, account: AccountId) { self.unlock_account(account, None); } - fn unlock_correct_nonce(&self, account: Arc
, nonce: u32) { + fn unlock_correct_nonce(&self, account: AccountId, nonce: u32) { self.unlock_account(account, Some(nonce)); } - fn retry_current_nonce(&self, account: Arc
) { + fn retry_current_nonce(&self, account: AccountId) { let mut inner = self.inner.lock(); let maybe_data = if let Some(nonce) = inner.account_status.get(&account) { - let signer = inner.account_signers.get(&account).unwrap().clone(); let category = *inner.account_categories.get(&account).unwrap(); - Some((*nonce, signer, category)) + Some((*nonce, category)) } else { None }; - if let Some((nonce, signer, category)) = maybe_data { - let ready_tuple = (signer, account.clone(), nonce); + if let Some((nonce, category)) = maybe_data { + let ready_tuple = (account, nonce); match category { AccountCategory::Hot => inner.hot_ready_accounts.push(ready_tuple), AccountCategory::Normal => inner.normal_ready_accounts.push(ready_tuple), @@ -197,17 +197,16 @@ impl AddressPool for WeightedAddressPool { let mut normal_ready_accounts = Vec::new(); let mut long_tail_ready_accounts = Vec::new(); - for account in &inner.all_account_addresses { - let maybe_data = if let Some(nonce) = inner.account_status.get(account) { - let signer = inner.account_signers.get(account).unwrap().clone(); - let category = *inner.account_categories.get(account).unwrap(); - Some((*nonce, signer, category)) + for &account_id in &inner.all_account_ids { + let maybe_data = if let Some(nonce) = inner.account_status.get(&account_id) { + let category = *inner.account_categories.get(&account_id).unwrap(); + Some((*nonce, category)) } else { None }; - if let Some((nonce, signer, category)) = maybe_data { - let ready_tuple = (signer, account.clone(), nonce); + if let Some((nonce, category)) = maybe_data { + let ready_tuple = (account_id, nonce); match category { AccountCategory::Hot => hot_ready_accounts.push(ready_tuple), AccountCategory::Normal => normal_ready_accounts.push(ready_tuple), @@ -226,7 +225,7 @@ impl AddressPool for WeightedAddressPool { let ready_count = inner.hot_ready_accounts.len() + inner.normal_ready_accounts.len() + inner.long_tail_ready_accounts.len(); - ready_count == inner.all_account_addresses.len() + ready_count == inner.all_account_ids.len() } fn ready_len(&self) -> usize { @@ -237,16 +236,17 @@ impl AddressPool for WeightedAddressPool { } fn len(&self) -> usize { - self.inner.lock().account_signers.len() + self.inner.lock().all_account_ids.len() } - fn select_receiver(&self, excluded: &Address) -> Address { + fn select_receiver(&self, excluded: AccountId) -> AccountId { let inner = self.inner.lock(); + loop { - let idx = rand::random::() % inner.all_account_addresses.len(); - let to_address = inner.all_account_addresses[idx].clone(); - if to_address.as_ref() != excluded { - return *to_address; + let idx = rand::random::() % inner.all_account_ids.len(); + let account_id = inner.all_account_ids[idx]; + if account_id != excluded { + return account_id; } } } diff --git a/src/txn_plan/constructor/approve.rs b/src/txn_plan/constructor/approve.rs index bff3b00..c972efa 100644 --- a/src/txn_plan/constructor/approve.rs +++ b/src/txn_plan/constructor/approve.rs @@ -1,14 +1,15 @@ -use std::sync::Arc; - use alloy::{ network::TransactionBuilder, primitives::{Address, Bytes, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, sol_types::SolCall, }; -use crate::{config::IERC20, txn_plan::traits::FromTxnConstructor}; +use crate::{ + config::IERC20, + txn_plan::traits::FromTxnConstructor, + util::gen_account::{AccountId, AccountManager}, +}; /// ERC20 approve constructor /// Approve tokens for multiple accounts to a specified spender (e.g. Uniswap Router) @@ -32,8 +33,8 @@ impl ApproveTokenConstructor { impl FromTxnConstructor for ApproveTokenConstructor { fn build_for_sender( &self, - from_account: &Arc
, - _from_signer: &Arc, + from_account_id: AccountId, + account_generator: AccountManager, nonce: u64, ) -> Result { let approve_call = IERC20::approveCall { @@ -43,10 +44,10 @@ impl FromTxnConstructor for ApproveTokenConstructor { let call_data = approve_call.abi_encode(); let call_data = Bytes::from(call_data); - + let from_address = account_generator.get_address_by_id(from_account_id); // create transaction request let tx_request = TransactionRequest::default() - .with_from(*from_account.as_ref()) + .with_from(from_address) .with_to(self.token_address) .with_input(call_data) .with_nonce(nonce) diff --git a/src/txn_plan/constructor/distribute_token.rs b/src/txn_plan/constructor/distribute_token.rs index 503c657..e891211 100644 --- a/src/txn_plan/constructor/distribute_token.rs +++ b/src/txn_plan/constructor/distribute_token.rs @@ -1,12 +1,13 @@ -use std::sync::Arc; - use alloy::{ primitives::{Address, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, }; -use crate::{eth::TxnBuilder, txn_plan::traits::FromTxnConstructor}; +use crate::{ + eth::TxnBuilder, + txn_plan::traits::FromTxnConstructor, + util::gen_account::{AccountId, AccountManager}, +}; /// Token distribute constructor /// Distribute tokens to accounts using ETH @@ -44,13 +45,13 @@ impl SwapEthToTokenConstructor { impl FromTxnConstructor for SwapEthToTokenConstructor { fn build_for_sender( &self, - _from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + account_generator: AccountManager, nonce: u64, ) -> Result { // set transaction deadline (current time + 30 minutes) let deadline = U256::from(chrono::Utc::now().timestamp() + 1800); - + let from_address = account_generator.get_address_by_id(from_account_id); // build swap path: WETH -> Token let path = vec![self.weth_address, self.token_address]; @@ -59,7 +60,7 @@ impl FromTxnConstructor for SwapEthToTokenConstructor { self.router_address, self.amount_out_min, path, - from_signer.address(), + from_address, deadline, self.eth_amount_per_account, nonce, diff --git a/src/txn_plan/constructor/erc20_transfer.rs b/src/txn_plan/constructor/erc20_transfer.rs index 3415ce2..ae18f17 100644 --- a/src/txn_plan/constructor/erc20_transfer.rs +++ b/src/txn_plan/constructor/erc20_transfer.rs @@ -1,12 +1,12 @@ use crate::{ config::IERC20, txn_plan::{addr_pool::AddressPool, FromTxnConstructor}, + util::gen_account::{AccountId, AccountManager}, }; use alloy::{ network::TransactionBuilder, primitives::{Address, Bytes, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, sol_types::SolCall, }; use std::sync::Arc; @@ -39,13 +39,14 @@ impl Erc20TransferConstructor { impl FromTxnConstructor for Erc20TransferConstructor { fn build_for_sender( &self, - from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + account_generator: AccountManager, nonce: u64, ) -> Result { // random select a receiver address, ensure not to self - let to_address = self.address_pool.select_receiver(from_account); - + let to_address = self.address_pool.select_receiver(from_account_id); + let to_address = account_generator.get_address_by_id(to_address); + let from_address = account_generator.get_address_by_id(from_account_id); // build ERC20 transfer call let transfer_call = IERC20::transferCall { to: to_address, @@ -58,7 +59,7 @@ impl FromTxnConstructor for Erc20TransferConstructor { let token_address = self.token_list[token_idx]; // create transaction request let tx_request = TransactionRequest::default() - .with_from(from_signer.address()) + .with_from(from_address) .with_to(token_address) .with_input(call_data) .with_nonce(nonce) diff --git a/src/txn_plan/constructor/faucet.rs b/src/txn_plan/constructor/faucet.rs index 8f6ea0d..b6f16ef 100644 --- a/src/txn_plan/constructor/faucet.rs +++ b/src/txn_plan/constructor/faucet.rs @@ -2,7 +2,7 @@ use crate::{ txn_plan::{ faucet_plan::LevelFaucetPlan, faucet_txn_builder::FaucetTxnBuilder, traits::TxnPlan, }, - util::gen_account::AccountGenerator, + util::gen_account::{AccountGenerator, AccountId}, }; use alloy::{ primitives::{Address, U256}, @@ -13,7 +13,6 @@ use std::{ marker::PhantomData, sync::{atomic::AtomicU64, Arc, Mutex}, }; -use tokio::sync::RwLock; use tracing::info; // Gas parameters must match the values used in the plan executor. @@ -23,8 +22,8 @@ static NONCE_MAP: std::sync::OnceLock> std::sync::OnceLock::new(); pub struct FaucetTreePlanBuilder { - faucet: Arc, - account_levels: Vec>>, + faucet_id: AccountId, + account_levels: Vec>, final_recipients: Arc>>, amount_per_recipient: U256, nonce_map: Arc>>>, @@ -44,7 +43,7 @@ impl FaucetTreePlanBuilder { final_recipients: Arc>>, txn_builder: Arc, remained_eth: U256, - account_generator: Arc>, + account_generator: &mut AccountGenerator, ) -> Self { let mut degree = faucet_level; let total_accounts = final_recipients.len(); @@ -116,17 +115,10 @@ impl FaucetTreePlanBuilder { let num_intermediate_levels = total_levels - 1; for level in 0..num_intermediate_levels { let num_accounts_at_level = degree.pow(level as u32 + 1); - let accounts = account_generator - .write() - .await + let account_ids = account_generator .gen_account(start_index as u64, num_accounts_at_level as u64) .unwrap(); - account_levels.push( - accounts - .iter() - .map(|(_, signer)| signer.clone()) - .collect::>(), - ); + account_levels.push(account_ids); start_index += num_accounts_at_level as usize; } } @@ -140,15 +132,16 @@ impl FaucetTreePlanBuilder { for level in &account_levels { for acc in level { + let address = account_generator.get_address_by_id(*acc); nonce_map - .entry(acc.address()) + .entry(address) .or_insert_with(|| Arc::new(AtomicU64::new(0))); } } } info!("FaucetTreePlanBuilder: balance={:?}, amount_per_recipient={:?}, intermediate_funding_amounts={:?}, accounts_levels={:?}, accounts_num={:?}", faucet_balance, amount_per_recipient, intermediate_funding_amounts, account_levels.len(), total_accounts); Self { - faucet: Arc::new(faucet), + faucet_id: account_generator.faucet_accout_id(), account_levels, final_recipients, amount_per_recipient, @@ -187,9 +180,9 @@ impl FaucetTreePlanBuilder { } } - fn get_senders_for_level(&self, level: usize) -> Vec> { + fn get_senders_for_level(&self, level: usize) -> Vec { if level == 0 { - vec![self.faucet.clone()] + vec![self.faucet_id] } else { self.account_levels[level - 1].clone() } diff --git a/src/txn_plan/constructor/swap_token_2_token.rs b/src/txn_plan/constructor/swap_token_2_token.rs index 1b0e147..8c4d5ff 100644 --- a/src/txn_plan/constructor/swap_token_2_token.rs +++ b/src/txn_plan/constructor/swap_token_2_token.rs @@ -1,12 +1,12 @@ use crate::{ config::{IUniswapV2Router, LiquidityPair}, txn_plan::{addr_pool::AddressPool, FromTxnConstructor}, + util::gen_account::{AccountId, AccountManager}, }; use alloy::{ network::TransactionBuilder, primitives::{Address, Bytes, U256}, rpc::types::TransactionRequest, - signers::local::PrivateKeySigner, sol_types::SolCall, }; use std::{str::FromStr, sync::Arc}; @@ -40,11 +40,13 @@ impl SwapTokenToTokenConstructor { impl FromTxnConstructor for SwapTokenToTokenConstructor { fn build_for_sender( &self, - from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + account_generator: AccountManager, nonce: u64, ) -> Result { - let to_address = self.address_pool.select_receiver(from_account); + let to_address = self.address_pool.select_receiver(from_account_id); + let to_address = account_generator.get_address_by_id(to_address); + let from_address = account_generator.get_address_by_id(from_account_id); let token_idx = rand::random::() % self.token_list.len(); let from_token = Address::from_str(&self.token_list[token_idx].token_a_address).unwrap(); let to_token = Address::from_str(&self.token_list[token_idx].token_b_address).unwrap(); @@ -65,7 +67,7 @@ impl FromTxnConstructor for SwapTokenToTokenConstructor { let call_data = swap_call.abi_encode(); let call_data = Bytes::from(call_data); let tx_request = TransactionRequest::default() - .with_from(from_signer.address()) + .with_from(from_address) .with_to(self.router_address) .with_input(call_data) .with_nonce(nonce) diff --git a/src/txn_plan/faucet_plan.rs b/src/txn_plan/faucet_plan.rs index f9e9208..f6df74d 100644 --- a/src/txn_plan/faucet_plan.rs +++ b/src/txn_plan/faucet_plan.rs @@ -4,11 +4,11 @@ use crate::{ faucet_txn_builder::FaucetTxnBuilder, traits::{PlanExecutionMode, PlanId, SignedTxnWithMetadata, TxnMetadata, TxnPlan}, }, + util::gen_account::{AccountId, AccountManager}, }; use alloy::{ eips::Encodable2718, primitives::{Address, U256}, - signers::local::PrivateKeySigner, }; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use std::{ @@ -31,9 +31,9 @@ pub struct LevelFaucetPlan { execution_mode: PlanExecutionMode, chain_id: u64, level: usize, - senders: Vec>, + senders: Vec, final_recipients: Arc>>, - account_levels: Vec>>, + account_levels: Vec>, amount_per_recipient: U256, intermediate_funding_amounts: Vec, degree: usize, @@ -50,9 +50,9 @@ impl LevelFaucetPlan { chain_id: u64, level: usize, account_init_nonce: Arc>, - senders: Vec>, + senders: Vec, final_recipients: Arc>>, - account_levels: Vec>>, + account_levels: Vec>, amount_per_recipient: U256, intermediate_funding_amounts: Vec, degree: usize, @@ -106,7 +106,8 @@ impl TxnPlan for LevelFaucetPlan { fn build_txns( &mut self, - _ready_accounts: Vec<(Arc, Arc
, u32)>, + _ready_accounts: Vec<(crate::util::gen_account::AccountId, u32)>, + account_generator: AccountManager, ) -> Result { let plan_id = self.id.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); @@ -127,10 +128,10 @@ impl TxnPlan for LevelFaucetPlan { .chunks(1024) .enumerate() .for_each(|(chunk_index, chunk)| { - chunk - .into_par_iter() - .enumerate() - .for_each(|(sender_index, sender_signer)| { + chunk.into_par_iter().enumerate().for_each( + |(sender_index, sender_signer_id)| { + let sender_signer = + account_generator.get_signer_by_id(*sender_signer_id); let start_index = (chunk_index * 1024 + sender_index) * degree; let end_index = (start_index + degree).min(final_recipients.len()); if end_index < start_index { @@ -142,7 +143,8 @@ impl TxnPlan for LevelFaucetPlan { let val = amount_per_recipient; (to, val) } else { - let to = account_levels[level][i].address(); + let to_id = account_levels[level][i]; + let to = account_generator.get_address_by_id(to_id); let val = intermediate_funding_amounts[level]; (Arc::new(to), val) }; @@ -171,6 +173,7 @@ impl TxnPlan for LevelFaucetPlan { let metadata = Arc::new(TxnMetadata { from_account: Arc::new(sender_signer.address()), nonce, + from_account_id: *sender_signer_id, txn_id: Uuid::new_v4(), plan_id: plan_id.clone(), }); @@ -181,7 +184,8 @@ impl TxnPlan for LevelFaucetPlan { }) .unwrap(); } - }) + }, + ) }); drop(tx); }); diff --git a/src/txn_plan/plan.rs b/src/txn_plan/plan.rs index 42e3092..868b3c6 100644 --- a/src/txn_plan/plan.rs +++ b/src/txn_plan/plan.rs @@ -7,11 +7,9 @@ use crate::{ }, TxnIter, }, + util::gen_account::{AccountId, AccountManager}, }; -use alloy::{ - consensus::transaction::SignerRecoverable, eips::Encodable2718, primitives::Address, - signers::local::PrivateKeySigner, -}; +use alloy::eips::Encodable2718; use rayon::iter::IntoParallelIterator; use rayon::iter::ParallelIterator; use std::sync::Arc; @@ -74,11 +72,13 @@ impl TxnPlan for ManyToOnePlan { fn build_txns( &mut self, - ready_accounts: Vec<(Arc, Arc
, u32)>, + ready_accounts: Vec<(AccountId, u32)>, + account_generator: AccountManager, ) -> Result { let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); + // 4. Create async stream, process in batches let handle = tokio::task::spawn_blocking(move || { ready_accounts @@ -86,18 +86,27 @@ impl TxnPlan for ManyToOnePlan { .map(|chunk| { chunk .into_par_iter() - .map(|(signer, address, nonce)| { + .map(|(from_account_id, nonce)| { + let address = account_generator.get_address_by_id(*from_account_id); + let signer = + account_generator.get_signer_by_id(*from_account_id).clone(); let tx_request = constructor - .build_for_sender(address, signer, *nonce as u64) + .build_for_sender( + *from_account_id, + account_generator.clone(), + *nonce as u64, + ) .unwrap(); let metadata = Arc::new(TxnMetadata { - from_account: address.clone(), + from_account: Arc::new(address), nonce: *nonce as u64, txn_id: Uuid::new_v4(), + from_account_id: *from_account_id, plan_id: plan_id.clone(), }); let tx_envelope = - TxnBuilder::build_and_sign_transaction(tx_request, signer).unwrap(); + TxnBuilder::build_and_sign_transaction(tx_request, &signer) + .unwrap(); SignedTxnWithMetadata { bytes: tx_envelope.encoded_2718(), metadata, @@ -172,28 +181,40 @@ impl TxnPlan for OneToManyPlan { fn build_txns( &mut self, - ready_accounts: Vec<(Arc, Arc
, u32)>, + ready_accounts: Vec<(AccountId, u32)>, + account_generator: AccountManager, ) -> Result { // 3. Parallelly build and sign transactions let plan_id = self.id.clone(); let constructor = self.constructor.clone(); let chain_id = self.chain_id; let (tx, rx) = crossbeam::channel::bounded(self.concurrency_limit); + + // Convert AccountId to addresses + let addresses = ready_accounts; + let handle = tokio::task::spawn_blocking(move || { - ready_accounts + addresses .chunks(1024) .map(|chunk| { chunk .into_par_iter() - .flat_map(|(_signer, address, _nonce)| { + .flat_map(|(to_account_id, _nonce)| { // Build transaction request - let txs = constructor.build_for_receiver(address, chain_id).unwrap(); + let txs = constructor + .build_for_receiver( + *to_account_id, + account_generator.clone(), + chain_id, + ) + .unwrap(); txs.into_iter() - .map(|tx_envelope| { + .map(|(from_account_id, tx_envelope)| { let metadata = Arc::new(TxnMetadata { from_account: Arc::new( - tx_envelope.recover_signer_unchecked().unwrap(), + account_generator.get_address_by_id(from_account_id), ), + from_account_id: from_account_id, nonce: 0, txn_id: Uuid::new_v4(), plan_id: plan_id.clone(), diff --git a/src/txn_plan/plan_builder.rs b/src/txn_plan/plan_builder.rs index 25c2f7d..a0e6f7c 100644 --- a/src/txn_plan/plan_builder.rs +++ b/src/txn_plan/plan_builder.rs @@ -4,7 +4,6 @@ use alloy::{ primitives::{Address, U256}, signers::local::PrivateKeySigner, }; -use tokio::sync::RwLock; use crate::{ config::LiquidityPair, @@ -86,7 +85,7 @@ impl PlanBuilder { total_accounts: Arc>>, txn_builder: Arc, remained_eth: U256, - account_generator: Arc>, + account_generator: &mut AccountGenerator, ) -> Result>, anyhow::Error> { let faucet_signer = PrivateKeySigner::from_str(faucet_private_key)?; let constructor = FaucetTreePlanBuilder::new( diff --git a/src/txn_plan/traits.rs b/src/txn_plan/traits.rs index 0449dfa..38fc8a9 100644 --- a/src/txn_plan/traits.rs +++ b/src/txn_plan/traits.rs @@ -4,9 +4,10 @@ use actix::Message; use alloy::consensus::TxEnvelope; use alloy::primitives::Address; use alloy::rpc::types::TransactionRequest; -use alloy::signers::local::PrivateKeySigner; use uuid::Uuid; +use crate::util::gen_account::{AccountId, AccountManager}; + #[derive(Debug, Clone, Message)] #[rtype(result = "anyhow::Result<()>")] pub struct SignedTxnWithMetadata { @@ -20,6 +21,7 @@ pub struct TxnMetadata { pub txn_id: Uuid, pub plan_id: PlanId, pub from_account: Arc
, + pub from_account_id: AccountId, pub nonce: u64, } @@ -50,8 +52,8 @@ pub trait FromTxnConstructor: Send + Sync + 'static { /// The `to` address is usually a field of the constructor itself (e.g., fixed spender or router address). fn build_for_sender( &self, - from_account: &Arc
, - from_signer: &Arc, + from_account_id: AccountId, + accout_generator: AccountManager, nonce: u64, ) -> Result; @@ -61,11 +63,13 @@ pub trait FromTxnConstructor: Send + Sync + 'static { pub trait ToTxnConstructor: Send + Sync + 'static { /// Build transaction based on receiver information. + /// return the from account id and the transaction envelope fn build_for_receiver( &self, - to: &Arc
, + to_account_id: AccountId, + account_generator: AccountManager, chain_id: u64, - ) -> Result, anyhow::Error>; + ) -> Result, anyhow::Error>; /// Provide transaction description. fn description(&self) -> &'static str; @@ -84,12 +88,12 @@ pub struct TxnIter { pub trait TxnPlan: Send + Sync { /// Builds and signs a vector of transactions based on the plan's logic. /// - /// This method should read from the `AccountManager` to get available accounts and their nonces, - /// but it should NOT mutate the manager. All state changes will be handled by the Producer - /// after the transactions are built. + /// This method receives ready account IDs with their nonces and uses the AccountGenerator + /// to retrieve the actual signers and addresses needed for transaction construction. fn build_txns( &mut self, - ready_accounts: Vec<(Arc, Arc
, u32)>, + ready_accounts: Vec<(AccountId, u32)>, + account_generator: AccountManager, ) -> Result; /// Returns the unique identifier for this plan instance. diff --git a/src/util/gen_account.rs b/src/util/gen_account.rs index 49e76b3..6a43b35 100644 --- a/src/util/gen_account.rs +++ b/src/util/gen_account.rs @@ -4,6 +4,7 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, + u32, }; use alloy::{ @@ -12,59 +13,135 @@ use alloy::{ }; use anyhow::{Context, Result}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use tokio::sync::RwLock; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct AccountId(u32); +const CACHE_SIZE: usize = 1024 * 1024; + +pub struct AccountSignerCache { + signers: Vec, + size: usize, +} + +impl AccountSignerCache { + pub(crate) fn new(size: usize) -> Self { + Self { + signers: Vec::with_capacity(size), + size, + } + } + + pub(crate) fn save_signer(&mut self, signer: PrivateKeySigner, account_id: AccountId) { + if account_id.0 as usize >= self.size { + return; + } + if account_id.0 as usize == self.signers.len() { + self.signers.push(signer); + } else { + self.signers[account_id.0 as usize] = signer; + } + } + + pub(crate) fn get_signer(&self, index: usize) -> PrivateKeySigner { + if index >= self.signers.len() { + return Self::compute_signer(AccountId(index as u32)); + } + self.signers[index].clone() + } + + fn compute_signer(id: AccountId) -> PrivateKeySigner { + let private_key_bytes = keccak256((id.0 as u64).to_le_bytes()); + PrivateKeySigner::from_slice(private_key_bytes.as_slice()) + .context("Failed to create deterministic signer") + .unwrap() + } +} + pub struct AccountGenerator { - accouts: Vec, - accout_to_id: HashMap, + accout_signers: AccountSignerCache, + accout_addresses: Vec
, + faucet_accout: PrivateKeySigner, + faucet_accout_id: AccountId, init_nonces: Vec>, } +pub type AccountManager = Arc; + impl AccountGenerator { - pub fn with_capacity(capacity: usize) -> Arc> { - Arc::new(RwLock::new(Self { - accouts: Vec::with_capacity(capacity), - accout_to_id: HashMap::with_capacity(capacity), - init_nonces: Vec::with_capacity(capacity), - })) + pub fn with_capacity(faucet_accout: PrivateKeySigner) -> Self { + Self { + accout_signers: AccountSignerCache::new(CACHE_SIZE), + accout_addresses: Vec::new(), + faucet_accout, + faucet_accout_id: AccountId(u32::MAX), + init_nonces: Vec::new(), + } + } + + pub fn to_manager(mut self) -> AccountManager { + self.accout_addresses.shrink_to_fit(); + self.init_nonces.shrink_to_fit(); + Arc::new(self) + } + + pub fn get_signer_by_id(&self, id: AccountId) -> PrivateKeySigner { + if id == self.faucet_accout_id { + self.faucet_accout.clone() + } else { + self.accout_signers.get_signer(id.0 as usize) + } + } + + pub fn faucet_accout_id(&self) -> AccountId { + self.faucet_accout_id + } + + pub fn get_address_by_id(&self, id: AccountId) -> Address { + if id == self.faucet_accout_id { + self.faucet_accout.address() + } else { + self.accout_addresses[id.0 as usize] + } } pub fn init_nonce_map(&self) -> HashMap { let mut map = HashMap::new(); for (account, nonce) in self.accouts_nonce_iter() { - map.insert(account.address(), nonce.load(Ordering::Relaxed)); + map.insert(account.clone(), nonce.load(Ordering::Relaxed)); } map } - pub fn accouts_nonce_iter(&self) -> impl Iterator)> { - self.accouts.iter().zip(self.init_nonces.iter().cloned()) + pub fn accouts_nonce_iter(&self) -> impl Iterator)> { + self.accout_addresses + .iter() + .zip(self.init_nonces.iter().cloned()) } - pub fn gen_account( - &mut self, - start_index: u64, - size: u64, - ) -> Result, Arc)>> { - let begin_index = self.accouts.len() as u64; + pub fn account_ids_with_nonce(&self) -> impl Iterator)> + '_ { + (0..self.accout_addresses.len()).map(|i| (AccountId(i as u32), self.init_nonces[i].clone())) + } + + pub fn gen_account(&mut self, start_index: u64, size: u64) -> Result> { + let begin_index = self.accout_addresses.len() as u64; let end_index = start_index + size; if begin_index < end_index { let res = self.gen_deterministic_accounts(begin_index, end_index); - self.accouts.extend(res); + self.accout_addresses.reserve_exact(res.len()); + self.init_nonces.reserve(res.len()); + self.accout_addresses + .extend(res.iter().map(|signer| signer.address())); + for (i, signer) in res.iter().enumerate() { + self.accout_signers + .save_signer(signer.clone(), AccountId(i as u32)); + } self.init_nonces .extend((0..size).map(|_| Arc::new(AtomicU64::new(0)))); - for i in begin_index..end_index { - self.accout_to_id - .insert(self.accouts[i as usize].address(), AccountId(i as u32)); - } } - let mut res = Vec::new(); + let mut res = Vec::with_capacity(size as usize); for i in 0..size { - let signer = self.accouts[(start_index + i) as usize].clone(); - res.push((Arc::new(signer.address()), Arc::new(signer))); + res.push(AccountId((start_index + i) as u32)); } Ok(res) }