From 932d1491af9596d3da5b7a021d525c6b2f46b5f5 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 11:48:43 +0100 Subject: [PATCH 1/9] Add mergeHistogram fastpath code --- aggregators/merger.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index d9d0234..c8a5891 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -5,11 +5,11 @@ package aggregators import ( - "io" - "github.com/axiomhq/hyperloglog" "github.com/cespare/xxhash/v2" "golang.org/x/exp/slices" + "io" + "sort" "github.com/elastic/apm-aggregation/aggregationpb" "github.com/elastic/apm-aggregation/aggregators/internal/constraint" @@ -395,6 +395,34 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { return } + li, lj := len(to.Buckets), len(from.Buckets) + + var extra int + for j := 0; j < lj; j++ { + i, found := sort.Find(li, func(i int) int { + // Avoid int overflow by comparing instead of subtracting + y := from.Buckets[j] + x := to.Buckets[i] + if y == x { + return 0 + } else if y > x { + return 1 + } else { + return -1 + } + }) + if found { + to.Counts[i] += from.Counts[j] + from.Counts[j] = 0 + } else { + extra++ + } + } + + if extra == 0 { + return + } + requiredLen := len(to.Buckets) + len(from.Buckets) for toIdx, fromIdx := 0, 0; toIdx < len(to.Buckets) && fromIdx < len(from.Buckets); { v := to.Buckets[toIdx] - from.Buckets[fromIdx] From e05f0fb6b699f3fe33183b8f7bdcd80727982aba Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 12:06:52 +0100 Subject: [PATCH 2/9] Tidy up fast path --- aggregators/merger.go | 78 ++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 42 deletions(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index c8a5891..c82a63e 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -395,55 +395,49 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { return } - li, lj := len(to.Buckets), len(from.Buckets) - var extra int - for j := 0; j < lj; j++ { - i, found := sort.Find(li, func(i int) int { - // Avoid int overflow by comparing instead of subtracting - y := from.Buckets[j] - x := to.Buckets[i] - if y == x { - return 0 - } else if y > x { - return 1 + toLen, fromLen := len(to.Buckets), len(from.Buckets) + if fromLen < toLen { // Heuristics to trade between O(m lg n) and O(n + m). + // Fast path to optimize for cases where len(from.Buckets) << len(to.Buckets) + // Binary search for all from.Buckets in to.Buckets for fewer comparisons, + // mergeHistogram will be O(m lg n) where m = fromLen and n = toLen. + for fromIdx := 0; fromIdx < fromLen; fromIdx++ { + toIdx, found := sort.Find(toLen, func(toIdx int) int { + return int(from.Buckets[fromIdx] - to.Buckets[toIdx]) + }) + if found { + to.Counts[toIdx] += from.Counts[fromIdx] + from.Counts[fromIdx] = 0 } else { - return -1 + extra++ } - }) - if found { - to.Counts[i] += from.Counts[j] - from.Counts[j] = 0 - } else { - extra++ } - } - - if extra == 0 { - return - } - - requiredLen := len(to.Buckets) + len(from.Buckets) - for toIdx, fromIdx := 0, 0; toIdx < len(to.Buckets) && fromIdx < len(from.Buckets); { - v := to.Buckets[toIdx] - from.Buckets[fromIdx] - switch { - case v == 0: - // For every bucket that is common, we need one less bucket in final slice - requiredLen-- - toIdx++ - fromIdx++ - case v < 0: - toIdx++ - case v > 0: - fromIdx++ + if extra == 0 { + return + } + } else { + // Slow path with runtime O(n + m). + requiredLen := toLen + fromLen + for toIdx, fromIdx := 0, 0; toIdx < toLen && fromIdx < fromLen; { + v := to.Buckets[toIdx] - from.Buckets[fromIdx] + switch { + case v == 0: + // For every bucket that is common, we need one less bucket in final slice + requiredLen-- + toIdx++ + fromIdx++ + case v < 0: + toIdx++ + case v > 0: + fromIdx++ + } } + extra = requiredLen - toLen } - toIdx, fromIdx := len(to.Buckets)-1, len(from.Buckets)-1 - to.Buckets = slices.Grow(to.Buckets, requiredLen-len(to.Buckets)) - to.Counts = slices.Grow(to.Counts, requiredLen-len(to.Counts)) - to.Buckets = to.Buckets[:requiredLen] - to.Counts = to.Counts[:requiredLen] + toIdx, fromIdx := toLen-1, fromLen-1 + to.Buckets = slices.Grow(to.Buckets, extra)[:toLen+extra] + to.Counts = slices.Grow(to.Counts, extra)[:toLen+extra] for idx := len(to.Buckets) - 1; idx >= 0; idx-- { if fromIdx < 0 { break From 9ec068a9f41f3f9d1f15e353de57306393a83904 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 12:47:41 +0100 Subject: [PATCH 3/9] Add note about mutating input --- aggregators/merger.go | 1 + 1 file changed, 1 insertion(+) diff --git a/aggregators/merger.go b/aggregators/merger.go index c82a63e..213f6db 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -384,6 +384,7 @@ func mergeSpanMetrics(to, from *aggregationpb.SpanMetrics) { // mergeHistogram merges two proto representation of HDRHistogram. The // merge assumes both histograms are created with same arguments and // their representations are sorted by bucket. +// Caution: this function may mutate from.Count. func mergeHistogram(to, from *aggregationpb.HDRHistogram) { if len(from.Buckets) == 0 { return From c198c547b6429a7cd389be19979c36b19810482c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 12:48:24 +0100 Subject: [PATCH 4/9] make fmt --- aggregators/merger.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index 213f6db..dc99b8d 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -5,11 +5,12 @@ package aggregators import ( + "io" + "sort" + "github.com/axiomhq/hyperloglog" "github.com/cespare/xxhash/v2" "golang.org/x/exp/slices" - "io" - "sort" "github.com/elastic/apm-aggregation/aggregationpb" "github.com/elastic/apm-aggregation/aggregators/internal/constraint" From ee39e5d03f8d07114d5099df87b61bb453c9591a Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 13:59:29 +0100 Subject: [PATCH 5/9] Fix typo --- aggregators/merger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index dc99b8d..cd3b4ef 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -385,7 +385,7 @@ func mergeSpanMetrics(to, from *aggregationpb.SpanMetrics) { // mergeHistogram merges two proto representation of HDRHistogram. The // merge assumes both histograms are created with same arguments and // their representations are sorted by bucket. -// Caution: this function may mutate from.Count. +// Caution: this function may mutate from.Counts. func mergeHistogram(to, from *aggregationpb.HDRHistogram) { if len(from.Buckets) == 0 { return From cc92e03cb7e43ca5041ba7f68e7a13a3e6547d48 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 14:13:52 +0100 Subject: [PATCH 6/9] Limit search space according to previous result --- aggregators/merger.go | 9 +++++++-- aggregators/merger_test.go | 24 ++++++++++++------------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index cd3b4ef..6535de2 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -403,8 +403,12 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { // Fast path to optimize for cases where len(from.Buckets) << len(to.Buckets) // Binary search for all from.Buckets in to.Buckets for fewer comparisons, // mergeHistogram will be O(m lg n) where m = fromLen and n = toLen. - for fromIdx := 0; fromIdx < fromLen; fromIdx++ { - toIdx, found := sort.Find(toLen, func(toIdx int) int { + searchToLen := toLen + for fromIdx := fromLen - 1; fromIdx >= 0; fromIdx-- { + // Instead of searching in to.Buckets[0:toLen] each time, + // make use of the result of the previous pass since from.Buckets[i] > from.Buckets[i-1], + // such that the search space can be reduced to to.Buckets[0:searchToLen]. + toIdx, found := sort.Find(searchToLen, func(toIdx int) int { return int(from.Buckets[fromIdx] - to.Buckets[toIdx]) }) if found { @@ -413,6 +417,7 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { } else { extra++ } + searchToLen = toIdx } if extra == 0 { return diff --git a/aggregators/merger_test.go b/aggregators/merger_test.go index 26b050d..5319fa0 100644 --- a/aggregators/merger_test.go +++ b/aggregators/merger_test.go @@ -1147,31 +1147,31 @@ func TestMergeHistogram(t *testing.T) { { name: "from_between_to", to: &aggregationpb.HDRHistogram{ - Buckets: []int32{1000, 3000}, - Counts: []int64{1, 3}, + Buckets: []int32{1000, 4000, 5000}, + Counts: []int64{1, 4, 5}, }, from: &aggregationpb.HDRHistogram{ - Buckets: []int32{2000}, - Counts: []int64{2}, + Buckets: []int32{2000, 3000}, + Counts: []int64{2, 3}, }, expected: &aggregationpb.HDRHistogram{ - Buckets: []int32{1000, 2000, 3000}, - Counts: []int64{1, 2, 3}, + Buckets: []int32{1000, 2000, 3000, 4000, 5000}, + Counts: []int64{1, 2, 3, 4, 5}, }, }, { name: "to_between_from", to: &aggregationpb.HDRHistogram{ - Buckets: []int32{2000}, - Counts: []int64{2}, + Buckets: []int32{2000, 3000}, + Counts: []int64{2, 3}, }, from: &aggregationpb.HDRHistogram{ - Buckets: []int32{1000, 3000}, - Counts: []int64{1, 3}, + Buckets: []int32{1000, 4000, 5000}, + Counts: []int64{1, 4, 5}, }, expected: &aggregationpb.HDRHistogram{ - Buckets: []int32{1000, 2000, 3000}, - Counts: []int64{1, 2, 3}, + Buckets: []int32{1000, 2000, 3000, 4000, 5000}, + Counts: []int64{1, 2, 3, 4, 5}, }, }, { From 5003bc784533d7242fdae74592f54a9ec6e15398 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 15:02:19 +0100 Subject: [PATCH 7/9] Add explanation --- aggregators/merger.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/aggregators/merger.go b/aggregators/merger.go index 6535de2..538d4af 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -417,6 +417,15 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { } else { extra++ } + // Invariance: + // to.Buckets[toIdx] >= from.Buckets[fromIdx] (defined by sort.Find) + // from.Buckets[i-1] < from.Buckets[i] (buckets are strictly increasing) + // to.Buckets[i-1] < to.Buckets[i] (buckets are strictly increasing) + // + // Therefore: + // from.Buckets[fromIdx-1] < to.Buckets[toIdx] + // In the next pass, we can safely search in to.Buckets[0:toIdx], i.e. calling sort.Find(toIdx, ...). + // Edge case: where from.Buckets[fromIdx-1] > to.Buckets[toIdx-1], sort.Find will return (toIdx, false). searchToLen = toIdx } if extra == 0 { From 274e7222d9d3c205663cbba5449963495c8612d4 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 15:10:55 +0100 Subject: [PATCH 8/9] Fix typo --- aggregators/merger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index 538d4af..25fc38a 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -417,7 +417,7 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { } else { extra++ } - // Invariance: + // Invariants: // to.Buckets[toIdx] >= from.Buckets[fromIdx] (defined by sort.Find) // from.Buckets[i-1] < from.Buckets[i] (buckets are strictly increasing) // to.Buckets[i-1] < to.Buckets[i] (buckets are strictly increasing) From cdb5bebeaece942dafbff2f3bbeea32d9f0b8c0e Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 2 Aug 2023 18:17:36 +0100 Subject: [PATCH 9/9] Fallback immediately on new bucket --- aggregators/merger.go | 57 +++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/aggregators/merger.go b/aggregators/merger.go index 25fc38a..2a85032 100644 --- a/aggregators/merger.go +++ b/aggregators/merger.go @@ -397,13 +397,14 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { return } - var extra int toLen, fromLen := len(to.Buckets), len(from.Buckets) - if fromLen < toLen { // Heuristics to trade between O(m lg n) and O(n + m). + // Heuristics to decide whether to use the fast path. + if fromLen < toLen && from.Buckets[0] >= to.Buckets[0] && from.Buckets[fromLen-1] <= to.Buckets[toLen-1] { // Fast path to optimize for cases where len(from.Buckets) << len(to.Buckets) // Binary search for all from.Buckets in to.Buckets for fewer comparisons, // mergeHistogram will be O(m lg n) where m = fromLen and n = toLen. searchToLen := toLen + var fallback bool for fromIdx := fromLen - 1; fromIdx >= 0; fromIdx-- { // Instead of searching in to.Buckets[0:toLen] each time, // make use of the result of the previous pass since from.Buckets[i] > from.Buckets[i-1], @@ -411,14 +412,15 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { toIdx, found := sort.Find(searchToLen, func(toIdx int) int { return int(from.Buckets[fromIdx] - to.Buckets[toIdx]) }) - if found { - to.Counts[toIdx] += from.Counts[fromIdx] - from.Counts[fromIdx] = 0 - } else { - extra++ + if !found { + fallback = true + break } + + to.Counts[toIdx] += from.Counts[fromIdx] + from.Counts[fromIdx] = 0 // Invariants: - // to.Buckets[toIdx] >= from.Buckets[fromIdx] (defined by sort.Find) + // to.Buckets[toIdx] == from.Buckets[fromIdx] (because we fallback immediately when not found) // from.Buckets[i-1] < from.Buckets[i] (buckets are strictly increasing) // to.Buckets[i-1] < to.Buckets[i] (buckets are strictly increasing) // @@ -428,29 +430,32 @@ func mergeHistogram(to, from *aggregationpb.HDRHistogram) { // Edge case: where from.Buckets[fromIdx-1] > to.Buckets[toIdx-1], sort.Find will return (toIdx, false). searchToLen = toIdx } - if extra == 0 { + if !fallback { + // from.Buckets is a subset of to.Buckets. + // No further merging is needed. return } - } else { - // Slow path with runtime O(n + m). - requiredLen := toLen + fromLen - for toIdx, fromIdx := 0, 0; toIdx < toLen && fromIdx < fromLen; { - v := to.Buckets[toIdx] - from.Buckets[fromIdx] - switch { - case v == 0: - // For every bucket that is common, we need one less bucket in final slice - requiredLen-- - toIdx++ - fromIdx++ - case v < 0: - toIdx++ - case v > 0: - fromIdx++ - } + } + + // Determine the number of extra buckets needed so we can grow the slice in one go. + requiredLen := toLen + fromLen + for toIdx, fromIdx := 0, 0; toIdx < toLen && fromIdx < fromLen; { + v := to.Buckets[toIdx] - from.Buckets[fromIdx] + switch { + case v == 0: + // For every bucket that is common, we need one less bucket in final slice + requiredLen-- + toIdx++ + fromIdx++ + case v < 0: + toIdx++ + case v > 0: + fromIdx++ } - extra = requiredLen - toLen } + extra := requiredLen - toLen + // Merge the slices. toIdx, fromIdx := toLen-1, fromLen-1 to.Buckets = slices.Grow(to.Buckets, extra)[:toLen+extra] to.Counts = slices.Grow(to.Counts, extra)[:toLen+extra]