Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Journald fields to better match ECS #42403

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
- Filestream inputs can define `allow_deprecated_id_duplication: true` to run keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because the fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]
- The fields produced by the Journald input are updated to better match ECS. Renamed fields: `syslog.priority` -> `log.syslog.priority`, `syslog.facility` -> `log.syslog.facility.code`, `syslog.identifier` -> `log.syslog.appname`, `syslog.pid` -> `log.syslog.procid`. Dropped fields (they're duplicated): `container.id_truncated`, `container.log.tag`. The field `container.partial` is replaced by the tag `partial_message` if it was `true`, otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]

*Heartbeat*

Expand Down
14 changes: 7 additions & 7 deletions filebeat/docs/inputs/input-journald.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,10 @@ journald fields:
`_MACHINE_ID`:: `host.id`
`_MESSAGE`:: `message`
`_PID`:: `process.pid`
`_PRIORITY`:: `syslog.priority`
`_SYSLOG_FACILITY`:: `syslog.facility`
`_SYSLOG_IDENTIFIER`:: `syslog.identifier`
`_SYSLOG_PID`:: `syslog.pid`
`_PRIORITY`:: `log.syslog.priority`
`_SYSLOG_FACILITY`:: `log.syslog.facility.code`
`_SYSLOG_IDENTIFIER`:: `log.syslog.appname`
`_SYSLOG_PID`:: `log.syslog.procid`
`_SYSTEMD_CGROUP`:: `systemd.cgroup`
`_SYSTEMD_INVOCATION_ID`:: `systemd.invocation_id`
`_SYSTEMD_OWNER_UID`:: `systemd.owner_uid`
Expand All @@ -484,13 +484,13 @@ https://docs.docker.com/config/containers/logging/journald/[Docker] are also
available:

[horizontal]
`CONTAINER_ID`:: `container.id_truncated`
`CONTAINER_ID_FULL`:: `container.id`
`CONTAINER_NAME`:: `container.name`
`CONTAINER_PARTIAL_MESSAGE`:: `container.partial`
`CONTAINER_TAG`:: `container.log.tag`
`IMAGE_NAME`:: `container.image.name`

If `CONTAINER_PARTIAL_MESSAGE` is present and it is true, then the tag
`partial_message` is added to the final event.

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
15 changes: 15 additions & 0 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package journald
import (
"errors"
"fmt"
"strconv"
"time"

"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalctl"
Expand Down Expand Up @@ -279,6 +280,20 @@ func (r *readerAdapter) Next() (reader.Message, error) {
fields.Put("event.kind", "event")
fields.Put("event.created", created)

// IF 'container.partial' is present, we can parse it and it's true, then
// add 'partial_message' to tags.
if partialMessageRaw, err := fields.GetValue("container.partial"); err == nil {
partialMessage, err := strconv.ParseBool(fmt.Sprint(partialMessageRaw))
if err == nil && partialMessage {
// 'fields' came directly from the journal,
// so there is no chance tags already exist
fields.Put("tags", []string{"partial_message"})
}
}

// Delete 'container.partial', if there are any errors, ignore it
_ = fields.Delete("container.partial")

// if entry is coming from a remote journal, add_host_metadata overwrites
// the source hostname, so it has to be copied to a different field
if r.saveRemoteHostname {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestInputIncludeMatches(t *testing.T) {
"single match condition": {
includeMatches: map[string]interface{}{
"match": []string{
"syslog.facility=3",
"log.syslog.facility.code=3",
},
},
expectedMessages: []string{
Expand All @@ -159,7 +159,7 @@ func TestInputIncludeMatches(t *testing.T) {
includeMatches: map[string]interface{}{
"match": []string{
"journald.process.name=systemd",
"syslog.facility=3",
"log.syslog.facility.code=3",
},
},
expectedMessages: []string{
Expand Down
27 changes: 27 additions & 0 deletions filebeat/input/journald/input_parsers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,30 @@ func TestInputParsers(t *testing.T) {
t.Errorf("expecting 'answer' from the Journal JSON to be '%d' got '%d' instead", expectedAnswer, answer)
}
}

func TestPartialMessageTag(t *testing.T) {
out := decompress(t, filepath.Join("testdata", "ndjson-parser.journal.gz"))
env := newInputTestingEnvironment(t)
inp := env.mustCreateInput(mapstr.M{
"paths": []string{out},
})

ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)
env.startInput(ctx, inp)
env.waitUntilEventCount(1)
event := env.pipeline.clients[0].GetEvents()[0]

tags, err := event.Fields.GetValue("tags")
if err != nil {
t.Fatalf("'tags' not found in event: %s", err)
}

tagsStrSlice, ok := tags.([]string)
if !ok {
t.Fatalf("expecting 'tags' to be []string, got %T instead", tags)
}
if tagsStrSlice[0] != "partial_message" {
t.Fatalf("expecting the tag 'partial_message', got %v instead", tagsStrSlice)
}
}
35 changes: 35 additions & 0 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func TestCompareGoSystemdWithJournalctl(t *testing.T) {
t.Fatalf("expecting %d events, got %d", len(goldenEvents), len(events))
}

// After the golden events were generated we changed some field names
// to better align with ECS, so we do the same conversion in the golden
// events.
updateGoldenEvents(goldenEvents)

// The timestamps can have different locations set, but still be equal,
// this causes the require.EqualValues to fail, so we compare them manually
// and set them all to the same time.
Expand All @@ -190,6 +195,36 @@ func TestCompareGoSystemdWithJournalctl(t *testing.T) {
require.EqualValues(t, goldenEvents, events, "events do not match reference")
}

func updateGoldenEvents(events []beat.Event) {
convTable := map[string]string{
"message_id": "log.syslog.msgid",
"syslog.priority": "log.syslog.priority",
"syslog.facility": "log.syslog.facility.code",
"syslog.identifier": "log.syslog.appname",
"syslog.pid": "log.syslog.procid",
}
for _, evt := range events {
for oldKey, newKey := range convTable {
value, err := evt.Fields.GetValue(oldKey)
// an error means the key does not exist
if err != nil {
continue
}
// Ignore any error
_ = evt.Fields.Delete(oldKey)
_, _ = evt.Fields.Put(newKey, value)
}
syslog, err := evt.Fields.GetValue("syslog")
if err != nil {
continue
}
syslogMap, isMap := syslog.(map[string]any)
if isMap && len(syslogMap) == 0 {
_ = evt.Fields.Delete("syslog")
}
}
}

func TestMatchers(t *testing.T) {
out := decompress(t, filepath.Join("testdata", "matchers.journal.gz"))
// If this test fails, uncomment the following line to see the debug logs
Expand Down
21 changes: 12 additions & 9 deletions filebeat/input/journald/pkg/journalfield/conv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func TestConversion(t *testing.T) {
"SYSLOG_PID": "123456",
},
want: mapstr.M{
"syslog": mapstr.M{
"pid": int64(123456),
"log": mapstr.M{
"syslog": mapstr.M{
"procid": int64(123456),
},
},
},
},
Expand All @@ -60,9 +62,6 @@ func TestConversion(t *testing.T) {
"PRIORITY": "123456, ",
},
want: mapstr.M{
"syslog": mapstr.M{
"priority": int64(123456),
},
"log": mapstr.M{
"syslog": mapstr.M{
"priority": int64(123456),
Expand All @@ -75,8 +74,10 @@ func TestConversion(t *testing.T) {
"SYSLOG_PID": "123456,root",
},
want: mapstr.M{
"syslog": mapstr.M{
"pid": int64(123456),
"log": mapstr.M{
"syslog": mapstr.M{
"procid": int64(123456),
},
},
},
},
Expand All @@ -85,8 +86,10 @@ func TestConversion(t *testing.T) {
"SYSLOG_PID": "",
},
want: mapstr.M{
"syslog": mapstr.M{
"pid": "",
"log": mapstr.M{
"syslog": mapstr.M{
"procid": "",
},
},
},
},
Expand Down
14 changes: 7 additions & 7 deletions filebeat/input/journald/pkg/journalfield/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var journaldEventFields = FieldConversion{
"COREDUMP_UNIT": text("journald.coredump.unit"),
"COREDUMP_USER_UNIT": text("journald.coredump.user_unit"),
"MESSAGE": text("message"),
"MESSAGE_ID": text("message_id"),
"MESSAGE_ID": text("log.syslog.msgid"),
"OBJECT_AUDIT_LOGINUID": integer("journald.object.audit.login_uid"),
"OBJECT_AUDIT_SESSION": integer("journald.object.audit.session"),
"OBJECT_CMDLINE": text("journald.object.process.command_line"),
Expand All @@ -38,10 +38,10 @@ var journaldEventFields = FieldConversion{
"OBJECT_SYSTEMD_UNIT": text("journald.object.systemd.unit"),
"OBJECT_SYSTEMD_USER_UNIT": text("journald.object.systemd.user_unit"),
"OBJECT_UID": integer("journald.object.uid"),
"PRIORITY": integer("syslog.priority", "log.syslog.priority"),
"SYSLOG_FACILITY": integer("syslog.facility", "log.syslog.facility.code"),
"SYSLOG_IDENTIFIER": text("syslog.identifier"),
"SYSLOG_PID": integer("syslog.pid"),
"PRIORITY": integer("log.syslog.priority"),
"SYSLOG_FACILITY": integer("log.syslog.facility.code"),
"SYSLOG_IDENTIFIER": text("log.syslog.appname"),
"SYSLOG_PID": integer("log.syslog.procid"),
"UNIT": text("journald.unit"),
"_AUDIT_LOGINUID": integer("journald.audit.login_uid"),
"_AUDIT_SESSION": text("journald.audit.session"),
Expand Down Expand Up @@ -74,14 +74,14 @@ var journaldEventFields = FieldConversion{
"_UID": integer("journald.uid"),

// docker journald fields from: https://docs.docker.com/config/containers/logging/journald/
"CONTAINER_ID": text("container.id_truncated"),
"CONTAINER_ID_FULL": text("container.id"),
"CONTAINER_NAME": text("container.name"),
"CONTAINER_TAG": text("container.log.tag"),
"CONTAINER_PARTIAL_MESSAGE": text("container.partial"),
"IMAGE_NAME": text("container.image.name"),

// dropped fields
"CONTAINER_ID": ignoredField,
"CONTAINER_TAG": ignoredField,
"_SOURCE_MONOTONIC_TIMESTAMP": ignoredField, // saved in the registry
"_SOURCE_REALTIME_TIMESTAMP": ignoredField, // saved in the registry
"__CURSOR": ignoredField, // saved in the registry
Expand Down
Binary file modified filebeat/input/journald/testdata/ndjson-parser.export
Binary file not shown.
Binary file modified filebeat/input/journald/testdata/ndjson-parser.journal.gz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sshd",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"log.syslog.procid": 26538,
"message": "Accepted publickey for vagrant from 10.0.2.2 port 48274 ssh2: ED25519 SHA256:k1kjhwoH/H3w31MbGOIGd7qxrkSQJnoAN0eYJVHDmmI",
"process.args": [
"\"sshd: vagrant [priv]\""
Expand Down Expand Up @@ -65,8 +67,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sshd",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"log.syslog.procid": 1710,
"message": "Accepted password for vagrant from 192.168.42.119 port 55310 ssh2",
"process.args": [
"\"sshd: vagrant [priv]\""
Expand Down Expand Up @@ -111,8 +115,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sshd",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"log.syslog.procid": 1721,
"message": "Invalid user test from 192.168.42.119 port 48890",
"process.args": [
"\"sshd: unknown [priv]\""
Expand Down Expand Up @@ -155,8 +161,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sshd",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"log.syslog.procid": 1723,
"message": "Failed password for root from 192.168.42.119 port 46632 ssh2",
"process.args": [
"\"sshd: root [priv]\""
Expand Down Expand Up @@ -201,8 +209,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sshd",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"log.syslog.procid": 1723,
"message": "Failed password for root from 192.168.42.119 port 46632 ssh2",
"process.args": [
"\"sshd: root [priv]\""
Expand Down Expand Up @@ -247,8 +257,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sshd",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"log.syslog.procid": 1723,
"message": "Failed password for root from 192.168.42.119 port 46632 ssh2",
"process.args": [
"\"sshd: root [priv]\""
Expand Down Expand Up @@ -285,6 +297,7 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "sudo",
"log.syslog.facility.code": 10,
"log.syslog.priority": 5,
"message": " vagrant : TTY=pts/2 ; PWD=/home/vagrant ; USER=root ; COMMAND=/usr/bin/emacs /etc/ssh/sshd_config",
Expand Down Expand Up @@ -333,8 +346,10 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "groupadd",
"log.syslog.facility.code": 10,
"log.syslog.priority": 6,
"log.syslog.procid": 1743,
"message": "new group: name=test, GID=1001",
"process.args": [
"/sbin/groupadd",
Expand Down Expand Up @@ -363,6 +378,7 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "systemd-logind",
"log.syslog.facility.code": 4,
"log.syslog.priority": 6,
"message": "Session 8 logged out. Waiting for processes to exit.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "systemd",
"log.syslog.facility.code": 3,
"log.syslog.msgid": "9d1aaa27d60140bd96365438aad20286",
"log.syslog.priority": 6,
"message": "Stopped target getty.target - Login Prompts.",
"process.args": [
Expand All @@ -34,6 +36,7 @@
"host.hostname": "vagrant-debian-12",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "kernel",
"log.syslog.facility.code": 0,
"log.syslog.priority": 6,
"message": "Console: switching to colour frame buffer device 160x50",
Expand All @@ -51,6 +54,7 @@
"host.hostname": "bookworm",
"host.id": "5e6dc8fe417f4ea383e2afaa731f5d8a",
"input.type": "journald",
"log.syslog.appname": "kernel",
"log.syslog.facility.code": 0,
"log.syslog.priority": 6,
"message": "thermal_sys: Registered thermal governor 'power_allocator'",
Expand Down
Loading