Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ tree_hash = "0.9.1"
tree_hash_derive = "0.9.1"

# Build-time version info
vergen = { version = "9", features = ["build", "rustc"] }
vergen-git2 = "9"
vergen-git2 = { version = "9", features = ["rustc"] }

rand = "0.9"
2 changes: 0 additions & 2 deletions bin/ethlambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ serde_json.workspace = true
serde_yaml_ng = "0.10"
hex.workspace = true

ethereum-types.workspace = true
clap.workspace = true

[build-dependencies]
vergen.workspace = true
vergen-git2.workspace = true
3 changes: 1 addition & 2 deletions bin/ethlambda/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use vergen::{Emitter, RustcBuilder};
use vergen_git2::Git2Builder;
use vergen_git2::{Emitter, Git2Builder, RustcBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
let git2 = Git2Builder::default().branch(true).sha(true).build()?;
Expand Down
30 changes: 30 additions & 0 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
use clap::Parser;
use ethlambda_p2p::{Bootnode, parse_enrs, start_p2p};
use ethlambda_rpc::metrics::start_prometheus_metrics_api;
use ethlambda_types::primitives::H256;
use ethlambda_types::{
genesis::Genesis,
signature::ValidatorSecretKey,
Expand Down Expand Up @@ -73,12 +74,16 @@ async fn main() {
let validators_path = options
.custom_network_config_dir
.join("annotated_validators.yaml");
let validator_config = options
.custom_network_config_dir
.join("validator-config.yaml");
let validator_keys_dir = options.custom_network_config_dir.join("hash-sig-keys");

let genesis_json = std::fs::read_to_string(&genesis_path).expect("Failed to read genesis.json");
let genesis: Genesis =
serde_json::from_str(&genesis_json).expect("Failed to parse genesis.json");

populate_name_registry(&validator_config);
let bootnodes = read_bootnodes(&bootnodes_path);

let validators = read_validators(&validators_path);
Expand Down Expand Up @@ -113,6 +118,31 @@ async fn main() {
println!("Shutting down...");
}

fn populate_name_registry(validator_config: impl AsRef<Path>) {
#[derive(Deserialize)]
struct Validator {
name: String,
privkey: H256,
}
#[derive(Deserialize)]
struct Config {
validators: Vec<Validator>,
}
let config_yaml =
std::fs::read_to_string(&validator_config).expect("Failed to read validator config file");
let config: Config =
serde_yaml_ng::from_str(&config_yaml).expect("Failed to parse validator config file");

let names_and_privkeys = config
.validators
.into_iter()
.map(|v| (v.name, v.privkey))
.collect();

// Populates a dictionary used for labeling metrics with node names
ethlambda_p2p::metrics::populate_name_registry(names_and_privkeys);
}

fn read_bootnodes(bootnodes_path: impl AsRef<Path>) -> Vec<Bootnode> {
let bootnodes_yaml =
std::fs::read_to_string(bootnodes_path).expect("Failed to read bootnodes file");
Expand Down
73 changes: 70 additions & 3 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use libp2p::{
use sha2::Digest;
use ssz::Encode;
use tokio::sync::mpsc;
use tracing::{info, trace};
use tracing::{info, trace, warn};

use crate::{
gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND},
Expand All @@ -31,6 +31,9 @@ use crate::{

mod gossipsub;
mod messages;
pub mod metrics;

pub use metrics::populate_name_registry;

pub async fn start_p2p(
node_key: Vec<u8>,
Expand Down Expand Up @@ -66,7 +69,6 @@ pub async fn start_p2p(
.build()
.expect("invalid gossipsub config");

// TODO: setup custom message ID function
let gossipsub = libp2p::gossipsub::Behaviour::new(MessageAuthenticity::Anonymous, config)
.expect("failed to initiate behaviour");

Expand Down Expand Up @@ -100,12 +102,17 @@ pub async fn start_p2p(
config.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build();
let local_peer_id = *swarm.local_peer_id();
for bootnode in bootnodes {
let peer_id = PeerId::from_public_key(&bootnode.public_key);
if peer_id == local_peer_id {
continue;
}
let addr = Multiaddr::empty()
.with(bootnode.ip.into())
.with(Protocol::Udp(bootnode.quic_port))
.with(Protocol::QuicV1)
.with_p2p(PeerId::from_public_key(&bootnode.public_key))
.with_p2p(peer_id)
.expect("failed to add peer ID to multiaddr");
swarm.dial(addr).unwrap();
}
Expand Down Expand Up @@ -179,6 +186,58 @@ async fn event_loop(
)) => {
gossipsub::handle_gossipsub_message(&mut blockchain, message).await;
}
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established,
..
} => {
let direction = connection_direction(&endpoint);
if num_established.get() == 1 {
metrics::notify_peer_connected(&Some(peer_id), direction, "success");
}
info!(%peer_id, %direction, "Peer connected");
}
SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
num_established,
cause,
..
} => {
let direction = connection_direction(&endpoint);
let reason = match cause {
None => "remote_close",
Some(err) => {
// Categorize disconnection reasons
let err_str = err.to_string().to_lowercase();
if err_str.contains("timeout") || err_str.contains("timedout") || err_str.contains("keepalive") {
"timeout"
} else if err_str.contains("reset") || err_str.contains("connectionreset") {
"remote_close"
} else {
"error"
}
}
};
if num_established == 0 {
metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);
}
info!(%peer_id, %direction, %reason, "Peer disconnected");
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
let result = if error.to_string().to_lowercase().contains("timed out") {
"timeout"
} else {
"error"
};
metrics::notify_peer_connected(&peer_id, "outbound", result);
warn!(?peer_id, %error, "Outgoing connection error");
}
SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
metrics::notify_peer_connected(&peer_id, "inbound", "error");
warn!(%error, "Incoming connection error");
}
_ => {
trace!(?event, "Ignored swarm event");
}
Expand Down Expand Up @@ -310,6 +369,14 @@ pub fn parse_enrs(enrs: Vec<String>) -> Vec<Bootnode> {
bootnodes
}

fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str {
if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
}
}

fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00];
const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00];
Expand Down
97 changes: 97 additions & 0 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//! Prometheus metrics for the P2P network layer.

use std::{
collections::HashMap,
sync::{LazyLock, RwLock},
};

use ethlambda_types::primitives::H256;
use libp2p::{
PeerId,
identity::{Keypair, secp256k1},
};
use prometheus::{IntCounterVec, IntGaugeVec, register_int_counter_vec, register_int_gauge_vec};

static NODE_NAME_REGISTRY: LazyLock<RwLock<HashMap<PeerId, &'static str>>> =
LazyLock::new(|| RwLock::new(HashMap::new()));

pub fn populate_name_registry(names_and_privkeys: HashMap<String, H256>) {
let mut registry = NODE_NAME_REGISTRY.write().unwrap();
let name_registry = names_and_privkeys
.into_iter()
.filter_map(|(name, mut privkey)| {
let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey) else {
return None;
};
let pubkey = Keypair::from(secp256k1::Keypair::from(privkey)).public();
let peer_id = PeerId::from_public_key(&pubkey);
// NOTE: we leak the name string to get a 'static lifetime.
// In reality, the name registry is not expected to be read, so it should be safe
// to turn these strings to &'static str.
Some((peer_id, &*name.leak()))
})
.collect();
*registry = name_registry;
}

fn resolve(peer_id: &Option<PeerId>) -> &'static str {
let registry = NODE_NAME_REGISTRY.read().unwrap();
peer_id
.as_ref()
.and_then(|peer_id| registry.get(peer_id))
.unwrap_or(&"unknown")
}

static LEAN_CONNECTED_PEERS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"lean_connected_peers",
"Number of connected peers",
&["client"]
)
.unwrap()
});

static LEAN_PEER_CONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec!(
"lean_peer_connection_events_total",
"Total number of peer connection events",
&["direction", "result"]
)
.unwrap()
});

static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec!(
"lean_peer_disconnection_events_total",
"Total number of peer disconnection events",
&["direction", "reason"]
)
.unwrap()
});

/// Notify that a peer connection event occurred.
///
/// If `result` is "success", the connected peer count is incremented.
/// The connection event counter is always incremented.
pub fn notify_peer_connected(peer_id: &Option<PeerId>, direction: &str, result: &str) {
LEAN_PEER_CONNECTION_EVENTS_TOTAL
.with_label_values(&[direction, result])
.inc();

if result == "success" {
let name = resolve(peer_id);
LEAN_CONNECTED_PEERS.with_label_values(&[name]).inc();
}
}

/// Notify that a peer disconnected.
///
/// Decrements the connected peer count and increments the disconnection event counter.
pub fn notify_peer_disconnected(peer_id: &Option<PeerId>, direction: &str, reason: &str) {
LEAN_PEER_DISCONNECTION_EVENTS_TOTAL
.with_label_values(&[direction, reason])
.inc();

let name = resolve(peer_id);
LEAN_CONNECTED_PEERS.with_label_values(&[name]).dec();
}
16 changes: 12 additions & 4 deletions crates/net/rpc/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::net::SocketAddr;

use axum::{Router, routing::get};
use axum::{Router, http::HeaderValue, response::IntoResponse, routing::get};
use thiserror::Error;
use tracing::warn;

pub async fn start_prometheus_metrics_api(address: SocketAddr) -> Result<(), std::io::Error> {
let app = Router::new()
.route("/metrics", get(get_metrics))
.route("/health", get("Service Up"));
.route("/health", get(get_health));

// Start the axum app
let listener = tokio::net::TcpListener::bind(address).await?;
Expand All @@ -16,12 +16,20 @@ pub async fn start_prometheus_metrics_api(address: SocketAddr) -> Result<(), std
Ok(())
}

pub(crate) async fn get_metrics() -> String {
gather_default_metrics()
pub(crate) async fn get_health() -> impl IntoResponse {
r#"{"status": "healthy", "service": "lean-spec-api"}"#
}

pub(crate) async fn get_metrics() -> impl IntoResponse {
let mut response = gather_default_metrics()
.inspect_err(|err| {
warn!(%err, "Failed to gather Prometheus metrics");
})
.unwrap_or_default()
.into_response();
let content_type = HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8");
response.headers_mut().insert("content-type", content_type);
response
}

#[derive(Debug, Error)]
Expand Down
6 changes: 3 additions & 3 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le

| Name | Type | Usage | Sample collection event | Labels | Supported |
|--------|-------|-------|-------------------------|--------|-----------|
|`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=lantern,qlean,ream,zeam | |
|`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound<br>result=success,timeout,error | |
|`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound<br>reason=timeout,remote_close,local_close,error | |
|`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=lantern,qlean,ream,zeam | ✅(*) |
|`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound<br>result=success,timeout,error | |
|`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound<br>reason=timeout,remote_close,local_close,error | |

---

Expand Down