Skip to content

Commit

Permalink
[8.9] Handle queue.* config for libbeat outputs (backport #11534) (#1…
Browse files Browse the repository at this point in the history
…1536)

* Handle queue.* config for libbeat outputs (#11534)

We were ignoring queue.* config when using libbeat
outputs, such as logstash and kafka.

(cherry picked from commit 3771a5f)

# Conflicts:
#	changelogs/8.10.asciidoc
#	systemtest/go.mod
#	systemtest/go.sum

* Fix merge conflicts

---------

Co-authored-by: Andrew Wilkins <axw@elastic.co>
  • Loading branch information
mergify[bot] and axw authored Aug 31, 2023
1 parent d2d8675 commit 07b7932
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 2 deletions.
6 changes: 5 additions & 1 deletion internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,11 @@ func (s *Runner) newLibbeatFinalBatchProcessor(
output, err := outputs.Load(indexSupporter, beatInfo, stats, outputName, s.outputConfig.Config())
return outputName, output, err
}
pipeline, err := pipeline.Load(beatInfo, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory)
var pipelineConfig pipeline.Config
if err := s.rawConfig.Unpack(&pipelineConfig); err != nil {
return nil, nil, fmt.Errorf("failed to unpack libbeat pipeline config: %w", err)
}
pipeline, err := pipeline.Load(beatInfo, monitors, pipelineConfig, nopProcessingSupporter{}, outputFactory)
if err != nil {
return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err)
}
Expand Down
8 changes: 8 additions & 0 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,21 @@ type InstrumentationConfig struct {
type OutputConfig struct {
Console *ConsoleOutputConfig `json:"console,omitempty"`
Elasticsearch *ElasticsearchOutputConfig `json:"elasticsearch,omitempty"`
Logstash *LogstashOutputConfig `json:"logstash,omitempty"`
}

// ConsoleOutputConfig holds APM Server libbeat console output configuration.
type ConsoleOutputConfig struct {
Enabled bool `json:"enabled"`
}

// LogstashOutputConfig holds APM Server libbeat logstash output configuration.
type LogstashOutputConfig struct {
Enabled bool `json:"enabled"`
Hosts []string `json:"hosts,omitempty"`
BulkMaxSize int `json:"bulk_max_size,omitempty"`
}

// ElasticsearchOutputConfig holds APM Server libbeat Elasticsearch output configuration.
type ElasticsearchOutputConfig struct {
Enabled bool `json:"enabled"`
Expand Down
3 changes: 2 additions & 1 deletion systemtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/docker/docker v23.0.3+incompatible
github.com/docker/go-connections v0.4.0
github.com/elastic/go-elasticsearch/v8 v8.4.0
github.com/elastic/go-lumber v0.1.1
github.com/fatih/color v1.13.0
github.com/google/go-cmp v0.5.9
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
golang.org/x/sys v0.5.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -97,6 +97,7 @@ require (
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/genproto v0.0.0-20220728213248-dd149ef739b9 // indirect
google.golang.org/protobuf v1.28.1 // indirect
howett.net/plist v1.0.0 // indirect
)

Expand Down
3 changes: 3 additions & 0 deletions systemtest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ github.com/elastic/go-elasticsearch/v8 v8.4.0 h1:Rn1mcqaIMcNT43hnx2H62cIFZ+B6mjW
github.com/elastic/go-elasticsearch/v8 v8.4.0/go.mod h1:yY52i2Vj0unLz+N3Nwx1gM5LXwoj3h2dgptNGBYkMLA=
github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRsDqUI=
github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lumber v0.1.1 h1:aae5rSBnwBvdB0aShJ7AbOYPyvP1/wS/JIOC1A4D1DM=
github.com/elastic/go-lumber v0.1.1/go.mod h1:DMVoFv7YM71enE9X5vWJWWv7wvQNtzXh7bPeKukDccY=
github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0=
github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
Expand Down Expand Up @@ -408,6 +410,7 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
79 changes: 79 additions & 0 deletions systemtest/logstash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest_test

import (
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/systemtest/apmservertest"
lumberserver "github.com/elastic/go-lumber/server"
)

func TestLogstashOutput(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() { listener.Close() }) // ignore error; ls.Close closes it too

ls, err := lumberserver.NewWithListener(listener, lumberserver.V2(true))
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, ls.Close()) })

srv := apmservertest.NewUnstartedServerTB(t,
"-E", "queue.mem.events=20000",
"-E", "queue.mem.flush.min_events=10000",
"-E", "queue.mem.flush.timeout=60s",
)
srv.Config.Output = apmservertest.OutputConfig{
Logstash: &apmservertest.LogstashOutputConfig{
Enabled: true,
BulkMaxSize: 5000,
Hosts: []string{listener.Addr().String()},
},
}
require.NoError(t, srv.Start())

// Send 20000 events. We should receive 4 batches of 5000.
tracer := srv.Tracer()
for i := 0; i < 20; i++ {
for i := 0; i < 1000; i++ {
tx := tracer.StartTransaction("name", "type")
tx.End()
}
tracer.Flush(nil)
}
for i := 0; i < 4; i++ {
select {
case batch := <-ls.ReceiveChan():
batch.ACK()
assert.Len(t, batch.Events, 5000)
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for batch")
}
}
// Should be no more batches.
select {
case <-ls.ReceiveChan():
t.Error("unexpected batch received")
case <-time.After(100 * time.Millisecond):
}
}

0 comments on commit 07b7932

Please sign in to comment.