Skip to content

Commit

Permalink
Batch metrics data (#850)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjmiller609 authored Jun 22, 2024
1 parent 3b1236e commit 4b2b6f6
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 5 deletions.
106 changes: 106 additions & 0 deletions conductor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,39 @@ pub mod dataplane_metrics {
/// Results of this metric for all instances
pub result: Vec<MetricsResult>,
}

/// Split one `DataPlaneMetrics` into several, each carrying at most
/// `max_size` entries of the original `result` vector.
///
/// The metric `name` is cloned onto every chunk so that each piece is a
/// self-contained message (used to keep individual queue payloads small).
/// Returns an empty `Vec` when `metrics.result` is empty.
///
/// # Panics
/// Panics if `max_size` is zero — no finite number of chunks could hold a
/// non-empty result set, and the previous implementation silently emitted
/// a leading empty chunk followed by one unbounded chunk in that case.
pub fn split_data_plane_metrics(
    metrics: DataPlaneMetrics,
    max_size: usize,
) -> Vec<DataPlaneMetrics> {
    assert!(max_size > 0, "max_size must be greater than zero");

    // Preallocate ceil(len / max_size) slots so pushing chunks never reallocates.
    let expected_chunks = (metrics.result.len() + max_size - 1) / max_size;
    let mut result = Vec::with_capacity(expected_chunks);
    let mut chunk = Vec::with_capacity(max_size);

    for item in metrics.result.into_iter() {
        // Flush a full chunk before accepting the next item; `mem::replace`
        // hands the filled buffer to the output and leaves a fresh one behind.
        if chunk.len() == max_size {
            result.push(DataPlaneMetrics {
                name: metrics.name.clone(),
                result: std::mem::replace(&mut chunk, Vec::with_capacity(max_size)),
            });
        }
        chunk.push(item);
    }

    // Flush the final, possibly partial, chunk. Skipped when the input was
    // empty, so an empty input yields an empty output.
    if !chunk.is_empty() {
        result.push(DataPlaneMetrics {
            name: metrics.name.clone(),
            result: chunk,
        });
    }

    result
}
}

#[cfg(test)]
mod tests {
use crate::metrics::dataplane_metrics::{split_data_plane_metrics, DataPlaneMetrics};
use crate::metrics::prometheus::{MetricLabels, Metrics, MetricsData, MetricsResult};

const QUERY_RESPONSE: &str = r#"
Expand Down Expand Up @@ -156,4 +185,81 @@ mod tests {

assert_eq!(response, expected);
}

#[test]
fn test_split_data_plane_metrics() {
    // 2008 results with a batch size of 1000 must yield two full chunks
    // plus a remainder chunk of 8.
    let results: Vec<MetricsResult> = (0..2008)
        .map(|i| MetricsResult {
            metric: MetricLabels {
                instance_id: format!("inst_{}", i),
                pod: format!("pod_{}", i),
            },
            value: (i as i64, i as i64),
        })
        .collect();

    let data_plane_metrics = DataPlaneMetrics {
        name: "test_metric".into(),
        result: results,
    };

    let split_metrics = split_data_plane_metrics(data_plane_metrics, 1000);

    assert_eq!(
        split_metrics.len(),
        3,
        "Expected 3 chunks, got {}",
        split_metrics.len()
    );
    assert_eq!(
        split_metrics[0].result.len(),
        1000,
        "First chunk size incorrect"
    );
    assert_eq!(
        split_metrics[1].result.len(),
        1000,
        "Second chunk size incorrect"
    );
    assert_eq!(
        split_metrics[2].result.len(),
        8,
        "Third chunk size incorrect"
    );
}
#[test]
fn test_split_data_plane_metrics_exact() {
    // When the result count equals the batch size exactly, splitting must
    // produce a single full chunk and no trailing empty chunk.
    let results: Vec<MetricsResult> = (0..1000)
        .map(|i| MetricsResult {
            metric: MetricLabels {
                instance_id: format!("inst_{}", i),
                pod: format!("pod_{}", i),
            },
            value: (i as i64, i as i64),
        })
        .collect();

    let data_plane_metrics = DataPlaneMetrics {
        name: "test_metric".into(),
        result: results,
    };

    let split_metrics = split_data_plane_metrics(data_plane_metrics, 1000);

    // Fixed message grammar: "1 chunk", not "1 chunks".
    assert_eq!(
        split_metrics.len(),
        1,
        "Expected 1 chunk, got {}",
        split_metrics.len()
    );
    assert_eq!(
        split_metrics[0].result.len(),
        1000,
        "First chunk size incorrect"
    );
}
}
23 changes: 18 additions & 5 deletions conductor/src/metrics_reporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{env, time::Duration};

use anyhow::{bail, Context, Result};
use conductor::metrics::dataplane_metrics::split_data_plane_metrics;
use conductor::metrics::{dataplane_metrics::DataPlaneMetrics, prometheus::Metrics};
use log::info;
use pgmq::PGMQueueExt;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub async fn run_metrics_reporter() -> Result<()> {
sync_interval.tick().await;

for metric in &metrics {
info!("Querying '{}' from Prometheus", metric.name);
let metrics = client.query(&metric.query).await?;

let num_metrics = metrics.data.result.len();
Expand All @@ -61,12 +63,23 @@ pub async fn run_metrics_reporter() -> Result<()> {
result: metrics.data.result,
};

// Send to PGMQ
queue
.send(&metrics_events_queue, &data_plane_metrics)
.await?;
let batch_size = 1000;
let metrics_to_send = split_data_plane_metrics(data_plane_metrics, batch_size);
let batches = metrics_to_send.len();

info!("Saved metric to PGMQ");
info!(
"Split metrics into {} chunks, each with {} results",
batches, batch_size
);

let mut i = 1;
for data_plane_metrics in &metrics_to_send {
queue
.send(&metrics_events_queue, data_plane_metrics)
.await?;
info!("Enqueued batch {}/{} to PGMQ", i, batches);
i += 1;
}
}
}
}
Expand Down

0 comments on commit 4b2b6f6

Please sign in to comment.