diff --git a/api/v1beta2/sparkapplication_types.go b/api/v1beta2/sparkapplication_types.go
index 4a6f6545b..cf43c7fce 100644
--- a/api/v1beta2/sparkapplication_types.go
+++ b/api/v1beta2/sparkapplication_types.go
@@ -474,6 +474,9 @@ type SparkPodSpec struct {
 	// Affinity specifies the affinity/anti-affinity settings for the pod.
 	// +optional
 	Affinity *corev1.Affinity `json:"affinity,omitempty"`
+	// TopologySpreadConstraint specifies how to spread matching pods among the given topology.
+	// +optional
+	TopologySpreadConstraint []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
 	// Tolerations specifies the tolerations listed in ".spec.tolerations" to be applied to the pod.
 	// +optional
 	Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
index 0a2d9c7f7..d6b46317d 100644
--- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
+++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
@@ -183,6 +183,66 @@ spec:
               driver:
                 description: Driver is the driver specification.
                 properties:
+                  topologySpreadConstraints:
+                    type: array
+                    items:
+                      type: object
+                      required:
+                      - maxSkew
+                      - topologyKey
+                      - whenUnsatisfiable
+                      properties:
+                        maxSkew:
+                          type: integer
+                          format: int32
+                          description: >-
+                            Degree to which pods can be unevenly distributed across topology domains.
+                        topologyKey:
+                          type: string
+                          description: >-
+                            The key of node labels. Nodes that have the same value for this label
+                            belong to the same topology domain.
+                        whenUnsatisfiable:
+                          type: string
+                          enum: [ "DoNotSchedule", "ScheduleAnyway" ]
+                          description: >-
+                            Action to take when it is impossible to satisfy the constraint.
+                        labelSelector:
+                          type: object
+                          properties:
+                            matchExpressions:
+                              type: array
+                              items:
+                                type: object
+                                required:
+                                - key
+                                - operator
+                                properties:
+                                  key:
+                                    type: string
+                                  operator:
+                                    type: string
+                                  values:
+                                    type: array
+                                    items:
+                                      type: string
+                            matchLabels:
+                              type: object
+                              additionalProperties:
+                                type: string
+                        matchLabelKeys:
+                          type: array
+                          items:
+                            type: string
+                        minDomains:
+                          type: integer
+                          format: int32
+                        nodeAffinityPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
+                        nodeTaintsPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
                   affinity:
                     description: Affinity specifies the affinity/anti-affinity
                       settings for the pod.
@@ -5313,6 +5373,66 @@ spec:
               executor:
                 description: Executor is the executor specification.
                 properties:
+                  topologySpreadConstraints:
+                    type: array
+                    items:
+                      type: object
+                      required:
+                      - maxSkew
+                      - topologyKey
+                      - whenUnsatisfiable
+                      properties:
+                        maxSkew:
+                          type: integer
+                          format: int32
+                          description: >-
+                            Degree to which pods can be unevenly distributed across topology domains.
+                        topologyKey:
+                          type: string
+                          description: >-
+                            The key of node labels. Nodes that have the same value for this label
+                            belong to the same topology domain.
+                        whenUnsatisfiable:
+                          type: string
+                          enum: [ "DoNotSchedule", "ScheduleAnyway" ]
+                          description: >-
+                            Action to take when it is impossible to satisfy the constraint.
+                        labelSelector:
+                          type: object
+                          properties:
+                            matchExpressions:
+                              type: array
+                              items:
+                                type: object
+                                required:
+                                - key
+                                - operator
+                                properties:
+                                  key:
+                                    type: string
+                                  operator:
+                                    type: string
+                                  values:
+                                    type: array
+                                    items:
+                                      type: string
+                            matchLabels:
+                              type: object
+                              additionalProperties:
+                                type: string
+                        matchLabelKeys:
+                          type: array
+                          items:
+                            type: string
+                        minDomains:
+                          type: integer
+                          format: int32
+                        nodeAffinityPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
+                        nodeTaintsPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
                   affinity:
                     description: Affinity specifies the affinity/anti-affinity
                       settings for the pod.
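For orientation, a minimal sketch of how the new field can be set in a SparkApplication manifest (not part of this diff; the application name and constraint values are illustrative). The manifest key comes from the field's `topologySpreadConstraints` JSON tag, and `spark-role` is the label carried by driver and executor pods:

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi                                 # hypothetical application name
spec:
  # ...type, image, mainApplicationFile, etc. omitted...
  executor:
    instances: 4
    topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: topology.kubernetes.io/zone   # well-known zone label
      whenUnsatisfiable: ScheduleAnyway          # prefer spreading, but never block scheduling
      labelSelector:
        matchLabels:
          spark-role: executor                   # label carried by executor pods
```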
diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml
index c3d4c59ec..b3c0b88b6 100644
--- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml
+++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml
@@ -149,6 +149,66 @@ spec:
               driver:
                 description: Driver is the driver specification.
                 properties:
+                  topologySpreadConstraints:
+                    type: array
+                    items:
+                      type: object
+                      required:
+                      - maxSkew
+                      - topologyKey
+                      - whenUnsatisfiable
+                      properties:
+                        maxSkew:
+                          type: integer
+                          format: int32
+                          description: >-
+                            Degree to which pods can be unevenly distributed across topology domains.
+                        topologyKey:
+                          type: string
+                          description: >-
+                            The key of node labels. Nodes that have the same value for this label
+                            belong to the same topology domain.
+                        whenUnsatisfiable:
+                          type: string
+                          enum: [ "DoNotSchedule", "ScheduleAnyway" ]
+                          description: >-
+                            Action to take when it is impossible to satisfy the constraint.
+                        labelSelector:
+                          type: object
+                          properties:
+                            matchExpressions:
+                              type: array
+                              items:
+                                type: object
+                                required:
+                                - key
+                                - operator
+                                properties:
+                                  key:
+                                    type: string
+                                  operator:
+                                    type: string
+                                  values:
+                                    type: array
+                                    items:
+                                      type: string
+                            matchLabels:
+                              type: object
+                              additionalProperties:
+                                type: string
+                        matchLabelKeys:
+                          type: array
+                          items:
+                            type: string
+                        minDomains:
+                          type: integer
+                          format: int32
+                        nodeAffinityPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
+                        nodeTaintsPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
                   affinity:
                     description: Affinity specifies the affinity/anti-affinity
                       settings for the pod.
@@ -5255,6 +5315,66 @@ spec:
               executor:
                 description: Executor is the executor specification.
                 properties:
+                  topologySpreadConstraints:
+                    type: array
+                    items:
+                      type: object
+                      required:
+                      - maxSkew
+                      - topologyKey
+                      - whenUnsatisfiable
+                      properties:
+                        maxSkew:
+                          type: integer
+                          format: int32
+                          description: >-
+                            Degree to which pods can be unevenly distributed across topology domains.
+                        topologyKey:
+                          type: string
+                          description: >-
+                            The key of node labels. Nodes that have the same value for this label
+                            belong to the same topology domain.
+                        whenUnsatisfiable:
+                          type: string
+                          enum: [ "DoNotSchedule", "ScheduleAnyway" ]
+                          description: >-
+                            Action to take when it is impossible to satisfy the constraint.
+                        labelSelector:
+                          type: object
+                          properties:
+                            matchExpressions:
+                              type: array
+                              items:
+                                type: object
+                                required:
+                                - key
+                                - operator
+                                properties:
+                                  key:
+                                    type: string
+                                  operator:
+                                    type: string
+                                  values:
+                                    type: array
+                                    items:
+                                      type: string
+                            matchLabels:
+                              type: object
+                              additionalProperties:
+                                type: string
+                        matchLabelKeys:
+                          type: array
+                          items:
+                            type: string
+                        minDomains:
+                          type: integer
+                          format: int32
+                        nodeAffinityPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
+                        nodeTaintsPolicy:
+                          type: string
+                          enum: [ "Honor", "Ignore" ]
                   affinity:
                     description: Affinity specifies the affinity/anti-affinity
                       settings for the pod.
diff --git a/docs/api-docs.md b/docs/api-docs.md
index 861c507cf..c9ef015a5 100644
--- a/docs/api-docs.md
+++ b/docs/api-docs.md
@@ -3149,6 +3149,20 @@
 Kubernetes core/v1.Affinity
+topologySpreadConstraints
+[]Kubernetes core/v1.TopologySpreadConstraint
+(Optional)
+TopologySpreadConstraint specifies how to spread matching pods among the given topology.
 tolerations
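The schemas above also expose the newer optional knobs (`matchLabelKeys`, `minDomains`, `nodeAffinityPolicy`, `nodeTaintsPolicy`). A hedged sketch of a constraint exercising them; the values are illustrative, and these fields only take effect on Kubernetes versions where the corresponding scheduler features are enabled:

```yaml
topologySpreadConstraints:
- maxSkew: 1
  topologyKey: kubernetes.io/hostname   # spread across individual nodes
  whenUnsatisfiable: DoNotSchedule      # required for minDomains to apply
  minDomains: 3                         # treat fewer than three eligible nodes as a violation
  nodeAffinityPolicy: Honor             # respect the pod's nodeSelector/affinity when computing skew
  nodeTaintsPolicy: Ignore              # include tainted nodes as candidate domains
  matchLabelKeys:
  - spark-app-selector                  # per-application pod label, so each app spreads independently
```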
diff --git a/internal/webhook/sparkpod_defaulter.go b/internal/webhook/sparkpod_defaulter.go
index 3cd5eef0f..2ad56346d 100644
--- a/internal/webhook/sparkpod_defaulter.go
+++ b/internal/webhook/sparkpod_defaulter.go
@@ -155,6 +155,7 @@ func mutateSparkPod(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
 		addSchedulerName,
 		addNodeSelectors,
 		addAffinity,
+		addTopologySpreadConstraints,
 		addTolerations,
 		addMemoryLimit,
 		addGPU,
@@ -458,6 +459,20 @@ func addAffinity(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
 	return nil
 }
 
+func addTopologySpreadConstraints(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
+	var topologySpreadConstraints []corev1.TopologySpreadConstraint
+	if util.IsDriverPod(pod) {
+		topologySpreadConstraints = app.Spec.Driver.TopologySpreadConstraint
+	} else if util.IsExecutorPod(pod) {
+		topologySpreadConstraints = app.Spec.Executor.TopologySpreadConstraint
+	}
+	if topologySpreadConstraints == nil {
+		return nil
+	}
+	pod.Spec.TopologySpreadConstraints = append(pod.Spec.TopologySpreadConstraints, topologySpreadConstraints...)
+	return nil
+}
+
 func addTolerations(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
 	var tolerations []corev1.Toleration
 	if util.IsDriverPod(pod) {
diff --git a/internal/webhook/sparkpod_defaulter_test.go b/internal/webhook/sparkpod_defaulter_test.go
index 64dfb96d3..600bfccaa 100644
--- a/internal/webhook/sparkpod_defaulter_test.go
+++ b/internal/webhook/sparkpod_defaulter_test.go
@@ -614,6 +614,67 @@ func TestPatchSparkPod_Tolerations(t *testing.T) {
 	assert.Equal(t, app.Spec.Driver.Tolerations[1], modifiedPod.Spec.Tolerations[1])
 }
 
+func TestPatchSparkPod_TopologySpreadConstraints(t *testing.T) {
+	app := &v1beta2.SparkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "spark-test",
+			UID:  "spark-test-1",
+		},
+		Spec: v1beta2.SparkApplicationSpec{
+			Driver: v1beta2.DriverSpec{
+				SparkPodSpec: v1beta2.SparkPodSpec{
+					TopologySpreadConstraint: []corev1.TopologySpreadConstraint{
+						{
+							MaxSkew:           1,
+							TopologyKey:       "topologyKey1",
+							WhenUnsatisfiable: corev1.DoNotSchedule,
+							LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"spark-role": "driver"}},
+						},
+						{
+							MaxSkew:            1,
+							TopologyKey:        "topologyKey1",
+							WhenUnsatisfiable:  corev1.DoNotSchedule,
+							LabelSelector:      &metav1.LabelSelector{MatchLabels: map[string]string{"spark-role": "driver", "sparkJobId": "sparkjob123"}},
+							MinDomains:         func(i int32) *int32 { return &i }(1),
+							NodeAffinityPolicy: func(v corev1.NodeInclusionPolicy) *corev1.NodeInclusionPolicy { return &v }(corev1.NodeInclusionPolicyIgnore),
+							NodeTaintsPolicy:   func(v corev1.NodeInclusionPolicy) *corev1.NodeInclusionPolicy { return &v }(corev1.NodeInclusionPolicyIgnore),
+							MatchLabelKeys:     []string{"sparkDriver"},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	// Test patching a pod with a TopologySpreadConstraint.
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "spark-driver",
+			Labels: map[string]string{
+				common.LabelSparkRole:               common.SparkRoleDriver,
+				common.LabelLaunchedBySparkOperator: "true",
+			},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Name:  common.SparkDriverContainerName,
+					Image: "spark-driver:latest",
+				},
+			},
+		},
+	}
+
+	modifiedPod, err := getModifiedPod(pod, app)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Len(t, modifiedPod.Spec.TopologySpreadConstraints, 2)
+	assert.Equal(t, app.Spec.Driver.TopologySpreadConstraint[0], modifiedPod.Spec.TopologySpreadConstraints[0])
+	assert.Equal(t, app.Spec.Driver.TopologySpreadConstraint[1], modifiedPod.Spec.TopologySpreadConstraints[1])
+}
+
 func TestPatchSparkPod_SecurityContext(t *testing.T) {
 	var user int64 = 1000
 	var user2 int64 = 2000
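Reading the test together with `addTopologySpreadConstraints`: the webhook simply appends the app-level constraints to whatever the pod already carries. A sketch of the scheduling section of the mutated driver pod, mirroring the first constraint in the test above (an illustrative rendering, not captured output):

```yaml
# Driver pod spec after mutation (sketch based on the test fixture above).
spec:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: topologyKey1
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        spark-role: driver
  # ...the second constraint from the test follows...
```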