Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/netflow] Add the netflow receiver - PR 1 #34164

Merged
merged 41 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e4b5115
netflowreceiver - factory and config
dlopes7 Jul 14, 2024
be502c2
netflowreceiver - add minimum files
dlopes7 Jul 18, 2024
3ae8ad1
netflowreceiver - run make commands
dlopes7 Jul 18, 2024
9aa77b6
netflowreceiver - adjust config and tests
dlopes7 Jul 18, 2024
b619e40
netflowreceiver - run make generate and config_test
dlopes7 Jul 18, 2024
5f03ff9
netflowreceiver - add config tests
dlopes7 Jul 18, 2024
040136f
netflowreceiver - add tests, mod tidy
dlopes7 Jul 18, 2024
c4e9bd2
netflowreceiver - readme
dlopes7 Jul 19, 2024
8de9251
netflowreceiver - add readme table
dlopes7 Jul 19, 2024
cf4720b
netflowreceiver - make multimod-verify
dlopes7 Jul 19, 2024
c5ee961
netflowreceiver - make addlicense
dlopes7 Jul 19, 2024
b9f74cf
Merge branch 'main' into netflow-receiver
dlopes7 Jul 19, 2024
0996c1c
Update receiver/netflowreceiver/README.md
dlopes7 Aug 9, 2024
97e1bec
Update receiver/netflowreceiver/testdata/config.yaml
dlopes7 Aug 9, 2024
77270e6
Update receiver/netflowreceiver/factory.go
dlopes7 Aug 9, 2024
8c7bd24
Update receiver/netflowreceiver/README.md
dlopes7 Aug 9, 2024
d446489
Merge branch 'open-telemetry:main' into netflow-receiver
dlopes7 Aug 9, 2024
7561af8
netflow - pr suggestions
dlopes7 Aug 10, 2024
871f2ff
netflow - update deps
dlopes7 Aug 20, 2024
f8d4c2c
netflow - add to CODEOWNERS
dlopes7 Aug 20, 2024
5cea6ec
netflow - adjust linting and go.mod
dlopes7 Aug 20, 2024
d6dae6e
Merge branch 'main' into netflow-receiver
dlopes7 Aug 20, 2024
3b42bc4
netflowreceiver - gofmt and gci
dlopes7 Aug 22, 2024
306b267
Merge branch 'netflow-receiver' of github.com:dlopes7/opentelemetry-c…
dlopes7 Aug 22, 2024
3b1019e
netflowreceiver - go.mod version update
dlopes7 Aug 22, 2024
7e13976
Merge branch 'main' into netflow-receiver
dlopes7 Aug 22, 2024
062d07a
netflow - add to versions.yaml
dlopes7 Sep 5, 2024
f62e092
netflow - run make checks
dlopes7 Sep 5, 2024
ca5128c
netflow - update go.mod
dlopes7 Sep 5, 2024
8c65324
Update receiver/netflowreceiver/factory.go
dlopes7 Oct 14, 2024
56b1024
#34164 - add dlopes7 to githubgen allowlist
dlopes7 Nov 8, 2024
f3e45ed
netflow - update readme with new format
dlopes7 Nov 8, 2024
9680294
Merge remote-tracking branch 'upstream/main' into netflow-receiver
dlopes7 Nov 11, 2024
cd49448
netflow - update to 0.113.0
dlopes7 Nov 11, 2024
e1c03bf
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
dlopes7 Nov 11, 2024
20eee81
netflow - generate
dlopes7 Nov 11, 2024
f56b4ae
Merge branch 'main' into netflow-receiver
dlopes7 Nov 12, 2024
4428150
Merge branch 'main' into netflow-receiver
dlopes7 Nov 20, 2024
0ff2e6c
Merge branch 'main' into netflow-receiver
dlopes7 Nov 22, 2024
e6b99ac
Fix linting errors
evan-bradley Nov 23, 2024
58b4baa
Update to latest Collector version
evan-bradley Nov 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/netflow-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: netflowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce the netflow receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ receiver/mongodbatlasreceiver/ @open-telemetry/collector-cont
receiver/mongodbreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @schmikei
receiver/mysqlreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski
receiver/namedpipereceiver/ @open-telemetry/collector-contrib-approvers @sinkingpoint @djaglowski
receiver/netflowreceiver/ @open-telemetry/collector-contrib-approvers @evan-bradley @dlopes7
receiver/nginxreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski
receiver/nsxtreceiver/ @open-telemetry/collector-contrib-approvers @dashpole @schmikei
receiver/ntpreceiver/ @open-telemetry/collector-contrib-approvers @atoulme
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
3 changes: 2 additions & 1 deletion cmd/githubgen/allowlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ Hemansh31
shazlehu
dsimil
KiranmayiB
harishbohara11
harishbohara11
dlopes7
1 change: 1 addition & 0 deletions receiver/netflowreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
102 changes: 102 additions & 0 deletions receiver/netflowreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Netflow receiver
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fnetflow%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fnetflow) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fnetflow%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fnetflow) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@evan-bradley](https://www.github.com/evan-bradley), [@dlopes7](https://www.github.com/dlopes7) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->

The netflow receiver can listen for [netflow](https://en.wikipedia.org/wiki/NetFlow), [sflow](https://en.wikipedia.org/wiki/SFlow), and [ipfix](https://en.wikipedia.org/wiki/IP_Flow_Information_Export) data and convert it to OpenTelemetry logs. The receiver is based on the [goflow2](https://github.com/netsampler/goflow2) project.

This gives OpenTelemetry users the capability of monitoring network traffic, and answer questions like:

* Which protocols are passing through the network?
* Which servers and clients are producing the highest amount of traffic?
* What ports are involved in these network calls?
* How many bytes and packets are being sent and received?

## Getting started

By default the receiver will listen for ipfix and netflow on port `2055`. The receiver can be configured to listen on different ports and protocols.

Example configuration:

```yaml
receivers:
netflow:
- scheme: netflow
port: 2055
sockets: 16
workers: 32

processors:
batch:
send_batch_size: 2000
timeout: 30s

exporters:
debug:
verbosity: detailed

service:
pipelines:
logs:
receivers: [netflow]
processors: [batch]
exporters: [debug]
telemetry:
logs:
level: debug
```

We recommend using the batch processor to reduce the number of log requests being sent to the exporter. The batch processor will batch log records together and send them in a single request to the exporter.

You would then configure your network devices to send netflow, sflow, or ipfix data to the Collector on the specified ports.

## Configuration
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

| Field | Description | Examples | Default |
|-------|-------------|--------| ------- |
| scheme | The type of flow data that to receive | `sflow`, `netflow`, `flow` | `netflow` |
| hostname | The hostname or IP address to bind to | `localhost` | `0.0.0.0` |
| port | The port to bind to | `2055` or `6343` | `2055` |
| sockets | The number of sockets to use | 1 | 1 |
| workers | The number of workers used to decode incoming flow messages | 2 | 2 |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying library suggests that the workers should be the # of CPU cores available and that there should be 2 workers for each socket. Do we want to mention those recommendations here under a performance header?

Copy link
Contributor

@evan-bradley evan-bradley Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlopes7 Do we still want to put in a performance section that describes these recommendations?

| queue_size | The size of the incoming netflow packets queue | 1000 | 1000000 |

## Data format

The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes)

The output will adhere the format:

```json
{
"destination": {
"address": "192.168.0.1",
"port": 22
},
"flow": {
"end": 1731073104662487000,
"sampler_address": "192.168.0.2",
"sequence_num": 49,
"start": 1731073077662487000,
"time_received": 1731073138662487000,
"type": "NETFLOW_V5"
},
"io": {
"bytes": 529,
"packets": 378
},
"source": {
"address": "192.168.0.3",
"port": 40
},
"transport": "TCP",
"type": "IPv4"
}
```
65 changes: 65 additions & 0 deletions receiver/netflowreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import "fmt"

// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
// The scheme defines the type of flow data that the listener will receive
// The scheme must be one of sflow, netflow, or flow
Scheme string `mapstructure:"scheme"`

// The hostname or IP address that the listener will bind to
Hostname string `mapstructure:"hostname"`

// The port that the listener will bind to
Port int `mapstructure:"port"`

// The number of sockets that the listener will use
Sockets int `mapstructure:"sockets"`
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

// The number of workers that the listener will use to decode incoming flow messages
// By default it will be two times the number of sockets
// Ideally set this to the number of CPU cores
Workers int `mapstructure:"workers"`

// The size of the queue that the listener will use
// This is a buffer that will hold flow messages before they are processed by a worker
QueueSize int `mapstructure:"queue_size"`
}

// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
validSchemes := [3]string{"sflow", "netflow", "flow"}

validScheme := false
for _, scheme := range validSchemes {
if cfg.Scheme == scheme {
validScheme = true
break
}
}
if !validScheme {
return fmt.Errorf("scheme must be one of sflow, netflow, or flow")
}

if cfg.Sockets <= 0 {
return fmt.Errorf("sockets must be greater than 0")
}

if cfg.Workers <= 0 {
return fmt.Errorf("workers must be greater than 0")
}

if cfg.QueueSize <= 0 {
cfg.QueueSize = defaultQueueSize
}

if cfg.Port <= 0 {
return fmt.Errorf("port must be greater than 0")
}

return nil
}
92 changes: 92 additions & 0 deletions receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewIDWithName(metadata.Type, "defaults"),
expected: createDefaultConfig(),
},
{
id: component.NewIDWithName(metadata.Type, "one_listener"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000000,
},
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}

func TestInvalidConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
err string
}{
{
id: component.NewIDWithName(metadata.Type, "invalid_schema"),
err: "scheme must be one of sflow, netflow, or flow",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

err = component.ValidateConfig(cfg)
assert.ErrorContains(t, err, tt.err)
})
}
}
6 changes: 6 additions & 0 deletions receiver/netflowreceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"
51 changes: 51 additions & 0 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata"
)

const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the queue size here is # of UDP Packets, a UDP packet can be up to 9KB which would be 9GB of memory usage with a full queue.

Perhaps the default queue should be smaller?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a good default? Should we make this 1_000 so it is only 9MB?

)

// NewFactory creates a factory for netflow receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

func createDefaultConfig() component.Config {
return &Config{
Scheme: "netflow",
Port: 2055,
Sockets: defaultSockets,
Workers: defaultWorkers,
QueueSize: defaultQueueSize,
}
}

func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
}

return nr, nil
}
Loading