implement an upper bound limit on the number of tracked executors
Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>
ImpSy committed Sep 19, 2024
1 parent 75b9266 commit 1346753
Showing 6 changed files with 243 additions and 9 deletions.
21 changes: 12 additions & 9 deletions cmd/operator/controller/start.go
@@ -71,8 +71,9 @@ var (
namespaces []string

// Controller
controllerThreads int
cacheSyncTimeout time.Duration
maxTrackedExecutorPerApp int

// Batch scheduler
enableBatchScheduler bool
@@ -133,6 +134,7 @@ func NewStartCommand() *cobra.Command {
command.Flags().IntVar(&controllerThreads, "controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset or contains empty string.")
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")
command.Flags().IntVar(&maxTrackedExecutorPerApp, "max-tracked-executor-per-app", 1000, "The maximum number of tracked executors per SparkApplication.")

command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.")
@@ -369,13 +371,14 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
sparkExecutorMetrics.Register()
}
options := sparkapplication.Options{
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
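For context, here is a minimal, self-contained sketch (not part of this commit) of how the new cobra flag feeds the reconciler option. The flag name and its default of 1000 come from the diff above; the trimmed-down command and the print statement are purely illustrative:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	// Mirrors only the subset of NewStartCommand touched by this commit.
	var maxTrackedExecutorPerApp int

	command := &cobra.Command{
		Use: "start",
		RunE: func(_ *cobra.Command, _ []string) error {
			// In the operator this value is copied into
			// sparkapplication.Options.MaxTrackedExecutorPerApp.
			fmt.Printf("tracking at most %d executors per SparkApplication\n", maxTrackedExecutorPerApp)
			return nil
		},
	}
	command.Flags().IntVar(&maxTrackedExecutorPerApp, "max-tracked-executor-per-app", 1000,
		"The maximum number of tracked executors per SparkApplication.")

	if err := command.Execute(); err != nil {
		fmt.Println(err)
	}
}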
7 changes: 7 additions & 0 deletions internal/controller/sparkapplication/controller.go
@@ -19,6 +19,7 @@ package sparkapplication
import (
"context"
"fmt"
"strconv"
"time"

"github.com/golang/glog"
@@ -65,6 +66,8 @@ type Options struct {

SparkApplicationMetrics *metrics.SparkApplicationMetrics
SparkExecutorMetrics *metrics.SparkExecutorMetrics

MaxTrackedExecutorPerApp int
}

// Reconciler reconciles a SparkApplication object.
@@ -833,6 +836,10 @@ func (r *Reconciler) updateExecutorState(_ context.Context, app *v1beta2.SparkAp
var executorApplicationID string
for _, pod := range pods {
if util.IsExecutorPod(&pod) {
// If the executor ID is higher than `MaxTrackedExecutorPerApp`, stop persisting this executor's state
if executorID, _ := strconv.Atoi(util.GetSparkExecutorID(&pod)); executorID > r.options.MaxTrackedExecutorPerApp {
continue
}
newState := util.GetExecutorState(&pod)
oldState, exists := app.Status.ExecutorState[pod.Name]
// Only record an executor event if the executor state is new or it has changed.
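Taken on its own, the new guard can be read as the following standalone sketch. The shouldTrackExecutor helper is hypothetical and not part of this commit; only GetSparkExecutorID and the MaxTrackedExecutorPerApp option are. Note that a missing or non-numeric executor ID parses to 0 and is therefore still tracked, matching the ignored strconv.Atoi error above:

package sparkapplication

import (
	"strconv"

	corev1 "k8s.io/api/core/v1"

	"github.com/kubeflow/spark-operator/pkg/util"
)

// shouldTrackExecutor is a hypothetical helper equivalent to the inline check
// added to updateExecutorState: it reports whether an executor pod's state
// should be persisted into the SparkApplication status, given the configured cap.
func shouldTrackExecutor(pod *corev1.Pod, maxTrackedExecutorPerApp int) bool {
	// Unparsable or missing IDs become 0 and are kept, mirroring the ignored error above.
	executorID, _ := strconv.Atoi(util.GetSparkExecutorID(pod))
	return executorID <= maxTrackedExecutorPerApp
}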
168 changes: 168 additions & 0 deletions internal/controller/sparkapplication/controller_test.go
@@ -18,14 +18,17 @@ package sparkapplication_test

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kubeflow/spark-operator/api/v1beta2"
@@ -287,4 +290,169 @@ var _ = Describe("SparkApplication Controller", func() {
Expect(result.Requeue).To(BeFalse())
})
})

Context("When reconciling a running SparkApplication", func() {
ctx := context.Background()
appName := "test"
appNamespace := "default"
key := types.NamespacedName{
Name: appName,
Namespace: appNamespace,
}

BeforeEach(func() {
By("Creating a test SparkApplication")
app := &v1beta2.SparkApplication{}
if err := k8sClient.Get(ctx, key, app); err != nil && errors.IsNotFound(err) {
app = &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: appNamespace,
},
}
v1beta2.SetSparkApplicationDefaults(app)
Expect(k8sClient.Create(ctx, app)).To(Succeed())
}
driverPod := createDriverPod(appName, appNamespace)
Expect(k8sClient.Create(ctx, driverPod)).To(Succeed())
driverPod.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, driverPod)).To(Succeed())

app.Status.DriverInfo.PodName = driverPod.Name
app.Status.AppState.State = v1beta2.ApplicationStateRunning
Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())

executorPod1 := createExecutorPod(appName, appNamespace, 1)
Expect(k8sClient.Create(ctx, executorPod1)).To(Succeed())
executorPod1.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, executorPod1)).To(Succeed())

executorPod2 := createExecutorPod(appName, appNamespace, 2)
Expect(k8sClient.Create(ctx, executorPod2)).To(Succeed())
executorPod2.Status.Phase = corev1.PodRunning
Expect(k8sClient.Status().Update(ctx, executorPod2)).To(Succeed())
})

AfterEach(func() {
app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())

By("Deleting the created test SparkApplication")
Expect(k8sClient.Delete(ctx, app)).To(Succeed())

By("Deleting the driver pod")
driverPod := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getDriverNamespacedName(appName, appNamespace), driverPod)).To(Succeed())
Expect(k8sClient.Delete(ctx, driverPod)).To(Succeed())

By("Deleting the executor pods")
executorPod1 := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getExecutorNamespacedName(appName, appNamespace, 1), executorPod1)).To(Succeed())
Expect(k8sClient.Delete(ctx, executorPod1)).To(Succeed())
executorPod2 := &corev1.Pod{}
Expect(k8sClient.Get(ctx, getExecutorNamespacedName(appName, appNamespace, 2), executorPod2)).To(Succeed())
Expect(k8sClient.Delete(ctx, executorPod2)).To(Succeed())
})

It("Should add the executors to the SparkApplication", func() {
By("Reconciling the running SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
record.NewFakeRecorder(3),
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.ExecutorState).To(HaveLen(2))
})

It("Should only add 1 executor to the SparkApplication", func() {
By("Reconciling the running SparkApplication")
reconciler := sparkapplication.NewReconciler(
nil,
k8sClient.Scheme(),
k8sClient,
record.NewFakeRecorder(3),
nil,
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 1},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

app := &v1beta2.SparkApplication{}
Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
Expect(app.Status.ExecutorState).To(HaveLen(1))
})
})
})

func getDriverNamespacedName(appName string, appNamespace string) types.NamespacedName {
return types.NamespacedName{
Name: fmt.Sprintf("%s-driver", appName),
Namespace: appNamespace,
}
}

func createDriverPod(appName string, appNamespace string) *corev1.Pod {
namespacedName := getDriverNamespacedName(appName, appNamespace)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: namespacedName.Name,
Namespace: namespacedName.Namespace,
Labels: map[string]string{
common.LabelSparkAppName: appName,
common.LabelLaunchedBySparkOperator: "true",
common.LabelSparkRole: common.SparkRoleDriver,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: common.SparkDriverContainerName,
Image: "spark-executor:latest",
},
},
},
}
return pod
}

func getExecutorNamespacedName(appName string, appNamespace string, id int) types.NamespacedName {
return types.NamespacedName{
Name: fmt.Sprintf("%s-exec%d", appName, id),
Namespace: appNamespace,
}
}

func createExecutorPod(appName string, appNamespace string, id int) *corev1.Pod {
namespacedName := getExecutorNamespacedName(appName, appNamespace, id)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: namespacedName.Name,
Namespace: namespacedName.Namespace,
Labels: map[string]string{
common.LabelSparkAppName: appName,
common.LabelLaunchedBySparkOperator: "true",
common.LabelSparkRole: common.SparkRoleExecutor,
common.SparkExecutorIDLabel: fmt.Sprintf("%d", id),
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: common.SparkExecutorContainerName,
Image: "spark-executor:latest",
},
},
},
}
return pod
}
3 changes: 3 additions & 0 deletions pkg/common/spark.go
@@ -309,6 +309,9 @@ const (

// LabelSubmissionID is the label that records the submission ID of the current run of an application.
LabelSubmissionID = LabelAnnotationPrefix + "submission-id"

// SparkExecutorIDLabel is the label that records the executor ID of an executor pod.
SparkExecutorIDLabel = "spark-exec-id"
)

const (
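The new constant is not limited to GetSparkExecutorID: executor pods launched by Spark on Kubernetes carry this label (the controller change relies on it, and the tests above set it explicitly), so it can also be used in label selectors. A small illustrative sketch, not part of this commit, with a made-up application name:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"

	"github.com/kubeflow/spark-operator/pkg/common"
)

func main() {
	// Build a selector matching executor pod 1 of the "spark-pi" application.
	selector := labels.SelectorFromSet(labels.Set{
		common.LabelSparkAppName:    "spark-pi",
		common.LabelSparkRole:       common.SparkRoleExecutor,
		common.SparkExecutorIDLabel: "1",
	})
	fmt.Println(selector.String())
}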
5 changes: 5 additions & 0 deletions pkg/util/sparkpod.go
@@ -37,6 +37,11 @@ func IsExecutorPod(pod *corev1.Pod) bool {
return pod.Labels[common.LabelSparkRole] == common.SparkRoleExecutor
}

// GetSparkExecutorID returns the Spark executor ID by checking out pod labels.
func GetSparkExecutorID(pod *corev1.Pod) string {
return pod.Labels[common.SparkExecutorIDLabel]
}

// GetAppName returns the spark application name by checking out pod labels.
func GetAppName(pod *corev1.Pod) string {
return pod.Labels[common.LabelSparkAppName]
48 changes: 48 additions & 0 deletions pkg/util/sparkpod_test.go
@@ -299,3 +299,51 @@ var _ = Describe("GetSparkApplicationID", func() {
})
})
})

var _ = Describe("GetSparkExecutorID", func() {
Context("Pod without labels", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "test-namespace",
},
}

It("Should return empty executor ID", func() {
Expect(util.GetSparkExecutorID(pod)).To(BeEmpty())
})
})

Context("Pod without executor ID label", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "test-namespace",
Labels: map[string]string{
common.LabelSparkAppName: "test-app",
},
},
}

It("Should return empty executor ID", func() {
Expect(util.GetSparkExecutorID(pod)).To(BeEmpty())
})
})

Context("Pod with executor ID label", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "test-namespace",
Labels: map[string]string{
common.LabelSparkAppName: "test-app",
common.SparkExecutorIDLabel: "1",
},
},
}

It("Should return the executor ID", func() {
Expect(util.GetSparkExecutorID(pod)).To(Equal("1"))
})
})
})
