Skip to content

Commit

Permalink
fixup! implement an upper bound limit to the number of tracked executor
Browse files Browse the repository at this point in the history
  • Loading branch information
ImpSy committed Sep 27, 2024
1 parent 385bc75 commit 7de884e
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
10 changes: 5 additions & 5 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ var (
namespaces []string

// Controller
controllerThreads int
cacheSyncTimeout time.Duration
maxTrackedExectorPerApp int
controllerThreads int
cacheSyncTimeout time.Duration
maxTrackedExecutorPerApp int

// Batch scheduler
enableBatchScheduler bool
Expand Down Expand Up @@ -134,7 +134,7 @@ func NewStartCommand() *cobra.Command {
command.Flags().IntVar(&controllerThreads, "controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset or contains empty string.")
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")
command.Flags().IntVar(&maxTrackedExectorPerApp, "max-tracked-executor-per-app", 1000, "The maximum number of tracked executors per SparkApplication.")
command.Flags().IntVar(&maxTrackedExecutorPerApp, "max-tracked-executor-per-app", 1000, "The maximum number of tracked executors per SparkApplication.")

command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.")
Expand Down Expand Up @@ -378,7 +378,7 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExectorPerApp,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func createExecutorPod(appName string, appNamespace string, id int) *corev1.Pod
common.LabelSparkAppName: appName,
common.LabelLaunchedBySparkOperator: "true",
common.LabelSparkRole: common.SparkRoleExecutor,
common.SparkExecutorIDLabel: fmt.Sprintf("%d", id),
common.LabelSparkExecutorID: fmt.Sprintf("%d", id),
},
},
Spec: corev1.PodSpec{
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ const (
// LabelSubmissionID is the label that records the submission ID of the current run of an application.
LabelSubmissionID = LabelAnnotationPrefix + "submission-id"

// SparkExecutorIDLabel is the label that records executor pod ID
SparkExecutorIDLabel = "spark-exec-id"
// LabelSparkExecutorID is the label that records executor pod ID
LabelSparkExecutorID = "spark-exec-id"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/sparkpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func IsExecutorPod(pod *corev1.Pod) bool {

// GetSparkExecutorID returns the Spark executor ID by checking out pod labels.
func GetSparkExecutorID(pod *corev1.Pod) string {
return pod.Labels[common.SparkExecutorIDLabel]
return pod.Labels[common.LabelSparkExecutorID]
}

// GetAppName returns the spark application name by checking out pod labels.
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/sparkpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ var _ = Describe("GetSparkExecutorID", func() {
Namespace: "test-namespace",
Labels: map[string]string{
common.LabelSparkAppName: "test-app",
common.SparkExecutorIDLabel: "1",
common.LabelSparkExecutorID: "1",
},
},
}
Expand Down

0 comments on commit 7de884e

Please sign in to comment.