Skip to content

Commit

Permalink
{,x-pack}/packetbeat: add support for pipeline module uploads (#37291)
Browse files Browse the repository at this point in the history
* packetbeat/module: new package for supporting modules

	[git-generate]
	yoink -pkg ./winlogbeat/module -dir packetbeat -y UploadPipelines,PipelinesFS
	gsed -r -i 's/Winlogbeat/Packetbeat/g' packetbeat/module/*.go
	gsed -r -i 's/winlogbeat/packetbeat/g' packetbeat/module/*.go
	goimports -w -local github.com/elastic ./packetbeat/module
	
	yoink -pkg ./x-pack/winlogbeat/module -dir x-pack/packetbeat -y init
	gsed -r -i 's/winlogbeat/packetbeat/g' x-pack/packetbeat/module/*.go
	mkdir -p x-pack/packetbeat/module/null/ingest
	touch x-pack/packetbeat/module/null/ingest/pin.yml

* packetbeat/module: adjust pipeline names

* packetbeat/beater: add support for pipeline uploads

* x-pack/packetbeat/module: import pipelines from integrations and remove placeholder

	[git-generate]
	data_stream_root=$(go env GOPATH)/src/github.com/elastic/integrations/packages/network_traffic/data_stream
	parallel mkdir -p x-pack/packetbeat/module/{/}/ingest ::: $data_stream_root/*
	parallel 'cp {}/elasticsearch/ingest_pipeline/* x-pack/packetbeat/module/{/}/ingest/' ::: $data_stream_root/*
	rm -rf x-pack/packetbeat/module/null

* x-pack/packetbeat/module: add routing
  • Loading branch information
efd6 authored Jan 31, 2024
1 parent c331659 commit e900baf
Show file tree
Hide file tree
Showing 47 changed files with 3,098 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
*Packetbeat*

- Bump Windows Npcap version to v1.79. {pull}37733[37733]
- Add metrics for TCP flags. {issue}36992[36992] {pull}36975[36975]
- Add support for pipeline loading. {pull}37291[37291]

*Packetbeat*

Expand Down
5 changes: 5 additions & 0 deletions packetbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ packetbeat.interfaces.internal_networks:
# can stay enabled even after beat is shut down.
#packetbeat.interfaces.auto_promisc_mode: true

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Packetbeat overwrites pipelines
# every time a new Elasticsearch connection is established.
#packetbeat.overwrite_pipelines: false

{{- template "windows_npcap.yml.tmpl" .}}

{{header "Flows"}}
Expand Down
57 changes: 49 additions & 8 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/service"

"github.com/elastic/beats/v7/packetbeat/config"
"github.com/elastic/beats/v7/packetbeat/module"
"github.com/elastic/beats/v7/packetbeat/protos"

// Add packetbeat default processors
Expand Down Expand Up @@ -80,10 +83,11 @@ func initialConfig() config.Config {

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config *conf.C
factory *processorFactory
done chan struct{}
stopOnce sync.Once
config *conf.C
factory *processorFactory
overwritePipelines bool
done chan struct{}
stopOnce sync.Once
}

// New returns a new Packetbeat beat.Beater.
Expand All @@ -98,15 +102,35 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
return nil, err
}

var overwritePipelines bool
if !b.Manager.Enabled() {
// Pipeline overwrite is only enabled on standalone packetbeat
// since pipelines are managed by fleet otherwise.
config, err := configurator(rawConfig)
if err != nil {
return nil, err
}
overwritePipelines = config.OverwritePipelines
b.OverwritePipelinesCallback = func(esConfig *conf.C) error {
esClient, err := eslegclient.NewConnectedClient(esConfig, "Packetbeat")
if err != nil {
return err
}
_, err = module.UploadPipelines(b.Info, esClient, overwritePipelines)
return err
}
}

return &packetbeat{
config: rawConfig,
factory: factory,
done: make(chan struct{}),
config: rawConfig,
factory: factory,
overwritePipelines: overwritePipelines,
done: make(chan struct{}),
}, nil
}

// Run starts the packetbeat network capture, decoding and event publication, sending
// events to b.Publisher. If b is mananaged, packetbeat is registered with the
// events to b.Publisher. If b is managed, packetbeat is registered with the
// reload.Registry and handled by fleet. Otherwise it is run until cancelled or a
// fatal error.
func (pb *packetbeat) Run(b *beat.Beat) error {
Expand Down Expand Up @@ -138,11 +162,28 @@ func (pb *packetbeat) Run(b *beat.Beat) error {
}

if !b.Manager.Enabled() {
if b.Config.Output.Name() == "elasticsearch" {
_, err := elasticsearch.RegisterConnectCallback(func(esClient *eslegclient.Connection) error {
_, err := module.UploadPipelines(b.Info, esClient, pb.overwritePipelines)
return err
})
if err != nil {
return err
}
} else {
logp.L().Warn(pipelinesWarning)
}

return pb.runStatic(b, pb.factory)
}
return pb.runManaged(b, pb.factory)
}

const pipelinesWarning = "Packetbeat is unable to load the ingest pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the ingest pipelines or are using Logstash pipelines, you" +
" can ignore this warning."

// runStatic constructs a packetbeat runner and starts it, returning on cancellation
// or the first fatal error.
func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
Expand Down
17 changes: 9 additions & 8 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (
var errFanoutGroupAFPacketOnly = errors.New("fanout_group is only valid with af_packet type")

type Config struct {
Interface *InterfaceConfig `config:"interfaces"`
Interfaces []InterfaceConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*conf.C `config:"protocols"`
ProtocolsList []*conf.C `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
IgnoreOutgoing bool `config:"ignore_outgoing"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Interface *InterfaceConfig `config:"interfaces"`
Interfaces []InterfaceConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*conf.C `config:"protocols"`
ProtocolsList []*conf.C `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
IgnoreOutgoing bool `config:"ignore_outgoing"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
OverwritePipelines bool `config:"overwrite_pipelines"` // Only used by standalone Packetbeat.
}

// FromStatic initializes a configuration given a config.C
Expand Down
9 changes: 5 additions & 4 deletions packetbeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ import (
"github.com/elastic/beats/v7/dev-tools/mage/target/build"
packetbeat "github.com/elastic/beats/v7/packetbeat/scripts/mage"

// mage:import
//mage:import
"github.com/elastic/beats/v7/dev-tools/mage/target/common"
// mage:import
//mage:import
"github.com/elastic/beats/v7/dev-tools/mage/target/unittest"
// mage:import
//mage:import
_ "github.com/elastic/beats/v7/dev-tools/mage/target/integtest/notests"
// mage:import
//mage:import
_ "github.com/elastic/beats/v7/dev-tools/mage/target/test"
)

func init() {
common.RegisterCheckDeps(Update)
unittest.RegisterPythonTestDeps(packetbeat.FieldsYML, Dashboards)
packetbeat.SelectLogic = devtools.OSSProject

devtools.BeatDescription = "Packetbeat analyzes network traffic and sends the data to Elasticsearch."
}
Expand Down
188 changes: 188 additions & 0 deletions packetbeat/module/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package module

import (
"embed"
"encoding/json"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"

"github.com/joeshaw/multierror"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/filebeat/fileset"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/elastic-agent-libs/logp"
)

// PipelinesFS is used from the x-pack/packetbeat code to inject modules. The
// OSS version does not have modules.
var PipelinesFS *embed.FS

// errNoFS is the sentinel returned by the pipeline readers when PipelinesFS
// has not been populated, i.e. in the OSS build with no embedded pipelines.
var errNoFS = errors.New("no embedded file system")

// logName is the logger name used when reporting pipeline load activity.
const logName = "pipeline"

// pipeline is a single ingest pipeline definition ready for upload to
// Elasticsearch.
type pipeline struct {
	// id is the ingest pipeline ID the definition is installed under in
	// Elasticsearch.
	id string
	// contents is the decoded pipeline definition body.
	contents map[string]interface{}
}

// UploadPipelines reads all pipelines embedded in the Packetbeat executable
// and adapts the pipeline for a given ES version, converts to JSON if
// necessary and creates or updates ingest pipeline in ES. The IDs of pipelines
// uploaded to ES are returned in loaded.
func UploadPipelines(info beat.Info, esClient *eslegclient.Connection, overwritePipelines bool) (loaded []string, err error) {
	var pipelines []pipeline
	if pipelines, err = readAll(info); err != nil {
		return nil, err
	}
	loaded, err = load(esClient, pipelines, overwritePipelines)
	return loaded, err
}

// readAll reads pipelines from the embedded filesystem and returns a
// slice of pipelines suitable for sending to Elasticsearch with load.
// When no filesystem is embedded (the OSS build), it returns nil with no
// error: having nothing to upload is not a failure.
func readAll(info beat.Info) (pipelines []pipeline, err error) {
	p, err := readDir(".", info)
	if errors.Is(err, errNoFS) {
		return nil, nil
	}
	return p, err
}

// readDir recursively reads all pipeline files under dir in PipelinesFS,
// returning one pipeline per regular file found. It returns errNoFS when
// no filesystem is embedded.
func readDir(dir string, info beat.Info) (pipelines []pipeline, err error) {
	if PipelinesFS == nil {
		return nil, errNoFS
	}
	dirEntries, err := PipelinesFS.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	for _, de := range dirEntries {
		if de.IsDir() {
			subPipelines, err := readDir(path.Join(dir, de.Name()), info)
			if err != nil {
				return nil, err
			}
			pipelines = append(pipelines, subPipelines...)
			continue
		}
		p, err := readFile(path.Join(dir, de.Name()), info)
		if errors.Is(err, errNoFS) {
			continue
		}
		if err != nil {
			return nil, err
		}
		pipelines = append(pipelines, p)
	}
	return pipelines, nil
}

// readFile reads the pipeline file at filename from PipelinesFS, applies
// the beat's template variables to it and returns the result as a pipeline
// whose ID is derived from the file's top-level directory (the data stream
// name) and the beat version.
func readFile(filename string, info beat.Info) (p pipeline, err error) {
	if PipelinesFS == nil {
		return pipeline{}, errNoFS
	}
	contents, err := PipelinesFS.ReadFile(filename)
	if err != nil {
		return pipeline{}, err
	}
	updatedContent, err := applyTemplates(info.IndexPrefix, info.Version, filename, contents)
	if err != nil {
		return pipeline{}, err
	}
	// Paths in an embed.FS always use '/' as the separator, even on Windows,
	// so cutting on os.PathSeparator would leave ds holding the full path on
	// platforms where the separator is '\'.
	ds, _, ok := strings.Cut(filename, "/")
	if !ok {
		// Defensive fallback in case a native path was passed in.
		ds, _, _ = strings.Cut(filename, string(os.PathSeparator))
	}
	p = pipeline{
		id:       fileset.FormatPipelineID(info.IndexPrefix, "", "", ds, info.Version),
		contents: updatedContent,
	}
	return p, nil
}

// load uses esClient to load pipelines to Elasticsearch cluster.
// The IDs of loaded pipelines will be returned in loaded.
// load will only overwrite existing pipelines if overwritePipelines is
// true. An error in loading one of the pipelines will cause the
// successfully loaded ones to be deleted.
func load(esClient *eslegclient.Connection, pipelines []pipeline, overwritePipelines bool) (loaded []string, err error) {
	log := logp.NewLogger(logName)

	for _, p := range pipelines {
		if err = fileset.LoadPipeline(esClient, p.id, p.contents, overwritePipelines, log); err != nil {
			err = fmt.Errorf("error loading pipeline %s: %w", p.id, err)
			break
		}
		loaded = append(loaded, p.id)
	}
	if err == nil {
		return loaded, nil
	}

	// Roll back: best-effort deletion of everything installed before the
	// failure, accumulating any deletion errors alongside the load error.
	errs := multierror.Errors{err}
	for _, id := range loaded {
		if delErr := fileset.DeletePipeline(esClient, id); delErr != nil {
			errs = append(errs, delErr)
		}
	}
	return nil, errs.Err()
}

// applyTemplates renders the fileset template variables embedded in the
// original pipeline file contents and decodes the result into a generic
// map. JSON and YAML pipeline definitions are supported, selected by file
// extension; any other extension is an error.
func applyTemplates(prefix string, version string, filename string, original []byte) (converted map[string]interface{}, err error) {
	vars := map[string]interface{}{
		"builtin": map[string]interface{}{
			"prefix":      prefix,
			"module":      "",
			"fileset":     "",
			"beatVersion": version,
		},
	}

	encodedString, err := fileset.ApplyTemplate(vars, string(original), true)
	if err != nil {
		return nil, fmt.Errorf("failed to apply template: %w", err)
	}

	var content map[string]interface{}
	switch extension := strings.ToLower(filepath.Ext(filename)); extension {
	case ".json":
		if err = json.Unmarshal([]byte(encodedString), &content); err != nil {
			return nil, fmt.Errorf("error JSON decoding the pipeline file: %s: %w", filename, err)
		}
	case ".yaml", ".yml":
		if err = yaml.Unmarshal([]byte(encodedString), &content); err != nil {
			return nil, fmt.Errorf("error YAML decoding the pipeline file: %s: %w", filename, err)
		}
		newContent, err := fileset.FixYAMLMaps(content)
		if err != nil {
			return nil, fmt.Errorf("failed to sanitize the YAML pipeline file: %s: %w", filename, err)
		}
		// Checked assertion: an unexpected type from FixYAMLMaps must surface
		// as an error rather than panicking during pipeline upload.
		fixed, ok := newContent.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("failed to sanitize the YAML pipeline file: %s: unexpected content type %T", filename, newContent)
		}
		content = fixed
	default:
		return nil, fmt.Errorf("unsupported extension '%s' for pipeline file: %s", extension, filename)
	}
	return content, nil
}
5 changes: 5 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ packetbeat.interfaces.internal_networks:
# can stay enabled even after beat is shut down.
#packetbeat.interfaces.auto_promisc_mode: true

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Packetbeat overwrites pipelines
# every time a new Elasticsearch connection is established.
#packetbeat.overwrite_pipelines: false

# =================================== Flows ====================================

packetbeat.flows:
Expand Down
7 changes: 7 additions & 0 deletions packetbeat/scripts/mage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ func device(goos string) string {
return "default_route"
}

// SelectLogic configures the types of project logic to use (OSS vs X-Pack).
// It is set in the packetbeat and x-pack/packetbeat magefiles.
var SelectLogic devtools.ProjectType

// ConfigFileParams returns the default ConfigFileParams for generating
// packetbeat*.yml files.
func ConfigFileParams() devtools.ConfigFileParams {
p := devtools.DefaultConfigFileParams()
p.Templates = append(p.Templates, devtools.OSSBeatDir("_meta/config/*.tmpl"))
if SelectLogic == devtools.XPackProject {
p.Templates = append(p.Templates, devtools.XPackBeatDir("_meta/config/*.tmpl"))
}
p.ExtraVars = map[string]interface{}{
"device": device,
}
Expand Down
Loading

0 comments on commit e900baf

Please sign in to comment.