Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument RustyVault with Prometheus #76

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ glob = "0.3"
serde_asn1_der = "0.8"
base64 = "0.22"
ipnetwork = "0.20"
prometheus-client = "0.22.3"
tokio = "1.40.0"
sysinfo = "0.31.4"

# optional dependencies
openssl = { version = "0.10.64", optional = true }
Expand Down
27 changes: 22 additions & 5 deletions src/cli/command/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{
sync::{Arc, RwLock},
};

use actix_web::{middleware, web, App, HttpResponse, HttpServer};
use actix_web::{
middleware::{self, from_fn},
web, App, HttpResponse, HttpServer,
};
use anyhow::format_err;
use clap::ArgMatches;
use openssl::{
Expand All @@ -21,8 +24,8 @@ use crate::{
core::Core,
errors::RvError,
http,
storage,
EXIT_CODE_INSUFFICIENT_PARAMS, EXIT_CODE_LOAD_CONFIG_FAILURE, EXIT_CODE_OK,
metrics::{manager::MetricsManager, middleware::metrics_midleware},
storage, EXIT_CODE_INSUFFICIENT_PARAMS, EXIT_CODE_LOAD_CONFIG_FAILURE, EXIT_CODE_OK,
};

pub const WORK_DIR_PATH_DEFAULT: &str = "/tmp/rusty_vault";
Expand Down Expand Up @@ -113,7 +116,14 @@ pub fn main(config_path: &str) -> Result<(), RvError> {

let barrier = storage::barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend));

let core = Arc::new(RwLock::new(Core { physical: backend, barrier: Arc::new(barrier), ..Default::default() }));
let metrics_manager = Arc::new(RwLock::new(MetricsManager::new(config.collection_interval)));
let system_metrics = Arc::clone(&metrics_manager.read().unwrap().system_metrics);

let core = Arc::new(RwLock::new(Core {
physical: backend,
barrier: Arc::new(barrier),
..Default::default()
}));

{
let mut c = core.write()?;
Expand All @@ -123,7 +133,9 @@ pub fn main(config_path: &str) -> Result<(), RvError> {
let mut http_server = HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.wrap(from_fn(metrics_midleware))
.app_data(web::Data::new(Arc::clone(&core)))
.app_data(web::Data::new(Arc::clone(&metrics_manager)))
.configure(http::init_service)
.default_service(web::to(|| HttpResponse::NotFound()))
})
Expand Down Expand Up @@ -182,7 +194,12 @@ pub fn main(config_path: &str) -> Result<(), RvError> {

log::info!("rusty_vault server starts, waiting for request...");

server.block_on(async { http_server.run().await })?;
server.block_on(async {
tokio::spawn(async {
system_metrics.start_collecting().await;
});
http_server.run().await
})?;
let _ = server.run();

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct Config {
pub daemon_user: String,
#[serde(default)]
pub daemon_group: String,
#[serde(default)]
pub collection_interval: u64,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The collection_interval has no default value.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default value added through fn default_collection_interval() -> u64.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default value added through fn default_collection_interval() -> u64.

}

/// A struct that contains several configurable networking options
Expand Down
24 changes: 24 additions & 0 deletions src/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::sync::{Arc, RwLock};

use actix_web::{web, HttpResponse};
use prometheus_client::encoding::text::encode;
use crate::metrics::manager::MetricsManager;

pub async fn metrics_handler(metrics_manager: web::Data<Arc<RwLock<MetricsManager>>>) -> HttpResponse {
let m = metrics_manager.read().unwrap();
let registry = m.registry.lock().unwrap();

let mut buffer = String::new();
if let Err(e) = encode(&mut buffer, &registry) {
log::error!("Failed to encode metrics: {}", e);
return HttpResponse::InternalServerError().finish();
}

HttpResponse::Ok()
.content_type("text/plain; version=0.0.4")
.body(buffer)
}

/// Registers the `/metrics` resource on `cfg`, routing `GET` requests to
/// [`metrics_handler`].
pub fn init_metrics_service(cfg: &mut web::ServiceConfig) {
    let scrape_route = web::get().to(metrics_handler);
    let metrics_resource = web::resource("/metrics").route(scrape_route);
    cfg.service(metrics_resource);
}
2 changes: 2 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{core::Core, errors::RvError, logical::Request};

pub mod logical;
pub mod sys;
pub mod metrics;

pub const AUTH_COOKIE_NAME: &str = "token";
pub const AUTH_HEADER_NAME: &str = "X-RustyVault-Token";
Expand Down Expand Up @@ -101,6 +102,7 @@ pub fn request_on_connect_handler(conn: &dyn Any, ext: &mut Extensions) {
/// Registers every HTTP service of the server on `cfg`.
///
/// The sub-module initializers are invoked in a fixed order: `sys`, then `logical`,
/// then `metrics`.
pub fn init_service(cfg: &mut web::ServiceConfig) {
    sys::init_sys_service(cfg);
    logical::init_logical_service(cfg);
    // Adds the metrics scrape endpoint alongside the sys and logical services.
    metrics::init_metrics_service(cfg);
}

impl ResponseError for RvError {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod router;
pub mod shamir;
pub mod storage;
pub mod utils;
pub mod metrics;
#[cfg(feature = "storage_mysql")]
pub mod schema;

Expand Down
73 changes: 73 additions & 0 deletions src/metrics/http_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::fmt::Write;

use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue, LabelValueEncoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::registry::Registry;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should also be formatted with cargo fmt.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatted with cargo fmt.


/// Request method used as the `method` label on HTTP metrics.
///
/// Any method without a dedicated variant is reported as [`MetricsMethod::OTHER`].
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum MetricsMethod {
    /// HTTP `GET`.
    GET,
    /// HTTP `POST`.
    POST,
    /// HTTP `PUT`.
    PUT,
    /// HTTP `DELETE`.
    DELETE,
    /// The `LIST` method.
    LIST,
    /// Fallback for every other method.
    OTHER,
}

impl EncodeLabelValue for MetricsMethod {
    /// Writes the lowercase name of the method (`get`, `post`, `put`, `delete`, `list`
    /// or `other`) as the label value.
    fn encode(&self, writer: &mut LabelValueEncoder<'_>) -> Result<(), std::fmt::Error> {
        let label_value = match self {
            Self::GET => "get",
            Self::POST => "post",
            Self::PUT => "put",
            Self::DELETE => "delete",
            Self::LIST => "list",
            Self::OTHER => "other",
        };
        writer.write_str(label_value)
    }
}

/// Label set attached to every HTTP metric sample.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct HttpLabel {
    /// Path of the request.
    pub path: String,
    /// Method of the request.
    pub method: MetricsMethod,
    /// Numeric status code of the response.
    pub status: u16,
}

/// HTTP request metrics, keyed by [`HttpLabel`].
#[derive(Clone)]
pub struct HttpMetrics {
    /// Per-label request counters.
    requests: Family<HttpLabel, Counter>,
    /// Per-label request duration histograms.
    histogram: Family<HttpLabel, Histogram>,
}

impl HttpMetrics {
    /// Creates the HTTP metric families and registers them with `registry`.
    ///
    /// Two families are registered, both keyed by [`HttpLabel`] (path, method, status):
    /// * `http_request_count` — a counter of requests;
    /// * `http_request_duration_seconds` — a histogram built from
    ///   `linear_buckets(0.1, 0.1, 10)`.
    pub fn new(registry: &mut Registry) -> Self {
        let requests = Family::<HttpLabel, Counter>::default();
        let histogram =
            Family::<HttpLabel, Histogram>::new_with_constructor(|| Histogram::new(linear_buckets(0.1, 0.1, 10)));

        // The help texts name all three labels of `HttpLabel`; they previously omitted `path`.
        registry.register(
            "http_request_count",
            "Number of HTTP requests received, labeled by path, method and status",
            requests.clone(),
        );

        registry.register(
            "http_request_duration_seconds",
            "Duration of HTTP requests, labeled by path, method and status",
            histogram.clone(),
        );

        Self { requests, histogram }
    }

    /// Increments the request counter for `label`, creating the series on first use.
    pub fn increment_request_count(&self, label: &HttpLabel) {
        self.requests.get_or_create(label).inc();
    }

    /// Records `duration` in the histogram for `label`, creating the series on first use.
    ///
    /// NOTE(review): the metric name implies `duration` is in seconds; callers must pass
    /// seconds for the buckets to be meaningful.
    pub fn observe_duration(&self, label: &HttpLabel, duration: f64) {
        self.histogram.get_or_create(label).observe(duration);
    }
}
20 changes: 20 additions & 0 deletions src/metrics/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::metrics::http_metrics::HttpMetrics;
use crate::metrics::system_metrics::SystemMetrics;
use prometheus_client::registry::Registry;
use std::sync::{Arc, Mutex};

/// Owns the Prometheus registry together with the metric groups registered in it.
#[derive(Clone)]
pub struct MetricsManager {
    /// Registry that all metrics are registered with; locked when metrics are encoded.
    pub registry: Arc<Mutex<Registry>>,
    /// System-level gauges, shared with the background collection task.
    pub system_metrics: Arc<SystemMetrics>,
    /// HTTP request counter and duration histogram.
    pub http_metrics: Arc<HttpMetrics>,
}

impl MetricsManager {
    /// Builds a manager with a fresh registry in which the system metrics and then the
    /// HTTP metrics are registered.
    ///
    /// `collection_interval` is handed to [`SystemMetrics::new`] unchanged.
    pub fn new(collection_interval: u64) -> Self {
        // Register everything on the plain registry first, then wrap it for sharing.
        let mut registry = Registry::default();
        let system_metrics = SystemMetrics::new(&mut registry, collection_interval);
        let http_metrics = HttpMetrics::new(&mut registry);

        Self {
            registry: Arc::new(Mutex::new(registry)),
            system_metrics: Arc::new(system_metrics),
            http_metrics: Arc::new(http_metrics),
        }
    }
}
41 changes: 41 additions & 0 deletions src/metrics/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::{sync::{Arc, RwLock}, time::Instant};

use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
http::Method,
middleware::Next,
web::Data,
Error,
};
use crate::metrics::http_metrics::HttpLabel;

use super::{http_metrics::MetricsMethod, manager::MetricsManager};

pub async fn metrics_midleware(
req: ServiceRequest,
next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
let start_time = Instant::now();
let path = req.path().to_string();
let method = match *req.method() {
Method::GET => MetricsMethod::GET,
Method::POST => MetricsMethod::POST,
Method::PUT => MetricsMethod::PUT,
Method::DELETE => MetricsMethod::DELETE,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, LIST is missing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LIST added.

_ => MetricsMethod::OTHER,
};

let res = next.call(req).await?;

let status = res.status().as_u16();
let label = HttpLabel{path, method, status};
if let Some(m) = res.request().app_data::<Data<Arc<RwLock<MetricsManager>>>>(){
let metrics_manager = m.read().unwrap();
metrics_manager.http_metrics.increment_request_count(&label);
let duration = start_time.elapsed().as_secs_f64();
metrics_manager.http_metrics.observe_duration(&label, duration);
}

Ok(res)
}
8 changes: 8 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! The `rusty_vault::metrics` module uses Prometheus to capture system and HTTP metrics.
//! It defines the 'manager' and relevant data structures such as `SystemMetrics` and `HttpMetrics`.
//!
//! The 'manager' holds the Prometheus registry.
pub mod middleware;
pub mod manager;
pub mod system_metrics;
pub mod http_metrics;
109 changes: 109 additions & 0 deletions src/metrics/system_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use std::sync::{atomic::AtomicU64, Arc, Mutex};
use sysinfo::{Disks, Networks, System};
use tokio::time::{self, Duration};

/// Host-level gauges that are refreshed periodically from `sysinfo`.
pub struct SystemMetrics {
    /// `sysinfo` handle that is refreshed on every collection.
    system: Arc<Mutex<System>>,
    /// Time between two collections, in seconds.
    collection_interval: u64,
    /// Global CPU usage as reported by `sysinfo`.
    cpu_usage: Gauge<f64, AtomicU64>,
    /// Total memory as reported by `sysinfo`.
    total_memory: Gauge<f64, AtomicU64>,
    /// Used memory as reported by `sysinfo`.
    used_memory: Gauge<f64, AtomicU64>,
    /// Free memory as reported by `sysinfo`.
    free_memory: Gauge<f64, AtomicU64>,
    /// Available space summed over all listed disks.
    total_disk_available: Gauge<f64, AtomicU64>,
    /// Total space summed over all listed disks.
    total_disk_space: Gauge<f64, AtomicU64>,
    /// Inbound network traffic summed over all listed interfaces.
    network_in: Gauge<f64, AtomicU64>,
    /// Outbound network traffic summed over all listed interfaces.
    network_out: Gauge<f64, AtomicU64>,
    /// One-minute system load average.
    load_avg: Gauge<f64, AtomicU64>,
}

impl SystemMetrics {
    /// Creates all system gauges and registers them with `registry`.
    ///
    /// `collection_interval` is the number of seconds between two collections performed by
    /// [`SystemMetrics::start_collecting`].
    pub fn new(registry: &mut Registry, collection_interval: u64) -> Self {
        let cpu_usage = Gauge::<f64, AtomicU64>::default();

        let total_memory = Gauge::<f64, AtomicU64>::default();
        let used_memory = Gauge::<f64, AtomicU64>::default();
        let free_memory = Gauge::<f64, AtomicU64>::default();

        let total_disk_space = Gauge::<f64, AtomicU64>::default();
        let total_disk_available = Gauge::<f64, AtomicU64>::default();

        let network_in = Gauge::<f64, AtomicU64>::default();
        let network_out = Gauge::<f64, AtomicU64>::default();
        let load_avg = Gauge::<f64, AtomicU64>::default();

        // Each gauge is registered as a clone; the originals are kept in the struct so
        // that later `set` calls update the registered metrics.
        registry.register("cpu_usage_percent", "CPU usage percent", cpu_usage.clone());

        registry.register("total_memory", "Total memory", total_memory.clone());
        registry.register("used_memory", "Used memory", used_memory.clone());
        registry.register("free_memory", "Free memory", free_memory.clone());

        registry.register("total_disk_space", "Total disk space", total_disk_space.clone());
        registry.register("total_disk_available", "Total disk available", total_disk_available.clone());

        registry.register("network_in_bytes", "Incoming network traffic in bytes", network_in.clone());
        registry.register("network_out_bytes", "Outgoing network traffic in bytes", network_out.clone());

        registry.register("load_average", "System load average", load_avg.clone());

        let system = Arc::new(Mutex::new(System::new_all()));

        Self {
            system,
            collection_interval,
            cpu_usage,
            total_memory,
            used_memory,
            free_memory,
            total_disk_available,
            total_disk_space,
            network_in,
            network_out,
            load_avg,
        }
    }

    /// Collects the system metrics forever, once per collection interval.
    ///
    /// This future never completes; it is meant to be spawned as a background task.
    /// A configured interval of `0` is treated as one second.
    pub async fn start_collecting(self: Arc<Self>) {
        // `tokio::time::interval` panics on a zero period, and the interval comes from a
        // config field that defaults to 0 when omitted, so clamp it to at least one second.
        let period = Duration::from_secs(self.collection_interval.max(1));
        let mut interval = time::interval(period);

        loop {
            interval.tick().await;
            self.collect_metrics();
        }
    }

    /// Refreshes the `sysinfo` state and publishes the current values to every gauge.
    fn collect_metrics(&self) {
        // Only this collector writes through the mutex; if an earlier collection panicked,
        // keep going with the recovered guard instead of panicking on every later tick.
        let mut sys = self.system.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        sys.refresh_all();

        self.cpu_usage.set(sys.global_cpu_usage() as f64);

        self.total_memory.set(sys.total_memory() as f64);
        self.used_memory.set(sys.used_memory() as f64);
        self.free_memory.set(sys.free_memory() as f64);

        let disks = Disks::new_with_refreshed_list();
        let total_available_space: u64 = disks.list().iter().map(|disk| disk.available_space()).sum();
        let total_disk_space: u64 = disks.list().iter().map(|disk| disk.total_space()).sum();
        self.total_disk_available.set(total_available_space as f64);
        self.total_disk_space.set(total_disk_space as f64);

        // `received()` / `transmitted()` report the delta since the previous refresh of the
        // same `Networks` value. A new `Networks` is built on every collection, so those
        // deltas carry no information; publish the cumulative totals instead.
        let networks = Networks::new_with_refreshed_list();
        let total_network_in: u64 = networks.list().values().map(|n| n.total_received()).sum();
        let total_network_out: u64 = networks.list().values().map(|n| n.total_transmitted()).sum();
        self.network_in.set(total_network_in as f64);
        self.network_out.set(total_network_out as f64);

        // Only the one-minute load average is exported.
        self.load_avg.set(System::load_average().one as f64);
    }
}