Skip to content

Commit

Permalink
feat(ckb-bench): properly handle big input
Browse files Browse the repository at this point in the history
  • Loading branch information
keroro520 committed Aug 4, 2021
1 parent 7823224 commit 2f3b0fe
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 105 deletions.
60 changes: 34 additions & 26 deletions ckb-bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
mod bench;
mod prepare;
mod stat;
mod utils;
mod watcher;

#[cfg(test)]
mod tests;

use crate::bench::{LiveCellProducer, TransactionProducer};
use crate::prepare::{collect, dispatch, generate_privkeys};
use crate::utils::maybe_retry_send_transaction;
use crate::watcher::Watcher;
use ckb_crypto::secp::Privkey;
use ckb_testkit::{Node, Nodes, User};
Expand Down Expand Up @@ -123,9 +125,13 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) {
})
.collect::<Vec<_>>();
let n_users = value_t_or_exit!(arguments, "n-users", usize);
let cells_per_user = value_t_or_exit!(arguments, "cells-per-user", u64);
let capacity_per_cell = value_t_or_exit!(arguments, "capacity-per-cell", u64);
let owner_raw_privkey = env::var("CKB_BENCH_OWNER_PRIVKEY").unwrap_or_else(|err| {
prompt_and_exit!("cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", err)
prompt_and_exit!(
"cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}",
err
)
});
let genesis_block = nodes[0].get_block_by_number(0);
let owner = {
Expand Down Expand Up @@ -154,7 +160,7 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) {
};
wait_for_nodes_sync(&nodes);
wait_for_indexer_synced(&nodes);
dispatch(&nodes, &owner, &users, capacity_per_cell);
dispatch(&nodes, &owner, &users, cells_per_user, capacity_per_cell);
}
("collect", Some(arguments)) => {
let data_dir = value_t_or_exit!(arguments, "data-dir", PathBuf);
Expand All @@ -177,7 +183,10 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) {
.collect::<Vec<_>>();
let n_users = value_t_or_exit!(arguments, "n-users", usize);
let owner_raw_privkey = env::var("CKB_BENCH_OWNER_PRIVKEY").unwrap_or_else(|err| {
prompt_and_exit!("cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", err)
prompt_and_exit!(
"cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}",
err
)
});
let genesis_block = nodes[0].get_block_by_number(0);
let owner = {
Expand Down Expand Up @@ -238,7 +247,10 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) {
Duration::from_millis(bench_time_ms)
};
let owner_raw_privkey = env::var("CKB_BENCH_OWNER_PRIVKEY").unwrap_or_else(|err| {
prompt_and_exit!("cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}", err)
prompt_and_exit!(
"cannot find \"CKB_BENCH_OWNER_PRIVKEY\" from environment variables, error: {}",
err
)
});
let genesis_block = nodes[0].get_block_by_number(0);
let users = {
Expand Down Expand Up @@ -309,31 +321,18 @@ pub fn entrypoint(clap_arg_match: ArgMatches<'static>) {

loop {
i = (i + 1) % nodes.len();
let result = nodes[i]
.rpc_client()
.send_transaction_result(tx.data().into());
match result {
Err(err) => {
if err.to_string().contains("PoolIsFull") {
sleep(Duration::from_millis(10));
continue;
} else if err
.to_string()
.contains("PoolRejectedDuplicatedTransaction")
{
break;
} else {
ckb_testkit::error!(
"failed to send {:#x}, error: {:?}",
tx.hash(),
err
);
break;
}
}
match maybe_retry_send_transaction(&nodes[i], &tx) {
Ok(hash) => {
ckb_testkit::debug!("sent transaction {:#x}", hash);
benched_transactions += 1;
break;
}
Err(err) => {
ckb_testkit::error!(
"failed to send tx {:#x}, error: {}",
tx.hash(),
err
);
}
}
}
Expand Down Expand Up @@ -512,6 +511,15 @@ fn clap_app() -> App<'static, 'static> {
.help("Number of users")
.validator(|s| s.parse::<u64>().map(|_| ()).map_err(|err| err.to_string())),
)
.arg(
Arg::with_name("cells-per-user")
.long("cells-per-user")
.value_name("NUMBER")
.takes_value(true)
.required(true)
.help("Cells per user")
.validator(|s| s.parse::<u64>().map(|_| ()).map_err(|err| err.to_string())),
)
.arg(
Arg::with_name("capacity-per-cell")
.long("capacity-per-cell")
Expand Down
192 changes: 113 additions & 79 deletions ckb-bench/src/prepare.rs
Original file line number Diff line number Diff line change
@@ -1,100 +1,139 @@
use crate::utils::maybe_retry_send_transaction;
use ckb_crypto::secp::Privkey;
use ckb_testkit::{Node, User};
use ckb_types::core::cell::CellMeta;
use ckb_types::packed::OutPoint;
use ckb_types::{
core::{Capacity, TransactionBuilder},
packed::{Byte32, CellInput, CellOutput},
prelude::*,
};
use std::cmp::min;
use std::thread::sleep;
use std::time::Duration;
use std::collections::VecDeque;

/// count of two-in-two-out txs a block should capable to package.
const TWO_IN_TWO_OUT_COUNT: u64 = 1_000;
const MAX_OUT_COUNT: u64 = TWO_IN_TWO_OUT_COUNT;
const FEE_RATE_OF_OUTPUT: u64 = 1000;

// TODO handle big cell
pub fn dispatch(nodes: &[Node], owner: &User, users: &[User], capacity_per_cell: u64) {
pub fn dispatch(
nodes: &[Node],
owner: &User,
users: &[User],
cells_per_user: u64,
capacity_per_cell: u64,
) {
ckb_testkit::info!(
"dispatch to {} users, {} capacity per user",
"dispatch to {} users, {} cells per user, {} capacity per cell",
users.len(),
cells_per_user,
capacity_per_cell
);
let live_cells = owner.get_spendable_single_secp256k1_cells(&nodes[0]);
let mut i_user = 0;

let mut live_cells: VecDeque<CellMeta> = owner
.get_spendable_single_secp256k1_cells(&nodes[0])
.into_iter()
.collect();

{
let total_capacity: u64 = live_cells.iter().map(|cell| cell.capacity().as_u64()).sum();
let total_fee = users.len() as u64 * cells_per_user * FEE_RATE_OF_OUTPUT;
let need_capacity = users.len() as u64 * cells_per_user * capacity_per_cell + total_fee;
assert!(
total_capacity > need_capacity,
"insufficient capacity, owner's total_capacity({}) <= {} = n_users({}) * cells_per_user({}) * capacity_per_cell({}) + total_fee({})",
total_capacity,
need_capacity,
users.len(),
cells_per_user,
capacity_per_cell,
total_fee,
);
}

let total_outs = users.len() * cells_per_user as usize;
let index_user = |out_i: usize| out_i % (cells_per_user as usize);

let mut i_out = 0usize;
let mut txs = Vec::new();
for chunk in live_cells.chunks(1) {
let inputs = chunk;
let inputs_capacity: u64 = inputs.iter().map(|cell| cell.capacity().as_u64()).sum();
while let Some(input) = live_cells.pop_front() {
let input_capacity = input.capacity().as_u64();
// TODO estimate tx fee
let fee = (inputs_capacity / capacity_per_cell) * 1000;
let outputs_capacity = inputs_capacity - fee;
let n_outputs = if (outputs_capacity / capacity_per_cell) as usize > users.len() - i_user {
min(1500, users.len() - i_user)
} else {
min(1500, (outputs_capacity / capacity_per_cell) as usize)
};
let change_capacity = inputs_capacity - n_outputs as u64 * capacity_per_cell - fee;
let mut outputs = users[i_user..i_user + n_outputs]
.iter()
.map(|user| {
CellOutput::new_builder()
.capacity(capacity_per_cell.pack())
.lock(user.single_secp256k1_lock_script())
.build()
})
.collect::<Vec<_>>();
let fee = MAX_OUT_COUNT * FEE_RATE_OF_OUTPUT;
let outputs_capacity = input_capacity - fee;
let mut n_outs = min(MAX_OUT_COUNT, outputs_capacity / capacity_per_cell) as usize;
if i_out + n_outs >= total_outs {
n_outs = total_outs - i_out;
}
let change_capacity = outputs_capacity - n_outs as u64 * capacity_per_cell;

let mut outputs = Vec::with_capacity(n_outs as usize + 1);
if change_capacity >= Capacity::bytes(67).unwrap().as_u64() {
let change_output = CellOutput::new_builder()
.capacity(change_capacity.pack())
.lock(owner.single_secp256k1_lock_script())
.build();
outputs.push(change_output);
}
let outputs_data = (0..outputs.len())
.map(|_| Default::default())
.collect::<Vec<_>>();
let unsigned_tx = TransactionBuilder::default()
.inputs(
inputs
.iter()
.map(|cell| CellInput::new(cell.out_point.clone(), 0)),
)
.outputs(outputs)
.outputs_data(outputs_data)
.cell_dep(owner.single_secp256k1_cell_dep())
.build();
let witness = owner
.single_secp256k1_signed_witness(&unsigned_tx)
.as_bytes()
.pack();
let signed_tx = unsigned_tx
.as_advanced_builder()
.set_witnesses(vec![witness])
.build();
for i in i_out..i_out + n_outs {
let user = &users[index_user(i)];
let cell_output = CellOutput::new_builder()
.capacity(capacity_per_cell.pack())
.lock(user.single_secp256k1_lock_script())
.build();
outputs.push(cell_output);
}

let signed_tx = {
let unsigned_tx = TransactionBuilder::default()
.input(CellInput::new(input.out_point.clone(), 0))
.outputs_data(
(0..outputs.len())
.map(|_| Default::default())
.collect::<Vec<_>>(),
)
.outputs(outputs)
.cell_dep(owner.single_secp256k1_cell_dep())
.build();
let witness = owner
.single_secp256k1_signed_witness(&unsigned_tx)
.as_bytes()
.pack();
unsigned_tx
.as_advanced_builder()
.set_witnesses(vec![witness])
.build()
};

txs.push(signed_tx);
i_user += n_outputs;
if i_user >= users.len() {
txs.push(signed_tx.clone());
i_out += n_outs;
if i_out == total_outs {
break;
}
if signed_tx.outputs().len() > n_outs {
// the 1st output is a change cell, push it back into live_cells as it is a live cell
let change_live_cell = {
let cell_output = signed_tx.output(0).expect("1st output exists");
let out_point = OutPoint::new(signed_tx.hash(), 0);
CellMeta {
cell_output,
out_point,
..Default::default()
}
};
live_cells.push_back(change_live_cell);
}
}

let total_capacity: u64 = live_cells.iter().map(|cell| cell.capacity().as_u64()).sum();
assert!(
i_user >= users.len(),
"owner has not enough capacity for users, total_capacity: {}, rest {} users",
total_capacity,
users.len().saturating_sub(i_user),
);
assert!(i_out == total_outs);
for tx in txs {
while let Err(err) = nodes[0]
.rpc_client()
.send_transaction_result(tx.data().into())
{
ckb_testkit::debug!(
"failed to send transaction {:#x}, error: {}",
tx.hash(),
err
);
sleep(Duration::from_secs(1));
}
let result = maybe_retry_send_transaction(&nodes[0], &tx);
assert!(
result.is_ok(),
"dispatch-transaction should be ok but got {}",
result.unwrap_err()
);
}
}

Expand Down Expand Up @@ -138,17 +177,12 @@ pub fn collect(nodes: &[Node], owner: &User, users: &[User]) {
}

for tx in txs {
while let Err(err) = nodes[0]
.rpc_client()
.send_transaction_result(tx.data().into())
{
ckb_testkit::debug!(
"failed to send transaction {:#x}, error: {}",
tx.hash(),
err
);
sleep(Duration::from_secs(1));
}
let result = maybe_retry_send_transaction(&nodes[0], &tx);
assert!(
result.is_ok(),
"collect-transaction should be ok but got {}",
result.unwrap_err()
);
}
}

Expand Down
32 changes: 32 additions & 0 deletions ckb-bench/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use ckb_testkit::Node;
use ckb_types::core::TransactionView;
use ckb_types::packed::Byte32;
use std::thread::sleep;
use std::time::{Duration, Instant};

pub fn maybe_retry_send_transaction(node: &Node, tx: &TransactionView) -> Result<Byte32, String> {
let mut last_logging_time = Instant::now();
loop {
let result = node.rpc_client().send_transaction_result(tx.data().into());
match result {
Ok(hash) => return Ok(hash),
Err(err) => {
let raw_err = err.to_string();
if raw_err.contains("PoolIsFull") {
sleep(Duration::from_millis(10));
if last_logging_time.elapsed() >= Duration::from_secs(5) {
last_logging_time = Instant::now();
ckb_testkit::debug!(
"retry to send tx {:#x} as the pool is full",
tx.hash()
);
}
} else if raw_err.contains("PoolRejectedDuplicatedTransaction") {
return Ok(tx.hash());
} else {
return Err(raw_err);
}
}
}
}
}

0 comments on commit 2f3b0fe

Please sign in to comment.