Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Move txpool statistics from callbacks to PoolMap #4276

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion chain/src/tests/block_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,14 @@ fn test_package_multi_best_scores() {
TxEntry::dummy_resolve(tx2_3.clone(), 0, Capacity::shannons(150), 100),
TxEntry::dummy_resolve(tx2_4.clone(), 0, Capacity::shannons(150), 100),
TxEntry::dummy_resolve(tx3_1.clone(), 0, Capacity::shannons(1000), 1000),
TxEntry::dummy_resolve(tx4_1.clone(), 0, Capacity::shannons(300), 250),
TxEntry::dummy_resolve(tx4_1.clone(), 100, Capacity::shannons(300), 250),
];
tx_pool.plug_entry(entries, PlugTarget::Proposed).unwrap();

let tx_pool_info = tx_pool.get_tx_pool_info().unwrap();
assert_eq!(tx_pool_info.total_tx_size, 2400);
assert_eq!(tx_pool_info.total_tx_cycles, 100);

// 250 size best scored txs
let txs = tx_pool.package_txs(Some(250)).unwrap();
check_txs(&txs, vec![&tx1, &tx2, &tx3], "250 size best scored txs");
Expand Down
27 changes: 5 additions & 22 deletions shared/src/shared_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,39 +381,22 @@ fn register_tx_pool_callback(tx_pool_builder: &mut TxPoolServiceBuilder, notify:
fee: entry.fee,
timestamp: entry.timestamp,
};
tx_pool_builder.register_pending(Box::new(move |tx_pool: &mut TxPool, entry: &TxEntry| {
// update statics
tx_pool.update_statics_for_add_tx(entry.size, entry.cycles);

tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
// notify
let notify_tx_entry = create_notify_entry(entry);
notify_pending.notify_new_transaction(notify_tx_entry);
}));

let notify_proposed = notify.clone();
tx_pool_builder.register_proposed(Box::new(
move |tx_pool: &mut TxPool, entry: &TxEntry, new: bool| {
// update statics
if new {
tx_pool.update_statics_for_add_tx(entry.size, entry.cycles);
}

// notify
let notify_tx_entry = create_notify_entry(entry);
notify_proposed.notify_proposed_transaction(notify_tx_entry);
},
));

tx_pool_builder.register_committed(Box::new(move |tx_pool: &mut TxPool, entry: &TxEntry| {
tx_pool.update_statics_for_remove_tx(entry.size, entry.cycles);
tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
// notify
let notify_tx_entry = create_notify_entry(entry);
notify_proposed.notify_proposed_transaction(notify_tx_entry);
}));

let notify_reject = notify;
tx_pool_builder.register_reject(Box::new(
move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
// update statics
tx_pool.update_statics_for_remove_tx(entry.size, entry.cycles);

let tx_hash = entry.transaction().hash();
// record recent reject
if matches!(reject, Reject::Resolve(..) | Reject::RBFRejected(..)) {
Expand Down
30 changes: 8 additions & 22 deletions tx-pool/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use crate::error::Reject;
use crate::pool::TxPool;

/// Callback boxed fn pointer wrapper
pub type Callback = Box<dyn Fn(&mut TxPool, &TxEntry) + Sync + Send>;
pub type PendingCallback = Box<dyn Fn(&TxEntry) + Sync + Send>;
/// Proposed Callback boxed fn pointer wrapper
pub type ProposedCallback = Box<dyn Fn(&mut TxPool, &TxEntry, bool) + Sync + Send>;
pub type ProposedCallback = Box<dyn Fn(&TxEntry) + Sync + Send>;
/// Reject Callback boxed fn pointer wrapper
pub type RejectCallback = Box<dyn Fn(&mut TxPool, &TxEntry, Reject) + Sync + Send>;

/// Struct hold callbacks
pub struct Callbacks {
pub(crate) pending: Option<Callback>,
pub(crate) pending: Option<PendingCallback>,
pub(crate) proposed: Option<ProposedCallback>,
pub(crate) committed: Option<Callback>,
pub(crate) reject: Option<RejectCallback>,
}

Expand All @@ -29,13 +28,12 @@ impl Callbacks {
Callbacks {
pending: None,
proposed: None,
committed: None,
reject: None,
}
}

/// Register a new pending callback
pub fn register_pending(&mut self, callback: Callback) {
pub fn register_pending(&mut self, callback: PendingCallback) {
self.pending = Some(callback);
}

Expand All @@ -44,34 +42,22 @@ impl Callbacks {
self.proposed = Some(callback);
}

/// Register a new committed callback
pub fn register_committed(&mut self, callback: Callback) {
self.committed = Some(callback);
}

/// Register a new abandon callback
pub fn register_reject(&mut self, callback: RejectCallback) {
self.reject = Some(callback);
}

/// Call on after pending
pub fn call_pending(&self, tx_pool: &mut TxPool, entry: &TxEntry) {
pub fn call_pending(&self, entry: &TxEntry) {
if let Some(call) = &self.pending {
call(tx_pool, entry)
call(entry)
}
}

/// Call on after proposed
pub fn call_proposed(&self, tx_pool: &mut TxPool, entry: &TxEntry, new: bool) {
pub fn call_proposed(&self, entry: &TxEntry) {
if let Some(call) = &self.proposed {
call(tx_pool, entry, new)
}
}

/// Call on after proposed
pub fn call_committed(&self, tx_pool: &mut TxPool, entry: &TxEntry) {
if let Some(call) = &self.committed {
call(tx_pool, entry)
call(entry)
}
}

Expand Down
57 changes: 54 additions & 3 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::component::links::{Relation, TxLinksMap};
use crate::component::sort_key::{AncestorsScoreSortKey, EvictKey};
use crate::error::Reject;
use crate::TxEntry;
use ckb_logger::{debug, trace};
use ckb_logger::{debug, error, trace};
use ckb_types::core::error::OutPointError;
use ckb_types::core::Cycle;
use ckb_types::packed::OutPoint;
use ckb_types::prelude::*;
use ckb_types::{
Expand Down Expand Up @@ -66,6 +67,10 @@ pub struct PoolMap {
/// All the parent/children relationships
pub(crate) links: TxLinksMap,
pub(crate) max_ancestors_count: usize,
// sum of all tx_pool tx's virtual sizes.
pub(crate) total_tx_size: usize,
// sum of all tx_pool tx's cycles.
pub(crate) total_tx_cycles: Cycle,
}

impl PoolMap {
Expand All @@ -75,6 +80,8 @@ impl PoolMap {
edges: Edges::default(),
links: TxLinksMap::new(),
max_ancestors_count,
total_tx_size: 0,
total_tx_cycles: 0,
}
}

Expand Down Expand Up @@ -193,6 +200,7 @@ impl PoolMap {
self.insert_entry(&entry, status);
self.record_entry_descendants(&entry);
self.track_entry_statics();
self.update_stat_for_add_tx(entry.size, entry.cycles);
Ok(true)
}

Expand All @@ -217,6 +225,7 @@ impl PoolMap {
self.update_descendants_index_key(&entry.inner, EntryOp::Remove);
self.remove_entry_edges(&entry.inner);
self.remove_entry_links(id);
self.update_stat_for_remove_tx(entry.inner.size, entry.inner.cycles);
entry.inner
})
}
Expand Down Expand Up @@ -332,6 +341,8 @@ impl PoolMap {
self.entries = MultiIndexPoolEntryMap::default();
self.edges.clear();
self.links.clear();
self.total_tx_size = 0;
self.total_tx_cycles = 0;
}

pub(crate) fn score_sorted_iter_by(
Expand Down Expand Up @@ -497,8 +508,7 @@ impl PoolMap {
}

fn remove_entry_edges(&mut self, entry: &TxEntry) {
let inputs = entry.transaction().input_pts_iter();
for i in inputs {
for i in entry.transaction().input_pts_iter() {
// release input record
self.edges.remove_input(&i);
}
Expand Down Expand Up @@ -539,4 +549,45 @@ impl PoolMap {
.set(self.proposed_size() as i64);
}
}

/// Update size and cycles statistics for add tx
fn update_stat_for_add_tx(&mut self, tx_size: usize, cycles: Cycle) {
let total_tx_size = self.total_tx_size.checked_add(tx_size).unwrap_or_else(|| {
error!(
"total_tx_size {} overflown by add {}",
self.total_tx_size, tx_size
);
self.total_tx_size
});
let total_tx_cycles = self.total_tx_cycles.checked_add(cycles).unwrap_or_else(|| {
error!(
"total_tx_cycles {} overflown by add {}",
self.total_tx_cycles, cycles
);
self.total_tx_cycles
});
self.total_tx_size = total_tx_size;
self.total_tx_cycles = total_tx_cycles;
}

/// Update size and cycles statistics for remove tx
/// cycles overflow is possible, currently obtaining cycles is not accurate
fn update_stat_for_remove_tx(&mut self, tx_size: usize, cycles: Cycle) {
let total_tx_size = self.total_tx_size.checked_sub(tx_size).unwrap_or_else(|| {
error!(
"total_tx_size {} overflown by sub {}",
self.total_tx_size, tx_size
);
0
});
let total_tx_cycles = self.total_tx_cycles.checked_sub(cycles).unwrap_or_else(|| {
error!(
"total_tx_cycles {} overflown by sub {}",
self.total_tx_cycles, cycles
);
0
});
self.total_tx_size = total_tx_size;
self.total_tx_cycles = total_tx_cycles;
}
}
60 changes: 5 additions & 55 deletions tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ pub struct TxPool {
pub(crate) pool_map: PoolMap,
/// cache for committed transactions hash
pub(crate) committed_txs_hash_cache: LruCache<ProposalShortId, Byte32>,
// sum of all tx_pool tx's virtual sizes.
pub(crate) total_tx_size: usize,
// sum of all tx_pool tx's cycles.
pub(crate) total_tx_cycles: Cycle,
/// storage snapshot reference
pub(crate) snapshot: Arc<Snapshot>,
/// record recent reject
Expand All @@ -55,8 +51,6 @@ impl TxPool {
TxPool {
pool_map: PoolMap::new(config.max_ancestors_count),
committed_txs_hash_cache: LruCache::new(COMMITTED_HASH_CACHE_SIZE),
total_tx_size: 0,
total_tx_cycles: 0,
config,
snapshot,
recent_reject,
Expand All @@ -83,12 +77,6 @@ impl TxPool {
self.get_by_status(status).len()
}

/// Update size and cycles statics for add tx
pub fn update_statics_for_add_tx(&mut self, tx_size: usize, cycles: Cycle) {
self.total_tx_size += tx_size;
self.total_tx_cycles += cycles;
}

/// Check whether tx-pool enable RBF
pub fn enable_rbf(&self) -> bool {
self.config.min_rbf_rate > self.config.min_fee_rate
Expand Down Expand Up @@ -127,27 +115,6 @@ impl TxPool {
}
}

/// Update size and cycles statics for remove tx
/// cycles overflow is possible, currently obtaining cycles is not accurate
pub fn update_statics_for_remove_tx(&mut self, tx_size: usize, cycles: Cycle) {
let total_tx_size = self.total_tx_size.checked_sub(tx_size).unwrap_or_else(|| {
error!(
"total_tx_size {} overflown by sub {}",
self.total_tx_size, tx_size
);
0
});
let total_tx_cycles = self.total_tx_cycles.checked_sub(cycles).unwrap_or_else(|| {
error!(
"total_tx_cycles {} overflown by sub {}",
self.total_tx_cycles, cycles
);
0
});
self.total_tx_size = total_tx_size;
self.total_tx_cycles = total_tx_cycles;
}

/// Add tx with pending status
/// If did have this value present, false is returned.
pub(crate) fn add_pending(&mut self, entry: TxEntry) -> Result<bool, Reject> {
Expand Down Expand Up @@ -229,13 +196,11 @@ impl TxPool {

fn remove_committed_tx(&mut self, tx: &TransactionView, callbacks: &Callbacks) {
let short_id = tx.proposal_short_id();
if let Some(entry) = self.pool_map.remove_entry(&short_id) {
if let Some(_entry) = self.pool_map.remove_entry(&short_id) {
debug!("remove_committed_tx for {}", tx.hash());
callbacks.call_committed(self, &entry)
}
{
let conflicts = self.pool_map.resolve_conflict(tx);
for (entry, reject) in conflicts {
for (entry, reject) in self.pool_map.resolve_conflict(tx) {
debug!(
"removed {} for commited: {}",
entry.transaction().hash(),
Expand Down Expand Up @@ -267,7 +232,7 @@ impl TxPool {

// Remove transactions from the pool until total size <= size_limit.
pub(crate) fn limit_size(&mut self, callbacks: &Callbacks) {
while self.total_tx_size > self.config.max_tx_pool_size {
while self.pool_map.total_tx_size > self.config.max_tx_pool_size {
let next_evict_entry = || {
self.pool_map
.next_evict_entry(Status::Pending)
Expand Down Expand Up @@ -323,18 +288,7 @@ impl TxPool {

pub(crate) fn remove_tx(&mut self, id: &ProposalShortId) -> bool {
let entries = self.pool_map.remove_entry_and_descendants(id);
if !entries.is_empty() {
for entry in entries {
self.update_statics_for_remove_tx(entry.size, entry.cycles);
}
return true;
}

if let Some(entry) = self.pool_map.remove_entry(id) {
self.update_statics_for_remove_tx(entry.size, entry.cycles);
return true;
}
false
!entries.is_empty()
Copy link
Collaborator

@eval-exec eval-exec Dec 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this change introduces a logical inconsistency compared to the previous commit. In the previous commit, when self.pool_map.remove_entry(id) was Some, it would return true, but now it returns false.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.pool_map.remove_entry(id) will never return Some(_), because the above self.pool_map.remove_entry_and_descendants(id) already removed the entry with id.

}

pub(crate) fn check_rtx_from_pool(&self, rtx: &ResolvedTransaction) -> Result<(), Reject> {
Expand Down Expand Up @@ -456,7 +410,7 @@ impl TxPool {

pub(crate) fn drain_all_transactions(&mut self) -> Vec<TransactionView> {
let mut txs = CommitTxsScanner::new(&self.pool_map)
.txs_to_commit(self.total_tx_size, self.total_tx_cycles)
.txs_to_commit(self.pool_map.total_tx_size, self.pool_map.total_tx_cycles)
chenyukang marked this conversation as resolved.
Show resolved Hide resolved
.0
.into_iter()
.map(|tx_entry| tx_entry.into_transaction())
Expand All @@ -477,8 +431,6 @@ impl TxPool {
.map(|e| e.inner.into_transaction())
.collect::<Vec<_>>();
txs.append(&mut gap);
self.total_tx_size = 0;
self.total_tx_cycles = 0;
self.pool_map.clear();
txs
}
Expand All @@ -487,8 +439,6 @@ impl TxPool {
self.pool_map.clear();
self.snapshot = snapshot;
self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE);
self.total_tx_size = 0;
self.total_tx_cycles = 0;
}

pub(crate) fn package_proposals(
Expand Down
Loading
Loading