From dde9bc278396c09ab2fd6e4f3c74493b7f1da31c Mon Sep 17 00:00:00 2001 From: Raphael Druon <64585623+RDruon@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:53:53 +0200 Subject: [PATCH] Add get param to disable jobstats and control max memory (#65) * Limit max memory through systemd * Add a jobstats get param to disable jobstats * Fix bug when some jobstats are empty * Fix visibility to prepare release * Simplify accepted params Signed-off-by: Joe Grund --------- Signed-off-by: Joe Grund Co-authored-by: Joe Grund --- Cargo.lock | 17 ++--- lustre-collector/src/quota/quota_parser.rs | 2 +- lustrefs-exporter/Cargo.toml | 3 +- .../fixtures/jobstats_only/some_empty.txt | 3 + lustrefs-exporter/lustrefs_exporter.service | 2 + lustrefs-exporter/src/jobstats.rs | 23 ++++++- lustrefs-exporter/src/main.rs | 69 ++++++++++++------- 7 files changed, 83 insertions(+), 36 deletions(-) create mode 100644 lustrefs-exporter/fixtures/jobstats_only/some_empty.txt diff --git a/Cargo.lock b/Cargo.lock index fc17dab..1b924b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,9 +202,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" [[package]] name = "cfg-if" @@ -676,6 +676,7 @@ dependencies = [ "prometheus", "prometheus_exporter_base", "regex", + "serde", "serde_json", "thiserror", "tokio", @@ -818,9 +819,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.2" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -1043,18 +1044,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.204" +version = "1.0.205" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" +checksum = "e33aedb1a7135da52b7c21791455563facbbcc43d0f0f66165b42c21b3dfb150" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.204" +version = "1.0.205" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" +checksum = "692d6f5ac90220161d6774db30c662202721e64aed9058d2c394f451261420c1" dependencies = [ "proc-macro2", "quote", diff --git a/lustre-collector/src/quota/quota_parser.rs b/lustre-collector/src/quota/quota_parser.rs index e7b1a25..ee22189 100644 --- a/lustre-collector/src/quota/quota_parser.rs +++ b/lustre-collector/src/quota/quota_parser.rs @@ -97,7 +97,7 @@ where } #[derive(Debug)] -pub enum QMTStat { +pub(crate) enum QMTStat { Usr(Vec), Prj(Vec), Grp(Vec), diff --git a/lustrefs-exporter/Cargo.toml b/lustrefs-exporter/Cargo.toml index 8cf5bc8..06bf942 100644 --- a/lustrefs-exporter/Cargo.toml +++ b/lustrefs-exporter/Cargo.toml @@ -13,6 +13,7 @@ num-traits = "0.2" prometheus = "0.13" prometheus_exporter_base = {version = "1.4.0"} regex = {version = "1", default-features = false, features = ["perf", "std", "perf-dfa-full"]} +serde = {version = "1", features = ["derive"]} thiserror = "1" tokio = {workspace = true, features = [ "rt-multi-thread", @@ -25,8 +26,8 @@ tracing-subscriber = {workspace = true, features = ["env-filter"]} tracing.workspace = true [dev-dependencies] -const_format = "0.2.32" combine.workspace = true +const_format = "0.2.32" include_dir.workspace = true insta.workspace = true serde_json = "1" diff --git a/lustrefs-exporter/fixtures/jobstats_only/some_empty.txt b/lustrefs-exporter/fixtures/jobstats_only/some_empty.txt new file mode 100644 index 0000000..043acf1 --- /dev/null +++ b/lustrefs-exporter/fixtures/jobstats_only/some_empty.txt @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:c3aaa9e9f0daf3ca55b889bec0b1b57efdeaf2864da58b8ac6196956a48fb43e +size 2260 diff --git a/lustrefs-exporter/lustrefs_exporter.service b/lustrefs-exporter/lustrefs_exporter.service index 3ef8367..0358481 100644 --- a/lustrefs-exporter/lustrefs_exporter.service +++ b/lustrefs-exporter/lustrefs_exporter.service @@ -6,6 +6,8 @@ Documentation=https://github.com/whamcloud/lustrefs-exporter Environment=RUST_LOG=info Restart=on-failure ExecStart=/usr/bin/lustrefs_exporter +MemoryHigh=1750M +MemoryMax=2G [Install] WantedBy=multi-user.target diff --git a/lustrefs-exporter/src/jobstats.rs b/lustrefs-exporter/src/jobstats.rs index b330448..0462e8c 100644 --- a/lustrefs-exporter/src/jobstats.rs +++ b/lustrefs-exporter/src/jobstats.rs @@ -90,7 +90,9 @@ pub fn jobstats_stream( { return Ok((state, LoopInstruction::Noop)) } - State::Empty if line.starts_with("obdfilter") || line.starts_with("mdt.") => { + State::Empty | State::Target(_) + if line.starts_with("obdfilter") || line.starts_with("mdt.") => + { state = State::Target(line); } State::Target(x) if line.starts_with("- job_id:") => { @@ -421,4 +423,23 @@ job_stats:{}"#, * 10 ); } + + #[tokio::test(flavor = "multi_thread")] + async fn parse_some_empty() { + let f = File::open("fixtures/jobstats_only/some_empty.txt").unwrap(); + + let f = BufReader::with_capacity(128 * 1_024, f); + + let (fut, mut rx) = jobstats_stream(f); + + let mut cnt = 0; + + while rx.recv().await.is_some() { + cnt += 1; + } + + fut.await.unwrap(); + + assert_eq!(cnt, 108); + } } diff --git a/lustrefs-exporter/src/main.rs b/lustrefs-exporter/src/main.rs index cdb38f3..3a2a851 100644 --- a/lustrefs-exporter/src/main.rs +++ b/lustrefs-exporter/src/main.rs @@ -5,6 +5,7 @@ use axum::{ body::{Body, Bytes}, error_handling::HandleErrorLayer, + extract::Query, http::StatusCode, response::{IntoResponse, Response}, routing::get, @@ -13,6 +14,7 @@ use axum::{ use clap::Parser; use lustre_collector::{parse_lctl_output, parse_lnetctl_output, parse_lnetctl_stats, parser}; use lustrefs_exporter::{build_lustre_stats, Error}; +use serde::Deserialize; use std::{ borrow::Cow, convert::Infallible, @@ -50,6 +52,17 @@ async fn handle_error(error: BoxError) -> impl IntoResponse { ) } +fn default_as_true() -> bool { + true +} + +#[derive(Debug, Deserialize)] +struct Params { + // Only disable jobstats if "jobstats=false" + #[serde(default = "default_as_true")] + jobstats: bool, +} + #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt::init(); @@ -76,7 +89,7 @@ async fn main() -> Result<(), Error> { Ok(()) } -async fn scrape() -> Result, Error> { +async fn scrape(Query(params): Query) -> Result, Error> { let mut output = vec![]; let lctl = Command::new("lctl") @@ -112,36 +125,42 @@ async fn scrape() -> Result, Error> { output.append(&mut lnetctl_stats_record); - let reader = tokio::task::spawn_blocking(move || { - let mut lctl_jobstats = std::process::Command::new("lctl") - .arg("get_param") - .args(["obdfilter.*OST*.job_stats", "mdt.*.job_stats"]) - .stdout(std::process::Stdio::piped()) - .spawn()?; - - let reader = BufReader::with_capacity( - 128 * 1_024, - lctl_jobstats.stdout.take().ok_or(io::Error::new( - io::ErrorKind::NotFound, - "stdout missing for lctl jobstats call.", - ))?, - ); + let s = if params.jobstats { + let reader = tokio::task::spawn_blocking(move || { + let mut lctl_jobstats = std::process::Command::new("lctl") + .arg("get_param") + .args(["obdfilter.*OST*.job_stats", "mdt.*.job_stats"]) + .stdout(std::process::Stdio::piped()) + .spawn()?; + + let reader = BufReader::with_capacity( + 128 * 1_024, + lctl_jobstats.stdout.take().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "stdout missing for lctl jobstats call.", + ))?, + ); + + Ok::<_, Error>(reader) + }) + .await??; - Ok::<_, Error>(reader) - }) - .await??; + let (_, rx) = lustrefs_exporter::jobstats::jobstats_stream(reader); - let (_, rx) = lustrefs_exporter::jobstats::jobstats_stream(reader); + let stream = ReceiverStream::new(rx) + .map(|x| Bytes::from_iter(x.into_bytes())) + .map(Ok); - let stream = ReceiverStream::new(rx) - .map(|x| Bytes::from_iter(x.into_bytes())) - .map(Ok); + let lustre_stats = Ok::<_, Infallible>(build_lustre_stats(output).into()); - let lustre_stats = Ok::<_, Infallible>(build_lustre_stats(output).into()); + let merged = tokio_stream::StreamExt::merge(tokio_stream::once(lustre_stats), stream); - let merged = tokio_stream::StreamExt::merge(tokio_stream::once(lustre_stats), stream); + Body::from_stream(merged) + } else { + tracing::debug!("Jobstats is disabled"); - let s = Body::from_stream(merged); + Body::from(build_lustre_stats(output)) + }; let response_builder = Response::builder().status(StatusCode::OK);