Skip to content

Commit 09512c3

Browse files
committed
packetbeat/{beater,protos,sniffer}: allow assignment of protocols to interfaces
WIP
1 parent b8a241b commit 09512c3

File tree

3 files changed

+75
-62
lines changed

3 files changed

+75
-62
lines changed

packetbeat/beater/processor.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,10 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
153153
return nil, err
154154
}
155155

156-
watcher := &procs.ProcessesWatcher{}
156+
watch := &procs.ProcessesWatcher{}
157157
// Enable the process watcher only if capturing live traffic
158158
if config.Interfaces[0].File == "" {
159-
err = watcher.Init(config.Procs)
159+
err = watch.Init(config.Procs)
160160
if err != nil {
161161
logp.Critical(err.Error())
162162
return nil, err
@@ -165,17 +165,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
165165
logp.Info("Process watcher disabled when file input is used")
166166
}
167167

168-
logp.Debug("main", "Initializing protocol plugins")
169-
protocols := protos.NewProtocols()
170-
err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList)
171-
if err != nil {
172-
return nil, fmt.Errorf("failed to initialize protocol analyzers: %w", err)
173-
}
174-
flows, err := setupFlows(pipeline, watcher, config)
168+
flows, err := setupFlows(pipeline, watch, config)
175169
if err != nil {
176170
return nil, err
177171
}
178-
sniffer, err := setupSniffer(id, config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config))
172+
sniffer, err := setupSniffer(id, config, publisher, watch, flows)
179173
if err != nil {
180174
return nil, err
181175
}
@@ -185,7 +179,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
185179

186180
// setupFlows returns a *flows.Flows that will publish to the provided pipeline,
187181
// configured with cfg and process enrichment via the provided watcher.
188-
func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
182+
func setupFlows(pipeline beat.Pipeline, watch *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
189183
if !cfg.Flows.IsEnabled() {
190184
return nil, nil
191185
}
@@ -211,10 +205,10 @@ func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg con
211205
return nil, err
212206
}
213207

214-
return flows.NewFlows(client.PublishAll, watcher, cfg.Flows)
208+
return flows.NewFlows(client.PublishAll, watch, cfg.Flows)
215209
}
216210

217-
func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) {
211+
func setupSniffer(id string, cfg config.Config, pub *publish.TransactionPublisher, watch *procs.ProcessesWatcher, flows *flows.Flows) (*sniffer.Sniffer, error) {
218212
icmp, err := cfg.ICMP()
219213
if err != nil {
220214
return nil, err
@@ -237,7 +231,15 @@ func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruc
237231
interfaces = append(interfaces, iface)
238232
}
239233

234+
logp.Debug("main", "Initializing protocol plugins")
235+
decoders := make(map[string]sniffer.Decoders)
240236
for i, iface := range interfaces {
237+
protocols := protos.NewProtocols()
238+
err = protocols.InitFiltered(false, iface.Device, pub, watch, cfg.Protocols, cfg.ProtocolsList)
239+
if err != nil {
240+
return nil, fmt.Errorf("failed to initialize protocol analyzers for %s: %w", iface.Device, err)
241+
}
242+
decoders[iface.Device] = sniffer.DecodersFor(id, pub, protocols, watch, flows, cfg)
241243
if iface.BpfFilter != "" || cfg.Flows.IsEnabled() {
242244
continue
243245
}

packetbeat/protos/protos.go

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -110,50 +110,45 @@ type reporterFactory interface {
110110
CreateReporter(*conf.C) (func(beat.Event), error)
111111
}
112112

113-
func (s ProtocolsStruct) Init(
114-
testMode bool,
115-
pub reporterFactory,
116-
watcher *procs.ProcessesWatcher,
117-
configs map[string]*conf.C,
118-
listConfigs []*conf.C,
119-
) error {
120-
if len(configs) > 0 {
113+
func (s ProtocolsStruct) Init(test bool, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
114+
return s.InitFiltered(test, "", pub, watch, cfgs, list)
115+
}
116+
117+
func (s ProtocolsStruct) InitFiltered(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
118+
if len(cfgs) > 0 {
121119
cfgwarn.Deprecate("7.0.0", "dictionary style protocols configuration has been deprecated. Please use list-style protocols configuration.")
122120
}
123121

124122
for proto := range protocolSyms {
125123
logp.Debug("protos", "registered protocol plugin: %v", proto)
126124
}
127125

128-
for name, config := range configs {
129-
if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil {
126+
for name, cfg := range cfgs {
127+
err := s.configureProtocol(test, device, pub, watch, name, cfg)
128+
if err != nil {
130129
return err
131130
}
132131
}
133132

134-
for _, config := range listConfigs {
133+
for _, cfg := range list {
135134
module := struct {
136135
Name string `config:"type" validate:"required"`
137136
}{}
138-
if err := config.Unpack(&module); err != nil {
137+
err := cfg.Unpack(&module)
138+
if err != nil {
139139
return err
140140
}
141141

142-
if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil {
142+
err = s.configureProtocol(test, device, pub, watch, module.Name, cfg)
143+
if err != nil {
143144
return err
144145
}
145146
}
146147

147148
return nil
148149
}
149150

150-
func (s ProtocolsStruct) configureProtocol(
151-
testMode bool,
152-
pub reporterFactory,
153-
watcher *procs.ProcessesWatcher,
154-
name string,
155-
config *conf.C,
156-
) error {
151+
func (s ProtocolsStruct) configureProtocol(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, name string, config *conf.C) error {
157152
// XXX: icmp is special, ignore here :/
158153
if name == "icmp" {
159154
return nil
@@ -176,17 +171,28 @@ func (s ProtocolsStruct) configureProtocol(
176171
return nil
177172
}
178173

174+
// This could happen earlier, but let the errors be found first.
175+
var protocol struct {
176+
Device string `config:"interface"`
177+
}
178+
err := config.Unpack(&protocol)
179+
if err != nil {
180+
return err
181+
}
182+
if protocol.Device != "" && protocol.Device != device {
183+
return nil
184+
}
185+
179186
var client beat.Client
180187
results := func(beat.Event) {}
181-
if !testMode {
182-
var err error
188+
if !test {
183189
results, err = pub.CreateReporter(config)
184190
if err != nil {
185191
return err
186192
}
187193
}
188194

189-
inst, err := plugin(testMode, results, watcher, config)
195+
inst, err := plugin(test, results, watch, config)
190196
if err != nil {
191197
logp.Err("Failed to register protocol plugin: %v", err)
192198
return err
@@ -196,6 +202,30 @@ func (s ProtocolsStruct) configureProtocol(
196202
return nil
197203
}
198204

205+
func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
206+
if _, exists := s.all[proto]; exists {
207+
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
208+
}
209+
210+
s.all[proto] = protocolInstance{
211+
client: client,
212+
plugin: plugin,
213+
}
214+
215+
success := false
216+
if tcp, ok := plugin.(TCPPlugin); ok {
217+
s.tcp[proto] = tcp
218+
success = true
219+
}
220+
if udp, ok := plugin.(UDPPlugin); ok {
221+
s.udp[proto] = udp
222+
success = true
223+
}
224+
if !success {
225+
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
226+
}
227+
}
228+
199229
func (s ProtocolsStruct) GetTCP(proto Protocol) TCPPlugin {
200230
plugin, exists := s.tcp[proto]
201231
if !exists {
@@ -272,27 +302,3 @@ func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
272302
}
273303
return filter
274304
}
275-
276-
func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
277-
if _, exists := s.all[proto]; exists {
278-
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
279-
}
280-
281-
s.all[proto] = protocolInstance{
282-
client: client,
283-
plugin: plugin,
284-
}
285-
286-
success := false
287-
if tcp, ok := plugin.(TCPPlugin); ok {
288-
s.tcp[proto] = tcp
289-
success = true
290-
}
291-
if udp, ok := plugin.(UDPPlugin); ok {
292-
s.udp[proto] = udp
293-
success = true
294-
}
295-
if !success {
296-
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
297-
}
298-
}

packetbeat/sniffer/sniffer.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,24 @@ const (
9090
// only, but no device is opened yet. Accessing and configuring the actual device
9191
// is done by the Run method. The id parameter is used to specify the metric
9292
// collection ID for AF_PACKET sniffers on Linux.
93-
func New(id string, testMode bool, _ string, decoders Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
93+
func New(id string, testMode bool, _ string, decoders map[string]Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
9494
s := &Sniffer{
9595
sniffers: make([]sniffer, len(interfaces)),
9696
log: logp.NewLogger("sniffer"),
9797
}
9898

9999
for i, iface := range interfaces {
100+
dec, ok := decoders[iface.Device]
101+
if !ok {
102+
// This should never happen.
103+
return nil, fmt.Errorf("no decoder for %s", iface.Device)
104+
}
100105
child := sniffer{
101106
state: atomic.MakeInt32(snifferInactive),
102107
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
103108
id: id,
104109
idx: i,
105-
decoders: decoders,
110+
decoders: dec,
106111
log: s.log,
107112
}
108113

0 commit comments

Comments
 (0)