Skip to content

Commit 6192425

Browse files
authored
feat: add peer connection metrics (#48)
* refactor: remove unneeded vergen dep * feat: add network peer connection metrics Add the following metrics from the leanMetrics specification: - lean_connected_peers: Gauge with client label tracking connected peer count - lean_peer_connection_events_total: Counter with direction and result labels - lean_peer_disconnection_events_total: Counter with direction and reason labels Connection events are tracked in the p2p event loop by handling SwarmEvent::ConnectionEstablished, SwarmEvent::ConnectionClosed, SwarmEvent::OutgoingConnectionError, and SwarmEvent::IncomingConnectionError. * refactor: unify metrics functions * fix: follow spec in /metrics and /health responses See https://github.com/leanEthereum/leanSpec/blob/914e09333578072c6bf02d2d405f8c13d5b55d38/src/lean_spec/subspecs/api/server.py#L36-L46 * fix: take into account multiple connections to same peer * fix: avoid dialing ourselves * fix: use reason matching in connection closed * feat: add node name to peer metrics * docs: mark metrics as supported in metrics.md * chore: fix clippy lints
1 parent eb08775 commit 6192425

File tree

9 files changed

+214
-19
lines changed

9 files changed

+214
-19
lines changed

Cargo.lock

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ tree_hash = "0.9.1"
6363
tree_hash_derive = "0.9.1"
6464

6565
# Build-time version info
66-
vergen = { version = "9", features = ["build", "rustc"] }
67-
vergen-git2 = "9"
66+
vergen-git2 = { version = "9", features = ["rustc"] }
6867

6968
rand = "0.9"

bin/ethlambda/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ serde_json.workspace = true
2323
serde_yaml_ng = "0.10"
2424
hex.workspace = true
2525

26-
ethereum-types.workspace = true
2726
clap.workspace = true
2827

2928
[build-dependencies]
30-
vergen.workspace = true
3129
vergen-git2.workspace = true

bin/ethlambda/build.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use vergen::{Emitter, RustcBuilder};
2-
use vergen_git2::Git2Builder;
1+
use vergen_git2::{Emitter, Git2Builder, RustcBuilder};
32

43
fn main() -> Result<(), Box<dyn std::error::Error>> {
54
let git2 = Git2Builder::default().branch(true).sha(true).build()?;

bin/ethlambda/src/main.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
use clap::Parser;
1010
use ethlambda_p2p::{Bootnode, parse_enrs, start_p2p};
1111
use ethlambda_rpc::metrics::start_prometheus_metrics_api;
12+
use ethlambda_types::primitives::H256;
1213
use ethlambda_types::{
1314
genesis::Genesis,
1415
signature::ValidatorSecretKey,
@@ -73,12 +74,16 @@ async fn main() {
7374
let validators_path = options
7475
.custom_network_config_dir
7576
.join("annotated_validators.yaml");
77+
let validator_config = options
78+
.custom_network_config_dir
79+
.join("validator-config.yaml");
7680
let validator_keys_dir = options.custom_network_config_dir.join("hash-sig-keys");
7781

7882
let genesis_json = std::fs::read_to_string(&genesis_path).expect("Failed to read genesis.json");
7983
let genesis: Genesis =
8084
serde_json::from_str(&genesis_json).expect("Failed to parse genesis.json");
8185

86+
populate_name_registry(&validator_config);
8287
let bootnodes = read_bootnodes(&bootnodes_path);
8388

8489
let validators = read_validators(&validators_path);
@@ -113,6 +118,31 @@ async fn main() {
113118
println!("Shutting down...");
114119
}
115120

121+
fn populate_name_registry(validator_config: impl AsRef<Path>) {
122+
#[derive(Deserialize)]
123+
struct Validator {
124+
name: String,
125+
privkey: H256,
126+
}
127+
#[derive(Deserialize)]
128+
struct Config {
129+
validators: Vec<Validator>,
130+
}
131+
let config_yaml =
132+
std::fs::read_to_string(&validator_config).expect("Failed to read validator config file");
133+
let config: Config =
134+
serde_yaml_ng::from_str(&config_yaml).expect("Failed to parse validator config file");
135+
136+
let names_and_privkeys = config
137+
.validators
138+
.into_iter()
139+
.map(|v| (v.name, v.privkey))
140+
.collect();
141+
142+
// Populates a dictionary used for labeling metrics with node names
143+
ethlambda_p2p::metrics::populate_name_registry(names_and_privkeys);
144+
}
145+
116146
fn read_bootnodes(bootnodes_path: impl AsRef<Path>) -> Vec<Bootnode> {
117147
let bootnodes_yaml =
118148
std::fs::read_to_string(bootnodes_path).expect("Failed to read bootnodes file");

crates/net/p2p/src/lib.rs

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use libp2p::{
1919
use sha2::Digest;
2020
use ssz::Encode;
2121
use tokio::sync::mpsc;
22-
use tracing::{info, trace};
22+
use tracing::{info, trace, warn};
2323

2424
use crate::{
2525
gossipsub::{ATTESTATION_TOPIC_KIND, BLOCK_TOPIC_KIND},
@@ -31,6 +31,9 @@ use crate::{
3131

3232
mod gossipsub;
3333
mod messages;
34+
pub mod metrics;
35+
36+
pub use metrics::populate_name_registry;
3437

3538
pub async fn start_p2p(
3639
node_key: Vec<u8>,
@@ -66,7 +69,6 @@ pub async fn start_p2p(
6669
.build()
6770
.expect("invalid gossipsub config");
6871

69-
// TODO: setup custom message ID function
7072
let gossipsub = libp2p::gossipsub::Behaviour::new(MessageAuthenticity::Anonymous, config)
7173
.expect("failed to initiate behaviour");
7274

@@ -100,12 +102,17 @@ pub async fn start_p2p(
100102
config.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
101103
})
102104
.build();
105+
let local_peer_id = *swarm.local_peer_id();
103106
for bootnode in bootnodes {
107+
let peer_id = PeerId::from_public_key(&bootnode.public_key);
108+
if peer_id == local_peer_id {
109+
continue;
110+
}
104111
let addr = Multiaddr::empty()
105112
.with(bootnode.ip.into())
106113
.with(Protocol::Udp(bootnode.quic_port))
107114
.with(Protocol::QuicV1)
108-
.with_p2p(PeerId::from_public_key(&bootnode.public_key))
115+
.with_p2p(peer_id)
109116
.expect("failed to add peer ID to multiaddr");
110117
swarm.dial(addr).unwrap();
111118
}
@@ -179,6 +186,58 @@ async fn event_loop(
179186
)) => {
180187
gossipsub::handle_gossipsub_message(&mut blockchain, message).await;
181188
}
189+
SwarmEvent::ConnectionEstablished {
190+
peer_id,
191+
endpoint,
192+
num_established,
193+
..
194+
} => {
195+
let direction = connection_direction(&endpoint);
196+
if num_established.get() == 1 {
197+
metrics::notify_peer_connected(&Some(peer_id), direction, "success");
198+
}
199+
info!(%peer_id, %direction, "Peer connected");
200+
}
201+
SwarmEvent::ConnectionClosed {
202+
peer_id,
203+
endpoint,
204+
num_established,
205+
cause,
206+
..
207+
} => {
208+
let direction = connection_direction(&endpoint);
209+
let reason = match cause {
210+
None => "remote_close",
211+
Some(err) => {
212+
// Categorize disconnection reasons
213+
let err_str = err.to_string().to_lowercase();
214+
if err_str.contains("timeout") || err_str.contains("timedout") || err_str.contains("keepalive") {
215+
"timeout"
216+
} else if err_str.contains("reset") || err_str.contains("connectionreset") {
217+
"remote_close"
218+
} else {
219+
"error"
220+
}
221+
}
222+
};
223+
if num_established == 0 {
224+
metrics::notify_peer_disconnected(&Some(peer_id), direction, reason);
225+
}
226+
info!(%peer_id, %direction, %reason, "Peer disconnected");
227+
}
228+
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
229+
let result = if error.to_string().to_lowercase().contains("timed out") {
230+
"timeout"
231+
} else {
232+
"error"
233+
};
234+
metrics::notify_peer_connected(&peer_id, "outbound", result);
235+
warn!(?peer_id, %error, "Outgoing connection error");
236+
}
237+
SwarmEvent::IncomingConnectionError { peer_id, error, .. } => {
238+
metrics::notify_peer_connected(&peer_id, "inbound", "error");
239+
warn!(%error, "Incoming connection error");
240+
}
182241
_ => {
183242
trace!(?event, "Ignored swarm event");
184243
}
@@ -310,6 +369,14 @@ pub fn parse_enrs(enrs: Vec<String>) -> Vec<Bootnode> {
310369
bootnodes
311370
}
312371

372+
fn connection_direction(endpoint: &libp2p::core::ConnectedPoint) -> &'static str {
373+
if endpoint.is_dialer() {
374+
"outbound"
375+
} else {
376+
"inbound"
377+
}
378+
}
379+
313380
fn compute_message_id(message: &libp2p::gossipsub::Message) -> libp2p::gossipsub::MessageId {
314381
const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0x00, 0x00, 0x00, 0x00];
315382
const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [0x01, 0x00, 0x00, 0x00];

crates/net/p2p/src/metrics.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
//! Prometheus metrics for the P2P network layer.
2+
3+
use std::{
4+
collections::HashMap,
5+
sync::{LazyLock, RwLock},
6+
};
7+
8+
use ethlambda_types::primitives::H256;
9+
use libp2p::{
10+
PeerId,
11+
identity::{Keypair, secp256k1},
12+
};
13+
use prometheus::{IntCounterVec, IntGaugeVec, register_int_counter_vec, register_int_gauge_vec};
14+
15+
static NODE_NAME_REGISTRY: LazyLock<RwLock<HashMap<PeerId, &'static str>>> =
16+
LazyLock::new(|| RwLock::new(HashMap::new()));
17+
18+
pub fn populate_name_registry(names_and_privkeys: HashMap<String, H256>) {
19+
let mut registry = NODE_NAME_REGISTRY.write().unwrap();
20+
let name_registry = names_and_privkeys
21+
.into_iter()
22+
.filter_map(|(name, mut privkey)| {
23+
let Ok(privkey) = secp256k1::SecretKey::try_from_bytes(&mut privkey) else {
24+
return None;
25+
};
26+
let pubkey = Keypair::from(secp256k1::Keypair::from(privkey)).public();
27+
let peer_id = PeerId::from_public_key(&pubkey);
28+
// NOTE: we leak the name string to get a 'static lifetime.
29+
// In reality, the name registry is not expected to be read, so it should be safe
30+
// to turn these strings to &'static str.
31+
Some((peer_id, &*name.leak()))
32+
})
33+
.collect();
34+
*registry = name_registry;
35+
}
36+
37+
fn resolve(peer_id: &Option<PeerId>) -> &'static str {
38+
let registry = NODE_NAME_REGISTRY.read().unwrap();
39+
peer_id
40+
.as_ref()
41+
.and_then(|peer_id| registry.get(peer_id))
42+
.unwrap_or(&"unknown")
43+
}
44+
45+
static LEAN_CONNECTED_PEERS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
46+
register_int_gauge_vec!(
47+
"lean_connected_peers",
48+
"Number of connected peers",
49+
&["client"]
50+
)
51+
.unwrap()
52+
});
53+
54+
static LEAN_PEER_CONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
55+
register_int_counter_vec!(
56+
"lean_peer_connection_events_total",
57+
"Total number of peer connection events",
58+
&["direction", "result"]
59+
)
60+
.unwrap()
61+
});
62+
63+
static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
64+
register_int_counter_vec!(
65+
"lean_peer_disconnection_events_total",
66+
"Total number of peer disconnection events",
67+
&["direction", "reason"]
68+
)
69+
.unwrap()
70+
});
71+
72+
/// Notify that a peer connection event occurred.
73+
///
74+
/// If `result` is "success", the connected peer count is incremented.
75+
/// The connection event counter is always incremented.
76+
pub fn notify_peer_connected(peer_id: &Option<PeerId>, direction: &str, result: &str) {
77+
LEAN_PEER_CONNECTION_EVENTS_TOTAL
78+
.with_label_values(&[direction, result])
79+
.inc();
80+
81+
if result == "success" {
82+
let name = resolve(peer_id);
83+
LEAN_CONNECTED_PEERS.with_label_values(&[name]).inc();
84+
}
85+
}
86+
87+
/// Notify that a peer disconnected.
88+
///
89+
/// Decrements the connected peer count and increments the disconnection event counter.
90+
pub fn notify_peer_disconnected(peer_id: &Option<PeerId>, direction: &str, reason: &str) {
91+
LEAN_PEER_DISCONNECTION_EVENTS_TOTAL
92+
.with_label_values(&[direction, reason])
93+
.inc();
94+
95+
let name = resolve(peer_id);
96+
LEAN_CONNECTED_PEERS.with_label_values(&[name]).dec();
97+
}

crates/net/rpc/src/metrics.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use std::net::SocketAddr;
22

3-
use axum::{Router, routing::get};
3+
use axum::{Router, http::HeaderValue, response::IntoResponse, routing::get};
44
use thiserror::Error;
55
use tracing::warn;
66

77
pub async fn start_prometheus_metrics_api(address: SocketAddr) -> Result<(), std::io::Error> {
88
let app = Router::new()
99
.route("/metrics", get(get_metrics))
10-
.route("/health", get("Service Up"));
10+
.route("/health", get(get_health));
1111

1212
// Start the axum app
1313
let listener = tokio::net::TcpListener::bind(address).await?;
@@ -16,12 +16,20 @@ pub async fn start_prometheus_metrics_api(address: SocketAddr) -> Result<(), std
1616
Ok(())
1717
}
1818

19-
pub(crate) async fn get_metrics() -> String {
20-
gather_default_metrics()
19+
pub(crate) async fn get_health() -> impl IntoResponse {
20+
r#"{"status": "healthy", "service": "lean-spec-api"}"#
21+
}
22+
23+
pub(crate) async fn get_metrics() -> impl IntoResponse {
24+
let mut response = gather_default_metrics()
2125
.inspect_err(|err| {
2226
warn!(%err, "Failed to gather Prometheus metrics");
2327
})
2428
.unwrap_or_default()
29+
.into_response();
30+
let content_type = HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8");
31+
response.headers_mut().insert("content-type", content_type);
32+
response
2533
}
2634

2735
#[derive(Debug, Error)]

docs/metrics.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le
6565

6666
| Name | Type | Usage | Sample collection event | Labels | Supported |
6767
|--------|-------|-------|-------------------------|--------|-----------|
68-
|`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=lantern,qlean,ream,zeam | |
69-
|`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound<br>result=success,timeout,error | |
70-
|`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound<br>reason=timeout,remote_close,local_close,error | |
68+
|`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=lantern,qlean,ream,zeam | ✅(*) |
69+
|`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound<br>result=success,timeout,error | |
70+
|`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound<br>reason=timeout,remote_close,local_close,error | |
7171

7272
---
7373

0 commit comments

Comments
 (0)