diff --git a/cmd/operator/controller/start.go b/cmd/operator/controller/start.go index 94a9659b4..223af8d73 100644 --- a/cmd/operator/controller/start.go +++ b/cmd/operator/controller/start.go @@ -107,6 +107,7 @@ var ( leaderElectionRetryPeriod time.Duration driverPodCreationGracePeriod time.Duration + labelSelectorFilter string // Metrics enableMetrics bool @@ -189,6 +190,8 @@ func NewStartCommand() *cobra.Command { command.Flags().DurationVar(&driverPodCreationGracePeriod, "driver-pod-creation-grace-period", 10*time.Second, "Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.") + command.Flags().StringVar(&labelSelectorFilter, "label-selector-filter", "", "Label selector to filter SparkApplications.") + command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.") command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+ "Use the port :8080. If not set, it will be 0 in order to disable the metrics server") @@ -389,6 +392,16 @@ func newCacheOptions() cache.Options { } } + filterByObject := cache.ByObject{} + if labelSelectorFilter != "" { + selector, err := labels.Parse(labelSelectorFilter) + if err != nil { + logger.Error(err, "failed to parse spark application label selector", "label-selector", labelSelectorFilter) + os.Exit(1) + } + filterByObject.Label = selector + } + options := cache.Options{ Scheme: scheme, DefaultNamespaces: defaultNamespaces, @@ -401,7 +414,7 @@ func newCacheOptions() cache.Options { &corev1.ConfigMap{}: {}, &corev1.PersistentVolumeClaim{}: {}, &corev1.Service{}: {}, - &v1beta2.SparkApplication{}: {}, + &v1beta2.SparkApplication{}: filterByObject, &v1beta2.ScheduledSparkApplication{}: {}, &v1alpha1.SparkConnect{}: {}, },