diff --git a/api/v1alpha1/flowcollector_webhook.go b/api/v1alpha1/flowcollector_webhook.go
index fc1284e07..934a95d57 100644
--- a/api/v1alpha1/flowcollector_webhook.go
+++ b/api/v1alpha1/flowcollector_webhook.go
@@ -64,6 +64,13 @@ func (r *FlowCollector) ConvertTo(dstRaw conversion.Hub) error {
if restored.Spec.Processor.Metrics.DisableAlerts != nil {
dst.Spec.Processor.Metrics.DisableAlerts = restored.Spec.Processor.Metrics.DisableAlerts
}
+ if restored.Spec.Processor.ClusterName != "" {
+ dst.Spec.Processor.ClusterName = restored.Spec.Processor.ClusterName
+ }
+ if restored.Spec.Processor.MultiClusterDeployment != nil {
+ dst.Spec.Processor.MultiClusterDeployment = restored.Spec.Processor.MultiClusterDeployment
+ }
+
dst.Spec.Processor.Metrics.Server.TLS.InsecureSkipVerify = restored.Spec.Processor.Metrics.Server.TLS.InsecureSkipVerify
dst.Spec.Processor.Metrics.Server.TLS.ProvidedCaFile = restored.Spec.Processor.Metrics.Server.TLS.ProvidedCaFile
diff --git a/api/v1alpha1/zz_generated.conversion.go b/api/v1alpha1/zz_generated.conversion.go
index ca75958fa..2fe3b6732 100644
--- a/api/v1alpha1/zz_generated.conversion.go
+++ b/api/v1alpha1/zz_generated.conversion.go
@@ -725,6 +725,7 @@ func autoConvert_v1beta2_FlowCollectorFLP_To_v1alpha1_FlowCollectorFLP(in *v1bet
// WARNING: in.ConversationEndTimeout requires manual conversion: does not exist in peer-type
// WARNING: in.ConversationTerminatingTimeout requires manual conversion: does not exist in peer-type
// WARNING: in.ClusterName requires manual conversion: does not exist in peer-type
+ // WARNING: in.MultiClusterDeployment requires manual conversion: does not exist in peer-type
if err := Convert_v1beta2_DebugConfig_To_v1alpha1_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
return err
}
diff --git a/api/v1beta1/flowcollector_types.go b/api/v1beta1/flowcollector_types.go
index cf7254da1..7b10631e9 100644
--- a/api/v1beta1/flowcollector_types.go
+++ b/api/v1beta1/flowcollector_types.go
@@ -469,6 +469,10 @@ type FlowCollectorFLP struct {
// `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.
ClusterName string `json:"clusterName,omitempty"`
+ //+kubebuilder:default:=false
+ // Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data
+ MultiClusterDeployment *bool `json:"multiClusterDeployment,omitempty"`
+
// `debug` allows setting some aspects of the internal configuration of the flow processor.
// This section is aimed exclusively for debugging and fine-grained performance optimizations,
// such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk.
diff --git a/api/v1beta1/zz_generated.conversion.go b/api/v1beta1/zz_generated.conversion.go
index 602260f54..339ff795d 100644
--- a/api/v1beta1/zz_generated.conversion.go
+++ b/api/v1beta1/zz_generated.conversion.go
@@ -685,6 +685,7 @@ func autoConvert_v1beta1_FlowCollectorFLP_To_v1beta2_FlowCollectorFLP(in *FlowCo
out.ConversationEndTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationEndTimeout))
out.ConversationTerminatingTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationTerminatingTimeout))
out.ClusterName = in.ClusterName
+ out.MultiClusterDeployment = (*bool)(unsafe.Pointer(in.MultiClusterDeployment))
if err := Convert_v1beta1_DebugConfig_To_v1beta2_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
return err
}
@@ -719,6 +720,7 @@ func autoConvert_v1beta2_FlowCollectorFLP_To_v1beta1_FlowCollectorFLP(in *v1beta
out.ConversationEndTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationEndTimeout))
out.ConversationTerminatingTimeout = (*v1.Duration)(unsafe.Pointer(in.ConversationTerminatingTimeout))
out.ClusterName = in.ClusterName
+ out.MultiClusterDeployment = (*bool)(unsafe.Pointer(in.MultiClusterDeployment))
if err := Convert_v1beta2_DebugConfig_To_v1beta1_DebugConfig(&in.Debug, &out.Debug, s); err != nil {
return err
}
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index 09748c294..0324e3349 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -343,6 +343,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) {
*out = new(v1.Duration)
**out = **in
}
+ if in.MultiClusterDeployment != nil {
+ in, out := &in.MultiClusterDeployment, &out.MultiClusterDeployment
+ *out = new(bool)
+ **out = **in
+ }
in.Debug.DeepCopyInto(&out.Debug)
}
diff --git a/api/v1beta2/flowcollector_types.go b/api/v1beta2/flowcollector_types.go
index 1f6932924..c5e25c3fd 100644
--- a/api/v1beta2/flowcollector_types.go
+++ b/api/v1beta2/flowcollector_types.go
@@ -469,6 +469,10 @@ type FlowCollectorFLP struct {
// `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.
ClusterName string `json:"clusterName,omitempty"`
+ //+kubebuilder:default:=false
+ // Set `multiClusterDeployment` to `true` to enable multi clusters feature. This will add clusterName label to flows data
+ MultiClusterDeployment *bool `json:"multiClusterDeployment,omitempty"`
+
// `debug` allows setting some aspects of the internal configuration of the flow processor.
// This section is aimed exclusively for debugging and fine-grained performance optimizations,
// such as `GOGC` and `GOMAXPROCS` env vars. Users setting its values do it at their own risk.
diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go
index 3f25bdcb2..a70c54936 100644
--- a/api/v1beta2/zz_generated.deepcopy.go
+++ b/api/v1beta2/zz_generated.deepcopy.go
@@ -343,6 +343,11 @@ func (in *FlowCollectorFLP) DeepCopyInto(out *FlowCollectorFLP) {
*out = new(v1.Duration)
**out = **in
}
+ if in.MultiClusterDeployment != nil {
+ in, out := &in.MultiClusterDeployment, &out.MultiClusterDeployment
+ *out = new(bool)
+ **out = **in
+ }
in.Debug.DeepCopyInto(&out.Debug)
}
diff --git a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
index e5d2264d3..03661ab1a 100644
--- a/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
+++ b/bundle/manifests/flows.netobserv.io_flowcollectors.yaml
@@ -4925,6 +4925,12 @@ spec:
type: object
type: object
type: object
+ multiClusterDeployment:
+ default: false
+ description: Set `multiClusterDeployment` to `true` to enable
+ multi clusters feature. This will add clusterName label to flows
+ data
+ type: boolean
port:
default: 2055
description: Port of the flow collector (host port). By convention,
@@ -7807,6 +7813,12 @@ spec:
type: object
type: object
type: object
+ multiClusterDeployment:
+ default: false
+ description: Set `multiClusterDeployment` to `true` to enable
+ multi clusters feature. This will add clusterName label to flows
+ data
+ type: boolean
port:
default: 2055
description: Port of the flow collector (host port). By convention,
diff --git a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
index f1b694203..fa2e1efc7 100644
--- a/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
+++ b/config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -4911,6 +4911,12 @@ spec:
type: object
type: object
type: object
+ multiClusterDeployment:
+ default: false
+ description: Set `multiClusterDeployment` to `true` to enable
+ multi clusters feature. This will add clusterName label to flows
+ data
+ type: boolean
port:
default: 2055
description: Port of the flow collector (host port). By convention,
@@ -7793,6 +7799,12 @@ spec:
type: object
type: object
type: object
+ multiClusterDeployment:
+ default: false
+ description: Set `multiClusterDeployment` to `true` to enable
+ multi clusters feature. This will add clusterName label to flows
+ data
+ type: boolean
port:
default: 2055
description: Port of the flow collector (host port). By convention,
diff --git a/controllers/flowcollector_controller_iso_test.go b/controllers/flowcollector_controller_iso_test.go
index 19f5f704d..8f45bb955 100644
--- a/controllers/flowcollector_controller_iso_test.go
+++ b/controllers/flowcollector_controller_iso_test.go
@@ -52,6 +52,7 @@ func flowCollectorIsoSpecs() {
CertKey: "",
},
}
+ trueBool := true
specInput := flowslatest.FlowCollectorSpec{
Namespace: operatorNamespace,
@@ -70,6 +71,7 @@ func flowCollectorIsoSpecs() {
ConversationHeartbeatInterval: &metav1.Duration{Duration: time.Second},
ConversationEndTimeout: &metav1.Duration{Duration: time.Second},
ConversationTerminatingTimeout: &metav1.Duration{Duration: time.Second},
+ MultiClusterDeployment: &trueBool,
ClusterName: "testCluster",
Debug: flowslatest.DebugConfig{},
LogTypes: &outputRecordTypes,
diff --git a/controllers/flowlogspipeline/flp_common_objects.go b/controllers/flowlogspipeline/flp_common_objects.go
index d89e8ef3c..c7b49463b 100644
--- a/controllers/flowlogspipeline/flp_common_objects.go
+++ b/controllers/flowlogspipeline/flp_common_objects.go
@@ -45,6 +45,7 @@ const (
conntrackTerminatingTimeout = 5 * time.Second
conntrackEndTimeout = 10 * time.Second
conntrackHeartbeatInterval = 30 * time.Second
+ clusterNameLabelName = "K8S_ClusterName"
)
type ConfKind string
@@ -289,6 +290,10 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error {
indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage)
+ if b.desired.Processor.MultiClusterDeployment != nil && *b.desired.Processor.MultiClusterDeployment {
+ indexFields = append(indexFields, clusterNameLabelName)
+ }
+
// enrich stage (transform) configuration
enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
Rules: api.NetworkTransformRules{{
@@ -554,20 +559,23 @@ func (b *builder) addTransformFilter(lastStage config.PipelineBuilderStage) conf
var clusterName string
transformFilterRules := []api.TransformFilterRule{}
- if b.desired.Processor.ClusterName != "" {
- clusterName = b.desired.Processor.ClusterName
- } else {
- //take clustername from openshift
- clusterName = string(globals.DefaultClusterID)
- }
- if clusterName != "" {
- transformFilterRules = []api.TransformFilterRule{
- {
- Input: "K8S_ClusterName",
- Type: "add_field_if_doesnt_exist",
- Value: clusterName,
- },
+ if b.desired.Processor.MultiClusterDeployment != nil && *b.desired.Processor.MultiClusterDeployment {
+ if b.desired.Processor.ClusterName != "" {
+ clusterName = b.desired.Processor.ClusterName
+ } else {
+ //take clustername from openshift
+ clusterName = string(globals.DefaultClusterID)
+ }
+ if clusterName != "" {
+ transformFilterRules = []api.TransformFilterRule{
+ {
+ Input: clusterNameLabelName,
+ Type: "add_field_if_doesnt_exist",
+ Value: clusterName,
+ },
+ }
}
+
}
// Filter-out unused fields?
diff --git a/docs/FlowCollector.md b/docs/FlowCollector.md
index 06a2fb0ae..f2c10d7a3 100644
--- a/docs/FlowCollector.md
+++ b/docs/FlowCollector.md
@@ -7537,6 +7537,15 @@ TLS client configuration for Loki URL.
`Metrics` define the processor configuration regarding metrics