Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sender fixes #734

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 87 additions & 18 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

use std::{net::SocketAddr, time::Duration};

use anyhow::Context;
use anyhow::{bail, Context};
use clap::Args;
use rundler_builder::{
self, BloxrouteSenderArgs, BuilderEvent, BuilderEventKind, BuilderTask, BuilderTaskArgs,
EntryPointBuilderSettings, FlashbotsSenderArgs, LocalBuilderBuilder, TransactionSenderArgs,
TransactionSenderKind,
EntryPointBuilderSettings, FlashbotsSenderArgs, LocalBuilderBuilder, RawSenderArgs,
TransactionSenderArgs, TransactionSenderKind,
};
use rundler_pool::RemotePoolClient;
use rundler_sim::{MempoolConfigs, PriorityFeeMode};
Expand Down Expand Up @@ -57,13 +57,28 @@ pub struct BuilderArgs {
host: String,

/// Private key to use for signing transactions
/// DEPRECATED: Use `builder.private_keys` instead
///
/// If both `builder.private_key` and `builder.private_keys` are set, `builder.private_key` is appended
/// to `builder.private_keys`. Keys must be unique.
#[arg(
long = "builder.private_key",
name = "builder.private_key",
env = "BUILDER_PRIVATE_KEY"
)]
private_key: Option<String>,

/// Private keys to use for signing transactions
///
/// Cannot use both `builder.private_key` and `builder.aws_kms_key_ids` at the same time.
#[arg(
long = "builder.private_keys",
name = "builder.private_keys",
env = "BUILDER_PRIVATE_KEYS",
value_delimiter = ','
)]
private_keys: Vec<String>,

/// AWS KMS key IDs to use for signing transactions
#[arg(
long = "builder.aws_kms_key_ids",
Expand Down Expand Up @@ -115,14 +130,48 @@ pub struct BuilderArgs {
/// If present, the url of the ETH provider that will be used to send
/// transactions. Defaults to the value of `node_http`.
///
/// Only used when BUILDER_SENDER is "raw" or "conditional"
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.submit_url",
name = "builder.submit_url",
env = "BUILDER_SUBMIT_URL"
)]
pub submit_url: Option<String>,

/// If present, the url of the ETH provider that will be used to check
/// transaction status. Else will use the node http for status.
///
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.use_submit_for_status",
name = "builder.use_submit_for_status",
env = "BUILDER_USE_SUBMIT_FOR_STATUS",
default_value = "false"
)]
pub use_submit_for_status: bool,

/// Use the conditional RPC endpoint for transaction submission.
///
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.use_conditional_rpc",
name = "builder.use_conditional_rpc",
env = "BUILDER_USE_CONDITIONAL_RPC",
default_value = "false"
)]
pub use_conditional_rpc: bool,

/// If the "dropped" status is unsupported by the status provider.
///
/// Only used when BUILDER_SENDER is "raw"
#[arg(
long = "builder.dropped_status_unsupported",
name = "builder.dropped_status_unsupported",
env = "BUILDER_DROPPED_STATUS_UNSUPPORTED",
default_value = "false"
)]
pub dropped_status_unsupported: bool,

/// A list of builders to pass into the Flashbots Relay RPC.
///
/// Only used when BUILDER_SENDER is "flashbots"
Expand Down Expand Up @@ -216,7 +265,6 @@ impl BuilderArgs {
.node_http
.clone()
.context("should have a node HTTP URL")?;
let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone());

let mempool_configs = match &common.mempool_config_path {
Some(path) => get_json_config::<MempoolConfigs>(path, &common.aws_region)
Expand Down Expand Up @@ -251,27 +299,41 @@ impl BuilderArgs {
num_builders += common.num_builders_v0_7;
}

if self.private_key.is_some() {
if num_builders > 1 {
return Err(anyhow::anyhow!(
"Cannot use a private key with multiple builders. You may need to disable one of the entry points."
));
if (self.private_key.is_some() || !self.private_keys.is_empty())
&& !self.aws_kms_key_ids.is_empty()
{
bail!(
"Cannot use both builder.private_key(s) and builder.aws_kms_key_ids at the same time."
);
}

let mut private_keys = self.private_keys.clone();
if self.private_key.is_some() || !self.private_keys.is_empty() {
if let Some(pk) = &self.private_key {
private_keys.push(pk.clone());
}

if num_builders > private_keys.len() as u64 {
bail!(
"Found {} private keys, but need {} keys for the number of builders. You may need to disable one of the entry points.",
private_keys.len(), num_builders
);
}
} else if self.aws_kms_key_ids.len() < num_builders as usize {
return Err(anyhow::anyhow!(
bail!(
"Not enough AWS KMS key IDs for the number of builders. Need {} keys, found {}. You may need to disable one of the entry points.",
num_builders, self.aws_kms_key_ids.len()
));
);
}

let sender_args = self.sender_args(&chain_spec)?;
let sender_args = self.sender_args(&chain_spec, &rpc_url)?;

Ok(BuilderTaskArgs {
entry_points,
chain_spec,
unsafe_mode: common.unsafe_mode,
rpc_url,
private_key: self.private_key.clone(),
private_keys,
aws_kms_key_ids: self.aws_kms_key_ids.clone(),
aws_kms_region: common
.aws_region
Expand All @@ -281,7 +343,6 @@ impl BuilderArgs {
redis_lock_ttl_millis: self.redis_lock_ttl_millis,
max_bundle_size: self.max_bundle_size,
max_bundle_gas: common.max_bundle_gas,
submit_url,
bundle_priority_fee_overhead_percent: common.bundle_priority_fee_overhead_percent,
priority_fee_mode,
sender_args,
Expand All @@ -294,10 +355,18 @@ impl BuilderArgs {
})
}

fn sender_args(&self, chain_spec: &ChainSpec) -> anyhow::Result<TransactionSenderArgs> {
fn sender_args(
&self,
chain_spec: &ChainSpec,
rpc_url: &str,
) -> anyhow::Result<TransactionSenderArgs> {
match self.sender_type {
TransactionSenderKind::Raw => Ok(TransactionSenderArgs::Raw),
TransactionSenderKind::Conditional => Ok(TransactionSenderArgs::Conditional),
TransactionSenderKind::Raw => Ok(TransactionSenderArgs::Raw(RawSenderArgs {
submit_url: self.submit_url.clone().unwrap_or_else(|| rpc_url.into()),
use_submit_for_status: self.use_submit_for_status,
dropped_status_supported: !self.dropped_status_unsupported,
use_conditional_rpc: self.use_conditional_rpc,
})),
TransactionSenderKind::Flashbots => {
if !chain_spec.flashbots_enabled {
return Err(anyhow::anyhow!("Flashbots sender is not enabled for chain"));
Expand Down
94 changes: 90 additions & 4 deletions crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use rundler_utils::{emit::WithEntryPoint, math};
use tokio::{sync::broadcast, try_join};
use tracing::{error, info, warn};

use crate::emit::{BuilderEvent, OpRejectionReason, SkipReason};
use crate::emit::{BuilderEvent, ConditionNotMetReason, OpRejectionReason, SkipReason};

/// Extra buffer percent to add on the bundle transaction gas estimate to be sure it will be enough
const BUNDLE_TRANSACTION_GAS_OVERHEAD_PERCENT: u64 = 5;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub(crate) trait BundleProposer: Send + Sync + 'static {
/// If `min_fees` is `Some`, the proposer will ensure the bundle has
/// at least `min_fees`.
async fn make_bundle(
&self,
&mut self,
min_fees: Option<GasFees>,
is_replacement: bool,
) -> anyhow::Result<Bundle<Self::UO>>;
Expand All @@ -111,6 +111,9 @@ pub(crate) trait BundleProposer: Send + Sync + 'static {
/// If `min_fees` is `Some`, the proposer will ensure the gas fees returned are at least `min_fees`.
async fn estimate_gas_fees(&self, min_fees: Option<GasFees>)
-> anyhow::Result<(GasFees, U256)>;

/// Notifies the proposer that a condition was not met during the last bundle proposal
fn notify_condition_not_met(&mut self);
}

#[derive(Debug)]
Expand All @@ -123,6 +126,7 @@ pub(crate) struct BundleProposerImpl<UO, S, E, P, M> {
settings: Settings,
fee_estimator: FeeEstimator<P>,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
condition_not_met_notified: bool,
_uo_type: PhantomData<UO>,
}

Expand Down Expand Up @@ -155,8 +159,12 @@ where
self.fee_estimator.required_bundle_fees(required_fees).await
}

fn notify_condition_not_met(&mut self) {
self.condition_not_met_notified = true;
}

async fn make_bundle(
&self,
&mut self,
required_fees: Option<GasFees>,
is_replacement: bool,
) -> anyhow::Result<Bundle<UO>> {
Expand Down Expand Up @@ -238,6 +246,16 @@ where
gas_estimate
);

// If recently notified that a bundle condition was not met, check each of
// the conditions again to ensure if they are met, rejecting OPs if they are not.
if self.condition_not_met_notified {
self.condition_not_met_notified = false;
self.check_conditions_met(&mut context).await?;
if context.is_empty() {
break;
}
}

let mut expected_storage = ExpectedStorage::default();
for op in context.iter_ops_with_simulations() {
expected_storage.merge(&op.simulation.expected_storage)?;
Expand Down Expand Up @@ -295,6 +313,7 @@ where
),
settings,
event_sender,
condition_not_met_notified: false,
_uo_type: PhantomData,
}
}
Expand Down Expand Up @@ -549,6 +568,73 @@ where
context
}

async fn check_conditions_met(&self, context: &mut ProposalContext<UO>) -> anyhow::Result<()> {
let futs = context
.iter_ops_with_simulations()
.enumerate()
.map(|(i, op)| async move {
self.check_op_conditions_met(&op.simulation.expected_storage)
.await
.map(|reason| (i, reason))
})
.collect::<Vec<_>>();

let to_reject = future::join_all(futs).await.into_iter().flatten();

for (index, reason) in to_reject {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::ConditionNotMet(reason),
));
self.reject_index(context, index).await;
}

Ok(())
}

async fn check_op_conditions_met(
&self,
expected_storage: &ExpectedStorage,
) -> Option<ConditionNotMetReason> {
let futs = expected_storage
.0
.iter()
.map(|(address, slots)| async move {
let storage = match self
.provider
.batch_get_storage_at(*address, slots.keys().copied().collect())
.await
{
Ok(storage) => storage,
Err(e) => {
error!("Error getting storage for address {address:?} failing open: {e:?}");
return None;
}
};

for ((slot, expected), actual) in slots.iter().zip(storage) {
if *expected != actual {
return Some(ConditionNotMetReason {
address: *address,
slot: *slot,
expected: *expected,
actual,
});
}
}
None
});

let results = future::join_all(futs).await;
for result in results {
if result.is_some() {
return result;
}
}
None
}

async fn reject_index(&self, context: &mut ProposalContext<UO>, i: usize) {
let changed_aggregator = context.reject_index(i);
self.compute_aggregator_signatures(context, &changed_aggregator)
Expand Down Expand Up @@ -2154,7 +2240,7 @@ mod tests {
.expect_aggregate_signatures()
.returning(move |address, _| Ok(signatures_by_aggregator[&address]().unwrap()));
let (event_sender, _) = broadcast::channel(16);
let proposer = BundleProposerImpl::new(
let mut proposer = BundleProposerImpl::new(
0,
pool_client,
simulator,
Expand Down
Loading
Loading