Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use old aggregation for 386 only, in 8.10 #11403

Merged
merged 7 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,34 @@ Third party libraries used by the Elastic APM Server project:
================================================================================


--------------------------------------------------------------------------------
Dependency : github.com/axiomhq/hyperloglog
Version: v0.0.0-20230201085229-3ddf4bad03dc
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/axiomhq/hyperloglog@v0.0.0-20230201085229-3ddf4bad03dc/LICENSE:

Copyright (c) 2021, Axiom, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/cespare/xxhash/v2
Version: v2.2.0
Expand Down Expand Up @@ -7304,34 +7332,6 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


--------------------------------------------------------------------------------
Dependency : github.com/axiomhq/hyperloglog
Version: v0.0.0-20230201085229-3ddf4bad03dc
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/axiomhq/hyperloglog@v0.0.0-20230201085229-3ddf4bad03dc/LICENSE:

Copyright (c) 2021, Axiom, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/beorn7/perks
Version: v1.0.1
Expand Down
2 changes: 1 addition & 1 deletion changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ https://github.com/elastic/apm-server/compare/8.9\...main[View commits]
- Add permissions to reroute events in the integration package. {pull}11168[11168]

[float]
==== Aggregation improvements
==== Aggregation improvements (except for 386 architecture)
- Replace aggregation with LSM-based aggregator which has a lower memory footprint {pull}11117[11117]
- Add `service.language.name` to service destination metrics {pull}11117[11117]
- Modify per-service transaction groups limit to consider more than service.name; Add per-service service destination groups limit and per-service service transaction groups limit {pull}11117[11117]
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/elastic/apm-server
go 1.20

require (
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc
github.com/cespare/xxhash/v2 v2.2.0
github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b
github.com/dustin/go-humanize v1.0.1
Expand All @@ -15,6 +16,7 @@ require (
github.com/elastic/gmux v0.2.0
github.com/elastic/go-docappender v0.2.1-0.20230724080315-b714d6181871
github.com/elastic/go-elasticsearch/v8 v8.9.0
github.com/elastic/go-hdrhistogram v0.1.0
github.com/elastic/go-sysinfo v1.11.0
github.com/elastic/go-ucfg v0.8.6
github.com/go-sourcemap/sourcemap v2.1.3+incompatible
Expand Down Expand Up @@ -64,7 +66,6 @@ require (
github.com/Shopify/sarama v1.38.1 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ github.com/elastic/go-docappender v0.2.1-0.20230724080315-b714d6181871 h1:8MRHKw
github.com/elastic/go-docappender v0.2.1-0.20230724080315-b714d6181871/go.mod h1:yqBlaclEXBY6DCP3d28bf/HjJJcXhd2CCIy0kDuLNjQ=
github.com/elastic/go-elasticsearch/v8 v8.9.0 h1:8xtmYjUkqtahl50E0Bg/wjKI7K63krJrrLipbNj/fCU=
github.com/elastic/go-elasticsearch/v8 v8.9.0/go.mod h1:NGmpvohKiRHXI0Sw4fuUGn6hYOmAXlyCphKpzVBiqDE=
github.com/elastic/go-hdrhistogram v0.1.0 h1:7UVeQ9MsO5c9h8RJeH2S2lXCGi9hQB/94W6Pjjqprc4=
github.com/elastic/go-hdrhistogram v0.1.0/go.mod h1:NEl0wZTQXzwq7X2WBZGl5G3efcKbvv+r9mTZpXrIs78=
github.com/elastic/go-licenser v0.3.1/go.mod h1:D8eNQk70FOCVBl3smCGQt/lv7meBeQno2eI1S5apiHQ=
github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN48x4=
Expand Down
155 changes: 155 additions & 0 deletions x-pack/apm-server/aggregation/baseaggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package baseaggregator

import (
"context"
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/logp"
)

// PublishFunc encapsulates the metric publishing function
type PublishFunc func(context.Context, time.Duration) error

// AggregatorConfig defined the common aggregator configuration.
type AggregatorConfig struct {
// PublishFunc is the function that performs the actual publishing
// of aggregated events.
PublishFunc PublishFunc

// Logger is the logger for logging metrics aggregation/publishing.
//
// If Logger is nil, a new logger will be constructed.
Logger *logp.Logger

// RollUpIntervals are additional MetricsInterval for the aggregator to
// compute and publish metrics for. Each additional interval is constrained
// to the same rules as MetricsInterval, and will result in additional
// memory to be allocated.
RollUpIntervals []time.Duration

// Interval is the interval between publishing of aggregated metrics.
// There may be additional metrics reported at arbitrary times if the
// aggregation groups fill up.
Interval time.Duration
}

// Validate validates the aggregator config.
func (cfg AggregatorConfig) Validate() error {
if cfg.PublishFunc == nil {
return errors.New("PublishFunc unspecified")
}
if cfg.Interval <= 0 {
return errors.New("Interval unspecified or negative")
}
if cfg.Logger == nil {
return errors.New("Logger unspecified")
}
for i, interval := range cfg.RollUpIntervals {
if interval <= 0 {
return errors.Errorf("RollUpIntervals[%d]: unspecified or negative", i)
}
if interval%cfg.Interval != 0 {
return errors.Errorf("RollUpIntervals[%d]: interval must be a multiple of MetricsInterval", i)
}
}
return nil
}

// Aggregator contains the basic methods for the metrics aggregators to embed.
type Aggregator struct {
stopMu sync.Mutex
stopping chan struct{}
stopped chan struct{}
publish PublishFunc
Intervals []time.Duration

config AggregatorConfig
}

// New returns a new base Aggregator.
func New(cfg AggregatorConfig) (*Aggregator, error) {
if err := cfg.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid aggregator config")
}
return &Aggregator{
stopping: make(chan struct{}),
stopped: make(chan struct{}),
config: cfg,
publish: cfg.PublishFunc,
Intervals: append([]time.Duration{cfg.Interval}, cfg.RollUpIntervals...),
}, nil
}

// Run runs the Aggregator, periodically publishing and clearing aggregated
// metrics. Run returns when either a fatal error occurs, or the Aggregator's
// Stop method is invoked.
func (a *Aggregator) Run() error {
ticker := time.NewTicker(a.config.Interval)
defer ticker.Stop()
defer func() {
a.stopMu.Lock()
defer a.stopMu.Unlock()
select {
case <-a.stopped:
default:
close(a.stopped)
}
}()
var stop bool
var ticks uint64
for !stop {
select {
case <-a.stopping:
stop = true
case <-ticker.C:
ticks++
}
// Publish the metricsets for all configured intervals.
for _, interval := range a.Intervals {
// Publish $interval MetricSets when:
// - ticks * MetricsInterval % $interval == 0.
// - Aggregator is stopped.
if !stop && (ticks*uint64(a.config.Interval))%uint64(interval) != 0 {
continue
}
if err := a.publish(context.Background(), interval); err != nil {
a.config.Logger.With(logp.Error(err)).Warnf(
"publishing of %s metrics failed: %s",
interval.String(), err,
)
}
}
}
return nil
}

// Stop stops the Aggregator if it is running, waiting for it to flush any
// aggregated metrics and return, or for the context to be cancelled.
//
// After Stop has been called the aggregator cannot be reused, as the Run
// method will always return immediately.
func (a *Aggregator) Stop(ctx context.Context) error {
a.stopMu.Lock()
select {
case <-a.stopped:
case <-a.stopping:
// Already stopping/stopped.
default:
close(a.stopping)
}
a.stopMu.Unlock()

select {
case <-a.stopped:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
41 changes: 41 additions & 0 deletions x-pack/apm-server/aggregation/baseaggregator/space.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package baseaggregator

import "github.com/pkg/errors"

// Space models a backing array for aggregators to store their metrics. The backing
// array pre-allocates the required number of entries to reduce GC overhead.
//
// Requires protection using mutexes for concurrent usage by the caller.
// TODO: Update all aggregators to use Space.
type Space[k any] struct {
index int
space []k
}

// NewSpace creates a new space given the max number of elements.
func NewSpace[k any](limit int) *Space[k] {
return &Space[k]{
space: make([]k, limit),
}
}

// Next returns the next entry from the space or error if no more entries can be
// returned.
func (s *Space[k]) Next() (*k, error) {
if s.index == len(s.space) {
return nil, errors.New("all entries are used")
}
e := &s.space[s.index]
s.index++
return e, nil
}

// Reset resets the space to be reused. Note that reset doesn't reset the fields
// for the struct and they must be updated or reset by the caller.
func (s *Space[k]) Reset() {
s.index = 0
}
19 changes: 19 additions & 0 deletions x-pack/apm-server/aggregation/interval/string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package interval

import (
"fmt"
"time"
)

// FormatDuration formats the duration in minutes (when the duration >= 1m), or
// seconds if smaller.
func FormatDuration(d time.Duration) string {
if duration := d.Minutes(); duration >= 1 {
return fmt.Sprintf("%.0fm", duration)
}
return fmt.Sprintf("%.0fs", d.Seconds())
}
Loading