From f7ff90bddb7e1085ec7d80ee2a306547d6ee3fa2 Mon Sep 17 00:00:00 2001 From: dlopes7 Date: Sun, 15 Dec 2024 13:40:23 -0600 Subject: [PATCH] #32732 - Initial implementation This adds the implementation for netflow receiver Introduces: * The OtelLogsProducerWrapper * A parser function to convert proto to otel semantics * The listener implementation as well as the error handling function --- receiver/netflowreceiver/factory.go | 2 +- receiver/netflowreceiver/go.mod | 4 +- receiver/netflowreceiver/go.sum | 4 + receiver/netflowreceiver/listener.go | 139 +++++++++++++++++++++- receiver/netflowreceiver/listener_test.go | 2 +- receiver/netflowreceiver/parser.go | 138 +++++++++++++++++++++ receiver/netflowreceiver/producer.go | 92 ++++++++++++++ receiver/netflowreceiver/receiver.go | 18 ++- 8 files changed, 388 insertions(+), 11 deletions(-) create mode 100644 receiver/netflowreceiver/parser.go create mode 100644 receiver/netflowreceiver/producer.go diff --git a/receiver/netflowreceiver/factory.go b/receiver/netflowreceiver/factory.go index 87c27e9b76ed..f5da854f02e3 100644 --- a/receiver/netflowreceiver/factory.go +++ b/receiver/netflowreceiver/factory.go @@ -16,7 +16,7 @@ import ( const ( defaultSockets = 1 defaultWorkers = 2 - defaultQueueSize = 1_000_000 + defaultQueueSize = 1_000 ) // NewFactory creates a factory for netflow receiver. 
diff --git a/receiver/netflowreceiver/go.mod b/receiver/netflowreceiver/go.mod index fab4afb00086..3481e6a91c5a 100644 --- a/receiver/netflowreceiver/go.mod +++ b/receiver/netflowreceiver/go.mod @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo go 1.22.0 require ( + github.com/netsampler/goflow2/v2 v2.2.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/confmap v1.21.0 go.opentelemetry.io/collector/consumer v1.21.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 + go.opentelemetry.io/collector/pdata v1.21.0 go.opentelemetry.io/collector/receiver v0.115.0 go.opentelemetry.io/collector/receiver/receivertest v0.115.0 go.uber.org/goleak v1.3.0 @@ -26,6 +28,7 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/libp2p/go-reuseport v0.4.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -34,7 +37,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.0 // indirect - go.opentelemetry.io/collector/pdata v1.21.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect go.opentelemetry.io/collector/pipeline v0.115.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.0 // indirect diff --git a/receiver/netflowreceiver/go.sum b/receiver/netflowreceiver/go.sum index ebf24515df5a..dd1aa3730549 100644 --- a/receiver/netflowreceiver/go.sum +++ b/receiver/netflowreceiver/go.sum @@ -29,6 +29,8 @@ 
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s= +github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -38,6 +40,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/netsampler/goflow2/v2 v2.2.1 h1:QzrtWS/meXsqCLv68hdouL+09NfuLKrCoVDJ1xfmuoE= +github.com/netsampler/goflow2/v2 v2.2.1/go.mod h1:057wOc/Xp7c+hUwRDB7wRqrx55m0r3vc7J0k4NrlFbM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= diff --git a/receiver/netflowreceiver/listener.go b/receiver/netflowreceiver/listener.go index d4507cadec8f..09e0b43db96e 100644 --- a/receiver/netflowreceiver/listener.go +++ b/receiver/netflowreceiver/listener.go @@ -3,9 +3,140 @@ package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" +import ( + "errors" + "fmt" 
+ "net" + + "github.com/netsampler/goflow2/v2/decoders/netflow" + + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/netsampler/goflow2/v2/utils" + "github.com/netsampler/goflow2/v2/utils/debug" + + "go.opentelemetry.io/collector/consumer" + "go.uber.org/zap" +) + type Listener struct { - // config Config - // logger *zap.Logger - // recv *utils.UDPReceiver - // logConsumer consumer.Logs + config Config + logger *zap.Logger + recv *utils.UDPReceiver + logConsumer consumer.Logs +} + +func (l *Listener) Dropped(msg utils.Message) { + l.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) +} + +func NewListener(config Config, logger *zap.Logger, logConsumer consumer.Logs) *Listener { + return &Listener{config: config, logger: logger, logConsumer: logConsumer} +} + +func (l *Listener) Start() error { + l.logger.Info("Creating the netflow UDP listener", zap.Any("config", l.config)) + cfg := &utils.UDPReceiverConfig{ + Sockets: l.config.Sockets, + Workers: l.config.Workers, + QueueSize: l.config.QueueSize, + Blocking: false, + ReceiverCallback: l, + } + recv, err := utils.NewUDPReceiver(cfg) + if err != nil { + return err + } + l.recv = recv + + decodeFunc, err := l.buildDecodeFunc() + if err != nil { + return err + } + + l.logger.Info("Start listening for NetFlow", zap.Any("config", l.config)) + if err := l.recv.Start(l.config.Hostname, l.config.Port, decodeFunc); err != nil { + return err + } + + go l.handleErrors() + + return nil +} + +// handleErrors handles errors from the listener +func (l *Listener) handleErrors() { + for err := range l.recv.Errors() { + if errors.Is(err, net.ErrClosed) { + l.logger.Info("receiver closed") + continue + } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { + l.logger.Error("receiver error", zap.Error(err)) + continue + } else if errors.Is(err, netflow.ErrorTemplateNotFound) { + l.logger.Warn("template was not found for this message") + continue + } else 
if errors.Is(err, debug.PanicError) { + var pErrMsg *debug.PanicErrorMessage + if errors.As(err, &pErrMsg) { + l.logger.Error("panic error", zap.String("panic", pErrMsg.Inner)) + l.logger.Error("receiver stacktrace", zap.String("stack", string(pErrMsg.Stacktrace))) + l.logger.Error("receiver msg", zap.Any("error", pErrMsg.Msg)) + } + l.logger.Error("receiver panic", zap.Error(err)) + + continue + } + } +} + +// buildDecodeFunc creates a decode function based on the scheme +func (l *Listener) buildDecodeFunc() (utils.DecoderFunc, error) { + + // Eventually this can be used to configure mappings + cfgProducer := &protoproducer.ProducerConfig{} + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + if err != nil { + return nil, err + } + // We use a goflow2 proto producer to produce messages using protobuf format + protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) + if err != nil { + return nil, err + } + + // the otel log producer converts those messages into OpenTelemetry logs + otelLogsProducer := NewOtelLogsProducer(protoProducer, l.logConsumer) + + cfgPipe := &utils.PipeConfig{ + Producer: otelLogsProducer, + // Format: &format.Format{ + // FormatDriver: &format.JSONFormatDriver{}, + // }, + } + + var decodeFunc utils.DecoderFunc + var p utils.FlowPipe + if l.config.Scheme == "sflow" { + p = utils.NewSFlowPipe(cfgPipe) + } else if l.config.Scheme == "netflow" { + p = utils.NewNetFlowPipe(cfgPipe) + } else if l.config.Scheme == "flow" { + p = utils.NewFlowPipe(cfgPipe) + } else { + return nil, fmt.Errorf("scheme does not exist: %s", l.config.Scheme) + } + + decodeFunc = p.DecodeFlow + + // We wrap panics while decoding the message to handle them later + decodeFunc = debug.PanicDecoderWrapper(decodeFunc) + + return decodeFunc, nil +} + +func (l *Listener) Shutdown() error { + if l.recv != nil { + return l.recv.Stop() + } + return nil } diff --git 
a/receiver/netflowreceiver/listener_test.go b/receiver/netflowreceiver/listener_test.go index 9fbae90c700c..599e7928cd3d 100644 --- a/receiver/netflowreceiver/listener_test.go +++ b/receiver/netflowreceiver/listener_test.go @@ -23,5 +23,5 @@ func TestCreateValidDefaultListener(t *testing.T) { assert.Equal(t, 2055, receiver.(*netflowReceiver).config.Port) assert.Equal(t, 1, receiver.(*netflowReceiver).config.Sockets) assert.Equal(t, 2, receiver.(*netflowReceiver).config.Workers) - assert.Equal(t, 1_000_000, receiver.(*netflowReceiver).config.QueueSize) + assert.Equal(t, 1_000, receiver.(*netflowReceiver).config.QueueSize) } diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go new file mode 100644 index 000000000000..fdbbdaa461ce --- /dev/null +++ b/receiver/netflowreceiver/parser.go @@ -0,0 +1,138 @@ +package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" + +import ( + "errors" + "net/netip" + "time" + + "github.com/netsampler/goflow2/v2/producer" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" +) + +var ( + etypeName = map[uint32]string{ + 0x806: "ARP", + 0x800: "IPv4", + 0x86dd: "IPv6", + } + protoName = map[uint32]string{ + 1: "ICMP", + 6: "TCP", + 17: "UDP", + 58: "ICMPv6", + 132: "SCTP", + } + + flowTypeName = map[int32]string{ + 0: "UNKNOWN", + 1: "SFLOW_5", + 2: "NETFLOW_V5", + 3: "NETFLOW_V9", + 4: "IPFIX", + } +) + +type NetworkAddress struct { + Address string `json:"address,omitempty"` + Port uint32 `json:"port,omitempty"` +} + +type Flow struct { + Type string `json:"type,omitempty"` + TimeReceived time.Time `json:"time_received,omitempty"` + Start time.Time `json:"start,omitempty"` + End time.Time `json:"end,omitempty"` + SequenceNum uint32 `json:"sequence_num,omitempty"` + SamplingRate uint64 `json:"sampling_rate,omitempty"` + SamplerAddress string `json:"sampler_address,omitempty"` +} + +type Protocol struct { + Name []byte 
`json:"name,omitempty"` // Layer 7 +} + +type NetworkIO struct { + Bytes uint64 `json:"bytes,omitempty"` + Packets uint64 `json:"packets,omitempty"` +} + +type OtelNetworkMessage struct { + Source NetworkAddress `json:"source,omitempty"` + Destination NetworkAddress `json:"destination,omitempty"` + Transport string `json:"transport,omitempty"` // Layer 4 + Type string `json:"type,omitempty"` // Layer 3 + IO NetworkIO `json:"io,omitempty"` + Flow Flow `json:"flow,omitempty"` +} + +func getEtypeName(etype uint32) string { + if name, ok := etypeName[etype]; ok { + return name + } + return "unknown" +} + +func getProtoName(proto uint32) string { + if name, ok := protoName[proto]; ok { + return name + } + return "unknown" +} + +func getFlowTypeName(flowType int32) string { + if name, ok := flowTypeName[flowType]; ok { + return name + } + return "unknown" +} + +// ConvertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage +func ConvertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) { + + // we know msg is ProtoProducerMessage because that is the parent producer + pm, ok := m.(*protoproducer.ProtoProducerMessage) + if !ok { + return nil, errors.New("message is not ProtoProducerMessage") + } + + // Parse IP addresses bytes to netip.Addr + srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr) + dstAddr, _ := netip.AddrFromSlice(pm.DstAddr) + samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress) + + // Time the receiver received the message + receivedTime := time.Unix(0, int64(pm.TimeReceivedNs)) + startTime := time.Unix(0, int64(pm.TimeFlowStartNs)) + endTime := time.Unix(0, int64(pm.TimeFlowEndNs)) + + // Construct the actual log record based on the otel semantic conventions + // see https://opentelemetry.io/docs/specs/semconv/general/attributes/ + otelMessage := OtelNetworkMessage{ + Source: NetworkAddress{ + Address: srcAddr.String(), + Port: pm.SrcPort, + }, + Destination: NetworkAddress{ + Address: dstAddr.String(), + Port: pm.DstPort, + }, + 
Type: getEtypeName(pm.Etype), // Layer 3 + Transport: getProtoName(pm.Proto), // Layer 4 + IO: NetworkIO{ + Bytes: pm.Bytes, + Packets: pm.Packets, + }, + Flow: Flow{ + Type: getFlowTypeName(int32(pm.Type)), + TimeReceived: receivedTime, + Start: startTime, + End: endTime, + SequenceNum: pm.SequenceNum, + SamplingRate: pm.SamplingRate, + SamplerAddress: samplerAddr.String(), + }, + } + + return &otelMessage, nil + +} diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go new file mode 100644 index 000000000000..0eff95256e94 --- /dev/null +++ b/receiver/netflowreceiver/producer.go @@ -0,0 +1,92 @@ +package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" + +import ( + "context" + "encoding/json" + + "github.com/netsampler/goflow2/v2/producer" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +// OtelLogsProducerWrapper is a wrapper around a producer.ProducerInterface that sends the messages to a log consumer +type OtelLogsProducerWrapper struct { + wrapped producer.ProducerInterface + logConsumer consumer.Logs +} + +// Produce converts the message into a list of log records and sends them to the log consumer +func (o *OtelLogsProducerWrapper) Produce(msg interface{}, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) { + + // First we let the proto producer parse the message + // All the netflow protocol and structure is handled by the proto producer + flowMessageSet, err := o.wrapped.Produce(msg, args) + if err != nil { + return flowMessageSet, err + } + + // Create the otel log structure to hold our messages + log := plog.NewLogs() + scopeLog := log.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(metadata.ScopeName) + 
scopeLog.Scope().Attributes().PutStr("receiver", metadata.Type.String()) + logRecords := scopeLog.LogRecords() + + // A single netflow packet can contain multiple flow messages + for _, msg := range flowMessageSet { + + // Convert each one to the Otel semantic dictionary format + otelMessage, err := ConvertToOtel(msg) + if err != nil { + continue + } + + logRecord := logRecords.AppendEmpty() + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.TimeReceived)) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(otelMessage.Flow.Start)) + + // The bytes of the message in JSON format + m, err := json.Marshal(otelMessage) + if err != nil { + continue + } + + // Convert to a map[string] + // https://opentelemetry.io/docs/specs/otel/logs/data-model/#type-mapstring-any + sec := map[string]interface{}{} + if err = json.Unmarshal(m, &sec); err != nil { + continue + } + + // Set the map to the log record body + err = logRecord.Body().SetEmptyMap().FromRaw(sec) + if err != nil { + continue + } + } + + // Send the logs to the collector, it is difficult to pass the context here + err = o.logConsumer.ConsumeLogs(context.TODO(), log) + if err != nil { + return flowMessageSet, err + } + + return flowMessageSet, nil +} + +func (o *OtelLogsProducerWrapper) Close() { + o.wrapped.Close() +} + +func (o *OtelLogsProducerWrapper) Commit(flowMessageSet []producer.ProducerMessage) { + o.wrapped.Commit(flowMessageSet) +} + +func NewOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs) producer.ProducerInterface { + return &OtelLogsProducerWrapper{ + wrapped: wrapped, + logConsumer: logConsumer, + } +} diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index c31bcbe7baaf..8e813a6c01bf 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -12,18 +12,28 @@ import ( ) type netflowReceiver struct { - // host component.Host - // cancel context.CancelFunc config 
*Config logConsumer consumer.Logs logger *zap.Logger - // listeners []*Listener + listener *Listener } -func (nr *netflowReceiver) Start(_ context.Context, _ component.Host) error { +func (nr *netflowReceiver) Start(ctx context.Context, host component.Host) error { + // TODO - Pass ctx and host here + listener := NewListener(*nr.config, nr.logger, nr.logConsumer) + if err := listener.Start(); err != nil { + return err + } + nr.listener = listener + nr.logger.Info("NetFlow receiver started") return nil } func (nr *netflowReceiver) Shutdown(_ context.Context) error { + nr.logger.Info("NetFlow receiver is shutting down") + // listener is nil when Start was never called or returned an error + if nr.listener != nil { + return nr.listener.Shutdown() + } return nil }