diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 0605320b2f14..8110a5e8759c 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
 - Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
 - The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because the `fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]
+- The fields produced by the Journald input are updated to better match ECS. Renamed fields: `syslog.priority` -> `log.syslog.priority`, `syslog.facility` -> `log.syslog.facility.code`, `syslog.identifier` -> `log.syslog.appname`, `syslog.pid` -> `log.syslog.procid`. Dropped fields: `container.id_truncated`, `container.log.tag`. The field `container.partial` is replaced by the tag `partial_message` if its value is `true`, otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
 
 *Heartbeat*
 
diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc
index f7655f51419f..d2bf341d60f0 100644
--- a/filebeat/docs/inputs/input-journald.asciidoc
+++ b/filebeat/docs/inputs/input-journald.asciidoc
@@ -461,10 +461,10 @@ journald fields:
 `_MACHINE_ID`:: `host.id`
 `_MESSAGE`:: `message`
 `_PID`:: `process.pid`
-`_PRIORITY`:: `syslog.priority`
-`_SYSLOG_FACILITY`:: `syslog.facility`
-`_SYSLOG_IDENTIFIER`:: `syslog.identifier`
-`_SYSLOG_PID`:: `syslog.pid`
+`_PRIORITY`:: `log.syslog.priority`
+`_SYSLOG_FACILITY`:: `log.syslog.facility.code`
+`_SYSLOG_IDENTIFIER`:: `log.syslog.appname`
+`_SYSLOG_PID`:: `log.syslog.procid`
 `_SYSTEMD_CGROUP`:: `systemd.cgroup`
 `_SYSTEMD_INVOCATION_ID`:: `systemd.invocation_id`
 `_SYSTEMD_OWNER_UID`:: `systemd.owner_uid`
@@ -484,13 +484,13 @@ https://docs.docker.com/config/containers/logging/journald/[Docker] are
 also available:
 
 [horizontal]
-`CONTAINER_ID`:: `container.id_truncated`
 `CONTAINER_ID_FULL`:: `container.id`
 `CONTAINER_NAME`:: `container.name`
-`CONTAINER_PARTIAL_MESSAGE`:: `container.partial`
-`CONTAINER_TAG`:: `container.log.tag`
 `IMAGE_NAME`:: `container.image.name`
 
+If `CONTAINER_PARTIAL_MESSAGE` is present and true, the tag
+`partial_message` is added to the final event.
+
 [id="{beatname_lc}-input-{type}-common-options"]
 include::../inputs/input-common-options.asciidoc[]
 
diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go
index 0ab3c5481775..e8eb8a7a8773 100644
--- a/filebeat/input/journald/input.go
+++ b/filebeat/input/journald/input.go
@@ -22,6 +22,7 @@ package journald
 import (
     "errors"
     "fmt"
+    "strconv"
     "time"
 
     "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalctl"
@@ -279,6 +280,20 @@ func (r *readerAdapter) Next() (reader.Message, error) {
     fields.Put("event.kind", "event")
     fields.Put("event.created", created)
 
+    // If 'container.partial' is present, can be parsed, and is true,
+    // then add 'partial_message' to tags.
+    if partialMessageRaw, err := fields.GetValue("container.partial"); err == nil {
+        partialMessage, err := strconv.ParseBool(fmt.Sprint(partialMessageRaw))
+        if err == nil && partialMessage {
+            // 'fields' came directly from the journal,
+            // so there is no chance tags already exist
+            fields.Put("tags", []string{"partial_message"})
+        }
+    }
+
+    // Delete 'container.partial', ignoring any errors
+    _ = fields.Delete("container.partial")
+
     // if entry is coming from a remote journal, add_host_metadata overwrites
     // the source hostname, so it has to be copied to a different field
     if r.saveRemoteHostname {
diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go
index 220f71e2d9ba..27632735ffce 100644
--- a/filebeat/input/journald/input_filtering_test.go
+++ b/filebeat/input/journald/input_filtering_test.go
@@ -142,7 +142,7 @@ func TestInputIncludeMatches(t *testing.T) {
         "single match condition": {
             includeMatches: map[string]interface{}{
                 "match": []string{
-                    "syslog.facility=3",
+                    "log.syslog.facility.code=3",
                 },
             },
             expectedMessages: []string{
@@ -159,7 +159,7 @@
             includeMatches: map[string]interface{}{
                 "match": []string{
                     "journald.process.name=systemd",
-                    "syslog.facility=3",
+                    "log.syslog.facility.code=3",
                 },
             },
             expectedMessages: []string{
diff --git a/filebeat/input/journald/input_test.go b/filebeat/input/journald/input_test.go
index 776115d5d8ac..2cf320fa4d25 100644
--- a/filebeat/input/journald/input_test.go
+++ b/filebeat/input/journald/input_test.go
@@ -171,6 +171,11 @@ func TestCompareGoSystemdWithJournalctl(t *testing.T) {
         t.Fatalf("expecting %d events, got %d", len(goldenEvents), len(events))
     }
 
+    // After the golden events were generated, we changed some field names
+    // to better align with ECS, so we apply the same conversion to the
+    // golden events.
+    updateGoldenEvents(goldenEvents)
+
     // The timestamps can have different locations set, but still be equal,
     // this causes the require.EqualValues to fail, so we compare them manually
     // and set them all to the same time.
@@ -190,6 +195,36 @@ func TestCompareGoSystemdWithJournalctl(t *testing.T) {
     require.EqualValues(t, goldenEvents, events, "events do not match reference")
 }
 
+func updateGoldenEvents(events []beat.Event) {
+    convTable := map[string]string{
+        "message_id": "log.syslog.msgid",
+        "syslog.priority": "log.syslog.priority",
+        "syslog.facility": "log.syslog.facility.code",
+        "syslog.identifier": "log.syslog.appname",
+        "syslog.pid": "log.syslog.procid",
+    }
+    for _, evt := range events {
+        for oldKey, newKey := range convTable {
+            value, err := evt.Fields.GetValue(oldKey)
+            // an error means the key does not exist
+            if err != nil {
+                continue
+            }
+            // Ignore any error
+            _ = evt.Fields.Delete(oldKey)
+            _, _ = evt.Fields.Put(newKey, value)
+        }
+        syslog, err := evt.Fields.GetValue("syslog")
+        if err != nil {
+            continue
+        }
+        syslogMap, isMap := syslog.(map[string]any)
+        if isMap && len(syslogMap) == 0 {
+            _ = evt.Fields.Delete("syslog")
+        }
+    }
+}
+
 func TestMatchers(t *testing.T) {
     out := decompress(t, filepath.Join("testdata", "matchers.journal.gz"))
     // If this test fails, uncomment the following line to see the debug logs
diff --git a/filebeat/input/journald/pkg/journalfield/conv_test.go b/filebeat/input/journald/pkg/journalfield/conv_test.go
index c92d3f15bcb0..936b7ce485ee 100644
--- a/filebeat/input/journald/pkg/journalfield/conv_test.go
+++ b/filebeat/input/journald/pkg/journalfield/conv_test.go
@@ -50,8 +50,10 @@ func TestConversion(t *testing.T) {
             "SYSLOG_PID": "123456",
         },
         want: mapstr.M{
-            "syslog": mapstr.M{
-                "pid": int64(123456),
+            "log": mapstr.M{
+                "syslog": mapstr.M{
+                    "procid": int64(123456),
+                },
             },
         },
     },
@@ -60,9 +62,6 @@
             "PRIORITY": "123456, ",
         },
         want: mapstr.M{
-            "syslog": mapstr.M{
-                "priority": int64(123456),
-            },
             "log": mapstr.M{
                 "syslog": mapstr.M{
                     "priority": int64(123456),
@@ -75,8 +74,10 @@
             "SYSLOG_PID": "123456,root",
         },
         want: mapstr.M{
-            "syslog": mapstr.M{
-                "pid": int64(123456),
+            "log": mapstr.M{
+                "syslog": mapstr.M{
+                    "procid": int64(123456),
+                },
             },
         },
     },
@@ -85,8 +86,10 @@
             "SYSLOG_PID": "",
         },
         want: mapstr.M{
-            "syslog": mapstr.M{
-                "pid": "",
+            "log": mapstr.M{
+                "syslog": mapstr.M{
+                    "procid": "",
+                },
             },
         },
     },
diff --git a/filebeat/input/journald/pkg/journalfield/default.go b/filebeat/input/journald/pkg/journalfield/default.go
index d2ec76fcd393..a8809ce08423 100644
--- a/filebeat/input/journald/pkg/journalfield/default.go
+++ b/filebeat/input/journald/pkg/journalfield/default.go
@@ -25,7 +25,7 @@ var journaldEventFields = FieldConversion{
     "COREDUMP_UNIT": text("journald.coredump.unit"),
     "COREDUMP_USER_UNIT": text("journald.coredump.user_unit"),
     "MESSAGE": text("message"),
-    "MESSAGE_ID": text("message_id"),
+    "MESSAGE_ID": text("log.syslog.msgid"),
     "OBJECT_AUDIT_LOGINUID": integer("journald.object.audit.login_uid"),
     "OBJECT_AUDIT_SESSION": integer("journald.object.audit.session"),
     "OBJECT_CMDLINE": text("journald.object.process.command_line"),
@@ -38,10 +38,10 @@
     "OBJECT_SYSTEMD_UNIT": text("journald.object.systemd.unit"),
     "OBJECT_SYSTEMD_USER_UNIT": text("journald.object.systemd.user_unit"),
     "OBJECT_UID": integer("journald.object.uid"),
-    "PRIORITY": integer("syslog.priority", "log.syslog.priority"),
-    "SYSLOG_FACILITY": integer("syslog.facility", "log.syslog.facility.code"),
-    "SYSLOG_IDENTIFIER": text("syslog.identifier"),
-    "SYSLOG_PID": integer("syslog.pid"),
+    "PRIORITY": integer("log.syslog.priority"),
+ "SYSLOG_FACILITY": integer("log.syslog.facility.code"), + "SYSLOG_IDENTIFIER": text("log.syslog.appname"), + "SYSLOG_PID": integer("log.syslog.procid"), "UNIT": text("journald.unit"), "_AUDIT_LOGINUID": integer("journald.audit.login_uid"), "_AUDIT_SESSION": text("journald.audit.session"), @@ -74,14 +74,14 @@ var journaldEventFields = FieldConversion{ "_UID": integer("journald.uid"), // docker journald fields from: https://docs.docker.com/config/containers/logging/journald/ - "CONTAINER_ID": text("container.id_truncated"), "CONTAINER_ID_FULL": text("container.id"), "CONTAINER_NAME": text("container.name"), - "CONTAINER_TAG": text("container.log.tag"), "CONTAINER_PARTIAL_MESSAGE": text("container.partial"), "IMAGE_NAME": text("container.image.name"), // dropped fields + "CONTAINER_ID": ignoredField, + "CONTAINER_TAG": ignoredField, "_SOURCE_MONOTONIC_TIMESTAMP": ignoredField, // saved in the registry "_SOURCE_REALTIME_TIMESTAMP": ignoredField, // saved in the registry "__CURSOR": ignoredField, // saved in the registry