Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,27 +509,8 @@ impl PrometheusExporter {
}
_ => metric,
};

match new_metric.kind() {
MetricKind::Absolute => Some(new_metric),
MetricKind::Incremental => {
let metrics = self.metrics.read().expect(LOCK_FAILED);
let metric_ref = MetricRef::from_metric(&new_metric);

if let Some(existing) = metrics.get(&metric_ref) {
let mut current = existing.0.value().clone();
if current.add(new_metric.value()) {
// If we were able to add to the existing value (i.e. they were compatible),
// return the result as an absolute metric.
return Some(new_metric.with_value(current).into_absolute());
}
}

// Otherwise, if we didn't have an existing value or we did and it was not
// compatible with the new value, simply return the new value as absolute.
Some(new_metric.into_absolute())
}
}

Some(new_metric)
}
}

Expand Down Expand Up @@ -574,16 +555,41 @@ impl StreamSink<Event> for PrometheusExporter {

// We have a normalized metric, in absolute form. If we're already aware of this
// metric, update its expiration deadline, otherwise, start tracking it.
//
// For incremental metrics, we accumulate the value atomically under the
// write lock to avoid race conditions where concurrent updates could
// cause lost increments or counter value decreases.
let mut metrics = self.metrics.write().expect(LOCK_FAILED);

match metrics.entry(MetricRef::from_metric(&normalized)) {
Entry::Occupied(mut entry) => {
let (data, metadata) = entry.get_mut();
*data = normalized;

match normalized.kind() {
MetricKind::Absolute => {
*data = normalized;
}
MetricKind::Incremental => {
// For incremental metrics, accumulate atomically under the write lock
let mut current = data.value().clone();
if current.add(normalized.value()) {
// Successfully accumulated - update in place
*data = normalized.with_value(current).into_absolute();
} else {
// Incompatible values - replace with new value as absolute
*data = normalized.into_absolute();
}
}
}
metadata.refresh();
}
Entry::Vacant(entry) => {
entry.insert((normalized, MetricMetadata::new(flush_period)));
// For new metrics, convert incremental to absolute before storing
let to_store = match normalized.kind() {
MetricKind::Absolute => normalized,
MetricKind::Incremental => normalized.into_absolute(),
};
entry.insert((to_store, MetricMetadata::new(flush_period)));
}
}
finalizers.update_status(EventStatus::Delivered);
Expand Down
Loading