From 11bdedb7b182fb669885b0162bb27c7d7a5eef7a Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 26 Nov 2025 08:03:22 -0300 Subject: [PATCH 1/2] otel: fix remaining tests to use in process testing collector (#47772) (cherry picked from commit 7577d59648c9d4d352c0f2a249e6dca550c27b2f) # Conflicts: # x-pack/filebeat/input/gcppubsub/otel_test.go # x-pack/filebeat/input/gcppubsub/pubsub_test.go # x-pack/filebeat/tests/integration/otel_test.go --- x-pack/filebeat/input/gcppubsub/otel_test.go | 243 +++++++++++++ .../filebeat/input/gcppubsub/pubsub_test.go | 38 +- .../input/gcppubsub/testutil/testutil.go | 182 ++++++++++ .../filebeat/tests/integration/otel_test.go | 340 ++++++++++++++++++ 4 files changed, 786 insertions(+), 17 deletions(-) create mode 100644 x-pack/filebeat/input/gcppubsub/otel_test.go create mode 100644 x-pack/filebeat/input/gcppubsub/testutil/testutil.go diff --git a/x-pack/filebeat/input/gcppubsub/otel_test.go b/x-pack/filebeat/input/gcppubsub/otel_test.go new file mode 100644 index 000000000000..341c465e47bc --- /dev/null +++ b/x-pack/filebeat/input/gcppubsub/otel_test.go @@ -0,0 +1,243 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !agentbeat + +package gcppubsub_test + +import ( + "bytes" + "context" + "fmt" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub/testutil" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol" + + "github.com/elastic/elastic-agent-libs/testing/estools" +) + +func TestGCPInputOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + // Create pubsub client for setting up and communicating to emulator. 
+ client, clientCancel := testutil.TestSetup(t) + defer func() { + clientCancel() + client.Close() + }() + + testutil.CreateTopic(t, client) + testutil.CreateSubscription(t, "test-subscription-otel", client) + testutil.CreateSubscription(t, "test-subscription-fb", client) + const numMsgs = 10 + testutil.PublishMessages(t, client, numMsgs) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) + fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) + + otelIndex := "logs-integration-" + otelNamespace + fbIndex := "logs-integration-" + fbNameSpace + + type options struct { + Namespace string + ESURL string + Username string + Password string + Subscription string + } + + gcpFilebeatConfig := `filebeat.inputs: +- type: gcp-pubsub + project_id: test-project-id + topic: test-topic-foo + subscription.name: {{ .Subscription }} + credentials_file: "testdata/fake.json" + +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: logs-integration-{{ .Namespace }} + +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + + gcpOTelConfig := `exporters: + elasticsearch: + auth: + authenticator: beatsauth + compression: gzip + compression_params: + level: 1 + endpoints: + - {{ .ESURL }} + logs_dynamic_pipeline: + enabled: true + logs_index: logs-integration-{{ .Namespace }} + mapping: + mode: bodymap + max_conns_per_host: 1 + password: {{ .Password }} + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true + user: {{ .Username }} +extensions: + beatsauth: + idle_connection_timeout: 3s + proxy_disable: false + timeout: 1m30s +receivers: + filebeatreceiver: + filebeat: + inputs: + - credentials_file: "testdata/fake.json" + project_id: test-project-id + subscription: + name: {{ .Subscription }} + topic: test-topic-foo + type: gcp-pubsub + output: + otelconsumer: + processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ + queue.mem.flush.timeout: 0s + setup.template.enabled: false +service: + extensions: + - beatsauth + pipelines: + logs: + exporters: + - elasticsearch + receivers: + - filebeatreceiver + telemetry: + metrics: + level: none +` + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + } + + var configBuffer bytes.Buffer + optionsValue.Namespace = otelNamespace + optionsValue.Subscription = "test-subscription-otel" + require.NoError(t, template.Must(template.New("config").Parse(gcpOTelConfig)).Execute(&configBuffer, optionsValue)) + + oteltestcol.New(t, configBuffer.String()) + + // reset buffer + configBuffer.Reset() + + optionsValue.Namespace = fbNameSpace + optionsValue.Subscription = "test-subscription-fb" + require.NoError(t, template.Must(template.New("config").Parse(gcpFilebeatConfig)).Execute(&configBuffer, optionsValue)) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + 
filebeat.WriteConfigFile(configBuffer.String()) + filebeat.Start() + defer filebeat.Stop() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + t.Cleanup(func() { + _, err := es.Indices.DeleteDataStream([]string{ + otelIndex, + fbIndex, + }) + require.NoError(t, err, "failed to delete indices") + }) + + rawQuery := map[string]any{ + "query": map[string]any{ + "match_phrase": map[string]any{ + "input.type": "gcp-pubsub", + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+otelIndex+"*", es) + assert.NoError(ct, err) + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel document, got %d", otelDocs.Hits.Total.Value) + + filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+fbIndex+"*", es) + assert.NoError(ct, err) + assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat document, got %d", filebeatDocs.Hits.Total.Value) + }, + 3*time.Minute, 1*time.Second, "expected at least 1 document for both filebeat and otel modes") + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "event.created", + } + + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") +} diff --git a/x-pack/filebeat/input/gcppubsub/pubsub_test.go b/x-pack/filebeat/input/gcppubsub/pubsub_test.go index 6e845d3f3d1d..abd09621355c 100644 --- a/x-pack/filebeat/input/gcppubsub/pubsub_test.go +++ b/x-pack/filebeat/input/gcppubsub/pubsub_test.go @@ -8,11 +8,6 @@ package gcppubsub import ( "context" - "errors" - "io" - "net/http" - "os" - "strconv" "sync" "sync/atomic" "testing" @@ -21,13 +16,12 @@ import ( "cloud.google.com/go/pubsub" "github.com/stretchr/testify/assert" "golang.org/x/sync/errgroup" - "google.golang.org/api/iterator" "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/tests/compose" "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub/testutil" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp/logptest" ) @@ -38,6 +32,7 @@ const ( emulatorSubscription = "test-subscription-bar" ) +<<<<<<< HEAD var once sync.Once func testSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) { @@ -189,6 +184,8 @@ func createSubscription(t *testing.T, client *pubsub.Client) { t.Log("New subscription created:", sub.ID()) } +======= +>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772)) func ifNotDone(ctx context.Context, f func()) func() { return func() { select { @@ -212,22 +209,18 @@ func defaultTestConfig() *conf.C { }) } -func isInDockerIntegTestEnv() bool { - return os.Getenv("BEATS_INSIDE_INTEGRATION_TEST_ENV") != "" -} - func runTest(t *testing.T, cfg *conf.C, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t 
*testing.T)) { runTestWithACKer(t, cfg, ackEvent, run) } func runTestWithACKer(t *testing.T, cfg *conf.C, onEvent eventHandler, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) { - if !isInDockerIntegTestEnv() { + if !testutil.IsInDockerIntegTestEnv() { // Don't test goroutines when using our compose.EnsureUp. defer resources.NewGoroutinesChecker().Check(t) } // Create pubsub client for setting up and communicating to emulator. - client, clientCancel := testSetup(t) + client, clientCancel := testutil.TestSetup(t) defer clientCancel() defer client.Close() @@ -355,7 +348,7 @@ func TestSubscriptionDoesNotExistError(t *testing.T) { _ = cfg.SetBool("subscription.create", -1, false) runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) { - createTopic(t, client) + testutil.CreateTopic(t, client) err := input.run() if assert.Error(t, err) { @@ -368,9 +361,15 @@ func TestSubscriptionExists(t *testing.T) { cfg := defaultTestConfig() runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) { +<<<<<<< HEAD createTopic(t, client) createSubscription(t, client) publishMessages(t, client, 5) +======= + testutil.CreateTopic(t, client) + testutil.CreateSubscription(t, emulatorSubscription, client) + testutil.PublishMessages(t, client, 5) +>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772)) var group errgroup.Group group.Go(input.run) @@ -392,12 +391,12 @@ func TestSubscriptionCreate(t *testing.T) { cfg := defaultTestConfig() runTest(t, cfg, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) { - createTopic(t, client) + testutil.CreateTopic(t, client) group, ctx := errgroup.WithContext(context.Background()) group.Go(input.run) - time.AfterFunc(1*time.Second, ifNotDone(ctx, func() { publishMessages(t, client, 5) })) + time.AfterFunc(1*time.Second, ifNotDone(ctx, func() { testutil.PublishMessages(t, client, 5) })) time.AfterFunc(10*time.Second, func() { out.Close() }) events, ok := out.waitForEvents(5) @@ -442,14 +441,19 @@ func TestEndToEndACK(t *testing.T) { } runTestWithACKer(t, cfg, halfAcker, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) { +<<<<<<< HEAD createTopic(t, client) createSubscription(t, client) +======= + testutil.CreateTopic(t, client) + testutil.CreateSubscription(t, emulatorSubscription, client) +>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772)) group, _ := errgroup.WithContext(context.Background()) group.Go(input.run) const numMsgs = 10 - publishMessages(t, client, numMsgs) + testutil.PublishMessages(t, client, numMsgs) events, ok := out.waitForEvents(numMsgs) if !ok { t.Fatalf("Expected %d events, but got %d.", 1, len(events)) diff --git a/x-pack/filebeat/input/gcppubsub/testutil/testutil.go b/x-pack/filebeat/input/gcppubsub/testutil/testutil.go new file mode 100644 index 000000000000..b2bc45f63f62 --- /dev/null +++ b/x-pack/filebeat/input/gcppubsub/testutil/testutil.go @@ -0,0 +1,182 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+
+package testutil
+
+import (
+	"context"
+	"errors"
+	"io"
+	"net/http"
+	"os"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/tests/compose"
+
+	"cloud.google.com/go/pubsub"
+	"google.golang.org/api/iterator"
+)
+
+const (
+	emulatorProjectID = "test-project-id"
+	emulatorTopic     = "test-topic-foo"
+)
+
+var once sync.Once
+
+func TestSetup(t *testing.T) (*pubsub.Client, context.CancelFunc) {
+	t.Helper()
+
+	var host string
+	if IsInDockerIntegTestEnv() {
+		// We're running inside the integration test environment, so
+		// make sure that the googlepubsub container is running.
+		host = compose.EnsureUp(t, "googlepubsub").Host()
+		os.Setenv("PUBSUB_EMULATOR_HOST", host)
+	} else {
+		host = os.Getenv("PUBSUB_EMULATOR_HOST")
+		if host == "" {
+			t.Skip("PUBSUB_EMULATOR_HOST is not set in environment. You can start " +
+				"the emulator with \"docker-compose up\" from the _meta directory. " +
+				"The default address is PUBSUB_EMULATOR_HOST=localhost:8432")
+		}
+	}
+
+	once.Do(func() {
+
+		// Disable HTTP keep-alives to ensure no extra goroutines hang around.
+		httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
+
+		// Sanity check the emulator.
+		//nolint:noctx // this is just for the tests
+		resp, err := httpClient.Get("http://" + host)
+		if err != nil {
+			t.Fatalf("pubsub emulator at %s is not healthy: %v", host, err)
+		}
+		defer resp.Body.Close()
+
+		_, err = io.ReadAll(resp.Body)
+		if err != nil {
+			t.Fatal("failed to read response", err)
+		}
+		if resp.StatusCode != http.StatusOK {
+			t.Fatalf("pubsub emulator is not healthy, got status code %d", resp.StatusCode)
+		}
+	})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	client, err := pubsub.NewClient(ctx, emulatorProjectID)
+	if err != nil {
+		t.Fatalf("failed to create client: %v", err)
+	}
+
+	resetPubSub(t, client)
+	return client, cancel
+}
+
+func CreateTopic(t *testing.T, client *pubsub.Client) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topic := client.Topic(emulatorTopic)
+	exists, err := topic.Exists(ctx)
+	if err != nil {
+		t.Fatalf("failed to check if topic exists: %v", err)
+	}
+	if !exists {
+		if topic, err = client.CreateTopic(ctx, emulatorTopic); err != nil {
+			t.Fatalf("failed to create the topic: %v", err)
+		}
+		t.Log("Topic created:", topic.ID())
+	}
+}
+
+func CreateSubscription(t *testing.T, subscription string, client *pubsub.Client) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	sub := client.Subscription(subscription)
+	exists, err := sub.Exists(ctx)
+	if err != nil {
+		t.Fatalf("failed to check if sub exists: %v", err)
+	}
+	if exists {
+		return
+	}
+
+	sub, err = client.CreateSubscription(ctx, subscription, pubsub.SubscriptionConfig{
+		Topic: client.Topic(emulatorTopic),
+	})
+	if err != nil {
+		t.Fatalf("failed to create subscription: %v", err)
+	}
+	t.Log("New subscription created:", sub.ID())
+}
+
+func PublishMessages(t *testing.T, client *pubsub.Client, numMsgs int) []string {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topic := client.Topic(emulatorTopic)
+	defer topic.Stop()
+
+	messageIDs := make([]string, numMsgs)
+	for i := 0; i < numMsgs; i++ {
+		result := topic.Publish(ctx, &pubsub.Message{
+			Data: []byte(time.Now().UTC().Format(time.RFC3339Nano) + ": hello world " + strconv.Itoa(i)),
+		})
+
+		// Wait for message to publish and get assigned ID.
+ id, err := result.Get(ctx) + if err != nil { + t.Fatal(err) + } + messageIDs[i] = id + } + t.Logf("Published %d messages to topic %v. ID range: [%v, %v]", len(messageIDs), topic.ID(), messageIDs[0], messageIDs[len(messageIDs)-1]) + return messageIDs +} + +func resetPubSub(t *testing.T, client *pubsub.Client) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Clear topics. + topics := client.Topics(ctx) + for { + topic, err := topics.Next() + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + t.Fatal(err) + } + if err = topic.Delete(ctx); err != nil { + t.Fatalf("failed to delete topic %v: %v", topic.ID(), err) + } + } + + // Clear subscriptions. + subs := client.Subscriptions(ctx) + for { + sub, err := subs.Next() + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + t.Fatal(err) + } + + if err = sub.Delete(ctx); err != nil { + t.Fatalf("failed to delete subscription %v: %v", sub.ID(), err) + } + } +} + +func IsInDockerIntegTestEnv() bool { + return os.Getenv("BEATS_INSIDE_INTEGRATION_TEST_ENV") != "" +} diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index eac40156dc3d..0de25dbc87c7 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -667,3 +667,343 @@ service: }) } } +<<<<<<< HEAD +======= + +func TestFileBeatKerberos(t *testing.T) { + wantEvents := 1 + krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here + tempFile := t.TempDir() + // ES client + esCfg := elasticsearch.Config{ + Addresses: []string{krbURL}, + Username: "admin", + Password: "testing", + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + + es, err := elasticsearch.NewClient(esCfg) + require.NoError(t, err, "could not get elasticsearch client") + + setupRoleMapping(t, es) + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + filebeatIndex := "logs-filebeat.kerberos-" + namespace + + otelConfig := struct { + Index string + InputFile string + PathHome string + Endpoint string + }{ + Index: filebeatIndex, + InputFile: filepath.Join(tempFile, "log.log"), + PathHome: tempFile, + Endpoint: krbURL, + } + + cfg := `receivers: + filebeatreceiver/filestream: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + queue.mem.flush.timeout: 0s + management.otel.enabled: true + path.home: {{.PathHome}} +extensions: + beatsauth: + kerberos: + auth_type: "password" + config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf" + username: "beats" + password: "testing" + realm: "elastic" +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.Endpoint}} + logs_index: {{.Index}} + mapping: + mode: bodymap + auth: + authenticator: beatsauth +service: + extensions: + - beatsauth + pipelines: + logs: + receivers: + - filebeatreceiver/filestream + exporters: + - elasticsearch/log + - debug +` + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + writeEventsToLogFile(t, 
otelConfig.InputFile, wantEvents) + oteltestcol.New(t, string(configContents)) + + // wait for logs to be published + require.EventuallyWithT(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second) +} + +func TestFilebeatOTelBeatProcessorE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + wantEvents := 1 + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + processorIndex := "logs-processor-" + namespace + receiverIndex := "logs-receiver-" + namespace + + configParameters := struct { + Index string + InputFile string + PathHome string + }{ + Index: processorIndex, + InputFile: filepath.Join(t.TempDir(), "log.log"), + PathHome: t.TempDir(), + } + + configTemplate := ` +service: + pipelines: + logs: + receivers: + - filebeatreceiver + processors: + - beat + exporters: + - elasticsearch/log + - debug + telemetry: + metrics: + level: none # Disable collector's own metrics to prevent conflict on port 8888. We don't use those metrics anyway. +receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + processors: + # Configure a processor to prevent enabling default processors + - add_fields: + fields: + custom_field: "custom_value" + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{.PathHome}} +processors: + beat: + processors: + - add_host_metadata: +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + sending_queue: + enabled: true + batch: + flush_timeout: 1s + mapping: + mode: bodymap +` + var renderedConfig bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(configTemplate)).Execute(&renderedConfig, configParameters)) + configContents := renderedConfig.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Processor config:\n%s", configContents) + } + }) + + writeEventsToLogFile(t, configParameters.InputFile, wantEvents) + oteltestcol.New(t, string(configContents)) + + receiverConfig := ` +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log + - debug + telemetry: + metrics: + level: none # Disable collector's own metrics to prevent conflict on port 8888. We don't use those metrics anyway. 
+receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - %s + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + processors: + - add_fields: + fields: + custom_field: "custom_value" + - add_host_metadata: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: %s +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: %s + sending_queue: + enabled: true + batch: + flush_timeout: 1s + mapping: + mode: bodymap +` + logFilePath := filepath.Join(t.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + receiverRenderedConfig := fmt.Sprintf(receiverConfig, + logFilePath, + t.TempDir(), + receiverIndex, + ) + t.Cleanup(func() { + if t.Failed() { + t.Logf("Receiver config:\n%s", receiverRenderedConfig) + } + }) + oteltestcol.New(t, receiverRenderedConfig) + + es := integration.GetESClient(t, "http") + + var processorDocuments estools.Documents + var receiverDocuments estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + processorDocuments, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+processorIndex+"*") + assert.NoError(ct, err) + + receiverDocuments, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+receiverIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, processorDocuments.Hits.Total.Value, wantEvents, "expected at least %d otel events, got %d", wantEvents, processorDocuments.Hits.Total.Value) + assert.GreaterOrEqual(ct, receiverDocuments.Hits.Total.Value, wantEvents, "expected at least %d filebeat events, got %d", wantEvents, receiverDocuments.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", wantEvents) + + processorDoc := processorDocuments.Hits.Hits[0].Source + receiverDoc := receiverDocuments.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between the agents + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + } + + oteltest.AssertMapsEqual(t, receiverDoc, processorDoc, ignoredFields, "expected documents to be equal") +} + +// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic +func setupRoleMapping(t *testing.T, client *elasticsearch.Client) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // prepare to query ES + roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping" + + body := map[string]interface{}{ + "roles": []string{"superuser"}, + "enabled": true, + "rules": map[string]interface{}{ + "field": map[string]interface{}{ + "username": "beats@elastic", + }, + }, + } + + jsonData, err := json.Marshal(body) + require.NoError(t, err, "could not marshal role mapping body to json") + + // Build request + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + roleMappingURL, + bytes.NewReader(jsonData)) + require.NoError(t, err, "could not create role mapping request") + + // Set content type header + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Perform(req) + require.NoError(t, err, "could not perform role 
mapping request") + defer resp.Body.Close() + + require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code") +} +>>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772)) From 4da7db81f9c2df14fbd7b771587d1cb06d1316e4 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 26 Nov 2025 14:14:21 -0300 Subject: [PATCH 2/2] fix conflicts in filebeat otel test --- .../filebeat/tests/integration/otel_test.go | 340 ------------------ 1 file changed, 340 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 0de25dbc87c7..eac40156dc3d 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -667,343 +667,3 @@ service: }) } } -<<<<<<< HEAD -======= - -func TestFileBeatKerberos(t *testing.T) { - wantEvents := 1 - krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here - tempFile := t.TempDir() - // ES client - esCfg := elasticsearch.Config{ - Addresses: []string{krbURL}, - Username: "admin", - Password: "testing", - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // this is only for testing - }, - }, - } - - es, err := elasticsearch.NewClient(esCfg) - require.NoError(t, err, "could not get elasticsearch client") - - setupRoleMapping(t, es) - - namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") - filebeatIndex := "logs-filebeat.kerberos-" + namespace - - otelConfig := struct { - Index string - InputFile string - PathHome string - Endpoint string - }{ - Index: filebeatIndex, - InputFile: filepath.Join(tempFile, "log.log"), - PathHome: tempFile, - Endpoint: krbURL, - } - - cfg := `receivers: - filebeatreceiver/filestream: - filebeat: - inputs: - - type: filestream - id: filestream-fbreceiver - enabled: true - paths: - - {{.InputFile}} - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ - queue.mem.flush.timeout: 0s - management.otel.enabled: true - path.home: {{.PathHome}} -extensions: - beatsauth: - kerberos: - auth_type: "password" - config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf" - username: "beats" - password: "testing" - realm: "elastic" -exporters: - debug: - use_internal_logger: false - verbosity: detailed - elasticsearch/log: - endpoints: - - {{.Endpoint}} - logs_index: {{.Index}} - mapping: - mode: bodymap - auth: - authenticator: beatsauth -service: - extensions: - - beatsauth - pipelines: - logs: - receivers: - - filebeatreceiver/filestream - exporters: - - elasticsearch/log - - debug -` - - var configBuffer bytes.Buffer - require.NoError(t, - template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) - configContents := configBuffer.Bytes() - t.Cleanup(func() { - if t.Failed() { - t.Logf("Config contents:\n%s", configContents) - } - }) - - writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) - oteltestcol.New(t, string(configContents)) - - // wait for logs to be published - require.EventuallyWithT(t, - func(ct *assert.CollectT) { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer findCancel() - - otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") - assert.NoError(ct, err) - - assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value) - }, - 
2*time.Minute, 1*time.Second) -} - -func TestFilebeatOTelBeatProcessorE2E(t *testing.T) { - integration.EnsureESIsRunning(t) - wantEvents := 1 - - namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") - processorIndex := "logs-processor-" + namespace - receiverIndex := "logs-receiver-" + namespace - - configParameters := struct { - Index string - InputFile string - PathHome string - }{ - Index: processorIndex, - InputFile: filepath.Join(t.TempDir(), "log.log"), - PathHome: t.TempDir(), - } - - configTemplate := ` -service: - pipelines: - logs: - receivers: - - filebeatreceiver - processors: - - beat - exporters: - - elasticsearch/log - - debug - telemetry: - metrics: - level: none # Disable collector's own metrics to prevent conflict on port 8888. We don't use those metrics anyway. -receivers: - filebeatreceiver: - filebeat: - inputs: - - type: filestream - id: filestream-fbreceiver - enabled: true - paths: - - {{.InputFile}} - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ - processors: - # Configure a processor to prevent enabling default processors - - add_fields: - fields: - custom_field: "custom_value" - logging: - level: info - selectors: - - '*' - queue.mem.flush.timeout: 0s - path.home: {{.PathHome}} -processors: - beat: - processors: - - add_host_metadata: -exporters: - debug: - use_internal_logger: false - verbosity: detailed - elasticsearch/log: - endpoints: - - http://localhost:9200 - compression: none - user: admin - password: testing - logs_index: {{.Index}} - sending_queue: - enabled: true - batch: - flush_timeout: 1s - mapping: - mode: bodymap -` - var renderedConfig bytes.Buffer - require.NoError(t, template.Must(template.New("config").Parse(configTemplate)).Execute(&renderedConfig, configParameters)) - configContents := renderedConfig.Bytes() - t.Cleanup(func() { - if t.Failed() { - t.Logf("Processor config:\n%s", configContents) - } - }) - - writeEventsToLogFile(t, configParameters.InputFile, wantEvents) - oteltestcol.New(t, string(configContents)) - - receiverConfig := ` -service: - pipelines: - logs: - receivers: - - filebeatreceiver - exporters: - - elasticsearch/log - - debug - telemetry: - metrics: - level: none # Disable collector's own metrics to prevent conflict on port 8888. We don't use those metrics anyway. 
-receivers: - filebeatreceiver: - filebeat: - inputs: - - type: filestream - id: filestream-fbreceiver - enabled: true - paths: - - %s - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ - processors: - - add_fields: - fields: - custom_field: "custom_value" - - add_host_metadata: - logging: - level: info - selectors: - - '*' - queue.mem.flush.timeout: 0s - path.home: %s -exporters: - debug: - use_internal_logger: false - verbosity: detailed - elasticsearch/log: - endpoints: - - http://localhost:9200 - compression: none - user: admin - password: testing - logs_index: %s - sending_queue: - enabled: true - batch: - flush_timeout: 1s - mapping: - mode: bodymap -` - logFilePath := filepath.Join(t.TempDir(), "log.log") - writeEventsToLogFile(t, logFilePath, wantEvents) - receiverRenderedConfig := fmt.Sprintf(receiverConfig, - logFilePath, - t.TempDir(), - receiverIndex, - ) - t.Cleanup(func() { - if t.Failed() { - t.Logf("Receiver config:\n%s", receiverRenderedConfig) - } - }) - oteltestcol.New(t, receiverRenderedConfig) - - es := integration.GetESClient(t, "http") - - var processorDocuments estools.Documents - var receiverDocuments estools.Documents - var err error - - // wait for logs to be published - require.EventuallyWithTf(t, - func(ct *assert.CollectT) { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer findCancel() - - processorDocuments, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+processorIndex+"*") - assert.NoError(ct, err) - - receiverDocuments, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+receiverIndex+"*") - assert.NoError(ct, err) - - assert.GreaterOrEqual(ct, processorDocuments.Hits.Total.Value, wantEvents, "expected at least %d otel events, got %d", wantEvents, processorDocuments.Hits.Total.Value) - assert.GreaterOrEqual(ct, receiverDocuments.Hits.Total.Value, wantEvents, "expected at least %d filebeat events, got %d", wantEvents, receiverDocuments.Hits.Total.Value) - }, - 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", wantEvents) - - processorDoc := processorDocuments.Hits.Hits[0].Source - receiverDoc := receiverDocuments.Hits.Hits[0].Source - ignoredFields := []string{ - // Expected to change between the agents - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "log.file.inode", - "log.file.path", - } - - oteltest.AssertMapsEqual(t, receiverDoc, processorDoc, ignoredFields, "expected documents to be equal") -} - -// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic -func setupRoleMapping(t *testing.T, client *elasticsearch.Client) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // prepare to query ES - roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping" - - body := map[string]interface{}{ - "roles": []string{"superuser"}, - "enabled": true, - "rules": map[string]interface{}{ - "field": map[string]interface{}{ - "username": "beats@elastic", - }, - }, - } - - jsonData, err := json.Marshal(body) - require.NoError(t, err, "could not marshal role mapping body to json") - - // Build request - req, err := http.NewRequestWithContext(ctx, http.MethodPost, - roleMappingURL, - bytes.NewReader(jsonData)) - require.NoError(t, err, "could not create role mapping request") - - // Set content type header - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Perform(req) - require.NoError(t, err, "could not perform role 
mapping request") - defer resp.Body.Close() - - require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code") -} ->>>>>>> 7577d5964 (otel: fix remaining tests to use in process testing collector (#47772))