From a9bb40f7cca4c9af00e1cf421952311758a93948 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 17 Dec 2024 10:33:33 +0100 Subject: [PATCH 1/2] Implement prometheus metrics --- Cargo.lock | 49 +++++++++ Cargo.toml | 5 +- src/client.rs | 3 +- src/commands/init/config_builder.rs | 7 ++ src/commands/init/templates/metrics.toml | 2 + src/commands/start.rs | 12 ++- src/config.rs | 6 +- src/config/metrics.rs | 11 ++ src/lib.rs | 1 + src/prometheus.rs | 130 +++++++++++++++++++++++ src/session.rs | 25 ++++- tmkms.toml.example | 2 + 12 files changed, 245 insertions(+), 8 deletions(-) create mode 100644 src/commands/init/templates/metrics.toml create mode 100644 src/config/metrics.rs create mode 100644 src/prometheus.rs diff --git a/Cargo.lock b/Cargo.lock index 9f42bda6..d5ff4ff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1566,6 +1566,29 @@ dependencies = [ "sha2 0.10.8", ] +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + [[package]] name = "paste" version = "1.0.15" @@ -1666,6 +1689,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "thiserror", +] + [[package]] name = "prost" version = "0.12.6" @@ -1743,6 +1780,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "regex" version = "1.11.1" @@ -2437,8 +2483,10 @@ dependencies = [ "hkd32", "hkdf", "k256", + "lazy_static", "ledger", "once_cell", + "prometheus", "prost", "prost-derive", "rand", @@ -2457,6 +2505,7 @@ dependencies = [ "tendermint-p2p", "tendermint-proto", "thiserror", + "tiny_http", "url 2.5.2", "uuid", "wait-timeout", diff --git a/Cargo.toml b/Cargo.toml index 8626e58e..f404c896 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,8 +29,10 @@ getrandom = "0.2" hkd32 = { version = "0.7", default-features = false, features = ["mnemonic"] } hkdf = "0.12" k256 = { version = "0.13", features = ["ecdsa", "sha256"] } -ledger = { version = "0.2", optional = true } +lazy_static = { version = "1.5.0" } +ledger = { version = "0.2" } once_cell = "1.5" +prometheus = { version = "0.13.4", default-features = false, features = [] } prost = "0.12" prost-derive = "0.12" rand_core = { version = "0.6", features = ["std"] } @@ -48,6 +50,7 @@ tendermint-config = "0.35" tendermint-p2p = "0.35" tendermint-proto = "0.35" thiserror = "1" +tiny_http = { version = "0.12.0" } url = { version = "2.2.2", features = ["serde"], optional = true } uuid = { version = "1", features = ["serde"], optional = true } wait-timeout = "0.2" diff --git a/src/client.rs b/src/client.rs index 347f78ab..2f3a2037 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,6 +10,7 @@ use crate::{ config::ValidatorConfig, error::{Error, ErrorKind}, prelude::*, + prometheus::initialize_consensus_metrics, session::Session, }; use std::{panic, process::exit, thread, time::Duration}; @@ -88,7 +89,7 @@ fn main_loop(config: ValidatorConfig) -> Result<(), Error> { /// Ensure chain with given ID is properly registered pub fn register_chain(chain_id: &chain::Id) { let registry = chain::REGISTRY.get(); - + initialize_consensus_metrics(chain_id); debug!("registering chain: {}", chain_id); registry.get_chain(chain_id).unwrap_or_else(|| { status_err!( diff --git a/src/commands/init/config_builder.rs b/src/commands/init/config_builder.rs index 5465528c..f06fa725 100644 --- a/src/commands/init/config_builder.rs +++ b/src/commands/init/config_builder.rs @@ -43,6 +43,7 @@ impl ConfigBuilder { self.add_chain_config(); self.add_provider_config(); self.add_validator_config(); + self.add_metrics_config(); self.contents } @@ -148,6 +149,12 @@ impl ConfigBuilder { self.add_template_with_chain_id(include_str!("templates/keyring/fortanixdsm.toml")); } + /// Add `[metrics]` configurations + fn add_metrics_config(&mut self) { + self.add_section_comment("Metrics exporter configuration"); + self.add_template(include_str!("templates/metrics.toml")); + } + /// Append a template to the config file, substituting `$KMS_HOME` fn add_template(&mut self, template: &str) { self.add_str(&format_template( diff --git a/src/commands/init/templates/metrics.toml b/src/commands/init/templates/metrics.toml new file mode 100644 index 00000000..b8b4f92a --- /dev/null +++ b/src/commands/init/templates/metrics.toml @@ -0,0 +1,2 @@ +[metrics] +bind_address = "localhost:3333" diff --git a/src/commands/start.rs b/src/commands/start.rs index 0df75528..a702f0dc 100644 --- a/src/commands/start.rs +++ b/src/commands/start.rs @@ -1,6 +1,6 @@ //! Start the KMS -use crate::{chain, client::Client, prelude::*}; +use crate::{chain, client::Client, prelude::*, prometheus}; use abscissa_core::Command; use clap::Parser; use std::{path::PathBuf, process}; @@ -40,6 +40,16 @@ impl StartCommand { process::exit(1); }); + if let Some(config) = &APP.config().metrics { + let address = config.bind_address.clone(); + info!("Starting up prometheus server on {}", address); + let thread_name = abscissa_core::thread::Name::new("prometheus-thread").unwrap(); + APP.state() + .threads_mut() + .spawn(thread_name, move || prometheus::serve(&address)) + .expect("Unable to start prometheus exporter thread"); + } + // Spawn the validator client threads config .validator diff --git a/src/config.rs b/src/config.rs index 3f08374c..d614f422 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,13 @@ //! Configuration file structures (with serde-derived parser) pub mod chain; +pub mod metrics; pub mod provider; pub mod validator; pub use self::validator::*; -use self::{chain::ChainConfig, provider::ProviderConfig}; +use self::{chain::ChainConfig, metrics::MetricsConfig, provider::ProviderConfig}; use serde::Deserialize; /// Environment variable containing path to config file @@ -29,4 +30,7 @@ pub struct KmsConfig { /// Addresses of validator nodes #[serde(default)] pub validator: Vec, + + /// Configuration for metrics exporter + pub metrics: Option, } diff --git a/src/config/metrics.rs b/src/config/metrics.rs new file mode 100644 index 00000000..6fd80bb3 --- /dev/null +++ b/src/config/metrics.rs @@ -0,0 +1,11 @@ +//! Metrics configuration + +use serde::Deserialize; + +/// Metrics configuration +#[derive(Deserialize, Debug, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct MetricsConfig { + /// Address on which to bind metrics exporter + pub bind_address: String, +} diff --git a/src/lib.rs b/src/lib.rs index dcb19beb..7a47e385 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ pub mod key_utils; pub mod keyring; pub mod prelude; pub mod privval; +pub mod prometheus; pub mod rpc; pub mod session; diff --git a/src/prometheus.rs b/src/prometheus.rs new file mode 100644 index 00000000..98777a03 --- /dev/null +++ b/src/prometheus.rs @@ -0,0 +1,130 @@ +//! Prometheus metrics collection and serving functionality +use crate::rpc; + +use lazy_static::lazy_static; +use prometheus::{ + default_registry, register_histogram_vec, register_int_counter_vec, Encoder, HistogramVec, + IntCounterVec, TextEncoder, +}; +use std::time::Duration; +use tendermint::chain; + +lazy_static! { + static ref METRIC_CONSENSUS_UPDATES: IntCounterVec = register_int_counter_vec!( + "consensus_updates_total", + "Number of consensus updates by status.", + &["chain", "status"] + ) + .unwrap(); + static ref METRIC_REQUEST_DURATION: HistogramVec = register_histogram_vec!( + "request_duration_seconds", + "Duration of request", + &["chain", "message_type"], + ) + .unwrap(); +} + +/// Status outcomes for consensus updates +pub enum ConsensusUpdateStatus { + /// Update completed successfully + Success, + /// Generic error occurred during update + Error, + /// Double signing attempt detected + DoubleSign, +} + +/// Type of RPC request received from validator node +pub enum RpcRequestType { + /// Sign proposal or vote request + Sign, + /// Ping/heartbeat request + Ping, + /// Request for public key + Pubkey, +} + +impl From<&rpc::Request> for RpcRequestType { + fn from(value: &rpc::Request) -> Self { + match value { + rpc::Request::SignProposal(_) | rpc::Request::SignVote(_) => RpcRequestType::Sign, + rpc::Request::PingRequest => RpcRequestType::Ping, + rpc::Request::ShowPublicKey => RpcRequestType::Pubkey, + } + } +} +impl ToString for ConsensusUpdateStatus { + fn to_string(&self) -> String { + match self { + ConsensusUpdateStatus::Success => "success".into(), + ConsensusUpdateStatus::Error => "error".into(), + ConsensusUpdateStatus::DoubleSign => "double_sign".into(), + } + } +} + +/// Initialize metrics counters for a given chain ID +/// +/// Creates and resets counters for all possible status outcomes +pub fn initialize_consensus_metrics(chain_id: &chain::Id) { + METRIC_CONSENSUS_UPDATES + .with_label_values(&[ + chain_id.as_str(), + &ConsensusUpdateStatus::Success.to_string(), + ]) + .reset(); + METRIC_CONSENSUS_UPDATES + .with_label_values(&[chain_id.as_str(), &ConsensusUpdateStatus::Error.to_string()]) + .reset(); + METRIC_CONSENSUS_UPDATES + .with_label_values(&[ + chain_id.as_str(), + &ConsensusUpdateStatus::DoubleSign.to_string(), + ]) + .reset(); +} + +/// Increment counter for a consensus update with given status +pub fn increment_consensus_counter(chain_id: &chain::Id, status: ConsensusUpdateStatus) { + METRIC_CONSENSUS_UPDATES + .with_label_values(&[chain_id.as_str(), &status.to_string()]) + .inc(); +} + +impl ToString for RpcRequestType { + fn to_string(&self) -> String { + match self { + RpcRequestType::Sign => "sign", + RpcRequestType::Ping => "ping", + RpcRequestType::Pubkey => "pubkey", + } + .into() + } +} + +/// Record duration of an RPC request +pub fn record_request_duration( + chain_id: &chain::Id, + request_type: &RpcRequestType, + duration: Duration, +) { + METRIC_REQUEST_DURATION + .with_label_values(&[chain_id.as_str(), &request_type.to_string()]) + .observe(duration.as_secs_f64()); +} + +/// Start HTTP server to expose Prometheus metrics +pub fn serve(address: &str) { + let encoder = TextEncoder::new(); + let registry = default_registry(); + let server = tiny_http::Server::http(address).expect("Unable to bind to address"); + for request in server.incoming_requests() { + let mut response = Vec::::new(); + let metric_families = registry.gather(); + // TODO + encoder.encode(&metric_families, &mut response).unwrap(); + request + .respond(tiny_http::Response::from_data(response)) + .unwrap(); + } +} diff --git a/src/session.rs b/src/session.rs index d61f7831..6801b8cf 100644 --- a/src/session.rs +++ b/src/session.rs @@ -7,6 +7,9 @@ use crate::{ error::{Error, ErrorKind::*}, prelude::*, privval::SignableMsg, + prometheus::{ + increment_consensus_counter, record_request_duration, ConsensusUpdateStatus, RpcRequestType, + }, rpc::{Request, Response}, }; use std::{os::unix::net::UnixStream, time::Instant}; @@ -101,6 +104,8 @@ impl Session { "[{}@{}] received request: {:?}", &self.config.chain_id, &self.config.addr, &request ); + let request_start = Instant::now(); + let request_type = RpcRequestType::from(&request); let response = match request { Request::SignProposal(_) | Request::SignVote(_) => { @@ -117,6 +122,13 @@ impl Session { ); let response_bytes = response.encode()?; + + record_request_duration( + &self.config.chain_id, + &request_type, + request_start.elapsed(), + ); + self.connection.write_all(&response_bytes)?; Ok(true) @@ -197,9 +209,11 @@ impl Session { let msg_type = signable_msg.msg_type(); let request_state = signable_msg.consensus_state(); let mut chain_state = chain.state.lock().unwrap(); - match chain_state.update_consensus_state(request_state.clone()) { - Ok(()) => Ok(None), + Ok(()) => { + increment_consensus_counter(&chain.id, ConsensusUpdateStatus::Success); + Ok(None) + } Err(e) if e.kind() == StateErrorKind::DoubleSign => { // Report double signing error back to the validator let original_block_id = chain_state.consensus_state().block_id_prefix(); @@ -214,10 +228,14 @@ impl Session { request_state.block_id_prefix() ); + increment_consensus_counter(&chain.id, ConsensusUpdateStatus::DoubleSign); let remote_err = double_sign(request_state); Ok(Some(remote_err)) } - Err(e) => Err(e.into()), + Err(e) => { + increment_consensus_counter(&chain.id, ConsensusUpdateStatus::Error); + Err(e.into()) + } } } @@ -269,7 +287,6 @@ impl Session { fn double_sign(consensus_state: consensus::State) -> proto::privval::RemoteSignerError { /// Double signing error code. const DOUBLE_SIGN_ERROR: i32 = 2; - proto::privval::RemoteSignerError { code: DOUBLE_SIGN_ERROR, description: format!( diff --git a/tmkms.toml.example b/tmkms.toml.example index 541c6c5f..dd72b13f 100644 --- a/tmkms.toml.example +++ b/tmkms.toml.example @@ -83,3 +83,5 @@ path = "path/to/consensus-ed25519.key" # generate using `tmkms softsign keygen - # source = { protocol = "jsonrpc", uri = "http://127.0.0.1:23456" } # rpc = { addr = "tcp://127.0.0.1:26657" } # seq_file = "irishub-account-seq.json" +[metrics] +bind_address = "localhost:3333" From 8e405499e2df30d6e1146fd8518824f0359682f3 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Wed, 12 Feb 2025 17:35:18 +0100 Subject: [PATCH 2/2] Fine-tune the buckets for request_duration_seconds --- src/prometheus.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/prometheus.rs b/src/prometheus.rs index 98777a03..17d3c4fb 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -20,6 +20,20 @@ lazy_static! { "request_duration_seconds", "Duration of request", &["chain", "message_type"], + vec![ + 0.005, + 0.05, + 0.1, + 0.150, + 0.200, + 0.250, + 0.300, + 0.350, + 0.400, + 0.500, + 1.0, + std::f64::INFINITY + ], ) .unwrap(); }