From 99d13c6d01b0d5b7b5616ed40b0f884c55b94310 Mon Sep 17 00:00:00 2001 From: Aaron Liang Date: Sun, 24 Aug 2025 01:01:29 +0000 Subject: [PATCH 1/8] Adding multi-host indexing Signed-off-by: Aaron Liang --- ray-operator/controllers/ray/common/pod.go | 12 +- .../controllers/ray/common/pod_test.go | 45 ++++-- .../controllers/ray/raycluster_controller.go | 151 +++++++++++++++--- .../ray/raycluster_controller_test.go | 32 ++++ .../controllers/ray/utils/constant.go | 8 + ray-operator/controllers/ray/utils/util.go | 6 + ray-operator/pkg/features/features.go | 7 + 7 files changed, 230 insertions(+), 31 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index fc69417643f..d92e5f1b2f1 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -17,6 +17,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/pkg/features" ) const ( @@ -250,7 +251,7 @@ func getEnableProbesInjection() bool { } // DefaultWorkerPodTemplate sets the config values -func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string) corev1.PodTemplateSpec { +func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, numHostIndex int) corev1.PodTemplateSpec { podTemplate := workerSpec.Template podTemplate.GenerateName = podName // Pods created by RayCluster should be restricted to the namespace of the RayCluster. @@ -641,6 +642,15 @@ func labelPod(rayNodeType rayv1.RayNodeType, rayClusterName string, groupName st return labels } +// addMultihostIndexingPodLabels returns labels that contain RayMultihostIndexing feature labels +func addMultihostIndexingPodLabels(currentLabels map[string]string, replicaGrpName string, numHostIndex int) map[string]string { + labels := currentLabels + labels[utils.RayWorkerReplicaIndexKey] = replicaGrpName + labels[utils.RayHostIndexKey] = strconv.Itoa(numHostIndex) + + return labels +} + func setInitContainerEnvVars(container *corev1.Container, fqdnRayIP string) { if len(container.Env) == 0 { container.Env = []corev1.EnvVar{} diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 6906cc61d28..4c75afae0a4 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -20,6 +20,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/pkg/features" ) var testMemoryLimit = resource.MustParse("1Gi") @@ -686,7 +687,7 @@ func TestBuildPod(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) - podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, 
"6379", false, utils.GetCRDType(""), fqdnRayIP, defaultContainerEnvs) // Check resources @@ -760,7 +761,7 @@ func TestBuildPod_WithNoCPULimits(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) - podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", false, utils.GetCRDType(""), fqdnRayIP, nil) expectedCommandArg = splitAndSort("ulimit -n 65536; ray start --block --dashboard-agent-listen-port=52365 --memory=1073741824 --num-cpus=2 --num-gpus=3 --address=raycluster-sample-head-svc.default.svc.cluster.local:6379 --port=6379 --metrics-export-port=8080") actualCommandArg = splitAndSort(pod.Spec.Containers[0].Args[0]) @@ -791,7 +792,7 @@ func TestBuildPod_WithOverwriteCommand(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) - podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) workerPod := BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", false, utils.GetCRDType(""), fqdnRayIP, nil) workerContainer := workerPod.Spec.Containers[utils.RayContainerIndex] assert.Equal(t, []string{"I am worker"}, workerContainer.Command) @@ -846,7 +847,7 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) - podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", false, utils.RayServiceCRD, fqdnRayIP, nil) val, ok = pod.Labels[utils.RayClusterServingServiceLabelKey] @@ -902,7 +903,7 @@ func TestBuildPod_WithLoginBash(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) - podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) workerPod := BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", false, utils.RayServiceCRD, fqdnRayIP, nil) // Verify worker container command @@ -1165,11 +1166,33 @@ func TestDefaultWorkerPodTemplateWithName(t *testing.T) { expectedWorker := *worker.DeepCopy() // Pass a deep copy of worker (*worker.DeepCopy()) to prevent "worker" from updating. 
- podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379") + podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379", "", 0) assert.Empty(t, podTemplateSpec.ObjectMeta.Name) assert.Equal(t, expectedWorker, worker) } +func TestDeafultWorkerPodTemplateWithReplicaGrpAndIndex(t *testing.T) { + ctx := context.Background() + + cluster := instance.DeepCopy() + + fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) + worker := cluster.Spec.WorkerGroupSpecs[0] + + features.SetFeatureGateDuringTest(t, features.RayMulithostIndexing, true) + + worker.Template.ObjectMeta.Name = "ray-worker-test" + worker.NumOfHosts = 4 + podName := cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) + groupReplicaName := utils.GenerateRayWorkerReplicaGroupName(worker.GroupName) + + // Pass a deep copy of worker (*worker.DeepCopy()) to prevent "worker" from updating. + podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379", groupReplicaName, 2) + assert.Empty(t, podTemplateSpec.ObjectMeta.Name) + assert.Equal(t, podTemplateSpec.Labels[utils.RayWorkerReplicaIndexKey], groupReplicaName) + assert.Equal(t, "2", podTemplateSpec.Labels[utils.RayHostIndexKey]) +} + func containerPortExists(ports []corev1.ContainerPort, containerPort int32) error { name := utils.MetricsPortName for _, port := range ports { @@ -1212,7 +1235,7 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName := cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) - podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) // DefaultWorkerPodTemplate will add the default metrics port if user doesn't specify it. // Verify the default metrics port exists. require.NoError(t, containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, int32(utils.DefaultMetricsPort))) @@ -1222,7 +1245,7 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) { ContainerPort: customMetricsPort, } cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Ports = []corev1.ContainerPort{metricsPort} - podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379") + podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379", "", 0) // Verify the custom metrics port exists. 
require.NoError(t, containerPortExists(podTemplateSpec.Spec.Containers[0].Ports, customMetricsPort)) } @@ -1261,7 +1284,7 @@ func TestDefaultWorkerPodTemplate_Autoscaling(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - podTemplateSpec := DefaultWorkerPodTemplate(ctx, tc.cluster, tc.cluster.Spec.WorkerGroupSpecs[0], podName, fqdnRayIP, "6379") + podTemplateSpec := DefaultWorkerPodTemplate(ctx, tc.cluster, tc.cluster.Spec.WorkerGroupSpecs[0], podName, fqdnRayIP, "6379", "", 0) assert.Equal(t, tc.expectedRestartPolicy, podTemplateSpec.Spec.RestartPolicy) }) } @@ -1277,7 +1300,7 @@ func TestDefaultInitContainer(t *testing.T) { expectedResult := len(cluster.Spec.WorkerGroupSpecs[0].Template.Spec.InitContainers) + 1 // Pass a deep copy of worker (*worker.DeepCopy()) to prevent "worker" from updating. - podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379") + podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379", "", 0) numInitContainers := len(podTemplateSpec.Spec.InitContainers) assert.Equal(t, expectedResult, numInitContainers, "A default init container is expected to be added.") @@ -1336,7 +1359,7 @@ func TestDefaultInitContainerImagePullPolicy(t *testing.T) { // set ray container imagePullPolicy worker.Template.Spec.Containers[utils.RayContainerIndex].ImagePullPolicy = tc.imagePullPolicy - podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379") + podTemplateSpec := DefaultWorkerPodTemplate(ctx, *cluster, *worker.DeepCopy(), podName, fqdnRayIP, "6379", "", 0) healthCheckContainer := podTemplateSpec.Spec.InitContainers[len(podTemplateSpec.Spec.InitContainers)-1] assert.Equal(t, tc.expectedPullPolicy, healthCheckContainer.ImagePullPolicy, "The ImagePullPolicy of the init container should be the same as the Ray container.") diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 7e970eb3e82..d697b3c120a 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -663,14 +663,54 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv continue } + // Check if RayTpuMulithostIndexing feature is enabled + multihostIndexingEnabled := features.Enabled(features.RayMulithostIndexing) && worker.NumOfHosts > 1 + var workerGrpReplicaMap map[string][]corev1.Pod + + // Map current existing pods by label + if multihostIndexingEnabled { + workerGrpReplicaMap = make(map[string][]corev1.Pod) + for _, pod := range workerPods.Items { + if value, ok := pod.GetLabels()[utils.RayWorkerReplicaIndexKey]; ok { + podList := workerGrpReplicaMap[value] + workerGrpReplicaMap[value] = append(podList, pod) + } + } + } + // Delete unhealthy worker Pods. 
deletedWorkers := make(map[string]struct{}) deleted := struct{}{} numDeletedUnhealthyWorkerPods := 0 for _, workerPod := range workerPods.Items { shouldDelete, reason := shouldDeletePod(workerPod, rayv1.WorkerNode) + // This check will see if the pod was already deleted + if _, ok := deletedWorkers[workerPod.Name]; ok && shouldDelete { + continue + } logger.Info("reconcilePods", "worker Pod", workerPod.Name, "shouldDelete", shouldDelete, "reason", reason) - if shouldDelete { + // Atomic replica group delete if RayMulithostIndexing is enabled and numOfHost > 1 + if multihostIndexingEnabled && shouldDelete { + numDeletedUnhealthyWorkerPods += int(worker.NumOfHosts) + // find all worker pods with the same replica group + replicaGrpName := workerPod.Labels[utils.RayWorkerReplicaIndexKey] + replicaGrpPodListToDelete := workerGrpReplicaMap[replicaGrpName] + // delete all pods within the group replica + for _, replicaGrpPod := range replicaGrpPodListToDelete { + deletedWorkers[replicaGrpPod.Name] = deleted + if err := r.Delete(ctx, &replicaGrpPod); err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), + "Failed deleting worker Pod %s/%s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v, %v", + replicaGrpPod.Namespace, replicaGrpPod.Name, replicaGrpPod.Status.Phase, replicaGrpPod.Spec.RestartPolicy, getRayContainerStateTerminated(replicaGrpPod), err) + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) + } + r.rayClusterScaleExpectation.ExpectScalePod(replicaGrpPod.Namespace, instance.Name, worker.GroupName, replicaGrpPod.Name, expectations.Delete) + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), + "Deleted worker Pod %s/%s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v", + replicaGrpPod.Namespace, replicaGrpPod.Name, replicaGrpPod.Status.Phase, replicaGrpPod.Spec.RestartPolicy, getRayContainerStateTerminated(replicaGrpPod)) + } + delete(workerGrpReplicaMap, replicaGrpName) + } else if shouldDelete { numDeletedUnhealthyWorkerPods++ deletedWorkers[workerPod.Name] = deleted if err := r.Delete(ctx, &workerPod); err != nil { @@ -733,11 +773,35 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv if diff > 0 { // pods need to be added logger.Info("reconcilePods", "Number workers to add", diff, "Worker group", worker.GroupName) - // create all workers of this group - for i := 0; i < diff; i++ { - logger.Info("reconcilePods", "creating worker for group", worker.GroupName, "index", i, "total", diff) - if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil { - return errstd.Join(utils.ErrFailedCreateWorkerPod, err) + + // Worker creation path for multi-host indexing + if multihostIndexingEnabled { + // Check if the number of missing workers are groups of NumOfHost. This case should not occur as pods will be deleted in replica groups + if diff%int(worker.NumOfHosts) != 0 { + logger.Info("Uneven number of pods were found for multi-host worker groups", + "worker group", worker.GroupName, + ) + return fmt.Errorf("%d pods were found, was expecting multiple of %d for group %s. This could be caused by worker being deleted before the RayMultihostIndexing feature was enabled, ", len(headPods.Items), worker.NumOfHosts, worker.GroupName) + } + // Create the missing NumOfHost in groups. 
+ replicasGrpsToCreate := diff / int(worker.NumOfHosts) + for i := 0; i < replicasGrpsToCreate; i++ { + replicaGrpName := utils.GenerateRayWorkerReplicaGroupName(worker.GroupName) + for j := 0; j < int(worker.NumOfHosts); j++ { + logger.Info("reconciledPods", "creating worker for group", worker.GroupName, "replica group index", i, "host index", j, "total", diff) + if err := r.createWorkerPodWithIndex(ctx, *instance, *worker.DeepCopy(), replicaGrpName, j); err != nil { + return errstd.Join(utils.ErrFailedCreateWorkerPod, err) + } + } + } + } else { + // Original pathing + // create all workers of this group + for i := 0; i < diff; i++ { + logger.Info("reconcilePods", "creating worker for group", worker.GroupName, "index", i, "total", diff) + if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil { + return errstd.Join(utils.ErrFailedCreateWorkerPod, err) + } } } } else if diff == 0 { @@ -765,18 +829,43 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv // diff < 0 means that we need to delete some Pods to meet the desired number of replicas. randomlyRemovedWorkers := -diff logger.Info("reconcilePods", "Number workers to delete randomly", randomlyRemovedWorkers, "Worker group", worker.GroupName) - for i := 0; i < randomlyRemovedWorkers; i++ { - randomPodToDelete := runningPods.Items[i] - logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name) - if err := r.Delete(ctx, &randomPodToDelete); err != nil { - if !errors.IsNotFound(err) { - r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), "Failed deleting Pod %s/%s, %v", randomPodToDelete.Namespace, randomPodToDelete.Name, err) - return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) + if multihostIndexingEnabled { + // randomlyRemovedWorkers have to be a multiple of numOfHosts so we will delete replica groups together + numOfReplicaGrpToDelete := randomlyRemovedWorkers % int(worker.NumOfHosts) + numOfReplicaGrpDeleted := 0 + for _, replicaGrpPodListToDelete := range workerGrpReplicaMap { + if numOfReplicaGrpDeleted >= numOfReplicaGrpToDelete { + break } - logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name) + for _, replicaGrpPod := range replicaGrpPodListToDelete { + if err := r.Delete(ctx, &replicaGrpPod); err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), + "Failed deleting worker Pod %s/%s, %v", + replicaGrpPod.Namespace, replicaGrpPod.Name, err) + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) + } + r.rayClusterScaleExpectation.ExpectScalePod(replicaGrpPod.Namespace, instance.Name, worker.GroupName, replicaGrpPod.Name, expectations.Delete) + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), + "Deleted worker Pod %s/%s", + replicaGrpPod.Namespace, replicaGrpPod.Name) + } + numOfReplicaGrpDeleted++ + } + } else { + // Original pathing withput multihostIndexingEnabled + for i := 0; i < randomlyRemovedWorkers; i++ { + randomPodToDelete := runningPods.Items[i] + logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name) + if err := r.Delete(ctx, &randomPodToDelete); err != nil { + if !errors.IsNotFound(err) { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), "Failed deleting Pod %s/%s, %v", 
randomPodToDelete.Namespace, randomPodToDelete.Name, err) + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) + } + logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name) + } + r.rayClusterScaleExpectation.ExpectScalePod(randomPodToDelete.Namespace, instance.Name, worker.GroupName, randomPodToDelete.Name, expectations.Delete) + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), "Deleted Pod %s/%s", randomPodToDelete.Namespace, randomPodToDelete.Name) } - r.rayClusterScaleExpectation.ExpectScalePod(randomPodToDelete.Namespace, instance.Name, worker.GroupName, randomPodToDelete.Name, expectations.Delete) - r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), "Deleted Pod %s/%s", randomPodToDelete.Namespace, randomPodToDelete.Name) } } else { logger.Info("Random Pod deletion is disabled for the cluster. The only decision-maker for Pod deletions is Autoscaler.") @@ -949,7 +1038,31 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray logger := ctrl.LoggerFrom(ctx) // build the pod then create it - pod := r.buildWorkerPod(ctx, instance, worker) + pod := r.buildWorkerPod(ctx, instance, worker, "", 0) + if r.options.BatchSchedulerManager != nil { + if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { + scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod) + } else { + return err + } + } + + replica := pod + if err := r.Create(ctx, &replica); err != nil { + r.Recorder.Eventf(&instance, corev1.EventTypeWarning, string(utils.FailedToCreateWorkerPod), "Failed to create worker Pod for the cluster %s/%s, %v", instance.Namespace, instance.Name, err) + return err + } + r.rayClusterScaleExpectation.ExpectScalePod(replica.Namespace, instance.Name, worker.GroupName, replica.Name, expectations.Create) + logger.Info("Created worker Pod for RayCluster", "name", replica.Name) + r.Recorder.Eventf(&instance, corev1.EventTypeNormal, string(utils.CreatedWorkerPod), "Created worker Pod %s/%s", replica.Namespace, replica.Name) + return nil +} + +func (r *RayClusterReconciler) createWorkerPodWithIndex(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec, replicaGrpName string, hostIndex int) error { + logger := ctrl.LoggerFrom(ctx) + + // build the pod then create it + pod := r.buildWorkerPod(ctx, instance, worker, replicaGrpName, hostIndex) if r.options.BatchSchedulerManager != nil { if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil { scheduler.AddMetadataToPod(ctx, &instance, worker.GroupName, &pod) @@ -998,7 +1111,7 @@ func getCreatorCRDType(instance rayv1.RayCluster) utils.CRDType { } // Build worker instance pods. 
-func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec) corev1.Pod { +func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec, replicaGrpName string, hostIndex int) corev1.Pod { logger := ctrl.LoggerFrom(ctx) podName := utils.PodName(fmt.Sprintf("%s-%s", instance.Name, worker.GroupName), rayv1.WorkerNode, true) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) // Fully Qualified Domain Name @@ -1006,7 +1119,7 @@ func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv // The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.) headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams) autoscalingEnabled := utils.IsAutoscalingEnabled(&instance.Spec) - podTemplateSpec := common.DefaultWorkerPodTemplate(ctx, instance, worker, podName, fqdnRayIP, headPort) + podTemplateSpec := common.DefaultWorkerPodTemplate(ctx, instance, worker, podName, fqdnRayIP, headPort, replicaGrpName, hostIndex) if len(r.options.WorkerSidecarContainers) > 0 { podTemplateSpec.Spec.Containers = append(podTemplateSpec.Spec.Containers, r.options.WorkerSidecarContainers...) } diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go index de8b7719161..97ba362d2c9 100644 --- a/ray-operator/controllers/ray/raycluster_controller_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_test.go @@ -19,6 +19,8 @@ import ( "context" "errors" "fmt" + "slices" + "strconv" "strings" "time" @@ -920,6 +922,9 @@ var _ = Context("Inside the default namespace", func() { numWorkerPods := 3 * int(numOfHosts) workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions() + // Checks if the multi-host indexing is enabled + multihostIndexingEnabled := features.Enabled(features.RayMulithostIndexing) + It("Verify RayCluster spec", func() { // These test are designed based on the following assumptions: // (1) Ray Autoscaler is enabled. 
@@ -963,6 +968,27 @@ var _ = Context("Inside the default namespace", func() { isAllPodsRunningByFilters).WithContext(ctx).WithArguments(workerPods, workerFilters).WithTimeout(time.Second*3).WithPolling(time.Millisecond*500).Should(BeTrue(), "All worker Pods should be running.") }) + It("All multi-host pods are properly labeled", func() { + if multihostIndexingEnabled { + workerGrpReplicaMap := make(map[string][]string) + for _, pod := range workerPods.Items { + hostIndex := pod.Labels[utils.RayHostIndexKey] + hostGrpId := pod.Labels[utils.RayWorkerReplicaIndexKey] + + grpReplicaIndexList, grpIdExists := workerGrpReplicaMap[hostGrpId] + if grpIdExists { + Expect(strconv.Atoi(hostIndex)).Should(BeNumerically("<", numOfHosts)) + Expect(strconv.Atoi(hostIndex)).Should(BeNumerically(">=", 0)) + Expect(slices.Contains(grpReplicaIndexList, hostIndex)).To(BeFalse()) + workerGrpReplicaMap[hostGrpId] = append(grpReplicaIndexList, hostIndex) + } else { + workerGrpReplicaMap[hostGrpId] = []string{} + Expect(len(workerGrpReplicaMap)).Should(BeNumerically("<", replicas)) + } + } + } + }) + It("RayCluster's .status.state transitions to 'ready' when all worker Pods are Running and check pod counts are correct", func() { desiredWorkerPods := replicas * numOfHosts minWorkerPods := minReplicas * numOfHosts @@ -1033,6 +1059,12 @@ var _ = Context("Inside the default namespace", func() { pod := workerPods.Items[0] err := k8sClient.Delete(ctx, &pod, &client.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)}) Expect(err).NotTo(HaveOccurred(), "Failed to delete a Pod") + if multihostIndexingEnabled { + // Number of pods should go down by num of hosts but then be re-created + Eventually( + listResourceFunc(ctx, &workerPods, workerFilters...), + time.Second*3, time.Millisecond*500).Should(Equal(numWorkerPods-int(numOfHosts)), fmt.Sprintf("workerGroup %v", workerPods.Items)) + } Eventually( listResourceFunc(ctx, &workerPods, workerFilters...), time.Second*3, time.Millisecond*500).Should(Equal(numWorkerPods), fmt.Sprintf("workerGroup %v", workerPods.Items)) diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 8ed43a813f6..20e44e0f888 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -27,6 +27,14 @@ const ( NumWorkerGroupsKey = "ray.io/num-worker-groups" KubeRayVersion = "ray.io/kuberay-version" + // Labels for feature RayMultihostIndexing + // RayWorkerReplicaIndexKey label is the unique label for the replica in a specific worker group. It is made up of the worker group name + // and a unique identifier. e.g. multi-host-worker-group-xh3hf + // + // RayHostIndexKey label represents the index of the host within the RayWorkerReplicaIndexKey. + RayWorkerReplicaIndexKey = "ray.io/worker-group-replica-index" + RayHostIndexKey = "ray.io/replica-host-index" + // In KubeRay, the Ray container must be the first application container in a head or worker Pod. 
RayContainerIndex = 0 diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 3bb63f79189..baa64ec300e 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -330,6 +330,12 @@ func GenerateRayJobId(rayjob string) string { return fmt.Sprintf("%s-%s", rayjob, rand.String(5)) } +// GenerateRayWorkerReplicaGroupName generates a name for the replica group +// currently used for RayMulithostIndexing +func GenerateRayWorkerReplicaGroupName(workerGroupName string) string { + return fmt.Sprintf("%s-%s", workerGroupName, rand.String(5)) +} + // GenerateIdentifier generates identifier of same group pods func GenerateIdentifier(clusterName string, nodeType rayv1.RayNodeType) string { return fmt.Sprintf("%s-%s", clusterName, nodeType) diff --git a/ray-operator/pkg/features/features.go b/ray-operator/pkg/features/features.go index 2abea2ffbbb..1a8729d228a 100644 --- a/ray-operator/pkg/features/features.go +++ b/ray-operator/pkg/features/features.go @@ -24,6 +24,12 @@ const ( // // Enables new deletion policy API in RayJob RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy" + + // owner: @aaronliang + // rep: N/A + // alpha: v1.0 + // Enables multihost worker indexing + RayMulithostIndexing featuregate.Feature = "RayMultihostIndexing" ) func init() { @@ -33,6 +39,7 @@ func init() { var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ RayClusterStatusConditions: {Default: true, PreRelease: featuregate.Beta}, RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha}, + RayMulithostIndexing: {Default: false, PreRelease: featuregate.Alpha}, } // SetFeatureGateDuringTest is a helper method to override feature gates in tests. 
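For reference, a minimal standalone sketch (not the operator code itself) of how the two labels introduced in this patch fit together for a worker group with NumOfHosts=4: every Pod in the same atomic replica group carries the same `ray.io/worker-group-replica-index` value (the group name plus a short random suffix, as produced by `GenerateRayWorkerReplicaGroupName`), while `ray.io/replica-host-index` runs from 0 to NumOfHosts-1. The local `generateReplicaGroupName` helper below stands in for apimachinery's `rand.String`, and the group name "tpu-group" is purely illustrative.

```go
// Sketch of the multi-host labelling scheme added in this patch.
// Assumptions: label keys mirror RayWorkerReplicaIndexKey and RayHostIndexKey;
// generateReplicaGroupName is a stand-in for GenerateRayWorkerReplicaGroupName.
package main

import (
	"fmt"
	"math/rand"
	"strconv"
)

const (
	rayWorkerReplicaIndexKey = "ray.io/worker-group-replica-index"
	rayHostIndexKey          = "ray.io/replica-host-index"
)

// generateReplicaGroupName appends a 5-character random suffix to the worker
// group name, e.g. "tpu-group-xh3hf".
func generateReplicaGroupName(workerGroupName string) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
	suffix := make([]byte, 5)
	for i := range suffix {
		suffix[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return fmt.Sprintf("%s-%s", workerGroupName, string(suffix))
}

func main() {
	const numOfHosts = 4
	// One replica group name is shared by all hosts in the group.
	replicaGroup := generateReplicaGroupName("tpu-group")
	for hostIndex := 0; hostIndex < numOfHosts; hostIndex++ {
		labels := map[string]string{
			rayWorkerReplicaIndexKey: replicaGroup,
			rayHostIndexKey:          strconv.Itoa(hostIndex),
		}
		fmt.Printf("host %d labels: %v\n", hostIndex, labels)
	}
}
```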
From 728bd3faab8111eecd95c3bc36de3c9b7b527dd5 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Fri, 17 Oct 2025 12:37:08 +0000 Subject: [PATCH 2/8] Refactor and add e2e test Signed-off-by: Ryan O'Leary --- .../controllers/ray/common/pod_test.go | 2 +- .../controllers/ray/raycluster_controller.go | 284 +++++++++++------- .../ray/raycluster_controller_test.go | 52 ++-- ray-operator/controllers/ray/utils/util.go | 2 +- ray-operator/pkg/features/features.go | 6 +- .../test/e2e/raycluster_multi_host_test.go | 173 +++++++++++ 6 files changed, 388 insertions(+), 131 deletions(-) create mode 100644 ray-operator/test/e2e/raycluster_multi_host_test.go diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 4c75afae0a4..41ec52c308c 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -1179,7 +1179,7 @@ func TestDeafultWorkerPodTemplateWithReplicaGrpAndIndex(t *testing.T) { fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace) worker := cluster.Spec.WorkerGroupSpecs[0] - features.SetFeatureGateDuringTest(t, features.RayMulithostIndexing, true) + features.SetFeatureGateDuringTest(t, features.RayMultiHostIndexing, true) worker.Template.ObjectMeta.Name = "ray-worker-test" worker.NumOfHosts = 4 diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index d697b3c120a..cc1958e182c 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -663,19 +663,13 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv continue } - // Check if RayTpuMulithostIndexing feature is enabled - multihostIndexingEnabled := features.Enabled(features.RayMulithostIndexing) && worker.NumOfHosts > 1 - var workerGrpReplicaMap map[string][]corev1.Pod - - // Map current existing pods by label - if multihostIndexingEnabled { - workerGrpReplicaMap = make(map[string][]corev1.Pod) - for _, pod := range workerPods.Items { - if value, ok := pod.GetLabels()[utils.RayWorkerReplicaIndexKey]; ok { - podList := workerGrpReplicaMap[value] - workerGrpReplicaMap[value] = append(podList, pod) - } + isRayMultiHostIndexing := worker.NumOfHosts > 1 && features.Enabled(features.RayMultiHostIndexing) + if isRayMultiHostIndexing { + if err := r.reconcileMultiHostWorkerGroup(ctx, instance, &worker, workerPods.Items); err != nil { + return err } + // Skip to the next worker as we've already handled multi-host reconciliation. + continue } // Delete unhealthy worker Pods. 
@@ -684,33 +678,8 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv numDeletedUnhealthyWorkerPods := 0 for _, workerPod := range workerPods.Items { shouldDelete, reason := shouldDeletePod(workerPod, rayv1.WorkerNode) - // This check will see if the pod was already deleted - if _, ok := deletedWorkers[workerPod.Name]; ok && shouldDelete { - continue - } logger.Info("reconcilePods", "worker Pod", workerPod.Name, "shouldDelete", shouldDelete, "reason", reason) - // Atomic replica group delete if RayMulithostIndexing is enabled and numOfHost > 1 - if multihostIndexingEnabled && shouldDelete { - numDeletedUnhealthyWorkerPods += int(worker.NumOfHosts) - // find all worker pods with the same replica group - replicaGrpName := workerPod.Labels[utils.RayWorkerReplicaIndexKey] - replicaGrpPodListToDelete := workerGrpReplicaMap[replicaGrpName] - // delete all pods within the group replica - for _, replicaGrpPod := range replicaGrpPodListToDelete { - deletedWorkers[replicaGrpPod.Name] = deleted - if err := r.Delete(ctx, &replicaGrpPod); err != nil { - r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), - "Failed deleting worker Pod %s/%s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v, %v", - replicaGrpPod.Namespace, replicaGrpPod.Name, replicaGrpPod.Status.Phase, replicaGrpPod.Spec.RestartPolicy, getRayContainerStateTerminated(replicaGrpPod), err) - return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) - } - r.rayClusterScaleExpectation.ExpectScalePod(replicaGrpPod.Namespace, instance.Name, worker.GroupName, replicaGrpPod.Name, expectations.Delete) - r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), - "Deleted worker Pod %s/%s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v", - replicaGrpPod.Namespace, replicaGrpPod.Name, replicaGrpPod.Status.Phase, replicaGrpPod.Spec.RestartPolicy, getRayContainerStateTerminated(replicaGrpPod)) - } - delete(workerGrpReplicaMap, replicaGrpName) - } else if shouldDelete { + if shouldDelete { numDeletedUnhealthyWorkerPods++ deletedWorkers[workerPod.Name] = deleted if err := r.Delete(ctx, &workerPod); err != nil { @@ -773,35 +742,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv if diff > 0 { // pods need to be added logger.Info("reconcilePods", "Number workers to add", diff, "Worker group", worker.GroupName) - - // Worker creation path for multi-host indexing - if multihostIndexingEnabled { - // Check if the number of missing workers are groups of NumOfHost. This case should not occur as pods will be deleted in replica groups - if diff%int(worker.NumOfHosts) != 0 { - logger.Info("Uneven number of pods were found for multi-host worker groups", - "worker group", worker.GroupName, - ) - return fmt.Errorf("%d pods were found, was expecting multiple of %d for group %s. This could be caused by worker being deleted before the RayMultihostIndexing feature was enabled, ", len(headPods.Items), worker.NumOfHosts, worker.GroupName) - } - // Create the missing NumOfHost in groups. 
- replicasGrpsToCreate := diff / int(worker.NumOfHosts) - for i := 0; i < replicasGrpsToCreate; i++ { - replicaGrpName := utils.GenerateRayWorkerReplicaGroupName(worker.GroupName) - for j := 0; j < int(worker.NumOfHosts); j++ { - logger.Info("reconciledPods", "creating worker for group", worker.GroupName, "replica group index", i, "host index", j, "total", diff) - if err := r.createWorkerPodWithIndex(ctx, *instance, *worker.DeepCopy(), replicaGrpName, j); err != nil { - return errstd.Join(utils.ErrFailedCreateWorkerPod, err) - } - } - } - } else { - // Original pathing - // create all workers of this group - for i := 0; i < diff; i++ { - logger.Info("reconcilePods", "creating worker for group", worker.GroupName, "index", i, "total", diff) - if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil { - return errstd.Join(utils.ErrFailedCreateWorkerPod, err) - } + // create all workers of this group + for i := 0; i < diff; i++ { + logger.Info("reconcilePods", "creating worker for group", worker.GroupName, "index", i, "total", diff) + if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil { + return errstd.Join(utils.ErrFailedCreateWorkerPod, err) } } } else if diff == 0 { @@ -829,43 +774,18 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv // diff < 0 means that we need to delete some Pods to meet the desired number of replicas. randomlyRemovedWorkers := -diff logger.Info("reconcilePods", "Number workers to delete randomly", randomlyRemovedWorkers, "Worker group", worker.GroupName) - if multihostIndexingEnabled { - // randomlyRemovedWorkers have to be a multiple of numOfHosts so we will delete replica groups together - numOfReplicaGrpToDelete := randomlyRemovedWorkers % int(worker.NumOfHosts) - numOfReplicaGrpDeleted := 0 - for _, replicaGrpPodListToDelete := range workerGrpReplicaMap { - if numOfReplicaGrpDeleted >= numOfReplicaGrpToDelete { - break - } - for _, replicaGrpPod := range replicaGrpPodListToDelete { - if err := r.Delete(ctx, &replicaGrpPod); err != nil { - r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), - "Failed deleting worker Pod %s/%s, %v", - replicaGrpPod.Namespace, replicaGrpPod.Name, err) - return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) - } - r.rayClusterScaleExpectation.ExpectScalePod(replicaGrpPod.Namespace, instance.Name, worker.GroupName, replicaGrpPod.Name, expectations.Delete) - r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), - "Deleted worker Pod %s/%s", - replicaGrpPod.Namespace, replicaGrpPod.Name) + for i := 0; i < randomlyRemovedWorkers; i++ { + randomPodToDelete := runningPods.Items[i] + logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name) + if err := r.Delete(ctx, &randomPodToDelete); err != nil { + if !errors.IsNotFound(err) { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), "Failed deleting Pod %s/%s, %v", randomPodToDelete.Namespace, randomPodToDelete.Name, err) + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) } - numOfReplicaGrpDeleted++ - } - } else { - // Original pathing withput multihostIndexingEnabled - for i := 0; i < randomlyRemovedWorkers; i++ { - randomPodToDelete := runningPods.Items[i] - logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name) - if err := 
r.Delete(ctx, &randomPodToDelete); err != nil { - if !errors.IsNotFound(err) { - r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), "Failed deleting Pod %s/%s, %v", randomPodToDelete.Namespace, randomPodToDelete.Name, err) - return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) - } - logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name) - } - r.rayClusterScaleExpectation.ExpectScalePod(randomPodToDelete.Namespace, instance.Name, worker.GroupName, randomPodToDelete.Name, expectations.Delete) - r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), "Deleted Pod %s/%s", randomPodToDelete.Namespace, randomPodToDelete.Name) + logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name) } + r.rayClusterScaleExpectation.ExpectScalePod(randomPodToDelete.Namespace, instance.Name, worker.GroupName, randomPodToDelete.Name, expectations.Delete) + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), "Deleted Pod %s/%s", randomPodToDelete.Namespace, randomPodToDelete.Name) } } else { logger.Info("Random Pod deletion is disabled for the cluster. The only decision-maker for Pod deletions is Autoscaler.") @@ -875,6 +795,166 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv return nil } +// deletePods is a helper function to handle the deletion of a list of Pods, setting scale expectations +// and recording events. +func (r *RayClusterReconciler) deletePods(ctx context.Context, instance *rayv1.RayCluster, podsToDelete []corev1.Pod, groupName string, reason string) error { + for i := range podsToDelete { + pod := podsToDelete[i] + if err := r.Delete(ctx, &pod); err != nil { + if errors.IsNotFound(err) { + continue + } + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPod), + "Failed deleting worker Pod %s/%s for group %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v, %v", + pod.Namespace, pod.Name, groupName, pod.Status.Phase, pod.Spec.RestartPolicy, getRayContainerStateTerminated(pod), err) + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) + } + r.rayClusterScaleExpectation.ExpectScalePod(pod.Namespace, instance.Name, groupName, pod.Name, expectations.Delete) + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod), + "Deleted worker Pod %s/%s for group %s: %s", pod.Namespace, pod.Name, groupName, reason) + } + return nil +} + +// reconcileMultiHostWorkerGroup handles reconciliation and Pod deletion for worker groups with NumOfHosts > 1 when +// the RayMultihostIndexing feature is enabled. This function is responsible for: +// 1. Deleting incomplete or unhealthy multi-host groups atomically. +// 2. Explicit deletes of entire multi-host groups for the autoscaler. +// 3. Scale up/down of multi-host groups. +func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context, instance *rayv1.RayCluster, worker *rayv1.WorkerGroupSpec, workerPods []corev1.Pod) error { + logger := ctrl.LoggerFrom(ctx) + + // 1. Group existing pods by ray.io/worker-group-replica-index. + replicaMap := make(map[string][]corev1.Pod) + for _, pod := range workerPods { + if replicaName, ok := pod.Labels[utils.RayWorkerReplicaIndexKey]; ok { + replicaMap[replicaName] = append(replicaMap[replicaName], pod) + } + } + + // 2. Clean up incomplete replica groups with deleted Pods caused by external deletion. 
+ for replicaName, podList := range replicaMap { + if len(podList) < int(worker.NumOfHosts) { + logger.Info("Found incomplete multi-host replica group, deleting all remaining pods to maintain atomicity.", "group", worker.GroupName, "replica", replicaName, "found", len(podList), "expected", worker.NumOfHosts) + if err := r.deletePods(ctx, instance, podList, worker.GroupName, "cleanup of incomplete multi-host group"); err != nil { + return err + } + // Requeue to avoid creating new Pods on this reconciliation. + return fmt.Errorf("cleaned up incomplete replica group %s, requeueing", replicaName) + } + } + + // 3. Delete unhealthy replica groups. + deletedPods := make(map[string]struct{}) + for _, pod := range workerPods { + if _, alreadyDeleted := deletedPods[pod.Name]; alreadyDeleted { + continue + } + if shouldDelete, reason := shouldDeletePod(pod, rayv1.WorkerNode); shouldDelete { + replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey] + podsToDelete, ok := replicaMap[replicaName] + if !ok { + continue + } + logger.Info("Deleting unhealthy replica group.", "group", worker.GroupName, "replica", replicaName, "reason", reason) + if err := r.deletePods(ctx, instance, podsToDelete, worker.GroupName, reason); err != nil { + return err + } + // All Pods in the group have been deleted. + for _, p := range podsToDelete { + deletedPods[p.Name] = struct{}{} + } + } + } + if len(deletedPods) > 0 { + return fmt.Errorf("deleted %d unhealthy worker Pods in multi-host groups, requeueing", len(deletedPods)) + } + + // 4. Handle explicit deletions from the autoscaler. + if len(worker.ScaleStrategy.WorkersToDelete) > 0 { + podsToDeleteFromStrategy := make(map[string]corev1.Pod) + for _, podName := range worker.ScaleStrategy.WorkersToDelete { + for _, pod := range workerPods { + if pod.Name == podName { + replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey] + for _, p := range replicaMap[replicaName] { + podsToDeleteFromStrategy[p.Name] = p + } + break + } + } + } + + if len(podsToDeleteFromStrategy) > 0 { + logger.Info("removing the pods in the scaleStrategy of", "group", worker.GroupName, "podsToDelete", len(podsToDeleteFromStrategy)) + var podsToDel []corev1.Pod + for _, p := range podsToDeleteFromStrategy { + podsToDel = append(podsToDel, p) + } + if err := r.deletePods(ctx, instance, podsToDel, worker.GroupName, "autoscaler scale-down request"); err != nil { + return err + } + worker.ScaleStrategy.WorkersToDelete = []string{} + return fmt.Errorf("deleted %d worker Pods based on ScaleStrategy, requeueing", len(podsToDel)) + } + // Clear WorkersToDelete after deletion. + worker.ScaleStrategy.WorkersToDelete = []string{} + } + + // 5. Calculate Pod diff for scaling up or down by NumOfHosts. + runningPodsCount := len(workerPods) - len(deletedPods) + numExpectedWorkerPods := int(utils.GetWorkerGroupDesiredReplicas(ctx, *worker)) + diff := numExpectedWorkerPods - runningPodsCount + logger.Info("Reconciling multi-host group", "group", worker.GroupName, "expectedPods", numExpectedWorkerPods, "runningPods", runningPodsCount, "diff", diff) + + // Scale up NumOfHost workers per replica. 
+ if diff > 0 { + logger.Info("reconcileMultiHostWorkerGroup", "Number workers to add", diff, "Worker group", worker.GroupName) + if diff%int(worker.NumOfHosts) != 0 { + return fmt.Errorf("cannot scale up multi-host group %s: required %d pods, which is not a multiple of NumOfHosts (%d)", worker.GroupName, diff, worker.NumOfHosts) + } + replicasToCreate := diff / int(worker.NumOfHosts) + logger.Info("Scaling up multi-host group", "group", worker.GroupName, "replicasToCreate", replicasToCreate) + for i := 0; i < replicasToCreate; i++ { + replicaName := utils.GenerateRayWorkerReplicaGroupName(worker.GroupName) + for j := 0; j < int(worker.NumOfHosts); j++ { + if err := r.createWorkerPodWithIndex(ctx, *instance, *worker, replicaName, j); err != nil { + return errstd.Join(utils.ErrFailedCreateWorkerPod, err) + } + } + } + } else if diff < 0 { + // Scale down NumOfHost workers per replica. + enableInTreeAutoscaling := utils.IsAutoscalingEnabled(&instance.Spec) + enableRandomPodDelete := false + if enableInTreeAutoscaling { + if s := os.Getenv(utils.ENABLE_RANDOM_POD_DELETE); strings.ToLower(s) == "true" { + enableRandomPodDelete = true + } + } + if !enableInTreeAutoscaling || enableRandomPodDelete { + workersToRemove := -diff + groupsToRemove := (workersToRemove + int(worker.NumOfHosts) - 1) / int(worker.NumOfHosts) + logger.Info("Scaling down multi-host group by randomly deleting replica groups", "group", worker.GroupName, "groupsToRemove", groupsToRemove) + + groupsDeleted := 0 + for _, podList := range replicaMap { + if groupsDeleted >= groupsToRemove { + break + } + if err := r.deletePods(ctx, instance, podList, worker.GroupName, "scaling down"); err != nil { + return err + } + groupsDeleted++ + } + } else { + logger.Info("Random replica group deletion is disabled for the cluster. The only decision-maker for Pod deletions is the Ray Autoscaler.") + } + } + + return nil +} + // shouldDeletePod returns whether the Pod should be deleted and the reason // // @param pod: The Pod to be checked. 
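The core of the new reconcileMultiHostWorkerGroup path is bucketing worker Pods by `ray.io/worker-group-replica-index` and treating any bucket with fewer than NumOfHosts members as an incomplete group to be deleted as a unit before new replicas are created. A self-contained sketch of that step under simplified assumptions (a plain struct stands in for `corev1.Pod`, and `incompleteGroups` is a hypothetical helper, not a function in this patch):

```go
// Sketch of the grouping/atomicity rule in reconcileMultiHostWorkerGroup:
// Pods are grouped by the replica-index label, and any group smaller than
// NumOfHosts is flagged for atomic cleanup.
package main

import "fmt"

// pod is a simplified stand-in for corev1.Pod.
type pod struct {
	Name   string
	Labels map[string]string
}

const replicaIndexKey = "ray.io/worker-group-replica-index"

// incompleteGroups returns the replica groups that have fewer than numOfHosts Pods.
func incompleteGroups(pods []pod, numOfHosts int) map[string][]pod {
	groups := make(map[string][]pod)
	for _, p := range pods {
		if name, ok := p.Labels[replicaIndexKey]; ok {
			groups[name] = append(groups[name], p)
		}
	}
	incomplete := make(map[string][]pod)
	for name, members := range groups {
		if len(members) < numOfHosts {
			incomplete[name] = members
		}
	}
	return incomplete
}

func main() {
	pods := []pod{
		{Name: "w-0", Labels: map[string]string{replicaIndexKey: "grp-aaaaa"}},
		{Name: "w-1", Labels: map[string]string{replicaIndexKey: "grp-aaaaa"}},
		{Name: "w-2", Labels: map[string]string{replicaIndexKey: "grp-bbbbb"}},
	}
	// With NumOfHosts=2, "grp-bbbbb" is incomplete and would be deleted atomically
	// before the controller creates a fresh replica group of 2 Pods.
	fmt.Println(incompleteGroups(pods, 2))
}
```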
diff --git a/ray-operator/controllers/ray/raycluster_controller_test.go b/ray-operator/controllers/ray/raycluster_controller_test.go index 97ba362d2c9..56290797589 100644 --- a/ray-operator/controllers/ray/raycluster_controller_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_test.go @@ -49,6 +49,15 @@ func rayClusterTemplate(name string, namespace string) *rayv1.RayCluster { maxReplicas int32 = 4 replicas int32 = 3 ) + sharedMemVolume := corev1.Volume{ + Name: "shared-mem", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + SizeLimit: ptr.To(resource.MustParse("1Gi")), + }, + }, + } return &rayv1.RayCluster{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -58,6 +67,7 @@ func rayClusterTemplate(name string, namespace string) *rayv1.RayCluster { HeadGroupSpec: rayv1.HeadGroupSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{sharedMemVolume}, Containers: []corev1.Container{ { Name: "ray-head", @@ -75,6 +85,7 @@ func rayClusterTemplate(name string, namespace string) *rayv1.RayCluster { GroupName: "small-group", Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{sharedMemVolume}, Containers: []corev1.Container{ { Name: "ray-worker", @@ -922,8 +933,9 @@ var _ = Context("Inside the default namespace", func() { numWorkerPods := 3 * int(numOfHosts) workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions() - // Checks if the multi-host indexing is enabled - multihostIndexingEnabled := features.Enabled(features.RayMulithostIndexing) + BeforeEach(func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayMultiHostIndexing, true) + }) It("Verify RayCluster spec", func() { // These test are designed based on the following assumptions: @@ -969,22 +981,20 @@ var _ = Context("Inside the default namespace", func() { }) It("All multi-host pods are properly labeled", func() { - if multihostIndexingEnabled { - workerGrpReplicaMap := make(map[string][]string) - for _, pod := range workerPods.Items { - hostIndex := pod.Labels[utils.RayHostIndexKey] - hostGrpId := pod.Labels[utils.RayWorkerReplicaIndexKey] - - grpReplicaIndexList, grpIdExists := workerGrpReplicaMap[hostGrpId] - if grpIdExists { - Expect(strconv.Atoi(hostIndex)).Should(BeNumerically("<", numOfHosts)) - Expect(strconv.Atoi(hostIndex)).Should(BeNumerically(">=", 0)) - Expect(slices.Contains(grpReplicaIndexList, hostIndex)).To(BeFalse()) - workerGrpReplicaMap[hostGrpId] = append(grpReplicaIndexList, hostIndex) - } else { - workerGrpReplicaMap[hostGrpId] = []string{} - Expect(len(workerGrpReplicaMap)).Should(BeNumerically("<", replicas)) - } + workerGrpReplicaMap := make(map[string][]string) + for _, pod := range workerPods.Items { + hostIndex := pod.Labels[utils.RayHostIndexKey] + hostGrpId := pod.Labels[utils.RayWorkerReplicaIndexKey] + + grpReplicaIndexList, grpIdExists := workerGrpReplicaMap[hostGrpId] + if grpIdExists { + Expect(strconv.Atoi(hostIndex)).Should(BeNumerically("<", numOfHosts)) + Expect(strconv.Atoi(hostIndex)).Should(BeNumerically(">=", 0)) + Expect(slices.Contains(grpReplicaIndexList, hostIndex)).To(BeFalse()) + workerGrpReplicaMap[hostGrpId] = append(grpReplicaIndexList, hostIndex) + } else { + workerGrpReplicaMap[hostGrpId] = []string{} + Expect(len(workerGrpReplicaMap)).Should(BeNumerically("<=", int(replicas))) } } }) @@ -1059,12 +1069,6 @@ var _ = Context("Inside the default namespace", 
func() { pod := workerPods.Items[0] err := k8sClient.Delete(ctx, &pod, &client.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)}) Expect(err).NotTo(HaveOccurred(), "Failed to delete a Pod") - if multihostIndexingEnabled { - // Number of pods should go down by num of hosts but then be re-created - Eventually( - listResourceFunc(ctx, &workerPods, workerFilters...), - time.Second*3, time.Millisecond*500).Should(Equal(numWorkerPods-int(numOfHosts)), fmt.Sprintf("workerGroup %v", workerPods.Items)) - } Eventually( listResourceFunc(ctx, &workerPods, workerFilters...), time.Second*3, time.Millisecond*500).Should(Equal(numWorkerPods), fmt.Sprintf("workerGroup %v", workerPods.Items)) diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index baa64ec300e..e7406b44c99 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -331,7 +331,7 @@ func GenerateRayJobId(rayjob string) string { } // GenerateRayWorkerReplicaGroupName generates a name for the replica group -// currently used for RayMulithostIndexing +// currently used for RayMultiHostIndexing func GenerateRayWorkerReplicaGroupName(workerGroupName string) string { return fmt.Sprintf("%s-%s", workerGroupName, rand.String(5)) } diff --git a/ray-operator/pkg/features/features.go b/ray-operator/pkg/features/features.go index 1a8729d228a..5aedc155c81 100644 --- a/ray-operator/pkg/features/features.go +++ b/ray-operator/pkg/features/features.go @@ -28,8 +28,8 @@ const ( // owner: @aaronliang // rep: N/A // alpha: v1.0 - // Enables multihost worker indexing - RayMulithostIndexing featuregate.Feature = "RayMultihostIndexing" + // Enables multi-host worker indexing + RayMultiHostIndexing featuregate.Feature = "RayMultiHostIndexing" ) func init() { @@ -39,7 +39,7 @@ func init() { var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ RayClusterStatusConditions: {Default: true, PreRelease: featuregate.Beta}, RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha}, - RayMulithostIndexing: {Default: false, PreRelease: featuregate.Alpha}, + RayMultiHostIndexing: {Default: false, PreRelease: featuregate.Alpha}, } // SetFeatureGateDuringTest is a helper method to override feature gates in tests. diff --git a/ray-operator/test/e2e/raycluster_multi_host_test.go b/ray-operator/test/e2e/raycluster_multi_host_test.go new file mode 100644 index 00000000000..68b40dc86f6 --- /dev/null +++ b/ray-operator/test/e2e/raycluster_multi_host_test.go @@ -0,0 +1,173 @@ +package e2e + +import ( + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/api/resource" + corev1ac "k8s.io/client-go/applyconfigurations/core/v1" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" + "github.com/ray-project/kuberay/ray-operator/pkg/features" + . "github.com/ray-project/kuberay/ray-operator/test/support" +) + +func TestRayClusterMultiHost(t *testing.T) { + test := With(t) + g := NewWithT(t) + + // Create a namespace + namespace := test.NewTestNamespace() + + features.SetFeatureGateDuringTest(t, features.RayMultiHostIndexing, true) + + const ( + numOfHosts = 4 + initialReplicas = 2 + clusterName = "raycluster-multihost" + ) + sharedMemVolumeAC := corev1ac.Volume(). + WithName("shared-mem"). 
+ WithEmptyDir(corev1ac.EmptyDirVolumeSource(). + WithMedium(corev1.StorageMediumMemory). + WithSizeLimit(resource.MustParse("1Gi")), + ) + + // Define the RayCluster spec with a multi-host worker group. + rayClusterAC := rayv1ac.RayCluster(clusterName, namespace.Name). + WithSpec(rayv1ac.RayClusterSpec(). + WithRayVersion(GetRayVersion()). + WithEnableInTreeAutoscaling(true). + WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). + WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). + WithTemplate(HeadPodTemplateApplyConfiguration(). + // All PodSpec configurations go inside WithSpec. + WithSpec(corev1ac.PodSpec(). + WithVolumes(sharedMemVolumeAC). + WithRestartPolicy(corev1.RestartPolicyNever). + WithContainers(corev1ac.Container(). + WithName("ray-head"). + WithImage(GetRayImage()). + WithEnv(corev1ac.EnvVar().WithName(utils.RAY_ENABLE_AUTOSCALER_V2).WithValue("1")). + WithPorts( + corev1ac.ContainerPort().WithName(utils.GcsServerPortName).WithContainerPort(utils.DefaultGcsServerPort), + corev1ac.ContainerPort().WithName(utils.ServingPortName).WithContainerPort(utils.DefaultServingPort), + corev1ac.ContainerPort().WithName(utils.DashboardPortName).WithContainerPort(utils.DefaultDashboardPort), + corev1ac.ContainerPort().WithName(utils.ClientPortName).WithContainerPort(utils.DefaultClientPort), + ). + WithResources(corev1ac.ResourceRequirements(). + WithRequests(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("3Gi"), + }). + WithLimits(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("3Gi"), + })), + ), + ), + ), + ). + WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). + WithGroupName("multi-host-group"). + WithReplicas(initialReplicas). + WithMinReplicas(0). + WithMaxReplicas(5). + WithNumOfHosts(numOfHosts). + WithTemplate(WorkerPodTemplateApplyConfiguration(). + // All PodSpec configurations go inside WithSpec here as well. + WithSpec(corev1ac.PodSpec(). + WithVolumes(sharedMemVolumeAC). + WithRestartPolicy(corev1.RestartPolicyNever). + WithContainers(corev1ac.Container(). + WithName("ray-worker"). + WithImage(GetRayImage()). + WithResources(corev1ac.ResourceRequirements(). + WithRequests(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("300m"), + corev1.ResourceMemory: resource.MustParse("1G"), + }). + WithLimits(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1G"), + })), + ), + ), + ), + ), + ) + + // Create the RayCluster. + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name) + + // Wait for the cluster to become Ready and verify the initial Pod count. + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", rayCluster.Namespace, rayCluster.Name) + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutLong). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + expectedPodCount := initialReplicas * numOfHosts + g.Eventually(func() ([]corev1.Pod, error) { + return GetWorkerPods(test, rayCluster) + }, TestTimeoutShort).Should(HaveLen(expectedPodCount)) + + // Verify that all pods are correctly labeled. 
+ LogWithTimestamp(test.T(), "Verifying labels on multi-host pods for %s/%s", rayCluster.Namespace, rayCluster.Name) + workerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + replicaMap := make(map[string][]string) + for _, pod := range workerPods { + replicaName, ok := pod.Labels[utils.RayWorkerReplicaIndexKey] + g.Expect(ok).To(BeTrue(), "Pod %s should have a replica index label", pod.Name) + hostIndex, ok := pod.Labels[utils.RayHostIndexKey] + g.Expect(ok).To(BeTrue(), "Pod %s should have a host index label", pod.Name) + replicaMap[replicaName] = append(replicaMap[replicaName], hostIndex) + } + g.Expect(replicaMap).To(HaveLen(initialReplicas), "Should have the correct number of replica groups") + for replicaName, hostIndices := range replicaMap { + g.Expect(hostIndices).To(HaveLen(numOfHosts), "Replica group %s should be complete", replicaName) + } + + // Scale down replicas from 2 to 1. Verify we scale by a multiple of NumOfHosts. + LogWithTimestamp(test.T(), "Scaling down RayCluster %s/%s", rayCluster.Namespace, rayCluster.Name) + rayClusterAC.Spec.WorkerGroupSpecs[0].WithReplicas(1) + _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + + expectedPodCount = 1 * numOfHosts + g.Eventually(func() ([]corev1.Pod, error) { + return GetWorkerPods(test, rayCluster) + }, TestTimeoutShort).Should(HaveLen(expectedPodCount), "Should scale down to 1 multi-host group (4 pods)") + + // Test scale up: Increase replicas from 1 to 3. + LogWithTimestamp(test.T(), "Scaling up RayCluster %s/%s", rayCluster.Namespace, rayCluster.Name) + rayClusterAC.Spec.WorkerGroupSpecs[0].WithReplicas(3) + _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + + expectedPodCount = 3 * numOfHosts + g.Eventually(func() ([]corev1.Pod, error) { + return GetWorkerPods(test, rayCluster) + }, TestTimeoutShort).Should(HaveLen(expectedPodCount), "Should scale up to 3 multi-host groups (12 pods)") + + // Manually delete a single pod and verify the controller atomically re-creates the slice. + LogWithTimestamp(test.T(), "Testing atomic multi-host group recreation for RayCluster %s/%s", rayCluster.Namespace, rayCluster.Name) + workerPods, err = GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + podToDelete := workerPods[0] + err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), podToDelete.Name, metav1.DeleteOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + // The controller should first clean up the broken multi-host group (-4 pods), and then re-scale it up (+4 pods). + LogWithTimestamp(test.T(), "Waiting for controller to reconcile multi-host group.") + // Reconcilation happens too quickly to catch the state where expectedPodCount-NumOfHosts, but we can test + // that externally deleted Pods will be re-created to satisfy the expected number. 
+ g.Eventually(func() ([]corev1.Pod, error) { + return GetWorkerPods(test, rayCluster) + }, TestTimeoutShort).Should(HaveLen(expectedPodCount), "Controller restored cluster to the correct number of pods.") +} From bfcbc736d3671acc6db1b6c121c08e9f40c158b3 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Fri, 17 Oct 2025 20:45:42 +0000 Subject: [PATCH 3/8] Rebase and lint Signed-off-by: Ryan O'Leary --- ray-operator/test/e2e/raycluster_multi_host_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ray-operator/test/e2e/raycluster_multi_host_test.go b/ray-operator/test/e2e/raycluster_multi_host_test.go index 68b40dc86f6..6c29ce9776f 100644 --- a/ray-operator/test/e2e/raycluster_multi_host_test.go +++ b/ray-operator/test/e2e/raycluster_multi_host_test.go @@ -5,8 +5,8 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" @@ -165,7 +165,7 @@ func TestRayClusterMultiHost(t *testing.T) { // The controller should first clean up the broken multi-host group (-4 pods), and then re-scale it up (+4 pods). LogWithTimestamp(test.T(), "Waiting for controller to reconcile multi-host group.") - // Reconcilation happens too quickly to catch the state where expectedPodCount-NumOfHosts, but we can test + // Reconciliation happens too quickly to catch the state where expectedPodCount-NumOfHosts, but we can test // that externally deleted Pods will be re-created to satisfy the expected number. g.Eventually(func() ([]corev1.Pod, error) { return GetWorkerPods(test, rayCluster) From c449e6c8d2e8352bc4b65e5ffe9b9d02661a3be3 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Fri, 17 Oct 2025 21:33:31 +0000 Subject: [PATCH 4/8] Add test to buildkite Signed-off-by: Ryan O'Leary --- .buildkite/values-kuberay-operator-override.yaml | 2 ++ helm-chart/kuberay-operator/README.md | 2 ++ helm-chart/kuberay-operator/values.yaml | 2 ++ 3 files changed, 6 insertions(+) diff --git a/.buildkite/values-kuberay-operator-override.yaml b/.buildkite/values-kuberay-operator-override.yaml index 7dc396edd71..41fdc9f1a31 100644 --- a/.buildkite/values-kuberay-operator-override.yaml +++ b/.buildkite/values-kuberay-operator-override.yaml @@ -16,3 +16,5 @@ featureGates: enabled: true - name: RayJobDeletionPolicy enabled: true + - name: RayMultiHostIndexing + enabled: true diff --git a/helm-chart/kuberay-operator/README.md b/helm-chart/kuberay-operator/README.md index fedcc71829b..c28384b8f54 100644 --- a/helm-chart/kuberay-operator/README.md +++ b/helm-chart/kuberay-operator/README.md @@ -172,6 +172,8 @@ spec: | featureGates[0].enabled | bool | `true` | | | featureGates[1].name | string | `"RayJobDeletionPolicy"` | | | featureGates[1].enabled | bool | `false` | | +| featureGates[2].name | string | `"RayMultiHostIndexing"` | | +| featureGates[2].enabeld | bool | `false` | | | metrics.enabled | bool | `true` | Whether KubeRay operator should emit control plane metrics. 
| | metrics.serviceMonitor.enabled | bool | `false` | Enable a prometheus ServiceMonitor | | metrics.serviceMonitor.interval | string | `"30s"` | Prometheus ServiceMonitor interval | diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml index 91cc9b0f578..1e38bd04123 100644 --- a/helm-chart/kuberay-operator/values.yaml +++ b/helm-chart/kuberay-operator/values.yaml @@ -117,6 +117,8 @@ featureGates: enabled: true - name: RayJobDeletionPolicy enabled: false +- name: RayMultiHostIndexing + enabeld: false # Configurations for KubeRay operator metrics. metrics: From f5e7250dd696907aa5b572106ba769d1141fc419 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Fri, 17 Oct 2025 22:59:17 +0000 Subject: [PATCH 5/8] Fix typo in helm Signed-off-by: Ryan O'Leary --- helm-chart/kuberay-operator/README.md | 2 +- helm-chart/kuberay-operator/values.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/helm-chart/kuberay-operator/README.md b/helm-chart/kuberay-operator/README.md index c28384b8f54..2a50677e9e1 100644 --- a/helm-chart/kuberay-operator/README.md +++ b/helm-chart/kuberay-operator/README.md @@ -173,7 +173,7 @@ spec: | featureGates[1].name | string | `"RayJobDeletionPolicy"` | | | featureGates[1].enabled | bool | `false` | | | featureGates[2].name | string | `"RayMultiHostIndexing"` | | -| featureGates[2].enabeld | bool | `false` | | +| featureGates[2].enabled | bool | `false` | | | metrics.enabled | bool | `true` | Whether KubeRay operator should emit control plane metrics. | | metrics.serviceMonitor.enabled | bool | `false` | Enable a prometheus ServiceMonitor | | metrics.serviceMonitor.interval | string | `"30s"` | Prometheus ServiceMonitor interval | diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml index 1e38bd04123..3bc1d2765c4 100644 --- a/helm-chart/kuberay-operator/values.yaml +++ b/helm-chart/kuberay-operator/values.yaml @@ -118,7 +118,7 @@ featureGates: - name: RayJobDeletionPolicy enabled: false - name: RayMultiHostIndexing - enabeld: false + enabled: false # Configurations for KubeRay operator metrics. 
metrics: From c49b6d3bf3a8d525861a00d43ad8a3fc78945289 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Mon, 20 Oct 2025 21:56:25 +0000 Subject: [PATCH 6/8] remove util function Signed-off-by: Ryan O'Leary --- ray-operator/controllers/ray/common/pod.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index d92e5f1b2f1..ba8ffd0cdfb 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -329,6 +329,12 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo mergedLabels := mergeLabels(workerSpec.Template.ObjectMeta.Labels, workerSpec.Labels) podTemplate.Labels = labelPod(rayv1.WorkerNode, instance.Name, workerSpec.GroupName, mergedLabels) + // Add additional labels for RayMultihostIndexing + multihostIndexingEnabled := features.Enabled(features.RayMultiHostIndexing) && workerSpec.NumOfHosts > 1 + if multihostIndexingEnabled { + podTemplate.Labels[utils.RayWorkerReplicaIndexKey] = replicaGrpName + podTemplate.Labels[utils.RayHostIndexKey] = strconv.Itoa(numHostIndex) + } workerSpec.RayStartParams = setMissingRayStartParams(ctx, workerSpec.RayStartParams, rayv1.WorkerNode, headPort, fqdnRayIP) initTemplateAnnotations(instance, &podTemplate) @@ -642,15 +648,6 @@ func labelPod(rayNodeType rayv1.RayNodeType, rayClusterName string, groupName st return labels } -// addMultihostIndexingPodLabels returns labels that contain RayMultihostIndexing feature labels -func addMultihostIndexingPodLabels(currentLabels map[string]string, replicaGrpName string, numHostIndex int) map[string]string { - labels := currentLabels - labels[utils.RayWorkerReplicaIndexKey] = replicaGrpName - labels[utils.RayHostIndexKey] = strconv.Itoa(numHostIndex) - - return labels -} - func setInitContainerEnvVars(container *corev1.Container, fqdnRayIP string) { if len(container.Env) == 0 { container.Env = []corev1.EnvVar{} From 9afd2595f6489871dcf0a383f74be04657d2e413 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Mon, 20 Oct 2025 22:19:40 +0000 Subject: [PATCH 7/8] fix test Signed-off-by: Ryan O'Leary --- ray-operator/test/e2e/raycluster_multi_host_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/ray-operator/test/e2e/raycluster_multi_host_test.go b/ray-operator/test/e2e/raycluster_multi_host_test.go index 6c29ce9776f..3a234a7dba2 100644 --- a/ray-operator/test/e2e/raycluster_multi_host_test.go +++ b/ray-operator/test/e2e/raycluster_multi_host_test.go @@ -41,7 +41,6 @@ func TestRayClusterMultiHost(t *testing.T) { rayClusterAC := rayv1ac.RayCluster(clusterName, namespace.Name). WithSpec(rayv1ac.RayClusterSpec(). WithRayVersion(GetRayVersion()). - WithEnableInTreeAutoscaling(true). WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). WithTemplate(HeadPodTemplateApplyConfiguration(). @@ -52,7 +51,6 @@ func TestRayClusterMultiHost(t *testing.T) { WithContainers(corev1ac.Container(). WithName("ray-head"). WithImage(GetRayImage()). - WithEnv(corev1ac.EnvVar().WithName(utils.RAY_ENABLE_AUTOSCALER_V2).WithValue("1")). WithPorts( corev1ac.ContainerPort().WithName(utils.GcsServerPortName).WithContainerPort(utils.DefaultGcsServerPort), corev1ac.ContainerPort().WithName(utils.ServingPortName).WithContainerPort(utils.DefaultServingPort), @@ -79,7 +77,6 @@ func TestRayClusterMultiHost(t *testing.T) { WithMaxReplicas(5). WithNumOfHosts(numOfHosts). 
WithTemplate(WorkerPodTemplateApplyConfiguration(). - // All PodSpec configurations go inside WithSpec here as well. WithSpec(corev1ac.PodSpec(). WithVolumes(sharedMemVolumeAC). WithRestartPolicy(corev1.RestartPolicyNever). From 29924f140bbb24ee73f8064b4a5343646bddfbb5 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Tue, 21 Oct 2025 09:58:00 +0000 Subject: [PATCH 8/8] Enable feature in CI and remove requeue on delete Signed-off-by: Ryan O'Leary --- .../config/overlays/test-overrides/deployment-override.yaml | 2 +- ray-operator/controllers/ray/raycluster_controller.go | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/ray-operator/config/overlays/test-overrides/deployment-override.yaml b/ray-operator/config/overlays/test-overrides/deployment-override.yaml index 5f7a1eba665..7f2a4e2403c 100644 --- a/ray-operator/config/overlays/test-overrides/deployment-override.yaml +++ b/ray-operator/config/overlays/test-overrides/deployment-override.yaml @@ -9,4 +9,4 @@ spec: containers: - name: kuberay-operator args: - - --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true + - --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true,RayMultiHostIndexing=true diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index cc1958e182c..7a0b2130744 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -866,9 +866,6 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context } } } - if len(deletedPods) > 0 { - return fmt.Errorf("deleted %d unhealthy worker Pods in multi-host groups, requeueing", len(deletedPods)) - } // 4. Handle explicit deletions from the autoscaler. if len(worker.ScaleStrategy.WorkersToDelete) > 0 {