Skip to content

Commit

Permalink
[8.14] fix: Update FlushBytes parsing/defaults (backport #13576) (#…
Browse files Browse the repository at this point in the history
…13577)

Updates the `FlushBytes` setting to default to 1 MiB, and only raises it
to the 24 KiB minimum when compression is enabled and the user has
explicitly configured a smaller value.

Fixes #13024
---------

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
(cherry picked from commit a453a88)
Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

# Conflicts:
#	changelogs/head.asciidoc
#	internal/beater/beater.go

* fix conflicts

Signed-off-by: inge4pres <francesco.gualazzi@elastic.co>
Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

* lint

Signed-off-by: inge4pres <francesco.gualazzi@elastic.co>
Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

* fix dependency modified by IDE

Signed-off-by: inge4pres <francesco.gualazzi@elastic.co>
Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

* remove RequireDataStream

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

---------

Signed-off-by: inge4pres <francesco.gualazzi@elastic.co>
Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
Co-authored-by: Marc Lopez Rubio <marc5.12@outlook.com>
Co-authored-by: inge4pres <francesco.gualazzi@elastic.co>
  • Loading branch information
3 people authored Jul 5, 2024
1 parent 39331b0 commit 1b2815f
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 52 deletions.
114 changes: 63 additions & 51 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-data/model/modelprocessor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
Expand All @@ -52,11 +54,9 @@ import (
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-docappender"
docappender "github.com/elastic/go-docappender"
"github.com/elastic/go-ucfg"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-data/model/modelprocessor"
"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/config"
Expand Down Expand Up @@ -652,7 +652,6 @@ func (s *Runner) newFinalBatchProcessor(
newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
memLimit float64,
) (modelpb.BatchProcessor, func(context.Context) error, error) {

monitoring.Default.Remove("libbeat")
libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
if s.elasticsearchOutputConfig == nil {
Expand All @@ -668,58 +667,16 @@ func (s *Runner) newFinalBatchProcessor(
}
monitoring.NewString(outputRegistry, "name").Set("elasticsearch")

var esConfig struct {
*elasticsearch.Config `config:",inline"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Enabled *bool `config:"enabled"`
} `config:"autoscaling"`
}
esConfig.FlushInterval = time.Second
esConfig.Config = elasticsearch.DefaultConfig()
esConfig.MaxIdleConnsPerHost = 10
if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
// Create the docappender and Elasticsearch config
appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, memLimit)
if err != nil {
return nil, nil, err
}

if esConfig.MaxRequests != 0 {
esConfig.MaxIdleConnsPerHost = esConfig.MaxRequests
}

var flushBytes int
if esConfig.FlushBytes != "" {
b, err := humanize.ParseBytes(esConfig.FlushBytes)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to parse flush_bytes")
}
flushBytes = int(b)
}
minFlush := 24 * 1024
if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
flushBytes = minFlush
}
client, err := newElasticsearchClient(esConfig.Config)
client, err := newElasticsearchClient(esCfg)
if err != nil {
return nil, nil, err
}
var scalingCfg docappender.ScalingConfig
if enabled := esConfig.Scaling.Enabled; enabled != nil {
scalingCfg.Disabled = !*enabled
}
opts := docappender.Config{
CompressionLevel: esConfig.CompressionLevel,
FlushBytes: flushBytes,
FlushInterval: esConfig.FlushInterval,
Tracer: tracer,
MaxRequests: esConfig.MaxRequests,
Scaling: scalingCfg,
Logger: zap.New(s.logger.Core(), zap.WithCaller(true)),
}
opts = docappenderConfig(opts, memLimit, s.logger)
appender, err := docappender.New(client, opts)
appender, err := docappender.New(client, appenderCfg)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -782,6 +739,61 @@ func (s *Runner) newFinalBatchProcessor(
return newDocappenderBatchProcessor(appender), appender.Close, nil
}

// newDocappenderConfig derives both the docappender configuration and the
// Elasticsearch client configuration from the Runner's elasticsearch output
// config. Defaults (1 MiB flushes, 1s flush interval, 10 idle conns per host)
// are applied first so that any explicitly configured values override them.
// Returns an error if the output config cannot be unpacked or if flush_bytes
// is not a parseable size.
func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
	docappender.Config, *elasticsearch.Config, error,
) {
	var outCfg struct {
		*elasticsearch.Config `config:",inline"`
		FlushBytes            string        `config:"flush_bytes"`
		FlushInterval         time.Duration `config:"flush_interval"`
		MaxRequests           int           `config:"max_requests"`
		Scaling               struct {
			Enabled *bool `config:"enabled"`
		} `config:"autoscaling"`
	}
	// Seed defaults before unpacking so user-provided settings take
	// precedence. 1 MiB flushes match the go-docappender default.
	outCfg.FlushBytes = "1 mib"
	outCfg.FlushInterval = time.Second
	outCfg.Config = elasticsearch.DefaultConfig()
	outCfg.MaxIdleConnsPerHost = 10

	if err := s.elasticsearchOutputConfig.Unpack(&outCfg); err != nil {
		return docappender.Config{}, nil, err
	}

	flushSize := 0
	if outCfg.FlushBytes != "" {
		parsed, err := humanize.ParseBytes(outCfg.FlushBytes)
		if err != nil {
			return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
		}
		flushSize = int(parsed)
	}
	// With compression enabled, flushes below 24 KiB may be ignored by the
	// indexer; warn and bump to the minimum.
	const minFlush = 24 * 1024
	if outCfg.CompressionLevel != 0 && flushSize < minFlush {
		s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushSize, minFlush)
		flushSize = minFlush
	}
	var scaling docappender.ScalingConfig
	if e := outCfg.Scaling.Enabled; e != nil {
		scaling.Disabled = !*e
	}
	cfg := docappenderConfig(docappender.Config{
		CompressionLevel: outCfg.CompressionLevel,
		FlushBytes:       flushSize,
		FlushInterval:    outCfg.FlushInterval,
		Tracer:           tracer,
		MaxRequests:      outCfg.MaxRequests,
		Scaling:          scaling,
		Logger:           zap.New(s.logger.Core(), zap.WithCaller(true)),
	}, memLimit, s.logger)
	// docappenderConfig may have adjusted MaxRequests based on the memory
	// limit (see tests); mirror the final value into the ES transport's
	// idle-connection pool size.
	if cfg.MaxRequests != 0 {
		outCfg.MaxIdleConnsPerHost = cfg.MaxRequests
	}

	return cfg, outCfg.Config, nil
}

func docappenderConfig(
opts docappender.Config, memLimit float64, logger *logp.Logger,
) docappender.Config {
Expand Down
76 changes: 75 additions & 1 deletion internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2/apmtest"
"go.uber.org/zap"

agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
docappender "github.com/elastic/go-docappender"

"github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/elastic-agent-libs/monitoring"
)

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
Expand Down Expand Up @@ -152,3 +157,72 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C
require.NoError(t, err)
return client
}

// TestRunnerNewDocappenderConfig verifies the docappender and Elasticsearch
// configurations produced by Runner.newDocappenderConfig, across several
// memory limits, both with an empty output config (defaults) and with
// explicit flush/request overrides.
func TestRunnerNewDocappenderConfig(t *testing.T) {
	cases := []struct {
		memSize         float64
		wantMaxRequests int
		wantDocBufSize  int
	}{
		{memSize: 1, wantMaxRequests: 11, wantDocBufSize: 819},
		{memSize: 2, wantMaxRequests: 13, wantDocBufSize: 1638},
		{memSize: 4, wantMaxRequests: 16, wantDocBufSize: 3276},
		{memSize: 8, wantMaxRequests: 22, wantDocBufSize: 6553},
	}
	for _, tt := range cases {
		t.Run(fmt.Sprintf("default/%vgb", tt.memSize), func(t *testing.T) {
			r := Runner{
				elasticsearchOutputConfig: agentconfig.NewConfig(),
				logger:                    logp.NewLogger("test"),
			}
			docCfg, esCfg, err := r.newDocappenderConfig(nil, tt.memSize)
			require.NoError(t, err)
			wantDoc := docappender.Config{
				Logger:             zap.New(r.logger.Core(), zap.WithCaller(true)),
				CompressionLevel:   5,
				FlushInterval:      time.Second,
				FlushBytes:         1024 * 1024,
				MaxRequests:        tt.wantMaxRequests,
				DocumentBufferSize: tt.wantDocBufSize,
			}
			assert.Equal(t, wantDoc, docCfg)
			wantES := &elasticsearch.Config{
				Hosts:               elasticsearch.Hosts{"localhost:9200"},
				Backoff:             elasticsearch.DefaultBackoffConfig,
				Protocol:            "http",
				CompressionLevel:    5,
				Timeout:             5 * time.Second,
				MaxRetries:          3,
				MaxIdleConnsPerHost: tt.wantMaxRequests,
			}
			assert.Equal(t, wantES, esCfg)
		})
		t.Run(fmt.Sprintf("override/%vgb", tt.memSize), func(t *testing.T) {
			r := Runner{
				elasticsearchOutputConfig: agentconfig.MustNewConfigFrom(map[string]interface{}{
					"flush_bytes":    "500 kib",
					"flush_interval": "2s",
					"max_requests":   50,
				}),
				logger: logp.NewLogger("test"),
			}
			docCfg, esCfg, err := r.newDocappenderConfig(nil, tt.memSize)
			require.NoError(t, err)
			wantDoc := docappender.Config{
				Logger:             zap.New(r.logger.Core(), zap.WithCaller(true)),
				CompressionLevel:   5,
				FlushInterval:      2 * time.Second,
				FlushBytes:         500 * 1024,
				MaxRequests:        50,
				DocumentBufferSize: tt.wantDocBufSize,
			}
			assert.Equal(t, wantDoc, docCfg)
			wantES := &elasticsearch.Config{
				Hosts:               elasticsearch.Hosts{"localhost:9200"},
				Backoff:             elasticsearch.DefaultBackoffConfig,
				Protocol:            "http",
				CompressionLevel:    5,
				Timeout:             5 * time.Second,
				MaxRetries:          3,
				MaxIdleConnsPerHost: 50,
			}
			assert.Equal(t, wantES, esCfg)
		})
	}
}

0 comments on commit 1b2815f

Please sign in to comment.