diff --git a/Cargo.lock b/Cargo.lock index d7b7a618..85d9f86c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5252,9 +5252,9 @@ dependencies = [ name = "rundler-task" version = "0.3.0" dependencies = [ + "alloy-primitives", "anyhow", "async-trait", - "ethers", "futures", "metrics", "pin-project", diff --git a/crates/task/Cargo.toml b/crates/task/Cargo.toml index 3cd2bd8b..e965bb24 100644 --- a/crates/task/Cargo.toml +++ b/crates/task/Cargo.toml @@ -11,10 +11,11 @@ publish = false rundler-provider = { path = "../provider" } rundler-utils = { path = "../utils" } +alloy-primitives.workspace = true + anyhow.workspace = true async-trait.workspace = true futures.workspace = true -ethers.workspace = true pin-project.workspace = true metrics.workspace = true tokio.workspace = true diff --git a/crates/task/src/block_watcher.rs b/crates/task/src/block_watcher.rs index 153352f3..19a92538 100644 --- a/crates/task/src/block_watcher.rs +++ b/crates/task/src/block_watcher.rs @@ -15,8 +15,8 @@ use std::time::Duration; -use ethers::types::{Block, BlockNumber, H256}; -use rundler_provider::Provider; +use alloy_primitives::B256; +use rundler_provider::{Block, BlockId, EvmProvider}; use rundler_utils::retry::{self, UnlimitedRetryOpts}; use tokio::time; use tracing::error; @@ -26,14 +26,14 @@ use tracing::error; /// This function polls the provider for the latest block until a new block is discovered, with /// unlimited retries. pub async fn wait_for_new_block( - provider: &impl Provider, - last_block_hash: H256, + provider: &impl EvmProvider, + last_block_hash: B256, poll_interval: Duration, -) -> (H256, Block) { +) -> (B256, Block) { loop { let block = retry::with_unlimited_retries( "watch latest block", - || provider.get_block(BlockNumber::Latest), + || provider.get_block(BlockId::latest()), UnlimitedRetryOpts::default(), ) .await; @@ -41,12 +41,8 @@ pub async fn wait_for_new_block( error!("Latest block should be present when waiting for new block."); continue; }; - let Some(hash) = block.hash else { - error!("Latest block should have hash."); - continue; - }; - if last_block_hash != hash { - return (hash, block); + if last_block_hash != block.header.hash { + return (block.header.hash, block); } time::sleep(poll_interval).await; } @@ -57,7 +53,7 @@ pub async fn wait_for_new_block( /// This function polls the provider for the latest block number until a new block number is discovered, /// with unlimited retries. pub async fn wait_for_new_block_number( - provider: &impl Provider, + provider: &impl EvmProvider, last_block_number: u64, poll_interval: Duration, ) -> u64 { diff --git a/crates/task/src/grpc/protos.rs b/crates/task/src/grpc/protos.rs index bb7966f2..f3b20944 100644 --- a/crates/task/src/grpc/protos.rs +++ b/crates/task/src/grpc/protos.rs @@ -13,7 +13,7 @@ //! Protobuf utilities -use ethers::types::{Address, Bytes, H256, U128, U256}; +use alloy_primitives::{Address, Bytes, B256, U128, U256}; /// Error type for conversions from protobuf types to Ethers/local types. #[derive(Debug, thiserror::Error)] @@ -79,7 +79,7 @@ impl FromFixedLengthProtoBytes for U128 { const LEN: usize = 16; fn from_fixed_length_bytes(bytes: &[u8]) -> Self { - Self::from_little_endian(bytes) + Self::from_le_slice(bytes) } } @@ -87,11 +87,11 @@ impl FromFixedLengthProtoBytes for U256 { const LEN: usize = 32; fn from_fixed_length_bytes(bytes: &[u8]) -> Self { - Self::from_little_endian(bytes) + Self::from_le_slice(bytes) } } -impl FromFixedLengthProtoBytes for H256 { +impl FromFixedLengthProtoBytes for B256 { const LEN: usize = 32; fn from_fixed_length_bytes(bytes: &[u8]) -> Self { @@ -99,6 +99,15 @@ impl FromFixedLengthProtoBytes for H256 { } } +impl FromFixedLengthProtoBytes for u128 { + const LEN: usize = 16; + + fn from_fixed_length_bytes(bytes: &[u8]) -> Self { + let (int_bytes, _) = bytes.split_at(std::mem::size_of::()); + u128::from_le_bytes(int_bytes.try_into().unwrap()) + } +} + /// Trait for a type that can be converted to protobuf bytes. pub trait ToProtoBytes { /// Convert to protobuf bytes. @@ -107,29 +116,25 @@ pub trait ToProtoBytes { impl ToProtoBytes for Address { fn to_proto_bytes(&self) -> Vec { - self.as_bytes().to_vec() + self.to_vec() } } impl ToProtoBytes for U128 { fn to_proto_bytes(&self) -> Vec { - let mut vec = vec![0_u8; 16]; - self.to_little_endian(&mut vec); - vec + self.to_le_bytes::<16>().into() } } impl ToProtoBytes for U256 { fn to_proto_bytes(&self) -> Vec { - let mut vec = vec![0_u8; 32]; - self.to_little_endian(&mut vec); - vec + self.to_le_bytes::<32>().into() } } -impl ToProtoBytes for H256 { +impl ToProtoBytes for B256 { fn to_proto_bytes(&self) -> Vec { - self.as_bytes().to_vec() + self.to_vec() } } @@ -138,3 +143,9 @@ impl ToProtoBytes for Bytes { self.to_vec() } } + +impl ToProtoBytes for u128 { + fn to_proto_bytes(&self) -> Vec { + self.to_le_bytes().into() + } +} diff --git a/crates/task/src/task.rs b/crates/task/src/task.rs index ab6c0342..80b984a8 100644 --- a/crates/task/src/task.rs +++ b/crates/task/src/task.rs @@ -22,6 +22,9 @@ use tracing::{error, info}; /// Core task trait implemented by top level Rundler tasks. #[async_trait] pub trait Task: Sync + Send + 'static { + /// Convert into a boxed task. + fn boxed(self) -> Box; + /// Run the task. async fn run(self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()>; }