diff --git a/apis/flowcollector/v1beta2/flowcollector_types.go b/apis/flowcollector/v1beta2/flowcollector_types.go
index 02057da06..1974f1f33 100644
--- a/apis/flowcollector/v1beta2/flowcollector_types.go
+++ b/apis/flowcollector/v1beta2/flowcollector_types.go
@@ -637,10 +637,10 @@ type FlowCollectorFLP struct {
 	KafkaConsumerBatchSize int `json:"kafkaConsumerBatchSize"`
 
 	// `logTypes` defines the desired record types to generate. Possible values are:
- // - `Flows` (default) to export regular network flows
- // - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates
- // - `EndedConversations` to generate only ended conversations events
- // - `All` to generate both network flows and all conversations events
+ // - `Flows` (default) to export regular network flows.
+ // - `Conversations` to generate events for started conversations, ended conversations, and periodic "tick" updates.
+ // - `EndedConversations` to generate only ended conversation events.
+ // - `All` to generate both network flows and all conversation events. This is not recommended due to its impact on resource footprint.
 	// +kubebuilder:validation:Optional
 	// +kubebuilder:validation:Enum:="Flows";"Conversations";"EndedConversations";"All"
 	// +kubebuilder:default:=Flows
diff --git a/apis/flowcollector/v1beta2/flowcollector_validation_webhook.go b/apis/flowcollector/v1beta2/flowcollector_validation_webhook.go
index d8e5e295c..d9c063609 100644
--- a/apis/flowcollector/v1beta2/flowcollector_validation_webhook.go
+++ b/apis/flowcollector/v1beta2/flowcollector_validation_webhook.go
@@ -52,9 +52,11 @@ func (r *FlowCollector) ValidateDelete(_ context.Context, _ runtime.Object) (adm
 func (r *FlowCollector) validate(ctx context.Context, fc *FlowCollector) (admission.Warnings, error) {
 	var allW admission.Warnings
 	var allE []error
-	w, errs := r.validateAgent(ctx, fc)
+	w, errs := r.validateAgent(ctx, &fc.Spec)
 	allW, allE = collect(allW, allE, w, errs)
-	w = r.warnLogLevels(fc)
+	w, errs = r.validateConversationTracking(ctx, &fc.Spec)
+	allW, allE = collect(allW, allE, w, errs)
+	w = r.warnLogLevels(&fc.Spec)
 	allW, allE = collect(allW, allE, w, nil)
 	return allW, errors.Join(allE...)
 }
@@ -69,21 +71,21 @@ func collect(wPool admission.Warnings, errsPool []error, w admission.Warnings, e
 	return wPool, errsPool
 }
 
-func (r *FlowCollector) warnLogLevels(fc *FlowCollector) admission.Warnings {
+func (r *FlowCollector) warnLogLevels(fc *FlowCollectorSpec) admission.Warnings {
 	var w admission.Warnings
-	if fc.Spec.Agent.EBPF.LogLevel == "debug" || fc.Spec.Agent.EBPF.LogLevel == "trace" {
-		w = append(w, fmt.Sprintf("The log level for the eBPF agent is %s, which impacts performance and resource footprint.", fc.Spec.Agent.EBPF.LogLevel))
+	if fc.Agent.EBPF.LogLevel == "debug" || fc.Agent.EBPF.LogLevel == "trace" {
+		w = append(w, fmt.Sprintf("The log level for the eBPF agent is %s, which impacts performance and resource footprint.", fc.Agent.EBPF.LogLevel))
 	}
-	if fc.Spec.Processor.LogLevel == "debug" || fc.Spec.Processor.LogLevel == "trace" {
-		w = append(w, fmt.Sprintf("The log level for the processor (flowlogs-pipeline) is %s, which impacts performance and resource footprint.", fc.Spec.Processor.LogLevel))
+	if fc.Processor.LogLevel == "debug" || fc.Processor.LogLevel == "trace" {
+		w = append(w, fmt.Sprintf("The log level for the processor (flowlogs-pipeline) is %s, which impacts performance and resource footprint.", fc.Processor.LogLevel))
 	}
 	return w
 }
 
 // nolint:cyclop
-func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollector) (admission.Warnings, []error) {
+func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollectorSpec) (admission.Warnings, []error) {
 	var warnings admission.Warnings
-	if slices.Contains(fc.Spec.Agent.EBPF.Features, NetworkEvents) {
+	if slices.Contains(fc.Agent.EBPF.Features, NetworkEvents) {
 		// Make sure required version of ocp is installed
 		if CurrentClusterInfo != nil && CurrentClusterInfo.IsOpenShift() {
 			b, err := CurrentClusterInfo.OpenShiftVersionIsAtLeast("4.18.0")
@@ -95,21 +97,21 @@ func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollector) (adm
 		} else {
 			warnings = append(warnings, "The NetworkEvents feature is only supported with OpenShift")
 		}
-		if !fc.Spec.Agent.EBPF.Privileged {
+		if !fc.Agent.EBPF.Privileged {
 			warnings = append(warnings, "The NetworkEvents feature requires eBPF Agent to run in privileged mode")
 		}
 	}
-	if slices.Contains(fc.Spec.Agent.EBPF.Features, PacketDrop) && !fc.Spec.Agent.EBPF.Privileged {
+	if slices.Contains(fc.Agent.EBPF.Features, PacketDrop) && !fc.Agent.EBPF.Privileged {
 		warnings = append(warnings, "The PacketDrop feature requires eBPF Agent to run in privileged mode")
 	}
-	if slices.Contains(fc.Spec.Agent.EBPF.Features, EbpfManager) && !fc.Spec.Agent.EBPF.Privileged {
+	if slices.Contains(fc.Agent.EBPF.Features, EbpfManager) && !fc.Agent.EBPF.Privileged {
 		warnings = append(warnings, "The BPF Manager feature requires eBPF Agent to run in privileged mode")
 	}
 	var errs []error
-	if fc.Spec.Agent.EBPF.FlowFilter != nil && fc.Spec.Agent.EBPF.FlowFilter.Enable != nil && *fc.Spec.Agent.EBPF.FlowFilter.Enable {
+	if fc.Agent.EBPF.FlowFilter != nil && fc.Agent.EBPF.FlowFilter.Enable != nil && *fc.Agent.EBPF.FlowFilter.Enable {
 		m := make(map[string]bool)
-		for i := range fc.Spec.Agent.EBPF.FlowFilter.FlowFilterRules {
-			rule := fc.Spec.Agent.EBPF.FlowFilter.FlowFilterRules[i]
+		for i := range fc.Agent.EBPF.FlowFilter.FlowFilterRules {
+			rule := fc.Agent.EBPF.FlowFilter.FlowFilterRules[i]
 			if found := m[rule.CIDR]; found {
 				errs = append(errs, fmt.Errorf("flow filter rule CIDR %s already exists", rule.CIDR))
 				break
@@ -117,7 +119,7 @@ func (r *FlowCollector) validateAgent(_ context.Context, fc *FlowCollector) (adm
 			m[rule.CIDR] = true
 			errs = append(errs, validateFilter(&rule)...)
 		}
-		errs = append(errs, validateFilter(fc.Spec.Agent.EBPF.FlowFilter)...)
+		errs = append(errs, validateFilter(fc.Agent.EBPF.FlowFilter)...)
 	}
 
 	return warnings, errs
@@ -256,3 +258,15 @@ func validatePortString(s string) (uint16, error) {
 	}
 	return uint16(p), nil
 }
+
+func (r *FlowCollector) validateConversationTracking(_ context.Context, fc *FlowCollectorSpec) (admission.Warnings, []error) {
+	var warnings admission.Warnings
+	if fc.Processor.LogTypes != nil && *fc.Processor.LogTypes == LogTypeAll {
+		warnings = append(warnings, "Enabling all log types (in spec.processor.logTypes) has a high impact on resource footprint")
+	}
+	var errs []error
+	if fc.Processor.LogTypes != nil && *fc.Processor.LogTypes != LogTypeFlows && fc.Loki.Enable != nil && !*fc.Loki.Enable {
+		errs = append(errs, errors.New("enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit"))
+	}
+	return warnings, errs
+}
diff --git a/apis/flowcollector/v1beta2/flowcollector_validation_webhook_test.go b/apis/flowcollector/v1beta2/flowcollector_validation_webhook_test.go
index 26645cf2d..073667306 100644
--- a/apis/flowcollector/v1beta2/flowcollector_validation_webhook_test.go
+++ b/apis/flowcollector/v1beta2/flowcollector_validation_webhook_test.go
@@ -346,7 +346,7 @@ func TestValidateAgent(t *testing.T) {
 	CurrentClusterInfo = &cluster.Info{}
 	for _, test := range tests {
 		CurrentClusterInfo.MockOpenShiftVersion(test.ocpVersion)
-		warnings, errs := test.fc.validateAgent(context.TODO(), test.fc)
+		warnings, errs := test.fc.validateAgent(context.TODO(), &test.fc.Spec)
 		if test.expectedError == "" {
 			assert.Empty(t, errs, test.name)
 		} else {
@@ -356,3 +356,74 @@ func TestValidateAgent(t *testing.T) {
 		assert.Equal(t, test.expectedWarnings, warnings, test.name)
 	}
 }
+
+func TestValidateConntrack(t *testing.T) {
+	tests := []struct {
+		name             string
+		fc               *FlowCollector
+		expectedError    string
+		expectedWarnings admission.Warnings
+	}{
+		{
+			name: "Conntrack with Loki is valid",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						LogTypes: ptr.To(LogTypeConversations),
+					},
+					Loki: FlowCollectorLoki{
+						Enable: ptr.To(true),
+					},
+				},
+			},
+		},
+		{
+			name: "Conntrack ALL is not recommended",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						LogTypes: ptr.To(LogTypeAll),
+					},
+					Loki: FlowCollectorLoki{
+						Enable: ptr.To(true),
+					},
+				},
+			},
+			expectedWarnings: admission.Warnings{"Enabling all log types (in spec.processor.logTypes) has a high impact on resource footprint"},
+		},
+		{
+			name: "Conntrack without Loki is not allowed",
+			fc: &FlowCollector{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "cluster",
+				},
+				Spec: FlowCollectorSpec{
+					Processor: FlowCollectorFLP{
+						LogTypes: ptr.To(LogTypeConversations),
+					},
+					Loki: FlowCollectorLoki{
+						Enable: ptr.To(false),
+					},
+				},
+			},
+			expectedError: "enabling conversation tracking without Loki is not allowed, as it generates extra processing for no benefit",
+		},
+	}
+
+	CurrentClusterInfo = &cluster.Info{}
+	for _, test := range tests {
+		warnings, err := test.fc.validate(context.TODO(), test.fc)
+		if test.expectedError == "" {
+			assert.Nil(t, err, test.name)
+		} else {
+			assert.ErrorContains(t, err, test.expectedError, test.name)
+		}
+		assert.Equal(t, test.expectedWarnings, warnings, test.name)
+	}
+}
diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
index 7f6791517..5bbf3ef20 100644
--- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
+++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -8594,10 +8594,10 @@ spec:
              default: Flows
              description: |-
                `logTypes` defines the desired record types to generate. Possible values are:
- - `Flows` (default) to export regular network flows
- - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates
- - `EndedConversations` to generate only ended conversations events
- - `All` to generate both network flows and all conversations events
+ - `Flows` (default) to export regular network flows.
+ - `Conversations` to generate events for started conversations, ended conversations, and periodic "tick" updates.
+ - `EndedConversations` to generate only ended conversation events.
+ - `All` to generate both network flows and all conversation events. This is not recommended due to its impact on resource footprint.
              enum:
              - Flows
              - Conversations
diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
index 6e348b6f8..fcf902624 100644
--- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
+++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -7915,10 +7915,10 @@ spec:
              default: Flows
              description: |-
                `logTypes` defines the desired record types to generate. Possible values are:
- - `Flows` (default) to export regular network flows
- - `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates
- - `EndedConversations` to generate only ended conversations events
- - `All` to generate both network flows and all conversations events
+ - `Flows` (default) to export regular network flows.
+ - `Conversations` to generate events for started conversations, ended conversations, and periodic "tick" updates.
+ - `EndedConversations` to generate only ended conversation events.
+ - `All` to generate both network flows and all conversation events. This is not recommended due to its impact on resource footprint.
              enum:
              - Flows
              - Conversations
diff --git a/config/samples/flows_v1beta2_flowcollector.yaml b/config/samples/flows_v1beta2_flowcollector.yaml
index 31df2805f..28b6dfe58 100644
--- a/config/samples/flows_v1beta2_flowcollector.yaml
+++ b/config/samples/flows_v1beta2_flowcollector.yaml
@@ -29,8 +29,11 @@ spec:
     excludeInterfaces: ["lo"]
     kafkaBatchSize: 1048576
     #flowFilter:
+    #  enable: true
     #  rules:
     #    - action: Accept
+    #      cidr: 0.0.0.0/0
+    #    - action: Accept
     #      cidr: 10.128.0.1/24
     #      ports: 6443
     #      protocol: TCP
@@ -40,17 +43,12 @@ spec:
     #      ports: 53
     #      protocol: UDP
     #      sampling: 20
+    #      sourcePorts: 443
     #    - action: Accept
-    #      cidr: 172.30.0.0/16
+    #      tcpFlags: "SYN"
+    #      cidr: 2.2.2.2/24
     #      protocol: TCP
-    #      sampling: 30
-    #      sourcePorts: 443
-    #      tcpFlags: "SYN"
-    #      action: Accept
-    #      cidr: 2.2.2.2/24
-    #      protocol: TCP
-    #      sourcePorts: 53
-    #  enable: true
+    #      sourcePorts: 53
     metrics:
       server:
         port: 9400
@@ -78,7 +76,7 @@ spec:
   processor:
     imagePullPolicy: IfNotPresent
     logLevel: info
-    # Change logTypes to "CONVERSATIONS" or "ALL" to enable conversation tracking
+    # Change logTypes to "Conversations", "EndedConversations", or "All" to enable conversation tracking
     logTypes: Flows
     # Append a unique cluster name to each record
    # clusterName:
diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md
index 99f6db0f9..85394fb23 100644
--- a/docs/FlowCollector.md
+++ b/docs/FlowCollector.md
@@ -14421,10 +14421,10 @@ This setting is ignored when Kafka is disabled.
enum
`logTypes` defines the desired record types to generate. Possible values are:
-- `Flows` (default) to export regular network flows
-- `Conversations` to generate events for started conversations, ended conversations as well as periodic "tick" updates
-- `EndedConversations` to generate only ended conversations events
-- `All` to generate both network flows and all conversations events

+- `Flows` (default) to export regular network flows.
+- `Conversations` to generate events for started conversations, ended conversations, and periodic "tick" updates.
+- `EndedConversations` to generate only ended conversation events.
+- `All` to generate both network flows and all conversation events. This is not recommended due to its impact on resource footprint.


Enum: Flows, Conversations, EndedConversations, All
Default: Flows
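
Reviewer note: as a usage illustration (not part of the patch), a minimal FlowCollector resource that satisfies the new conversation-tracking rule is sketched below; it mirrors the "Conntrack with Loki is valid" test fixture above. With `loki.enable: false` and any `logTypes` other than `Flows`, the webhook now returns an error, and `logTypes: All` additionally emits a resource-footprint warning.

apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  processor:
    # Any value other than "Flows" requires Loki to be enabled;
    # "All" also triggers the resource-footprint warning.
    logTypes: Conversations
  loki:
    enable: true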