From e171a0478ebfd03825ab4d2e33b41e59fffe7e24 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 18 Jul 2024 06:10:29 +0200 Subject: [PATCH] feat(decoders.sflow): Add decoding of drop packets (#337) --- decoders/sflow/datastructure.go | 14 ++++++++ decoders/sflow/packet.go | 12 +++++++ decoders/sflow/sflow.go | 52 +++++++++++++++++++++++++-- decoders/sflow/sflow_test.go | 62 +++++++++++++++++++++++++++++++++ decoders/utils/utils.go | 9 +++++ 5 files changed, 147 insertions(+), 2 deletions(-) diff --git a/decoders/sflow/datastructure.go b/decoders/sflow/datastructure.go index e57573d9..817500e2 100644 --- a/decoders/sflow/datastructure.go +++ b/decoders/sflow/datastructure.go @@ -66,6 +66,20 @@ type ExtendedGateway struct { LocalPref uint32 `json:"local-pref"` } +type EgressQueue struct { + Queue uint32 `json:"queue"` +} + +type ExtendedACL struct { + Number uint32 `json:"number"` + Name string `json:"name"` + Direction uint32 `json:"direction"` // 0:unknown, 1:ingress, 2:egress +} + +type ExtendedFunction struct { + Symbol string `json:"symbol"` +} + type IfCounters struct { IfIndex uint32 `json:"if-index"` IfType uint32 `json:"if-type"` diff --git a/decoders/sflow/packet.go b/decoders/sflow/packet.go index 357a9041..bdd5cbb7 100644 --- a/decoders/sflow/packet.go +++ b/decoders/sflow/packet.go @@ -55,6 +55,18 @@ type ExpandedFlowSample struct { Records []FlowRecord `json:"records"` } +// DropSample data structure according to https://sflow.org/sflow_drops.txt +type DropSample struct { + Header SampleHeader `json:"header"` + + Drops uint32 `json:"drops"` + Input uint32 `json:"input"` + Output uint32 `json:"output"` + Reason uint32 `json:"reason"` + FlowRecordsCount uint32 `json:"flow-records-count"` + Records []FlowRecord `json:"records"` +} + type RecordHeader struct { DataFormat uint32 `json:"data-format"` Length uint32 `json:"length"` diff --git a/decoders/sflow/sflow.go b/decoders/sflow/sflow.go index 5eeceb1d..befa4bdf 100644 --- a/decoders/sflow/sflow.go +++ b/decoders/sflow/sflow.go @@ -13,6 +13,7 @@ const ( SAMPLE_FORMAT_COUNTER = 2 SAMPLE_FORMAT_EXPANDED_FLOW = 3 SAMPLE_FORMAT_EXPANDED_COUNTER = 4 + SAMPLE_FORMAT_DROP = 5 ) // Opaque flow_data types according to https://sflow.org/SFLOW-STRUCTS5.txt @@ -33,6 +34,11 @@ const ( FLOW_TYPE_EXT_MPLS_FEC = 1010 FLOW_TYPE_EXT_MPLS_LVP_FEC = 1011 FLOW_TYPE_EXT_VLAN_TUNNEL = 1012 + + // According to https://sflow.org/sflow_drops.txt + FLOW_TYPE_EGRESS_QUEUE = 1036 + FLOW_TYPE_EXT_ACL = 1037 + FLOW_TYPE_EXT_FUNCTION = 1038 ) // Opaque counter_data types according to https://sflow.org/SFLOW-STRUCTS5.txt @@ -319,6 +325,24 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, extendedGateway.Communities = communities flowRecord.Data = extendedGateway + case FLOW_TYPE_EGRESS_QUEUE: + var queue EgressQueue + if err := utils.BinaryDecoder(payload, &queue.Queue); err != nil { + return flowRecord, &RecordError{header.DataFormat, err} + } + flowRecord.Data = queue + case FLOW_TYPE_EXT_ACL: + var acl ExtendedACL + if err := utils.BinaryDecoder(payload, &acl.Number, &acl.Name, &acl.Direction); err != nil { + return flowRecord, &RecordError{header.DataFormat, err} + } + flowRecord.Data = acl + case FLOW_TYPE_EXT_FUNCTION: + var function ExtendedFunction + if err := utils.BinaryDecoder(payload, &function.Symbol); err != nil { + return flowRecord, &RecordError{header.DataFormat, err} + } + flowRecord.Data = function default: var rawRecord RawRecord rawRecord.Data = payload.Bytes() @@ -344,10 +368,9 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err if err := utils.BinaryDecoder(payload, &sourceId); err != nil { return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)} } - header.SourceIdType = sourceId >> 24 header.SourceIdValue = sourceId & 0x00ffffff - case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER: + case SAMPLE_FORMAT_EXPANDED_FLOW, SAMPLE_FORMAT_EXPANDED_COUNTER, SAMPLE_FORMAT_DROP: // Explicit data-source format if err := utils.BinaryDecoder(payload, &header.SourceIdType, @@ -363,6 +386,8 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err var flowSample FlowSample var counterSample CounterSample var expandedFlowSample ExpandedFlowSample + var dropSample DropSample + switch format { case SAMPLE_FORMAT_FLOW: flowSample.Header = *header @@ -410,6 +435,23 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err recordsCount = expandedFlowSample.FlowRecordsCount expandedFlowSample.Records = make([]FlowRecord, recordsCount) sample = expandedFlowSample + case SAMPLE_FORMAT_DROP: + dropSample.Header = *header + if err := utils.BinaryDecoder(payload, + &dropSample.Drops, + &dropSample.Input, + &dropSample.Output, + &dropSample.Reason, + &dropSample.FlowRecordsCount, + ); err != nil { + return sample, &FlowError{format, seq, fmt.Errorf("raw [%w]", err)} + } + recordsCount = dropSample.FlowRecordsCount + if recordsCount > 1000 { // protection against ddos + return sample, &FlowError{format, seq, fmt.Errorf("too many flow records: %d", recordsCount)} + } + dropSample.Records = make([]FlowRecord, recordsCount) // max size of 1000 for protection + sample = dropSample } for i := 0; i < int(recordsCount) && payload.Len() >= 8; i++ { recordHeader := RecordHeader{} @@ -442,6 +484,12 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)} } expandedFlowSample.Records[i] = record + case SAMPLE_FORMAT_DROP: + record, err := DecodeFlowRecord(&recordHeader, recordReader) + if err != nil { + return sample, &FlowError{format, seq, fmt.Errorf("record [%w]", err)} + } + dropSample.Records[i] = record } } return sample, nil diff --git a/decoders/sflow/sflow_test.go b/decoders/sflow/sflow_test.go index 8c5335c2..404f3a18 100644 --- a/decoders/sflow/sflow_test.go +++ b/decoders/sflow/sflow_test.go @@ -128,3 +128,65 @@ func TestExpandedSFlowDecode(t *testing.T) { var packet Packet assert.NoError(t, DecodeMessageVersion(buf, &packet)) } + +func TestSFlowDecodeDropEgressQueue(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0, + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x2C, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x2a, + } + + buf := bytes.NewBuffer(data) + var packet Packet + assert.NoError(t, DecodeMessageVersion(buf, &packet)) + assert.Len(t, packet.Samples, 1) + assert.NotNil(t, packet.Samples[0]) + sample, ok := packet.Samples[0].(DropSample) + assert.True(t, ok) + assert.Len(t, sample.Records, 1) + assert.Equal(t, EgressQueue{Queue: 42}, sample.Records[0].Data) +} + +func TestSFlowDecodeDropExtendedACL(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0, + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x38, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0d, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x2a, + 0x00, 0x00, 0x00, 0x04, 0x66, 0x6f, 0x6f, 0x21, 0x00, 0x00, 0x00, 0x02, + } + + buf := bytes.NewBuffer(data) + var packet Packet + assert.NoError(t, DecodeMessageVersion(buf, &packet)) + assert.Len(t, packet.Samples, 1) + assert.NotNil(t, packet.Samples[0]) + sample, ok := packet.Samples[0].(DropSample) + assert.True(t, ok) + assert.Len(t, sample.Records, 1) + assert.Equal(t, ExtendedACL{Number: 42, Name: "foo!", Direction: 2}, sample.Records[0].Data) +} + +func TestSFlowDecodeDropExtendedFunction(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x01, 0xc0, 0xa8, 0x77, 0xb8, 0x00, 0x01, 0x86, 0xa0, + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x30, 0x7e, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x32, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x04, 0x0e, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x06, + 0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72, + } + + buf := bytes.NewBuffer(data) + var packet Packet + assert.NoError(t, DecodeMessageVersion(buf, &packet)) + assert.Len(t, packet.Samples, 1) + assert.NotNil(t, packet.Samples[0]) + sample, ok := packet.Samples[0].(DropSample) + assert.True(t, ok) + assert.Len(t, sample.Records, 1) + assert.Equal(t, ExtendedFunction{Symbol: "foobar"}, sample.Records[0].Data) +} diff --git a/decoders/utils/utils.go b/decoders/utils/utils.go index ae2c7e75..79624806 100644 --- a/decoders/utils/utils.go +++ b/decoders/utils/utils.go @@ -48,6 +48,13 @@ func BinaryRead(payload BytesBuffer, order binary.ByteOrder, data any) error { *data = int64(order.Uint64(bs)) case *uint64: *data = order.Uint64(bs) + case *string: + strlen := int(order.Uint32(bs)) + buf := payload.Next(strlen) + if len(buf) < strlen { + return io.ErrUnexpectedEOF + } + *data = string(buf) case []bool: for i, x := range bs { // Easier to loop over the input for 8-bit values. data[i] = x != 0 @@ -121,6 +128,8 @@ func intDataSize(data any) int { return 2 * len(data) case int32, uint32, *int32, *uint32: return 4 + case *string: // return the length field + return 4 case []int32: return 4 * len(data) case []uint32: