Skip to content

Commit

Permalink
packetbeat/{beater,protos,sniffer}: allow assignment of protocols to …
Browse files Browse the repository at this point in the history
…interfaces

WIP
  • Loading branch information
efd6 committed Oct 16, 2023
1 parent b8a241b commit 0c9d36d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 62 deletions.
28 changes: 15 additions & 13 deletions packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
return nil, err
}

watcher := &procs.ProcessesWatcher{}
var watch procs.ProcessesWatcher
// Enable the process watcher only if capturing live traffic
if config.Interfaces[0].File == "" {
err = watcher.Init(config.Procs)
err = watch.Init(config.Procs)
if err != nil {
logp.Critical(err.Error())
return nil, err
Expand All @@ -165,17 +165,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
logp.Info("Process watcher disabled when file input is used")
}

logp.Debug("main", "Initializing protocol plugins")
protocols := protos.NewProtocols()
err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList)
if err != nil {
return nil, fmt.Errorf("failed to initialize protocol analyzers: %w", err)
}
flows, err := setupFlows(pipeline, watcher, config)
flows, err := setupFlows(pipeline, &watch, config)
if err != nil {
return nil, err
}
sniffer, err := setupSniffer(id, config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config))
sniffer, err := setupSniffer(id, config, publisher, &watch, flows)
if err != nil {
return nil, err
}
Expand All @@ -185,7 +179,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)

// setupFlows returns a *flows.Flows that will publish to the provided pipeline,
// configured with cfg and process enrichment via the provided watcher.
func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
func setupFlows(pipeline beat.Pipeline, watch *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
if !cfg.Flows.IsEnabled() {
return nil, nil
}
Expand All @@ -211,10 +205,10 @@ func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg con
return nil, err
}

return flows.NewFlows(client.PublishAll, watcher, cfg.Flows)
return flows.NewFlows(client.PublishAll, watch, cfg.Flows)
}

func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) {
func setupSniffer(id string, cfg config.Config, pub *publish.TransactionPublisher, watch *procs.ProcessesWatcher, flows *flows.Flows) (*sniffer.Sniffer, error) {
icmp, err := cfg.ICMP()
if err != nil {
return nil, err
Expand All @@ -237,7 +231,15 @@ func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruc
interfaces = append(interfaces, iface)
}

logp.Debug("main", "Initializing protocol plugins")
decoders := make(map[string]sniffer.Decoders)
for i, iface := range interfaces {
protocols := protos.NewProtocols()
err = protocols.InitFiltered(false, iface.Device, pub, watch, cfg.Protocols, cfg.ProtocolsList)
if err != nil {
return nil, fmt.Errorf("failed to initialize protocol analyzers for %s: %w", iface.Device, err)
}
decoders[iface.Device] = sniffer.DecodersFor(id, pub, protocols, watch, flows, cfg)
if iface.BpfFilter != "" || cfg.Flows.IsEnabled() {
continue
}
Expand Down
103 changes: 56 additions & 47 deletions packetbeat/protos/protos.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,50 +110,45 @@ type reporterFactory interface {
CreateReporter(*conf.C) (func(beat.Event), error)
}

func (s ProtocolsStruct) Init(
testMode bool,
pub reporterFactory,
watcher *procs.ProcessesWatcher,
configs map[string]*conf.C,
listConfigs []*conf.C,
) error {
if len(configs) > 0 {
func (s ProtocolsStruct) Init(test bool, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
return s.InitFiltered(test, "", pub, watch, cfgs, list)
}

func (s ProtocolsStruct) InitFiltered(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
if len(cfgs) != 0 {
cfgwarn.Deprecate("7.0.0", "dictionary style protocols configuration has been deprecated. Please use list-style protocols configuration.")
}

for proto := range protocolSyms {
logp.Debug("protos", "registered protocol plugin: %v", proto)
}

for name, config := range configs {
if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil {
for name, cfg := range cfgs {
err := s.configureProtocol(test, device, pub, watch, name, cfg)
if err != nil {
return err
}
}

for _, config := range listConfigs {
for _, cfg := range list {
module := struct {
Name string `config:"type" validate:"required"`
}{}
if err := config.Unpack(&module); err != nil {
err := cfg.Unpack(&module)
if err != nil {
return err
}

if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil {
err = s.configureProtocol(test, device, pub, watch, module.Name, cfg)
if err != nil {
return err
}
}

return nil
}

func (s ProtocolsStruct) configureProtocol(
testMode bool,
pub reporterFactory,
watcher *procs.ProcessesWatcher,
name string,
config *conf.C,
) error {
func (s ProtocolsStruct) configureProtocol(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, name string, config *conf.C) error {
// XXX: icmp is special, ignore here :/
if name == "icmp" {
return nil
Expand All @@ -176,17 +171,31 @@ func (s ProtocolsStruct) configureProtocol(
return nil
}

if device != "" {
// This could happen earlier, but let any errors be found first.
var protocol struct {
Device string `config:"interface"`
}
err := config.Unpack(&protocol)
if err != nil {
return err
}
if protocol.Device != "" && protocol.Device != device {
return nil
}
}

var client beat.Client
results := func(beat.Event) {}
if !testMode {
if !test {
var err error
results, err = pub.CreateReporter(config)
if err != nil {
return err
}
}

inst, err := plugin(testMode, results, watcher, config)
inst, err := plugin(test, results, watch, config)
if err != nil {
logp.Err("Failed to register protocol plugin: %v", err)
return err
Expand All @@ -196,6 +205,30 @@ func (s ProtocolsStruct) configureProtocol(
return nil
}

func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
if _, exists := s.all[proto]; exists {
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
}

s.all[proto] = protocolInstance{
client: client,
plugin: plugin,
}

success := false
if tcp, ok := plugin.(TCPPlugin); ok {
s.tcp[proto] = tcp
success = true
}
if udp, ok := plugin.(UDPPlugin); ok {
s.udp[proto] = udp
success = true
}
if !success {
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
}
}

func (s ProtocolsStruct) GetTCP(proto Protocol) TCPPlugin {
plugin, exists := s.tcp[proto]
if !exists {
Expand Down Expand Up @@ -228,7 +261,7 @@ func (s ProtocolsStruct) GetAllUDP() map[Protocol]UDPPlugin {
// and unencapsulated packets
func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
// Sort the protocol IDs so that the return value is consistent.
var protos []int
protos := make([]int, 0, len(s.all))
for proto := range s.all {
protos = append(protos, int(proto))
}
Expand Down Expand Up @@ -272,27 +305,3 @@ func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
}
return filter
}

func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
if _, exists := s.all[proto]; exists {
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
}

s.all[proto] = protocolInstance{
client: client,
plugin: plugin,
}

success := false
if tcp, ok := plugin.(TCPPlugin); ok {
s.tcp[proto] = tcp
success = true
}
if udp, ok := plugin.(UDPPlugin); ok {
s.udp[proto] = udp
success = true
}
if !success {
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
}
}
9 changes: 7 additions & 2 deletions packetbeat/sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,24 @@ const (
// only, but no device is opened yet. Accessing and configuring the actual device
// is done by the Run method. The id parameter is used to specify the metric
// collection ID for AF_PACKET sniffers on Linux.
func New(id string, testMode bool, _ string, decoders Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
func New(id string, testMode bool, _ string, decoders map[string]Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
s := &Sniffer{
sniffers: make([]sniffer, len(interfaces)),
log: logp.NewLogger("sniffer"),
}

for i, iface := range interfaces {
dec, ok := decoders[iface.Device]
if !ok {
// This should never happen.
return nil, fmt.Errorf("no decoder for %s", iface.Device)
}
child := sniffer{
state: atomic.MakeInt32(snifferInactive),
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
id: id,
idx: i,
decoders: decoders,
decoders: dec,
log: s.log,
}

Expand Down

0 comments on commit 0c9d36d

Please sign in to comment.