implement an upper bound limit to the number of tracked executors (#2181)
* implement an upper bound limit to the number of tracked executors

Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>

* add upper bound limit to the number of tracked executors to helm chart

Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>

---------

Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>
ImpSy authored Oct 11, 2024
1 parent cc57f1c commit a8b5d64
Showing 10 changed files with 258 additions and 9 deletions.
1 change: 1 addition & 0 deletions charts/spark-operator-chart/README.md
@@ -86,6 +86,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command documentation
| controller.replicas | int | `1` | Number of replicas of controller. |
| controller.workers | int | `10` | Reconcile concurrency, higher values might increase memory usage. |
| controller.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error`. |
| controller.maxTrackedExecutorPerApp | int | `1000` | Specifies the maximum number of executor pods per SparkApplication that the controller tracks. |
| controller.uiService.enable | bool | `true` | Specifies whether to create service for Spark web UI. |
| controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. |
| controller.uiIngress.urlFormat | string | `""` | Ingress URL format. Required if `controller.uiIngress.enable` is true. |
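To set the new value at install time, a minimal values override is enough; a sketch, where the file name `my-values.yaml` and the value `500` are illustrative and only the key itself is defined by the chart:

```yaml
# my-values.yaml -- hypothetical override, passed via `helm install -f my-values.yaml`
controller:
  # Cap how many executor pods are recorded in each SparkApplication status.
  maxTrackedExecutorPerApp: 500
```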
@@ -99,6 +99,9 @@ spec:
{{- if .Values.controller.workqueueRateLimiter.maxDelay.enable }}
- --workqueue-ratelimiter-max-delay={{ .Values.controller.workqueueRateLimiter.maxDelay.duration }}
{{- end }}
{{- if .Values.controller.maxTrackedExecutorPerApp }}
- --max-tracked-executor-per-app={{ .Values.controller.maxTrackedExecutorPerApp }}
{{- end }}
{{- if or .Values.prometheus.metrics.enable .Values.controller.pprof.enable }}
ports:
{{- if .Values.controller.pprof.enable }}
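With the chart defaults, the conditional block above renders the flag into the controller container's arguments, roughly as follows (an abridged sketch, not verbatim `helm template` output):

```yaml
# Abridged render of the controller Deployment (sketch)
spec:
  template:
    spec:
      containers:
        - name: spark-operator-controller
          args:
            - --max-tracked-executor-per-app=1000
```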
@@ -624,3 +624,11 @@ tests:
- notContains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --workqueue-ratelimiter-max-delay=1h
- it: Should contain `--max-tracked-executor-per-app` arg if `controller.maxTrackedExecutorPerApp` is set
set:
controller:
maxTrackedExecutorPerApp: 123
asserts:
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --max-tracked-executor-per-app=123
3 changes: 3 additions & 0 deletions charts/spark-operator-chart/values.yaml
@@ -51,6 +51,9 @@ controller:
# -- Configure the verbosity of logging, can be one of `debug`, `info`, `error`.
logLevel: info

# -- Specifies the maximum number of executor pods per SparkApplication that the controller tracks.
maxTrackedExecutorPerApp: 1000

uiService:
# -- Specifies whether to create service for Spark web UI.
enable: true
21 changes: 12 additions & 9 deletions cmd/operator/controller/start.go
@@ -73,8 +73,9 @@ var (
namespaces []string

// Controller
controllerThreads int
cacheSyncTimeout time.Duration
controllerThreads int
cacheSyncTimeout time.Duration
maxTrackedExecutorPerApp int

//WorkQueue
workqueueRateLimiterBucketQPS int
@@ -140,6 +141,7 @@ func NewStartCommand() *cobra.Command {
command.Flags().IntVar(&controllerThreads, "controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset or contains empty string.")
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")
command.Flags().IntVar(&maxTrackedExecutorPerApp, "max-tracked-executor-per-app", 1000, "The maximum number of tracked executors per SparkApplication.")

command.Flags().IntVar(&workqueueRateLimiterBucketQPS, "workqueue-ratelimiter-bucket-qps", 10, "QPS of the bucket rate of the workqueue.")
command.Flags().IntVar(&workqueueRateLimiterBucketSize, "workqueue-ratelimiter-bucket-size", 100, "The token bucket size of the workqueue.")
@@ -392,13 +394,14 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
sparkExecutorMetrics.Register()
}
options := sparkapplication.Options{
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
7 changes: 7 additions & 0 deletions internal/controller/sparkapplication/controller.go
@@ -19,6 +19,7 @@ package sparkapplication
import (
"context"
"fmt"
"strconv"
"time"

"github.com/golang/glog"
@@ -65,6 +66,8 @@ type Options struct {

SparkApplicationMetrics *metrics.SparkApplicationMetrics
SparkExecutorMetrics *metrics.SparkExecutorMetrics

MaxTrackedExecutorPerApp int
}

// Reconciler reconciles a SparkApplication object.
@@ -812,6 +815,10 @@ func (r *Reconciler) updateExecutorState(_ context.Context, app *v1beta2.SparkApplication
var executorApplicationID string
for _, pod := range pods {
if util.IsExecutorPod(&pod) {
// Skip executors whose ID exceeds `MaxTrackedExecutorPerApp` so the tracked executor state stays bounded.
if executorID, _ := strconv.Atoi(util.GetSparkExecutorID(&pod)); executorID > r.options.MaxTrackedExecutorPerApp {
continue
}
newState := util.GetExecutorState(&pod)
oldState, exists := app.Status.ExecutorState[pod.Name]
// Only record an executor event if the executor state is new or it has changed.
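To make the cut-off rule concrete, here is a minimal, self-contained Go sketch (the function name and slice-based input are illustrative, not the operator's API). Note that the controller discards `strconv.Atoi`'s error, so a missing or non-numeric executor ID parses to 0 and that pod is still tracked:

```go
package main

import (
	"fmt"
	"strconv"
)

// trackedExecutorIDs applies the same skip rule to a plain slice of
// executor-ID label values: entries whose numeric ID exceeds maxTracked
// are dropped; unparsable IDs become 0 and are therefore kept.
func trackedExecutorIDs(ids []string, maxTracked int) []string {
	var tracked []string
	for _, raw := range ids {
		id, _ := strconv.Atoi(raw) // error ignored, mirroring the controller
		if id > maxTracked {
			continue // beyond the cap: stop persisting this executor
		}
		tracked = append(tracked, raw)
	}
	return tracked
}

func main() {
	fmt.Println(trackedExecutorIDs([]string{"1", "2", "3", "4"}, 2)) // [1 2]
}
```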
168 changes: 168 additions & 0 deletions internal/controller/sparkapplication/controller_test.go
@@ -18,14 +18,17 @@ package sparkapplication_test

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kubeflow/spark-operator/api/v1beta2"
@@ -287,4 +290,169 @@ var _ = Describe("SparkApplication Controller", func() {
Expect(result.Requeue).To(BeFalse())
})
})

Context("When reconciling a running SparkApplication", func() {
ctx := context.Background()
appName := "test"
appNamespace := "default"
key := types.NamespacedName{
Name: appName,
Namespace: appNamespace,
}

BeforeEach(func() {
By("Creating a test SparkApplication")
app := &v1beta2.SparkApplication{}
if err := k8sClient.Get(ctx, key, app); err != nil && errors.IsNotFound(err) {
app = &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: appNamespace,
},
}
v1beta2.SetSparkApplicationDefaults(app)
Expect(k8sClient.Create(ctx, app)).To(Succeed())
}
driverPod := createDriverPod(appName, appNamespace)
Expect(k8sClient.Create(ctx, driverPod)).To(Succeed())
driverPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, driverPod)).To(Succeed())

app.Status.DriverInfo.PodName = driverPod.Name
app.Status.AppState.State = v1beta2.ApplicationStateRunning
Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())

executorPod1 := createExecutorPod(appName, appNamespace, 1)
Expect(k8sClient.Create(ctx, executorPod1)).To(Succeed())
executorPod1.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, executorPod1)).To(Succeed())

executorPod2 := createExecutorPod(appName, appNamespace, 2)
Expect(k8sClient.Create(ctx, executorPod2)).To(Succeed())
executorPod2.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, executorPod2)).To(Succeed())
})

AfterEach(func() {
app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())

By("Deleting the created test SparkApplication")
Expect(k8sClient.Delete(ctx, app)).To(Succeed())

By("Deleting the driver pod")
driverPod := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getDriverNamespacedName(appName, appNamespace), driverPod)).To(Succeed())
Expect(k8sClient.Delete(ctx, driverPod)).To(Succeed())

By("Deleting the executor pods")
executorPod1 := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getExecutorNamespacedName(appName, appNamespace, 1), executorPod1)).To(Succeed())
Expect(k8sClient.Delete(ctx, executorPod1)).To(Succeed())
executorPod2 := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getExecutorNamespacedName(appName, appNamespace, 2), executorPod2)).To(Succeed())
Expect(k8sClient.Delete(ctx, executorPod2)).To(Succeed())
})

It("Should add the executors to the SparkApplication", func() {
By("Reconciling the running SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
record.NewFakeRecorder(3),
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.ExecutorState).To(HaveLen(2))
})

It("Should only add 1 executor to the SparkApplication", func() {
By("Reconciling the running SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
record.NewFakeRecorder(3),
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 1},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.ExecutorState).To(HaveLen(1))
})
})
})

func getDriverNamespacedName(appName string, appNamespace string) types.NamespacedName {
return types.NamespacedName{
Name: fmt.Sprintf("%s-driver", appName),
Namespace: appNamespace,
}
}

func createDriverPod(appName string, appNamespace string) *corev1.Pod {
namespacedName := getDriverNamespacedName(appName, appNamespace)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: namespacedName.Name,
Namespace: namespacedName.Namespace,
Labels: map[string]string{
common.LabelSparkAppName: appName,
common.LabelLaunchedBySparkOperator: "true",
common.LabelSparkRole: common.SparkRoleDriver,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: common.SparkDriverContainerName,
Image: "spark-executor:latest",
},
},
},
}
return pod
}

func getExecutorNamespacedName(appName string, appNamespace string, id int) types.NamespacedName {
return types.NamespacedName{
Name: fmt.Sprintf("%s-exec%d", appName, id),
Namespace: appNamespace,
}
}

func createExecutorPod(appName string, appNamespace string, id int) *corev1.Pod {
namespacedName := getExecutorNamespacedName(appName, appNamespace, id)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: namespacedName.Name,
Namespace: namespacedName.Namespace,
Labels: map[string]string{
common.LabelSparkAppName: appName,
common.LabelLaunchedBySparkOperator: "true",
common.LabelSparkRole: common.SparkRoleExecutor,
common.LabelSparkExecutorID: fmt.Sprintf("%d", id),
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: common.SparkExecutorContainerName,
Image: "spark-executor:latest",
},
},
},
}
return pod
}
3 changes: 3 additions & 0 deletions pkg/common/spark.go
@@ -309,6 +309,9 @@ const (

// LabelSubmissionID is the label that records the submission ID of the current run of an application.
LabelSubmissionID = LabelAnnotationPrefix + "submission-id"

// LabelSparkExecutorID is the label that records the executor ID of an executor pod.
LabelSparkExecutorID = "spark-exec-id"
)

const (
5 changes: 5 additions & 0 deletions pkg/util/sparkpod.go
@@ -37,6 +37,11 @@ func IsExecutorPod(pod *corev1.Pod) bool {
return pod.Labels[common.LabelSparkRole] == common.SparkRoleExecutor
}

// GetSparkExecutorID returns the Spark executor ID from the pod's labels.
func GetSparkExecutorID(pod *corev1.Pod) string {
return pod.Labels[common.LabelSparkExecutorID]
}

// GetAppName returns the spark application name by checking out pod labels.
func GetAppName(pod *corev1.Pod) string {
return pod.Labels[common.LabelSparkAppName]
48 changes: 48 additions & 0 deletions pkg/util/sparkpod_test.go
@@ -299,3 +299,51 @@ var _ = Describe("GetSparkApplicationID", func() {
})
})
})

var _ = Describe("GetSparkExecutorID", func() {
Context("Pod without labels", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "test-namespace",
},
}

It("Should return empty executor ID", func() {
Expect(util.GetSparkExecutorID(pod)).To(BeEmpty())
})
})

Context("Pod without executor ID label", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "test-namespace",
Labels: map[string]string{
common.LabelSparkAppName: "test-app",
},
},
}

It("Should return empty executor ID", func() {
Expect(util.GetSparkExecutorID(pod)).To(BeEmpty())
})
})

Context("Pod with executor ID label", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "test-namespace",
Labels: map[string]string{
common.LabelSparkAppName: "test-app",
common.LabelSparkExecutorID: "1",
},
},
}

It("Should return the executor ID", func() {
Expect(util.GetSparkExecutorID(pod)).To(Equal("1"))
})
})
})
