From 2f3b0fe0c13eaf67ae2e055d34e91f684e464706 Mon Sep 17 00:00:00 2001 From: keroro Date: Thu, 5 Aug 2021 00:39:33 +0800 Subject: [PATCH] feat(ckb-bench): properly handle big input --- ckb-bench/src/main.rs | 60 ++++++------ ckb-bench/src/prepare.rs | 192 +++++++++++++++++++++++---------------- ckb-bench/src/utils.rs | 32 +++++++ 3 files changed, 179 insertions(+), 105 deletions(-) create mode 100644 ckb-bench/src/utils.rs diff --git a/ckb-bench/src/main.rs b/ckb-bench/src/main.rs index eaa32d1..415a402 100644 --- a/ckb-bench/src/main.rs +++ b/ckb-bench/src/main.rs @@ -1,6 +1,7 @@ mod bench; mod prepare; mod stat; +mod utils; mod watcher; #[cfg(test)] @@ -8,6 +9,7 @@ mod tests; use crate::bench::{LiveCellProducer, TransactionProducer}; use crate::prepare::{collect, dispatch, generate_privkeys}; +use crate::utils::maybe_retry_send_transaction; use crate::watcher::Watcher; use ckb_crypto::secp::Privkey; use ckb_testkit::{Node, Nodes, User}; @@ -123,9 +125,13 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) { }) .collect::>(); let n_users = value_t_or_exit!(arguments, "n-users", usize); + let cells_per_user = value_t_or_exit!(arguments, "cells-per-user", u64); let capacity_per_cell = value_t_or_exit!(arguments, "capacity-per-cell", u64); let owner_raw_privkey = env::var("CKB_BENCH_OWNER_PRIVKEY").unwrap_or_else(|err| { - prompt_and_exit!("cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", err) + prompt_and_exit!( + "cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", + err + ) }); let genesis_block = nodes[0].get_block_by_number(0); let owner = { @@ -154,7 +160,7 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) { }; wait_for_nodes_sync(&nodes); wait_for_indexer_synced(&nodes); - dispatch(&nodes, &owner, &users, capacity_per_cell); + dispatch(&nodes, &owner, &users, cells_per_user, capacity_per_cell); } ("collect", Some(arguments)) => { let data_dir = value_t_or_exit!(arguments, "data-dir", PathBuf); @@ -177,7 +183,10 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) { .collect::>(); let n_users = value_t_or_exit!(arguments, "n-users", usize); let owner_raw_privkey = env::var("CKB_BENCH_OWNER_PRIVKEY").unwrap_or_else(|err| { - prompt_and_exit!("cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", err) + prompt_and_exit!( + "cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", + err + ) }); let genesis_block = nodes[0].get_block_by_number(0); let owner = { @@ -238,7 +247,10 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) { Duration::from_millis(bench_time_ms) }; let owner_raw_privkey = env::var("CKB_BENCH_OWNER_PRIVKEY").unwrap_or_else(|err| { - prompt_and_exit!("cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", err) + prompt_and_exit!( + "cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", + err + ) }); let genesis_block = nodes[0].get_block_by_number(0); let users = { @@ -309,31 +321,18 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) { loop { i = (i + 1) % nodes.len(); - let result = nodes[i] - .rpc_client() - .send_transaction_result(tx.data().into()); - match result { - Err(err) => { - if err.to_string().contains("PoolIsFull") { - sleep(Duration::from_millis(10)); - continue; - } else if err - .to_string() - .contains("PoolRejectedDuplicatedTransaction") - { - break; - } else { - ckb_testkit::error!( - "failed to send {:#x}, error: {:?}", - tx.hash(), - err - ); - break; - } - } + match maybe_retry_send_transaction(&nodes[i], &tx) { Ok(hash) => { ckb_testkit::debug!("sent transaction {:#x}", hash); benched_transactions += 1; + break; + } + Err(err) => { + ckb_testkit::error!( + "failed to send tx {:#x}, error: {}", + tx.hash(), + err + ); } } } @@ -512,6 +511,15 @@ fn clap_app() -> App<'static, 'static> { .help("Number of users") .validator(|s| s.parse::().map(|_| ()).map_err(|err| err.to_string())), ) + .arg( + Arg::with_name("cells-per-user") + .long("cells-per-user") + .value_name("NUMBER") + .takes_value(true) + .required(true) + .help("Cells per user") + .validator(|s| s.parse::().map(|_| ()).map_err(|err| err.to_string())), + ) .arg( Arg::with_name("capacity-per-cell") .long("capacity-per-cell") diff --git a/ckb-bench/src/prepare.rs b/ckb-bench/src/prepare.rs index e564447..cf903ad 100644 --- a/ckb-bench/src/prepare.rs +++ b/ckb-bench/src/prepare.rs @@ -1,45 +1,74 @@ +use crate::utils::maybe_retry_send_transaction; use ckb_crypto::secp::Privkey; use ckb_testkit::{Node, User}; +use ckb_types::core::cell::CellMeta; +use ckb_types::packed::OutPoint; use ckb_types::{ core::{Capacity, TransactionBuilder}, packed::{Byte32, CellInput, CellOutput}, prelude::*, }; use std::cmp::min; -use std::thread::sleep; -use std::time::Duration; +use std::collections::VecDeque; + +/// count of two-in-two-out txs a block should capable to package. +const TWO_IN_TWO_OUT_COUNT: u64 = 1_000; +const MAX_OUT_COUNT: u64 = TWO_IN_TWO_OUT_COUNT; +const FEE_RATE_OF_OUTPUT: u64 = 1000; // TODO handle big cell -pub fn dispatch(nodes: &[Node], owner: &User, users: &[User], capacity_per_cell: u64) { +pub fn dispatch( + nodes: &[Node], + owner: &User, + users: &[User], + cells_per_user: u64, + capacity_per_cell: u64, +) { ckb_testkit::info!( - "dispatch to {} users, {} capacity per user", + "dispatch to {} users, {} cells per user, {} capacity per cell", users.len(), + cells_per_user, capacity_per_cell ); - let live_cells = owner.get_spendable_single_secp256k1_cells(&nodes[0]); - let mut i_user = 0; + + let mut live_cells: VecDeque = owner + .get_spendable_single_secp256k1_cells(&nodes[0]) + .into_iter() + .collect(); + + { + let total_capacity: u64 = live_cells.iter().map(|cell| cell.capacity().as_u64()).sum(); + let total_fee = users.len() as u64 * cells_per_user * FEE_RATE_OF_OUTPUT; + let need_capacity = users.len() as u64 * cells_per_user * capacity_per_cell + total_fee; + assert!( + total_capacity > need_capacity, + "insufficient capacity, owner's total_capacity({}) <= {} = n_users({}) * cells_per_user({}) * capacity_per_cell({}) + total_fee({})", + total_capacity, + need_capacity, + users.len(), + cells_per_user, + capacity_per_cell, + total_fee, + ); + } + + let total_outs = users.len() * cells_per_user as usize; + let index_user = |out_i: usize| out_i % (cells_per_user as usize); + + let mut i_out = 0usize; let mut txs = Vec::new(); - for chunk in live_cells.chunks(1) { - let inputs = chunk; - let inputs_capacity: u64 = inputs.iter().map(|cell| cell.capacity().as_u64()).sum(); + while let Some(input) = live_cells.pop_front() { + let input_capacity = input.capacity().as_u64(); // TODO estimate tx fee - let fee = (inputs_capacity / capacity_per_cell) * 1000; - let outputs_capacity = inputs_capacity - fee; - let n_outputs = if (outputs_capacity / capacity_per_cell) as usize > users.len() - i_user { - min(1500, users.len() - i_user) - } else { - min(1500, (outputs_capacity / capacity_per_cell) as usize) - }; - let change_capacity = inputs_capacity - n_outputs as u64 * capacity_per_cell - fee; - let mut outputs = users[i_user..i_user + n_outputs] - .iter() - .map(|user| { - CellOutput::new_builder() - .capacity(capacity_per_cell.pack()) - .lock(user.single_secp256k1_lock_script()) - .build() - }) - .collect::>(); + let fee = MAX_OUT_COUNT * FEE_RATE_OF_OUTPUT; + let outputs_capacity = input_capacity - fee; + let mut n_outs = min(MAX_OUT_COUNT, outputs_capacity / capacity_per_cell) as usize; + if i_out + n_outs >= total_outs { + n_outs = total_outs - i_out; + } + let change_capacity = outputs_capacity - n_outs as u64 * capacity_per_cell; + + let mut outputs = Vec::with_capacity(n_outs as usize + 1); if change_capacity >= Capacity::bytes(67).unwrap().as_u64() { let change_output = CellOutput::new_builder() .capacity(change_capacity.pack()) @@ -47,54 +76,64 @@ pub fn dispatch(nodes: &[Node], owner: &User, users: &[User], capacity_per_cell: .build(); outputs.push(change_output); } - let outputs_data = (0..outputs.len()) - .map(|_| Default::default()) - .collect::>(); - let unsigned_tx = TransactionBuilder::default() - .inputs( - inputs - .iter() - .map(|cell| CellInput::new(cell.out_point.clone(), 0)), - ) - .outputs(outputs) - .outputs_data(outputs_data) - .cell_dep(owner.single_secp256k1_cell_dep()) - .build(); - let witness = owner - .single_secp256k1_signed_witness(&unsigned_tx) - .as_bytes() - .pack(); - let signed_tx = unsigned_tx - .as_advanced_builder() - .set_witnesses(vec![witness]) - .build(); + for i in i_out..i_out + n_outs { + let user = &users[index_user(i)]; + let cell_output = CellOutput::new_builder() + .capacity(capacity_per_cell.pack()) + .lock(user.single_secp256k1_lock_script()) + .build(); + outputs.push(cell_output); + } + + let signed_tx = { + let unsigned_tx = TransactionBuilder::default() + .input(CellInput::new(input.out_point.clone(), 0)) + .outputs_data( + (0..outputs.len()) + .map(|_| Default::default()) + .collect::>(), + ) + .outputs(outputs) + .cell_dep(owner.single_secp256k1_cell_dep()) + .build(); + let witness = owner + .single_secp256k1_signed_witness(&unsigned_tx) + .as_bytes() + .pack(); + unsigned_tx + .as_advanced_builder() + .set_witnesses(vec![witness]) + .build() + }; - txs.push(signed_tx); - i_user += n_outputs; - if i_user >= users.len() { + txs.push(signed_tx.clone()); + i_out += n_outs; + if i_out == total_outs { break; } + if signed_tx.outputs().len() > n_outs { + // the 1st output is a change cell, push it back into live_cells as it is a live cell + let change_live_cell = { + let cell_output = signed_tx.output(0).expect("1st output exists"); + let out_point = OutPoint::new(signed_tx.hash(), 0); + CellMeta { + cell_output, + out_point, + ..Default::default() + } + }; + live_cells.push_back(change_live_cell); + } } - let total_capacity: u64 = live_cells.iter().map(|cell| cell.capacity().as_u64()).sum(); - assert!( - i_user >= users.len(), - "owner has not enough capacity for users, total_capacity: {}, rest {} users", - total_capacity, - users.len().saturating_sub(i_user), - ); + assert!(i_out == total_outs); for tx in txs { - while let Err(err) = nodes[0] - .rpc_client() - .send_transaction_result(tx.data().into()) - { - ckb_testkit::debug!( - "failed to send transaction {:#x}, error: {}", - tx.hash(), - err - ); - sleep(Duration::from_secs(1)); - } + let result = maybe_retry_send_transaction(&nodes[0], &tx); + assert!( + result.is_ok(), + "dispatch-transaction should be ok but got {}", + result.unwrap_err() + ); } } @@ -138,17 +177,12 @@ pub fn collect(nodes: &[Node], owner: &User, users: &[User]) { } for tx in txs { - while let Err(err) = nodes[0] - .rpc_client() - .send_transaction_result(tx.data().into()) - { - ckb_testkit::debug!( - "failed to send transaction {:#x}, error: {}", - tx.hash(), - err - ); - sleep(Duration::from_secs(1)); - } + let result = maybe_retry_send_transaction(&nodes[0], &tx); + assert!( + result.is_ok(), + "collect-transaction should be ok but got {}", + result.unwrap_err() + ); } } diff --git a/ckb-bench/src/utils.rs b/ckb-bench/src/utils.rs new file mode 100644 index 0000000..41d540b --- /dev/null +++ b/ckb-bench/src/utils.rs @@ -0,0 +1,32 @@ +use ckb_testkit::Node; +use ckb_types::core::TransactionView; +use ckb_types::packed::Byte32; +use std::thread::sleep; +use std::time::{Duration, Instant}; + +pub fn maybe_retry_send_transaction(node: &Node, tx: &TransactionView) -> Result { + let mut last_logging_time = Instant::now(); + loop { + let result = node.rpc_client().send_transaction_result(tx.data().into()); + match result { + Ok(hash) => return Ok(hash), + Err(err) => { + let raw_err = err.to_string(); + if raw_err.contains("PoolIsFull") { + sleep(Duration::from_millis(10)); + if last_logging_time.elapsed() >= Duration::from_secs(5) { + last_logging_time = Instant::now(); + ckb_testkit::debug!( + "retry to send tx {:#x} as the pool is full", + tx.hash() + ); + } + } else if raw_err.contains("PoolRejectedDuplicatedTransaction") { + return Ok(tx.hash()); + } else { + return Err(raw_err); + } + } + } + } +}