Skip to content

Commit

Permalink
code review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmolin committed Feb 3, 2025
1 parent 7106b06 commit e2cf24a
Show file tree
Hide file tree
Showing 18 changed files with 120 additions and 68 deletions.
11 changes: 4 additions & 7 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ type Config struct {

// Defaults returns a configuration object with defaults settings:
// - no protocols are enabled.
// - log output is discarded
// - log output is set to the logger that is passed in.
// - session expiration is checked once every hour.
// - resets are detected.
// - templates are not shared.
// - cache is disabled.
func Defaults(logger *logp.Logger) Config {
return Config{
protocols: []string{},
Expand All @@ -50,12 +53,6 @@ func (c *Config) WithProtocols(protos ...string) *Config {
return c
}

// WithLogOutput sets the output io.Writer for logging.
func (c *Config) WithLogOutput(output *logp.Logger) *Config {
c.logOutput = output
return c
}

// WithExpiration configures the expiration timeout for sessions and templates.
// A value of zero disables expiration.
func (c *Config) WithExpiration(timeout time.Duration) *Config {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import (
func main() {
logger := logp.L().Named("netflow")

decoder, err := decoder.NewDecoder(decoder.NewConfig().
WithLogOutput(logger).
decoder, err := decoder.NewDecoder(decoder.NewConfig(logger).
WithProtocols("v1", "v5", "v9", "ipfix"))
if err != nil {
logger.Fatal("Failed creating decoder:", err)
Expand Down
16 changes: 11 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9"
)

func init() {
logp.TestingSetup()
}

func TestMessageWithOptions(t *testing.T) {
rawString := "" +
"000a01e45bf435e1000000a500000000000200480400001000080004000c0004" +
Expand Down Expand Up @@ -67,7 +73,7 @@ func TestMessageWithOptions(t *testing.T) {
"version": uint64(10),
},
}
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
flows, err := proto.OnPacket(bytes.NewBuffer(raw), test.MakeAddress(t, "127.0.0.1:1234"))
assert.NoError(t, err)
if assert.Len(t, flows, 7) {
Expand All @@ -84,7 +90,7 @@ func TestOptionTemplates(t *testing.T) {
key := v9.MakeSessionKey(addr, 1234, false)

t.Run("Single options template", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
flows, err := proto.OnPacket(test.MakePacket([]uint16{
// Header
// Version, Length, Ts, SeqNo, Source
Expand Down Expand Up @@ -113,7 +119,7 @@ func TestOptionTemplates(t *testing.T) {
})

t.Run("Multiple options template", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
raw := test.MakePacket([]uint16{
// Header
// Version, Count, Ts, SeqNo, Source
Expand Down Expand Up @@ -151,7 +157,7 @@ func TestOptionTemplates(t *testing.T) {
})

t.Run("records discarded", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
raw := test.MakePacket([]uint16{
// Header
// Version, Count, Ts, SeqNo, Source
Expand Down Expand Up @@ -193,7 +199,7 @@ func TestOptionTemplates(t *testing.T) {
func TestCustomFields(t *testing.T) {
addr := test.MakeAddress(t, "127.0.0.1:12345")

conf := config.Defaults()
conf := config.Defaults(logp.L())
conf.WithCustomFields(fields.FieldDict{
fields.Key{EnterpriseID: 0x12345678, FieldID: 33}: &fields.Field{Name: "customField", Decoder: fields.String},
})
Expand Down
12 changes: 9 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/protocol/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
)

func init() {
logp.TestingSetup()
}

type testProto int

func (testProto) Version() uint16 {
Expand Down Expand Up @@ -61,7 +67,7 @@ func TestRegistry_Get(t *testing.T) {
assert.NoError(t, err)
gen, err := registry.Get("my_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(0), gen(config.Defaults()))
assert.Equal(t, testProto(0), gen(config.Defaults(logp.L())))
})
t.Run("two protocols", func(t *testing.T) {
registry := ProtocolRegistry{}
Expand All @@ -71,10 +77,10 @@ func TestRegistry_Get(t *testing.T) {
assert.NoError(t, err)
gen, err := registry.Get("my_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(1), gen(config.Defaults()))
assert.Equal(t, testProto(1), gen(config.Defaults(logp.L())))
gen, err = registry.Get("other_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(2), gen(config.Defaults()))
assert.Equal(t, testProto(2), gen(config.Defaults(logp.L())))
})
t.Run("not registered", func(t *testing.T) {
registry := ProtocolRegistry{}
Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/netflow/decoder/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ type NetflowProtocol struct {
}

func init() {
_ := protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
return NewProtocol(ProtocolID, &templateV1, readV1Header, logp.L().Named(LogPrefix))
return NewProtocol(ProtocolID, &templateV1, readV1Header, config.LogOutput().Named(LogPrefix))
}

func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *logp.Logger) protocol.Protocol {
Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v1/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
template2 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
"github.com/elastic/elastic-agent-libs/logp"
)

func init() {
logp.TestingSetup()
}

func TestNetflowProtocol_New(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

assert.Nil(t, proto.Start())
assert.Equal(t, uint16(1), proto.Version())
assert.Nil(t, proto.Stop())
}

func TestNetflowProtocol_OnPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00010002000000015bf689f605946fb0" +
"acd910e5c0a8017b00000000000000000000000e00002cfa" +
Expand Down Expand Up @@ -105,7 +110,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) {
}

func TestNetflowProtocol_BadPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00010002000000015bf689f605"
raw, err := hex.DecodeString(rawS)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/netflow/decoder/v5/v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ var templateV5 = template.Template{
}

func init() {
_ := protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
Expand Down
12 changes: 9 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v5/v5_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,28 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
)

func init() {
logp.TestingSetup()
}

func TestNetflowProtocol_New(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

assert.Nil(t, proto.Start())
assert.Equal(t, uint16(5), proto.Version())
assert.Nil(t, proto.Stop())
}

func TestNetflowProtocol_OnPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00050002000000015bf68d8b35fcb9780000000000000000" +
"acd910e5c0a8017b00000000000000000000000e00002cfa" +
Expand Down Expand Up @@ -119,7 +125,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) {
}

func TestNetflowProtocol_BadPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00050002000000015bf689f605"
raw, err := hex.DecodeString(rawS)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/netflow/decoder/v6/v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ var templateV6 = template.Template{
}

func init() {
_ := protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
Expand Down
12 changes: 9 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v6/v6_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,28 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
)

func init() {
logp.TestingSetup()
}

func TestNetflowProtocol_New(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

assert.Nil(t, proto.Start())
assert.Equal(t, uint16(6), proto.Version())
assert.Nil(t, proto.Stop())
}

func TestNetflowProtocol_OnPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00060002000000015bf68d8b35fcb9780000000000000000" +
"acd910e5c0a8017b00000000000000000000000e00002cfa" +
Expand Down Expand Up @@ -121,7 +127,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) {
}

func TestNetflowProtocol_BadPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00060002000000015bf689f605"
raw, err := hex.DecodeString(rawS)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/netflow/decoder/v7/v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ var v7template = template.Template{
}

func init() {
_ := protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v7/v7_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,24 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
)

func TestNetflowProtocol_New(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

assert.Nil(t, proto.Start())
assert.Equal(t, uint16(7), proto.Version())
assert.Nil(t, proto.Stop())
}

func TestNetflowProtocol_OnPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00070002000000015bf68d8b35fcb9780000000000000000" +
"acd910e5c0a8017b00000000000000000000000e00002cfa" +
Expand Down Expand Up @@ -119,7 +121,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) {
}

func TestNetflowProtocol_BadPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00060002000000015bf689f605"
raw, err := hex.DecodeString(rawS)
Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/netflow/decoder/v8/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,14 @@ type NetflowV8Protocol struct {
}

func init() {
_ := protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
return &NetflowV8Protocol{
logger: logp.L().Named(LogPrefix),
logger: config.LogOutput().Named(LogPrefix),
}
}

Expand Down
Loading

0 comments on commit e2cf24a

Please sign in to comment.