Skip to content

Commit 1b2c9d9

Browse files
Chaitanya Kuduvalli RamachandraChaitanya Kuduvalli Ramachandra
authored and committed
Adding support for injecting TopologySpreadConstraint through webhook
Signed-off-by: Chaitanya Kuduvalli Ramachandra <chaitanyakr@Chaitanyas-MacBook-Pro.local>
1 parent 44972be commit 1b2c9d9

File tree

3 files changed

+79
-0
lines changed

3 files changed

+79
-0
lines changed

api/v1beta2/sparkapplication_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,9 @@ type SparkPodSpec struct {
474474
// Affinity specifies the affinity/anti-affinity settings for the pod.
475475
// +optional
476476
Affinity *corev1.Affinity `json:"affinity,omitempty"`
477+
// TopologySpreadConstraint specifies the topology spread constraints for the pod.
478+
// +optional
479+
TopologySpreadConstraint []corev1.TopologySpreadConstraint `json:"topologySpreadConstraint,omitempty"`
477480
// Tolerations specifies the tolerations listed in ".spec.tolerations" to be applied to the pod.
478481
// +optional
479482
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

internal/webhook/sparkpod_defaulter.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func mutateSparkPod(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
155155
addSchedulerName,
156156
addNodeSelectors,
157157
addAffinity,
158+
addTopologySpreadConstraints,
158159
addTolerations,
159160
addMemoryLimit,
160161
addGPU,
@@ -458,6 +459,20 @@ func addAffinity(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
458459
return nil
459460
}
460461

462+
func addTopologySpreadConstraints(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
463+
var topologySpreadConstraints []corev1.TopologySpreadConstraint
464+
if util.IsDriverPod(pod) {
465+
topologySpreadConstraints = app.Spec.Driver.TopologySpreadConstraint
466+
} else if util.IsExecutorPod(pod) {
467+
topologySpreadConstraints = app.Spec.Executor.TopologySpreadConstraint
468+
}
469+
if topologySpreadConstraints == nil {
470+
return nil
471+
}
472+
pod.Spec.TopologySpreadConstraints = append(pod.Spec.TopologySpreadConstraints, topologySpreadConstraints...)
473+
return nil
474+
}
475+
461476
func addTolerations(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
462477
var tolerations []corev1.Toleration
463478
if util.IsDriverPod(pod) {

internal/webhook/sparkpod_defaulter_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,67 @@ func TestPatchSparkPod_Tolerations(t *testing.T) {
614614
assert.Equal(t, app.Spec.Driver.Tolerations[1], modifiedPod.Spec.Tolerations[1])
615615
}
616616

617+
func TestPatchSparkPod_TopologySpreadConstraints(t *testing.T) {
618+
app := &v1beta2.SparkApplication{
619+
ObjectMeta: metav1.ObjectMeta{
620+
Name: "spark-test",
621+
UID: "spark-test-1",
622+
},
623+
Spec: v1beta2.SparkApplicationSpec{
624+
Driver: v1beta2.DriverSpec{
625+
SparkPodSpec: v1beta2.SparkPodSpec{
626+
TopologySpreadConstraint: []corev1.TopologySpreadConstraint{
627+
{
628+
MaxSkew: 1,
629+
TopologyKey: "topologyKey1",
630+
WhenUnsatisfiable: corev1.DoNotSchedule,
631+
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"spark-role": "driver"}},
632+
},
633+
{
634+
MaxSkew: 1,
635+
TopologyKey: "topologyKey1",
636+
WhenUnsatisfiable: corev1.DoNotSchedule,
637+
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"spark-role": "driver", "sparkJobId": "sparkjob123"}},
638+
MinDomains: func(i int32) *int32 { return &i }(1),
639+
NodeAffinityPolicy: func(v corev1.NodeInclusionPolicy) *corev1.NodeInclusionPolicy { return &v }(corev1.NodeInclusionPolicyIgnore),
640+
NodeTaintsPolicy: func(v corev1.NodeInclusionPolicy) *corev1.NodeInclusionPolicy { return &v }(corev1.NodeInclusionPolicyIgnore),
641+
MatchLabelKeys: []string{"sparkDriver"},
642+
},
643+
},
644+
},
645+
},
646+
},
647+
}
648+
649+
// Test patching a pod with a TopologySpreadConstraint.
650+
pod := &corev1.Pod{
651+
ObjectMeta: metav1.ObjectMeta{
652+
Name: "spark-driver",
653+
Labels: map[string]string{
654+
common.LabelSparkRole: common.SparkRoleDriver,
655+
common.LabelLaunchedBySparkOperator: "true",
656+
},
657+
},
658+
Spec: corev1.PodSpec{
659+
Containers: []corev1.Container{
660+
{
661+
Name: common.SparkDriverContainerName,
662+
Image: "spark-driver:latest",
663+
},
664+
},
665+
},
666+
}
667+
668+
modifiedPod, err := getModifiedPod(pod, app)
669+
if err != nil {
670+
t.Fatal(err)
671+
}
672+
673+
assert.Len(t, modifiedPod.Spec.TopologySpreadConstraints, 2)
674+
assert.Equal(t, app.Spec.Driver.TopologySpreadConstraint[0], modifiedPod.Spec.TopologySpreadConstraints[0])
675+
assert.Equal(t, app.Spec.Driver.TopologySpreadConstraint[1], modifiedPod.Spec.TopologySpreadConstraints[1])
676+
}
677+
617678
func TestPatchSparkPod_SecurityContext(t *testing.T) {
618679
var user int64 = 1000
619680
var user2 int64 = 2000

0 commit comments

Comments
 (0)