diff --git a/Cargo.lock b/Cargo.lock index 53978a0..6a40ffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,7 +1892,6 @@ name = "ethlambda" version = "0.1.0" dependencies = [ "clap", - "ethereum-types", "ethlambda-blockchain", "ethlambda-p2p", "ethlambda-rpc", @@ -1907,7 +1906,6 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", - "vergen", "vergen-git2", ] @@ -7363,7 +7361,6 @@ dependencies = [ "derive_builder", "rustc_version 0.4.1", "rustversion", - "time", "vergen-lib", ] diff --git a/Cargo.toml b/Cargo.toml index 5bfbc03..fa3812d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,6 @@ tree_hash = "0.9.1" tree_hash_derive = "0.9.1" # Build-time version info -vergen = { version = "9", features = ["build", "rustc"] } -vergen-git2 = "9" +vergen-git2 = { version = "9", features = ["rustc"] } rand = "0.9" diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml index 4f58e68..ef20137 100644 --- a/bin/ethlambda/Cargo.toml +++ b/bin/ethlambda/Cargo.toml @@ -23,9 +23,7 @@ serde_json.workspace = true serde_yaml_ng = "0.10" hex.workspace = true -ethereum-types.workspace = true clap.workspace = true [build-dependencies] -vergen.workspace = true vergen-git2.workspace = true diff --git a/bin/ethlambda/build.rs b/bin/ethlambda/build.rs index 9fcf812..ad4184e 100644 --- a/bin/ethlambda/build.rs +++ b/bin/ethlambda/build.rs @@ -1,5 +1,4 @@ -use vergen::{Emitter, RustcBuilder}; -use vergen_git2::Git2Builder; +use vergen_git2::{Emitter, Git2Builder, RustcBuilder}; fn main() -> Result<(), Box> { let git2 = Git2Builder::default().branch(true).sha(true).build()?; diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index d951e45..0b68239 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -9,6 +9,7 @@ use std::{ use clap::Parser; use ethlambda_p2p::{Bootnode, parse_enrs, start_p2p}; use ethlambda_rpc::metrics::start_prometheus_metrics_api; +use ethlambda_types::primitives::H256; use ethlambda_types::{ genesis::Genesis, signature::ValidatorSecretKey, @@ -73,12 +74,16 @@ async fn main() { let validators_path = options .custom_network_config_dir .join("annotated_validators.yaml"); + let validator_config = options + .custom_network_config_dir + .join("validator-config.yaml"); let validator_keys_dir = options.custom_network_config_dir.join("hash-sig-keys"); let genesis_json = std::fs::read_to_string(&genesis_path).expect("Failed to read genesis.json"); let genesis: Genesis = serde_json::from_str(&genesis_json).expect("Failed to parse genesis.json"); + populate_name_registry(&validator_config); let bootnodes = read_bootnodes(&bootnodes_path); let validators = read_validators(&validators_path); @@ -113,6 +118,31 @@ async fn main() { println!("Shutting down..."); } +fn populate_name_registry(validator_config: impl AsRef) { + #[derive(Deserialize)] + struct Validator { + name: String, + privkey: H256, + } + #[derive(Deserialize)] + struct Config { + validators: Vec, + } + let config_yaml = + std::fs::read_to_string(&validator_config).expect("Failed to read validator config file"); + let config: Config = + serde_yaml_ng::from_str(&config_yaml).expect("Failed to parse validator config file"); + + let names_and_privkeys = config + .validators + .into_iter() + .map(|v| (v.name, v.privkey)) + .collect(); + + // Populates a dictionary used for labeling metrics with node names + ethlambda_p2p::metrics::populate_name_registry(names_and_privkeys); +} + fn read_bootnodes(bootnodes_path: impl AsRef) -> Vec { let bootnodes_yaml = std::fs::read_to_string(bootnodes_path).expect("Failed to read bootnodes file"); diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 6024693..25b9fc2 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -19,7 +19,7 @@ use libp2p::{ use sha2::Digest; use ssz::Encode; use tokio::sync::mpsc; -use tracing::{info, trace}; +use tracing::{info, trace, warn}; use crate::{ gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND}, @@ -31,6 +31,9 @@ use crate::{ mod gossipsub; mod messages; +pub mod metrics; + +pub use metrics::populate_name_registry; pub async fn start_p2p( node_key: Vec, @@ -66,7 +69,6 @@ pub async fn start_p2p( .build() .expect("invalid gossipsub config"); - // TODO: setup custom message ID function let gossipsub = libp2p::gossipsub::Behaviour::new(MessageAuthenticity::Anonymous, config) .expect("failed to initiate behaviour"); @@ -100,12 +102,17 @@ pub async fn start_p2p( config.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) }) .build(); + let local_peer_id = *swarm.local_peer_id(); for bootnode in bootnodes { + let peer_id = PeerId::from_public_key(&bootnode.public_key); + if peer_id == local_peer_id { + continue; + } let addr = Multiaddr::empty() .with(bootnode.ip.into()) .with(Protocol::Udp(bootnode.quic_port)) .with(Protocol::QuicV1) - .with_p2p(PeerId::from_public_key(&bootnode.public_key)) + .with_p2p(peer_id) .expect("failed to add peer ID to multiaddr"); swarm.dial(addr).unwrap(); } @@ -179,6 +186,58 @@ async fn event_loop( )) => { gossipsub::handle_gossipsub_message(&mut blockchain, message).await; } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let direction = connection_direction(&endpoint); + if num_established.get() == 1 { + metrics::notify_peer_connected(&Some(peer_id), direction, "success"); + } + info!(%peer_id, %direction, "Peer connected"); + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. + } => { + let direction = connection_direction(&endpoint); + let reason = match cause { + None => "remote_close", + Some(err) => { + // Categorize disconnection reasons + let err_str = err.to_string().to_lowercase(); + if err_str.contains("timeout") || err_str.contains("timedout") || err_str.contains("keepalive") { + "timeout" + } else if err_str.contains("reset") || err_str.contains("connectionreset") { + "remote_close" + } else { + "error" + } + } + }; + if num_established == 0 { + metrics::notify_peer_disconnected(&Some(peer_id), direction, reason); + } + info!(%peer_id, %direction, %reason, "Peer disconnected"); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + let result = if error.to_string().to_lowercase().contains("timed out") { + "timeout" + } else { + "error" + }; + metrics::notify_peer_connected(&peer_id, "outbound", result); + warn!(?peer_id, %error, "Outgoing connection error"); + } + SwarmEvent::IncomingConnectionError { peer_id, error, .. } => { + metrics::notify_peer_connected(&peer_id, "inbound", "error"); + warn!(%error, "Incoming connection error"); + } _ => { trace!(?event, "Ignored swarm event"); } @@ -310,6 +369,14 @@ pub fn parse_enrs(enrs: Vec) -> Vec { bootnodes } +fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str { + if endpoint.is_dialer() { + "outbound" + } else { + "inbound" + } +} + fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId { const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs new file mode 100644 index 0000000..ba96a63 --- /dev/null +++ b/crates/net/p2p/src/metrics.rs @@ -0,0 +1,97 @@ +//! Prometheus metrics for the P2P network layer. + +use std::{ + collections::HashMap, + sync::{LazyLock, RwLock}, +}; + +use ethlambda_types::primitives::H256; +use libp2p::{ + PeerId, + identity::{Keypair, secp256k1}, +}; +use prometheus::{IntCounterVec, IntGaugeVec, register_int_counter_vec, register_int_gauge_vec}; + +static NODE_NAME_REGISTRY: LazyLock>> = + LazyLock::new(|| RwLock::new(HashMap::new())); + +pub fn populate_name_registry(names_and_privkeys: HashMap) { + let mut registry = NODE_NAME_REGISTRY.write().unwrap(); + let name_registry = names_and_privkeys + .into_iter() + .filter_map(|(name, mut privkey)| { + let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey) else { + return None; + }; + let pubkey = Keypair::from(secp256k1::Keypair::from(privkey)).public(); + let peer_id = PeerId::from_public_key(&pubkey); + // NOTE: we leak the name string to get a 'static lifetime. + // In reality, the name registry is not expected to be read, so it should be safe + // to turn these strings to &'static str. + Some((peer_id, &*name.leak())) + }) + .collect(); + *registry = name_registry; +} + +fn resolve(peer_id: &Option) -> &'static str { + let registry = NODE_NAME_REGISTRY.read().unwrap(); + peer_id + .as_ref() + .and_then(|peer_id| registry.get(peer_id)) + .unwrap_or(&"unknown") +} + +static LEAN_CONNECTED_PEERS: LazyLock = LazyLock::new(|| { + register_int_gauge_vec!( + "lean_connected_peers", + "Number of connected peers", + &["client"] + ) + .unwrap() +}); + +static LEAN_PEER_CONNECTION_EVENTS_TOTAL: LazyLock = LazyLock::new(|| { + register_int_counter_vec!( + "lean_peer_connection_events_total", + "Total number of peer connection events", + &["direction", "result"] + ) + .unwrap() +}); + +static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock = LazyLock::new(|| { + register_int_counter_vec!( + "lean_peer_disconnection_events_total", + "Total number of peer disconnection events", + &["direction", "reason"] + ) + .unwrap() +}); + +/// Notify that a peer connection event occurred. +/// +/// If `result` is "success", the connected peer count is incremented. +/// The connection event counter is always incremented. +pub fn notify_peer_connected(peer_id: &Option, direction: &str, result: &str) { + LEAN_PEER_CONNECTION_EVENTS_TOTAL + .with_label_values(&[direction, result]) + .inc(); + + if result == "success" { + let name = resolve(peer_id); + LEAN_CONNECTED_PEERS.with_label_values(&[name]).inc(); + } +} + +/// Notify that a peer disconnected. +/// +/// Decrements the connected peer count and increments the disconnection event counter. +pub fn notify_peer_disconnected(peer_id: &Option, direction: &str, reason: &str) { + LEAN_PEER_DISCONNECTION_EVENTS_TOTAL + .with_label_values(&[direction, reason]) + .inc(); + + let name = resolve(peer_id); + LEAN_CONNECTED_PEERS.with_label_values(&[name]).dec(); +} diff --git a/crates/net/rpc/src/metrics.rs b/crates/net/rpc/src/metrics.rs index 075d84c..62a85d1 100644 --- a/crates/net/rpc/src/metrics.rs +++ b/crates/net/rpc/src/metrics.rs @@ -1,13 +1,13 @@ use std::net::SocketAddr; -use axum::{Router, routing::get}; +use axum::{Router, http::HeaderValue, response::IntoResponse, routing::get}; use thiserror::Error; use tracing::warn; pub async fn start_prometheus_metrics_api(address: SocketAddr) -> Result<(), std::io::Error> { let app = Router::new() .route("/metrics", get(get_metrics)) - .route("/health", get("Service Up")); + .route("/health", get(get_health)); // Start the axum app let listener = tokio::net::TcpListener::bind(address).await?; @@ -16,12 +16,20 @@ pub async fn start_prometheus_metrics_api(address: SocketAddr) -> Result<(), std Ok(()) } -pub(crate) async fn get_metrics() -> String { - gather_default_metrics() +pub(crate) async fn get_health() -> impl IntoResponse { + r#"{"status": "healthy", "service": "lean-spec-api"}"# +} + +pub(crate) async fn get_metrics() -> impl IntoResponse { + let mut response = gather_default_metrics() .inspect_err(|err| { warn!(%err, "Failed to gather Prometheus metrics"); }) .unwrap_or_default() + .into_response(); + let content_type = HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8"); + response.headers_mut().insert("content-type", content_type); + response } #[derive(Debug, Error)] diff --git a/docs/metrics.md b/docs/metrics.md index c6f28ac..00283bb 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -65,9 +65,9 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le | Name | Type | Usage | Sample collection event | Labels | Supported | |--------|-------|-------|-------------------------|--------|-----------| -|`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=lantern,qlean,ream,zeam | □ | -|`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound
result=success,timeout,error | □ | -|`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound
reason=timeout,remote_close,local_close,error | □ | +|`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=lantern,qlean,ream,zeam | ✅(*) | +|`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound
result=success,timeout,error | ✅ | +|`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound
reason=timeout,remote_close,local_close,error | ✅ | ---