Skip to content

Commit

Permalink
feat(pool): add time to mine tracking and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Jun 18, 2024
1 parent 1b71cfb commit 3b7c365
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 35 deletions.
128 changes: 118 additions & 10 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// If not, see https://www.gnu.org/licenses/.

use std::{
cmp::Ordering,
cmp::{self, Ordering},
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::Context;
Expand All @@ -24,7 +25,7 @@ use ethers::{
};
use rundler_types::{
pool::{MempoolError, PoolOperation},
Entity, EntityType, Timestamp, UserOperation, UserOperationId, UserOperationVariant,
Entity, EntityType, GasFees, Timestamp, UserOperation, UserOperationId, UserOperationVariant,
};
use rundler_utils::math;
use tracing::info;
Expand Down Expand Up @@ -81,6 +82,10 @@ pub(crate) struct PoolInner {
pool_size: SizeTracker,
/// keeps track of the size of the removed cache in bytes
cache_size: SizeTracker,
/// The time of the previous block
prev_sys_block_time: Duration,
/// The number of the previous block
prev_block_number: u64,
}

impl PoolInner {
Expand All @@ -96,6 +101,8 @@ impl PoolInner {
submission_id: 0,
pool_size: SizeTracker::default(),
cache_size: SizeTracker::default(),
prev_sys_block_time: Duration::default(),
prev_block_number: 0,
}
}

Expand Down Expand Up @@ -145,22 +152,54 @@ impl PoolInner {
self.best.clone().into_iter().map(|v| v.po)
}

/// Removes all operations using the given entity, returning the hashes of the removed operations.
/// Does maintenance on the pool.
///
/// 1) Removes all operations using the given entity, returning the hashes of the removed operations.
/// 2) Updates time to mine stats for all operations in the pool.
///
/// NOTE: This method is O(n) where n is the number of operations in the pool.
/// It should be called sparingly (e.g. when a block is mined).
pub(crate) fn remove_expired(&mut self, expire_before: Timestamp) -> Vec<(H256, Timestamp)> {
pub(crate) fn do_maintenance(
&mut self,
block_number: u64,
block_timestamp: Timestamp,
candidate_gas_fees: GasFees,
base_fee: U256,
) -> Vec<(H256, Timestamp)> {
let sys_block_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time should be after epoch");

let block_delta_time = sys_block_time - self.prev_sys_block_time;
let block_delta_height = block_number - self.prev_block_number;
let mut expired = Vec::new();
for (hash, op) in &self.by_hash {
if op.po.valid_time_range.valid_until < expire_before {
let mut num_candidates = 0;

for (hash, op) in &mut self.by_hash {
if op.po.valid_time_range.valid_until < block_timestamp {
expired.push((*hash, op.po.valid_time_range.valid_until));
}

num_candidates += if op.update_time_to_mine(
block_delta_time,
block_delta_height,
candidate_gas_fees,
base_fee,
) {
1
} else {
0
};
}

for (hash, _) in &expired {
self.remove_operation_by_hash(*hash);
}

PoolMetrics::set_num_candidates(num_candidates, self.config.entry_point);
self.prev_block_number = block_number;
self.prev_sys_block_time = sys_block_time;

expired
}

Expand Down Expand Up @@ -243,6 +282,7 @@ impl PoolInner {
block_number: u64,
) -> Option<Arc<PoolOperation>> {
let tx_in_pool = self.by_id.get(&mined_op.id())?;
PoolMetrics::record_time_to_mine(&tx_in_pool.time_to_mine, mined_op.entry_point);

let hash = tx_in_pool
.uo()
Expand Down Expand Up @@ -380,6 +420,7 @@ impl PoolInner {
let pool_op = OrderedPoolOperation {
po: op,
submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()),
time_to_mine: TimeToMineInfo::new(),
};

// update counts
Expand Down Expand Up @@ -484,6 +525,7 @@ impl PoolInner {
/// A pool operation paired with pool-internal bookkeeping used for ordering
/// and time-to-mine metrics.
struct OrderedPoolOperation {
/// The wrapped pool operation
po: Arc<PoolOperation>,
/// Monotonically increasing id assigned from next_submission_id() when the
/// op is added; NOTE(review): presumably used as an ordering tie-breaker —
/// confirm against the Ord impl
submission_id: u64,
/// Tracks how long this op has been a viable mining candidate
time_to_mine: TimeToMineInfo,
}

impl OrderedPoolOperation {
Expand All @@ -494,6 +536,28 @@ impl OrderedPoolOperation {
/// Approximate in-memory footprint of this entry: the struct itself plus
/// whatever the wrapped operation reports via its own mem_size().
fn mem_size(&self) -> usize {
    let fixed_size = std::mem::size_of::<Self>();
    fixed_size + self.po.mem_size()
}

/// Updates this op's time-to-mine stats for a newly observed block.
///
/// Returns true (and accumulates the deltas) when the op's effective gas
/// price meets or beats the candidate bundle gas price, i.e. it was a
/// viable mining candidate for the elapsed interval; false otherwise.
fn update_time_to_mine(
    &mut self,
    block_delta_time: Duration,
    block_delta_height: u64,
    candidate_gas_fees: GasFees,
    base_fee: U256,
) -> bool {
    // The op effectively pays min(max_fee, base_fee + priority_fee).
    let candidate_gas_price = base_fee + candidate_gas_fees.max_priority_fee_per_gas;
    let uo_fee_cap = self.uo().max_fee_per_gas();
    let uo_effective_price = cmp::min(uo_fee_cap, base_fee + self.uo().max_priority_fee_per_gas());

    let is_candidate = uo_effective_price >= candidate_gas_price;
    if is_candidate {
        self.time_to_mine
            .increase(block_delta_time, block_delta_height);
    }
    is_candidate
}
}

impl Eq for OrderedPoolOperation {}
Expand Down Expand Up @@ -521,6 +585,26 @@ impl PartialEq for OrderedPoolOperation {
}
}

/// Tracks how long a pool operation has been a viable mining candidate,
/// both as wall-clock time and as a block count.
#[derive(Debug, Clone)]
struct TimeToMineInfo {
    /// Number of blocks this op has been a candidate
    candidate_for_blocks: u64,
    /// Wall-clock duration this op has been a candidate
    candidate_for_time: Duration,
}

impl TimeToMineInfo {
    /// Creates a zeroed tracker for a newly added operation.
    fn new() -> Self {
        Self {
            candidate_for_blocks: 0,
            candidate_for_time: Duration::ZERO,
        }
    }

    /// Accumulates the time and block-height deltas observed since the
    /// previous maintenance pass.
    fn increase(&mut self, block_delta_time: Duration, block_delta_height: u64) {
        self.candidate_for_time += block_delta_time;
        self.candidate_for_blocks += block_delta_height;
    }
}

struct PoolMetrics {}

impl PoolMetrics {
Expand All @@ -530,12 +614,32 @@ impl PoolMetrics {
metrics::gauge!("op_pool_size_bytes", "entry_point" => entry_point.to_string())
.set(size_bytes as f64);
}

// Sets gauges for the number of operations and the total byte size held in
// the removed-operations cache, labeled by entry point address.
fn set_cache_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) {
metrics::gauge!("op_pool_num_ops_in_cache", "entry_point" => entry_point.to_string())
.set(num_ops as f64);
metrics::gauge!("op_pool_cache_size_bytes", "entry_point" => entry_point.to_string())
.set(size_bytes as f64);
}

// Sets a gauge for the number of candidate operations in the pool (ops whose
// effective gas price meets the current candidate fees); only changes on
// block boundaries since it is updated during pool maintenance.
fn set_num_candidates(num_candidates: usize, entry_point: Address) {
metrics::gauge!("op_pool_num_candidates", "entry_point" => entry_point.to_string())
.set(num_candidates as f64);
}

// Records histograms of how long a just-mined operation spent as a viable
// candidate: wall-clock time in milliseconds and number of blocks.
fn record_time_to_mine(time_to_mine: &TimeToMineInfo, entry_point: Address) {
metrics::histogram!(
"op_pool_time_to_mine",
"entry_point" => entry_point.to_string()
)
.record(time_to_mine.candidate_for_time.as_millis() as f64);
metrics::histogram!(
"op_pool_blocks_to_mine",
"entry_point" => entry_point.to_string()
)
.record(time_to_mine.candidate_for_blocks as f64);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -907,7 +1011,8 @@ mod tests {
pool.pool_size,
OrderedPoolOperation {
po: Arc::new(po1),
submission_id: 0
submission_id: 0,
time_to_mine: TimeToMineInfo::new()
}
.mem_size()
);
Expand Down Expand Up @@ -947,7 +1052,8 @@ mod tests {
pool.pool_size,
OrderedPoolOperation {
po: Arc::new(po2),
submission_id: 0
submission_id: 0,
time_to_mine: TimeToMineInfo::new(),
}
.mem_size()
);
Expand Down Expand Up @@ -979,7 +1085,7 @@ mod tests {
po1.valid_time_range.valid_until = Timestamp::from(1);
let _ = pool.add_operation(po1.clone()).unwrap();

let res = pool.remove_expired(Timestamp::from(2));
let res = pool.do_maintenance(0, Timestamp::from(2), GasFees::default(), 0.into());
assert_eq!(res.len(), 1);
assert_eq!(res[0].0, po1.uo.hash(conf.entry_point, conf.chain_id));
assert_eq!(res[0].1, Timestamp::from(1));
Expand All @@ -1001,7 +1107,8 @@ mod tests {
po3.valid_time_range.valid_until = 9.into();
let _ = pool.add_operation(po3.clone()).unwrap();

let res = pool.remove_expired(10.into());
let res = pool.do_maintenance(0, Timestamp::from(10), GasFees::default(), 0.into());

assert_eq!(res.len(), 2);
assert!(res.contains(&(po1.uo.hash(conf.entry_point, conf.chain_id), 5.into())));
assert!(res.contains(&(po3.uo.hash(conf.entry_point, conf.chain_id), 9.into())));
Expand All @@ -1022,6 +1129,7 @@ mod tests {
OrderedPoolOperation {
po: Arc::new(create_op(Address::random(), 1, 1)),
submission_id: 1,
time_to_mine: TimeToMineInfo::new(),
}
.mem_size()
}
Expand Down
77 changes: 52 additions & 25 deletions crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use rundler_types::{
pool::{
MempoolError, PaymasterMetadata, PoolOperation, Reputation, ReputationStatus, StakeStatus,
},
Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, UserOperation, UserOperationId,
UserOperationVariant,
Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, GasFees, UserOperation,
UserOperationId, UserOperationVariant,
};
use rundler_utils::emit::WithEntryPoint;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -62,6 +62,8 @@ struct UoPoolState {
pool: PoolInner,
throttled_ops: HashSet<H256>,
block_number: u64,
gas_fees: GasFees,
base_fee: U256,
}

impl<UO, P, S, E> UoPool<UO, P, S, E>
Expand All @@ -84,6 +86,8 @@ where
pool: PoolInner::new(config.clone().into()),
throttled_ops: HashSet::new(),
block_number: 0,
gas_fees: GasFees::default(),
base_fee: U256::zero(),
}),
reputation,
paymaster,
Expand Down Expand Up @@ -297,38 +301,61 @@ where
})
}

// expire old UOs
let expired = state.pool.remove_expired(update.latest_block_timestamp);
// pool maintenance
let gas_fees = state.gas_fees;
let base_fee = state.base_fee;
let expired = state.pool.do_maintenance(
update.latest_block_number,
update.latest_block_timestamp,
gas_fees,
base_fee,
);

for (hash, until) in expired {
self.emit(OpPoolEvent::RemovedOp {
op_hash: hash,
reason: OpRemovalReason::Expired { valid_until: until },
})
}

state.block_number = update.latest_block_number;
}

// update required bundle fees and update metrics
if let Ok((bundle_fees, base_fee)) = self.prechecker.update_fees().await {
let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_fee_gwei(max_fee);

let max_priority_fee = match format_units(bundle_fees.max_priority_fee_per_gas, "gwei")
{
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee);

let base_fee = match format_units(base_fee, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_base_fee(base_fee);
match self.prechecker.update_fees().await {
Ok((bundle_fees, base_fee)) => {
let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_fee_gwei(max_fee);

let max_priority_fee =
match format_units(bundle_fees.max_priority_fee_per_gas, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee);

let base_fee_f64 = match format_units(base_fee, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_base_fee(base_fee_f64);

// cache for the next update
{
let mut state = self.state.write();
state.block_number = update.latest_block_number;
state.gas_fees = bundle_fees;
state.base_fee = base_fee;
}
}
Err(e) => {
tracing::error!("Failed to update fees: {:?}", e);
{
let mut state = self.state.write();
state.block_number = update.latest_block_number;
}
}
}
}

Expand Down

0 comments on commit 3b7c365

Please sign in to comment.