Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions x-pack/filebeat/input/gcppubsub/otel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration && !agentbeat

package gcppubsub_test

import (
"bytes"
"context"
"fmt"
"testing"
"text/template"
"time"

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub/testutil"
"github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol"

"github.com/elastic/elastic-agent-libs/testing/estools"
)

func TestGCPInputOTelE2E(t *testing.T) {
integration.EnsureESIsRunning(t)

// Create pubsub client for setting up and communicating to emulator.
client, clientCancel := testutil.TestSetup(t)
defer func() {
clientCancel()
client.Close()
}()

testutil.CreateTopic(t, client)
testutil.CreateSubscription(t, "test-subscription-otel", client)
testutil.CreateSubscription(t, "test-subscription-fb", client)
const numMsgs = 10
testutil.PublishMessages(t, client, numMsgs)

host := integration.GetESURL(t, "http")
user := host.User.Username()
password, _ := host.User.Password()

// create a random uuid and make sure it doesn't contain dashes/
otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4()))
fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4()))

otelIndex := "logs-integration-" + otelNamespace
fbIndex := "logs-integration-" + fbNameSpace

type options struct {
Namespace string
ESURL string
Username string
Password string
Subscription string
}

gcpFilebeatConfig := `filebeat.inputs:
- type: gcp-pubsub
project_id: test-project-id
topic: test-topic-foo
subscription.name: {{ .Subscription }}
credentials_file: "testdata/fake.json"

output:
elasticsearch:
hosts:
- {{ .ESURL }}
username: {{ .Username }}
password: {{ .Password }}
index: logs-integration-{{ .Namespace }}

queue.mem.flush.timeout: 0s
setup.template.enabled: false
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
`

gcpOTelConfig := `exporters:
elasticsearch:
auth:
authenticator: beatsauth
compression: gzip
compression_params:
level: 1
endpoints:
- {{ .ESURL }}
logs_dynamic_pipeline:
enabled: true
logs_index: logs-integration-{{ .Namespace }}
mapping:
mode: bodymap
max_conns_per_host: 1
password: {{ .Password }}
retry:
enabled: true
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
sending_queue:
batch:
flush_timeout: 10s
max_size: 1600
min_size: 0
sizer: items
block_on_overflow: true
enabled: true
num_consumers: 1
queue_size: 3200
wait_for_result: true
user: {{ .Username }}
extensions:
beatsauth:
idle_connection_timeout: 3s
proxy_disable: false
timeout: 1m30s
receivers:
filebeatreceiver:
filebeat:
inputs:
- credentials_file: "testdata/fake.json"
project_id: test-project-id
subscription:
name: {{ .Subscription }}
topic: test-topic-foo
type: gcp-pubsub
output:
otelconsumer:
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
queue.mem.flush.timeout: 0s
setup.template.enabled: false
service:
extensions:
- beatsauth
pipelines:
logs:
exporters:
- elasticsearch
receivers:
- filebeatreceiver
telemetry:
metrics:
level: none
`

optionsValue := options{
ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host),
Username: user,
Password: password,
}

var configBuffer bytes.Buffer
optionsValue.Namespace = otelNamespace
optionsValue.Subscription = "test-subscription-otel"
require.NoError(t, template.Must(template.New("config").Parse(gcpOTelConfig)).Execute(&configBuffer, optionsValue))

oteltestcol.New(t, configBuffer.String())

// reset buffer
configBuffer.Reset()

optionsValue.Namespace = fbNameSpace
optionsValue.Subscription = "test-subscription-fb"
require.NoError(t, template.Must(template.New("config").Parse(gcpFilebeatConfig)).Execute(&configBuffer, optionsValue))

// start filebeat
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)

filebeat.WriteConfigFile(configBuffer.String())
filebeat.Start()
defer filebeat.Stop()

// prepare to query ES
es := integration.GetESClient(t, "http")

t.Cleanup(func() {
_, err := es.Indices.DeleteDataStream([]string{
otelIndex,
fbIndex,
})
require.NoError(t, err, "failed to delete indices")
})

rawQuery := map[string]any{
"query": map[string]any{
"match_phrase": map[string]any{
"input.type": "gcp-pubsub",
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}

var filebeatDocs estools.Documents
var otelDocs estools.Documents
var err error

// wait for logs to be published
require.EventuallyWithTf(t,
func(ct *assert.CollectT) {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+otelIndex+"*", es)
assert.NoError(ct, err)
assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel document, got %d", otelDocs.Hits.Total.Value)

filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+fbIndex+"*", es)
assert.NoError(ct, err)
assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat document, got %d", filebeatDocs.Hits.Total.Value)
},
3*time.Minute, 1*time.Second, "expected at least 1 document for both filebeat and otel modes")

filebeatDoc := filebeatDocs.Hits.Hits[0].Source
otelDoc := otelDocs.Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"event.created",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
}
38 changes: 21 additions & 17 deletions x-pack/filebeat/input/gcppubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@

import (
"context"
"errors"
"io"
"net/http"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -21,13 +16,12 @@
"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/tests/compose"
"github.com/elastic/beats/v7/libbeat/tests/resources"
"github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub/testutil"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp/logptest"
)
Expand All @@ -38,6 +32,7 @@
emulatorSubscription = "test-subscription-bar"
)

<<<<<<< HEAD

Check failure on line 35 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
var once sync.Once

func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) {
Expand Down Expand Up @@ -189,6 +184,8 @@
t.Log("New subscription created:", sub.ID())
}

=======

Check failure on line 187 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772))

Check failure on line 188 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

invalid character U+0023 '#'
func ifNotDone(ctx context.Context, f func()) func() {
return func() {
select {
Expand All @@ -212,22 +209,18 @@
})
}

func isInDockerIntegTestEnv() bool {
return os.Getenv("BEATS_INSIDE_INTEGRATION_TEST_ENV") != ""
}

func runTest(t *testing.T, cfg *conf.C, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) {
runTestWithACKer(t, cfg, ackEvent, run)
}

func runTestWithACKer(t *testing.T, cfg *conf.C, onEvent eventHandler, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) {
if !isInDockerIntegTestEnv() {
if !testutil.IsInDockerIntegTestEnv() {
// Don't test goroutines when using our compose.EnsureUp.
defer resources.NewGoroutinesChecker().Check(t)
}

// Create pubsub client for setting up and communicating to emulator.
client, clientCancel := testSetup(t)
client, clientCancel := testutil.TestSetup(t)
defer clientCancel()
defer client.Close()

Expand Down Expand Up @@ -355,7 +348,7 @@
_ = cfg.SetBool("subscription.create", -1, false)

runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
createTopic(t, client)
testutil.CreateTopic(t, client)

err := input.run()
if assert.Error(t, err) {
Expand All @@ -368,14 +361,20 @@
cfg := defaultTestConfig()

runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
<<<<<<< HEAD

Check failure on line 364 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected <<, expected }
createTopic(t, client)
createSubscription(t, client)
publishMessages(t, client, 5)
=======

Check failure on line 368 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected ==, expected }
testutil.CreateTopic(t, client)
testutil.CreateSubscription(t, emulatorSubscription, client)
testutil.PublishMessages(t, client, 5)
>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772))

Check failure on line 372 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

invalid character U+0023 '#'

var group errgroup.Group
group.Go(input.run)

Check failure on line 375 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body

time.AfterFunc(10*time.Second, func() { out.Close() })

Check failure on line 377 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected {, expected name

Check failure on line 377 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

method has no receiver
events, ok := out.waitForEvents(5)
if !ok {
t.Fatalf("Expected 5 events, but got %d.", len(events))
Expand All @@ -392,12 +391,12 @@
cfg := defaultTestConfig()

runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
createTopic(t, client)
testutil.CreateTopic(t, client)

group, ctx := errgroup.WithContext(context.Background())
group.Go(input.run)

time.AfterFunc(1*time.Second, ifNotDone(ctx, func() { publishMessages(t, client, 5) }))
time.AfterFunc(1*time.Second, ifNotDone(ctx, func() { testutil.PublishMessages(t, client, 5) }))
time.AfterFunc(10*time.Second, func() { out.Close() })

events, ok := out.waitForEvents(5)
Expand Down Expand Up @@ -442,14 +441,19 @@
}

runTestWithACKer(t, cfg, halfAcker, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
<<<<<<< HEAD

Check failure on line 444 in x-pack/filebeat/input/gcppubsub/pubsub_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected <<, expected }
createTopic(t, client)
createSubscription(t, client)
=======
testutil.CreateTopic(t, client)
testutil.CreateSubscription(t, emulatorSubscription, client)
>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772))

group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

const numMsgs = 10
publishMessages(t, client, numMsgs)
testutil.PublishMessages(t, client, numMsgs)
events, ok := out.waitForEvents(numMsgs)
if !ok {
t.Fatalf("Expected %d events, but got %d.", 1, len(events))
Expand Down
Loading
Loading