Skip to content

Commit

Permalink
Add tests for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Jan 27, 2025
1 parent 0198b7e commit 940bb28
Showing 1 changed file with 214 additions and 2 deletions.
216 changes: 214 additions & 2 deletions src/enrichment_tables/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,19 @@ impl StreamSink<Event> for Memory {

#[cfg(test)]
mod tests {
use futures::future::ready;
use futures::{future::ready, StreamExt};
use futures_util::stream;
use std::time::Duration;

use vector_lib::sink::VectorSink;
use vector_lib::{
event::{EventContainer, MetricValue},
metrics::Controller,
sink::VectorSink,
};

use super::*;
use crate::{
enrichment_tables::memory::internal_events::InternalMetricsConfig,
event::{Event, LogEvent},
test_util::components::{run_and_assert_sink_compliance, SINK_TAGS},
};
Expand Down Expand Up @@ -614,4 +619,211 @@ mod tests {
)
.await;
}

#[tokio::test]
async fn flush_metrics_without_interval() {
let event = Event::Log(LogEvent::from(ObjectMap::from([(
"test_key".into(),
Value::from(5),
)])));

let memory = Memory::new(Default::default());

run_and_assert_sink_compliance(
VectorSink::from_event_streamsink(memory),
stream::once(ready(event)),
&SINK_TAGS,
)
.await;

let metrics = Controller::get().unwrap().capture_metrics();
let insertions_counter = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Counter { .. })
&& m.name() == "memory_enrichment_table_insertions_total"
})
.expect("Insertions metric is missing!");
let MetricValue::Counter {
value: insertions_count,
} = insertions_counter.value()
else {
unreachable!();
};
let flushes_counter = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Counter { .. })
&& m.name() == "memory_enrichment_table_flushes_total"
})
.expect("Flushes metric is missing!");
let MetricValue::Counter {
value: flushes_count,
} = flushes_counter.value()
else {
unreachable!();
};
let object_count_gauge = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Gauge { .. })
&& m.name() == "memory_enrichment_table_objects_count"
})
.expect("Object count metric is missing!");
let MetricValue::Gauge {
value: object_count,
} = object_count_gauge.value()
else {
unreachable!();
};
let byte_size_gauge = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Gauge { .. })
&& m.name() == "memory_enrichment_table_byte_size"
})
.expect("Byte size metric is missing!");
assert_eq!(*insertions_count, 1.0);
assert_eq!(*flushes_count, 1.0);
assert_eq!(*object_count, 1.0);
assert!(!byte_size_gauge.is_empty());
}

#[tokio::test]
async fn flush_metrics_with_interval() {
let event = Event::Log(LogEvent::from(ObjectMap::from([(
"test_key".into(),
Value::from(5),
)])));

let memory = Memory::new(build_memory_config(|c| {
c.flush_interval = Some(1);
}));

run_and_assert_sink_compliance(
VectorSink::from_event_streamsink(memory),
stream::iter(vec![event.clone(), event]).flat_map(|e| {
stream::once(async move {
tokio::time::sleep(Duration::from_millis(600)).await;
e
})
}),
&SINK_TAGS,
)
.await;

let metrics = Controller::get().unwrap().capture_metrics();
let insertions_counter = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Counter { .. })
&& m.name() == "memory_enrichment_table_insertions_total"
})
.expect("Insertions metric is missing!");
let MetricValue::Counter {
value: insertions_count,
} = insertions_counter.value()
else {
unreachable!();
};
let flushes_counter = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Counter { .. })
&& m.name() == "memory_enrichment_table_flushes_total"
})
.expect("Flushes metric is missing!");
let MetricValue::Counter {
value: flushes_count,
} = flushes_counter.value()
else {
unreachable!();
};
let object_count_gauge = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Gauge { .. })
&& m.name() == "memory_enrichment_table_objects_count"
})
.expect("Object count metric is missing!");
let MetricValue::Gauge {
value: object_count,
} = object_count_gauge.value()
else {
unreachable!();
};
let byte_size_gauge = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Gauge { .. })
&& m.name() == "memory_enrichment_table_byte_size"
})
.expect("Byte size metric is missing!");

assert_eq!(*insertions_count, 2.0);
// One is done right away and the next one after the interval
assert_eq!(*flushes_count, 2.0);
assert_eq!(*object_count, 1.0);
assert!(!byte_size_gauge.is_empty());
}

#[tokio::test]
async fn flush_metrics_with_key() {
let event = Event::Log(LogEvent::from(ObjectMap::from([(
"test_key".into(),
Value::from(5),
)])));

let memory = Memory::new(build_memory_config(|c| {
c.internal_metrics = InternalMetricsConfig {
include_key_tag: true,
};
}));

run_and_assert_sink_compliance(
VectorSink::from_event_streamsink(memory),
stream::once(ready(event)),
&SINK_TAGS,
)
.await;

let metrics = Controller::get().unwrap().capture_metrics();
let insertions_counter = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Counter { .. })
&& m.name() == "memory_enrichment_table_insertions_total"
})
.expect("Insertions metric is missing!");

assert!(insertions_counter.tag_matches("key", "test_key"));
}

#[tokio::test]
async fn flush_metrics_without_key() {
let event = Event::Log(LogEvent::from(ObjectMap::from([(
"test_key".into(),
Value::from(5),
)])));

let memory = Memory::new(Default::default());

run_and_assert_sink_compliance(
VectorSink::from_event_streamsink(memory),
stream::once(ready(event)),
&SINK_TAGS,
)
.await;

let metrics = Controller::get().unwrap().capture_metrics();
let insertions_counter = metrics
.iter()
.find(|m| {
matches!(m.value(), MetricValue::Counter { .. })
&& m.name() == "memory_enrichment_table_insertions_total"
})
.expect("Insertions metric is missing!");

assert!(insertions_counter.tag_value("key").is_none());
}
}

0 comments on commit 940bb28

Please sign in to comment.