Skip to content

Commit

Permalink
Add get param to disable jobstats and control max memory (#65)
Browse files Browse the repository at this point in the history
* Limit max memory through systemd

* Add a jobstats get param to disable jobstats

* Fix bug when some jobstats are empty

* Fix visibility to prepare release

* Simplify accepted params

Signed-off-by: Joe Grund <jgrund@whamcloud.io>

---------

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
Co-authored-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
RDruon and jgrund authored Aug 8, 2024
1 parent bf6a96c commit dde9bc2
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 36 deletions.
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lustre-collector/src/quota/quota_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ where
}

#[derive(Debug)]
pub enum QMTStat {
pub(crate) enum QMTStat {
Usr(Vec<QuotaStat>),
Prj(Vec<QuotaStat>),
Grp(Vec<QuotaStat>),
Expand Down
3 changes: 2 additions & 1 deletion lustrefs-exporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ num-traits = "0.2"
prometheus = "0.13"
prometheus_exporter_base = {version = "1.4.0"}
regex = {version = "1", default-features = false, features = ["perf", "std", "perf-dfa-full"]}
serde = {version = "1", features = ["derive"]}
thiserror = "1"
tokio = {workspace = true, features = [
"rt-multi-thread",
Expand All @@ -25,8 +26,8 @@ tracing-subscriber = {workspace = true, features = ["env-filter"]}
tracing.workspace = true

[dev-dependencies]
const_format = "0.2.32"
combine.workspace = true
const_format = "0.2.32"
include_dir.workspace = true
insta.workspace = true
serde_json = "1"
3 changes: 3 additions & 0 deletions lustrefs-exporter/fixtures/jobstats_only/some_empty.txt
Git LFS file not shown
2 changes: 2 additions & 0 deletions lustrefs-exporter/lustrefs_exporter.service
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Documentation=https://github.com/whamcloud/lustrefs-exporter
Environment=RUST_LOG=info
Restart=on-failure
ExecStart=/usr/bin/lustrefs_exporter
MemoryHigh=1750M
MemoryMax=2G

[Install]
WantedBy=multi-user.target
23 changes: 22 additions & 1 deletion lustrefs-exporter/src/jobstats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ pub fn jobstats_stream<R: BufRead + std::marker::Send + 'static>(
{
return Ok((state, LoopInstruction::Noop))
}
State::Empty if line.starts_with("obdfilter") || line.starts_with("mdt.") => {
State::Empty | State::Target(_)
if line.starts_with("obdfilter") || line.starts_with("mdt.") =>
{
state = State::Target(line);
}
State::Target(x) if line.starts_with("- job_id:") => {
Expand Down Expand Up @@ -421,4 +423,23 @@ job_stats:{}"#,
* 10
);
}

#[tokio::test(flavor = "multi_thread")]
async fn parse_some_empty() {
let f = File::open("fixtures/jobstats_only/some_empty.txt").unwrap();

let f = BufReader::with_capacity(128 * 1_024, f);

let (fut, mut rx) = jobstats_stream(f);

let mut cnt = 0;

while rx.recv().await.is_some() {
cnt += 1;
}

fut.await.unwrap();

assert_eq!(cnt, 108);
}
}
69 changes: 44 additions & 25 deletions lustrefs-exporter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use axum::{
body::{Body, Bytes},
error_handling::HandleErrorLayer,
extract::Query,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Expand All @@ -13,6 +14,7 @@ use axum::{
use clap::Parser;
use lustre_collector::{parse_lctl_output, parse_lnetctl_output, parse_lnetctl_stats, parser};
use lustrefs_exporter::{build_lustre_stats, Error};
use serde::Deserialize;
use std::{
borrow::Cow,
convert::Infallible,
Expand Down Expand Up @@ -50,6 +52,17 @@ async fn handle_error(error: BoxError) -> impl IntoResponse {
)
}

fn default_as_true() -> bool {
true
}

#[derive(Debug, Deserialize)]
struct Params {
// Only disable jobstats if "jobstats=false"
#[serde(default = "default_as_true")]
jobstats: bool,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt::init();
Expand All @@ -76,7 +89,7 @@ async fn main() -> Result<(), Error> {
Ok(())
}

async fn scrape() -> Result<Response<Body>, Error> {
async fn scrape(Query(params): Query<Params>) -> Result<Response<Body>, Error> {
let mut output = vec![];

let lctl = Command::new("lctl")
Expand Down Expand Up @@ -112,36 +125,42 @@ async fn scrape() -> Result<Response<Body>, Error> {

output.append(&mut lnetctl_stats_record);

let reader = tokio::task::spawn_blocking(move || {
let mut lctl_jobstats = std::process::Command::new("lctl")
.arg("get_param")
.args(["obdfilter.*OST*.job_stats", "mdt.*.job_stats"])
.stdout(std::process::Stdio::piped())
.spawn()?;

let reader = BufReader::with_capacity(
128 * 1_024,
lctl_jobstats.stdout.take().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"stdout missing for lctl jobstats call.",
))?,
);
let s = if params.jobstats {
let reader = tokio::task::spawn_blocking(move || {
let mut lctl_jobstats = std::process::Command::new("lctl")
.arg("get_param")
.args(["obdfilter.*OST*.job_stats", "mdt.*.job_stats"])
.stdout(std::process::Stdio::piped())
.spawn()?;

let reader = BufReader::with_capacity(
128 * 1_024,
lctl_jobstats.stdout.take().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"stdout missing for lctl jobstats call.",
))?,
);

Ok::<_, Error>(reader)
})
.await??;

Ok::<_, Error>(reader)
})
.await??;
let (_, rx) = lustrefs_exporter::jobstats::jobstats_stream(reader);

let (_, rx) = lustrefs_exporter::jobstats::jobstats_stream(reader);
let stream = ReceiverStream::new(rx)
.map(|x| Bytes::from_iter(x.into_bytes()))
.map(Ok);

let stream = ReceiverStream::new(rx)
.map(|x| Bytes::from_iter(x.into_bytes()))
.map(Ok);
let lustre_stats = Ok::<_, Infallible>(build_lustre_stats(output).into());

let lustre_stats = Ok::<_, Infallible>(build_lustre_stats(output).into());
let merged = tokio_stream::StreamExt::merge(tokio_stream::once(lustre_stats), stream);

let merged = tokio_stream::StreamExt::merge(tokio_stream::once(lustre_stats), stream);
Body::from_stream(merged)
} else {
tracing::debug!("Jobstats is disabled");

let s = Body::from_stream(merged);
Body::from(build_lustre_stats(output))
};

let response_builder = Response::builder().status(StatusCode::OK);

Expand Down

0 comments on commit dde9bc2

Please sign in to comment.