diff --git a/packetbeat/beater/processor.go b/packetbeat/beater/processor.go index 4099099b7ea1..135f0c18ac7c 100644 --- a/packetbeat/beater/processor.go +++ b/packetbeat/beater/processor.go @@ -153,10 +153,10 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C) return nil, err } - watcher := &procs.ProcessesWatcher{} + var watch procs.ProcessesWatcher // Enable the process watcher only if capturing live traffic if config.Interfaces[0].File == "" { - err = watcher.Init(config.Procs) + err = watch.Init(config.Procs) if err != nil { logp.Critical(err.Error()) return nil, err @@ -165,17 +165,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C) logp.Info("Process watcher disabled when file input is used") } - logp.Debug("main", "Initializing protocol plugins") - protocols := protos.NewProtocols() - err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList) - if err != nil { - return nil, fmt.Errorf("failed to initialize protocol analyzers: %w", err) - } - flows, err := setupFlows(pipeline, watcher, config) + flows, err := setupFlows(pipeline, &watch, config) if err != nil { return nil, err } - sniffer, err := setupSniffer(id, config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config)) + sniffer, err := setupSniffer(id, config, publisher, &watch, flows) if err != nil { return nil, err } @@ -185,7 +179,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C) // setupFlows returns a *flows.Flows that will publish to the provided pipeline, // configured with cfg and process enrichment via the provided watcher. -func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) { +func setupFlows(pipeline beat.Pipeline, watch *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) { if !cfg.Flows.IsEnabled() { return nil, nil } @@ -211,10 +205,10 @@ func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg con return nil, err } - return flows.NewFlows(client.PublishAll, watcher, cfg.Flows) + return flows.NewFlows(client.PublishAll, watch, cfg.Flows) } -func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) { +func setupSniffer(id string, cfg config.Config, pub *publish.TransactionPublisher, watch *procs.ProcessesWatcher, flows *flows.Flows) (*sniffer.Sniffer, error) { icmp, err := cfg.ICMP() if err != nil { return nil, err @@ -237,7 +231,15 @@ func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruc interfaces = append(interfaces, iface) } + logp.Debug("main", "Initializing protocol plugins") + decoders := make(map[string]sniffer.Decoders) for i, iface := range interfaces { + protocols := protos.NewProtocols() + err = protocols.InitFiltered(false, iface.Device, pub, watch, cfg.Protocols, cfg.ProtocolsList) + if err != nil { + return nil, fmt.Errorf("failed to initialize protocol analyzers for %s: %w", iface.Device, err) + } + decoders[iface.Device] = sniffer.DecodersFor(id, pub, protocols, watch, flows, cfg) if iface.BpfFilter != "" || cfg.Flows.IsEnabled() { continue } diff --git a/packetbeat/protos/protos.go b/packetbeat/protos/protos.go index 3841db8f91b0..26e2783d2163 100644 --- a/packetbeat/protos/protos.go +++ b/packetbeat/protos/protos.go @@ -110,14 +110,12 @@ type reporterFactory interface { CreateReporter(*conf.C) (func(beat.Event), error) } -func (s ProtocolsStruct) Init( - testMode bool, - pub reporterFactory, - watcher *procs.ProcessesWatcher, - configs map[string]*conf.C, - listConfigs []*conf.C, -) error { - if len(configs) > 0 { +func (s ProtocolsStruct) Init(test bool, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error { + return s.InitFiltered(test, "", pub, watch, cfgs, list) +} + +func (s ProtocolsStruct) InitFiltered(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error { + if len(cfgs) != 0 { cfgwarn.Deprecate("7.0.0", "dictionary style protocols configuration has been deprecated. Please use list-style protocols configuration.") } @@ -125,21 +123,24 @@ func (s ProtocolsStruct) Init( logp.Debug("protos", "registered protocol plugin: %v", proto) } - for name, config := range configs { - if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil { + for name, cfg := range cfgs { + err := s.configureProtocol(test, device, pub, watch, name, cfg) + if err != nil { return err } } - for _, config := range listConfigs { + for _, cfg := range list { module := struct { Name string `config:"type" validate:"required"` }{} - if err := config.Unpack(&module); err != nil { + err := cfg.Unpack(&module) + if err != nil { return err } - if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil { + err = s.configureProtocol(test, device, pub, watch, module.Name, cfg) + if err != nil { return err } } @@ -147,13 +148,7 @@ func (s ProtocolsStruct) Init( return nil } -func (s ProtocolsStruct) configureProtocol( - testMode bool, - pub reporterFactory, - watcher *procs.ProcessesWatcher, - name string, - config *conf.C, -) error { +func (s ProtocolsStruct) configureProtocol(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, name string, config *conf.C) error { // XXX: icmp is special, ignore here :/ if name == "icmp" { return nil @@ -176,9 +171,23 @@ func (s ProtocolsStruct) configureProtocol( return nil } + if device != "" { + // This could happen earlier, but let any errors be found first. + var protocol struct { + Device string `config:"interface"` + } + err := config.Unpack(&protocol) + if err != nil { + return err + } + if protocol.Device != "" && protocol.Device != device { + return nil + } + } + var client beat.Client results := func(beat.Event) {} - if !testMode { + if !test { var err error results, err = pub.CreateReporter(config) if err != nil { @@ -186,7 +195,7 @@ func (s ProtocolsStruct) configureProtocol( } } - inst, err := plugin(testMode, results, watcher, config) + inst, err := plugin(test, results, watch, config) if err != nil { logp.Err("Failed to register protocol plugin: %v", err) return err @@ -196,6 +205,30 @@ func (s ProtocolsStruct) configureProtocol( return nil } +func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) { + if _, exists := s.all[proto]; exists { + logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String()) + } + + s.all[proto] = protocolInstance{ + client: client, + plugin: plugin, + } + + success := false + if tcp, ok := plugin.(TCPPlugin); ok { + s.tcp[proto] = tcp + success = true + } + if udp, ok := plugin.(UDPPlugin); ok { + s.udp[proto] = udp + success = true + } + if !success { + logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts()) + } +} + func (s ProtocolsStruct) GetTCP(proto Protocol) TCPPlugin { plugin, exists := s.tcp[proto] if !exists { @@ -228,7 +261,7 @@ func (s ProtocolsStruct) GetAllUDP() map[Protocol]UDPPlugin { // and unencapsulated packets func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string { // Sort the protocol IDs so that the return value is consistent. - var protos []int + protos := make([]int, 0, len(s.all)) for proto := range s.all { protos = append(protos, int(proto)) } @@ -272,27 +305,3 @@ func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string { } return filter } - -func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) { - if _, exists := s.all[proto]; exists { - logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String()) - } - - s.all[proto] = protocolInstance{ - client: client, - plugin: plugin, - } - - success := false - if tcp, ok := plugin.(TCPPlugin); ok { - s.tcp[proto] = tcp - success = true - } - if udp, ok := plugin.(UDPPlugin); ok { - s.udp[proto] = udp - success = true - } - if !success { - logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts()) - } -} diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index 3cbff09483b2..d8043032a64a 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -90,19 +90,24 @@ const ( // only, but no device is opened yet. Accessing and configuring the actual device // is done by the Run method. The id parameter is used to specify the metric // collection ID for AF_PACKET sniffers on Linux. -func New(id string, testMode bool, _ string, decoders Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) { +func New(id string, testMode bool, _ string, decoders map[string]Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) { s := &Sniffer{ sniffers: make([]sniffer, len(interfaces)), log: logp.NewLogger("sniffer"), } for i, iface := range interfaces { + dec, ok := decoders[iface.Device] + if !ok { + // This should never happen. + return nil, fmt.Errorf("no decoder for %s", iface.Device) + } child := sniffer{ state: atomic.MakeInt32(snifferInactive), followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), id: id, idx: i, - decoders: decoders, + decoders: dec, log: s.log, }