Skip to content

Commit 69e4e92

Browse files
committed
packetbeat/{beater,protos,sniffer}: allow assignment of protocols to interfaces
Previously the protocols association was held in a *protocols.ProtocolStruct in common for all interfaces. This prevented individual interfaces that were only associated with a subset of the protocols that were being captured from working specifically on that subset. The new code moves the construction of the protocols sets into setupSniffer where they are associated with the interface that will be capturing them. They are then assigned to child sniffers in sniffer.New based on the interface. This change has no effect on the behaviour of packetbeat as it stands as configs from packetbeat and from agent are only able to express the existence of a single interface. Changes to configuration format/shape will enable multiple interfaces to be enabled in packetbeat and within a single agent policy, making use of the change here.
1 parent 55df09f commit 69e4e92

File tree

4 files changed

+79
-62
lines changed

4 files changed

+79
-62
lines changed

CHANGELOG-developer.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
177177
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
178178
- Add file-backed cache for cache enrichment processor. {pull}36686[36686] {pull}36696[36696]
179179
- Elide retryable HTTP client construction in Filebeat HTTPJSON and CEL inputs if not needed. {pull}36916[36916]
180+
- Allow assignment of packetbeat protocols to interfaces. {issue}36574[36564] {pull}[]
180181

181182
==== Deprecated
182183

packetbeat/beater/processor.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,10 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
153153
return nil, err
154154
}
155155

156-
watcher := &procs.ProcessesWatcher{}
156+
var watch procs.ProcessesWatcher
157157
// Enable the process watcher only if capturing live traffic
158158
if config.Interfaces[0].File == "" {
159-
err = watcher.Init(config.Procs)
159+
err = watch.Init(config.Procs)
160160
if err != nil {
161161
logp.Critical(err.Error())
162162
return nil, err
@@ -165,17 +165,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
165165
logp.Info("Process watcher disabled when file input is used")
166166
}
167167

168-
logp.Debug("main", "Initializing protocol plugins")
169-
protocols := protos.NewProtocols()
170-
err = protocols.Init(false, publisher, watcher, config.Protocols, config.ProtocolsList)
171-
if err != nil {
172-
return nil, fmt.Errorf("failed to initialize protocol analyzers: %w", err)
173-
}
174-
flows, err := setupFlows(pipeline, watcher, config)
168+
flows, err := setupFlows(pipeline, &watch, config)
175169
if err != nil {
176170
return nil, err
177171
}
178-
sniffer, err := setupSniffer(id, config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config))
172+
sniffer, err := setupSniffer(id, config, publisher, &watch, flows)
179173
if err != nil {
180174
return nil, err
181175
}
@@ -185,7 +179,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
185179

186180
// setupFlows returns a *flows.Flows that will publish to the provided pipeline,
187181
// configured with cfg and process enrichment via the provided watcher.
188-
func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
182+
func setupFlows(pipeline beat.Pipeline, watch *procs.ProcessesWatcher, cfg config.Config) (*flows.Flows, error) {
189183
if !cfg.Flows.IsEnabled() {
190184
return nil, nil
191185
}
@@ -211,10 +205,10 @@ func setupFlows(pipeline beat.Pipeline, watcher *procs.ProcessesWatcher, cfg con
211205
return nil, err
212206
}
213207

214-
return flows.NewFlows(client.PublishAll, watcher, cfg.Flows)
208+
return flows.NewFlows(client.PublishAll, watch, cfg.Flows)
215209
}
216210

217-
func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruct, decoders sniffer.Decoders) (*sniffer.Sniffer, error) {
211+
func setupSniffer(id string, cfg config.Config, pub *publish.TransactionPublisher, watch *procs.ProcessesWatcher, flows *flows.Flows) (*sniffer.Sniffer, error) {
218212
icmp, err := cfg.ICMP()
219213
if err != nil {
220214
return nil, err
@@ -237,7 +231,15 @@ func setupSniffer(id string, cfg config.Config, protocols *protos.ProtocolsStruc
237231
interfaces = append(interfaces, iface)
238232
}
239233

234+
logp.Debug("main", "Initializing protocol plugins")
235+
decoders := make(map[string]sniffer.Decoders)
240236
for i, iface := range interfaces {
237+
protocols := protos.NewProtocols()
238+
err = protocols.InitFiltered(false, iface.Device, pub, watch, cfg.Protocols, cfg.ProtocolsList)
239+
if err != nil {
240+
return nil, fmt.Errorf("failed to initialize protocol analyzers for %s: %w", iface.Device, err)
241+
}
242+
decoders[iface.Device] = sniffer.DecodersFor(id, pub, protocols, watch, flows, cfg)
241243
if iface.BpfFilter != "" || cfg.Flows.IsEnabled() {
242244
continue
243245
}

packetbeat/protos/protos.go

Lines changed: 56 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -110,50 +110,45 @@ type reporterFactory interface {
110110
CreateReporter(*conf.C) (func(beat.Event), error)
111111
}
112112

113-
func (s ProtocolsStruct) Init(
114-
testMode bool,
115-
pub reporterFactory,
116-
watcher *procs.ProcessesWatcher,
117-
configs map[string]*conf.C,
118-
listConfigs []*conf.C,
119-
) error {
120-
if len(configs) > 0 {
113+
func (s ProtocolsStruct) Init(test bool, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
114+
return s.InitFiltered(test, "", pub, watch, cfgs, list)
115+
}
116+
117+
func (s ProtocolsStruct) InitFiltered(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, cfgs map[string]*conf.C, list []*conf.C) error {
118+
if len(cfgs) != 0 {
121119
cfgwarn.Deprecate("7.0.0", "dictionary style protocols configuration has been deprecated. Please use list-style protocols configuration.")
122120
}
123121

124122
for proto := range protocolSyms {
125123
logp.Debug("protos", "registered protocol plugin: %v", proto)
126124
}
127125

128-
for name, config := range configs {
129-
if err := s.configureProtocol(testMode, pub, watcher, name, config); err != nil {
126+
for name, cfg := range cfgs {
127+
err := s.configureProtocol(test, device, pub, watch, name, cfg)
128+
if err != nil {
130129
return err
131130
}
132131
}
133132

134-
for _, config := range listConfigs {
133+
for _, cfg := range list {
135134
module := struct {
136135
Name string `config:"type" validate:"required"`
137136
}{}
138-
if err := config.Unpack(&module); err != nil {
137+
err := cfg.Unpack(&module)
138+
if err != nil {
139139
return err
140140
}
141141

142-
if err := s.configureProtocol(testMode, pub, watcher, module.Name, config); err != nil {
142+
err = s.configureProtocol(test, device, pub, watch, module.Name, cfg)
143+
if err != nil {
143144
return err
144145
}
145146
}
146147

147148
return nil
148149
}
149150

150-
func (s ProtocolsStruct) configureProtocol(
151-
testMode bool,
152-
pub reporterFactory,
153-
watcher *procs.ProcessesWatcher,
154-
name string,
155-
config *conf.C,
156-
) error {
151+
func (s ProtocolsStruct) configureProtocol(test bool, device string, pub reporterFactory, watch *procs.ProcessesWatcher, name string, config *conf.C) error {
157152
// XXX: icmp is special, ignore here :/
158153
if name == "icmp" {
159154
return nil
@@ -176,17 +171,31 @@ func (s ProtocolsStruct) configureProtocol(
176171
return nil
177172
}
178173

174+
if device != "" {
175+
// This could happen earlier, but let any errors be found first.
176+
var protocol struct {
177+
Device string `config:"interface"`
178+
}
179+
err := config.Unpack(&protocol)
180+
if err != nil {
181+
return err
182+
}
183+
if protocol.Device != "" && protocol.Device != device {
184+
return nil
185+
}
186+
}
187+
179188
var client beat.Client
180189
results := func(beat.Event) {}
181-
if !testMode {
190+
if !test {
182191
var err error
183192
results, err = pub.CreateReporter(config)
184193
if err != nil {
185194
return err
186195
}
187196
}
188197

189-
inst, err := plugin(testMode, results, watcher, config)
198+
inst, err := plugin(test, results, watch, config)
190199
if err != nil {
191200
logp.Err("Failed to register protocol plugin: %v", err)
192201
return err
@@ -196,6 +205,30 @@ func (s ProtocolsStruct) configureProtocol(
196205
return nil
197206
}
198207

208+
func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
209+
if _, exists := s.all[proto]; exists {
210+
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
211+
}
212+
213+
s.all[proto] = protocolInstance{
214+
client: client,
215+
plugin: plugin,
216+
}
217+
218+
success := false
219+
if tcp, ok := plugin.(TCPPlugin); ok {
220+
s.tcp[proto] = tcp
221+
success = true
222+
}
223+
if udp, ok := plugin.(UDPPlugin); ok {
224+
s.udp[proto] = udp
225+
success = true
226+
}
227+
if !success {
228+
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
229+
}
230+
}
231+
199232
func (s ProtocolsStruct) GetTCP(proto Protocol) TCPPlugin {
200233
plugin, exists := s.tcp[proto]
201234
if !exists {
@@ -228,7 +261,7 @@ func (s ProtocolsStruct) GetAllUDP() map[Protocol]UDPPlugin {
228261
// and unencapsulated packets
229262
func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
230263
// Sort the protocol IDs so that the return value is consistent.
231-
var protos []int
264+
protos := make([]int, 0, len(s.all))
232265
for proto := range s.all {
233266
protos = append(protos, int(proto))
234267
}
@@ -272,27 +305,3 @@ func (s ProtocolsStruct) BpfFilter(withVlans bool, withICMP bool) string {
272305
}
273306
return filter
274307
}
275-
276-
func (s ProtocolsStruct) register(proto Protocol, client beat.Client, plugin Plugin) {
277-
if _, exists := s.all[proto]; exists {
278-
logp.Warn("Protocol (%s) plugin will overwritten by another plugin", proto.String())
279-
}
280-
281-
s.all[proto] = protocolInstance{
282-
client: client,
283-
plugin: plugin,
284-
}
285-
286-
success := false
287-
if tcp, ok := plugin.(TCPPlugin); ok {
288-
s.tcp[proto] = tcp
289-
success = true
290-
}
291-
if udp, ok := plugin.(UDPPlugin); ok {
292-
s.udp[proto] = udp
293-
success = true
294-
}
295-
if !success {
296-
logp.Warn("Protocol (%s) register failed, port: %v", proto.String(), plugin.GetPorts())
297-
}
298-
}

packetbeat/sniffer/sniffer.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,19 +90,24 @@ const (
9090
// only, but no device is opened yet. Accessing and configuring the actual device
9191
// is done by the Run method. The id parameter is used to specify the metric
9292
// collection ID for AF_PACKET sniffers on Linux.
93-
func New(id string, testMode bool, _ string, decoders Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
93+
func New(id string, testMode bool, _ string, decoders map[string]Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) {
9494
s := &Sniffer{
9595
sniffers: make([]sniffer, len(interfaces)),
9696
log: logp.NewLogger("sniffer"),
9797
}
9898

9999
for i, iface := range interfaces {
100+
dec, ok := decoders[iface.Device]
101+
if !ok {
102+
// This should never happen.
103+
return nil, fmt.Errorf("no decoder for %s", iface.Device)
104+
}
100105
child := sniffer{
101106
state: atomic.MakeInt32(snifferInactive),
102107
followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"),
103108
id: id,
104109
idx: i,
105-
decoders: decoders,
110+
decoders: dec,
106111
log: s.log,
107112
}
108113

0 commit comments

Comments
 (0)