NETOBSERV-1956: NETOBSERV-1957: webhook conntrack cases (#991)
- Warn on performance impacts when using conntrack All
- Make conntrack+no-loki invalid
jotak authored Jan 20, 2025
1 parent 39805a2 commit 4128895
Showing 7 changed files with 126 additions and 43 deletions.
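In admission-webhook terms, the two changes use the two channels a validating webhook has: admission.Warnings are returned to the client but do not block the request, while a returned error rejects the object. A sketch of the resulting behavior, built from the types and messages in the diff below (ptr is k8s.io/utils/ptr, as in the new tests; this snippet is an illustration, not code from the commit):

// Conntrack "All" with Loki enabled: admitted, with a warning.
fc := &FlowCollector{
	Spec: FlowCollectorSpec{
		Processor: FlowCollectorFLP{LogTypes: ptr.To(LogTypeAll)},
		Loki:      FlowCollectorLoki{Enable: ptr.To(true)},
	},
}
warnings, err := fc.validate(context.TODO(), fc)
// warnings: ["Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint"]
// err: nil

// Conntrack without Loki: rejected.
fc.Spec.Processor.LogTypes = ptr.To(LogTypeConversations)
fc.Spec.Loki.Enable = ptr.To(false)
_, err = fc.validate(context.TODO(), fc)
// err contains: "enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit"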
8 changes: 4 additions & 4 deletions apis/flowcollector/v1beta2/flowcollector_types.go
@@ -637,10 +637,10 @@ type FlowCollectorFLP struct {
 	KafkaConsumerBatchSize int `json:"kafkaConsumerBatchSize"`

 	// `logTypes` defines the desired record types to generate. Possible values are:<br>
-	// - `Flows` (default) to export regular network flows<br>
-	// - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates<br>
-	// - `EndedConversations` to generate only ended conversations events<br>
-	// - `All` to generate both network flows and all conversations events<br>
+	// - `Flows` (default) to export regular network flows.<br>
+	// - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates.<br>
+	// - `EndedConversations` to generate only ended conversations events.<br>
+	// - `All` to generate both network flows and all conversations events. It is not recommended due to the impact on resources footprint.<br>
 	// +kubebuilder:validation:Optional
 	// +kubebuilder:validation:Enum:="Flows";"Conversations";"EndedConversations";"All"
 	// +kubebuilder:default:=Flows
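Because logTypes is optional with a default, the generated Go field is a pointer; consumers must nil-check it before comparing against the LogType* constants, as the webhook change below does. A minimal sketch of the safe-read pattern (only names visible in this commit are used; the flp variable is illustrative):

// Resolve the effective log type, honoring +kubebuilder:default:=Flows when unset.
logTypes := LogTypeFlows
if flp.LogTypes != nil {
	logTypes = *flp.LogTypes
}

The identical description edits in the two CRD manifests and in docs/FlowCollector.md further down are regenerated from this comment, which is why the same wording change repeats there.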
46 changes: 30 additions & 16 deletions apis/flowcollector/v1beta2/flowcollector_validation_webhook.go
@@ -52,9 +52,11 @@ func (r *FlowCollector) ValidateDelete(_ context.Context, _ runtime.Object) (adm
 func (r *FlowCollector) validate(ctx context.Context, fc *FlowCollector) (admission.Warnings, error) {
 	var allW admission.Warnings
 	var allE []error
-	w, errs := r.validateAgent(ctx, fc)
+	w, errs := r.validateAgent(ctx, &fc.Spec)
 	allW, allE = collect(allW, allE, w, errs)
-	w = r.warnLogLevels(fc)
+	w, errs = r.validateConversationTracking(ctx, &fc.Spec)
+	allW, allE = collect(allW, allE, w, errs)
+	w = r.warnLogLevels(&fc.Spec)
 	allW, allE = collect(allW, allE, w, nil)
 	return allW, errors.Join(allE...)
 }
@@ -69,21 +71,21 @@ func collect(wPool admission.Warnings, errsPool []error, w admission.Warnings, e
 	return wPool, errsPool
 }

-func (r *FlowCollector) warnLogLevels(fc *FlowCollector) admission.Warnings {
+func (r *FlowCollector) warnLogLevels(fc *FlowCollectorSpec) admission.Warnings {
 	var w admission.Warnings
-	if fc.Spec.Agent.EBPF.LogLevel == "debug" || fc.Spec.Agent.EBPF.LogLevel == "trace" {
-		w = append(w, fmt.Sprintf("The log level for the eBPF agent is %s, which impacts performance and resource footprint.", fc.Spec.Agent.EBPF.LogLevel))
+	if fc.Agent.EBPF.LogLevel == "debug" || fc.Agent.EBPF.LogLevel == "trace" {
+		w = append(w, fmt.Sprintf("The log level for the eBPF agent is %s, which impacts performance and resource footprint.", fc.Agent.EBPF.LogLevel))
 	}
-	if fc.Spec.Processor.LogLevel == "debug" || fc.Spec.Processor.LogLevel == "trace" {
-		w = append(w, fmt.Sprintf("The log level for the processor (flowlogs-pipeline) is %s, which impacts performance and resource footprint.", fc.Spec.Processor.LogLevel))
+	if fc.Processor.LogLevel == "debug" || fc.Processor.LogLevel == "trace" {
+		w = append(w, fmt.Sprintf("The log level for the processor (flowlogs-pipeline) is %s, which impacts performance and resource footprint.", fc.Processor.LogLevel))
 	}
 	return w
 }

 // nolint:cyclop
-func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollector) (admission.Warnings, []error) {
+func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollectorSpec) (admission.Warnings, []error) {
 	var warnings admission.Warnings
-	if slices.Contains(fc.Spec.Agent.EBPF.Features, NetworkEvents) {
+	if slices.Contains(fc.Agent.EBPF.Features, NetworkEvents) {
 		// Make sure required version of ocp is installed
 		if CurrentClusterInfo != nil && CurrentClusterInfo.IsOpenShift() {
 			b, err := CurrentClusterInfo.OpenShiftVersionIsAtLeast("4.18.0")
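The hunk above shows only the tail of the unchanged collect helper. Judging from the signature in the hunk header and the call sites in validate, its body is presumably plain pooling, along these lines (a reconstruction for readability, not part of the commit):

func collect(wPool admission.Warnings, errsPool []error, w admission.Warnings, errs []error) (admission.Warnings, []error) {
	wPool = append(wPool, w...)
	errsPool = append(errsPool, errs...)
	return wPool, errsPool
}

Appending a nil slice is a no-op in Go, which is why validate can pass nil for the error-less warnLogLevels result.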
@@ -95,29 +97,29 @@ func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollector) (adm
 		} else {
 			warnings = append(warnings, "The NetworkEvents feature is only supported with OpenShift")
 		}
-		if !fc.Spec.Agent.EBPF.Privileged {
+		if !fc.Agent.EBPF.Privileged {
 			warnings = append(warnings, "The NetworkEvents feature requires eBPF Agent to run in privileged mode")
 		}
 	}
-	if slices.Contains(fc.Spec.Agent.EBPF.Features, PacketDrop) && !fc.Spec.Agent.EBPF.Privileged {
+	if slices.Contains(fc.Agent.EBPF.Features, PacketDrop) && !fc.Agent.EBPF.Privileged {
 		warnings = append(warnings, "The PacketDrop feature requires eBPF Agent to run in privileged mode")
 	}
-	if slices.Contains(fc.Spec.Agent.EBPF.Features, EbpfManager) && !fc.Spec.Agent.EBPF.Privileged {
+	if slices.Contains(fc.Agent.EBPF.Features, EbpfManager) && !fc.Agent.EBPF.Privileged {
 		warnings = append(warnings, "The BPF Manager feature requires eBPF Agent to run in privileged mode")
 	}
 	var errs []error
-	if fc.Spec.Agent.EBPF.FlowFilter != nil && fc.Spec.Agent.EBPF.FlowFilter.Enable != nil && *fc.Spec.Agent.EBPF.FlowFilter.Enable {
+	if fc.Agent.EBPF.FlowFilter != nil && fc.Agent.EBPF.FlowFilter.Enable != nil && *fc.Agent.EBPF.FlowFilter.Enable {
 		m := make(map[string]bool)
-		for i := range fc.Spec.Agent.EBPF.FlowFilter.FlowFilterRules {
-			rule := fc.Spec.Agent.EBPF.FlowFilter.FlowFilterRules[i]
+		for i := range fc.Agent.EBPF.FlowFilter.FlowFilterRules {
+			rule := fc.Agent.EBPF.FlowFilter.FlowFilterRules[i]
 			if found := m[rule.CIDR]; found {
 				errs = append(errs, fmt.Errorf("flow filter rule CIDR %s already exists", rule.CIDR))
 				break
 			}
 			m[rule.CIDR] = true
 			errs = append(errs, validateFilter(&rule)...)
 		}
-		errs = append(errs, validateFilter(fc.Spec.Agent.EBPF.FlowFilter)...)
+		errs = append(errs, validateFilter(fc.Agent.EBPF.FlowFilter)...)
 	}

 	return warnings, errs
@@ -256,3 +258,15 @@ func validatePortString(s string) (uint16, error) {
 	}
 	return uint16(p), nil
 }
+
+func (r *FlowCollector) validateConversationTracking(_ context.Context, fc *FlowCollectorSpec) (admission.Warnings, []error) {
+	var warnings admission.Warnings
+	if fc.Processor.LogTypes != nil && *fc.Processor.LogTypes == LogTypeAll {
+		warnings = append(warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint")
+	}
+	var errs []error
+	if fc.Processor.LogTypes != nil && *fc.Processor.LogTypes != LogTypeFlows && fc.Loki.Enable != nil && !*fc.Loki.Enable {
+		errs = append(errs, errors.New("enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit"))
+	}
+	return warnings, errs
+}
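(The context lines at the top of this hunk are the tail of the pre-existing validatePortString, whose body is outside the diff. From the signature and tail it is presumably a parse-and-range-check, roughly as below; the parsing call and error wording are assumptions:)

func validatePortString(s string) (uint16, error) {
	p, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("invalid port: %w", err)
	}
	if p < 1 || p > 65535 {
		return 0, fmt.Errorf("port out of range: %d", p)
	}
	return uint16(p), nil
}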
apis/flowcollector/v1beta2/flowcollector_validation_webhook_test.go
@@ -346,7 +346,7 @@ func TestValidateAgent(t *testing.T) {
 	CurrentClusterInfo = &cluster.Info{}
 	for _, test := range tests {
 		CurrentClusterInfo.MockOpenShiftVersion(test.ocpVersion)
-		warnings, errs := test.fc.validateAgent(context.TODO(), test.fc)
+		warnings, errs := test.fc.validateAgent(context.TODO(), &test.fc.Spec)
 		if test.expectedError == "" {
 			assert.Empty(t, errs, test.name)
 		} else {
@@ -356,3 +356,74 @@
 		assert.Equal(t, test.expectedWarnings, warnings, test.name)
 	}
 }
+
+func TestValidateConntrack(t *testing.T) {
+	tests := []struct {
+		name             string
+		fc               *FlowCollector
+		expectedError    string
+		expectedWarnings admission.Warnings
+	}{
+		{
+			name: "Conntrack with Loki is valid",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						LogTypes: ptr.To(LogTypeConversations),
+					},
+					Loki: FlowCollectorLoki{
+						Enable: ptr.To(true),
+					},
+				},
+			},
+		},
+		{
+			name: "Conntrack ALL is not recommended",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						LogTypes: ptr.To(LogTypeAll),
+					},
+					Loki: FlowCollectorLoki{
+						Enable: ptr.To(true),
+					},
+				},
+			},
+			expectedWarnings: admission.Warnings{"Enabling all log types (in spec.processor.logTypes) has a high impact on resources footprint"},
+		},
+		{
+			name: "Conntrack without Loki is not recommended",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						LogTypes: ptr.To(LogTypeConversations),
+					},
+					Loki: FlowCollectorLoki{
+						Enable: ptr.To(false),
+					},
+				},
+			},
+			expectedError: "enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit",
+		},
+	}
+
+	CurrentClusterInfo = &cluster.Info{}
+	for _, test := range tests {
+		warnings, err := test.fc.validate(context.TODO(), test.fc)
+		if test.expectedError == "" {
+			assert.Nil(t, err, test.name)
+		} else {
+			assert.ErrorContains(t, err, test.expectedError, test.name)
+		}
+		assert.Equal(t, test.expectedWarnings, warnings, test.name)
+	}
+}
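The new test builds optional pointer fields inline with ptr.To, the generic address-of helper from k8s.io/utils/ptr:

// Signature of the helper, for reference:
func To[T any](v T) *T { return &v }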
8 changes: 4 additions & 4 deletions bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -8594,10 +8594,10 @@ spec:
             default: Flows
             description: |-
               `logTypes` defines the desired record types to generate. Possible values are:<br>
-              - `Flows` (default) to export regular network flows<br>
-              - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates<br>
-              - `EndedConversations` to generate only ended conversations events<br>
-              - `All` to generate both network flows and all conversations events<br>
+              - `Flows` (default) to export regular network flows.<br>
+              - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates.<br>
+              - `EndedConversations` to generate only ended conversations events.<br>
+              - `All` to generate both network flows and all conversations events. It is not recommended due to the impact on resources footprint.<br>
             enum:
             - Flows
             - Conversations
8 changes: 4 additions & 4 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -7915,10 +7915,10 @@ spec:
             default: Flows
             description: |-
               `logTypes` defines the desired record types to generate. Possible values are:<br>
-              - `Flows` (default) to export regular network flows<br>
-              - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates<br>
-              - `EndedConversations` to generate only ended conversations events<br>
-              - `All` to generate both network flows and all conversations events<br>
+              - `Flows` (default) to export regular network flows.<br>
+              - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates.<br>
+              - `EndedConversations` to generate only ended conversations events.<br>
+              - `All` to generate both network flows and all conversations events. It is not recommended due to the impact on resources footprint.<br>
             enum:
             - Flows
             - Conversations
18 changes: 8 additions & 10 deletions config/samples/flows_v1beta2_flowcollector.yaml
@@ -29,8 +29,11 @@ spec:
excludeInterfaces: ["lo"]
kafkaBatchSize: 1048576
#flowFilter:
# enable: true
# rules:
# - action: Accept
# cidr: 0.0.0.0/0
# - action: Accept
# cidr: 10.128.0.1/24
# ports: 6443
# protocol: TCP
@@ -40,17 +43,12 @@ spec:
# ports: 53
# protocol: UDP
# sampling: 20
# sourcePorts: 443
# - action: Accept
# cidr: 172.30.0.0/16
# tcpFlags: "SYN"
# cidr: 2.2.2.2/24
# protocol: TCP
# sampling: 30
# sourcePorts: 443
# tcpFlags: "SYN"
# action: Accept
# cidr: 2.2.2.2/24
# protocol: TCP
# sourcePorts: 53
# enable: true
# sourcePorts: 53
metrics:
server:
port: 9400
@@ -78,7 +76,7 @@ spec:
   processor:
     imagePullPolicy: IfNotPresent
     logLevel: info
-    # Change logTypes to "CONVERSATIONS" or "ALL" to enable conversation tracking
+    # Change logTypes to "Conversations", "EndedConversations" or "All" to enable conversation tracking
     logTypes: Flows
     # Append a unique cluster name to each record
     # clusterName: <CLUSTER NAME>
8 changes: 4 additions & 4 deletions docs/FlowCollector.md
@@ -14421,10 +14421,10 @@ This setting is ignored when Kafka is disabled.<br/>
 <td>enum</td>
 <td>
 `logTypes` defines the desired record types to generate. Possible values are:<br>
-- `Flows` (default) to export regular network flows<br>
-- `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates<br>
-- `EndedConversations` to generate only ended conversations events<br>
-- `All` to generate both network flows and all conversations events<br><br/>
+- `Flows` (default) to export regular network flows.<br>
+- `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates.<br>
+- `EndedConversations` to generate only ended conversations events.<br>
+- `All` to generate both network flows and all conversations events. It is not recommended due to the impact on resources footprint.<br><br/>
 <br/>
 <i>Enum</i>: Flows, Conversations, EndedConversations, All<br/>
 <i>Default</i>: Flows<br/>
