diff --git a/Cargo.lock b/Cargo.lock index 3909e2f..702c1f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,11 +54,13 @@ dependencies = [ "alloy-genesis", "alloy-network", "alloy-provider", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types", "alloy-serde", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "reqwest", ] @@ -197,11 +199,13 @@ dependencies = [ "alloy-json-rpc", "alloy-network", "alloy-primitives", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types", "alloy-rpc-types-trace", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", @@ -215,6 +219,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-pubsub" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=bfd0fda#bfd0fda492e560c3463d521958793c81bbeadfc1" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-transport", + "bimap", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", +] + [[package]] name = "alloy-rlp" version = "0.3.4" @@ -243,8 +265,11 @@ version = "0.1.0" source = "git+https://github.com/alloy-rs/alloy?rev=bfd0fda#bfd0fda492e560c3463d521958793c81bbeadfc1" dependencies = [ "alloy-json-rpc", + "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "futures", "pin-project", "reqwest", @@ -395,6 +420,22 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-ws" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=bfd0fda#bfd0fda492e560c3463d521958793c81bbeadfc1" +dependencies = [ + "alloy-pubsub", + "alloy-transport", + "futures", + "http 0.2.12", + "serde_json", + "tokio", + "tokio-tungstenite 0.20.1", + "tracing", + "ws_stream_wasm", +] + [[package]] name = "anstream" version = "0.6.13" @@ -612,6 +653,17 @@ dependencies = [ "syn 2.0.59", ] +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version 0.4.0", +] + [[package]] name = "auto_impl" version = "1.2.0" @@ -799,6 +851,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bit-set" version = "0.5.3" @@ -1214,6 +1272,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "tracing-subscriber", + "url", "warp", ] @@ -2223,6 +2282,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version 0.4.0", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -2536,6 +2605,21 @@ dependencies = [ "subtle", ] +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rlp" version = "0.5.2" @@ -2619,6 +2703,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -2635,6 +2731,16 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.15" @@ -2680,6 +2786,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sec1" version = "0.7.3" @@ -2741,6 +2857,12 @@ dependencies = [ "pest", ] +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.197" @@ -3079,6 +3201,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -3091,6 +3223,21 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "rustls", + "tokio", + "tokio-rustls", + "tungstenite 0.20.1", + "webpki-roots", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" @@ -3100,7 +3247,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.21.0", ] [[package]] @@ -3291,6 +3438,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "rand", + "rustls", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.21.0" @@ -3370,6 +3537,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.0" @@ -3458,7 +3631,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tokio-util", "tower-service", "tracing", @@ -3556,6 +3729,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "winapi" version = "0.3.9" @@ -3745,6 +3924,25 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ws_stream_wasm" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version 0.4.0", + "send_wrapper", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 0fd9f26..e8db24f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ alloy = { git = "https://github.com/alloy-rs/alloy", rev = "bfd0fda", features = "network", "providers", "provider-http", + "provider-ws", "rpc-client", "rpc-types-eth", ] } @@ -30,6 +31,7 @@ tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] } tracing = { version = "0.1.40", features = ["attributes"] } tracing-opentelemetry = "0.23.0" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +url = "2.5.0" warp = "0.3.7" [profile.maxperf] diff --git a/Makefile b/Makefile index 35dd591..3a5f80e 100644 --- a/Makefile +++ b/Makefile @@ -58,17 +58,6 @@ build-%: RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \ cross build --target $* --features "$(FEATURES)" --profile "$(PROFILE)" -# Unfortunately we can't easily use cross to build for Darwin because of licensing issues. -# If we wanted to, we would need to build a custom Docker image with the SDK available. -# -# Note: You must set `SDKROOT` and `MACOSX_DEPLOYMENT_TARGET`. These can be found using `xcrun`. -# -# `SDKROOT=$(xcrun -sdk macosx --show-sdk-path) MACOSX_DEPLOYMENT_TARGET=$(xcrun -sdk macosx --show-sdk-platform-version)` -build-x86_64-apple-darwin: - $(MAKE) build-native-x86_64-apple-darwin -build-aarch64-apple-darwin: - $(MAKE) build-native-aarch64-apple-darwin - # Create a `.tar.gz` containing a binary for a specific target. define tarball_release_binary cp $(BUILD_PATH)/$(1)/$(PROFILE)/$(2) $(BIN_DIR)/$(2) diff --git a/src/api.rs b/src/api.rs index d6904ae..4bf1d50 100644 --- a/src/api.rs +++ b/src/api.rs @@ -29,7 +29,10 @@ pub async fn health_handler( if state.is_healthy() { healthy_response() } else { - unhealthy_response("block is stale or intentionally failing", &state.latest_block) + unhealthy_response( + "block is stale or intentionally failing", + &state.latest_block, + ) } } diff --git a/src/main.rs b/src/main.rs index be96086..afbe410 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,10 @@ mod api; mod instrument; mod monitor; -use alloy::providers::ProviderBuilder; +use alloy::{ + providers::{Provider, ProviderBuilder}, + rpc::client::WsConnect, +}; use axum::{ routing::{get, post}, Router, @@ -10,12 +13,10 @@ use axum::{ use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; use clap::Parser; use eyre::Result; +use futures_util::StreamExt; use monitor::AppState; -use std::{ - sync::{Arc, Mutex}, - time::SystemTime, -}; -use tokio::time::{self, Duration, Instant}; +use std::sync::{Arc, Mutex}; +use url::Url; /// Monitor an Ethereum node RPC endpoint #[derive(Parser, Debug)] @@ -24,9 +25,12 @@ struct MonitorArgs { /// Listen address for the API #[clap(long, default_value = "127.0.0.1:8080")] listen: String, - /// JSON-RPC URL of the Ethereum node - #[clap(long, default_value = "http://localhost:8545")] - rpc_url: String, + /// HTTP URL of the Ethereum node + #[clap(long, value_parser=parse_url, default_value = "http://localhost:8545")] + http_rpc_url: Url, + /// Websockets URL of the Ethereum node + #[clap(long, value_parser=parse_url, default_value = "ws://localhost:8546")] + ws_rpc_url: Url, /// Block frequency that is to be expected from the Ethereum node #[clap(long, default_value = "12")] block_frequency: u64, @@ -35,16 +39,37 @@ struct MonitorArgs { tracing: bool, } +fn parse_url(s: &str) -> Result { + Url::parse(s) +} + #[tokio::main] async fn main() -> Result<()> { // Parse command line arguments let args = MonitorArgs::parse(); + // If the scheme is not http or https, return an error + if args.http_rpc_url.scheme() != "http" && args.http_rpc_url.scheme() != "https" { + return Err(eyre::eyre!( + "Invalid scheme for RPC URL: {}", + args.http_rpc_url.scheme() + )); + } + + // If the scheme is not ws or wss, return an error + if args.ws_rpc_url.scheme() != "ws" && args.ws_rpc_url.scheme() != "wss" { + return Err(eyre::eyre!( + "Invalid scheme for RPC URL: {}", + args.ws_rpc_url.scheme() + )); + } + // Initialize tracing instrument::init(args.tracing); tracing::info!( - rpc_url = args.rpc_url, + http_rpc_url = args.http_rpc_url.to_string(), + ws_rpc_url = args.ws_rpc_url.to_string(), block_frequency = args.block_frequency, "Starting Ethereum node monitor" ); @@ -57,15 +82,20 @@ async fn main() -> Result<()> { let toggle_fail_api_state = app_state.clone(); let health_api_state = app_state.clone(); let app = Router::new() - .route("/lastBlock", get(move || api::last_block_handler(last_block_api_state))) + .route( + "/lastBlock", + get(move || api::last_block_handler(last_block_api_state)), + ) .route( "/toggleFail", post(move || api::toggle_fail_handler(toggle_fail_api_state)), ) - .route("/health", get(move || api::health_handler(health_api_state))) + .route( + "/health", + get(move || api::health_handler(health_api_state)), + ) .layer(OtelInResponseLayer::default()) .layer(OtelAxumLayer::default()); - // .with_state(shared_state); // Spawn a task to run the API tokio::spawn(async move { @@ -73,24 +103,20 @@ async fn main() -> Result<()> { axum::serve(listener, app).await.unwrap(); }); - // Create a provider - let rpc_url = args.rpc_url.parse()?; - let provider = ProviderBuilder::new().on_reqwest_http(rpc_url)?; - - let block_frequency_millis = args.block_frequency * 1_000; - - loop { - AppState::poll_and_update_block(provider.clone(), app_state.clone()).await; - - let now_since_epoch = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; - - let delay_millis = block_frequency_millis - (now_since_epoch % block_frequency_millis); - let next_tick = Instant::now() + Duration::from_millis(delay_millis) + Duration::from_secs(1); - - // Sleep until the next calculated multiple of block_frequency - time::sleep_until(next_tick).await; + // Create providers + let http_provider = ProviderBuilder::new().on_reqwest_http(args.http_rpc_url)?; + let ws = WsConnect::new(args.ws_rpc_url); + let ws_provider = ProviderBuilder::new().on_ws(ws).await?; + + // Subscribe to new blocks + let sub = ws_provider.subscribe_blocks().await?; + + let mut stream = sub.into_stream(); + + while let Some(block) = stream.next().await { + monitor::AppState::poll_and_update_block(block, &http_provider, app_state.clone()) + .await; } + + Ok(()) } diff --git a/src/monitor.rs b/src/monitor.rs index 16505df..e9074be 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -4,20 +4,17 @@ use std::{ }; use alloy::{ - network::Network, - providers::Provider, - rpc::types::eth::{Block, BlockNumberOrTag}, - transports::Transport, + network::Network, primitives::FixedBytes, providers::Provider, rpc::types::eth::{Block, BlockNumberOrTag}, transports::Transport }; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct AppState { - /// Set as an option to allow for the initial state to be None + // Set as an option to allow for the initial state to be None pub latest_block: Option, - /// Initial state is false + // Initial state is false pub fail_intentional: bool, - /// Block frequency that is to be expected from the Ethereum node + // Block frequency that is to be expected from the Ethereum node block_frequency: u64, } @@ -33,33 +30,56 @@ impl AppState { /// Poll the Ethereum node for the latest block and update the shared state /// with the latest block. - #[tracing::instrument(skip(app_state))] + #[tracing::instrument(skip_all)] pub(crate) async fn poll_and_update_block< T: Transport + Clone, P: Provider + std::fmt::Debug, N: Network, >( - provider: P, + block: Block, + provider: &P, app_state: Arc>, ) { - let block = provider - .get_block_by_number(BlockNumberOrTag::Latest, true) - .await; - let mut state = app_state.lock().unwrap(); - match block { - Ok(Some(block)) => { - state.latest_block = Some(block); - let block = state.latest_block.as_ref().unwrap(); - tracing::debug!( - block.number = block.header.number.map(|num| num.to::()), - block.timestamp = block.header.timestamp.to::(), - block.hash = block.header.hash.unwrap_or_default().to_string(), - "Updated latest block" - ); + // Before updating the latest block, check to see if it is retrievable from the + // HTTP provider. If the block is not found in the HTTP provider, log an error + // and continue to the next block. + // If the block is found in the HTTP provider, check to see if the block hash + // matches between the HTTP and WS providers. If the block hash does not match, + // log an error and continue to the next block. + let http_block = provider + .get_block_by_number( + BlockNumberOrTag::Number(block.header.number.unwrap().to::()), + true, + ) + .await + .unwrap(); + + match http_block { + Some(ref blk) => { + let ws_hash = block.header.hash.unwrap_or_default(); + let http_hash = blk.header.hash.unwrap_or_default(); + if ws_hash != http_hash && ws_hash != FixedBytes::ZERO { + tracing::error!( + ws_block = ?block, + http_block = ?http_block, + "Block hash mismatch between HTTP and WS providers" + ); + return; + } + } + None => { + tracing::error!("Block not found in HTTP provider"); + return; } - Ok(None) => tracing::error!("No block found"), - Err(e) => tracing::error!("Failed to get latest block: {:?}", e), } + + tracing::debug!( + block.number = ?block.header.number, + block.timestamp = ?block.header.timestamp, + block.hash = ?block.header.hash, + "Updating to latest block" + ); + app_state.lock().unwrap().latest_block = Some(block); } #[tracing::instrument(skip(self))]