Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1beta2/sparkapplication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ type SparkPodSpec struct {
// Affinity specifies the affinity/anti-affinity settings for the pod.
// +optional
Affinity *corev1.Affinity `json:"affinity,omitempty"`
// Affinity specifies the affinity/anti-affinity settings for the pod.
// +optional
TopologySpreadConstraint []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
// Tolerations specifies the tolerations listed in ".spec.tolerations" to be applied to the pod.
// +optional
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,43 @@ spec:
driver:
description: Driver is the driver specification.
properties:
topologySpreadConstraints:
type: object
required:
- maxSkew
- topologyKey
- whenUnsatisfiable
properties:
maxSkew:
type: integer
format: int32
description: >-
Degree to which pods can be unevenly distributed across topology domains.
topologyKey:
type: string
description: >-
The key of node labels. Nodes that have the same value for this label
belong to the same topology domain.
whenUnsatisfiable:
type: string
enum: [ "DoNotSchedule", "ScheduleAnyway" ]
description: >-
Action when it’s impossible to satisfy the constraint.
labelSelector:
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector"
matchLabelKeys:
type: array
items:
type: string
minDomains:
type: integer
format: int32
nodeAffinityPolicy:
type: string
enum: [ "Honor", "Ignore" ]
nodeTaintsPolicy:
type: string
enum: [ "Honor", "Ignore" ]
affinity:
description: Affinity specifies the affinity/anti-affinity
settings for the pod.
Expand Down Expand Up @@ -5313,6 +5350,40 @@ spec:
executor:
description: Executor is the executor specification.
properties:
topologySpreadConstraints:
type: object
required:
- maxSkew
- topologyKey
- whenUnsatisfiable
properties:
maxSkew:
type: integer
format: int32
description: >-
Degree to which pods can be unevenly distributed across topology domains.
topologyKey:
type: string
description: >-
The key of node labels. Nodes that have the same value for this label
belong to the same topology domain.
whenUnsatisfiable:
type: string
enum: [ "DoNotSchedule", "ScheduleAnyway" ]
description: >-
Action when it’s impossible to satisfy the constraint.
labelSelector:
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector"
matchLabelKeys:
type: array
items:
type: string
minDomains:
type: integer
format: int32
nodeAffinityPolicy:
type: string
enum: [ "Honor", "Ignore" ]
affinity:
description: Affinity specifies the affinity/anti-affinity
settings for the pod.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,43 @@ spec:
driver:
description: Driver is the driver specification.
properties:
topologySpreadConstraints:
type: object
required:
- maxSkew
- topologyKey
- whenUnsatisfiable
properties:
maxSkew:
type: integer
format: int32
description: >-
Degree to which pods can be unevenly distributed across topology domains.
topologyKey:
type: string
description: >-
The key of node labels. Nodes that have the same value for this label
belong to the same topology domain.
whenUnsatisfiable:
type: string
enum: [ "DoNotSchedule", "ScheduleAnyway" ]
description: >-
Action when it’s impossible to satisfy the constraint.
labelSelector:
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector"
matchLabelKeys:
type: array
items:
type: string
minDomains:
type: integer
format: int32
nodeAffinityPolicy:
type: string
enum: [ "Honor", "Ignore" ]
nodeTaintsPolicy:
type: string
enum: [ "Honor", "Ignore" ]
affinity:
description: Affinity specifies the affinity/anti-affinity settings
for the pod.
Expand Down Expand Up @@ -5255,6 +5292,43 @@ spec:
executor:
description: Executor is the executor specification.
properties:
topologySpreadConstraints:
type: object
required:
- maxSkew
- topologyKey
- whenUnsatisfiable
properties:
maxSkew:
type: integer
format: int32
description: >-
Degree to which pods can be unevenly distributed across topology domains.
topologyKey:
type: string
description: >-
The key of node labels. Nodes that have the same value for this label
belong to the same topology domain.
whenUnsatisfiable:
type: string
enum: [ "DoNotSchedule", "ScheduleAnyway" ]
description: >-
Action when it’s impossible to satisfy the constraint.
labelSelector:
$ref: "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector"
matchLabelKeys:
type: array
items:
type: string
minDomains:
type: integer
format: int32
nodeAffinityPolicy:
type: string
enum: [ "Honor", "Ignore" ]
nodeTaintsPolicy:
type: string
enum: [ "Honor", "Ignore" ]
affinity:
description: Affinity specifies the affinity/anti-affinity settings
for the pod.
Expand Down
14 changes: 14 additions & 0 deletions docs/api-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3149,6 +3149,20 @@ Kubernetes core/v1.Affinity
</tr>
<tr>
<td>
<code>topologySpreadConstraints</code><br/>
<em>
<a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#topologyspreadconstraint-v1-core">
[]Kubernetes core/v1.TopologySpreadConstraint
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>TopologySpreadConstraint specifies how to spread matching pods among the given topology.</p>
</td>
</tr>
<tr>
<td>
<code>tolerations</code><br/>
<em>
<a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.24/#toleration-v1-core">
Expand Down
15 changes: 15 additions & 0 deletions internal/webhook/sparkpod_defaulter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func mutateSparkPod(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
addSchedulerName,
addNodeSelectors,
addAffinity,
addTopologySpreadConstraints,
addTolerations,
addMemoryLimit,
addGPU,
Expand Down Expand Up @@ -458,6 +459,20 @@ func addAffinity(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
return nil
}

func addTopologySpreadConstraints(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
var topologySpreadConstraints []corev1.TopologySpreadConstraint
if util.IsDriverPod(pod) {
topologySpreadConstraints = app.Spec.Driver.TopologySpreadConstraint
} else if util.IsExecutorPod(pod) {
topologySpreadConstraints = app.Spec.Executor.TopologySpreadConstraint
}
if topologySpreadConstraints == nil {
return nil
}
pod.Spec.TopologySpreadConstraints = append(pod.Spec.TopologySpreadConstraints, topologySpreadConstraints...)
return nil
}

func addTolerations(pod *corev1.Pod, app *v1beta2.SparkApplication) error {
var tolerations []corev1.Toleration
if util.IsDriverPod(pod) {
Expand Down
61 changes: 61 additions & 0 deletions internal/webhook/sparkpod_defaulter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,67 @@ func TestPatchSparkPod_Tolerations(t *testing.T) {
assert.Equal(t, app.Spec.Driver.Tolerations[1], modifiedPod.Spec.Tolerations[1])
}

func TestPatchSparkPod_TopologySpreadConstraints(t *testing.T) {
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-test",
UID: "spark-test-1",
},
Spec: v1beta2.SparkApplicationSpec{
Driver: v1beta2.DriverSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
TopologySpreadConstraint: []corev1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "topologyKey1",
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"spark-role": "driver"}},
},
{
MaxSkew: 1,
TopologyKey: "topologyKey1",
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"spark-role": "driver", "sparkJobId": "sparkjob123"}},
MinDomains: func(i int32) *int32 { return &i }(1),
NodeAffinityPolicy: func(v corev1.NodeInclusionPolicy) *corev1.NodeInclusionPolicy { return &v }(corev1.NodeInclusionPolicyIgnore),
NodeTaintsPolicy: func(v corev1.NodeInclusionPolicy) *corev1.NodeInclusionPolicy { return &v }(corev1.NodeInclusionPolicyIgnore),
MatchLabelKeys: []string{"sparkDriver"},
},
},
},
},
},
}

// Test patching a pod with a TopologySpreadConstraint.
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-driver",
Labels: map[string]string{
common.LabelSparkRole: common.SparkRoleDriver,
common.LabelLaunchedBySparkOperator: "true",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: common.SparkDriverContainerName,
Image: "spark-driver:latest",
},
},
},
}

modifiedPod, err := getModifiedPod(pod, app)
if err != nil {
t.Fatal(err)
}

assert.Len(t, modifiedPod.Spec.TopologySpreadConstraints, 2)
assert.Equal(t, app.Spec.Driver.TopologySpreadConstraint[0], modifiedPod.Spec.TopologySpreadConstraints[0])
assert.Equal(t, app.Spec.Driver.TopologySpreadConstraint[1], modifiedPod.Spec.TopologySpreadConstraints[1])
}

func TestPatchSparkPod_SecurityContext(t *testing.T) {
var user int64 = 1000
var user2 int64 = 2000
Expand Down