diff --git a/conductor/src/metrics.rs b/conductor/src/metrics.rs
index d96e1e1cd..0b5f160ad 100644
--- a/conductor/src/metrics.rs
+++ b/conductor/src/metrics.rs
@@ -90,10 +90,39 @@ pub mod dataplane_metrics {
         /// Results of this metric for all instances
         pub result: Vec<MetricsResult>,
     }
+
+    pub fn split_data_plane_metrics(
+        metrics: DataPlaneMetrics,
+        max_size: usize,
+    ) -> Vec<DataPlaneMetrics> {
+        let mut result = Vec::new();
+        let mut chunk = Vec::new();
+
+        for item in metrics.result.into_iter() {
+            if chunk.len() == max_size {
+                result.push(DataPlaneMetrics {
+                    name: metrics.name.clone(),
+                    result: chunk,
+                });
+                chunk = Vec::new();
+            }
+            chunk.push(item);
+        }
+
+        if !chunk.is_empty() {
+            result.push(DataPlaneMetrics {
+                name: metrics.name.clone(),
+                result: chunk,
+            });
+        }
+
+        result
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::metrics::dataplane_metrics::{split_data_plane_metrics, DataPlaneMetrics};
     use crate::metrics::prometheus::{MetricLabels, Metrics, MetricsData, MetricsResult};
 
     const QUERY_RESPONSE: &str = r#"
@@ -156,4 +185,81 @@ mod tests {
 
         assert_eq!(response, expected);
     }
+
+    #[test]
+    fn test_split_data_plane_metrics() {
+        let mut results = Vec::new();
+
+        for i in 0..2008 {
+            results.push(MetricsResult {
+                metric: MetricLabels {
+                    instance_id: format!("inst_{}", i),
+                    pod: format!("pod_{}", i),
+                },
+                value: (i as i64, i as i64),
+            });
+        }
+
+        let data_plane_metrics = DataPlaneMetrics {
+            name: "test_metric".into(),
+            result: results,
+        };
+
+        let split_metrics = split_data_plane_metrics(data_plane_metrics, 1000);
+
+        assert_eq!(
+            split_metrics.len(),
+            3,
+            "Expected 3 chunks, got {}",
+            split_metrics.len()
+        );
+        assert_eq!(
+            split_metrics[0].result.len(),
+            1000,
+            "First chunk size incorrect"
+        );
+        assert_eq!(
+            split_metrics[1].result.len(),
+            1000,
+            "Second chunk size incorrect"
+        );
+        assert_eq!(
+            split_metrics[2].result.len(),
+            8,
+            "Third chunk size incorrect"
+        );
+    }
+    #[test]
+    fn test_split_data_plane_metrics_exact() {
+        let mut results = Vec::new();
+
+        for i in 0..1000 {
+            results.push(MetricsResult {
+                metric: MetricLabels {
+                    instance_id: format!("inst_{}", i),
+                    pod: format!("pod_{}", i),
+                },
+                value: (i as i64, i as i64),
+            });
+        }
+
+        let data_plane_metrics = DataPlaneMetrics {
+            name: "test_metric".into(),
+            result: results,
+        };
+
+        let split_metrics = split_data_plane_metrics(data_plane_metrics, 1000);
+
+        assert_eq!(
+            split_metrics.len(),
+            1,
+            "Expected 1 chunk, got {}",
+            split_metrics.len()
+        );
+        assert_eq!(
+            split_metrics[0].result.len(),
+            1000,
+            "First chunk size incorrect"
+        );
+    }
 }
diff --git a/conductor/src/metrics_reporter.rs b/conductor/src/metrics_reporter.rs
index 029ba673d..fa7572bc7 100644
--- a/conductor/src/metrics_reporter.rs
+++ b/conductor/src/metrics_reporter.rs
@@ -1,6 +1,7 @@
 use std::{env, time::Duration};
 
 use anyhow::{bail, Context, Result};
+use conductor::metrics::dataplane_metrics::split_data_plane_metrics;
 use conductor::metrics::{dataplane_metrics::DataPlaneMetrics, prometheus::Metrics};
 use log::info;
 use pgmq::PGMQueueExt;
@@ -48,6 +49,7 @@ pub async fn run_metrics_reporter() -> Result<()> {
         sync_interval.tick().await;
 
         for metric in &metrics {
+            info!("Querying '{}' from Prometheus", metric.name);
             let metrics = client.query(&metric.query).await?;
             let num_metrics = metrics.data.result.len();
 
@@ -61,12 +63,23 @@ pub async fn run_metrics_reporter() -> Result<()> {
                 result: metrics.data.result,
             };
 
-            // Send to PGMQ
-            queue
-                .send(&metrics_events_queue, &data_plane_metrics)
-                .await?;
+            let batch_size = 1000;
+            let metrics_to_send = split_data_plane_metrics(data_plane_metrics, batch_size);
+            let batches = metrics_to_send.len();
 
-            info!("Saved metric to PGMQ");
+            info!(
+                "Split metrics into {} chunks, each with {} results",
+                batches, batch_size
+            );
+
+            let mut i = 1;
+            for data_plane_metrics in &metrics_to_send {
+                queue
+                    .send(&metrics_events_queue, data_plane_metrics)
+                    .await?;
+                info!("Enqueued batch {}/{} to PGMQ", i, batches);
+                i += 1;
+            }
         }
     }
 }