From 55a1137280cec553a3a5f470c6f2ceb65d4d69a8 Mon Sep 17 00:00:00 2001 From: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Date: Tue, 25 Nov 2025 14:39:56 -0600 Subject: [PATCH 1/3] remove otel.component.id and otel.component.kind fields from beat receivers (#47729) * remove component id and component kind fields from beat receivers * add real PR link * update integration tests * fix merge mistake (cherry picked from commit ca1c17bca6ad79874ff8ebf0c1177bf516980337) # Conflicts: # x-pack/filebeat/input/gcppubsub/otel_test.go # x-pack/filebeat/tests/integration/otel_lsexporter_test.go # x-pack/filebeat/tests/integration/otel_test.go # x-pack/libbeat/outputs/otelconsumer/otelconsumer.go # x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go # x-pack/metricbeat/mbreceiver/receiver_test.go --- ...ponent.kind-from-beat-receiver-events.yaml | 45 ++ x-pack/filebeat/fbreceiver/receiver_test.go | 9 +- x-pack/filebeat/input/gcppubsub/otel_test.go | 181 +++++++ .../tests/integration/otel_lsexporter_test.go | 501 ++++++++++++++++++ .../filebeat/tests/integration/otel_test.go | 365 ++++++++++++- .../outputs/otelconsumer/otelconsumer.go | 12 +- .../outputs/otelconsumer/otelconsumer_test.go | 8 + x-pack/metricbeat/mbreceiver/receiver_test.go | 19 +- .../metricbeat/tests/integration/otel_test.go | 22 +- 9 files changed, 1121 insertions(+), 41 deletions(-) create mode 100644 changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml create mode 100644 x-pack/filebeat/input/gcppubsub/otel_test.go create mode 100644 x-pack/filebeat/tests/integration/otel_lsexporter_test.go diff --git a/changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml b/changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml new file mode 100644 index 000000000000..bd9e7017aa07 --- /dev/null +++ b/changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: breaking-change + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: remove otel.component.id and otel.component.kind from beat receiver events + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: all + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/beats/pull/47729 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/beats/issues/47600 diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index f791e58f7ffd..b0b478583bc6 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -93,8 +93,7 @@ func TestNewReceiver(t *testing.T) { AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"])) - assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected message field to contain string 'test'") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -242,10 +241,8 @@ func TestMultipleReceivers(t *testing.T) { require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs") require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs") - assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") - assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") - assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record") + assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected r1 message field to be 'test'") + assert.Equal(c, "test", logs["r2"][0].Flatten()["message"], "expected r2 message field to be 'test'") // Make sure that each receiver has a separate logger // instance and does not interfere with others. Previously, the diff --git a/x-pack/filebeat/input/gcppubsub/otel_test.go b/x-pack/filebeat/input/gcppubsub/otel_test.go new file mode 100644 index 000000000000..ea2d3ded6068 --- /dev/null +++ b/x-pack/filebeat/input/gcppubsub/otel_test.go @@ -0,0 +1,181 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !agentbeat + +package gcppubsub + +import ( + "bytes" + "context" + "fmt" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" + "github.com/elastic/beats/v7/libbeat/tests/integration" + + "github.com/elastic/elastic-agent-libs/testing/estools" +) + +func TestGCPInputOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + // Create pubsub client for setting up and communicating to emulator. + client, clientCancel := testSetup(t) + defer func() { + clientCancel() + client.Close() + }() + + createTopic(t, client) + createSubscription(t, "test-subscription-otel", client) + createSubscription(t, "test-subscription-fb", client) + const numMsgs = 10 + publishMessages(t, client, numMsgs) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) + fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) + + otelIndex := "logs-integration-" + otelNamespace + fbIndex := "logs-integration-" + fbNameSpace + + type options struct { + Namespace string + ESURL string + Username string + Password string + Subscription string + } + + gcpConfig := `filebeat.inputs: +- type: gcp-pubsub + project_id: test-project-id + topic: test-topic-foo + subscription.name: {{ .Subscription }} + credentials_file: "testdata/fake.json" + +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: logs-integration-{{ .Namespace }} + +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + } + + var configBuffer bytes.Buffer + optionsValue.Namespace = otelNamespace + optionsValue.Subscription = "test-subscription-otel" + require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue)) + + filebeatOTel.WriteConfigFile(configBuffer.String()) + + filebeatOTel.Start() + defer filebeatOTel.Stop() + + // reset buffer + configBuffer.Reset() + + optionsValue.Namespace = fbNameSpace + optionsValue.Subscription = "test-subscription-fb" + require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue)) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + filebeat.WriteConfigFile(configBuffer.String()) + filebeat.Start() + defer filebeat.Stop() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + t.Cleanup(func() { + _, err := es.Indices.DeleteDataStream([]string{ + otelIndex, + fbIndex, + }) + require.NoError(t, err, "failed to delete indices") + }) + + rawQuery := map[string]any{ + "query": map[string]any{ + "match_phrase": map[string]any{ + "input.type": "gcp-pubsub", + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+otelIndex+"*", es) + assert.NoError(ct, err) + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel document, got %d", otelDocs.Hits.Total.Value) + + filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+fbIndex+"*", es) + assert.NoError(ct, err) + assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat document, got %d", filebeatDocs.Hits.Total.Value) + }, + 3*time.Minute, 1*time.Second, "expected at least 1 document for both filebeat and otel modes") + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "event.created", + } + + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") +} diff --git a/x-pack/filebeat/tests/integration/otel_lsexporter_test.go b/x-pack/filebeat/tests/integration/otel_lsexporter_test.go new file mode 100644 index 000000000000..cb673ada68b8 --- /dev/null +++ b/x-pack/filebeat/tests/integration/otel_lsexporter_test.go @@ -0,0 +1,501 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !agentbeat + +package integration + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "math" + "math/rand/v2" + "net/http" + "os" + "path/filepath" + "runtime" + "sort" + "testing" + "time" + + "github.com/brianvoe/gofakeit" + "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type event struct { + ID string `json:"id"` + Timestamp string `json:"timestamp"` + StringField string `json:"string_field"` + NumberField int `json:"number_field"` + FloatField float64 `json:"float_field"` + BooleanField bool `json:"boolean_field"` + ArrayField []interface{} `json:"array_field"` + ObjectField map[string]interface{} `json:"object_field"` + KVField map[string]interface{} `json:"kv_field"` +} + +type eventWithID struct { + id string + data mapstr.M +} + +// TestDataShapeOTelVSClassicE2E verifies that the event data shape of filebeat in otel mode is the same as filebeat. +// Two Filebeat instances are started: +// +// one for classic mode sending to Logstash on port 5044 and +// one for Otel mode sending to Logstash on port 5055. +// +// Logstash runs two pipelines listening on those ports and writes the resulting events into a shared Docker volume. +// Nginx container serves that volume over HTTP so the test can fetch the generated files without relying on host filesystem permissions. +// Finally, the test downloads both files to ./tests/integration/logstash/testdata and compares the sorted events line by line. +func TestDataShapeOTelVSClassicE2E(t *testing.T) { + // ensure the size of events is big enough (1024b) for filebeat to ingest + numEvents := 3 + + // Agent does not support `index` setting, while beats does. + // Our focus is on agent classic vs otel mode comparison, so we do not test `index` for filebeat + beatsCfgFile := ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + paths: + - %s +output.logstash: + hosts: + - "localhost:%s" + pipelining: 0 + worker: 1 +queue.mem.flush.timeout: 0s +processors: + - add_host_metadata: ~ + - add_fields: + target: "" + fields: + testcase: %s +` + testCaseName := uuid.Must(uuid.NewV4()).String() + events := generateEvents(numEvents) + + tmpdir := t.TempDir() + inputFilePath := filepath.Join(tmpdir, "event.json") + writeEvents(t, inputFilePath, events) + + otelConfig := fmt.Sprintf(`receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-input-id + enabled: true + paths: + - %s + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + processors: + - add_host_metadata: ~ + - add_fields: + target: "" + fields: + testcase: %s + queue.mem.flush.timeout: 0s + setup.template.enabled: false + path.home: %s +exporters: + logstash: + hosts: + - "localhost:5055" + tls: + insecure: true +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - logstash + telemetry: + metrics: + level: none +`, inputFilePath, testCaseName, tmpdir) + + // Start OTel collector with filebeatreceiver + oteltestcol.New(t, otelConfig) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + inputFilePath = filepath.Join(filebeat.TempDir(), "event.json") + writeEvents(t, inputFilePath, events) + + filebeat.WriteConfigFile(fmt.Sprintf(beatsCfgFile, inputFilePath, "5044", testCaseName)) + filebeat.Start() + defer filebeat.Stop() + + // Nginx endpoint URLs + baseURL := "http://localhost:8082" + outFileURL := fmt.Sprintf("%s/%s_fb.json", baseURL, testCaseName) + outOTelFileURL := fmt.Sprintf("%s/%s_otel.json", baseURL, testCaseName) + + // wait for logs to be published over HTTP + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + for _, url := range []string{outFileURL, outOTelFileURL} { + checkURLHasContent(ct, url) + } + }, + 30*time.Second, 1*time.Second, "expected Nginx to serve json files over HTTP") + + // download files from Nginx into testdata directory + fbFilePath := downloadToTestData(t, outFileURL, fmt.Sprintf("%s_fb.json", testCaseName)) + otelFilePath := downloadToTestData(t, outOTelFileURL, fmt.Sprintf("%s_otel.json", testCaseName)) + + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + // only present in beats receivers + "log.file.device_id", + "log.file.fingerprint", + } + + compareOutputFiles(t, fbFilePath, otelFilePath, ignoredFields) +} + +// TestLogstashExporterProxyURL verifies that Filebeat OTel mode can send data to Logstash via a SOCKS5 proxy. +// Filebeat otel mode sends events to "logstash" via a socks5-proxy container running on localhost:1080 +func TestLogstashExporterProxyURL(t *testing.T) { + // ensure the size of events is big enough + numEvents := 3 + + testCaseName := uuid.Must(uuid.NewV4()).String() + events := generateEvents(numEvents) + + // Create OTel collector configuration with filebeatreceiver + tmpdir := t.TempDir() + inputFilePath := filepath.Join(tmpdir, "event.json") + writeEvents(t, inputFilePath, events) + + otelConfig := fmt.Sprintf(`receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-input-id + enabled: true + paths: + - %s + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + processors: + - add_host_metadata: ~ + - add_fields: + target: "" + fields: + testcase: %s + queue.mem.flush.timeout: 0s + setup.template.enabled: false + path.home: %s +exporters: + logstash: + hosts: + - "logstash:5055" + ttl: 0s + proxy_url: "socks5://elastic:changeme@localhost:1080" + proxy_use_local_resolver: false + worker: 1 + workers: 0 + max_retries: 3 +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - logstash + telemetry: + metrics: + level: none +`, inputFilePath, testCaseName, tmpdir) + + // Start OTel collector with filebeatreceiver + oteltestcol.New(t, otelConfig) + + // Nginx endpoint URLs + baseURL := "http://localhost:8082" + outOTelFileURL := fmt.Sprintf("%s/%s_otel.json", baseURL, testCaseName) + + // Logstash is outputting to a file inside its container, to access + // this file we use Nginx to serve the output folder via HTTP + // (see docker-compose.yml for Logstash and Nginx configuration). + // Wait to ensure the file can be downloaded via HTTP, the file + // being available indicates that Filebeat successfully sent data + // to Logstash + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + for _, url := range []string{outOTelFileURL} { + checkURLHasContent(ct, url) + } + }, + 30*time.Second, + 1*time.Second, + "did not find Logstash output file served via Nginx") + + // download files from Nginx + otelFilePath := downloadToTestData(t, outOTelFileURL, fmt.Sprintf("%s_otel.json", testCaseName)) + otelEvents, err := readAllEvents(otelFilePath) + + require.NoError(t, err, "failed to read otel events") + require.Equal(t, numEvents, len(otelEvents), + "different number of events: sent=%d, get=%d", numEvents, len(otelEvents)) +} + +func generateEvents(numEvents int) []string { + gofakeit.Seed(time.Now().UnixNano()) + + events := make([]string, 0, numEvents) + for i := 0; i < numEvents; i++ { + // Generate mixed-type array field + arrayField := make([]interface{}, 9) + arrayField[0] = gofakeit.Word() + arrayField[1] = gofakeit.Int64() + arrayField[2] = gofakeit.Float64() + arrayField[3] = rand.IntN(2) == 0 // bool + arrayField[4] = gofakeit.Name() + arrayField[5] = math.MaxInt + arrayField[6] = math.MinInt + arrayField[7] = math.MaxFloat64 + arrayField[8] = math.SmallestNonzeroFloat64 + + kvArrayField := make([]interface{}, 4) + kvArrayField[0] = gofakeit.Color() + kvArrayField[1] = gofakeit.Number(-100, 100) + kvArrayField[2] = gofakeit.Float32Range(0, 50) + kvArrayField[3] = rand.IntN(2) == 0 // bool + + ev := event{ + ID: uuid.Must(uuid.NewV4()).String(), + Timestamp: time.Now().Format(time.RFC3339Nano), + StringField: gofakeit.Sentence(2), + NumberField: rand.IntN(1000), + FloatField: rand.Float64() * 100, + BooleanField: rand.IntN(2) == 0, + ArrayField: arrayField, + ObjectField: map[string]interface{}{ + "nested_key": "nested_value", + "nested_number": gofakeit.Number(1, 1000), + }, + KVField: map[string]interface{}{ + "key_string": gofakeit.Word(), + "key_number": gofakeit.Number(1, 5000), + "key_bool": rand.IntN(2) == 0, + "key_array": kvArrayField, + "key_object": map[string]interface{}{ + "inner1": rand.IntN(2) == 0, + "inner2": gofakeit.Float64Range(0, 10), + "inner_obj": map[string]interface{}{ + "deep_key": gofakeit.HipsterSentence(3), + "deep_arr": kvArrayField, + }, + }, + }, + } + + b, _ := json.Marshal(ev) + events = append(events, string(b)) + } + return events +} + +func writeEvents(t *testing.T, filepath string, events []string) { + f, err := os.Create(filepath) + if err != nil { + t.Fatalf("cannot create file '%s': %s", filepath, err) + } + + for _, event := range events { + if _, err := f.WriteString(event + "\n"); err != nil { + t.Fatalf("cannot write log file '%s': %s", filepath, err) + } + } + + if err := f.Sync(); err != nil { + t.Errorf("cannot sync %q: %s", filepath, err) + } + if err := f.Close(); err != nil { + t.Errorf("cannot close %q: %s", filepath, err) + } +} + +func parseJson(jsonStr string) (mapstr.M, error) { + var data map[string]interface{} + if err := json.Unmarshal([]byte(jsonStr), &data); err != nil { + return nil, err + } + return data, nil +} + +func compareOutputFiles(t *testing.T, fbFilePath, otelFilePath string, ignoredFields []string) { + fbEvents, err := readAllEvents(fbFilePath) + require.NoError(t, err, "failed to read filebeat events") + + otelEvents, err := readAllEvents(otelFilePath) + require.NoError(t, err, "failed to read otel events") + + require.Equal(t, len(fbEvents), len(otelEvents), + "different number of events: filebeat=%d, otel=%d", len(fbEvents), len(otelEvents)) + + sortEventsByID(fbEvents) + sortEventsByID(otelEvents) + + // compare sorted events + for i := 0; i < len(fbEvents); i++ { + fbEvent := fbEvents[i] + otelEvent := otelEvents[i] + + oteltest.AssertMapsEqual(t, fbEvent.data, otelEvent.data, ignoredFields, + fmt.Sprintf("event comparison failed for ID %s (line %d)", fbEvent.id, i)) + + assert.Equal(t, "filebeat", otelEvent.data.Flatten()["agent.type"], "expected agent.type to be 'filebeat' in otel data") + assert.Equal(t, "filebeat", fbEvent.data.Flatten()["agent.type"], "expected agent.type to be 'filebeat' in filebeat data") + } +} + +func readAllEvents(filePath string) ([]eventWithID, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer file.Close() + + var events []eventWithID + scanner := bufio.NewScanner(file) + lineNumber := 0 + + for scanner.Scan() { + lineNumber++ + line := scanner.Text() + + // parse json line + outerData, err := parseJson(line) + if err != nil { + return nil, fmt.Errorf("failed to parse outer JSON at line %d: %w", lineNumber, err) + } + + // extract the message field + messageField, exists := outerData["message"] + if !exists { + return nil, fmt.Errorf("missing 'message' field at line %d", lineNumber) + } + + messageStr, ok := messageField.(string) + if !ok { + return nil, fmt.Errorf("'message' field is not a string at line %d", lineNumber) + } + + // parse original event + innerData, err := parseJson(messageStr) + if err != nil { + return nil, fmt.Errorf("failed to parse inner JSON from message field at line %d: %w", lineNumber, err) + } + + // extract original event id + id, _ := innerData["id"].(string) + if id == "" { + return nil, fmt.Errorf("missing or invalid ID field in inner JSON at line %d", lineNumber) + } + + events = append(events, eventWithID{ + id: id, + data: outerData, + }) + } + + if err := scanner.Err(); err != nil { + return nil, err + } + + return events, nil +} + +func sortEventsByID(events []eventWithID) { + sort.Slice(events, func(i, j int) bool { + return events[i].id < events[j].id + }) +} + +func checkURLHasContent(ct *assert.CollectT, url string) { + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) + if !assert.NoError(ct, err, "failed to create request for URL %s", url) { + return + } + + resp, err := http.DefaultClient.Do(req) + if !assert.NoError(ct, err, "URL %s should exist", url) { + return + } + defer resp.Body.Close() + + if !assert.Equal(ct, http.StatusOK, resp.StatusCode, "URL %s should return HTTP 200", url) { + return + } + + body, err := io.ReadAll(resp.Body) + if !assert.NoError(ct, err, "failed to read body from %s", url) { + return + } + + if !assert.Greater(ct, len(body), 0, "URL %s should have content", url) { + return + } +} + +func downloadToTestData(t *testing.T, url string, filename string) string { + // get http response + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) + require.NoError(t, err, "error creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "calling nginx endpoint") + defer resp.Body.Close() + + // get path to current file + _, currentFile, _, ok := runtime.Caller(0) + require.True(t, ok, "failed to get current file path") + + // create testdata directory + filePath := filepath.Join(filepath.Dir(currentFile), "logstash", "testdata", filename) + err = os.MkdirAll(filepath.Dir(filePath), 0o755) + require.NoError(t, err, "failed to create testdata directory") + + // create file + file, err := os.Create(filePath) + require.NoError(t, err, "failed to create file %s", filePath) + defer file.Close() + + _, err = io.Copy(file, resp.Body) + require.NoError(t, err, "failed to copy data from %s", url) + + err = file.Sync() + require.NoError(t, err, "failed to sync file %s", filePath) + + return filePath +} diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index eac40156dc3d..31ce34a2e0b1 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -139,6 +139,37 @@ service: writeEventsToLogFile(t, logFilePath, numEvents) oteltestcol.New(t, fmt.Sprintf(otelCfgFile, logFilePath, tmpdir, otelMonitoringPort, fbOtelIndex)) +<<<<<<< HEAD +======= + beatsCfgFile := ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - %s +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: %s +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +http.enabled: true +http.host: localhost +http.port: %d +` + +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) // start filebeat filebeat := integration.NewBeat( t, @@ -199,22 +230,193 @@ setup.template.pattern: logs-filebeat-default "agent.id", "log.file.inode", "log.file.path", - // only present in beats receivers - "agent.otelcol.component.id", - "agent.otelcol.component.kind", "log.file.device_id", // changes value between filebeat and otel receiver "container.id", // only present in filebeat } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") - assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") - assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") - assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") + assert.Equal(t, "filebeat", otelDoc.Flatten()["agent.type"], "expected agent.type field to be 'filebeat' in otel docs") + assert.Equal(t, "filebeat", filebeatDoc.Flatten()["agent.type"], "expected agent.type field to be 'filebeat' in filebeat docs") assertMonitoring(t, otelMonitoringPort) } +<<<<<<< HEAD +======= +func TestFilebeatOTelHTTPJSONInput(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + otelNamespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbNameSpace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + + type options struct { + Namespace string + ESURL string + Username string + Password string + } + + // The request url is a http mock server started using streams + configFile := ` +filebeat.inputs: + - type: httpjson + id: httpjson-e2e-otel + request.url: http://localhost:8090/test + +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: logs-integration-{{ .Namespace }} + +setup.template.enabled: false +queue.mem.flush.timeout: 0s +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + + otelConfigFile := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: httpjson + id: httpjson-e2e-otel + request.url: http://localhost:8090/test + processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ + queue.mem.flush.timeout: 0s + setup.template.enabled: false +exporters: + elasticsearch: + auth: + authenticator: beatsauth + compression: gzip + compression_params: + level: 1 + endpoints: + - {{ .ESURL }} + logs_index: logs-integration-{{ .Namespace }} + mapping: + mode: bodymap + max_conns_per_host: 1 + password: {{ .Password }} + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true + user: {{ .Username }} +extensions: + beatsauth: + idle_connection_timeout: 3s + proxy_disable: false + timeout: 1m30s +service: + extensions: + - beatsauth + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch +` + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + } + + var configBuffer bytes.Buffer + optionsValue.Namespace = otelNamespace + require.NoError(t, template.Must(template.New("config").Parse(otelConfigFile)).Execute(&configBuffer, optionsValue)) + oteltestcol.New(t, configBuffer.String()) + + // reset buffer + configBuffer.Reset() + + optionsValue.Namespace = fbNameSpace + require.NoError(t, template.Must(template.New("config").Parse(configFile)).Execute(&configBuffer, optionsValue)) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + filebeat.WriteConfigFile(configBuffer.String()) + filebeat.Start() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + rawQuery := map[string]any{ + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+otelNamespace+"*", es) + assert.NoError(ct, err) + + filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+fbNameSpace+"*", es) + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel event, got %d", otelDocs.Hits.Total.Value) + assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat event, got %d", filebeatDocs.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second, "expected at least 1 event for both filebeat and otel") + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "event.created", + } + + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") +} + +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { t.Helper() logFile, err := os.Create(filename) @@ -667,3 +869,152 @@ service: }) } } +<<<<<<< HEAD +======= + +func TestFileBeatKerberos(t *testing.T) { + wantEvents := 1 + krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here + tempFile := t.TempDir() + // ES client + esCfg := elasticsearch.Config{ + Addresses: []string{krbURL}, + Username: "admin", + Password: "testing", + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + + es, err := elasticsearch.NewClient(esCfg) + require.NoError(t, err, "could not get elasticsearch client") + + setupRoleMapping(t, es) + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + filebeatIndex := "logs-filebeat.kerberos-" + namespace + + otelConfig := struct { + Index string + InputFile string + PathHome string + Endpoint string + }{ + Index: filebeatIndex, + InputFile: filepath.Join(tempFile, "log.log"), + PathHome: tempFile, + Endpoint: krbURL, + } + + cfg := `receivers: + filebeatreceiver/filestream: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + queue.mem.flush.timeout: 0s + management.otel.enabled: true + path.home: {{.PathHome}} +extensions: + beatsauth: + kerberos: + auth_type: "password" + config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf" + username: "beats" + password: "testing" + realm: "elastic" +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.Endpoint}} + logs_index: {{.Index}} + mapping: + mode: bodymap + auth: + authenticator: beatsauth +service: + extensions: + - beatsauth + pipelines: + logs: + receivers: + - filebeatreceiver/filestream + exporters: + - elasticsearch/log + - debug +` + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) + oteltestcol.New(t, string(configContents)) + + // wait for logs to be published + require.EventuallyWithT(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second) +} + +// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic +func setupRoleMapping(t *testing.T, client *elasticsearch.Client) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // prepare to query ES + roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping" + + body := map[string]interface{}{ + "roles": []string{"superuser"}, + "enabled": true, + "rules": map[string]interface{}{ + "field": map[string]interface{}{ + "username": "beats@elastic", + }, + }, + } + + jsonData, err := json.Marshal(body) + require.NoError(t, err, "could not marshal role mapping body to json") + + // Build request + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + roleMappingURL, + bytes.NewReader(jsonData)) + require.NoError(t, err, "could not create role mapping request") + + // Set content type header + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Perform(req) + require.NoError(t, err, "could not perform role mapping request") + defer resp.Body.Close() + + require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code") +} +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index ad776c757e4f..fb6370176b02 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -32,12 +32,15 @@ import ( const ( // esDocumentIDAttribute is the attribute key used to store the document ID in the log record. esDocumentIDAttribute = "elasticsearch.document_id" +<<<<<<< HEAD beatNameCtxKey = "beat_name" beatVersionCtxtKey = "beat_version" // otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event. otelComponentIDKey = "otelcol.component.id" // otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver". otelComponentKindKey = "otelcol.component.kind" +======= +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) ) func init() { @@ -149,15 +152,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) } logRecord.SetObservedTimestamp(observedTimestamp) - if agent, _ := beatEvent.GetValue("agent"); agent != nil { - switch agent := agent.(type) { - case mapstr.M: - agent[otelComponentIDKey] = out.beatInfo.ComponentID - agent[otelComponentKindKey] = "receiver" - beatEvent["agent"] = agent - } - } - otelmap.ConvertNonPrimitive(beatEvent) // if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 420de1e8aca3..156700f62dfe 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -258,8 +258,13 @@ func TestPublish(t *testing.T) { batch := outest.NewBatch(event1) otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { cm := client.FromContext(ctx).Metadata +<<<<<<< HEAD assert.Equal(t, beatInfo.Beat, cm.Get(beatNameCtxKey)[0]) assert.Equal(t, beatInfo.Version, cm.Get(beatVersionCtxtKey)[0]) +======= + assert.Equal(t, beatInfo.Beat, cm.Get(otelctx.BeatNameCtxKey)[0]) + assert.Equal(t, beatInfo.Version, cm.Get(otelctx.BeatVersionCtxKey)[0]) +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) return nil }) @@ -268,6 +273,7 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) +<<<<<<< HEAD t.Run("sets otel specific-fields", func(t *testing.T) { testCases := []struct { name string @@ -337,4 +343,6 @@ func TestPublish(t *testing.T) { }) } }) +======= +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index 9691c61cde9e..ee399a77aadc 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -87,8 +87,8 @@ func TestNewReceiver(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 }, "expected at least one ingest log, got logs: %v", logs["r1"]) - assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field in to be 'metricbeat'") + var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -204,10 +204,25 @@ func TestMultipleReceivers(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 && len(logs["r2"]) > 0 }, "expected at least one ingest log for each receiver, got logs: %v", logs) +<<<<<<< HEAD assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record") +======= + assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r1") + assert.Equal(c, "metricbeat", logs["r2"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r2") + + // Make sure that each receiver has a separate logger + // instance and does not interfere with others. Previously, the + // logger in Beats was global, causing logger fields to be + // overwritten when multiple receivers started in the same process. + r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "metricbeatreceiver/r1")) + assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log") + r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "metricbeatreceiver/r2")) + assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log") + +>>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2} diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index 80afc7433684..faa25addf5dc 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -123,7 +123,7 @@ service: oteltestcol.New(t, configBuffer.String()) - var beatsCfgFile = ` + beatsCfgFile := ` metricbeat: modules: - module: system @@ -206,16 +206,9 @@ http.port: {{.MonitoringPort}} var metricbeatDoc, otelDoc mapstr.M otelDoc = otelDocs.Hits.Hits[0].Source metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source - ignoredFields := []string{ - // only present in beats receivers - "agent.otelcol.component.id", - "agent.otelcol.component.kind", - } - assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") - assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record") - assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record") - assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal") + assert.Equal(t, "metricbeat", otelDoc.Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in otel docs") + assert.Equal(t, "metricbeat", metricbeatDoc.Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in metricbeat docs") + assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, nil, "expected documents keys to be equal") assertMonitoring(t, metricbeatMonitoringPort) } @@ -546,12 +539,7 @@ service: assert.GreaterOrEqualf(ct, r1Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 1, got %d", r1Docs.Hits.Total.Value) }, 1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver") - ignoredFields := []string{ - // only present in beats receivers - "agent.otelcol.component.id", - "agent.otelcol.component.kind", - } - assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, ignoredFields, "expected documents keys to be equal") + assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, nil, "expected documents keys to be equal") for _, rec := range otelConfig.Receivers { assertMonitoring(t, rec.MonitoringPort) } From a704d3afbf9d63fc8270d271560da329dcda0b3b Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 25 Nov 2025 15:36:36 -0600 Subject: [PATCH 2/3] fix merge conflicts --- x-pack/filebeat/input/gcppubsub/otel_test.go | 181 ------- .../tests/integration/otel_lsexporter_test.go | 501 ------------------ .../filebeat/tests/integration/otel_test.go | 360 +------------ .../outputs/otelconsumer/otelconsumer.go | 7 - .../outputs/otelconsumer/otelconsumer_test.go | 77 --- x-pack/metricbeat/mbreceiver/receiver_test.go | 7 - 6 files changed, 2 insertions(+), 1131 deletions(-) delete mode 100644 x-pack/filebeat/input/gcppubsub/otel_test.go delete mode 100644 x-pack/filebeat/tests/integration/otel_lsexporter_test.go diff --git a/x-pack/filebeat/input/gcppubsub/otel_test.go b/x-pack/filebeat/input/gcppubsub/otel_test.go deleted file mode 100644 index ea2d3ded6068..000000000000 --- a/x-pack/filebeat/input/gcppubsub/otel_test.go +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration && !agentbeat - -package gcppubsub - -import ( - "bytes" - "context" - "fmt" - "testing" - "text/template" - "time" - - "github.com/gofrs/uuid/v5" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" - "github.com/elastic/beats/v7/libbeat/tests/integration" - - "github.com/elastic/elastic-agent-libs/testing/estools" -) - -func TestGCPInputOTelE2E(t *testing.T) { - integration.EnsureESIsRunning(t) - - // Create pubsub client for setting up and communicating to emulator. - client, clientCancel := testSetup(t) - defer func() { - clientCancel() - client.Close() - }() - - createTopic(t, client) - createSubscription(t, "test-subscription-otel", client) - createSubscription(t, "test-subscription-fb", client) - const numMsgs = 10 - publishMessages(t, client, numMsgs) - - host := integration.GetESURL(t, "http") - user := host.User.Username() - password, _ := host.User.Password() - - // create a random uuid and make sure it doesn't contain dashes/ - otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) - fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) - - otelIndex := "logs-integration-" + otelNamespace - fbIndex := "logs-integration-" + fbNameSpace - - type options struct { - Namespace string - ESURL string - Username string - Password string - Subscription string - } - - gcpConfig := `filebeat.inputs: -- type: gcp-pubsub - project_id: test-project-id - topic: test-topic-foo - subscription.name: {{ .Subscription }} - credentials_file: "testdata/fake.json" - -output: - elasticsearch: - hosts: - - {{ .ESURL }} - username: {{ .Username }} - password: {{ .Password }} - index: logs-integration-{{ .Namespace }} - -queue.mem.flush.timeout: 0s -setup.template.enabled: false -processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ -` - - // start filebeat in otel mode - filebeatOTel := integration.NewBeat( - t, - "filebeat-otel", - "../../filebeat.test", - "otel", - ) - - optionsValue := options{ - ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), - Username: user, - Password: password, - } - - var configBuffer bytes.Buffer - optionsValue.Namespace = otelNamespace - optionsValue.Subscription = "test-subscription-otel" - require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue)) - - filebeatOTel.WriteConfigFile(configBuffer.String()) - - filebeatOTel.Start() - defer filebeatOTel.Stop() - - // reset buffer - configBuffer.Reset() - - optionsValue.Namespace = fbNameSpace - optionsValue.Subscription = "test-subscription-fb" - require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue)) - - // start filebeat - filebeat := integration.NewBeat( - t, - "filebeat", - "../../filebeat.test", - ) - - filebeat.WriteConfigFile(configBuffer.String()) - filebeat.Start() - defer filebeat.Stop() - - // prepare to query ES - es := integration.GetESClient(t, "http") - - t.Cleanup(func() { - _, err := es.Indices.DeleteDataStream([]string{ - otelIndex, - fbIndex, - }) - require.NoError(t, err, "failed to delete indices") - }) - - rawQuery := map[string]any{ - "query": map[string]any{ - "match_phrase": map[string]any{ - "input.type": "gcp-pubsub", - }, - }, - "sort": []map[string]any{ - {"@timestamp": map[string]any{"order": "asc"}}, - }, - } - - var filebeatDocs estools.Documents - var otelDocs estools.Documents - var err error - - // wait for logs to be published - require.EventuallyWithTf(t, - func(ct *assert.CollectT) { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer findCancel() - - otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+otelIndex+"*", es) - assert.NoError(ct, err) - assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel document, got %d", otelDocs.Hits.Total.Value) - - filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+fbIndex+"*", es) - assert.NoError(ct, err) - assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat document, got %d", filebeatDocs.Hits.Total.Value) - }, - 3*time.Minute, 1*time.Second, "expected at least 1 document for both filebeat and otel modes") - - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source - ignoredFields := []string{ - // Expected to change between agentDocs and OtelDocs - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "event.created", - } - - oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") -} diff --git a/x-pack/filebeat/tests/integration/otel_lsexporter_test.go b/x-pack/filebeat/tests/integration/otel_lsexporter_test.go deleted file mode 100644 index cb673ada68b8..000000000000 --- a/x-pack/filebeat/tests/integration/otel_lsexporter_test.go +++ /dev/null @@ -1,501 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration && !agentbeat - -package integration - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "io" - "math" - "math/rand/v2" - "net/http" - "os" - "path/filepath" - "runtime" - "sort" - "testing" - "time" - - "github.com/brianvoe/gofakeit" - "github.com/gofrs/uuid/v5" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol" - "github.com/elastic/elastic-agent-libs/mapstr" -) - -type event struct { - ID string `json:"id"` - Timestamp string `json:"timestamp"` - StringField string `json:"string_field"` - NumberField int `json:"number_field"` - FloatField float64 `json:"float_field"` - BooleanField bool `json:"boolean_field"` - ArrayField []interface{} `json:"array_field"` - ObjectField map[string]interface{} `json:"object_field"` - KVField map[string]interface{} `json:"kv_field"` -} - -type eventWithID struct { - id string - data mapstr.M -} - -// TestDataShapeOTelVSClassicE2E verifies that the event data shape of filebeat in otel mode is the same as filebeat. -// Two Filebeat instances are started: -// -// one for classic mode sending to Logstash on port 5044 and -// one for Otel mode sending to Logstash on port 5055. -// -// Logstash runs two pipelines listening on those ports and writes the resulting events into a shared Docker volume. -// Nginx container serves that volume over HTTP so the test can fetch the generated files without relying on host filesystem permissions. -// Finally, the test downloads both files to ./tests/integration/logstash/testdata and compares the sorted events line by line. -func TestDataShapeOTelVSClassicE2E(t *testing.T) { - // ensure the size of events is big enough (1024b) for filebeat to ingest - numEvents := 3 - - // Agent does not support `index` setting, while beats does. - // Our focus is on agent classic vs otel mode comparison, so we do not test `index` for filebeat - beatsCfgFile := ` -filebeat.inputs: - - type: filestream - id: filestream-input-id - enabled: true - paths: - - %s -output.logstash: - hosts: - - "localhost:%s" - pipelining: 0 - worker: 1 -queue.mem.flush.timeout: 0s -processors: - - add_host_metadata: ~ - - add_fields: - target: "" - fields: - testcase: %s -` - testCaseName := uuid.Must(uuid.NewV4()).String() - events := generateEvents(numEvents) - - tmpdir := t.TempDir() - inputFilePath := filepath.Join(tmpdir, "event.json") - writeEvents(t, inputFilePath, events) - - otelConfig := fmt.Sprintf(`receivers: - filebeatreceiver: - filebeat: - inputs: - - type: filestream - id: filestream-input-id - enabled: true - paths: - - %s - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ - processors: - - add_host_metadata: ~ - - add_fields: - target: "" - fields: - testcase: %s - queue.mem.flush.timeout: 0s - setup.template.enabled: false - path.home: %s -exporters: - logstash: - hosts: - - "localhost:5055" - tls: - insecure: true -service: - pipelines: - logs: - receivers: - - filebeatreceiver - exporters: - - logstash - telemetry: - metrics: - level: none -`, inputFilePath, testCaseName, tmpdir) - - // Start OTel collector with filebeatreceiver - oteltestcol.New(t, otelConfig) - - // start filebeat - filebeat := integration.NewBeat( - t, - "filebeat", - "../../filebeat.test", - ) - - inputFilePath = filepath.Join(filebeat.TempDir(), "event.json") - writeEvents(t, inputFilePath, events) - - filebeat.WriteConfigFile(fmt.Sprintf(beatsCfgFile, inputFilePath, "5044", testCaseName)) - filebeat.Start() - defer filebeat.Stop() - - // Nginx endpoint URLs - baseURL := "http://localhost:8082" - outFileURL := fmt.Sprintf("%s/%s_fb.json", baseURL, testCaseName) - outOTelFileURL := fmt.Sprintf("%s/%s_otel.json", baseURL, testCaseName) - - // wait for logs to be published over HTTP - require.EventuallyWithTf(t, - func(ct *assert.CollectT) { - for _, url := range []string{outFileURL, outOTelFileURL} { - checkURLHasContent(ct, url) - } - }, - 30*time.Second, 1*time.Second, "expected Nginx to serve json files over HTTP") - - // download files from Nginx into testdata directory - fbFilePath := downloadToTestData(t, outFileURL, fmt.Sprintf("%s_fb.json", testCaseName)) - otelFilePath := downloadToTestData(t, outOTelFileURL, fmt.Sprintf("%s_otel.json", testCaseName)) - - ignoredFields := []string{ - // Expected to change between agentDocs and OtelDocs - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "log.file.inode", - "log.file.path", - // only present in beats receivers - "log.file.device_id", - "log.file.fingerprint", - } - - compareOutputFiles(t, fbFilePath, otelFilePath, ignoredFields) -} - -// TestLogstashExporterProxyURL verifies that Filebeat OTel mode can send data to Logstash via a SOCKS5 proxy. -// Filebeat otel mode sends events to "logstash" via a socks5-proxy container running on localhost:1080 -func TestLogstashExporterProxyURL(t *testing.T) { - // ensure the size of events is big enough - numEvents := 3 - - testCaseName := uuid.Must(uuid.NewV4()).String() - events := generateEvents(numEvents) - - // Create OTel collector configuration with filebeatreceiver - tmpdir := t.TempDir() - inputFilePath := filepath.Join(tmpdir, "event.json") - writeEvents(t, inputFilePath, events) - - otelConfig := fmt.Sprintf(`receivers: - filebeatreceiver: - filebeat: - inputs: - - type: filestream - id: filestream-input-id - enabled: true - paths: - - %s - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ - processors: - - add_host_metadata: ~ - - add_fields: - target: "" - fields: - testcase: %s - queue.mem.flush.timeout: 0s - setup.template.enabled: false - path.home: %s -exporters: - logstash: - hosts: - - "logstash:5055" - ttl: 0s - proxy_url: "socks5://elastic:changeme@localhost:1080" - proxy_use_local_resolver: false - worker: 1 - workers: 0 - max_retries: 3 -service: - pipelines: - logs: - receivers: - - filebeatreceiver - exporters: - - logstash - telemetry: - metrics: - level: none -`, inputFilePath, testCaseName, tmpdir) - - // Start OTel collector with filebeatreceiver - oteltestcol.New(t, otelConfig) - - // Nginx endpoint URLs - baseURL := "http://localhost:8082" - outOTelFileURL := fmt.Sprintf("%s/%s_otel.json", baseURL, testCaseName) - - // Logstash is outputting to a file inside its container, to access - // this file we use Nginx to serve the output folder via HTTP - // (see docker-compose.yml for Logstash and Nginx configuration). - // Wait to ensure the file can be downloaded via HTTP, the file - // being available indicates that Filebeat successfully sent data - // to Logstash - require.EventuallyWithTf(t, - func(ct *assert.CollectT) { - for _, url := range []string{outOTelFileURL} { - checkURLHasContent(ct, url) - } - }, - 30*time.Second, - 1*time.Second, - "did not find Logstash output file served via Nginx") - - // download files from Nginx - otelFilePath := downloadToTestData(t, outOTelFileURL, fmt.Sprintf("%s_otel.json", testCaseName)) - otelEvents, err := readAllEvents(otelFilePath) - - require.NoError(t, err, "failed to read otel events") - require.Equal(t, numEvents, len(otelEvents), - "different number of events: sent=%d, get=%d", numEvents, len(otelEvents)) -} - -func generateEvents(numEvents int) []string { - gofakeit.Seed(time.Now().UnixNano()) - - events := make([]string, 0, numEvents) - for i := 0; i < numEvents; i++ { - // Generate mixed-type array field - arrayField := make([]interface{}, 9) - arrayField[0] = gofakeit.Word() - arrayField[1] = gofakeit.Int64() - arrayField[2] = gofakeit.Float64() - arrayField[3] = rand.IntN(2) == 0 // bool - arrayField[4] = gofakeit.Name() - arrayField[5] = math.MaxInt - arrayField[6] = math.MinInt - arrayField[7] = math.MaxFloat64 - arrayField[8] = math.SmallestNonzeroFloat64 - - kvArrayField := make([]interface{}, 4) - kvArrayField[0] = gofakeit.Color() - kvArrayField[1] = gofakeit.Number(-100, 100) - kvArrayField[2] = gofakeit.Float32Range(0, 50) - kvArrayField[3] = rand.IntN(2) == 0 // bool - - ev := event{ - ID: uuid.Must(uuid.NewV4()).String(), - Timestamp: time.Now().Format(time.RFC3339Nano), - StringField: gofakeit.Sentence(2), - NumberField: rand.IntN(1000), - FloatField: rand.Float64() * 100, - BooleanField: rand.IntN(2) == 0, - ArrayField: arrayField, - ObjectField: map[string]interface{}{ - "nested_key": "nested_value", - "nested_number": gofakeit.Number(1, 1000), - }, - KVField: map[string]interface{}{ - "key_string": gofakeit.Word(), - "key_number": gofakeit.Number(1, 5000), - "key_bool": rand.IntN(2) == 0, - "key_array": kvArrayField, - "key_object": map[string]interface{}{ - "inner1": rand.IntN(2) == 0, - "inner2": gofakeit.Float64Range(0, 10), - "inner_obj": map[string]interface{}{ - "deep_key": gofakeit.HipsterSentence(3), - "deep_arr": kvArrayField, - }, - }, - }, - } - - b, _ := json.Marshal(ev) - events = append(events, string(b)) - } - return events -} - -func writeEvents(t *testing.T, filepath string, events []string) { - f, err := os.Create(filepath) - if err != nil { - t.Fatalf("cannot create file '%s': %s", filepath, err) - } - - for _, event := range events { - if _, err := f.WriteString(event + "\n"); err != nil { - t.Fatalf("cannot write log file '%s': %s", filepath, err) - } - } - - if err := f.Sync(); err != nil { - t.Errorf("cannot sync %q: %s", filepath, err) - } - if err := f.Close(); err != nil { - t.Errorf("cannot close %q: %s", filepath, err) - } -} - -func parseJson(jsonStr string) (mapstr.M, error) { - var data map[string]interface{} - if err := json.Unmarshal([]byte(jsonStr), &data); err != nil { - return nil, err - } - return data, nil -} - -func compareOutputFiles(t *testing.T, fbFilePath, otelFilePath string, ignoredFields []string) { - fbEvents, err := readAllEvents(fbFilePath) - require.NoError(t, err, "failed to read filebeat events") - - otelEvents, err := readAllEvents(otelFilePath) - require.NoError(t, err, "failed to read otel events") - - require.Equal(t, len(fbEvents), len(otelEvents), - "different number of events: filebeat=%d, otel=%d", len(fbEvents), len(otelEvents)) - - sortEventsByID(fbEvents) - sortEventsByID(otelEvents) - - // compare sorted events - for i := 0; i < len(fbEvents); i++ { - fbEvent := fbEvents[i] - otelEvent := otelEvents[i] - - oteltest.AssertMapsEqual(t, fbEvent.data, otelEvent.data, ignoredFields, - fmt.Sprintf("event comparison failed for ID %s (line %d)", fbEvent.id, i)) - - assert.Equal(t, "filebeat", otelEvent.data.Flatten()["agent.type"], "expected agent.type to be 'filebeat' in otel data") - assert.Equal(t, "filebeat", fbEvent.data.Flatten()["agent.type"], "expected agent.type to be 'filebeat' in filebeat data") - } -} - -func readAllEvents(filePath string) ([]eventWithID, error) { - file, err := os.Open(filePath) - if err != nil { - return nil, err - } - defer file.Close() - - var events []eventWithID - scanner := bufio.NewScanner(file) - lineNumber := 0 - - for scanner.Scan() { - lineNumber++ - line := scanner.Text() - - // parse json line - outerData, err := parseJson(line) - if err != nil { - return nil, fmt.Errorf("failed to parse outer JSON at line %d: %w", lineNumber, err) - } - - // extract the message field - messageField, exists := outerData["message"] - if !exists { - return nil, fmt.Errorf("missing 'message' field at line %d", lineNumber) - } - - messageStr, ok := messageField.(string) - if !ok { - return nil, fmt.Errorf("'message' field is not a string at line %d", lineNumber) - } - - // parse original event - innerData, err := parseJson(messageStr) - if err != nil { - return nil, fmt.Errorf("failed to parse inner JSON from message field at line %d: %w", lineNumber, err) - } - - // extract original event id - id, _ := innerData["id"].(string) - if id == "" { - return nil, fmt.Errorf("missing or invalid ID field in inner JSON at line %d", lineNumber) - } - - events = append(events, eventWithID{ - id: id, - data: outerData, - }) - } - - if err := scanner.Err(); err != nil { - return nil, err - } - - return events, nil -} - -func sortEventsByID(events []eventWithID) { - sort.Slice(events, func(i, j int) bool { - return events[i].id < events[j].id - }) -} - -func checkURLHasContent(ct *assert.CollectT, url string) { - req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) - if !assert.NoError(ct, err, "failed to create request for URL %s", url) { - return - } - - resp, err := http.DefaultClient.Do(req) - if !assert.NoError(ct, err, "URL %s should exist", url) { - return - } - defer resp.Body.Close() - - if !assert.Equal(ct, http.StatusOK, resp.StatusCode, "URL %s should return HTTP 200", url) { - return - } - - body, err := io.ReadAll(resp.Body) - if !assert.NoError(ct, err, "failed to read body from %s", url) { - return - } - - if !assert.Greater(ct, len(body), 0, "URL %s should have content", url) { - return - } -} - -func downloadToTestData(t *testing.T, url string, filename string) string { - // get http response - req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) - require.NoError(t, err, "error creating request") - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err, "calling nginx endpoint") - defer resp.Body.Close() - - // get path to current file - _, currentFile, _, ok := runtime.Caller(0) - require.True(t, ok, "failed to get current file path") - - // create testdata directory - filePath := filepath.Join(filepath.Dir(currentFile), "logstash", "testdata", filename) - err = os.MkdirAll(filepath.Dir(filePath), 0o755) - require.NoError(t, err, "failed to create testdata directory") - - // create file - file, err := os.Create(filePath) - require.NoError(t, err, "failed to create file %s", filePath) - defer file.Close() - - _, err = io.Copy(file, resp.Body) - require.NoError(t, err, "failed to copy data from %s", url) - - err = file.Sync() - require.NoError(t, err, "failed to sync file %s", filePath) - - return filePath -} diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 31ce34a2e0b1..c9298985e29a 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -52,7 +52,7 @@ func TestFilebeatOTelE2E(t *testing.T) { otelMonitoringPort := int(libbeattesting.MustAvailableTCP4Port(t)) filebeatMonitoringPort := int(libbeattesting.MustAvailableTCP4Port(t)) - var beatsCfgFile = ` + beatsCfgFile := ` filebeat.inputs: - type: filestream id: filestream-input-id @@ -139,37 +139,6 @@ service: writeEventsToLogFile(t, logFilePath, numEvents) oteltestcol.New(t, fmt.Sprintf(otelCfgFile, logFilePath, tmpdir, otelMonitoringPort, fbOtelIndex)) -<<<<<<< HEAD -======= - beatsCfgFile := ` -filebeat.inputs: - - type: filestream - id: filestream-input-id - enabled: true - file_identity.native: ~ - prospector.scanner.fingerprint.enabled: false - paths: - - %s -output: - elasticsearch: - hosts: - - localhost:9200 - username: admin - password: testing - index: %s -queue.mem.flush.timeout: 0s -setup.template.enabled: false -processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ -http.enabled: true -http.host: localhost -http.port: %d -` - ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) // start filebeat filebeat := integration.NewBeat( t, @@ -241,182 +210,6 @@ setup.template.pattern: logs-filebeat-default assertMonitoring(t, otelMonitoringPort) } -<<<<<<< HEAD -======= -func TestFilebeatOTelHTTPJSONInput(t *testing.T) { - integration.EnsureESIsRunning(t) - - host := integration.GetESURL(t, "http") - user := host.User.Username() - password, _ := host.User.Password() - - // create a random uuid and make sure it doesn't contain dashes/ - otelNamespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") - fbNameSpace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") - - type options struct { - Namespace string - ESURL string - Username string - Password string - } - - // The request url is a http mock server started using streams - configFile := ` -filebeat.inputs: - - type: httpjson - id: httpjson-e2e-otel - request.url: http://localhost:8090/test - -output: - elasticsearch: - hosts: - - {{ .ESURL }} - username: {{ .Username }} - password: {{ .Password }} - index: logs-integration-{{ .Namespace }} - -setup.template.enabled: false -queue.mem.flush.timeout: 0s -processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ -` - - otelConfigFile := `receivers: - filebeatreceiver: - filebeat: - inputs: - - type: httpjson - id: httpjson-e2e-otel - request.url: http://localhost:8090/test - processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ - queue.mem.flush.timeout: 0s - setup.template.enabled: false -exporters: - elasticsearch: - auth: - authenticator: beatsauth - compression: gzip - compression_params: - level: 1 - endpoints: - - {{ .ESURL }} - logs_index: logs-integration-{{ .Namespace }} - mapping: - mode: bodymap - max_conns_per_host: 1 - password: {{ .Password }} - retry: - enabled: true - initial_interval: 1s - max_interval: 1m0s - max_retries: 3 - sending_queue: - batch: - flush_timeout: 10s - max_size: 1600 - min_size: 0 - sizer: items - block_on_overflow: true - enabled: true - num_consumers: 1 - queue_size: 3200 - wait_for_result: true - user: {{ .Username }} -extensions: - beatsauth: - idle_connection_timeout: 3s - proxy_disable: false - timeout: 1m30s -service: - extensions: - - beatsauth - pipelines: - logs: - receivers: - - filebeatreceiver - exporters: - - elasticsearch -` - - optionsValue := options{ - ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), - Username: user, - Password: password, - } - - var configBuffer bytes.Buffer - optionsValue.Namespace = otelNamespace - require.NoError(t, template.Must(template.New("config").Parse(otelConfigFile)).Execute(&configBuffer, optionsValue)) - oteltestcol.New(t, configBuffer.String()) - - // reset buffer - configBuffer.Reset() - - optionsValue.Namespace = fbNameSpace - require.NoError(t, template.Must(template.New("config").Parse(configFile)).Execute(&configBuffer, optionsValue)) - - // start filebeat - filebeat := integration.NewBeat( - t, - "filebeat", - "../../filebeat.test", - ) - - filebeat.WriteConfigFile(configBuffer.String()) - filebeat.Start() - - // prepare to query ES - es := integration.GetESClient(t, "http") - - rawQuery := map[string]any{ - "sort": []map[string]any{ - {"@timestamp": map[string]any{"order": "asc"}}, - }, - } - - var filebeatDocs estools.Documents - var otelDocs estools.Documents - var err error - - // wait for logs to be published - require.EventuallyWithTf(t, - func(ct *assert.CollectT) { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer findCancel() - - otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+otelNamespace+"*", es) - assert.NoError(ct, err) - - filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+fbNameSpace+"*", es) - assert.NoError(ct, err) - - assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel event, got %d", otelDocs.Hits.Total.Value) - assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat event, got %d", filebeatDocs.Hits.Total.Value) - }, - 2*time.Minute, 1*time.Second, "expected at least 1 event for both filebeat and otel") - - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source - ignoredFields := []string{ - // Expected to change between agentDocs and OtelDocs - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "event.created", - } - - oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") -} - ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { t.Helper() logFile, err := os.Create(filename) @@ -480,7 +273,7 @@ func TestFilebeatOTelInspect(t *testing.T) { "otel", ) - var beatsCfgFile = ` + beatsCfgFile := ` filebeat.inputs: - type: filestream id: filestream-input-id @@ -869,152 +662,3 @@ service: }) } } -<<<<<<< HEAD -======= - -func TestFileBeatKerberos(t *testing.T) { - wantEvents := 1 - krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here - tempFile := t.TempDir() - // ES client - esCfg := elasticsearch.Config{ - Addresses: []string{krbURL}, - Username: "admin", - Password: "testing", - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // this is only for testing - }, - }, - } - - es, err := elasticsearch.NewClient(esCfg) - require.NoError(t, err, "could not get elasticsearch client") - - setupRoleMapping(t, es) - - namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") - filebeatIndex := "logs-filebeat.kerberos-" + namespace - - otelConfig := struct { - Index string - InputFile string - PathHome string - Endpoint string - }{ - Index: filebeatIndex, - InputFile: filepath.Join(tempFile, "log.log"), - PathHome: tempFile, - Endpoint: krbURL, - } - - cfg := `receivers: - filebeatreceiver/filestream: - filebeat: - inputs: - - type: filestream - id: filestream-fbreceiver - enabled: true - paths: - - {{.InputFile}} - prospector.scanner.fingerprint.enabled: false - file_identity.native: ~ - queue.mem.flush.timeout: 0s - management.otel.enabled: true - path.home: {{.PathHome}} -extensions: - beatsauth: - kerberos: - auth_type: "password" - config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf" - username: "beats" - password: "testing" - realm: "elastic" -exporters: - debug: - use_internal_logger: false - verbosity: detailed - elasticsearch/log: - endpoints: - - {{.Endpoint}} - logs_index: {{.Index}} - mapping: - mode: bodymap - auth: - authenticator: beatsauth -service: - extensions: - - beatsauth - pipelines: - logs: - receivers: - - filebeatreceiver/filestream - exporters: - - elasticsearch/log - - debug -` - - var configBuffer bytes.Buffer - require.NoError(t, - template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) - configContents := configBuffer.Bytes() - t.Cleanup(func() { - if t.Failed() { - t.Logf("Config contents:\n%s", configContents) - } - }) - - writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) - oteltestcol.New(t, string(configContents)) - - // wait for logs to be published - require.EventuallyWithT(t, - func(ct *assert.CollectT) { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer findCancel() - - otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") - assert.NoError(ct, err) - - assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value) - }, - 2*time.Minute, 1*time.Second) -} - -// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic -func setupRoleMapping(t *testing.T, client *elasticsearch.Client) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // prepare to query ES - roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping" - - body := map[string]interface{}{ - "roles": []string{"superuser"}, - "enabled": true, - "rules": map[string]interface{}{ - "field": map[string]interface{}{ - "username": "beats@elastic", - }, - }, - } - - jsonData, err := json.Marshal(body) - require.NoError(t, err, "could not marshal role mapping body to json") - - // Build request - req, err := http.NewRequestWithContext(ctx, http.MethodPost, - roleMappingURL, - bytes.NewReader(jsonData)) - require.NoError(t, err, "could not create role mapping request") - - // Set content type header - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Perform(req) - require.NoError(t, err, "could not perform role mapping request") - defer resp.Body.Close() - - require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code") -} ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index fb6370176b02..fcc4131c7539 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -32,15 +32,8 @@ import ( const ( // esDocumentIDAttribute is the attribute key used to store the document ID in the log record. esDocumentIDAttribute = "elasticsearch.document_id" -<<<<<<< HEAD beatNameCtxKey = "beat_name" beatVersionCtxtKey = "beat_version" - // otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event. - otelComponentIDKey = "otelcol.component.id" - // otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver". - otelComponentKindKey = "otelcol.component.kind" -======= ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) ) func init() { diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 156700f62dfe..beadfac61098 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -258,13 +258,8 @@ func TestPublish(t *testing.T) { batch := outest.NewBatch(event1) otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { cm := client.FromContext(ctx).Metadata -<<<<<<< HEAD assert.Equal(t, beatInfo.Beat, cm.Get(beatNameCtxKey)[0]) assert.Equal(t, beatInfo.Version, cm.Get(beatVersionCtxtKey)[0]) -======= - assert.Equal(t, beatInfo.Beat, cm.Get(otelctx.BeatNameCtxKey)[0]) - assert.Equal(t, beatInfo.Version, cm.Get(otelctx.BeatVersionCtxKey)[0]) ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) return nil }) @@ -273,76 +268,4 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) -<<<<<<< HEAD - t.Run("sets otel specific-fields", func(t *testing.T) { - testCases := []struct { - name string - componentID string - componentKind string - expectedComponentID string - expectedComponentKind string - }{ - { - name: "sets beat component ID", - componentID: "filebeatreceiver/1", - expectedComponentID: "filebeatreceiver/1", - expectedComponentKind: "receiver", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - event := beat.Event{ - Fields: mapstr.M{ - "field": 1, - "agent": mapstr.M{}, - }, - Meta: mapstr.M{ - "_id": "abc123", - }, - } - ch := make(chan plog.Logs, 1) - batch := outest.NewBatch(event) - var countLogs int - otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { - countLogs = countLogs + ld.LogRecordCount() - ch <- ld - return nil - }) - otelConsumer.beatInfo.ComponentID = tc.componentID - err := otelConsumer.Publish(ctx, batch) - assert.NoError(t, err) - assert.Len(t, batch.Signals, 1) - assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) - assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") - log := <-ch - for i := 0; i < log.ResourceLogs().Len(); i++ { - resourceLog := log.ResourceLogs().At(i) - for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { - scopeLog := resourceLog.ScopeLogs().At(j) - for k := 0; k < scopeLog.LogRecords().Len(); k++ { - logRecord := scopeLog.LogRecords().At(k) - body := logRecord.Body().Map() - - // Traverse nested "agent.otelcol.component" structure - agentVal, ok := body.Get("agent") - require.True(t, ok, "expected 'agent' in log body") - - agentMap := agentVal.Map() - idVal, ok := agentMap.Get("otelcol.component.id") - require.True(t, ok, "expected 'agent.otelcol.component.id' in log body") - assert.Equal(t, tc.expectedComponentID, idVal.AsString()) - - kindVal, ok := agentMap.Get("otelcol.component.kind") - require.True(t, ok, "expected 'agent.otelcol.component.kind' in log body") - assert.Equal(t, tc.expectedComponentKind, kindVal.AsString()) - } - } - } - - }) - } - }) -======= ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index ee399a77aadc..af94362543b4 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -204,12 +204,6 @@ func TestMultipleReceivers(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 && len(logs["r2"]) > 0 }, "expected at least one ingest log for each receiver, got logs: %v", logs) -<<<<<<< HEAD - assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") - assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") - assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record") -======= assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r1") assert.Equal(c, "metricbeat", logs["r2"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r2") @@ -222,7 +216,6 @@ func TestMultipleReceivers(t *testing.T) { r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "metricbeatreceiver/r2")) assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log") ->>>>>>> ca1c17bca (remove otel.component.id and otel.component.kind fields from beat receivers (#47729)) var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2} From 11f9923b901ca9b7caf45171a174af154a93150d Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 25 Nov 2025 15:40:43 -0600 Subject: [PATCH 3/3] another merge fix --- x-pack/metricbeat/mbreceiver/receiver_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index af94362543b4..503b0553d548 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -207,15 +207,6 @@ func TestMultipleReceivers(t *testing.T) { assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r1") assert.Equal(c, "metricbeat", logs["r2"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r2") - // Make sure that each receiver has a separate logger - // instance and does not interfere with others. Previously, the - // logger in Beats was global, causing logger fields to be - // overwritten when multiple receivers started in the same process. - r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "metricbeatreceiver/r1")) - assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log") - r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "metricbeatreceiver/r2")) - assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log") - var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2}