Skip to content

Commit d30daec

Browse files
authored
Merge pull request #4276 from chenyukang/yukang_cleanup_txpool_statics
chore: Move txpool statistics from callbacks to PoolMap
2 parents ae1ab00 + b1a1e2a commit d30daec

File tree

7 files changed

+97
-143
lines changed

7 files changed

+97
-143
lines changed

chain/src/tests/block_assembler.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,10 +555,14 @@ fn test_package_multi_best_scores() {
555555
TxEntry::dummy_resolve(tx2_3.clone(), 0, Capacity::shannons(150), 100),
556556
TxEntry::dummy_resolve(tx2_4.clone(), 0, Capacity::shannons(150), 100),
557557
TxEntry::dummy_resolve(tx3_1.clone(), 0, Capacity::shannons(1000), 1000),
558-
TxEntry::dummy_resolve(tx4_1.clone(), 0, Capacity::shannons(300), 250),
558+
TxEntry::dummy_resolve(tx4_1.clone(), 100, Capacity::shannons(300), 250),
559559
];
560560
tx_pool.plug_entry(entries, PlugTarget::Proposed).unwrap();
561561

562+
let tx_pool_info = tx_pool.get_tx_pool_info().unwrap();
563+
assert_eq!(tx_pool_info.total_tx_size, 2400);
564+
assert_eq!(tx_pool_info.total_tx_cycles, 100);
565+
562566
// 250 size best scored txs
563567
let txs = tx_pool.package_txs(Some(250)).unwrap();
564568
check_txs(&txs, vec![&tx1, &tx2, &tx3], "250 size best scored txs");

shared/src/shared_builder.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -399,39 +399,22 @@ fn register_tx_pool_callback(tx_pool_builder: &mut TxPoolServiceBuilder, notify:
399399
fee: entry.fee,
400400
timestamp: entry.timestamp,
401401
};
402-
tx_pool_builder.register_pending(Box::new(move |tx_pool: &mut TxPool, entry: &TxEntry| {
403-
// update statics
404-
tx_pool.update_statics_for_add_tx(entry.size, entry.cycles);
405-
402+
tx_pool_builder.register_pending(Box::new(move |entry: &TxEntry| {
406403
// notify
407404
let notify_tx_entry = create_notify_entry(entry);
408405
notify_pending.notify_new_transaction(notify_tx_entry);
409406
}));
410407

411408
let notify_proposed = notify.clone();
412-
tx_pool_builder.register_proposed(Box::new(
413-
move |tx_pool: &mut TxPool, entry: &TxEntry, new: bool| {
414-
// update statics
415-
if new {
416-
tx_pool.update_statics_for_add_tx(entry.size, entry.cycles);
417-
}
418-
419-
// notify
420-
let notify_tx_entry = create_notify_entry(entry);
421-
notify_proposed.notify_proposed_transaction(notify_tx_entry);
422-
},
423-
));
424-
425-
tx_pool_builder.register_committed(Box::new(move |tx_pool: &mut TxPool, entry: &TxEntry| {
426-
tx_pool.update_statics_for_remove_tx(entry.size, entry.cycles);
409+
tx_pool_builder.register_proposed(Box::new(move |entry: &TxEntry| {
410+
// notify
411+
let notify_tx_entry = create_notify_entry(entry);
412+
notify_proposed.notify_proposed_transaction(notify_tx_entry);
427413
}));
428414

429415
let notify_reject = notify;
430416
tx_pool_builder.register_reject(Box::new(
431417
move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
432-
// update statics
433-
tx_pool.update_statics_for_remove_tx(entry.size, entry.cycles);
434-
435418
let tx_hash = entry.transaction().hash();
436419
// record recent reject
437420
if matches!(reject, Reject::Resolve(..) | Reject::RBFRejected(..)) {

tx-pool/src/callback.rs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ use crate::error::Reject;
33
use crate::pool::TxPool;
44

55
/// Callback boxed fn pointer wrapper
6-
pub type Callback = Box<dyn Fn(&mut TxPool, &TxEntry) + Sync + Send>;
6+
pub type PendingCallback = Box<dyn Fn(&TxEntry) + Sync + Send>;
77
/// Proposed Callback boxed fn pointer wrapper
8-
pub type ProposedCallback = Box<dyn Fn(&mut TxPool, &TxEntry, bool) + Sync + Send>;
8+
pub type ProposedCallback = Box<dyn Fn(&TxEntry) + Sync + Send>;
99
/// Reject Callback boxed fn pointer wrapper
1010
pub type RejectCallback = Box<dyn Fn(&mut TxPool, &TxEntry, Reject) + Sync + Send>;
1111

1212
/// Struct hold callbacks
1313
pub struct Callbacks {
14-
pub(crate) pending: Option<Callback>,
14+
pub(crate) pending: Option<PendingCallback>,
1515
pub(crate) proposed: Option<ProposedCallback>,
16-
pub(crate) committed: Option<Callback>,
1716
pub(crate) reject: Option<RejectCallback>,
1817
}
1918

@@ -29,13 +28,12 @@ impl Callbacks {
2928
Callbacks {
3029
pending: None,
3130
proposed: None,
32-
committed: None,
3331
reject: None,
3432
}
3533
}
3634

3735
/// Register a new pending callback
38-
pub fn register_pending(&mut self, callback: Callback) {
36+
pub fn register_pending(&mut self, callback: PendingCallback) {
3937
self.pending = Some(callback);
4038
}
4139

@@ -44,34 +42,22 @@ impl Callbacks {
4442
self.proposed = Some(callback);
4543
}
4644

47-
/// Register a new committed callback
48-
pub fn register_committed(&mut self, callback: Callback) {
49-
self.committed = Some(callback);
50-
}
51-
5245
/// Register a new abandon callback
5346
pub fn register_reject(&mut self, callback: RejectCallback) {
5447
self.reject = Some(callback);
5548
}
5649

5750
/// Call on after pending
58-
pub fn call_pending(&self, tx_pool: &mut TxPool, entry: &TxEntry) {
51+
pub fn call_pending(&self, entry: &TxEntry) {
5952
if let Some(call) = &self.pending {
60-
call(tx_pool, entry)
53+
call(entry)
6154
}
6255
}
6356

6457
/// Call on after proposed
65-
pub fn call_proposed(&self, tx_pool: &mut TxPool, entry: &TxEntry, new: bool) {
58+
pub fn call_proposed(&self, entry: &TxEntry) {
6659
if let Some(call) = &self.proposed {
67-
call(tx_pool, entry, new)
68-
}
69-
}
70-
71-
/// Call on after proposed
72-
pub fn call_committed(&self, tx_pool: &mut TxPool, entry: &TxEntry) {
73-
if let Some(call) = &self.committed {
74-
call(tx_pool, entry)
60+
call(entry)
7561
}
7662
}
7763

tx-pool/src/component/pool_map.rs

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use crate::component::links::{Relation, TxLinksMap};
77
use crate::component::sort_key::{AncestorsScoreSortKey, EvictKey};
88
use crate::error::Reject;
99
use crate::TxEntry;
10-
use ckb_logger::{debug, trace};
10+
use ckb_logger::{debug, error, trace};
1111
use ckb_types::core::error::OutPointError;
12+
use ckb_types::core::Cycle;
1213
use ckb_types::packed::OutPoint;
1314
use ckb_types::prelude::*;
1415
use ckb_types::{
@@ -66,6 +67,10 @@ pub struct PoolMap {
6667
/// All the parent/children relationships
6768
pub(crate) links: TxLinksMap,
6869
pub(crate) max_ancestors_count: usize,
70+
// sum of all tx_pool tx's virtual sizes.
71+
pub(crate) total_tx_size: usize,
72+
// sum of all tx_pool tx's cycles.
73+
pub(crate) total_tx_cycles: Cycle,
6974
}
7075

7176
impl PoolMap {
@@ -75,6 +80,8 @@ impl PoolMap {
7580
edges: Edges::default(),
7681
links: TxLinksMap::new(),
7782
max_ancestors_count,
83+
total_tx_size: 0,
84+
total_tx_cycles: 0,
7885
}
7986
}
8087

@@ -193,6 +200,7 @@ impl PoolMap {
193200
self.insert_entry(&entry, status);
194201
self.record_entry_descendants(&entry);
195202
self.track_entry_statics();
203+
self.update_stat_for_add_tx(entry.size, entry.cycles);
196204
Ok(true)
197205
}
198206

@@ -217,6 +225,7 @@ impl PoolMap {
217225
self.update_descendants_index_key(&entry.inner, EntryOp::Remove);
218226
self.remove_entry_edges(&entry.inner);
219227
self.remove_entry_links(id);
228+
self.update_stat_for_remove_tx(entry.inner.size, entry.inner.cycles);
220229
entry.inner
221230
})
222231
}
@@ -332,6 +341,8 @@ impl PoolMap {
332341
self.entries = MultiIndexPoolEntryMap::default();
333342
self.edges.clear();
334343
self.links.clear();
344+
self.total_tx_size = 0;
345+
self.total_tx_cycles = 0;
335346
}
336347

337348
pub(crate) fn score_sorted_iter_by(
@@ -497,8 +508,7 @@ impl PoolMap {
497508
}
498509

499510
fn remove_entry_edges(&mut self, entry: &TxEntry) {
500-
let inputs = entry.transaction().input_pts_iter();
501-
for i in inputs {
511+
for i in entry.transaction().input_pts_iter() {
502512
// release input record
503513
self.edges.remove_input(&i);
504514
}
@@ -539,4 +549,45 @@ impl PoolMap {
539549
.set(self.proposed_size() as i64);
540550
}
541551
}
552+
553+
/// Update size and cycles statistics for add tx
554+
fn update_stat_for_add_tx(&mut self, tx_size: usize, cycles: Cycle) {
555+
let total_tx_size = self.total_tx_size.checked_add(tx_size).unwrap_or_else(|| {
556+
error!(
557+
"total_tx_size {} overflown by add {}",
558+
self.total_tx_size, tx_size
559+
);
560+
self.total_tx_size
561+
});
562+
let total_tx_cycles = self.total_tx_cycles.checked_add(cycles).unwrap_or_else(|| {
563+
error!(
564+
"total_tx_cycles {} overflown by add {}",
565+
self.total_tx_cycles, cycles
566+
);
567+
self.total_tx_cycles
568+
});
569+
self.total_tx_size = total_tx_size;
570+
self.total_tx_cycles = total_tx_cycles;
571+
}
572+
573+
/// Update size and cycles statistics for remove tx
574+
/// cycles overflow is possible, currently obtaining cycles is not accurate
575+
fn update_stat_for_remove_tx(&mut self, tx_size: usize, cycles: Cycle) {
576+
let total_tx_size = self.total_tx_size.checked_sub(tx_size).unwrap_or_else(|| {
577+
error!(
578+
"total_tx_size {} overflown by sub {}",
579+
self.total_tx_size, tx_size
580+
);
581+
0
582+
});
583+
let total_tx_cycles = self.total_tx_cycles.checked_sub(cycles).unwrap_or_else(|| {
584+
error!(
585+
"total_tx_cycles {} overflown by sub {}",
586+
self.total_tx_cycles, cycles
587+
);
588+
0
589+
});
590+
self.total_tx_size = total_tx_size;
591+
self.total_tx_cycles = total_tx_cycles;
592+
}
542593
}

tx-pool/src/pool.rs

Lines changed: 5 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ pub struct TxPool {
3535
pub(crate) pool_map: PoolMap,
3636
/// cache for committed transactions hash
3737
pub(crate) committed_txs_hash_cache: LruCache<ProposalShortId, Byte32>,
38-
// sum of all tx_pool tx's virtual sizes.
39-
pub(crate) total_tx_size: usize,
40-
// sum of all tx_pool tx's cycles.
41-
pub(crate) total_tx_cycles: Cycle,
4238
/// storage snapshot reference
4339
pub(crate) snapshot: Arc<Snapshot>,
4440
/// record recent reject
@@ -55,8 +51,6 @@ impl TxPool {
5551
TxPool {
5652
pool_map: PoolMap::new(config.max_ancestors_count),
5753
committed_txs_hash_cache: LruCache::new(COMMITTED_HASH_CACHE_SIZE),
58-
total_tx_size: 0,
59-
total_tx_cycles: 0,
6054
config,
6155
snapshot,
6256
recent_reject,
@@ -83,12 +77,6 @@ impl TxPool {
8377
self.get_by_status(status).len()
8478
}
8579

86-
/// Update size and cycles statics for add tx
87-
pub fn update_statics_for_add_tx(&mut self, tx_size: usize, cycles: Cycle) {
88-
self.total_tx_size += tx_size;
89-
self.total_tx_cycles += cycles;
90-
}
91-
9280
/// Check whether tx-pool enable RBF
9381
pub fn enable_rbf(&self) -> bool {
9482
self.config.min_rbf_rate > self.config.min_fee_rate
@@ -127,27 +115,6 @@ impl TxPool {
127115
}
128116
}
129117

130-
/// Update size and cycles statics for remove tx
131-
/// cycles overflow is possible, currently obtaining cycles is not accurate
132-
pub fn update_statics_for_remove_tx(&mut self, tx_size: usize, cycles: Cycle) {
133-
let total_tx_size = self.total_tx_size.checked_sub(tx_size).unwrap_or_else(|| {
134-
error!(
135-
"total_tx_size {} overflown by sub {}",
136-
self.total_tx_size, tx_size
137-
);
138-
0
139-
});
140-
let total_tx_cycles = self.total_tx_cycles.checked_sub(cycles).unwrap_or_else(|| {
141-
error!(
142-
"total_tx_cycles {} overflown by sub {}",
143-
self.total_tx_cycles, cycles
144-
);
145-
0
146-
});
147-
self.total_tx_size = total_tx_size;
148-
self.total_tx_cycles = total_tx_cycles;
149-
}
150-
151118
/// Add tx with pending status
152119
/// If did have this value present, false is returned.
153120
pub(crate) fn add_pending(&mut self, entry: TxEntry) -> Result<bool, Reject> {
@@ -229,13 +196,11 @@ impl TxPool {
229196

230197
fn remove_committed_tx(&mut self, tx: &TransactionView, callbacks: &Callbacks) {
231198
let short_id = tx.proposal_short_id();
232-
if let Some(entry) = self.pool_map.remove_entry(&short_id) {
199+
if let Some(_entry) = self.pool_map.remove_entry(&short_id) {
233200
debug!("remove_committed_tx for {}", tx.hash());
234-
callbacks.call_committed(self, &entry)
235201
}
236202
{
237-
let conflicts = self.pool_map.resolve_conflict(tx);
238-
for (entry, reject) in conflicts {
203+
for (entry, reject) in self.pool_map.resolve_conflict(tx) {
239204
debug!(
240205
"removed {} for commited: {}",
241206
entry.transaction().hash(),
@@ -267,7 +232,7 @@ impl TxPool {
267232

268233
// Remove transactions from the pool until total size <= size_limit.
269234
pub(crate) fn limit_size(&mut self, callbacks: &Callbacks) {
270-
while self.total_tx_size > self.config.max_tx_pool_size {
235+
while self.pool_map.total_tx_size > self.config.max_tx_pool_size {
271236
let next_evict_entry = || {
272237
self.pool_map
273238
.next_evict_entry(Status::Pending)
@@ -323,18 +288,7 @@ impl TxPool {
323288

324289
pub(crate) fn remove_tx(&mut self, id: &ProposalShortId) -> bool {
325290
let entries = self.pool_map.remove_entry_and_descendants(id);
326-
if !entries.is_empty() {
327-
for entry in entries {
328-
self.update_statics_for_remove_tx(entry.size, entry.cycles);
329-
}
330-
return true;
331-
}
332-
333-
if let Some(entry) = self.pool_map.remove_entry(id) {
334-
self.update_statics_for_remove_tx(entry.size, entry.cycles);
335-
return true;
336-
}
337-
false
291+
!entries.is_empty()
338292
}
339293

340294
pub(crate) fn check_rtx_from_pool(&self, rtx: &ResolvedTransaction) -> Result<(), Reject> {
@@ -456,7 +410,7 @@ impl TxPool {
456410

457411
pub(crate) fn drain_all_transactions(&mut self) -> Vec<TransactionView> {
458412
let mut txs = CommitTxsScanner::new(&self.pool_map)
459-
.txs_to_commit(self.total_tx_size, self.total_tx_cycles)
413+
.txs_to_commit(usize::MAX, Cycle::MAX)
460414
.0
461415
.into_iter()
462416
.map(|tx_entry| tx_entry.into_transaction())
@@ -477,8 +431,6 @@ impl TxPool {
477431
.map(|e| e.inner.into_transaction())
478432
.collect::<Vec<_>>();
479433
txs.append(&mut gap);
480-
self.total_tx_size = 0;
481-
self.total_tx_cycles = 0;
482434
self.pool_map.clear();
483435
txs
484436
}
@@ -487,8 +439,6 @@ impl TxPool {
487439
self.pool_map.clear();
488440
self.snapshot = snapshot;
489441
self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE);
490-
self.total_tx_size = 0;
491-
self.total_tx_cycles = 0;
492442
}
493443

494444
pub(crate) fn package_proposals(

0 commit comments

Comments
 (0)