Skip to content

Commit

Permalink
add rbf cheap check and add specs for currency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Dec 6, 2023
1 parent ba249e6 commit 3e520a5
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 48 deletions.
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(RbfContainInvalidCells),
Box::new(RbfRejectReplaceProposed),
Box::new(RbfReplaceProposedSuccess),
Box::new(RbfConcurrency),
Box::new(CompactBlockEmpty),
Box::new(CompactBlockEmptyParentUnknown),
Box::new(CompactBlockPrefilled),
Expand Down
10 changes: 8 additions & 2 deletions test/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct Node {
consensus: Consensus,
p2p_listen: String,
rpc_client: RpcClient,
rpc_listen: String,

node_id: Option<String>, // initialize when starts node
guard: Option<ProcessGuard>, // initialize when starts node
Expand Down Expand Up @@ -134,8 +135,8 @@ impl Node {
};

let p2p_listen = app_config.network.listen_addresses[0].to_string();
let rpc_address = app_config.rpc.listen_address;
let rpc_client = RpcClient::new(&format!("http://{rpc_address}/"));
let rpc_listen = format!("http://{}/", app_config.rpc.listen_address);
let rpc_client = RpcClient::new(&rpc_listen);
let consensus = {
// Ensure the data path is available because chain_spec.build_consensus() needs to access the
// system-cell data.
Expand All @@ -154,6 +155,7 @@ impl Node {
consensus,
p2p_listen,
rpc_client,
rpc_listen,
node_id: None,
guard: None,
}
Expand Down Expand Up @@ -184,6 +186,10 @@ impl Node {
self.p2p_listen.clone()
}

pub fn rpc_listen(&self) -> String {
self.rpc_listen.clone()
}

pub fn p2p_address(&self) -> String {
format!("{}/p2p/{}", self.p2p_listen(), self.node_id())
}
Expand Down
3 changes: 2 additions & 1 deletion test/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ impl RpcClient {
}
}

jsonrpc!(pub struct Inner {
jsonrpc!(
pub struct Inner {
pub fn get_block(&self, _hash: H256) -> Option<BlockView>;
pub fn get_fork_block(&self, _hash: H256) -> Option<BlockView>;
pub fn get_block_by_number(&self, _number: BlockNumber) -> Option<BlockView>;
Expand Down
112 changes: 80 additions & 32 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::{utils::wait_until, Node, Spec};
use crate::{
rpc::RpcClient,
util::{cell::gen_spendable, transaction::always_success_transactions},
utils::wait_until,
Node, Spec,
};
use ckb_jsonrpc_types::Status;
use ckb_logger::info;
use ckb_types::{
Expand Down Expand Up @@ -441,40 +446,23 @@ impl Spec for RbfContainInvalidCells {

node0.mine_until_out_bootstrap_period();

// build txs chain
let tx0 = node0.new_transaction_spend_tip_cellbase();
let mut txs = vec![tx0];
let max_count = 5;
while txs.len() <= max_count {
let parent = txs.last().unwrap();
let child = parent
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(parent.hash(), 0))
.build()
}])
.set_outputs(vec![parent.output(0).unwrap()])
.build();
txs.push(child);
}
assert_eq!(txs.len(), max_count + 1);
// send Tx chain
for tx in txs[..=max_count - 1].iter() {
let cells = gen_spendable(node0, 3);
let txs = always_success_transactions(node0, &cells);
for tx in txs.iter() {
let ret = node0.rpc_client().send_transaction_result(tx.data().into());
assert!(ret.is_ok());
}

let clone_tx = txs[2].clone();

let cell = CellDep::new_builder()
.out_point(OutPoint::new(txs[1].hash(), 0))
.build();

// Set tx2 fee to a higher value
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(70).pack())
.build();

// build a cell from conflicts txs's output
let cell = CellDep::new_builder()
.out_point(OutPoint::new(txs[2].hash(), 0))
.build();
let tx2 = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
Expand All @@ -490,11 +478,6 @@ impl Spec for RbfContainInvalidCells {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
assert!(res
.err()
.unwrap()
.to_string()
.contains("new Tx contains cell deps from conflicts"));
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -687,7 +670,7 @@ impl Spec for RbfReplaceProposedSuccess {
let tx2_status = node0.rpc_client().get_transaction(tx2.hash()).tx_status;
assert_eq!(tx2_status.status, Status::Pending);

// submit a black block
// submit a blank block
let example = node0.new_block(None, None, None);
let blank_block = example
.as_advanced_builder()
Expand Down Expand Up @@ -730,3 +713,68 @@ impl Spec for RbfReplaceProposedSuccess {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

pub struct RbfConcurrency;
impl Spec for RbfConcurrency {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];

node0.mine_until_out_bootstrap_period();
node0.new_block_with_blocking(|template| template.number.value() != 13);
let tx_hash_0 = node0.generate_transaction();
info!("Generate 4 txs with same input");
let tx1 = node0.new_transaction(tx_hash_0.clone());

let mut conflicts = vec![tx1];
// tx1 capacity is 100, set other txs to higer fee
let fees = vec![
capacity_bytes!(83),
capacity_bytes!(82),
capacity_bytes!(81),
capacity_bytes!(80),
];
for fee in fees.iter() {
let tx2_temp = node0.new_transaction(tx_hash_0.clone());
let output = CellOutputBuilder::default().capacity(fee.pack()).build();

let tx2 = tx2_temp
.as_advanced_builder()
.set_outputs(vec![output])
.build();
conflicts.push(tx2);
}

// make 5 threads to set_transaction concurrently
let mut handles = vec![];
for tx in &conflicts {
let cur_tx = tx.clone();
let rpc_address = node0.rpc_listen();
let handle = std::thread::spawn(move || {
let rpc_client = RpcClient::new(&rpc_address);
let _ = rpc_client.send_transaction_result(cur_tx.data().into());
});
handles.push(handle);
}
for handle in handles {
let _ = handle.join();
}

let status: Vec<_> = conflicts
.iter()
.map(|tx| {
let res = node0.rpc_client().get_transaction(tx.hash());
res.tx_status.status
})
.collect();

// the last tx should be in Pending(with the highest fee), others should be in Rejected
assert_eq!(status[4], Status::Pending);
for s in status.iter().take(4) {
assert_eq!(*s, Status::Rejected);
}
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}
6 changes: 4 additions & 2 deletions tx-pool/src/component/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ impl Edges {
}

pub(crate) fn insert_input(&mut self, out_point: OutPoint, txid: ProposalShortId) {
let res = self.inputs.insert(out_point, txid);
// inputs is occupied means double speanding happened here
assert!(res.is_none());
if self.inputs.contains_key(&out_point) {
panic!("double spending happened {:?} {:?}", out_point, txid);
}
self.inputs.insert(out_point, txid);
}

pub(crate) fn remove_input(&mut self, out_point: &OutPoint) -> Option<ProposalShortId> {
Expand Down
5 changes: 5 additions & 0 deletions tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ impl TxPool {
(entries, size, cycles)
}

pub(crate) fn find_conflict_tx(&self, txv: &TransactionView) -> HashSet<ProposalShortId> {
let tx_inputs: Vec<OutPoint> = txv.input_pts_iter().collect();
self.pool_map.find_conflict_tx(&tx_inputs)
}

pub(crate) fn check_rbf(
&self,
snapshot: &Snapshot,
Expand Down
26 changes: 15 additions & 11 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,15 @@ impl TxPoolService {
) -> (Result<(), Reject>, Arc<Snapshot>) {
let (ret, snapshot) = self
.with_tx_pool_write_lock(move |tx_pool, snapshot| {
// the invoking of check_rbf in pre_check only holds read lock
// so here we double confirm RBF rules before insert entry
let mut conflicts = HashSet::new();
if tx_pool.enable_rbf() {
conflicts =
tx_pool.check_rbf(&snapshot, entry.transaction(), entry.fee, entry.size)?;
}
// here we double confirm RBF rules before insert entry
// check_rbf must be invoked in `write` lock to avoid concurrent issues.
let conflicts = if tx_pool.enable_rbf() {
tx_pool.check_rbf(&snapshot, entry.transaction(), entry.fee, entry.size)?
} else {
HashSet::new()
};

// if snapshot changed by context switch we need redo time_relative verify
// if snapshot changed by context switch
let tip_hash = snapshot.tip_hash();
if pre_resolve_tip != tip_hash {
debug!(
Expand Down Expand Up @@ -245,12 +244,17 @@ impl TxPoolService {
if tx_pool.enable_rbf()
&& matches!(err, Reject::Resolve(OutPointError::Dead(_)))
{
// Try RBF check
let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?;
let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?;
let conflicts =
tx_pool.check_rbf(&snapshot, &rtx.transaction, fee, tx_size)?;
// Try an RBF cheap check, here if the tx is resolved as Dead,
// we assume there must be conflicted happened in txpool now,
// if there is no conflicted transactions reject it
let conflicts = tx_pool.find_conflict_tx(&rtx.transaction);
if conflicts.is_empty() {
error!(
"{} is resolved as Dead, but there is no conflicted tx",
rtx.transaction.proposal_short_id()
);
return Err(err);
}
Ok((tip_hash, rtx, status, fee, tx_size))
Expand Down

0 comments on commit 3e520a5

Please sign in to comment.