Skip to content

Commit e8c884f

Browse files
committed
[Scheduler] Replace AddMetadataToPod with AddMetadataToChildResource across all schedulers
Signed-off-by: win5923 <ken89@kimo.com>
1 parent 2d52001 commit e8c884f

File tree

8 files changed

+54
-84
lines changed

8 files changed

+54
-84
lines changed

ray-operator/controllers/ray/batchscheduler/interface/interface.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@ package schedulerinterface
33
import (
44
"context"
55

6-
corev1 "k8s.io/api/core/v1"
76
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
87
"k8s.io/apimachinery/pkg/runtime"
98
"k8s.io/client-go/rest"
109
"sigs.k8s.io/controller-runtime/pkg/builder"
1110
"sigs.k8s.io/controller-runtime/pkg/client"
12-
13-
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1411
)
1512

1613
// BatchScheduler manages submitting RayCluster pods to a third-party scheduler.
@@ -23,11 +20,6 @@ type BatchScheduler interface {
2320
// For most batch schedulers, this results in the creation of a PodGroup.
2421
DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error
2522

26-
// AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler.
27-
// For example, setting labels for queues / priority, and setting schedulerName.
28-
// This function will be removed once Rayjob Volcano scheduler integration is completed.
29-
AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
30-
3123
// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
3224
// For example, setting labels for queues / priority, and setting schedulerName.
3325
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
@@ -63,9 +55,6 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
6355
return nil
6456
}
6557

66-
func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
67-
}
68-
6958
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
7059
}
7160

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"sigs.k8s.io/controller-runtime/pkg/builder"
1919
"sigs.k8s.io/controller-runtime/pkg/client"
2020

21-
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2221
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
2322
)
2423

@@ -38,23 +37,33 @@ func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1
3837
return nil
3938
}
4039

41-
func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
40+
func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, _ string) {
4241
logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler")
43-
pod.Spec.SchedulerName = k.Name()
42+
addSchedulerNameToObject(child, k.Name())
4443

45-
queue, ok := app.Labels[QueueLabelName]
44+
parentLabel := parent.GetLabels()
45+
queue, ok := parentLabel[QueueLabelName]
4646
if !ok || queue == "" {
47-
logger.Info("Queue label missing from RayCluster; pods will remain pending",
47+
logger.Info("Queue label missing from parent; child will remain pending",
4848
"requiredLabel", QueueLabelName)
4949
return
5050
}
51-
if pod.Labels == nil {
52-
pod.Labels = make(map[string]string)
51+
52+
childLabels := child.GetLabels()
53+
if childLabels == nil {
54+
childLabels = make(map[string]string)
5355
}
54-
pod.Labels[QueueLabelName] = queue
56+
childLabels[QueueLabelName] = queue
57+
child.SetLabels(childLabels)
5558
}
5659

57-
func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
60+
func addSchedulerNameToObject(obj metav1.Object, schedulerName string) {
61+
switch obj := obj.(type) {
62+
case *corev1.Pod:
63+
obj.Spec.SchedulerName = schedulerName
64+
case *corev1.PodTemplateSpec:
65+
obj.Spec.SchedulerName = schedulerName
66+
}
5867
}
5968

6069
func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func createTestPod() *corev1.Pod {
4141
}
4242
}
4343

44-
func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
44+
func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) {
4545
a := assert.New(t)
4646
scheduler := &KaiScheduler{}
4747
ctx := context.Background()
@@ -52,8 +52,8 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
5252
})
5353
pod := createTestPod()
5454

55-
// Call AddMetadataToPod
56-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
55+
// Call AddMetadataToChildResource
56+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
5757

5858
// Assert scheduler name is set to kai-scheduler
5959
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -63,7 +63,7 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
6363
a.Equal("test-queue", pod.Labels[QueueLabelName])
6464
}
6565

66-
func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
66+
func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) {
6767
a := assert.New(t)
6868
scheduler := &KaiScheduler{}
6969
ctx := context.Background()
@@ -72,8 +72,8 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
7272
rayCluster := createTestRayCluster(map[string]string{})
7373
pod := createTestPod()
7474

75-
// Call AddMetadataToPod
76-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
75+
// Call AddMetadataToChildResource
76+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
7777

7878
// Assert scheduler name is still set (always required)
7979
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -85,7 +85,7 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
8585
}
8686
}
8787

88-
func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
88+
func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) {
8989
a := assert.New(t)
9090
scheduler := &KaiScheduler{}
9191
ctx := context.Background()
@@ -96,8 +96,8 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
9696
})
9797
pod := createTestPod()
9898

99-
// Call AddMetadataToPod
100-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
99+
// Call AddMetadataToChildResource
100+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
101101

102102
// Assert scheduler name is still set
103103
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -109,7 +109,7 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
109109
}
110110
}
111111

112-
func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
112+
func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) {
113113
a := assert.New(t)
114114
scheduler := &KaiScheduler{}
115115
ctx := context.Background()
@@ -126,8 +126,8 @@ func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
126126
"app": "ray",
127127
}
128128

129-
// Call AddMetadataToPod
130-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
129+
// Call AddMetadataToChildResource
130+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
131131

132132
// Assert scheduler name is set
133133
a.Equal("kai-scheduler", pod.Spec.SchedulerName)

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,32 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, objec
9393
return nil
9494
}
9595

96-
// AddMetadataToPod adds essential labels and annotations to the Ray pod
96+
// AddMetadataToChildResource adds essential labels and annotations to the child resource.
9797
// the scheduler needs these labels and annotations in order to do the scheduling properly
98-
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
99-
// when gang scheduling is enabled, extra labels need to be added to all pods
100-
if k.isGangSchedulingEnabled(rayCluster) {
101-
pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
98+
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) {
99+
// when gang scheduling is enabled, extra labels need to be added to all child resources
100+
if k.isGangSchedulingEnabled(parent) {
101+
labels := child.GetLabels()
102+
if labels == nil {
103+
labels = make(map[string]string)
104+
}
105+
labels[kubeSchedulerPodGroupLabelKey] = parent.GetName()
106+
child.SetLabels(labels)
102107
}
103-
pod.Spec.SchedulerName = k.Name()
108+
addSchedulerNameToObject(child, k.Name())
104109
}
105110

106-
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
111+
func addSchedulerNameToObject(obj metav1.Object, schedulerName string) {
112+
switch obj := obj.(type) {
113+
case *corev1.Pod:
114+
obj.Spec.SchedulerName = schedulerName
115+
case *corev1.PodTemplateSpec:
116+
obj.Spec.SchedulerName = schedulerName
117+
}
107118
}
108119

109-
func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
110-
_, exist := app.Labels[utils.RayGangSchedulingEnabled]
120+
func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
121+
_, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled]
111122
return exist
112123
}
113124

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func TestCreatePodGroupWithMultipleHosts(t *testing.T) {
117117
a.Equal(int32(5), podGroup.Spec.MinMember)
118118
}
119119

120-
func TestAddMetadataToPod(t *testing.T) {
120+
func TestAddMetadataToChildResource(t *testing.T) {
121121
tests := []struct {
122122
name string
123123
enableGang bool
@@ -150,7 +150,7 @@ func TestAddMetadataToPod(t *testing.T) {
150150
}
151151

152152
scheduler := &KubeScheduler{}
153-
scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod)
153+
scheduler.AddMetadataToChildResource(context.TODO(), &cluster, pod, "worker")
154154

155155
if tt.enableGang {
156156
a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey])

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -251,19 +251,6 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
251251
addSchedulerName(child, v.Name())
252252
}
253253

254-
// This function will be removed in interface migration PR
255-
func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
256-
pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
257-
pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
258-
if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
259-
pod.Labels[QueueNameLabelKey] = queue
260-
}
261-
if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
262-
pod.Spec.PriorityClassName = priorityClassName
263-
}
264-
pod.Spec.SchedulerName = v.Name()
265-
}
266-
267254
func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
268255
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
269256
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)

ray-operator/controllers/ray/batchscheduler/yunikorn/yunikorn_scheduler.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -122,32 +122,6 @@ func (y *YuniKornScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
122122
return exist
123123
}
124124

125-
// AddMetadataToPod adds essential labels and annotations to the Ray pod
126-
// the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly
127-
func (y *YuniKornScheduler) AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
128-
logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
129-
// the applicationID and queue name must be provided in the labels
130-
populateLabelsFromObject(rayCluster, pod, RayApplicationIDLabelName, YuniKornPodApplicationIDLabelName)
131-
populateLabelsFromObject(rayCluster, pod, RayApplicationQueueLabelName, YuniKornPodQueueLabelName)
132-
addSchedulerNameToObject(pod, y.Name())
133-
134-
// when gang scheduling is enabled, extra annotations need to be added to all pods
135-
if y.isGangSchedulingEnabled(rayCluster) {
136-
// populate the taskGroups info to each pod
137-
err := propagateTaskGroupsAnnotation(rayCluster, pod)
138-
if err != nil {
139-
logger.Error(err, "failed to add gang scheduling related annotations to pod, "+
140-
"gang scheduling will not be enabled for this workload",
141-
"name", pod.Name, "namespace", pod.Namespace)
142-
return
143-
}
144-
145-
// set the task group name based on the head or worker group name
146-
// the group name for the head and each of the worker group should be different
147-
pod.Annotations[YuniKornTaskGroupNameAnnotationName] = groupName
148-
}
149-
}
150-
151125
func (y *YuniKornScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string) {
152126
logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
153127

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
10951095
// call the scheduler plugin if so
10961096
if r.options.BatchSchedulerManager != nil {
10971097
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
1098-
scheduler.AddMetadataToPod(ctx, &instance, utils.RayNodeHeadGroupLabelValue, &pod)
1098+
scheduler.AddMetadataToChildResource(ctx, &instance, &pod, utils.RayNodeHeadGroupLabelValue)
10991099
} else {
11001100
return err
11011101
}
@@ -1142,7 +1142,7 @@ func (r *RayClusterReconciler) createWorkerPodWithIndex(ctx context.Context, ins
11421142
pod := r.buildWorkerPod(ctx, instance, worker, replicaGrpName, hostIndex)
11431143
if r.options.BatchSchedulerManager != nil {
11441144
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
1145-
scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod)
1145+
scheduler.AddMetadataToChildResource(ctx, &instance, &pod, worker.GroupName)
11461146
} else {
11471147
return err
11481148
}

0 commit comments

Comments
 (0)