diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 72361593..817cd9af 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -12,9 +12,10 @@ // If not, see https://www.gnu.org/licenses/. use std::{ - cmp::Ordering, + cmp::{self, Ordering}, collections::{hash_map::Entry, BTreeSet, HashMap, HashSet}, sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use anyhow::Context; @@ -24,7 +25,7 @@ use ethers::{ }; use rundler_types::{ pool::{MempoolError, PoolOperation}, - Entity, EntityType, Timestamp, UserOperation, UserOperationId, UserOperationVariant, + Entity, EntityType, GasFees, Timestamp, UserOperation, UserOperationId, UserOperationVariant, }; use rundler_utils::math; use tracing::info; @@ -81,6 +82,10 @@ pub(crate) struct PoolInner { pool_size: SizeTracker, /// keeps track of the size of the removed cache in bytes cache_size: SizeTracker, + /// The time of the previous block + prev_sys_block_time: Duration, + /// The number of the previous block + prev_block_number: u64, } impl PoolInner { @@ -96,6 +101,8 @@ impl PoolInner { submission_id: 0, pool_size: SizeTracker::default(), cache_size: SizeTracker::default(), + prev_sys_block_time: Duration::default(), + prev_block_number: 0, } } @@ -145,22 +152,54 @@ impl PoolInner { self.best.clone().into_iter().map(|v| v.po) } - /// Removes all operations using the given entity, returning the hashes of the removed operations. + /// Does maintenance on the pool. + /// + /// 1) Removes all operations using the given entity, returning the hashes of the removed operations. + /// 2) Updates time to mine stats for all operations in the pool. /// /// NOTE: This method is O(n) where n is the number of operations in the pool. /// It should be called sparingly (e.g. when a block is mined). - pub(crate) fn remove_expired(&mut self, expire_before: Timestamp) -> Vec<(H256, Timestamp)> { + pub(crate) fn do_maintenance( + &mut self, + block_number: u64, + block_timestamp: Timestamp, + candidate_gas_fees: GasFees, + base_fee: U256, + ) -> Vec<(H256, Timestamp)> { + let sys_block_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time should be after epoch"); + + let block_delta_time = sys_block_time - self.prev_sys_block_time; + let block_delta_height = block_number - self.prev_block_number; let mut expired = Vec::new(); - for (hash, op) in &self.by_hash { - if op.po.valid_time_range.valid_until < expire_before { + let mut num_candidates = 0; + + for (hash, op) in &mut self.by_hash { + if op.po.valid_time_range.valid_until < block_timestamp { expired.push((*hash, op.po.valid_time_range.valid_until)); } + + num_candidates += if op.update_time_to_mine( + block_delta_time, + block_delta_height, + candidate_gas_fees, + base_fee, + ) { + 1 + } else { + 0 + }; } for (hash, _) in &expired { self.remove_operation_by_hash(*hash); } + PoolMetrics::set_num_candidates(num_candidates, self.config.entry_point); + self.prev_block_number = block_number; + self.prev_sys_block_time = sys_block_time; + expired } @@ -243,6 +282,7 @@ impl PoolInner { block_number: u64, ) -> Option> { let tx_in_pool = self.by_id.get(&mined_op.id())?; + PoolMetrics::record_time_to_mine(&tx_in_pool.time_to_mine, mined_op.entry_point); let hash = tx_in_pool .uo() @@ -380,6 +420,7 @@ impl PoolInner { let pool_op = OrderedPoolOperation { po: op, submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()), + time_to_mine: TimeToMineInfo::new(), }; // update counts @@ -484,6 +525,7 @@ impl PoolInner { struct OrderedPoolOperation { po: Arc, submission_id: u64, + time_to_mine: TimeToMineInfo, } impl OrderedPoolOperation { @@ -494,6 +536,28 @@ impl OrderedPoolOperation { fn mem_size(&self) -> usize { std::mem::size_of::() + self.po.mem_size() } + + fn update_time_to_mine( + &mut self, + block_delta_time: Duration, + block_delta_height: u64, + candidate_gas_fees: GasFees, + base_fee: U256, + ) -> bool { + let candidate_gas_price = base_fee + candidate_gas_fees.max_priority_fee_per_gas; + let uo_gas_price = cmp::min( + self.uo().max_fee_per_gas(), + self.uo().max_priority_fee_per_gas() + base_fee, + ); + + if uo_gas_price >= candidate_gas_price { + self.time_to_mine + .increase(block_delta_time, block_delta_height); + true + } else { + false + } + } } impl Eq for OrderedPoolOperation {} @@ -521,6 +585,26 @@ impl PartialEq for OrderedPoolOperation { } } +#[derive(Debug, Clone)] +struct TimeToMineInfo { + candidate_for_blocks: u64, + candidate_for_time: Duration, +} + +impl TimeToMineInfo { + fn new() -> Self { + Self { + candidate_for_blocks: 0, + candidate_for_time: Duration::default(), + } + } + + fn increase(&mut self, block_delta_time: Duration, block_delta_height: u64) { + self.candidate_for_blocks += block_delta_height; + self.candidate_for_time += block_delta_time; + } +} + struct PoolMetrics {} impl PoolMetrics { @@ -530,12 +614,32 @@ impl PoolMetrics { metrics::gauge!("op_pool_size_bytes", "entry_point" => entry_point.to_string()) .set(size_bytes as f64); } + fn set_cache_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) { metrics::gauge!("op_pool_num_ops_in_cache", "entry_point" => entry_point.to_string()) .set(num_ops as f64); metrics::gauge!("op_pool_cache_size_bytes", "entry_point" => entry_point.to_string()) .set(size_bytes as f64); } + + // Set the number of candidates in the pool, only changes on block boundaries + fn set_num_candidates(num_candidates: usize, entry_point: Address) { + metrics::gauge!("op_pool_num_candidates", "entry_point" => entry_point.to_string()) + .set(num_candidates as f64); + } + + fn record_time_to_mine(time_to_mine: &TimeToMineInfo, entry_point: Address) { + metrics::histogram!( + "op_pool_time_to_mine", + "entry_point" => entry_point.to_string() + ) + .record(time_to_mine.candidate_for_time.as_millis() as f64); + metrics::histogram!( + "op_pool_blocks_to_mine", + "entry_point" => entry_point.to_string() + ) + .record(time_to_mine.candidate_for_blocks as f64); + } } #[cfg(test)] @@ -907,7 +1011,8 @@ mod tests { pool.pool_size, OrderedPoolOperation { po: Arc::new(po1), - submission_id: 0 + submission_id: 0, + time_to_mine: TimeToMineInfo::new() } .mem_size() ); @@ -947,7 +1052,8 @@ mod tests { pool.pool_size, OrderedPoolOperation { po: Arc::new(po2), - submission_id: 0 + submission_id: 0, + time_to_mine: TimeToMineInfo::new(), } .mem_size() ); @@ -979,7 +1085,7 @@ mod tests { po1.valid_time_range.valid_until = Timestamp::from(1); let _ = pool.add_operation(po1.clone()).unwrap(); - let res = pool.remove_expired(Timestamp::from(2)); + let res = pool.do_maintenance(0, Timestamp::from(2), GasFees::default(), 0.into()); assert_eq!(res.len(), 1); assert_eq!(res[0].0, po1.uo.hash(conf.entry_point, conf.chain_id)); assert_eq!(res[0].1, Timestamp::from(1)); @@ -1001,7 +1107,8 @@ mod tests { po3.valid_time_range.valid_until = 9.into(); let _ = pool.add_operation(po3.clone()).unwrap(); - let res = pool.remove_expired(10.into()); + let res = pool.do_maintenance(0, Timestamp::from(10), GasFees::default(), 0.into()); + assert_eq!(res.len(), 2); assert!(res.contains(&(po1.uo.hash(conf.entry_point, conf.chain_id), 5.into()))); assert!(res.contains(&(po3.uo.hash(conf.entry_point, conf.chain_id), 9.into()))); @@ -1022,6 +1129,7 @@ mod tests { OrderedPoolOperation { po: Arc::new(create_op(Address::random(), 1, 1)), submission_id: 1, + time_to_mine: TimeToMineInfo::new(), } .mem_size() } diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index 8a31e05e..9ee07f13 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -25,8 +25,8 @@ use rundler_types::{ pool::{ MempoolError, PaymasterMetadata, PoolOperation, Reputation, ReputationStatus, StakeStatus, }, - Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, UserOperation, UserOperationId, - UserOperationVariant, + Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, GasFees, UserOperation, + UserOperationId, UserOperationVariant, }; use rundler_utils::emit::WithEntryPoint; use tokio::sync::broadcast; @@ -62,6 +62,8 @@ struct UoPoolState { pool: PoolInner, throttled_ops: HashSet, block_number: u64, + gas_fees: GasFees, + base_fee: U256, } impl UoPool @@ -84,6 +86,8 @@ where pool: PoolInner::new(config.clone().into()), throttled_ops: HashSet::new(), block_number: 0, + gas_fees: GasFees::default(), + base_fee: U256::zero(), }), reputation, paymaster, @@ -297,38 +301,61 @@ where }) } - // expire old UOs - let expired = state.pool.remove_expired(update.latest_block_timestamp); + // pool maintenance + let gas_fees = state.gas_fees; + let base_fee = state.base_fee; + let expired = state.pool.do_maintenance( + update.latest_block_number, + update.latest_block_timestamp, + gas_fees, + base_fee, + ); + for (hash, until) in expired { self.emit(OpPoolEvent::RemovedOp { op_hash: hash, reason: OpRemovalReason::Expired { valid_until: until }, }) } - - state.block_number = update.latest_block_number; } // update required bundle fees and update metrics - if let Ok((bundle_fees, base_fee)) = self.prechecker.update_fees().await { - let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") { - Ok(s) => s.parse::().unwrap_or_default(), - Err(_) => 0.0, - }; - UoPoolMetrics::current_max_fee_gwei(max_fee); - - let max_priority_fee = match format_units(bundle_fees.max_priority_fee_per_gas, "gwei") - { - Ok(s) => s.parse::().unwrap_or_default(), - Err(_) => 0.0, - }; - UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee); - - let base_fee = match format_units(base_fee, "gwei") { - Ok(s) => s.parse::().unwrap_or_default(), - Err(_) => 0.0, - }; - UoPoolMetrics::current_base_fee(base_fee); + match self.prechecker.update_fees().await { + Ok((bundle_fees, base_fee)) => { + let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") { + Ok(s) => s.parse::().unwrap_or_default(), + Err(_) => 0.0, + }; + UoPoolMetrics::current_max_fee_gwei(max_fee); + + let max_priority_fee = + match format_units(bundle_fees.max_priority_fee_per_gas, "gwei") { + Ok(s) => s.parse::().unwrap_or_default(), + Err(_) => 0.0, + }; + UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee); + + let base_fee_f64 = match format_units(base_fee, "gwei") { + Ok(s) => s.parse::().unwrap_or_default(), + Err(_) => 0.0, + }; + UoPoolMetrics::current_base_fee(base_fee_f64); + + // cache for the next update + { + let mut state = self.state.write(); + state.block_number = update.latest_block_number; + state.gas_fees = bundle_fees; + state.base_fee = base_fee; + } + } + Err(e) => { + tracing::error!("Failed to update fees: {:?}", e); + { + let mut state = self.state.write(); + state.block_number = update.latest_block_number; + } + } } }