Skip to content

Commit

Permalink
Update Journald fields to better match ECS
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Jan 22, 2025
1 parent 27adc68 commit 33cbd59
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
- Filestream inputs can define `allow_deprecated_id_duplication: true` to run keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because the fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]
- The fields produced by the Journald input are updated to better match ECS. Renamed fields: `syslog.priority` -> `log.syslog.priority`, `syslog.facility` -> `log.syslog.facility.code`, `syslog.identifier` -> `log.syslog.appname`, `syslog.pid` -> `log.syslog.procid`. Dropped fields: `container.id_truncated`, `container.log.tag`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`, otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]

*Heartbeat*

Expand Down
14 changes: 7 additions & 7 deletions filebeat/docs/inputs/input-journald.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,10 @@ journald fields:
`_MACHINE_ID`:: `host.id`
`_MESSAGE`:: `message`
`_PID`:: `process.pid`
`_PRIORITY`:: `syslog.priority`
`_SYSLOG_FACILITY`:: `syslog.facility`
`_SYSLOG_IDENTIFIER`:: `syslog.identifier`
`_SYSLOG_PID`:: `syslog.pid`
`_PRIORITY`:: `log.syslog.priority`
`_SYSLOG_FACILITY`:: `log.syslog.facility.code`
`_SYSLOG_IDENTIFIER`:: `log.syslog.appname`
`_SYSLOG_PID`:: `log.syslog.procid`
`_SYSTEMD_CGROUP`:: `systemd.cgroup`
`_SYSTEMD_INVOCATION_ID`:: `systemd.invocation_id`
`_SYSTEMD_OWNER_UID`:: `systemd.owner_uid`
Expand All @@ -484,13 +484,13 @@ https://docs.docker.com/config/containers/logging/journald/[Docker] are also
available:

[horizontal]
`CONTAINER_ID`:: `container.id_truncated`
`CONTAINER_ID_FULL`:: `container.id`
`CONTAINER_NAME`:: `container.name`
`CONTAINER_PARTIAL_MESSAGE`:: `container.partial`
`CONTAINER_TAG`:: `container.log.tag`
`IMAGE_NAME`:: `container.image.name`

If `CONTAINER_PARTIAL_MESSAGE` is present and it is true, then the tag
`partial_message` is added to the final event.

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
15 changes: 15 additions & 0 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package journald
import (
"errors"
"fmt"
"strconv"
"time"

"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalctl"
Expand Down Expand Up @@ -279,6 +280,20 @@ func (r *readerAdapter) Next() (reader.Message, error) {
fields.Put("event.kind", "event")
fields.Put("event.created", created)

// IF 'container.partial' is present, we can parse it and it's true, then
// add 'partial_message' to tags.
if partialMessageRaw, err := fields.GetValue("container.partial"); err == nil {
partialMessage, err := strconv.ParseBool(fmt.Sprint(partialMessageRaw))
if err == nil && partialMessage {
// 'fields' came directly from the journal,
// so there is no chance tags already exist
fields.Put("tags", []string{"partial_message"})
}
}

// Delete 'container.partial', if there are any errors, ignore it
_ = fields.Delete("container.partial")

// if entry is coming from a remote journal, add_host_metadata overwrites
// the source hostname, so it has to be copied to a different field
if r.saveRemoteHostname {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestInputIncludeMatches(t *testing.T) {
"single match condition": {
includeMatches: map[string]interface{}{
"match": []string{
"syslog.facility=3",
"log.syslog.facility.code=3",
},
},
expectedMessages: []string{
Expand All @@ -159,7 +159,7 @@ func TestInputIncludeMatches(t *testing.T) {
includeMatches: map[string]interface{}{
"match": []string{
"journald.process.name=systemd",
"syslog.facility=3",
"log.syslog.facility.code=3",
},
},
expectedMessages: []string{
Expand Down
35 changes: 35 additions & 0 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func TestCompareGoSystemdWithJournalctl(t *testing.T) {
t.Fatalf("expecting %d events, got %d", len(goldenEvents), len(events))
}

// After the golden events were generated we changed some field names
// to better align with ECS, so we do the same conversion in the golden
// events.
updateGoldenEvents(goldenEvents)

// The timestamps can have different locations set, but still be equal,
// this causes the require.EqualValues to fail, so we compare them manually
// and set them all to the same time.
Expand All @@ -190,6 +195,36 @@ func TestCompareGoSystemdWithJournalctl(t *testing.T) {
require.EqualValues(t, goldenEvents, events, "events do not match reference")
}

func updateGoldenEvents(events []beat.Event) {
convTable := map[string]string{
"message_id": "log.syslog.msgid",
"syslog.priority": "log.syslog.priority",
"syslog.facility": "log.syslog.facility.code",
"syslog.identifier": "log.syslog.appname",
"syslog.pid": "log.syslog.procid",
}
for _, evt := range events {
for oldKey, newKey := range convTable {
value, err := evt.Fields.GetValue(oldKey)
// an error means the key does not exist
if err != nil {
continue
}
// Ignore any error
_ = evt.Fields.Delete(oldKey)
_, _ = evt.Fields.Put(newKey, value)
}
syslog, err := evt.Fields.GetValue("syslog")
if err != nil {
continue
}
syslogMap, isMap := syslog.(map[string]any)
if isMap && len(syslogMap) == 0 {
_ = evt.Fields.Delete("syslog")
}
}
}

func TestMatchers(t *testing.T) {
out := decompress(t, filepath.Join("testdata", "matchers.journal.gz"))
// If this test fails, uncomment the following line to see the debug logs
Expand Down
21 changes: 12 additions & 9 deletions filebeat/input/journald/pkg/journalfield/conv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func TestConversion(t *testing.T) {
"SYSLOG_PID": "123456",
},
want: mapstr.M{
"syslog": mapstr.M{
"pid": int64(123456),
"log": mapstr.M{
"syslog": mapstr.M{
"procid": int64(123456),
},
},
},
},
Expand All @@ -60,9 +62,6 @@ func TestConversion(t *testing.T) {
"PRIORITY": "123456, ",
},
want: mapstr.M{
"syslog": mapstr.M{
"priority": int64(123456),
},
"log": mapstr.M{
"syslog": mapstr.M{
"priority": int64(123456),
Expand All @@ -75,8 +74,10 @@ func TestConversion(t *testing.T) {
"SYSLOG_PID": "123456,root",
},
want: mapstr.M{
"syslog": mapstr.M{
"pid": int64(123456),
"log": mapstr.M{
"syslog": mapstr.M{
"procid": int64(123456),
},
},
},
},
Expand All @@ -85,8 +86,10 @@ func TestConversion(t *testing.T) {
"SYSLOG_PID": "",
},
want: mapstr.M{
"syslog": mapstr.M{
"pid": "",
"log": mapstr.M{
"syslog": mapstr.M{
"procid": "",
},
},
},
},
Expand Down
14 changes: 7 additions & 7 deletions filebeat/input/journald/pkg/journalfield/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var journaldEventFields = FieldConversion{
"COREDUMP_UNIT": text("journald.coredump.unit"),
"COREDUMP_USER_UNIT": text("journald.coredump.user_unit"),
"MESSAGE": text("message"),
"MESSAGE_ID": text("message_id"),
"MESSAGE_ID": text("log.syslog.msgid"),
"OBJECT_AUDIT_LOGINUID": integer("journald.object.audit.login_uid"),
"OBJECT_AUDIT_SESSION": integer("journald.object.audit.session"),
"OBJECT_CMDLINE": text("journald.object.process.command_line"),
Expand All @@ -38,10 +38,10 @@ var journaldEventFields = FieldConversion{
"OBJECT_SYSTEMD_UNIT": text("journald.object.systemd.unit"),
"OBJECT_SYSTEMD_USER_UNIT": text("journald.object.systemd.user_unit"),
"OBJECT_UID": integer("journald.object.uid"),
"PRIORITY": integer("syslog.priority", "log.syslog.priority"),
"SYSLOG_FACILITY": integer("syslog.facility", "log.syslog.facility.code"),
"SYSLOG_IDENTIFIER": text("syslog.identifier"),
"SYSLOG_PID": integer("syslog.pid"),
"PRIORITY": integer("log.syslog.priority"),
"SYSLOG_FACILITY": integer("log.syslog.facility.code"),
"SYSLOG_IDENTIFIER": text("log.syslog.appname"),
"SYSLOG_PID": integer("log.syslog.procid"),
"UNIT": text("journald.unit"),
"_AUDIT_LOGINUID": integer("journald.audit.login_uid"),
"_AUDIT_SESSION": text("journald.audit.session"),
Expand Down Expand Up @@ -74,14 +74,14 @@ var journaldEventFields = FieldConversion{
"_UID": integer("journald.uid"),

// docker journald fields from: https://docs.docker.com/config/containers/logging/journald/
"CONTAINER_ID": text("container.id_truncated"),
"CONTAINER_ID_FULL": text("container.id"),
"CONTAINER_NAME": text("container.name"),
"CONTAINER_TAG": text("container.log.tag"),
"CONTAINER_PARTIAL_MESSAGE": text("container.partial"),
"IMAGE_NAME": text("container.image.name"),

// dropped fields
"CONTAINER_ID": ignoredField,
"CONTAINER_TAG": ignoredField,
"_SOURCE_MONOTONIC_TIMESTAMP": ignoredField, // saved in the registry
"_SOURCE_REALTIME_TIMESTAMP": ignoredField, // saved in the registry
"__CURSOR": ignoredField, // saved in the registry
Expand Down

0 comments on commit 33cbd59

Please sign in to comment.