From c051d83472a35a3a8c352e0df465bfdba8b3b4b1 Mon Sep 17 00:00:00 2001
From: Olivier Cazade <ocazade@redhat.com>
Date: Thu, 2 Nov 2023 16:23:23 +0100
Subject: [PATCH] Added flag to enable multiCluster configuration

---
 api/v1alpha1/flowcollector_webhook.go         |  7 ++++
 api/v1alpha1/zz_generated.conversion.go       |  1 +
 api/v1beta1/flowcollector_types.go            |  4 +++
 api/v1beta1/zz_generated.conversion.go        |  2 ++
 api/v1beta1/zz_generated.deepcopy.go          |  5 +++
 api/v1beta2/flowcollector_types.go            |  4 +++
 api/v1beta2/zz_generated.deepcopy.go          |  5 +++
 .../flows.netobserv.io_flowcollectors.yaml    | 12 +++++++
 .../flows.netobserv.io_flowcollectors.yaml    | 12 +++++++
 .../flowcollector_controller_iso_test.go      |  2 ++
 .../flowlogspipeline/flp_common_objects.go    | 34 ++++++++++++-------
 docs/FlowCollector.md                         | 18 ++++++++++
 ...ned.flows.netobserv.io_flowcollectors.yaml |  8 +++++
 13 files changed, 101 insertions(+), 13 deletions(-)

diff --git a/api/v1alpha1/flowcollector_webhook.go b/api/v1alpha1/flowcollector_webhook.go
index fc1284e07..934a95d57 100644
--- a/api/v1alpha1/flowcollector_webhook.go
+++ b/api/v1alpha1/flowcollector_webhook.go
@@ -64,6 +64,13 @@ func (r *FlowCollector) ConvertTo(dstRaw conversion.Hub) error {
 	if restored.Spec.Processor.Metrics.DisableAlerts != nil {
 		dst.Spec.Processor.Metrics.DisableAlerts = restored.Spec.Processor.Metrics.DisableAlerts
 	}
+	if restored.Spec.Processor.ClusterName != "" {
+		dst.Spec.Processor.ClusterName = restored.Spec.Processor.ClusterName
+	}
+	if restored.Spec.Processor.MultiClusterDeployment != nil {
+		dst.Spec.Processor.MultiClusterDeployment = restored.Spec.Processor.MultiClusterDeployment
+	}
+
 	dst.Spec.Processor.Metrics.Server.TLS.InsecureSkipVerify = restored.Spec.Processor.Metrics.Server.TLS.InsecureSkipVerify
 	dst.Spec.Processor.Metrics.Server.TLS.ProvidedCaFile = restored.Spec.Processor.Metrics.Server.TLS.ProvidedCaFile
 
diff --git a/api/v1alpha1/zz_generated.conversion.go b/api/v1alpha1/zz_generated.conversion.go
index ca75958fa..2fe3b6732 100644
--- a/api/v1alpha1/zz_generated.conversion.go
+++ b/api/v1alpha1/zz_generated.conversion.go
@@ -725,6 +725,7 @@ func autoConvert_v1beta2_FlowCollectorFLP_To_v1alpha1_FlowCollectorFLP(in *v1bet
 	// WARNING: in.ConversationEndTimeout requires manual conversion: does not exist in peer-type
 	// WARNING: in.ConversationTerminatingTimeout requires manual conversion: does not exist in peer-type
 	// WARNING: in.ClusterName requires manual conversion: does not exist in peer-type
+	// WARNING: in.MultiClusterDeployment requires manual conversion: does not exist in peer-type
 	if err := Convert_v1beta2_DebugConfig_To_v1alpha1_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
 		return err
 	}
diff --git a/api/v1beta1/flowcollector_types.go b/api/v1beta1/flowcollector_types.go
index cf7254da1..7b10631e9 100644
--- a/api/v1beta1/flowcollector_types.go
+++ b/api/v1beta1/flowcollector_types.go
@@ -469,6 +469,10 @@ type FlowCollectorFLP struct {
 	// `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.
 	ClusterName string `json:"clusterName,omitempty"`
 
+	//+kubebuilder:default:=false
+	// Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data
+	MultiClusterDeployment *bool `json:"multiClusterDeployment,omitempty"`
+
 	// `debug` allows setting some aspects of the internal configuration of the flow processor.
 	// This section is aimed exclusively for debugging and fine-grained performance optimizations,
 	// such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk.
diff --git a/api/v1beta1/zz_generated.conversion.go b/api/v1beta1/zz_generated.conversion.go
index 602260f54..339ff795d 100644
--- a/api/v1beta1/zz_generated.conversion.go
+++ b/api/v1beta1/zz_generated.conversion.go
@@ -685,6 +685,7 @@ func autoConvert_v1beta1_FlowCollectorFLP_To_v1beta2_FlowCollectorFLP(in *FlowCo
 	out.ConversationEndTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationEndTimeout))
 	out.ConversationTerminatingTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationTerminatingTimeout))
 	out.ClusterName = in.ClusterName
+	out.MultiClusterDeployment = (*bool)(unsafe.Pointer(in.MultiClusterDeployment))
 	if err := Convert_v1beta1_DebugConfig_To_v1beta2_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
 		return err
 	}
@@ -719,6 +720,7 @@ func autoConvert_v1beta2_FlowCollectorFLP_To_v1beta1_FlowCollectorFLP(in *v1beta
 	out.ConversationEndTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationEndTimeout))
 	out.ConversationTerminatingTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationTerminatingTimeout))
 	out.ClusterName = in.ClusterName
+	out.MultiClusterDeployment = (*bool)(unsafe.Pointer(in.MultiClusterDeployment))
 	if err := Convert_v1beta2_DebugConfig_To_v1beta1_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
 		return err
 	}
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index 09748c294..0324e3349 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -343,6 +343,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) {
 		*out = new(v1.Duration)
 		**out = **in
 	}
+	if in.MultiClusterDeployment != nil {
+		in, out := &in.MultiClusterDeployment, &out.MultiClusterDeployment
+		*out = new(bool)
+		**out = **in
+	}
 	in.Debug.DeepCopyInto(&out.Debug)
 }
 
diff --git a/api/v1beta2/flowcollector_types.go b/api/v1beta2/flowcollector_types.go
index 1f6932924..c5e25c3fd 100644
--- a/api/v1beta2/flowcollector_types.go
+++ b/api/v1beta2/flowcollector_types.go
@@ -469,6 +469,10 @@ type FlowCollectorFLP struct {
 	// `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.
 	ClusterName string `json:"clusterName,omitempty"`
 
+	//+kubebuilder:default:=false
+	// Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data
+	MultiClusterDeployment *bool `json:"multiClusterDeployment,omitempty"`
+
 	// `debug` allows setting some aspects of the internal configuration of the flow processor.
 	// This section is aimed exclusively for debugging and fine-grained performance optimizations,
 	// such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk.
diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go
index 3f25bdcb2..a70c54936 100644
--- a/api/v1beta2/zz_generated.deepcopy.go
+++ b/api/v1beta2/zz_generated.deepcopy.go
@@ -343,6 +343,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) {
 		*out = new(v1.Duration)
 		**out = **in
 	}
+	if in.MultiClusterDeployment != nil {
+		in, out := &in.MultiClusterDeployment, &out.MultiClusterDeployment
+		*out = new(bool)
+		**out = **in
+	}
 	in.Debug.DeepCopyInto(&out.Debug)
 }
 
diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
index e5d2264d3..03661ab1a 100644
--- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
+++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -4925,6 +4925,12 @@ spec:
                             type: object
                         type: object
                     type: object
+                  multiClusterDeployment:
+                    default: false
+                    description: Set `multiClusterDeployment` to `true` to enable
+                      multi clusters feature. This will add clusterName label to flows
+                      data
+                    type: boolean
                   port:
                     default: 2055
                     description: Port of the flow collector (host port). By convention,
@@ -7807,6 +7813,12 @@ spec:
                             type: object
                         type: object
                     type: object
+                  multiClusterDeployment:
+                    default: false
+                    description: Set `multiClusterDeployment` to `true` to enable
+                      multi clusters feature. This will add clusterName label to flows
+                      data
+                    type: boolean
                   port:
                     default: 2055
                     description: Port of the flow collector (host port). By convention,
diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
index f1b694203..fa2e1efc7 100644
--- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
+++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -4911,6 +4911,12 @@ spec:
                             type: object
                         type: object
                     type: object
+                  multiClusterDeployment:
+                    default: false
+                    description: Set `multiClusterDeployment` to `true` to enable
+                      multi clusters feature. This will add clusterName label to flows
+                      data
+                    type: boolean
                   port:
                     default: 2055
                     description: Port of the flow collector (host port). By convention,
@@ -7793,6 +7799,12 @@ spec:
                             type: object
                         type: object
                     type: object
+                  multiClusterDeployment:
+                    default: false
+                    description: Set `multiClusterDeployment` to `true` to enable
+                      multi clusters feature. This will add clusterName label to flows
+                      data
+                    type: boolean
                   port:
                     default: 2055
                     description: Port of the flow collector (host port). By convention,
diff --git a/controllers/flowcollector_controller_iso_test.go b/controllers/flowcollector_controller_iso_test.go
index 19f5f704d..8f45bb955 100644
--- a/controllers/flowcollector_controller_iso_test.go
+++ b/controllers/flowcollector_controller_iso_test.go
@@ -52,6 +52,7 @@ func flowCollectorIsoSpecs() {
 				CertKey:  "",
 			},
 		}
+		trueBool := true
 
 		specInput := flowslatest.FlowCollectorSpec{
 			Namespace:       operatorNamespace,
@@ -70,6 +71,7 @@ func flowCollectorIsoSpecs() {
 				ConversationHeartbeatInterval:  &metav1.Duration{Duration: time.Second},
 				ConversationEndTimeout:         &metav1.Duration{Duration: time.Second},
 				ConversationTerminatingTimeout: &metav1.Duration{Duration: time.Second},
+				MultiClusterDeployment:         &trueBool,
 				ClusterName:                    "testCluster",
 				Debug:                          flowslatest.DebugConfig{},
 				LogTypes:                       &outputRecordTypes,
diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go
index d89e8ef3c..c7b49463b 100644
--- a/controllers/flowlogspipeline/flp_common_objects.go
+++ b/controllers/flowlogspipeline/flp_common_objects.go
@@ -45,6 +45,7 @@ const (
 	conntrackTerminatingTimeout = 5 * time.Second
 	conntrackEndTimeout         = 10 * time.Second
 	conntrackHeartbeatInterval  = 30 * time.Second
+	clusterNameLabelName        = "K8S_ClusterName"
 )
 
 type ConfKind string
@@ -289,6 +290,10 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error {
 
 	indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage)
 
+	if b.desired.Processor.MultiClusterDeployment != nil && *b.desired.Processor.MultiClusterDeployment {
+		indexFields = append(indexFields, clusterNameLabelName)
+	}
+
 	// enrich stage (transform) configuration
 	enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
 		Rules: api.NetworkTransformRules{{
@@ -554,20 +559,23 @@ func (b *builder) addTransformFilter(lastStage config.PipelineBuilderStage) conf
 	var clusterName string
 	transformFilterRules := []api.TransformFilterRule{}
 
-	if b.desired.Processor.ClusterName != "" {
-		clusterName = b.desired.Processor.ClusterName
-	} else {
-		//take clustername from openshift
-		clusterName = string(globals.DefaultClusterID)
-	}
-	if clusterName != "" {
-		transformFilterRules = []api.TransformFilterRule{
-			{
-				Input: "K8S_ClusterName",
-				Type:  "add_field_if_doesnt_exist",
-				Value: clusterName,
-			},
+	if b.desired.Processor.MultiClusterDeployment != nil && *b.desired.Processor.MultiClusterDeployment {
+		if b.desired.Processor.ClusterName != "" {
+			clusterName = b.desired.Processor.ClusterName
+		} else {
+			//take clustername from openshift
+			clusterName = string(globals.DefaultClusterID)
+		}
+		if clusterName != "" {
+			transformFilterRules = []api.TransformFilterRule{
+				{
+					Input: clusterNameLabelName,
+					Type:  "add_field_if_doesnt_exist",
+					Value: clusterName,
+				},
+			}
 		}
+
 	}
 
 	// Filter-out unused fields?
diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md
index 06a2fb0ae..f2c10d7a3 100644
--- a/docs/FlowCollector.md
+++ b/docs/FlowCollector.md
@@ -7537,6 +7537,15 @@ TLS client configuration for Loki URL.
           `Metrics` define the processor configuration regarding metrics<br/>
         </td>
         <td>false</td>
+      </tr><tr>
+        <td><b>multiClusterDeployment</b></td>
+        <td>boolean</td>
+        <td>
+          Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data<br/>
+          <br/>
+            <i>Default</i>: false<br/>
+        </td>
+        <td>false</td>
       </tr><tr>
         <td><b>port</b></td>
         <td>integer</td>
@@ -12698,6 +12707,15 @@ TLS client configuration for Loki URL.
           `Metrics` define the processor configuration regarding metrics<br/>
         </td>
         <td>false</td>
+      </tr><tr>
+        <td><b>multiClusterDeployment</b></td>
+        <td>boolean</td>
+        <td>
+          Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data<br/>
+          <br/>
+            <i>Default</i>: false<br/>
+        </td>
+        <td>false</td>
       </tr><tr>
         <td><b>port</b></td>
         <td>integer</td>
diff --git a/hack/cloned.flows.netobserv.io_flowcollectors.yaml b/hack/cloned.flows.netobserv.io_flowcollectors.yaml
index 202789ec0..9708a7700 100644
--- a/hack/cloned.flows.netobserv.io_flowcollectors.yaml
+++ b/hack/cloned.flows.netobserv.io_flowcollectors.yaml
@@ -3409,6 +3409,10 @@ spec:
                               type: object
                           type: object
                       type: object
+                    multiClusterDeployment:
+                      default: false
+                      description: Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data
+                      type: boolean
                     port:
                       default: 2055
                       description: Port of the flow collector (host port). By convention, some values are forbidden. It must be greater than 1024 and different from 4500, 4789 and 6081.
@@ -5396,6 +5400,10 @@ spec:
                               type: object
                           type: object
                       type: object
+                    multiClusterDeployment:
+                      default: false
+                      description: Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data
+                      type: boolean
                     port:
                       default: 2055
                       description: Port of the flow collector (host port). By convention, some values are forbidden. It must be greater than 1024 and different from 4500, 4789 and 6081.