Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument RustyVault with Prometheus #76

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ glob = "0.3"
serde_asn1_der = "0.8"
base64 = "0.22"
ipnetwork = "0.20"
prometheus-client = "0.22.3"
tokio = "1.40.0"
sysinfo = "0.31.4"

# optional dependencies
openssl = { version = "0.10.64", optional = true }
Expand Down
28 changes: 19 additions & 9 deletions src/cli/command/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
sync::{Arc, RwLock},
};

use actix_web::{middleware, web, App, HttpResponse, HttpServer};
use actix_web::{middleware::{self, from_fn}, web, App, HttpResponse, HttpServer};
use anyhow::format_err;
use clap::ArgMatches;
use openssl::{
Expand All @@ -17,12 +17,7 @@ use openssl::{
use sysexits::ExitCode;

use crate::{
cli::config,
core::Core,
errors::RvError,
http,
storage,
EXIT_CODE_INSUFFICIENT_PARAMS, EXIT_CODE_LOAD_CONFIG_FAILURE, EXIT_CODE_OK,
cli::config, core::Core, errors::RvError, http, metrics::{manager::MetricsManager, middleware::metrics_midleware}, storage, EXIT_CODE_INSUFFICIENT_PARAMS, EXIT_CODE_LOAD_CONFIG_FAILURE, EXIT_CODE_OK
wa5i marked this conversation as resolved.
Show resolved Hide resolved
};

pub const WORK_DIR_PATH_DEFAULT: &str = "/tmp/rusty_vault";
Expand Down Expand Up @@ -113,7 +108,15 @@ pub fn main(config_path: &str) -> Result<(), RvError> {

let barrier = storage::barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend));

let core = Arc::new(RwLock::new(Core { physical: backend, barrier: Arc::new(barrier), ..Default::default() }));
let metrics_manager = Arc::new(RwLock::new(MetricsManager::new()));
let system_metrics = Arc::clone(&metrics_manager.read().unwrap().system_metrics);


wa5i marked this conversation as resolved.
Show resolved Hide resolved
let core = Arc::new(RwLock::new(Core {
physical: backend,
barrier: Arc::new(barrier),
..Default::default()
}));

{
let mut c = core.write()?;
Expand All @@ -123,7 +126,9 @@ pub fn main(config_path: &str) -> Result<(), RvError> {
let mut http_server = HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.wrap(from_fn(metrics_midleware))
.app_data(web::Data::new(Arc::clone(&core)))
.app_data(web::Data::new(Arc::clone(&metrics_manager)))
.configure(http::init_service)
.default_service(web::to(|| HttpResponse::NotFound()))
})
Expand Down Expand Up @@ -182,7 +187,12 @@ pub fn main(config_path: &str) -> Result<(), RvError> {

log::info!("rusty_vault server starts, waiting for request...");

server.block_on(async { http_server.run().await })?;
server.block_on(async {
tokio::spawn(async {
system_metrics.start_collecting().await;
});
http_server.run().await
})?;
let _ = server.run();

Ok(())
Expand Down
24 changes: 24 additions & 0 deletions src/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::sync::{Arc, RwLock};

use actix_web::{web, HttpResponse};
use prometheus_client::encoding::text::encode;
use crate::metrics::manager::MetricsManager;

pub async fn metrics_handler(metrics_manager: web::Data<Arc<RwLock<MetricsManager>>>) -> HttpResponse {
let m = metrics_manager.read().unwrap();
let registry = m.registry.lock().unwrap();

let mut buffer = String::new();
if let Err(e) = encode(&mut buffer, &registry) {
eprintln!("Failed to encode metrics: {}", e);
wa5i marked this conversation as resolved.
Show resolved Hide resolved
return HttpResponse::InternalServerError().finish();
}

HttpResponse::Ok()
.content_type("text/plain; version=0.0.4")
.body(buffer)
}

/// Registers the `/metrics` resource on `cfg`, routing `GET` requests to
/// `metrics_handler`.
pub fn init_metrics_service(cfg: &mut web::ServiceConfig) {
    let get_metrics = web::get().to(metrics_handler);
    let resource = web::resource("/metrics").route(get_metrics);
    cfg.service(resource);
}
2 changes: 2 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{core::Core, errors::RvError, logical::Request};

pub mod logical;
pub mod sys;
pub mod metrics;

pub const AUTH_COOKIE_NAME: &str = "token";
pub const AUTH_HEADER_NAME: &str = "X-RustyVault-Token";
Expand Down Expand Up @@ -101,6 +102,7 @@ pub fn request_on_connect_handler(conn: &dyn Any, ext: &mut Extensions) {
pub fn init_service(cfg: &mut web::ServiceConfig) {
sys::init_sys_service(cfg);
logical::init_logical_service(cfg);
metrics::init_metrics_service(cfg);
}

impl ResponseError for RvError {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod router;
pub mod shamir;
pub mod storage;
pub mod utils;
pub mod metrics;
#[cfg(feature = "storage_mysql")]
pub mod schema;

Expand Down
72 changes: 72 additions & 0 deletions src/metrics/http_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::fmt::Write;

use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, LabelValueEncoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::registry::Registry;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should also be formatted with cargo fmt.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatted with cargo fmt.


/// HTTP method recorded as the `method` label on HTTP metrics.
///
/// Methods without a dedicated variant are represented by `OTHER`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum MetricsMethod {
GET,
POST,
PUT,
DELETE,
// Fallback for every method that has no variant of its own.
OTHER,
wa5i marked this conversation as resolved.
Show resolved Hide resolved
}

impl EncodeLabelValue for MetricsMethod {
    /// Writes the lowercase name of the method (`get`, `post`, `put`, `delete` or
    /// `other`) into `writer` as the label value.
    fn encode(&self, writer: &mut LabelValueEncoder<'_>) -> Result<(), std::fmt::Error> {
        // Pick the text first so there is a single write call.
        let text = match *self {
            MetricsMethod::GET => "get",
            MetricsMethod::POST => "post",
            MetricsMethod::PUT => "put",
            MetricsMethod::DELETE => "delete",
            MetricsMethod::OTHER => "other",
        };
        writer.write_str(text)
    }
}

/// Label set attached to every HTTP metric sample.
///
/// Each distinct combination of the three fields is a separate key in the metric
/// families of `HttpMetrics`.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct HttpLabel {
/// Path of the request.
pub path: String,
/// Method of the request.
pub method: MetricsMethod,
/// Numeric status code of the response.
pub status: u16,
}

/// Request counter and request-duration histogram, both keyed by [`HttpLabel`].
#[derive(Clone)]
pub struct HttpMetrics {
// One counter per label set; incremented through `increment_request_count`.
requests: Family<HttpLabel, Counter>,
// One histogram per label set; fed through `observe_duration`.
histogram: Family<HttpLabel, Histogram>,
}

impl HttpMetrics {
    /// Creates the HTTP metric families and registers them with `registry`.
    ///
    /// Two families are registered, both keyed by [`HttpLabel`] (path, method, status):
    /// * `http_request_count` — a counter of handled requests.
    /// * `http_request_duration_seconds` — a histogram with ten linear buckets that
    ///   start at 0.1 and grow in steps of 0.1.
    ///
    /// The returned value shares its families with the clones handed to the registry.
    pub fn new(registry: &mut Registry) -> Self {
        let requests = Family::<HttpLabel, Counter>::default();
        let histogram = Family::<HttpLabel, Histogram>::new_with_constructor(|| {
            Histogram::new(linear_buckets(0.1, 0.1, 10))
        });

        // The help texts name all three labels carried by `HttpLabel`.
        registry.register(
            "http_request_count",
            "Number of HTTP requests received, labeled by path, method and status",
            requests.clone(),
        );

        registry.register(
            "http_request_duration_seconds",
            "Duration of HTTP requests, labeled by path, method and status",
            histogram.clone(),
        );

        Self { requests, histogram }
    }

    /// Increments the request counter for `label`, creating the series on first use.
    pub fn increment_request_count(&self, label: &HttpLabel) {
        self.requests.get_or_create(label).inc();
    }

    /// Records `duration` in the histogram for `label`, creating the series on first
    /// use. Callers in this crate pass the elapsed time in seconds.
    pub fn observe_duration(&self, label: &HttpLabel, duration: f64) {
        self.histogram.get_or_create(label).observe(duration);
    }
}
20 changes: 20 additions & 0 deletions src/metrics/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::metrics::http_metrics::HttpMetrics;
use crate::metrics::system_metrics::SystemMetrics;
use prometheus_client::registry::Registry;
use std::sync::{Arc, Mutex};

/// Bundles the Prometheus registry with the metric groups registered in it.
///
/// Every field is behind an `Arc`, so cloning the manager shares the same registry
/// and metric groups.
#[derive(Clone)]
pub struct MetricsManager {
/// Registry holding every registered metric; locked when metrics are encoded.
pub registry: Arc<Mutex<Registry>>,
/// System gauges registered in `registry`.
pub system_metrics: Arc<SystemMetrics>,
/// HTTP counter and histogram families registered in `registry`.
pub http_metrics: Arc<HttpMetrics>,
}

impl MetricsManager {
    /// Creates a manager with a fresh registry in which the system metrics and the
    /// HTTP metrics have been registered, in that order.
    pub fn new() -> Self {
        // Register everything while the registry is still exclusively owned, then wrap
        // it once. This avoids locking (and unwrapping) a mutex nobody else can see yet.
        let mut registry = Registry::default();
        let system_metrics = Arc::new(SystemMetrics::new(&mut registry));
        let http_metrics = Arc::new(HttpMetrics::new(&mut registry));

        MetricsManager { registry: Arc::new(Mutex::new(registry)), system_metrics, http_metrics }
    }
}

impl Default for MetricsManager {
    /// Equivalent to [`MetricsManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
41 changes: 41 additions & 0 deletions src/metrics/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::{sync::{Arc, RwLock}, time::Instant};

use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
http::Method,
middleware::Next,
web::Data,
Error,
};
use crate::metrics::http_metrics::HttpLabel;

use super::{http_metrics::MetricsMethod, manager::MetricsManager};

/// Middleware function that records a request count and a duration sample for each
/// request that produces a response.
///
/// The request path and method are captured before the request is handed to `next`;
/// the status code is read from the response. The metrics are written only when a
/// `Data<Arc<RwLock<MetricsManager>>>` is found in the request's app data.
///
/// # Errors
/// Any error returned by `next.call` is propagated unchanged; no metric is recorded
/// for such a request.
pub async fn metrics_midleware(
req: ServiceRequest,
next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
let start_time = Instant::now();
// NOTE(review): the raw request path becomes a label value, so every distinct path
// creates its own time series — confirm this cardinality is acceptable.
let path = req.path().to_string();
let method = match *req.method() {
Method::GET => MetricsMethod::GET,
Method::POST => MetricsMethod::POST,
Method::PUT => MetricsMethod::PUT,
Method::DELETE => MetricsMethod::DELETE,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, LIST is missing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LIST added.

_ => MetricsMethod::OTHER,
};

// `?` returns early on error, skipping the metric updates below.
let res = next.call(req).await?;

let status = res.status().as_u16();
let label = HttpLabel{path, method, status};
if let Some(m) = res.request().app_data::<Data<Arc<RwLock<MetricsManager>>>>(){
// Panics if the manager lock is poisoned.
let metrics_manager = m.read().unwrap();
metrics_manager.http_metrics.increment_request_count(&label);
// Elapsed time since the start of this function, in seconds.
let duration = start_time.elapsed().as_secs_f64();
metrics_manager.http_metrics.observe_duration(&label, duration);
}

Ok(res)
}
8 changes: 8 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! The `rusty_vault::metrics` module uses Prometheus to capture system and HTTP metrics.
//! It defines the 'manager' and relevant data structures such as `SystemMetrics` and `HttpMetrics`.
//!
//! The 'manager' holds the Prometheus registry
pub mod middleware;
pub mod manager;
pub mod system_metrics;
pub mod http_metrics;
96 changes: 96 additions & 0 deletions src/metrics/system_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::sync::{atomic::AtomicU64, Arc, Mutex};
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use sysinfo::{Disks, Networks, System};
use tokio::time::{self, Duration};

/// Gauges describing the host, together with the `sysinfo` handle used to refresh them.
///
/// The gauges are registered in `new` and updated by `collect_metrics`.
pub struct SystemMetrics {
// Shared `sysinfo` state; locked and refreshed on every collection.
system: Arc<Mutex<System>>,
// Set from `System::global_cpu_usage()`.
cpu_usage: Gauge<f64, AtomicU64>,
// Set from `System::total_memory()`.
total_memory: Gauge<f64, AtomicU64>,
// Set from `System::used_memory()`.
used_memory: Gauge<f64, AtomicU64>,
// Set from `System::free_memory()`.
free_memory: Gauge<f64, AtomicU64>,
// Sum of `available_space()` over all listed disks.
total_disk_available: Gauge<f64, AtomicU64>,
// Sum of `total_space()` over all listed disks.
total_disk_space: Gauge<f64, AtomicU64>,
// Sum of `received()` over all listed network interfaces.
network_in: Gauge<f64, AtomicU64>,
// Sum of `transmitted()` over all listed network interfaces.
network_out: Gauge<f64, AtomicU64>,
// Set from the `one` field of `System::load_average()`.
load_avg: Gauge<f64, AtomicU64>,
}

impl SystemMetrics {
/// Creates all system gauges, registers each of them with `registry`, and
/// initialises the `sysinfo` handle with `System::new_all()`.
///
/// Registration order: CPU usage, total/used/free memory, total disk space,
/// available disk space, network in/out, load average.
pub fn new(registry: &mut Registry) -> Self {
    // Creates one gauge, registers a clone of it under `name`, and returns the
    // original so it can be stored in the struct.
    let mut gauge = |name: &str, help: &str| {
        let g = Gauge::<f64, AtomicU64>::default();
        registry.register(name, help, g.clone());
        g
    };

    let cpu_usage = gauge("cpu_usage_percent", "CPU usage percent");

    let total_memory = gauge("total_memory", "Total memory");
    let used_memory = gauge("used_memory", "Used memory");
    let free_memory = gauge("free_memory", "Free memory");

    let total_disk_space = gauge("total_disk_space", "Total disk space");
    let total_disk_available = gauge("total_disk_available", "Total disk available");

    let network_in = gauge("network_in_bytes", "Incoming network traffic in bytes");
    let network_out = gauge("network_out_bytes", "Outgoing network traffic in bytes");

    let load_avg = gauge("load_average", "System load average");

    Self {
        system: Arc::new(Mutex::new(System::new_all())),
        cpu_usage,
        total_memory,
        used_memory,
        free_memory,
        total_disk_available,
        total_disk_space,
        network_in,
        network_out,
        load_avg,
    }
}

/// Collects system metrics in an endless loop, once per interval tick.
///
/// The interval is fixed at 5 seconds. The future never completes, so callers are
/// expected to run it as its own task.
// NOTE(review): tokio documents that the first tick of `interval` completes
// immediately, which would make the first collection happen right away — confirm.
// NOTE(review): `collect_metrics` is synchronous (std mutex + sysinfo refresh) and runs
// on the async executor here — confirm that its cost is acceptable.
pub async fn start_collecting(self: Arc<Self>) {
let mut interval = time::interval(Duration::from_secs(5));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the collection interval be set in the configuration file?

Copy link
Author

@cybershang cybershang Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wa5i

Decision item: Use a single interval for all system metrics?

Since current sysinfo only provides separate refresh for CPU, memory, and process metrics; network and disk metrics cannot be refreshed individually.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wa5i

Decision item: Use a single interval for all system metrics?

Since current sysinfo only provides separate refresh for CPU, memory, and process metrics; network and disk metrics cannot be refreshed individually.

Decision: Interval configuration is supported through the configuration file, but currently limited to a single interval.


loop {
// Wait for the next tick, then take one snapshot.
interval.tick().await;
self.collect_metrics();
}
}

/// Takes one snapshot of the host and writes it into the gauges.
///
/// Refreshes the shared `System`, then sets CPU usage, memory figures, disk totals
/// (summed over all listed disks), network counters (summed over all listed
/// interfaces) and the one-minute load average.
///
/// # Panics
/// Panics if the `system` mutex is poisoned.
fn collect_metrics(&self) {
    let mut system = self.system.lock().unwrap();
    system.refresh_all();

    self.cpu_usage.set(system.global_cpu_usage() as f64);

    self.total_memory.set(system.total_memory() as f64);
    self.used_memory.set(system.used_memory() as f64);
    self.free_memory.set(system.free_memory() as f64);

    // Disk totals are summed over a freshly listed set of disks.
    let disks = Disks::new_with_refreshed_list();
    let (available_sum, capacity_sum) = disks.list().iter().fold((0u64, 0u64), |(avail, cap), disk| {
        (avail + disk.available_space(), cap + disk.total_space())
    });
    self.total_disk_available.set(available_sum as f64);
    self.total_disk_space.set(capacity_sum as f64);

    // NOTE(review): the interface list is rebuilt on every call; verify that
    // `received()`/`transmitted()` report meaningful values on a fresh list.
    let networks = Networks::new_with_refreshed_list();
    let (received_sum, transmitted_sum) = networks.list().iter().fold((0u64, 0u64), |(rx, tx), (_, nic)| {
        (rx + nic.received(), tx + nic.transmitted())
    });

    self.network_in.set(received_sum as f64);
    self.network_out.set(transmitted_sum as f64);

    self.load_avg.set(System::load_average().one as f64);
}
}
Loading