diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 3f06bacacb6..5ba27260c3b 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -205,6 +205,8 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
 *Packetbeat*
 - Bump Windows Npcap version to v1.79. {pull}37733[37733]
+- Add metrics for TCP flags. {issue}36992[36992] {pull}36975[36975]
+- Add support for pipeline loading. {pull}37291[37291]
 *Packetbeat*
diff --git a/packetbeat/_meta/config/beat.reference.yml.tmpl b/packetbeat/_meta/config/beat.reference.yml.tmpl
index 649ec0e8dee..033aa1e5106 100644
--- a/packetbeat/_meta/config/beat.reference.yml.tmpl
+++ b/packetbeat/_meta/config/beat.reference.yml.tmpl
@@ -78,6 +78,11 @@ packetbeat.interfaces.internal_networks:
 # can stay enabled even after beat is shut down.
 #packetbeat.interfaces.auto_promisc_mode: true
+# By default Ingest pipelines are not updated if a pipeline with the same ID
+# already exists. If this option is enabled Packetbeat overwrites pipelines
+# every time a new Elasticsearch connection is established.
+#packetbeat.overwrite_pipelines: false
+
 {{- template "windows_npcap.yml.tmpl" .}}
 {{header "Flows"}}
diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go
index 725f3eebc33..d8c223f1789 100644
--- a/packetbeat/beater/packetbeat.go
+++ b/packetbeat/beater/packetbeat.go
@@ -25,13 +25,16 @@ import (
 "github.com/elastic/beats/v7/libbeat/beat"
 "github.com/elastic/beats/v7/libbeat/common/reload"
+ "github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
 "github.com/elastic/beats/v7/libbeat/management"
 "github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
+ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
 conf "github.com/elastic/elastic-agent-libs/config"
 "github.com/elastic/elastic-agent-libs/logp"
 "github.com/elastic/elastic-agent-libs/service"
 "github.com/elastic/beats/v7/packetbeat/config"
+ "github.com/elastic/beats/v7/packetbeat/module"
 "github.com/elastic/beats/v7/packetbeat/protos"
 // Add packetbeat default processors
@@ -80,10 +83,11 @@ func initialConfig() config.Config {
 // Beater object. Contains all objects needed to run the beat
 type packetbeat struct {
- config *conf.C
- factory *processorFactory
- done chan struct{}
- stopOnce sync.Once
+ config *conf.C
+ factory *processorFactory
+ overwritePipelines bool
+ done chan struct{}
+ stopOnce sync.Once
 }
 // New returns a new Packetbeat beat.Beater.
@@ -98,15 +102,35 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { return nil, err } + var overwritePipelines bool + if !b.Manager.Enabled() { + // Pipeline overwrite is only enabled on standalone packetbeat + // since pipelines are managed by fleet otherwise. + config, err := configurator(rawConfig) + if err != nil { + return nil, err + } + overwritePipelines = config.OverwritePipelines + b.OverwritePipelinesCallback = func(esConfig *conf.C) error { + esClient, err := eslegclient.NewConnectedClient(esConfig, "Packetbeat") + if err != nil { + return err + } + _, err = module.UploadPipelines(b.Info, esClient, overwritePipelines) + return err + } + } + return &packetbeat{ - config: rawConfig, - factory: factory, - done: make(chan struct{}), + config: rawConfig, + factory: factory, + overwritePipelines: overwritePipelines, + done: make(chan struct{}), }, nil } // Run starts the packetbeat network capture, decoding and event publication, sending -// events to b.Publisher. If b is mananaged, packetbeat is registered with the +// events to b.Publisher. If b is managed, packetbeat is registered with the // reload.Registry and handled by fleet. Otherwise it is run until cancelled or a // fatal error. func (pb *packetbeat) Run(b *beat.Beat) error { 
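Review bot: The `OverwritePipelinesCallback` closure opens a new connection with `eslegclient.NewConnectedClient` on every call and never closes it, so each invocation leaves the client's transport behind; add `defer esClient.Close()` right after the error check. The local `config, err := configurator(rawConfig)` also shadows the imported `config` package for the rest of `New`, which compiles but hides `config.Config` below and reads confusingly — `cfg` would be clearer. `New` begins before this hunk.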
@@ -138,11 +162,28 @@ func (pb *packetbeat) Run(b *beat.Beat) error { } if !b.Manager.Enabled() { + if b.Config.Output.Name() == "elasticsearch" { + _, err := elasticsearch.RegisterConnectCallback(func(esClient *eslegclient.Connection) error { + _, err := module.UploadPipelines(b.Info, esClient, pb.overwritePipelines) + return err + }) + if err != nil { + return err + } + } else { + logp.L().Warn(pipelinesWarning) + } + return pb.runStatic(b, pb.factory) } return pb.runManaged(b, pb.factory) } +const pipelinesWarning = "Packetbeat is unable to load the ingest pipelines for the configured" + + " modules because the Elasticsearch output is not configured/enabled. If you have" + + " already loaded the ingest pipelines or are using Logstash pipelines, you" + + " can ignore this warning." + // runStatic constructs a packetbeat runner and starts it, returning on cancellation // or the first fatal error. func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 13d00b89e44..7d579af635b 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go 
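Review bot: The identifier returned by `elasticsearch.RegisterConnectCallback` is discarded at the top of the new `if` block, so the callback can never be removed from the package-level registry once `runStatic` returns; keep it and pair it with `defer elasticsearch.DeregisterConnectCallback(id)` so a `Run` that is entered more than once in a process does not accumulate registrations. Whether a repeated registration would re-upload or merely re-check the pipelines depends on `fileset.LoadPipeline` and the `overwritePipelines` flag, which is not decided in this hunk. `Run` begins before this hunk and `runStatic` continues past it.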
@@ -33,14 +33,15 @@ import (
 var errFanoutGroupAFPacketOnly = errors.New("fanout_group is only valid with af_packet type")
 type Config struct {
- Interface *InterfaceConfig `config:"interfaces"`
- Interfaces []InterfaceConfig `config:"interfaces"`
- Flows *Flows `config:"flows"`
- Protocols map[string]*conf.C `config:"protocols"`
- ProtocolsList []*conf.C `config:"protocols"`
- Procs procs.ProcsConfig `config:"procs"`
- IgnoreOutgoing bool `config:"ignore_outgoing"`
- ShutdownTimeout time.Duration `config:"shutdown_timeout"`
+ Interface *InterfaceConfig `config:"interfaces"`
+ Interfaces []InterfaceConfig `config:"interfaces"`
+ Flows *Flows `config:"flows"`
+ Protocols map[string]*conf.C `config:"protocols"`
+ ProtocolsList []*conf.C `config:"protocols"`
+ Procs procs.ProcsConfig `config:"procs"`
+ IgnoreOutgoing bool `config:"ignore_outgoing"`
+ ShutdownTimeout time.Duration `config:"shutdown_timeout"`
+ OverwritePipelines bool `config:"overwrite_pipelines"` // Only used by standalone Packetbeat.
 }
 // FromStatic initializes a configuration given a config.C
diff --git a/packetbeat/magefile.go b/packetbeat/magefile.go
index 50c8a19310c..00e4f9dd47b 100644
--- a/packetbeat/magefile.go
+++ b/packetbeat/magefile.go
@@ -29,19 +29,20 @@ import (
 "github.com/elastic/beats/v7/dev-tools/mage/target/build"
 packetbeat "github.com/elastic/beats/v7/packetbeat/scripts/mage"
- // mage:import
+ //mage:import
 "github.com/elastic/beats/v7/dev-tools/mage/target/common"
- // mage:import
+ //mage:import
 "github.com/elastic/beats/v7/dev-tools/mage/target/unittest"
- // mage:import
+ //mage:import
 _ "github.com/elastic/beats/v7/dev-tools/mage/target/integtest/notests"
- // mage:import
+ //mage:import
 _ "github.com/elastic/beats/v7/dev-tools/mage/target/test"
 )
 func init() {
 common.RegisterCheckDeps(Update)
 unittest.RegisterPythonTestDeps(packetbeat.FieldsYML, Dashboards)
+ packetbeat.SelectLogic = devtools.OSSProject
 devtools.BeatDescription = "Packetbeat analyzes network traffic and sends the data to Elasticsearch."
 }
diff --git a/packetbeat/module/pipeline.go b/packetbeat/module/pipeline.go
new file mode 100644
index 00000000000..9e6d2384938
--- /dev/null
+++ b/packetbeat/module/pipeline.go
@@ -0,0 +1,188 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package module
+
+import (
+ "embed"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "os"
+ "path"
+ "path/filepath"
+ "strings"
+
+ "github.com/joeshaw/multierror"
+ "gopkg.in/yaml.v2"
+
+ "github.com/elastic/beats/v7/filebeat/fileset"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+// PipelinesFS is used from the x-pack/packetbeat code to inject modules. The
+// OSS version does not have modules.
+var PipelinesFS *embed.FS
+
+var errNoFS = errors.New("no embedded file system")
+
+const logName = "pipeline"
+
+type pipeline struct {
+ id string
+ contents map[string]interface{}
+}
+
+// UploadPipelines reads all pipelines embedded in the Packetbeat executable
+// and adapts the pipeline for a given ES version, converts to JSON if
+// necessary and creates or updates ingest pipeline in ES. The IDs of pipelines
+// uploaded to ES are returned in loaded.
+func UploadPipelines(info beat.Info, esClient *eslegclient.Connection, overwritePipelines bool) (loaded []string, err error) {
+ pipelines, err := readAll(info)
+ if err != nil {
+ return nil, err
+ }
+ return load(esClient, pipelines, overwritePipelines)
+}
+
+// readAll reads pipelines from the the embedded filesystem and
+// returns a slice of pipelines suitable for sending to Elasticsearch
+// with load.
+func readAll(info beat.Info) (pipelines []pipeline, err error) {
+ p, err := readDir(".", info)
+ if err == errNoFS { //nolint:errorlint // Bad linter! This is never wrapped.
+ return nil, nil
+ }
+ return p, err
+}
+
+func readDir(dir string, info beat.Info) (pipelines []pipeline, err error) {
+ if PipelinesFS == nil {
+ return nil, errNoFS
+ }
+ dirEntries, err := PipelinesFS.ReadDir(dir)
+ if err != nil {
+ return nil, err
+ }
+ for _, de := range dirEntries {
+ if de.IsDir() {
+ subPipelines, err := readDir(path.Join(dir, de.Name()), info)
+ if err != nil {
+ return nil, err
+ }
+ pipelines = append(pipelines, subPipelines...)
+ continue
+ }
+ p, err := readFile(path.Join(dir, de.Name()), info)
+ if err == errNoFS { //nolint:errorlint // Bad linter! This is never wrapped.
+ continue
+ }
+ if err != nil {
+ return nil, err
+ }
+ pipelines = append(pipelines, p)
+ }
+ return pipelines, nil
+}
+
+func readFile(filename string, info beat.Info) (p pipeline, err error) {
+ if PipelinesFS == nil {
+ return pipeline{}, errNoFS
+ }
+ contents, err := PipelinesFS.ReadFile(filename)
+ if err != nil {
+ return pipeline{}, err
+ }
+ updatedContent, err := applyTemplates(info.IndexPrefix, info.Version, filename, contents)
+ if err != nil {
+ return pipeline{}, err
+ }
+ ds, _, _ := strings.Cut(filename, string(os.PathSeparator))
+ p = pipeline{
+ id: fileset.FormatPipelineID(info.IndexPrefix, "", "", ds, info.Version),
+ contents: updatedContent,
+ }
+ return p, nil
+}
+
+// load uses esClient to load pipelines to Elasticsearch cluster.
+// The IDs of loaded pipelines will be returned in loaded.
+// load will only overwrite existing pipelines if overwritePipelines is
+// true. An error in loading one of the pipelines will cause the
+// successfully loaded ones to be deleted.
+func load(esClient *eslegclient.Connection, pipelines []pipeline, overwritePipelines bool) (loaded []string, err error) {
+ log := logp.NewLogger(logName)
+
+ for _, pipeline := range pipelines {
+ err = fileset.LoadPipeline(esClient, pipeline.id, pipeline.contents, overwritePipelines, log)
+ if err != nil {
+ err = fmt.Errorf("error loading pipeline %s: %w", pipeline.id, err)
+ break
+ }
+ loaded = append(loaded, pipeline.id)
+ }
+
+ if err != nil {
+ errs := multierror.Errors{err}
+ for _, id := range loaded {
+ err = fileset.DeletePipeline(esClient, id)
+ if err != nil {
+ errs = append(errs, err)
+ }
+ }
+ return nil, errs.Err()
+ }
+ return loaded, nil
+}
+
+func applyTemplates(prefix string, version string, filename string, original []byte) (converted map[string]interface{}, err error) {
+ vars := map[string]interface{}{
+ "builtin": map[string]interface{}{
+ "prefix": prefix,
+ "module": "",
+ "fileset": "",
+ "beatVersion": version,
+ },
+ }
+
+ encodedString, err := fileset.ApplyTemplate(vars, string(original), true)
+ if err != nil {
+ return nil, fmt.Errorf("failed to apply template: %w", err)
+ }
+
+ var content map[string]interface{}
+ switch extension := strings.ToLower(filepath.Ext(filename)); extension {
+ case ".json":
+ if err = json.Unmarshal([]byte(encodedString), &content); err != nil {
+ return nil, fmt.Errorf("error JSON decoding the pipeline file: %s: %w", filename, err)
+ }
+ case ".yaml", ".yml":
+ if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
+ return nil, fmt.Errorf("error YAML decoding the pipeline file: %s: %w", filename, err)
+ }
+ newContent, err := fileset.FixYAMLMaps(content)
+ if err != nil {
+ return nil, fmt.Errorf("failed to sanitize the YAML pipeline file: %s: 
%w", filename, err)
+ }
+ content = newContent.(map[string]interface{})
+ default:
+ return nil, fmt.Errorf("unsupported extension '%s' for pipeline file: %s", extension, filename)
+ }
+ return content, nil
+}
diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index 1e013fb081f..c9dac77048a 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -78,6 +78,11 @@ packetbeat.interfaces.internal_networks:
 # can stay enabled even after beat is shut down.
 #packetbeat.interfaces.auto_promisc_mode: true
+# By default Ingest pipelines are not updated if a pipeline with the same ID
+# already exists. If this option is enabled Packetbeat overwrites pipelines
+# every time a new Elasticsearch connection is established.
+#packetbeat.overwrite_pipelines: false
+
 # =================================== Flows ====================================
 packetbeat.flows:
diff --git a/packetbeat/scripts/mage/config.go b/packetbeat/scripts/mage/config.go
index 5213f4f1f87..f41b50ffff7 100644
--- a/packetbeat/scripts/mage/config.go
+++ b/packetbeat/scripts/mage/config.go
@@ -30,11 +30,18 @@ func device(goos string) string {
 return "default_route"
 }
+// SelectLogic configures the types of project logic to use (OSS vs X-Pack).
+// It is set in the packetbeat and x-pack/packetbeat magefiles.
+var SelectLogic devtools.ProjectType
+
 // ConfigFileParams returns the default ConfigFileParams for generating
 // packetbeat*.yml files.
 func ConfigFileParams() devtools.ConfigFileParams {
 p := devtools.DefaultConfigFileParams()
 p.Templates = append(p.Templates, devtools.OSSBeatDir("_meta/config/*.tmpl"))
+ if SelectLogic == devtools.XPackProject {
+ p.Templates = append(p.Templates, devtools.XPackBeatDir("_meta/config/*.tmpl"))
+ }
 p.ExtraVars = map[string]interface{}{
 "device": device,
 }
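Review bot: `readFile` splits `filename` on `os.PathSeparator`, but every name it receives is built by `readDir` with `path.Join` over an `embed.FS`, which is always slash-separated, so on Windows the cut never matches and `ds` becomes the whole slash-separated path instead of the top-level directory; use `ds, _, _ := strings.Cut(filename, "/")`. Every file under the same top-level directory also receives the same `ds`, so the `default.yml` and `geoip.yml` of one module appear to get the same `id` unless `fileset.FormatPipelineID` distinguishes them — worth confirming, because `load` would then either skip or overwrite the second one depending on `overwritePipelines`. In `applyTemplates` the `newContent.(map[string]interface{})` assertion is unchecked and would panic if `fileset.FixYAMLMaps` ever returned another type; a comma-ok form that returns an error keeps a bad embedded file from crashing the beat.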
diff --git a/x-pack/packetbeat/_meta/config/output-elasticsearch.yml.tmpl b/x-pack/packetbeat/_meta/config/output-elasticsearch.yml.tmpl new file mode 100644 index 00000000000..ffb3bc696fc --- /dev/null +++ b/x-pack/packetbeat/_meta/config/output-elasticsearch.yml.tmpl @@ -0,0 +1,15 @@ +{{subheader "Elasticsearch Output"}} +output.elasticsearch: + # Array of hosts to connect to. + hosts: ["localhost:9200"] + + # Protocol - either `http` (default) or `https`. + #protocol: "https" + + # Authentication credentials - either API key or username/password. + #api_key: "id:api_key" + #username: "elastic" + #password: "changeme" + + # Pipeline to route events to protocol pipelines. + pipeline: "packetbeat-%{[agent.version]}-routing" diff --git a/x-pack/packetbeat/cmd/root.go b/x-pack/packetbeat/cmd/root.go index f77bd827bf2..8611fe8d115 100644 --- a/x-pack/packetbeat/cmd/root.go +++ b/x-pack/packetbeat/cmd/root.go @@ -21,6 +21,9 @@ import ( // This registers the Npcap installer on Windows. _ "github.com/elastic/beats/v7/x-pack/packetbeat/npcap" + + // Enable pipelines. + _ "github.com/elastic/beats/v7/x-pack/packetbeat/module" ) // Name of this beat. diff --git a/x-pack/packetbeat/magefile.go b/x-pack/packetbeat/magefile.go index acef21538d6..03104ab9157 100644 --- a/x-pack/packetbeat/magefile.go +++ b/x-pack/packetbeat/magefile.go @@ -47,6 +47,7 @@ func init() { devtools.BeatDescription = "Packetbeat analyzes network traffic and sends the data to Elasticsearch." devtools.BeatLicense = "Elastic License" + packetbeat.SelectLogic = devtools.XPackProject } // Update updates the generated files. diff --git a/x-pack/packetbeat/module/amqp/ingest/default.yml b/x-pack/packetbeat/module/amqp/ingest/default.yml new file mode 100644 index 00000000000..7b2268f4812 --- /dev/null +++ b/x-pack/packetbeat/module/amqp/ingest/default.yml 
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing amqp traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + tag: gsubmac + ignore_missing: true +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + tag: gsubmac + ignore_missing: true +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreachip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipelineprocessor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/amqp/ingest/geoip.yml b/x-pack/packetbeat/module/amqp/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/amqp/ingest/geoip.yml 
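Review bot: Both `gsub` processors on `host.mac` carry the same `tag: gsubmac`, so the `on_failure` handler — which reports only the processor type and tag — cannot say which of the two rewrites failed; give each a distinct tag, e.g. `tag: gsub_host_mac_strip` and `tag: gsub_host_mac_dash`. The cassandra `default.yml` in this change has the same duplicate, and the dhcpv4 and dns pipelines reuse `gsub_host_mac` (and `gsub_dhcpv4_client_mac`) in the same way.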
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/cassandra/ingest/default.yml b/x-pack/packetbeat/module/cassandra/ingest/default.yml new file mode 100644 index 00000000000..61ce5ff4d73 --- /dev/null +++ b/x-pack/packetbeat/module/cassandra/ingest/default.yml 
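Review bot: This `geoip.yml` is byte-identical to the copies added under cassandra, dhcpv4 and dns in this same change (all share blob `eb88d38caf0`), so one enrichment pipeline now has to be kept in sync by hand across every module directory. If the loader's per-directory id scheme requires a copy per module, a generator or a test that asserts the copies are equal would catch drift; otherwise a single shared geoip pipeline referenced from each `default.yml` is simpler.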
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing cassandra traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsubmac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsubmac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreachip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipelineprocessor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/cassandra/ingest/geoip.yml b/x-pack/packetbeat/module/cassandra/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/cassandra/ingest/geoip.yml 
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/dhcpv4/ingest/default.yml b/x-pack/packetbeat/module/dhcpv4/ingest/default.yml new file mode 100644 index 00000000000..1c3a2a57264 --- /dev/null +++ b/x-pack/packetbeat/module/dhcpv4/ingest/default.yml 
@@ -0,0 +1,74 @@ +--- +description: Pipeline for processing dhcpv4 traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: dhcpv4.client_mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_dhcpv4_client_mac +- gsub: + field: dhcpv4.client_mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_dhcpv4_client_mac +- uppercase: + field: dhcpv4.client_mac + ignore_missing: true +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/dhcpv4/ingest/geoip.yml b/x-pack/packetbeat/module/dhcpv4/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 
--- /dev/null +++ b/x-pack/packetbeat/module/dhcpv4/ingest/geoip.yml 
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/dns/ingest/default.yml b/x-pack/packetbeat/module/dns/ingest/default.yml new file mode 100644 index 00000000000..ff055c3c9b3 --- /dev/null +++ b/x-pack/packetbeat/module/dns/ingest/default.yml 
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing dhcpv4 traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/dns/ingest/geoip.yml b/x-pack/packetbeat/module/dns/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/dns/ingest/geoip.yml 
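Review bot: The `description` of `dns/ingest/default.yml` reads `Pipeline for processing dhcpv4 traffic`, a copy-paste leftover from the dhcpv4 file; it should be `description: Pipeline for processing dns traffic` so the pipeline is identifiable in the cluster's pipeline listing.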
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/flow/ingest/default.yml b/x-pack/packetbeat/module/flow/ingest/default.yml
new file mode 100644
index 00000000000..6e969ea1a61
--- /dev/null
+++ b/x-pack/packetbeat/module/flow/ingest/default.yml
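Review bot: Each of the four entities in this pipeline gives its city lookup and its ASN lookup the same tag (`tag: source_geo` on both `geoip` processors for `source.ip`, and likewise `destination_geo`, `server_geo` and `client_geo`), so the `on_failure` message that reports `_ingest.on_failure_processor_tag` cannot distinguish a failed city lookup from a failed `GeoLite2-ASN.mmdb` lookup. Tagging the ASN processors `source_as`, `destination_as`, `server_as` and `client_as` keeps that message unambiguous for whoever investigates `event.kind: pipeline_error` documents. The `geoip.yml` hunks for flow, http, icmp, memcached, mongodb, mysql and nfs in this diff are byte-identical copies (same blob id) and need the same change. Because the file is duplicated verbatim per module, a one-line comment under `description` noting that edits must be mirrored across the module copies would help, unless the loader shares one file in a way not visible here.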
@@ -0,0 +1,89 @@
+---
+description: Pipeline for processing traffic flows
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+##
+# Set {host,source,destination}.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ tag: foreach_observer_ip
+ field: observer.ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+- gsub:
+ field: source.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_source_mac
+- gsub:
+ field: source.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_source_mac
+- uppercase:
+ field: source.mac
+ ignore_missing: true
+- gsub:
+ field: destination.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_destination_mac
+- gsub:
+ field: destination.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_destination_mac
+- uppercase:
+ field: destination.mac
+ ignore_missing: true
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ 
_ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/flow/ingest/geoip.yml b/x-pack/packetbeat/module/flow/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/flow/ingest/geoip.yml
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/http/ingest/default.yml b/x-pack/packetbeat/module/http/ingest/default.yml
new file mode 100644
index 00000000000..e066200becb
--- /dev/null
+++ b/x-pack/packetbeat/module/http/ingest/default.yml
@@ -0,0 +1,72 @@
+---
+description: Pipeline for processing http traffic
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+
+# Detection Rules compatibility
+- set:
+ tag: set_compatibility_request_authorization
+ field: network_traffic.http.request.headers.authorization
+ copy_from: http.request.headers.authorization
+ ignore_empty_value: true
+- set:
+ tag: set_compatibility_response_type
+ field: http.response.mime_type
+ copy_from: http.response.headers.content-type
+ ignore_empty_value: true
+
+##
+# Set host.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ tag: foreach_observer_ip
+ field: observer.ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
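Review bot: The `set` processor tagged `set_compatibility_request_authorization` copies `http.request.headers.authorization` into `network_traffic.http.request.headers.authorization`, so whatever credential material the capture placed in that header (Basic credentials, bearer tokens) is now persisted under two field paths, and any field-level access control or redaction that covers the first path has to cover the second as well. Nothing in this hunk redacts or hashes the value, so the pipeline relies entirely on the capture side having done so before the event reached Elasticsearch; a comment above the processor stating that dependency would help whoever tunes this later, e.g. `# Duplicates the raw Authorization header for detection-rule field compatibility; it is not redacted here, so redaction must happen at capture time.` If the compatibility rules only need to know the header was present, copying a presence flag or a hash instead of the raw value would avoid doubling the exposure.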
diff --git a/x-pack/packetbeat/module/http/ingest/geoip.yml b/x-pack/packetbeat/module/http/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/http/ingest/geoip.yml
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/icmp/ingest/default.yml b/x-pack/packetbeat/module/icmp/ingest/default.yml
new file mode 100644
index 00000000000..7a50bb91cc5
--- /dev/null
+++ b/x-pack/packetbeat/module/icmp/ingest/default.yml
@@ -0,0 +1,66 @@
+---
+description: Pipeline for processing icmp traffic
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+
+# Detection Rules compatibility
+- set:
+ tag: set_compatibility_type
+ field: network.protocol
+ copy_from: type
+
+##
+# Set host.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ tag: foreach_observer_ip
+ field: observer.ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/icmp/ingest/geoip.yml b/x-pack/packetbeat/module/icmp/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/icmp/ingest/geoip.yml
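Review bot: The `set` processor tagged `set_compatibility_type` uses `copy_from: type` without `ignore_empty_value: true`, unlike the two compatibility `set` processors in the http pipeline of this diff, so a document that arrives without a top-level `type` field fails this processor, is routed to `on_failure`, and ends up indexed with `event.kind: pipeline_error` rather than as a normal event. Packetbeat ICMP events presumably always carry `type`, but adding `ignore_empty_value: true` under `copy_from: type` makes the pipeline tolerate documents from older agents or other sources and matches the http file's style.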
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/memcached/ingest/default.yml b/x-pack/packetbeat/module/memcached/ingest/default.yml
new file mode 100644
index 00000000000..d0f5f18088c
--- /dev/null
+++ b/x-pack/packetbeat/module/memcached/ingest/default.yml
@@ -0,0 +1,79 @@
+---
+description: Pipeline for processing memcached traffic
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+##
+# Set host.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ tag: foreach_observer_ip
+ field: observer.ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+##
+# Reformat memcache stats response data as a single object
+##
+- rename:
+ field: memcache.response.stats
+ target_field: memcache.response.stats_objects
+ ignore_missing: true
+- foreach:
+ description: Build an object for memcache stats response data
+ if: ctx.memcache?.response?.stats_objects instanceof List
+ tag: foreach_memcache_response_stats_objects
+ field: memcache.response.stats_objects
+ processor:
+ set:
+ field: "memcache.response.stats.{{{_ingest._value.name}}}"
+ value: "{{{_ingest._value.value}}}"
+- remove:
+ field: memcache.response.stats_objects
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ 
_ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/memcached/ingest/geoip.yml b/x-pack/packetbeat/module/memcached/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/memcached/ingest/geoip.yml
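Review bot: The `foreach` tagged `foreach_memcache_response_stats_objects` builds field names from captured data, `field: "memcache.response.stats.{{{_ingest._value.name}}}"`, so the set of keys written under `memcache.response.stats` is whatever the observed memcached server returned, and a stat name containing dots is expanded into nested objects. Under dynamic mapping that is a mapping-growth vector that can reach the index field limit and start rejecting documents; whether the mapping for this object is `flattened` or has dynamic mapping disabled is not visible in this hunk. Either restricting the copy to the documented stat names or stating the reliance on the mapping in a comment above the processor, e.g. `# Field names below come from the captured stats response; the index mapping must bound them.`, would make the dependency explicit. The hunk spans two physical lines here; its `on_failure` tail is the same as the other pipelines in this diff.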
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/mongodb/ingest/default.yml b/x-pack/packetbeat/module/mongodb/ingest/default.yml
new file mode 100644
index 00000000000..a40e27da35d
--- /dev/null
+++ b/x-pack/packetbeat/module/mongodb/ingest/default.yml
@@ -0,0 +1,59 @@
+---
+description: Pipeline for processing mongodb traffic
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+##
+# Set host.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ field: observer.ip
+ tag: foreach_observer_ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/mongodb/ingest/geoip.yml b/x-pack/packetbeat/module/mongodb/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/mongodb/ingest/geoip.yml
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/mysql/ingest/default.yml b/x-pack/packetbeat/module/mysql/ingest/default.yml
new file mode 100644
index 00000000000..e9cb2ebcdb0
--- /dev/null
+++ b/x-pack/packetbeat/module/mysql/ingest/default.yml
@@ -0,0 +1,59 @@
+---
+description: Pipeline for processing mysql traffic
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+##
+# Set host.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ field: observer.ip
+ tag: foreach_observer_ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/mysql/ingest/geoip.yml b/x-pack/packetbeat/module/mysql/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/mysql/ingest/geoip.yml
@@ -0,0 +1,103 @@
+---
+description: GeoIP enrichment.
+processors:
+ - geoip:
+ field: source.ip
+ target_field: source.geo
+ ignore_missing: true
+ tag: source_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: source.ip
+ target_field: source.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: source_geo
+ - rename:
+ field: source.as.asn
+ target_field: source.as.number
+ ignore_missing: true
+ - rename:
+ field: source.as.organization_name
+ target_field: source.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: destination.ip
+ target_field: destination.geo
+ ignore_missing: true
+ tag: destination_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: destination.ip
+ target_field: destination.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: destination_geo
+ - rename:
+ field: destination.as.asn
+ target_field: destination.as.number
+ ignore_missing: true
+ - rename:
+ field: destination.as.organization_name
+ target_field: destination.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: server.ip
+ target_field: server.geo
+ ignore_missing: true
+ tag: server_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: server.ip
+ target_field: server.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: server_geo
+ - rename:
+ field: server.as.asn
+ target_field: server.as.number
+ ignore_missing: true
+ - rename:
+ field: server.as.organization_name
+ target_field: server.as.organization.name
+ ignore_missing: true
+
+ - geoip:
+ field: client.ip
+ target_field: client.geo
+ ignore_missing: true
+ tag: client_geo
+ - geoip:
+ database_file: GeoLite2-ASN.mmdb
+ field: client.ip
+ target_field: client.as
+ properties:
+ - asn
+ - organization_name
+ ignore_missing: true
+ tag: client_geo
+ - rename:
+ field: client.as.asn
+ target_field: client.as.number
+ ignore_missing: true
+ - rename:
+ field: client.as.organization_name
+ target_field:
client.as.organization.name
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/nfs/ingest/default.yml b/x-pack/packetbeat/module/nfs/ingest/default.yml
new file mode 100644
index 00000000000..a1b72a25217
--- /dev/null
+++ b/x-pack/packetbeat/module/nfs/ingest/default.yml
@@ -0,0 +1,59 @@
+---
+description: Pipeline for processing nfs traffic
+processors:
+- set:
+ field: ecs.version
+ value: '8.11.0'
+##
+# Set host.mac to dash separated upper case value
+# as per ECS recommendation
+##
+- gsub:
+ field: host.mac
+ pattern: '[-:.]'
+ replacement: ''
+ ignore_missing: true
+ tag: gsub_host_mac
+- gsub:
+ field: host.mac
+ pattern: '(..)(?!$)'
+ replacement: '$1-'
+ ignore_missing: true
+ tag: gsub_host_mac
+- uppercase:
+ field: host.mac
+ ignore_missing: true
+- append:
+ field: related.hosts
+ value: "{{{observer.hostname}}}"
+ if: ctx.observer?.hostname != null && ctx.observer?.hostname != ''
+ allow_duplicates: false
+- foreach:
+ if: ctx.observer?.ip != null && ctx.observer.ip instanceof List
+ field: observer.ip
+ tag: foreach_observer_ip
+ processor:
+ append:
+ field: related.ip
+ value: '{{{_ingest._value}}}'
+ allow_duplicates: false
+- remove:
+ if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded')
+ field: host
+
+- pipeline:
+ if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich
+ name: '{{ IngestPipeline "geoip" }}'
+ tag: pipeline_processor
+- remove:
+ field: _conf
+ ignore_missing: true
+
+on_failure:
+ - append:
+ field: error.message
+ value: |-
+ Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}"
+ - set:
+ field: event.kind
+ value: pipeline_error
diff --git a/x-pack/packetbeat/module/nfs/ingest/geoip.yml b/x-pack/packetbeat/module/nfs/ingest/geoip.yml
new file mode 100644
index 00000000000..eb88d38caf0
--- /dev/null
+++ b/x-pack/packetbeat/module/nfs/ingest/geoip.yml
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/pgsql/ingest/default.yml b/x-pack/packetbeat/module/pgsql/ingest/default.yml new file mode 100644 index 00000000000..bd28f9211e1 --- /dev/null +++ b/x-pack/packetbeat/module/pgsql/ingest/default.yml 
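Review bot: The city and ASN geoip lookups for each endpoint share one tag (`tag: source_geo` on both source.ip processors, and likewise for destination, server and client), so an error surfaced through `_ingest.on_failure_processor_tag` does not say which database lookup failed; tag the ASN lookups separately, e.g. `tag: source_as`, `tag: destination_as`, `tag: server_as`, `tag: client_as`. The same applies to the pgsql, redis, sip, thrift and tls geoip.yml hunks, which appear to be byte-identical copies (same blob id in their index lines); if the loader allows it, a single shared geoip pipeline would avoid keeping six copies in sync.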
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing pgsql traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/pgsql/ingest/geoip.yml b/x-pack/packetbeat/module/pgsql/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/pgsql/ingest/geoip.yml 
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/pipeline.go b/x-pack/packetbeat/module/pipeline.go new file mode 100644 index 00000000000..a325fba7de4 --- /dev/null +++ b/x-pack/packetbeat/module/pipeline.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package module + +import ( + "embed" + + "github.com/elastic/beats/v7/packetbeat/module" +) + +// pipelineFS holds the yml representation of the ingest node pipelines +// +//go:embed */ingest/*.yml +var pipelinesFS embed.FS + +func init() { + module.PipelinesFS = &pipelinesFS +} diff --git a/x-pack/packetbeat/module/redis/ingest/default.yml b/x-pack/packetbeat/module/redis/ingest/default.yml new file mode 100644 index 00000000000..4f815adc3a9 --- /dev/null +++ b/x-pack/packetbeat/module/redis/ingest/default.yml 
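Review bot: The doc comment on the embedded filesystem names `pipelineFS` while the variable is `pipelinesFS`; Go doc comments should start with the identifier, so change it to `// pipelinesFS holds the yml representation of the ingest node pipelines.`. `init` assigns `module.PipelinesFS` as a package-level side effect on import and has no comment explaining that; a line such as `// init registers the x-pack ingest pipelines with the packetbeat module loader.` would make the purpose clear, and the package should presumably be blank-imported somewhere for this effect, which is worth stating in the package comment.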
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing redis traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/redis/ingest/geoip.yml b/x-pack/packetbeat/module/redis/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/redis/ingest/geoip.yml 
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/routing/ingest/default.yml b/x-pack/packetbeat/module/routing/ingest/default.yml new file mode 100644 index 00000000000..a11b5e79f7a --- /dev/null +++ b/x-pack/packetbeat/module/routing/ingest/default.yml 
@@ -0,0 +1,64 @@ +--- +description: Route to appropriate data source pipenline. +processors: + - set: + field: event.ingested + value: '{{_ingest.timestamp}}' + + - pipeline: + if: ctx.type == "amqp" + name: '{< IngestPipeline "amqp" >}' + - pipeline: + if: ctx.type == "cassandra" + name: '{< IngestPipeline "cassandra" >}' + - pipeline: + if: ctx.type == "dhcpv4" + name: '{< IngestPipeline "dhcpv4" >}' + - pipeline: + if: ctx.type == "dns" + name: '{< IngestPipeline "dns" >}' + - pipeline: + if: ctx.type == "flow" + name: '{< IngestPipeline "flow" >}' + - pipeline: + if: ctx.type == "http" + name: '{< IngestPipeline "http" >}' + - pipeline: + if: ctx.type == "icmp" + name: '{< IngestPipeline "icmp" >}' + - pipeline: + if: ctx.type == "memcache" + name: '{< IngestPipeline "memcached" >}' + - pipeline: + if: ctx.type == "mongodb" + name: '{< IngestPipeline "mongodb" >}' + - pipeline: + if: ctx.type == "mysql" + name: '{< IngestPipeline "mysql" >}' + - pipeline: + if: ctx.type == "nfs" + name: '{< IngestPipeline "nfs" >}' + - pipeline: + if: ctx.type == "pgsql" + name: '{< IngestPipeline "pgsql" >}' + - pipeline: + if: ctx.type == "redis" + name: '{< IngestPipeline "redis" >}' + - pipeline: + if: ctx.type == "sip" + name: '{< IngestPipeline "sip" >}' + - pipeline: + if: ctx.type == "thrift" + name: '{< IngestPipeline "thrift" >}' + - pipeline: + if: ctx.type == "tls" + name: '{< IngestPipeline "tls" >}' + +on_failure: + - set: + field: event.kind + value: pipeline_error + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" diff --git a/x-pack/packetbeat/module/sip/ingest/default.yml b/x-pack/packetbeat/module/sip/ingest/default.yml new file mode 100644 index 00000000000..62f3d6c1c42 --- /dev/null +++ b/x-pack/packetbeat/module/sip/ingest/default.yml 
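Review bot: The description string has a typo, `pipenline`; change it to `description: Route to appropriate data source pipeline.`. The pipeline names here use `{< IngestPipeline "amqp" >}` delimiters whereas the module pipelines in this commit use `{{ IngestPipeline "geoip" }}`; if both are rendered by the same template engine with one delimiter set, one form is left unrendered and the `pipeline` processor would reference a pipeline that does not exist at ingest time, so it is worth confirming the loader handles both forms. The `memcache` type routes to the `memcached` pipeline, unlike every other entry where type and pipeline name match; if that is intentional, a comment saying so would stop it being "fixed" later. A document whose `type` matches none of the conditions only receives `event.ingested`, which is presumably the intended pass-through.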
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing sip traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/sip/ingest/geoip.yml b/x-pack/packetbeat/module/sip/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/sip/ingest/geoip.yml 
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/thrift/ingest/default.yml b/x-pack/packetbeat/module/thrift/ingest/default.yml new file mode 100644 index 00000000000..f2726cea96b --- /dev/null +++ b/x-pack/packetbeat/module/thrift/ingest/default.yml 
@@ -0,0 +1,59 @@ +--- +description: Pipeline for processing thrift traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/thrift/ingest/geoip.yml b/x-pack/packetbeat/module/thrift/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/thrift/ingest/geoip.yml 
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/tls/ingest/default.yml b/x-pack/packetbeat/module/tls/ingest/default.yml new file mode 100644 index 00000000000..94ef3b55d22 --- /dev/null +++ b/x-pack/packetbeat/module/tls/ingest/default.yml 
@@ -0,0 +1,99 @@ +--- +description: Pipeline for processing tls traffic +processors: +- set: + field: ecs.version + value: '8.11.0' +## +# Set host.mac to dash separated upper case value +# as per ECS recommendation +## +- gsub: + field: host.mac + pattern: '[-:.]' + replacement: '' + ignore_missing: true + tag: gsub_host_mac +- gsub: + field: host.mac + pattern: '(..)(?!$)' + replacement: '$1-' + ignore_missing: true + tag: gsub_host_mac +- uppercase: + field: host.mac + ignore_missing: true +- append: + field: related.hosts + value: "{{{observer.hostname}}}" + if: ctx.observer?.hostname != null && ctx.observer?.hostname != '' + allow_duplicates: false +- foreach: + if: ctx.observer?.ip != null && ctx.observer.ip instanceof List + field: observer.ip + tag: foreach_observer_ip + processor: + append: + field: related.ip + value: '{{{_ingest._value}}}' + allow_duplicates: false +- remove: + if: ctx.host != null && ctx.tags != null && ctx.tags.contains('forwarded') + field: host + +- pipeline: + if: ctx._conf?.geoip_enrich != null && ctx._conf.geoip_enrich + name: '{{ IngestPipeline "geoip" }}' + tag: pipeline_processor +- remove: + field: _conf + ignore_missing: true + +## +# Make tls.{client,server}.x509.version_number a string as per ECS. +## +- convert: + field: tls.client.x509.version_number + type: string + ignore_missing: true + tag: convert_tls_client_x509_version_number +- convert: + field: tls.server.x509.version_number + type: string + ignore_missing: true + tag: convert_tls_server_x509_version_number + +## +# This handles legacy TLS fields from Packetbeat 7.17. +## +- remove: + description: Remove legacy fields from Packetbeat 7.17 that are duplicated. + field: + - tls.client.x509.issuer.province # Duplicated as tls.client.x509.issuer.state_or_province. + - tls.client.x509.subject.province # Duplicated as tls.client.x509.subject.state_or_province. + - tls.client.x509.version # Duplicated as tls.client.x509.version_number. 
+ - tls.detailed.client_certificate # Duplicated as tls.client.x509. + - tls.detailed.server_certificate # Duplicated as tls.server.x509. + - tls.server.x509.issuer.province # Duplicated as tls.server.x509.issuer.state_or_province. + - tls.server.x509.subject.province # Duplicated as tls.server.x509.subject.state_or_province. + - tls.server.x509.version # Duplicated as tls.server.x509.version_number. + ignore_missing: true + +- append: + field: related.hash + value: "{{tls.server.ja3s}}" + if: "ctx?.tls?.server?.ja3s != null" +- append: + field: related.hash + value: "{{tls.client.ja3}}" + if: "ctx?.tls?.client?.ja3 != null" + allow_duplicates: false + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/module/tls/ingest/geoip.yml b/x-pack/packetbeat/module/tls/ingest/geoip.yml new file mode 100644 index 00000000000..eb88d38caf0 --- /dev/null +++ b/x-pack/packetbeat/module/tls/ingest/geoip.yml 
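Review bot: The `related.hash` append for `tls.server.ja3s` lacks `allow_duplicates: false` while the following `tls.client.ja3` append has it, so a server fingerprint already present in related.hash is appended again; add `allow_duplicates: false` to the first append. The condition `ctx?.tls?.server?.ja3s != null` uses `ctx?.`, which no other condition in this file does; `ctx.tls?.server?.ja3s != null` (and the same for the client one) matches the rest of the pipeline.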
@@ -0,0 +1,103 @@ +--- +description: GeoIP enrichment. +processors: + - geoip: + field: source.ip + target_field: source.geo + ignore_missing: true + tag: source_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: source.ip + target_field: source.as + properties: + - asn + - organization_name + ignore_missing: true + tag: source_geo + - rename: + field: source.as.asn + target_field: source.as.number + ignore_missing: true + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true + + - geoip: + field: destination.ip + target_field: destination.geo + ignore_missing: true + tag: destination_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: destination.ip + target_field: destination.as + properties: + - asn + - organization_name + ignore_missing: true + tag: destination_geo + - rename: + field: destination.as.asn + target_field: destination.as.number + ignore_missing: true + - rename: + field: destination.as.organization_name + target_field: destination.as.organization.name + ignore_missing: true + + - geoip: + field: server.ip + target_field: server.geo + ignore_missing: true + tag: server_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: server.ip + target_field: server.as + properties: + - asn + - organization_name + ignore_missing: true + tag: server_geo + - rename: + field: server.as.asn + target_field: server.as.number + ignore_missing: true + - rename: + field: server.as.organization_name + target_field: server.as.organization.name + ignore_missing: true + + - geoip: + field: client.ip + target_field: client.geo + ignore_missing: true + tag: client_geo + - geoip: + database_file: GeoLite2-ASN.mmdb + field: client.ip + target_field: client.as + properties: + - asn + - organization_name + ignore_missing: true + tag: client_geo + - rename: + field: client.as.asn + target_field: client.as.number + ignore_missing: true + - rename: + field: client.as.organization_name + target_field: 
client.as.organization.name + ignore_missing: true + +on_failure: + - append: + field: error.message + value: |- + Processor "{{ _ingest.on_failure_processor_type }}" with tag "{{ _ingest.on_failure_processor_tag }}" in pipeline "{{ _ingest.on_failure_pipeline }}" failed with message "{{ _ingest.on_failure_message }}" + - set: + field: event.kind + value: pipeline_error diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 1e013fb081f..c9dac77048a 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -78,6 +78,11 @@ packetbeat.interfaces.internal_networks: # can stay enabled even after beat is shut down. #packetbeat.interfaces.auto_promisc_mode: true +# By default Ingest pipelines are not updated if a pipeline with the same ID +# already exists. If this option is enabled Packetbeat overwrites pipelines +# every time a new Elasticsearch connection is established. +#packetbeat.overwrite_pipelines: false + # =================================== Flows ==================================== packetbeat.flows: diff --git a/x-pack/packetbeat/packetbeat.yml b/x-pack/packetbeat/packetbeat.yml index fea1a2fb115..d78fb6a7ccd 100644 --- a/x-pack/packetbeat/packetbeat.yml +++ b/x-pack/packetbeat/packetbeat.yml @@ -213,10 +213,6 @@ output.elasticsearch: # Array of hosts to connect to. hosts: ["localhost:9200"] - # Performance preset - one of "balanced", "throughput", "scale", - # "latency", or "custom". - preset: balanced - # Protocol - either `http` (default) or `https`. #protocol: "https" @@ -225,6 +221,9 @@ output.elasticsearch: #username: "elastic" #password: "changeme" + # Pipeline to route events to protocol pipelines. + pipeline: "packetbeat-%{[agent.version]}-routing" + # ------------------------------ Logstash Output ------------------------------- #output.logstash: # The Logstash hosts
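Review bot: The second packetbeat.yml hunk removes `preset: balanced` from output.elasticsearch, which looks unrelated to pipeline loading; if this is a side effect of regenerating the file from a template, the template should be checked, otherwise the removal changes the output behaviour for users who start from this file. The new `pipeline: "packetbeat-%{[agent.version]}-routing"` line would benefit from a comment stating that the name must match the routing pipeline loaded by the beat, e.g. `# Must match the routing pipeline installed by Packetbeat for this version.`.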