Fix metrics dedup/sort bug #2093

Open
wants to merge 4 commits into main
Conversation

utpilla (Contributor) commented Sep 10, 2024

ValueMap holds both the user-provided order and the sorted order of metric attributes for looking up a MetricPoint. This is done to optimize lookup performance when the user provides the attributes in an unsorted order. Currently, we only dedup the metric attributes when storing the sorted order. This can lead to issues when collecting metrics, as we could return a non-deduplicated and/or non-sorted set of attributes to the exporters.

For example, if a user provides duplicate keys when reporting measurements:

counter.add(10, &[KeyValue::new("key1", "value1"), KeyValue::new("key1", "value2")]);

we would store two entries in the ValueMap:

  1. User-provided order: vec![KeyValue::new("key1", "value1"), KeyValue::new("key1", "value2")]
  2. Deduped + sorted order: vec![KeyValue::new("key1", "value2")]

When we collect metrics, we should only send the deduped + sorted set of attributes to the exporters.
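To make the scheme concrete, here is a minimal sketch (hypothetical types and names, not the SDK's actual ValueMap) of how both orderings can point to the same tracker, and why a naive iteration at collect time can hand the non-canonical key to an exporter:

    use std::collections::{HashMap, HashSet};
    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    type Attrs = Vec<(String, String)>;

    // Hypothetical stand-in for ValueMap: the user-provided order and the
    // deduped + sorted order are both stored as keys aliasing one shared
    // tracker, so repeated lookups with unsorted attributes stay cheap.
    #[derive(Default)]
    struct ValueMapSketch {
        trackers: HashMap<Attrs, Arc<AtomicU64>>,
    }

    impl ValueMapSketch {
        fn add(&mut self, attrs: &[(String, String)], value: u64) {
            // Fast path: the exact user-provided order may already be cached.
            if let Some(tracker) = self.trackers.get(attrs) {
                tracker.fetch_add(value, Ordering::Relaxed);
                return;
            }
            // Slow path: build the canonical key (dedup with the last value
            // winning, then sort) and alias the user-provided order to it.
            let mut seen = HashSet::new();
            let mut canonical: Attrs = attrs
                .iter()
                .rev()
                .filter(|(k, _)| seen.insert(k.clone()))
                .cloned()
                .collect();
            canonical.sort();
            let tracker = self
                .trackers
                .entry(canonical)
                .or_insert_with(|| Arc::new(AtomicU64::new(0)))
                .clone();
            tracker.fetch_add(value, Ordering::Relaxed);
            // This alias entry is what a naive collect-time iteration can
            // pick up instead of the canonical key.
            self.trackers.insert(attrs.to_vec(), tracker);
        }
    }

Since the map ends up holding both keys for the same tracker, collect must normalize to the deduped + sorted form before handing data points to exporters, which is what this PR does.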

Changes

  • No change in the hot path
  • Updated the collect code for ValueMap-based instruments to dedup and sort the metric attributes if needed.

Benchmarks

  • No changes in the hot-path benchmark results, although this change would increase the time taken to collect metrics.
Machine config
OS: Ubuntu 22.04.4 LTS (5.15.153.1-microsoft-standard-WSL2)
Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16 vCPUs, RAM: 64.0 GB

Test                              Average time
Counter_Add_Sorted                181.49 ns
Counter_Add_Unsorted              185.92 ns
Counter_Overflow                  837.50 ns
ThreadLocal_Random_Generator_5    36.574 ns

Merge requirement checklist

  • CONTRIBUTING guidelines followed
  • Unit tests added/updated (if applicable)
  • Appropriate CHANGELOG.md files updated for non-trivial, user-facing changes
  • Changes in public API reviewed (if applicable)

@utpilla utpilla requested a review from a team September 10, 2024 07:22

codecov bot commented Sep 10, 2024

Codecov Report

Attention: Patch coverage is 93.54839% with 6 lines in your changes missing coverage. Please review.

Project coverage is 78.4%. Comparing base (7a074b5) to head (4b53e38).
Report is 51 commits behind head on main.

Files with missing lines               Patch %   Lines
opentelemetry-sdk/src/metrics/mod.rs   92.9%     6 Missing ⚠️
Additional details and impacted files
@@          Coverage Diff          @@
##            main   #2093   +/-   ##
=====================================
  Coverage   78.3%   78.4%           
=====================================
  Files        121     121           
  Lines      20815   20894   +79     
=====================================
+ Hits       16309   16383   +74     
- Misses      4506    4511    +5     


utpilla (Contributor, Author) commented Sep 10, 2024

A few other approaches to solve this could be:

  • Update the ValueMap to also store information about which key can be used for export. Instead of simply storing AtomicTracker as the value, we could use a (bool useKeyForExport, AtomicTracker tracker) tuple as the value type. We would only mark the deduped and sorted set of attributes with useKeyForExport=true. This way, when we iterate over the ValueMap while collecting metrics, we could check whether a given key should be used for export (see the sketch after this list). The downside is that there would be another indirection in the hot path when accessing the tracker.
  • We could make it the exporter's responsibility to dedup and sort metric attributes instead of doing it in the SDK.
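For illustration, the first alternative could be shaped roughly like this (hypothetical names, not actual SDK types); collect would export only the entries whose flag is set:

    use std::sync::atomic::AtomicU64;

    // Hypothetical value type for the first alternative: each map entry
    // records whether its key is the canonical (deduped + sorted) one, so
    // the collect path can skip the alias entries. Reaching the tracker
    // through this wrapper is the extra hot-path indirection noted above.
    struct TrackerEntry {
        use_key_for_export: bool,
        tracker: AtomicU64, // stand-in for AtomicTracker
    }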

fraillt (Contributor) commented Sep 10, 2024

> We could make it the exporter's responsibility to dedup and sort metric attributes instead of doing it in the SDK.

I'm working on the same issue, and I think it's not possible to leave this as the exporter's responsibility, because the SDK still needs to update the correct tracker. E.g. [k1=v1, k2=v2] and [k2=v2, k1=v1] should use the same tracker instance.

But I like your ideas :)

utpilla (Contributor, Author) commented Sep 10, 2024

> > We could make it the exporter's responsibility to dedup and sort metric attributes instead of doing it in the SDK.
>
> I'm working on the same issue, and I think it's not possible to leave this as the exporter's responsibility, because the SDK still needs to update the correct tracker. E.g. [k1=v1, k2=v2] and [k2=v2, k1=v1] should use the same tracker instance.
>
> But I like your ideas :)

@fraillt My sentence might have been misleading. When I suggested we make deduping/sorting of attributes the responsibility of the exporter and not the SDK, I was referring only to the collect path and not the update path. This is how it would work:

  • (Within SDK) No changes in the update path. The SDK would continue to keep two entries for performant lookups: one for the user-provided order and one for the deduped and sorted order.
  • (Within SDK) No changes in the collect path. The SDK would continue to do the naive iteration of the ValueMap and pick whichever key it gets (which may or may not be the deduped and sorted set).
  • (Individual exporters) Any exporter that wants to offer deduped and sorted attributes to users would iterate over the provided ResourceMetrics and dedup and sort the attributes before the actual export (sending over the network, printing to console, etc.). That would ensure that the user sees deduped and sorted attributes; see the sketch below.
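As a sketch of what that exporter-side pass might look like (the normalization step only, written against a bare Vec<KeyValue> since the exact ResourceMetrics field layout varies by SDK version):

    use opentelemetry::{Key, KeyValue};
    use std::collections::HashSet;

    // Hypothetical exporter-side normalization, run on each data point's
    // attributes just before serialization (network, console, etc.).
    // The last value wins for duplicate keys, matching the deduped +
    // sorted order the SDK stores internally.
    fn normalize_attributes(attributes: &mut Vec<KeyValue>) {
        let mut seen: HashSet<Key> = HashSet::with_capacity(attributes.len());
        let mut deduped: Vec<KeyValue> = attributes
            .iter()
            .rev() // reverse so the last occurrence of each key is kept
            .filter(|kv| seen.insert(kv.key.clone()))
            .cloned()
            .collect();
        deduped.sort_unstable_by(|a, b| a.key.as_str().cmp(b.key.as_str()));
        *attributes = deduped;
    }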

Comment on lines +129 to +168
#[allow(dead_code)]
pub(crate) struct KeyValueHelper;

impl KeyValueHelper {
    #[allow(dead_code)]
    pub(crate) fn dedup_and_sort_attributes(attributes: &[KeyValue]) -> Vec<KeyValue> {
        // Check if the attributes contain duplicate keys
        let mut has_duplicates = false;
        let mut keys_set: HashSet<Key> = HashSet::with_capacity(attributes.len());
        for kv in attributes {
            if !keys_set.insert(kv.key.clone()) {
                has_duplicates = true;
                break;
            }
        }

        if has_duplicates {
            // Dedup the attributes (last value wins) and sort them
            keys_set.clear();
            let mut vec = attributes
                .iter()
                .rev()
                .filter_map(|kv| {
                    if keys_set.insert(kv.key.clone()) {
                        Some(kv.clone())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            vec.sort_unstable();
            vec
        } else {
            // Attributes are already deduped; just sort them
            let mut vec = attributes.to_vec();
            vec.sort_unstable();
            vec
        }
    }
}
TommyCpp (Contributor) commented Sep 16, 2024
Can we define this as a trait? It's easier to use IMO, as you could just do attr.dedup_and_sort_attributes(). I asked ChatGPT to write a simple example here.
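For reference, the trait shape suggested here could look roughly like this (a hypothetical sketch reusing the dedup logic from the helper above, with the trait name invented for illustration):

    use opentelemetry::{Key, KeyValue};
    use std::collections::HashSet;

    // Hypothetical extension trait so call sites can invoke the helper as
    // a method on any attribute slice.
    pub(crate) trait DedupAndSort {
        fn dedup_and_sort_attributes(&self) -> Vec<KeyValue>;
    }

    impl DedupAndSort for [KeyValue] {
        fn dedup_and_sort_attributes(&self) -> Vec<KeyValue> {
            let mut seen: HashSet<Key> = HashSet::with_capacity(self.len());
            // Iterate in reverse so the last value wins for duplicate keys.
            let mut vec: Vec<KeyValue> = self
                .iter()
                .rev()
                .filter(|kv| seen.insert(kv.key.clone()))
                .cloned()
                .collect();
            vec.sort_unstable_by(|a, b| a.key.as_str().cmp(b.key.as_str()));
            vec
        }
    }

Because the impl is on [KeyValue], both &[KeyValue] and Vec<KeyValue> get the method via auto-deref: attributes.dedup_and_sort_attributes().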

@@ -245,6 +286,14 @@ mod tests {
counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_duplicate_attribute_keys() {
Contributor commented

This test doesn't test the actual issue (it succeeds on main every single time, even though main has a bug).

I wrote a test that actually fails on main every time* (it has a ~1% chance to succeed, because the default Hasher's ordering is unpredictable), but succeeds on your PR.
So your fix works :)

Here's the test's content.

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn aggregation_attribute_keys_are_deduplicated_and_sorted() {
        // Run this test with stdout enabled to see output.
        // cargo test aggregation_attribute_keys_are_deduplicated_and_sorted --features=testing -- --nocapture

        // attributes are stored in a HashMap, but the default Hasher has an unpredictable order each time;
        // to reduce test flakiness, create many combinations
        for combination in 0..10 {
            aggregation_attribute_keys_are_deduplicated_and_sorted_helper(combination, Temporality::Delta);
            aggregation_attribute_keys_are_deduplicated_and_sorted_helper(
                combination,
                Temporality::Cumulative,
            );
        }
    }

    fn aggregation_attribute_keys_are_deduplicated_and_sorted_helper(
        combination: i32,
        temporality: Temporality,
    ) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // duplicate zero(0)
        let keys = vec![0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        use rand::{seq::SliceRandom, thread_rng}; // needed for shuffle() and thread_rng()

        let mut rng = thread_rng();

        // create 10 measurement with shuffled keys
        for _ in 0..10 {
            let mut shuffled = keys.clone();
            shuffled.shuffle(&mut rng);
            let shuffled: Vec<_> = shuffled
                .into_iter()
                .scan(false, |zero_seen, value| {
                    // for the duplicated key (value=0), the first occurrence is set to 100 and the last stays 0
                    Some(KeyValue::new(
                        format!("k{combination}.{value}"),
                        if value == 0 {
                            if *zero_seen {
                                // collector should always return last value (0)
                                value
                            } else {
                                // for zero value, if we see it for the first time, set it to 100,
                                // last value will always be 0,
                                *zero_seen = true;
                                100
                            }
                        } else {
                            value
                        },
                    ))
                })
                .collect();
            counter.add(1, &shuffled);
        }

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        assert_eq!(sum.data_points.len(), 1);
        
        let expected: Vec<_> = (0..10)
            .map(|v| KeyValue::new(format!("k{combination}.{v}"), v))
            .collect();
        let dp = sum.data_points.iter().next().unwrap();
        assert_eq!(dp.value, 10);
        assert_eq!(dp.attributes, expected);
    }

utpilla (Contributor, Author) commented Oct 24, 2024

> This test doesn't test the actual issue (it succeeds on main every single time, even though main has a bug).

The test does fail on main. I suspect that you ran the test without updating fn find_datapoint_with_key_value<'a, T>.

fraillt (Contributor) commented Oct 24, 2024

I would love this to land... so I could remove the "fix" in my own exporter implementation... :)

utpilla (Contributor, Author) commented Oct 24, 2024

> I would love this to land... so I could remove the "fix" in my own exporter implementation... :)

We are currently focusing on the 0.27 release. We will revisit this issue after that.

cijothomas (Member) commented

> > I would love this to land... so I could remove the "fix" in my own exporter implementation... :)
>
> We are currently focusing on the 0.27 release. We will revisit this issue after that.

Yes, let's get 0.27 out (and declare the metrics API as RC), and come back to any SDK issues.
