Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(builder): reject ops if condition not met after failure #723

Merged
merged 2 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use anyhow::Context;
use clap::Args;
use rundler_builder::{
self, BloxrouteSenderArgs, BuilderEvent, BuilderEventKind, BuilderTask, BuilderTaskArgs,
EntryPointBuilderSettings, FlashbotsSenderArgs, LocalBuilderBuilder, TransactionSenderArgs,
TransactionSenderKind,
EntryPointBuilderSettings, FlashbotsSenderArgs, LocalBuilderBuilder, RawSenderArgs,
TransactionSenderArgs, TransactionSenderKind,
};
use rundler_pool::RemotePoolClient;
use rundler_sim::{MempoolConfigs, PriorityFeeMode};
Expand Down Expand Up @@ -115,14 +115,48 @@ pub struct BuilderArgs {
/// If present, the url of the ETH provider that will be used to send
/// transactions. Defaults to the value of `node_http`.
///
/// Only used when BUILDER_SENDER is "raw" or "conditional"
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.submit_url",
name = "builder.submit_url",
env = "BUILDER_SUBMIT_URL"
)]
pub submit_url: Option<String>,

/// If present, the url of the ETH provider that will be used to check
/// transaction status. Else will use the node http for status.
///
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.use_submit_for_status",
name = "builder.use_submit_for_status",
env = "BUILDER_USE_SUBMIT_FOR_STATUS",
default_value = "false"
)]
pub use_submit_for_status: bool,

/// Use the conditional RPC endpoint for transaction submission.
///
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.use_conditional_rpc",
name = "builder.use_conditional_rpc",
env = "BUILDER_USE_CONDITIONAL_RPC",
default_value = "false"
)]
pub use_conditional_rpc: bool,

/// If the "dropped" status is unsupported by the status provider.
///
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.dropped_status_unsupported",
name = "builder.dropped_status_unsupported",
env = "BUILDER_DROPPED_STATUS_UNSUPPORTED",
default_value = "false"
)]
pub dropped_status_unsupported: bool,

/// A list of builders to pass into the Flashbots Relay RPC.
///
/// Only used when BUILDER_SENDER is "flashbots"
Expand Down Expand Up @@ -216,7 +250,6 @@ impl BuilderArgs {
.node_http
.clone()
.context("should have a node HTTP URL")?;
let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone());

let mempool_configs = match &common.mempool_config_path {
Some(path) => get_json_config::<MempoolConfigs>(path, &common.aws_region)
Expand Down Expand Up @@ -264,7 +297,7 @@ impl BuilderArgs {
));
}

let sender_args = self.sender_args(&chain_spec)?;
let sender_args = self.sender_args(&chain_spec, &rpc_url)?;

Ok(BuilderTaskArgs {
entry_points,
Expand All @@ -281,7 +314,6 @@ impl BuilderArgs {
redis_lock_ttl_millis: self.redis_lock_ttl_millis,
max_bundle_size: self.max_bundle_size,
max_bundle_gas: common.max_bundle_gas,
submit_url,
bundle_priority_fee_overhead_percent: common.bundle_priority_fee_overhead_percent,
priority_fee_mode,
sender_args,
Expand All @@ -294,10 +326,18 @@ impl BuilderArgs {
})
}

fn sender_args(&self, chain_spec: &ChainSpec) -> anyhow::Result<TransactionSenderArgs> {
fn sender_args(
&self,
chain_spec: &ChainSpec,
rpc_url: &str,
) -> anyhow::Result<TransactionSenderArgs> {
match self.sender_type {
TransactionSenderKind::Raw => Ok(TransactionSenderArgs::Raw),
TransactionSenderKind::Conditional => Ok(TransactionSenderArgs::Conditional),
TransactionSenderKind::Raw => Ok(TransactionSenderArgs::Raw(RawSenderArgs {
submit_url: self.submit_url.clone().unwrap_or_else(|| rpc_url.into()),
use_submit_for_status: self.use_submit_for_status,
dropped_status_supported: !self.dropped_status_unsupported,
use_conditional_rpc: self.use_conditional_rpc,
})),
TransactionSenderKind::Flashbots => {
if !chain_spec.flashbots_enabled {
return Err(anyhow::anyhow!("Flashbots sender is not enabled for chain"));
Expand Down
94 changes: 90 additions & 4 deletions crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use rundler_utils::{emit::WithEntryPoint, math};
use tokio::{sync::broadcast, try_join};
use tracing::{error, info, warn};

use crate::emit::{BuilderEvent, OpRejectionReason, SkipReason};
use crate::emit::{BuilderEvent, ConditionNotMetReason, OpRejectionReason, SkipReason};

/// Extra buffer percent to add on the bundle transaction gas estimate to be sure it will be enough
const BUNDLE_TRANSACTION_GAS_OVERHEAD_PERCENT: u64 = 5;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub(crate) trait BundleProposer: Send + Sync + 'static {
/// If `min_fees` is `Some`, the proposer will ensure the bundle has
/// at least `min_fees`.
async fn make_bundle(
&self,
&mut self,
min_fees: Option<GasFees>,
is_replacement: bool,
) -> anyhow::Result<Bundle<Self::UO>>;
Expand All @@ -111,6 +111,9 @@ pub(crate) trait BundleProposer: Send + Sync + 'static {
/// If `min_fees` is `Some`, the proposer will ensure the gas fees returned are at least `min_fees`.
async fn estimate_gas_fees(&self, min_fees: Option<GasFees>)
-> anyhow::Result<(GasFees, U256)>;

/// Notifies the proposer that a condition was not met during the last bundle proposal
fn notify_condition_not_met(&mut self);
}

#[derive(Debug)]
Expand All @@ -123,6 +126,7 @@ pub(crate) struct BundleProposerImpl<UO, S, E, P, M> {
settings: Settings,
fee_estimator: FeeEstimator<P>,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
condition_not_met_notified: bool,
_uo_type: PhantomData<UO>,
}

Expand Down Expand Up @@ -155,8 +159,12 @@ where
self.fee_estimator.required_bundle_fees(required_fees).await
}

fn notify_condition_not_met(&mut self) {
self.condition_not_met_notified = true;
}

async fn make_bundle(
&self,
&mut self,
required_fees: Option<GasFees>,
is_replacement: bool,
) -> anyhow::Result<Bundle<UO>> {
Expand Down Expand Up @@ -238,6 +246,16 @@ where
gas_estimate
);

// If recently notified that a bundle condition was not met, check each of
// the conditions again to ensure if they are met, rejecting OPs if they are not.
if self.condition_not_met_notified {
self.condition_not_met_notified = false;
self.check_conditions_met(&mut context).await?;
if context.is_empty() {
break;
}
}

let mut expected_storage = ExpectedStorage::default();
for op in context.iter_ops_with_simulations() {
expected_storage.merge(&op.simulation.expected_storage)?;
Expand Down Expand Up @@ -295,6 +313,7 @@ where
),
settings,
event_sender,
condition_not_met_notified: false,
_uo_type: PhantomData,
}
}
Expand Down Expand Up @@ -549,6 +568,73 @@ where
context
}

async fn check_conditions_met(&self, context: &mut ProposalContext<UO>) -> anyhow::Result<()> {
let futs = context
.iter_ops_with_simulations()
.enumerate()
.map(|(i, op)| async move {
self.check_op_conditions_met(&op.simulation.expected_storage)
.await
.map(|reason| (i, reason))
})
.collect::<Vec<_>>();

let to_reject = future::join_all(futs).await.into_iter().flatten();

for (index, reason) in to_reject {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::ConditionNotMet(reason),
));
self.reject_index(context, index).await;
}

Ok(())
}

async fn check_op_conditions_met(
&self,
expected_storage: &ExpectedStorage,
) -> Option<ConditionNotMetReason> {
let futs = expected_storage
.0
.iter()
.map(|(address, slots)| async move {
let storage = match self
.provider
.batch_get_storage_at(*address, slots.keys().copied().collect())
.await
{
Ok(storage) => storage,
Err(e) => {
error!("Error getting storage for address {address:?} failing open: {e:?}");
return None;
}
};

for ((slot, expected), actual) in slots.iter().zip(storage) {
if *expected != actual {
return Some(ConditionNotMetReason {
address: *address,
slot: *slot,
expected: *expected,
actual,
});
}
}
None
});

let results = future::join_all(futs).await;
for result in results {
if result.is_some() {
return result;
}
}
None
}

async fn reject_index(&self, context: &mut ProposalContext<UO>, i: usize) {
let changed_aggregator = context.reject_index(i);
self.compute_aggregator_signatures(context, &changed_aggregator)
Expand Down Expand Up @@ -2154,7 +2240,7 @@ mod tests {
.expect_aggregate_signatures()
.returning(move |address, _| Ok(signatures_by_aggregator[&address]().unwrap()));
let (event_sender, _) = broadcast::channel(16);
let proposer = BundleProposerImpl::new(
let mut proposer = BundleProposerImpl::new(
0,
pool_client,
simulator,
Expand Down
27 changes: 24 additions & 3 deletions crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ enum SendBundleAttemptResult {
NoOperations,
// Replacement Underpriced
ReplacementUnderpriced,
// Condition not met
ConditionNotMet,
// Nonce too low
NonceTooLow,
}
Expand Down Expand Up @@ -255,6 +257,11 @@ where
info!("Replacement transaction underpriced, entering cancellation loop");
state.update(InnerState::Cancelling(inner.to_cancelling()));
}
Ok(SendBundleAttemptResult::ConditionNotMet) => {
info!("Condition not met, notifying proposer and starting new bundle attempt");
self.proposer.notify_condition_not_met();
state.reset();
}
Err(error) => {
error!("Bundle send error {error:?}");
self.metrics.increment_bundle_txns_failed();
Expand Down Expand Up @@ -487,14 +494,20 @@ where
Ok(SendBundleAttemptResult::Success)
}
Err(TransactionTrackerError::NonceTooLow) => {
warn!("Replacement transaction underpriced");
self.metrics.increment_bundle_txn_nonce_too_low();
warn!("Bundle attempt nonce too low");
Ok(SendBundleAttemptResult::NonceTooLow)
}
Err(TransactionTrackerError::ReplacementUnderpriced) => {
self.metrics.increment_bundle_txn_replacement_underpriced();
warn!("Replacement transaction underpriced");
warn!("Bundle attempt replacement transaction underpriced");
Ok(SendBundleAttemptResult::ReplacementUnderpriced)
}
Err(TransactionTrackerError::ConditionNotMet) => {
self.metrics.increment_bundle_txn_condition_not_met();
warn!("Bundle attempt condition not met");
Ok(SendBundleAttemptResult::ConditionNotMet)
}
Err(e) => {
error!("Failed to send bundle with unexpected error: {e:?}");
Err(e.into())
Expand All @@ -505,7 +518,7 @@ where
/// Builds a bundle and returns some metadata and the transaction to send
/// it, or `None` if there are no valid operations available.
async fn get_bundle_tx(
&self,
&mut self,
nonce: U256,
required_fees: Option<GasFees>,
is_replacement: bool,
Expand Down Expand Up @@ -964,6 +977,14 @@ impl BuilderMetrics {
metrics::counter!("builder_bundle_replacement_underpriced", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1);
}

fn increment_bundle_txn_nonce_too_low(&self) {
metrics::counter!("builder_bundle_nonce_too_low", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1);
}

fn increment_bundle_txn_condition_not_met(&self) {
metrics::counter!("builder_bundle_condition_not_met", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1);
}

fn increment_cancellation_txns_sent(&self) {
metrics::counter!("builder_cancellation_txns_sent", "entry_point" => self.entry_point.to_string(), "builder_index" => self.builder_index.to_string()).increment(1);
}
Expand Down
11 changes: 11 additions & 0 deletions crates/builder/src/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ pub enum OpRejectionReason {
FailedRevalidation { error: SimulationError },
/// Operation reverted during bundle formation simulation with message
FailedInBundle { message: Arc<String> },
/// Operation's storage slot condition was not met
ConditionNotMet(ConditionNotMetReason),
}

/// Reason for a condition not being met
#[derive(Clone, Debug)]
pub struct ConditionNotMetReason {
pub address: Address,
pub slot: H256,
pub expected: H256,
pub actual: H256,
}

impl Display for BuilderEvent {
Expand Down
3 changes: 2 additions & 1 deletion crates/builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub use emit::{BuilderEvent, BuilderEventKind};

mod sender;
pub use sender::{
BloxrouteSenderArgs, FlashbotsSenderArgs, TransactionSenderArgs, TransactionSenderKind,
BloxrouteSenderArgs, FlashbotsSenderArgs, RawSenderArgs, TransactionSenderArgs,
TransactionSenderKind,
};

mod server;
Expand Down
Loading
Loading