Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

packetbeat/{beater,protos,sniffer}: allow assignment of protocols to interfaces #36852

Merged
merged 1 commit into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686] {pull}36696[36696]
- Elide retryable HTTP client construction in Filebeat HTTPJSON and CEL inputs if not needed. {pull}36916[36916]
- Allow assignment of packetbeat protocols to interfaces. {issue}36574[36564] {pull}[]

==== Deprecated

Expand Down
28 changes: 15 additions & 13 deletions packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
return nil, err
}

watcher := &procs.ProcessesWatcher{}
var watch procs.ProcessesWatcher
// Enable the process watcher only if capturing live traffic
if config.Interfaces[0].File == "" {
err = watcher.Init(config.Procs)
err = watch.Init(config.Procs)
if err != nil {
logp.Critical(err.Error())
return nil, err
Expand All @@ -165,17 +165,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
logp.Info("Process watcher disabled when file input is used")
}

logp.Debug("main", "Initializing protocol plugins")
protocols := protos.NewProtocols()
err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList)
if err != nil {
return nil, fmt.Errorf("failed to initialize protocol analyzers: %w", err)
}
flows, err := setupFlows(pipeline, watcher, config)
flows, err := setupFlows(pipeline, &watch, config)
if err != nil {
return nil, err
}
sniffer, err := setupSniffer(id, config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config))
sniffer, err := setupSniffer(id, config, publisher, &watch, flows)
if err != nil {
return nil, err
}
Expand All @@ -185,7 +179,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)

// setupFlows returns a *flows.Flows that will publish to the provided pipeline,
// configured with cfg and process enrichment via the provided watcher.
func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
func setupFlows(pipeline beat.Pipeline, watch *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
if !cfg.Flows.IsEnabled() {
return nil, nil
}
Expand All @@ -211,10 +205,10 @@ func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg con
return nil, err
}

return flows.NewFlows(client.PublishAll, watcher, cfg.Flows)
return flows.NewFlows(client.PublishAll, watch, cfg.Flows)
}

func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) {
func setupSniffer(id string, cfg config.Config, pub *publish.TransactionPublisher, watch *procs.ProcessesWatcher, flows *flows.Flows) (*sniffer.Sniffer, error) {
icmp, err := cfg.ICMP()
if err != nil {
return nil, err
Expand All @@ -237,7 +231,15 @@ func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruc
interfaces = append(interfaces, iface)
}

logp.Debug("main", "Initializing protocol plugins")
decoders := make(map[string]sniffer.Decoders)
for i, iface := range interfaces {
protocols := protos.NewProtocols()
err = protocols.InitFiltered(false, iface.Device, pub, watch, cfg.Protocols, cfg.ProtocolsList)
if err != nil {
return nil, fmt.Errorf("failed to initialize protocol analyzers for %s: %w", iface.Device, err)
}
decoders[iface.Device] = sniffer.DecodersFor(id, pub, protocols, watch, flows, cfg)
if iface.BpfFilter != "" || cfg.Flows.IsEnabled() {
continue
}
Expand Down
103 changes: 56 additions & 47 deletions packetbeat/protos/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,50 +110,45 @@ type reporterFactory interface {
CreateReporter(*conf.C) (func(beat.Event), error)
}

func (s ProtocolsStruct) Init(
testMode bool,
pub reporterFactory,
watcher *procs.ProcessesWatcher,
configs map[string]*conf.C,
listConfigs []*conf.C,
) error {
if len(configs) > 0 {
func (s ProtocolsStruct) Init(test bool, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
return s.InitFiltered(test, "", pub, watch, cfgs, list)
}

func (s ProtocolsStruct) InitFiltered(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
if len(cfgs) != 0 {
cfgwarn.Deprecate("7.0.0", "dictionary style protocols configuration has been deprecated. Please use list-style protocols configuration.")
}

for proto := range protocolSyms {
logp.Debug("protos", "registered protocol plugin: %v", proto)
}

for name, config := range configs {
if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil {
for name, cfg := range cfgs {
err := s.configureProtocol(test, device, pub, watch, name, cfg)
if err != nil {
return err
}
}

for _, config := range listConfigs {
for _, cfg := range list {
module := struct {
Name string `config:"type" validate:"required"`
}{}
if err := config.Unpack(&module); err != nil {
err := cfg.Unpack(&module)
if err != nil {
return err
}

if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil {
err = s.configureProtocol(test, device, pub, watch, module.Name, cfg)
if err != nil {
return err
}
}

return nil
}

func (s ProtocolsStruct) configureProtocol(
testMode bool,
pub reporterFactory,
watcher *procs.ProcessesWatcher,
name string,
config *conf.C,
) error {
func (s ProtocolsStruct) configureProtocol(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, name string, config *conf.C) error {
// XXX: icmp is special, ignore here :/
if name == "icmp" {
return nil
Expand All @@ -176,17 +171,31 @@ func (s ProtocolsStruct) configureProtocol(
return nil
}

if device != "" {
// This could happen earlier, but let any errors be found first.
var protocol struct {
Device string `config:"interface"`
}
err := config.Unpack(&protocol)
if err != nil {
return err
}
if protocol.Device != "" && protocol.Device != device {
return nil
}
}

var client beat.Client
results := func(beat.Event) {}
if !testMode {
if !test {
var err error
results, err = pub.CreateReporter(config)
if err != nil {
return err
}
}

inst, err := plugin(testMode, results, watcher, config)
inst, err := plugin(test, results, watch, config)
if err != nil {
logp.Err("Failed to register protocol plugin: %v", err)
return err
Expand All @@ -196,6 +205,30 @@ func (s ProtocolsStruct) configureProtocol(
return nil
}

func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
if _, exists := s.all[proto]; exists {
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
}

s.all[proto] = protocolInstance{
client: client,
plugin: plugin,
}

success := false
if tcp, ok := plugin.(TCPPlugin); ok {
s.tcp[proto] = tcp
success = true
}
if udp, ok := plugin.(UDPPlugin); ok {
s.udp[proto] = udp
success = true
}
if !success {
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
}
}

func (s ProtocolsStruct) GetTCP(proto Protocol) TCPPlugin {
plugin, exists := s.tcp[proto]
if !exists {
Expand Down Expand Up @@ -228,7 +261,7 @@ func (s ProtocolsStruct) GetAllUDP() map[Protocol]UDPPlugin {
// and unencapsulated packets
func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
// Sort the protocol IDs so that the return value is consistent.
var protos []int
protos := make([]int, 0, len(s.all))
for proto := range s.all {
protos = append(protos, int(proto))
}
Expand Down Expand Up @@ -272,27 +305,3 @@ func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
}
return filter
}

func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
if _, exists := s.all[proto]; exists {
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
}

s.all[proto] = protocolInstance{
client: client,
plugin: plugin,
}

success := false
if tcp, ok := plugin.(TCPPlugin); ok {
s.tcp[proto] = tcp
success = true
}
if udp, ok := plugin.(UDPPlugin); ok {
s.udp[proto] = udp
success = true
}
if !success {
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
}
}
9 changes: 7 additions & 2 deletions packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,24 @@ const (
// only, but no device is opened yet. Accessing and configuring the actual device
// is done by the Run method. The id parameter is used to specify the metric
// collection ID for AF_PACKET sniffers on Linux.
func New(id string, testMode bool, _ string, decoders Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
func New(id string, testMode bool, _ string, decoders map[string]Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
s := &Sniffer{
sniffers: make([]sniffer, len(interfaces)),
log: logp.NewLogger("sniffer"),
}

for i, iface := range interfaces {
dec, ok := decoders[iface.Device]
if !ok {
// This should never happen.
return nil, fmt.Errorf("no decoder for %s", iface.Device)
}
child := sniffer{
state: atomic.MakeInt32(snifferInactive),
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
id: id,
idx: i,
decoders: decoders,
decoders: dec,
log: s.log,
}

Expand Down