From cc32b9143da4ba76972f2e302d310f1eee5febee Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 6 Dec 2023 12:18:11 +0800 Subject: [PATCH] add rbf cheap check and add specs for currency issues --- test/src/main.rs | 1 + test/src/node.rs | 10 ++- test/src/rpc.rs | 3 +- test/src/specs/tx_pool/replace.rs | 112 +++++++++++++++++++++--------- tx-pool/src/component/edges.rs | 7 +- tx-pool/src/pool.rs | 5 ++ tx-pool/src/process.rs | 26 ++++--- 7 files changed, 116 insertions(+), 48 deletions(-) diff --git a/test/src/main.rs b/test/src/main.rs index 59438635077..f8f6b166108 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -476,6 +476,7 @@ fn all_specs() -> Vec> { Box::new(RbfContainInvalidCells), Box::new(RbfRejectReplaceProposed), Box::new(RbfReplaceProposedSuccess), + Box::new(RbfConcurrency), Box::new(CompactBlockEmpty), Box::new(CompactBlockEmptyParentUnknown), Box::new(CompactBlockPrefilled), diff --git a/test/src/node.rs b/test/src/node.rs index 595ec6347b2..6b095b547bb 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -52,6 +52,7 @@ pub struct Node { consensus: Consensus, p2p_listen: String, rpc_client: RpcClient, + rpc_listen: String, node_id: Option, // initialize when starts node guard: Option, // initialize when starts node @@ -134,8 +135,8 @@ impl Node { }; let p2p_listen = app_config.network.listen_addresses[0].to_string(); - let rpc_address = app_config.rpc.listen_address; - let rpc_client = RpcClient::new(&format!("http://{rpc_address}/")); + let rpc_listen = format!("http://{}/", app_config.rpc.listen_address); + let rpc_client = RpcClient::new(&rpc_listen); let consensus = { // Ensure the data path is available because chain_spec.build_consensus() needs to access the // system-cell data. @@ -154,6 +155,7 @@ impl Node { consensus, p2p_listen, rpc_client, + rpc_listen, node_id: None, guard: None, } @@ -184,6 +186,10 @@ impl Node { self.p2p_listen.clone() } + pub fn rpc_listen(&self) -> String { + self.rpc_listen.clone() + } + pub fn p2p_address(&self) -> String { format!("{}/p2p/{}", self.p2p_listen(), self.node_id()) } diff --git a/test/src/rpc.rs b/test/src/rpc.rs index d0a11ef1f3e..2502f8ad76c 100644 --- a/test/src/rpc.rs +++ b/test/src/rpc.rs @@ -306,7 +306,8 @@ impl RpcClient { } } -jsonrpc!(pub struct Inner { +jsonrpc!( + pub struct Inner { pub fn get_block(&self, _hash: H256) -> Option; pub fn get_fork_block(&self, _hash: H256) -> Option; pub fn get_block_by_number(&self, _number: BlockNumber) -> Option; diff --git a/test/src/specs/tx_pool/replace.rs b/test/src/specs/tx_pool/replace.rs index 794a8a8102d..b9a5fc263bb 100644 --- a/test/src/specs/tx_pool/replace.rs +++ b/test/src/specs/tx_pool/replace.rs @@ -1,4 +1,9 @@ -use crate::{utils::wait_until, Node, Spec}; +use crate::{ + rpc::RpcClient, + util::{cell::gen_spendable, transaction::always_success_transactions}, + utils::wait_until, + Node, Spec, +}; use ckb_jsonrpc_types::Status; use ckb_logger::info; use ckb_types::{ @@ -441,40 +446,23 @@ impl Spec for RbfContainInvalidCells { node0.mine_until_out_bootstrap_period(); - // build txs chain - let tx0 = node0.new_transaction_spend_tip_cellbase(); - let mut txs = vec![tx0]; - let max_count = 5; - while txs.len() <= max_count { - let parent = txs.last().unwrap(); - let child = parent - .as_advanced_builder() - .set_inputs(vec![{ - CellInput::new_builder() - .previous_output(OutPoint::new(parent.hash(), 0)) - .build() - }]) - .set_outputs(vec![parent.output(0).unwrap()]) - .build(); - txs.push(child); - } - assert_eq!(txs.len(), max_count + 1); - // send Tx chain - for tx in txs[..=max_count - 1].iter() { + let cells = gen_spendable(node0, 3); + let txs = always_success_transactions(node0, &cells); + for tx in txs.iter() { let ret = node0.rpc_client().send_transaction_result(tx.data().into()); assert!(ret.is_ok()); } let clone_tx = txs[2].clone(); + + let cell = CellDep::new_builder() + .out_point(OutPoint::new(txs[1].hash(), 0)) + .build(); + // Set tx2 fee to a higher value let output2 = CellOutputBuilder::default() .capacity(capacity_bytes!(70).pack()) .build(); - - // build a cell from conflicts txs's output - let cell = CellDep::new_builder() - .out_point(OutPoint::new(txs[2].hash(), 0)) - .build(); let tx2 = clone_tx .as_advanced_builder() .set_inputs(vec![{ @@ -490,11 +478,6 @@ impl Spec for RbfContainInvalidCells { .rpc_client() .send_transaction_result(tx2.data().into()); assert!(res.is_err(), "tx2 should be rejected"); - assert!(res - .err() - .unwrap() - .to_string() - .contains("new Tx contains cell deps from conflicts")); } fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { @@ -687,7 +670,7 @@ impl Spec for RbfReplaceProposedSuccess { let tx2_status = node0.rpc_client().get_transaction(tx2.hash()).tx_status; assert_eq!(tx2_status.status, Status::Pending); - // submit a black block + // submit a blank block let example = node0.new_block(None, None, None); let blank_block = example .as_advanced_builder() @@ -730,3 +713,68 @@ impl Spec for RbfReplaceProposedSuccess { config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); } } + +pub struct RbfConcurrency; +impl Spec for RbfConcurrency { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + node0.mine_until_out_bootstrap_period(); + node0.new_block_with_blocking(|template| template.number.value() != 13); + let tx_hash_0 = node0.generate_transaction(); + info!("Generate 4 txs with same input"); + let tx1 = node0.new_transaction(tx_hash_0.clone()); + + let mut conflicts = vec![tx1]; + // tx1 capacity is 100, set other txs to higer fee + let fees = vec![ + capacity_bytes!(83), + capacity_bytes!(82), + capacity_bytes!(81), + capacity_bytes!(80), + ]; + for fee in fees.iter() { + let tx2_temp = node0.new_transaction(tx_hash_0.clone()); + let output = CellOutputBuilder::default().capacity(fee.pack()).build(); + + let tx2 = tx2_temp + .as_advanced_builder() + .set_outputs(vec![output]) + .build(); + conflicts.push(tx2); + } + + // make 5 threads to set_transaction concurrently + let mut handles = vec![]; + for tx in &conflicts { + let cur_tx = tx.clone(); + let rpc_address = node0.rpc_listen(); + let handle = std::thread::spawn(move || { + let rpc_client = RpcClient::new(&rpc_address); + let _ = rpc_client.send_transaction_result(cur_tx.data().into()); + }); + handles.push(handle); + } + for handle in handles { + let _ = handle.join(); + } + + let status: Vec<_> = conflicts + .iter() + .map(|tx| { + let res = node0.rpc_client().get_transaction(tx.hash()); + res.tx_status.status + }) + .collect(); + + // the last tx should be in Pending(with the highest fee), others should be in Rejected + assert_eq!(status[4], Status::Pending); + for s in status.iter().take(4) { + assert_eq!(*s, Status::Rejected); + } + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} diff --git a/tx-pool/src/component/edges.rs b/tx-pool/src/component/edges.rs index 3ef0c3ddb33..87d4e3cc52c 100644 --- a/tx-pool/src/component/edges.rs +++ b/tx-pool/src/component/edges.rs @@ -1,3 +1,4 @@ +use ckb_logger::error; use ckb_types::packed::{Byte32, OutPoint, ProposalShortId}; use std::collections::{hash_map::Entry, HashMap, HashSet}; @@ -28,9 +29,11 @@ impl Edges { } pub(crate) fn insert_input(&mut self, out_point: OutPoint, txid: ProposalShortId) { - let res = self.inputs.insert(out_point, txid); // inputs is occupied means double speanding happened here - assert!(res.is_none()); + if self.inputs.contains_key(&out_point) { + error!("double spending happened {:?} {:?}", out_point, txid); + } + self.inputs.insert(out_point, txid); } pub(crate) fn remove_input(&mut self, out_point: &OutPoint) -> Option { diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index ce898f88ec2..dce9aca6edd 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -524,6 +524,11 @@ impl TxPool { (entries, size, cycles) } + pub(crate) fn find_conflict_tx(&self, txv: &TransactionView) -> HashSet { + let tx_inputs: Vec = txv.input_pts_iter().collect(); + self.pool_map.find_conflict_tx(&tx_inputs) + } + pub(crate) fn check_rbf( &self, snapshot: &Snapshot, diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 1f92689cad3..1d32c98f452 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -105,16 +105,15 @@ impl TxPoolService { ) -> (Result<(), Reject>, Arc) { let (ret, snapshot) = self .with_tx_pool_write_lock(move |tx_pool, snapshot| { - // the invoking of check_rbf in pre_check only holds read lock - // so here we double confirm RBF rules before insert entry - let mut conflicts = HashSet::new(); - if tx_pool.enable_rbf() { - conflicts = - tx_pool.check_rbf(&snapshot, entry.transaction(), entry.fee, entry.size)?; - } + // here we double confirm RBF rules before insert entry + // check_rbf must be invoked in `write` lock to avoid concurrent issues. + let conflicts = if tx_pool.enable_rbf() { + tx_pool.check_rbf(&snapshot, entry.transaction(), entry.fee, entry.size)? + } else { + HashSet::new() + }; // if snapshot changed by context switch we need redo time_relative verify - // if snapshot changed by context switch let tip_hash = snapshot.tip_hash(); if pre_resolve_tip != tip_hash { debug!( @@ -245,12 +244,17 @@ impl TxPoolService { if tx_pool.enable_rbf() && matches!(err, Reject::Resolve(OutPointError::Dead(_))) { - // Try RBF check let (rtx, status) = resolve_tx(tx_pool, &snapshot, tx.clone(), true)?; let fee = check_tx_fee(tx_pool, &snapshot, &rtx, tx_size)?; - let conflicts = - tx_pool.check_rbf(&snapshot, &rtx.transaction, fee, tx_size)?; + // Try an RBF cheap check, here if the tx is resolved as Dead, + // we assume there must be conflicted happened in txpool now, + // if there is no conflicted transactions reject it + let conflicts = tx_pool.find_conflict_tx(&rtx.transaction); if conflicts.is_empty() { + error!( + "{} is resolved as Dead, but there is no conflicted tx", + rtx.transaction.proposal_short_id() + ); return Err(err); } Ok((tip_hash, rtx, status, fee, tx_size))