Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ var (
leaderElectionRetryPeriod time.Duration

driverPodCreationGracePeriod time.Duration
labelSelectorFilter string

// Metrics
enableMetrics bool
Expand Down Expand Up @@ -189,6 +190,8 @@ func NewStartCommand() *cobra.Command {

command.Flags().DurationVar(&driverPodCreationGracePeriod, "driver-pod-creation-grace-period", 10*time.Second, "Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.")

command.Flags().StringVar(&labelSelectorFilter, "label-selector-filter", "", "Label selector to filter SparkApplications.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
command.Flags().StringVar(&labelSelectorFilter, "label-selector-filter", "", "Label selector to filter SparkApplications.")
command.Flags().StringVar(&labelSelectorFilter, "label-selector-filter", "", "Comma-separated label selector to filter resources. Supports key=value, key!=value, key in (val1,val2), and key existence checks.")


command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.")
command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+
"Use the port :8080. If not set, it will be 0 in order to disable the metrics server")
Expand Down Expand Up @@ -389,6 +392,16 @@ func newCacheOptions() cache.Options {
}
}

filterByObject := cache.ByObject{}
if labelSelectorFilter != "" {
selector, err := labels.Parse(labelSelectorFilter)
if err != nil {
logger.Error(err, "failed to parse spark application label selector", "label-selector", labelSelectorFilter)
os.Exit(1)
}
filterByObject.Label = selector
}

options := cache.Options{
Scheme: scheme,
DefaultNamespaces: defaultNamespaces,
Expand All @@ -401,7 +414,7 @@ func newCacheOptions() cache.Options {
&corev1.ConfigMap{}: {},
&corev1.PersistentVolumeClaim{}: {},
&corev1.Service{}: {},
&v1beta2.SparkApplication{}: {},
&v1beta2.SparkApplication{}: filterByObject,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the only type we want to watch?

&v1beta2.ScheduledSparkApplication{}: {},
&v1alpha1.SparkConnect{}: {},
},
Expand Down