diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 6f0338a2a1d..58a2964186c 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -812,7 +812,11 @@ func (s *Runner) newLibbeatFinalBatchProcessor( output, err := outputs.Load(indexSupporter, beatInfo, stats, outputName, s.outputConfig.Config()) return outputName, output, err } - pipeline, err := pipeline.Load(beatInfo, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory) + var pipelineConfig pipeline.Config + if err := s.rawConfig.Unpack(&pipelineConfig); err != nil { + return nil, nil, fmt.Errorf("failed to unpack libbeat pipeline config: %w", err) + } + pipeline, err := pipeline.Load(beatInfo, monitors, pipelineConfig, nopProcessingSupporter{}, outputFactory) if err != nil { return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err) } diff --git a/systemtest/apmservertest/config.go b/systemtest/apmservertest/config.go index 83503cbf004..163f09d5548 100644 --- a/systemtest/apmservertest/config.go +++ b/systemtest/apmservertest/config.go @@ -238,6 +238,7 @@ type InstrumentationConfig struct { type OutputConfig struct { Console *ConsoleOutputConfig `json:"console,omitempty"` Elasticsearch *ElasticsearchOutputConfig `json:"elasticsearch,omitempty"` + Logstash *LogstashOutputConfig `json:"logstash,omitempty"` } // ConsoleOutputConfig holds APM Server libbeat console output configuration. @@ -245,6 +246,13 @@ type ConsoleOutputConfig struct { Enabled bool `json:"enabled"` } +// LogstashOutputConfig holds APM Server libbeat logstash output configuration. +type LogstashOutputConfig struct { + Enabled bool `json:"enabled"` + Hosts []string `json:"hosts,omitempty"` + BulkMaxSize int `json:"bulk_max_size,omitempty"` +} + // ElasticsearchOutputConfig holds APM Server libbeat Elasticsearch output configuration. type ElasticsearchOutputConfig struct { Enabled bool `json:"enabled"` diff --git a/systemtest/go.mod b/systemtest/go.mod index fbdd9587f22..99b32859982 100644 --- a/systemtest/go.mod +++ b/systemtest/go.mod @@ -6,6 +6,7 @@ require ( github.com/docker/docker v23.0.3+incompatible github.com/docker/go-connections v0.4.0 github.com/elastic/go-elasticsearch/v8 v8.4.0 + github.com/elastic/go-lumber v0.1.1 github.com/fatih/color v1.13.0 github.com/google/go-cmp v0.5.9 github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 @@ -37,7 +38,6 @@ require ( golang.org/x/sys v0.5.0 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 google.golang.org/grpc v1.48.0 - google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -97,6 +97,7 @@ require ( golang.org/x/text v0.7.0 // indirect golang.org/x/tools v0.1.12 // indirect google.golang.org/genproto v0.0.0-20220728213248-dd149ef739b9 // indirect + google.golang.org/protobuf v1.28.1 // indirect howett.net/plist v1.0.0 // indirect ) diff --git a/systemtest/go.sum b/systemtest/go.sum index 308139b5a60..3d769990729 100644 --- a/systemtest/go.sum +++ b/systemtest/go.sum @@ -145,6 +145,8 @@ github.com/elastic/go-elasticsearch/v8 v8.4.0 h1:Rn1mcqaIMcNT43hnx2H62cIFZ+B6mjW github.com/elastic/go-elasticsearch/v8 v8.4.0/go.mod h1:yY52i2Vj0unLz+N3Nwx1gM5LXwoj3h2dgptNGBYkMLA= github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRsDqUI= github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= +github.com/elastic/go-lumber v0.1.1 h1:aae5rSBnwBvdB0aShJ7AbOYPyvP1/wS/JIOC1A4D1DM= +github.com/elastic/go-lumber v0.1.1/go.mod h1:DMVoFv7YM71enE9X5vWJWWv7wvQNtzXh7bPeKukDccY= github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= @@ -408,6 +410,7 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.11.2/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/systemtest/logstash_test.go b/systemtest/logstash_test.go new file mode 100644 index 00000000000..114a1b26377 --- /dev/null +++ b/systemtest/logstash_test.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package systemtest_test + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/systemtest/apmservertest" + lumberserver "github.com/elastic/go-lumber/server" +) + +func TestLogstashOutput(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) // ignore error; ls.Close closes it too + + ls, err := lumberserver.NewWithListener(listener, lumberserver.V2(true)) + require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, ls.Close()) }) + + srv := apmservertest.NewUnstartedServerTB(t, + "-E", "queue.mem.events=20000", + "-E", "queue.mem.flush.min_events=10000", + "-E", "queue.mem.flush.timeout=60s", + ) + srv.Config.Output = apmservertest.OutputConfig{ + Logstash: &apmservertest.LogstashOutputConfig{ + Enabled: true, + BulkMaxSize: 5000, + Hosts: []string{listener.Addr().String()}, + }, + } + require.NoError(t, srv.Start()) + + // Send 20000 events. We should receive 4 batches of 5000. + tracer := srv.Tracer() + for i := 0; i < 20; i++ { + for i := 0; i < 1000; i++ { + tx := tracer.StartTransaction("name", "type") + tx.End() + } + tracer.Flush(nil) + } + for i := 0; i < 4; i++ { + select { + case batch := <-ls.ReceiveChan(): + batch.ACK() + assert.Len(t, batch.Events, 5000) + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for batch") + } + } + // Should be no more batches. + select { + case <-ls.ReceiveChan(): + t.Error("unexpected batch received") + case <-time.After(100 * time.Millisecond): + } +}