Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: breaking-change

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: remove otel.component.id and otel.component.kind from beat receiver events

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: all

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/beats/pull/47729

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/beats/issues/47600
9 changes: 3 additions & 6 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func TestNewReceiver(t *testing.T) {
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
_ = zapLogs
require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"]))
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected message field to contain string 'test'")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
Expand Down Expand Up @@ -250,10 +249,8 @@ func TestMultipleReceivers(t *testing.T) {
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")

assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record")
assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected r1 message field to be 'test'")
assert.Equal(c, "test", logs["r2"][0].Flatten()["message"], "expected r2 message field to be 'test'")

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
Expand Down
4 changes: 0 additions & 4 deletions x-pack/filebeat/input/gcppubsub/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,7 @@ processors:
"agent.ephemeral_id",
"agent.id",
"event.created",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")

}
11 changes: 3 additions & 8 deletions x-pack/filebeat/tests/integration/otel_lsexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDataShapeOTelVSClassicE2E(t *testing.T) {

// Agent does not support `index` setting, while beats does.
// Our focus is on agent classic vs otel mode comparison, so we do not test `index` for filebeat
var beatsCfgFile = `
beatsCfgFile := `
filebeat.inputs:
- type: filestream
id: filestream-input-id
Expand Down Expand Up @@ -173,8 +173,6 @@ service:
"log.file.inode",
"log.file.path",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
"log.file.device_id",
"log.file.fingerprint",
}
Expand Down Expand Up @@ -268,7 +266,6 @@ service:
require.NoError(t, err, "failed to read otel events")
require.Equal(t, numEvents, len(otelEvents),
"different number of events: sent=%d, get=%d", numEvents, len(otelEvents))

}

func generateEvents(numEvents int) []string {
Expand Down Expand Up @@ -377,10 +374,8 @@ func compareOutputFiles(t *testing.T, fbFilePath, otelFilePath string, ignoredFi
oteltest.AssertMapsEqual(t, fbEvent.data, otelEvent.data, ignoredFields,
fmt.Sprintf("event comparison failed for ID %s (line %d)", fbEvent.id, i))

assert.Equal(t, "filebeatreceiver", otelEvent.data.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelEvent.data.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, fbEvent.data.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
assert.NotContains(t, fbEvent.data.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
assert.Equal(t, "filebeat", otelEvent.data.Flatten()["agent.type"], "expected agent.type to be 'filebeat' in otel data")
assert.Equal(t, "filebeat", fbEvent.data.Flatten()["agent.type"], "expected agent.type to be 'filebeat' in filebeat data")
}
}

Expand Down
18 changes: 3 additions & 15 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ service:
writeEventsToLogFile(t, logFilePath, numEvents)
oteltestcol.New(t, fmt.Sprintf(otelCfgFile, logFilePath, tmpdir, otelMonitoringPort, fbOtelIndex))

var beatsCfgFile = `
beatsCfgFile := `
filebeat.inputs:
- type: filestream
id: filestream-input-id
Expand Down Expand Up @@ -185,18 +185,13 @@ http.port: %d
"agent.id",
"log.file.inode",
"log.file.path",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
"log.file.device_id", // changes value between filebeat and otel receiver
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")

assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
assert.Equal(t, "filebeat", otelDoc.Flatten()["agent.type"], "expected agent.type field to be 'filebeat' in otel docs")
assert.Equal(t, "filebeat", filebeatDoc.Flatten()["agent.type"], "expected agent.type field to be 'filebeat' in filebeat docs")
assertMonitoring(t, otelMonitoringPort)
}

Expand Down Expand Up @@ -368,9 +363,6 @@ service:
"agent.ephemeral_id",
"agent.id",
"event.created",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
Expand Down Expand Up @@ -830,7 +822,6 @@ service:
}

func TestFileBeatKerberos(t *testing.T) {

wantEvents := 1
krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here
tempFile := t.TempDir()
Expand Down Expand Up @@ -937,12 +928,10 @@ service:
assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value)
},
2*time.Minute, 1*time.Second)

}

// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic
func setupRoleMapping(t *testing.T, client *elasticsearch.Client) {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand Down Expand Up @@ -976,5 +965,4 @@ func setupRoleMapping(t *testing.T, client *elasticsearch.Client) {
defer resp.Body.Close()

require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code")

}
13 changes: 0 additions & 13 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import (
const (
// esDocumentIDAttribute is the attribute key used to store the document ID in the log record.
esDocumentIDAttribute = "elasticsearch.document_id"
// otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event.
otelComponentIDKey = "otelcol.component.id"
// otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver".
otelComponentKindKey = "otelcol.component.kind"
)

func init() {
Expand Down Expand Up @@ -153,15 +149,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
}
logRecord.SetObservedTimestamp(observedTimestamp)

if agent, _ := beatEvent.GetValue("agent"); agent != nil {
switch agent := agent.(type) {
case mapstr.M:
agent[otelComponentIDKey] = out.beatInfo.ComponentID
agent[otelComponentKindKey] = "receiver"
beatEvent["agent"] = agent
}
}

otelmap.ConvertNonPrimitive(beatEvent)

// if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing
Expand Down
69 changes: 0 additions & 69 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,75 +289,6 @@ func TestPublish(t *testing.T) {
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
})
t.Run("sets otel specific-fields", func(t *testing.T) {
testCases := []struct {
name string
componentID string
componentKind string
expectedComponentID string
expectedComponentKind string
}{
{
name: "sets beat component ID",
componentID: "filebeatreceiver/1",
expectedComponentID: "filebeatreceiver/1",
expectedComponentKind: "receiver",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
event := beat.Event{
Fields: mapstr.M{
"field": 1,
"agent": mapstr.M{},
},
Meta: mapstr.M{
"_id": "abc123",
},
}
ch := make(chan plog.Logs, 1)
batch := outest.NewBatch(event)
var countLogs int
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
ch <- ld
return nil
})
otelConsumer.beatInfo.ComponentID = tc.componentID
err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
log := <-ch
for i := 0; i < log.ResourceLogs().Len(); i++ {
resourceLog := log.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
logRecord := scopeLog.LogRecords().At(k)
body := logRecord.Body().Map()

// Traverse nested "agent.otelcol.component" structure
agentVal, ok := body.Get("agent")
require.True(t, ok, "expected 'agent' in log body")

agentMap := agentVal.Map()
idVal, ok := agentMap.Get("otelcol.component.id")
require.True(t, ok, "expected 'agent.otelcol.component.id' in log body")
assert.Equal(t, tc.expectedComponentID, idVal.AsString())

kindVal, ok := agentMap.Get("otelcol.component.kind")
require.True(t, ok, "expected 'agent.otelcol.component.kind' in log body")
assert.Equal(t, tc.expectedComponentKind, kindVal.AsString())
}
}
}

})
}
})
t.Run("sets the client context metadata with the beat info", func(t *testing.T) {
batch := outest.NewBatch(event1)
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
Expand Down
10 changes: 4 additions & 6 deletions x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestNewReceiver(t *testing.T) {
require.Conditionf(c, func() bool {
return len(logs["r1"]) > 0
}, "expected at least one ingest log, got logs: %v", logs["r1"])
assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field in to be 'metricbeat'")

var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
Expand Down Expand Up @@ -195,10 +195,8 @@ func TestMultipleReceivers(t *testing.T) {
require.Conditionf(c, func() bool {
return len(logs["r1"]) > 0 && len(logs["r2"]) > 0
}, "expected at least one ingest log for each receiver, got logs: %v", logs)
assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record")
assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r1")
assert.Equal(c, "metricbeat", logs["r2"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r2")

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
Expand Down
22 changes: 5 additions & 17 deletions x-pack/metricbeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ service:

oteltestcol.New(t, configBuffer.String())

var beatsCfgFile = `
beatsCfgFile := `
metricbeat:
modules:
- module: system
Expand Down Expand Up @@ -205,16 +205,9 @@ http.port: {{.MonitoringPort}}
var metricbeatDoc, otelDoc mapstr.M
otelDoc = otelDocs.Hits.Hits[0].Source
metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source
ignoredFields := []string{
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}
assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record")
assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record")
assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal")
assert.Equal(t, "metricbeat", otelDoc.Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in otel docs")
assert.Equal(t, "metricbeat", metricbeatDoc.Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in metricbeat docs")
assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, nil, "expected documents keys to be equal")
assertMonitoring(t, metricbeatMonitoringPort)
}

Expand Down Expand Up @@ -381,12 +374,7 @@ service:
assert.GreaterOrEqualf(ct, r1Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 1, got %d", r1Docs.Hits.Total.Value)
},
1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver")
ignoredFields := []string{
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}
assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, ignoredFields, "expected documents keys to be equal")
assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, nil, "expected documents keys to be equal")
for _, rec := range otelConfig.Receivers {
assertMonitoring(t, rec.MonitoringPort)
}
Expand Down