Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.9] Handle queue.* config for libbeat outputs (backport #11534) #11536

Merged
merged 2 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,11 @@ func (s *Runner) newLibbeatFinalBatchProcessor(
output, err := outputs.Load(indexSupporter, beatInfo, stats, outputName, s.outputConfig.Config())
return outputName, output, err
}
pipeline, err := pipeline.Load(beatInfo, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory)
var pipelineConfig pipeline.Config
if err := s.rawConfig.Unpack(&pipelineConfig); err != nil {
return nil, nil, fmt.Errorf("failed to unpack libbeat pipeline config: %w", err)
}
pipeline, err := pipeline.Load(beatInfo, monitors, pipelineConfig, nopProcessingSupporter{}, outputFactory)
if err != nil {
return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err)
}
Expand Down
8 changes: 8 additions & 0 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,21 @@ type InstrumentationConfig struct {
type OutputConfig struct {
Console *ConsoleOutputConfig `json:"console,omitempty"`
Elasticsearch *ElasticsearchOutputConfig `json:"elasticsearch,omitempty"`
Logstash *LogstashOutputConfig `json:"logstash,omitempty"`
}

// ConsoleOutputConfig holds APM Server libbeat console output configuration.
type ConsoleOutputConfig struct {
Enabled bool `json:"enabled"`
}

// LogstashOutputConfig holds APM Server libbeat logstash output configuration.
type LogstashOutputConfig struct {
Enabled bool `json:"enabled"`
Hosts []string `json:"hosts,omitempty"`
BulkMaxSize int `json:"bulk_max_size,omitempty"`
}

// ElasticsearchOutputConfig holds APM Server libbeat Elasticsearch output configuration.
type ElasticsearchOutputConfig struct {
Enabled bool `json:"enabled"`
Expand Down
3 changes: 2 additions & 1 deletion systemtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/docker/docker v23.0.3+incompatible
github.com/docker/go-connections v0.4.0
github.com/elastic/go-elasticsearch/v8 v8.4.0
github.com/elastic/go-lumber v0.1.1
github.com/fatih/color v1.13.0
github.com/google/go-cmp v0.5.9
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
golang.org/x/sys v0.5.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -97,6 +97,7 @@ require (
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/genproto v0.0.0-20220728213248-dd149ef739b9 // indirect
google.golang.org/protobuf v1.28.1 // indirect
howett.net/plist v1.0.0 // indirect
)

Expand Down
3 changes: 3 additions & 0 deletions systemtest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ github.com/elastic/go-elasticsearch/v8 v8.4.0 h1:Rn1mcqaIMcNT43hnx2H62cIFZ+B6mjW
github.com/elastic/go-elasticsearch/v8 v8.4.0/go.mod h1:yY52i2Vj0unLz+N3Nwx1gM5LXwoj3h2dgptNGBYkMLA=
github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRsDqUI=
github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lumber v0.1.1 h1:aae5rSBnwBvdB0aShJ7AbOYPyvP1/wS/JIOC1A4D1DM=
github.com/elastic/go-lumber v0.1.1/go.mod h1:DMVoFv7YM71enE9X5vWJWWv7wvQNtzXh7bPeKukDccY=
github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0=
github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
Expand Down Expand Up @@ -408,6 +410,7 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
79 changes: 79 additions & 0 deletions systemtest/logstash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest_test

import (
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/systemtest/apmservertest"
lumberserver "github.com/elastic/go-lumber/server"
)

func TestLogstashOutput(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() { listener.Close() }) // ignore error; ls.Close closes it too

ls, err := lumberserver.NewWithListener(listener, lumberserver.V2(true))
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, ls.Close()) })

srv := apmservertest.NewUnstartedServerTB(t,
"-E", "queue.mem.events=20000",
"-E", "queue.mem.flush.min_events=10000",
"-E", "queue.mem.flush.timeout=60s",
)
srv.Config.Output = apmservertest.OutputConfig{
Logstash: &apmservertest.LogstashOutputConfig{
Enabled: true,
BulkMaxSize: 5000,
Hosts: []string{listener.Addr().String()},
},
}
require.NoError(t, srv.Start())

// Send 20000 events. We should receive 4 batches of 5000.
tracer := srv.Tracer()
for i := 0; i < 20; i++ {
for i := 0; i < 1000; i++ {
tx := tracer.StartTransaction("name", "type")
tx.End()
}
tracer.Flush(nil)
}
for i := 0; i < 4; i++ {
select {
case batch := <-ls.ReceiveChan():
batch.ACK()
assert.Len(t, batch.Events, 5000)
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for batch")
}
}
// Should be no more batches.
select {
case <-ls.ReceiveChan():
t.Error("unexpected batch received")
case <-time.After(100 * time.Millisecond):
}
}