diff --git a/docs/reference/api.md b/docs/reference/api.md index fb6a624645e..2a2c3bbf470 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -297,6 +297,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | +| `upgradeStrategy` _[RayClusterUpgradeStrategy](#rayclusterupgradestrategy)_ | UpgradeStrategy defines the scaling policy used when upgrading the RayCluster | | | | `authOptions` _[AuthOptions](#authoptions)_ | AuthOptions specifies the authentication options for the RayCluster. | | | | `suspend` _boolean_ | Suspend indicates whether a RayCluster should be suspended.
A suspended RayCluster will have head pods and worker pods deleted. | | | | `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayCluster.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayCluster that either omits this field entirely or
sets it to the reserved string 'ray.io/kuberay-operator',
but delegates reconciling a RayCluster with 'kueue.x-k8s.io/multikueue' to Kueue.
The field is immutable. | | | @@ -309,6 +310,35 @@ _Appears in:_ | `workerGroupSpecs` _[WorkerGroupSpec](#workergroupspec) array_ | WorkerGroupSpecs are the specs for the worker pods | | | +#### RayClusterUpgradeStrategy + + + + + + + +_Appears in:_ +- [RayClusterSpec](#rayclusterspec) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | | + + +#### RayClusterUpgradeType + +_Underlying type:_ _string_ + + + + + +_Appears in:_ +- [RayClusterUpgradeStrategy](#rayclusterupgradestrategy) + + + #### RayJob diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index 45f5406c411..e731ca0105f 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -4532,6 +4532,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 1f4c8432168..b5d7ce86490 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -4614,6 +4614,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index baec5cfe1f3..634b54b59de 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -4512,6 +4512,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index f67a80ba854..b3113d65fd1 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -11,6 +11,9 @@ import ( // RayClusterSpec defines the desired state of RayCluster type RayClusterSpec struct { + // UpgradeStrategy defines the scaling policy used when upgrading the RayCluster + // +optional + UpgradeStrategy *RayClusterUpgradeStrategy `json:"upgradeStrategy,omitempty"` // AuthOptions specifies the authentication options for the RayCluster. // +optional AuthOptions *AuthOptions `json:"authOptions,omitempty"` @@ -49,6 +52,21 @@ type RayClusterSpec struct { WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs,omitempty"` } +type RayClusterUpgradeType string + +const ( + // During upgrade, Recreate strategy will delete all existing pods before creating new ones + Recreate RayClusterUpgradeType = "Recreate" + // No new pod will be created while the strategy is set to None + RayClusterUpgradeNone RayClusterUpgradeType = "None" +) + +type RayClusterUpgradeStrategy struct { + // Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. + // +optional + Type *RayClusterUpgradeType `json:"type,omitempty"` +} + // AuthMode describes the authentication mode for the Ray cluster. 
type AuthMode string diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index 55d142822db..4bda8b3b864 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -28,7 +28,7 @@ const ( // During upgrade, NewCluster strategy will create new upgraded cluster and switch to it when it becomes ready NewCluster RayServiceUpgradeType = "NewCluster" // No new cluster will be created while the strategy is set to None - None RayServiceUpgradeType = "None" + RayServiceUpgradeNone RayServiceUpgradeType = "None" ) // These statuses should match Ray Serve's application statuses diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index cd710592d98..ec261a2c7d6 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -378,6 +378,11 @@ func (in *RayClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RayClusterSpec) DeepCopyInto(out *RayClusterSpec) { *out = *in + if in.UpgradeStrategy != nil { + in, out := &in.UpgradeStrategy, &out.UpgradeStrategy + *out = new(RayClusterUpgradeStrategy) + (*in).DeepCopyInto(*out) + } if in.AuthOptions != nil { in, out := &in.AuthOptions, &out.AuthOptions *out = new(AuthOptions) @@ -480,6 +485,26 @@ func (in *RayClusterStatus) DeepCopy() *RayClusterStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RayClusterUpgradeStrategy) DeepCopyInto(out *RayClusterUpgradeStrategy) { + *out = *in + if in.Type != nil { + in, out := &in.Type, &out.Type + *out = new(RayClusterUpgradeType) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RayClusterUpgradeStrategy. +func (in *RayClusterUpgradeStrategy) DeepCopy() *RayClusterUpgradeStrategy { + if in == nil { + return nil + } + out := new(RayClusterUpgradeStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *RayJob) DeepCopyInto(out *RayJob) { *out = *in diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index 45f5406c411..e731ca0105f 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4532,6 +4532,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 1f4c8432168..b5d7ce86490 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4614,6 +4614,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index baec5cfe1f3..634b54b59de 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4512,6 +4512,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index be4a64cdfaf..e9b304aaf14 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -158,11 +158,24 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra } } +func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) { + originalTemplate := template.DeepCopy() + return utils.GenerateJsonHash(*originalTemplate) +} + // DefaultHeadPodTemplate sets the config values func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec { // TODO (Dmitri) The argument headPort is essentially unused; // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. + + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil { + templateHash = hash + } + podTemplate := headSpec.Template if utils.IsDeterministicHeadPodNameEnabled() { podTemplate.Name = podName @@ -173,6 +186,13 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } + // Update rayStartParams with top-level Resources for head group. 
updateRayStartParamsResources(ctx, headSpec.RayStartParams, headSpec.Resources) @@ -296,12 +316,31 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + for _, wg := range instance.Spec.WorkerGroupSpecs { + if wg.GroupName == workerSpec.GroupName { + if hash, err := GeneratePodTemplateHash(wg.Template); err == nil { + templateHash = hash + } + break + } + } + podTemplate := workerSpec.Template podTemplate.GenerateName = podName + // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } // The Ray worker should only start once the GCS server is ready. // only inject init container only when ENABLE_INIT_CONTAINER_INJECTION is true enableInitContainerInjection := getEnableInitContainerInjection() diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 76b21e47527..ab934b6abc7 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -180,6 +180,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance return ctrl.Result{}, nil } + if err := utils.ValidateRayClusterUpgradeOptions(instance); err != nil { + logger.Error(err, "The RayCluster UpgradeStrategy is invalid") + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec), + "The RayCluster UpgradeStrategy is invalid %s/%s: %v", instance.Namespace, instance.Name, err) + return ctrl.Result{}, nil + } + if err := utils.ValidateRayClusterStatus(instance); err != nil { logger.Error(err, "The RayCluster status is invalid") r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterStatus), @@ -637,6 +644,21 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv } } + // Check if pods need to be recreated with Recreate upgradeStrategy + if r.shouldRecreatePodsForUpgrade(ctx, instance) { + logger.Info("RayCluster spec changed with Recreate upgradeStrategy, deleting all pods") + if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePodCollection), + "Failed deleting Pods due to spec change with Recreate upgradeStrategy for RayCluster %s/%s, %v", + instance.Namespace, instance.Name, err) + return errstd.Join(utils.ErrFailedDeleteAllPods, err) + } + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedPod), + "Deleted all Pods for RayCluster %s/%s due to spec change with Recreate upgradeStrategy", + instance.Namespace, instance.Name) + return nil + } + // check if all the pods exist headPods := corev1.PodList{} if err := 
r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { @@ -1094,6 +1116,73 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context return nil } +// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on PodTemplateSpec changes +func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool { + logger := ctrl.LoggerFrom(ctx) + + if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil { + return false + } + if *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate { + return false + } + + allPods := corev1.PodList{} + if err := r.List(ctx, &allPods, common.RayClusterAllPodsAssociationOptions(instance).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list pods for upgrade check") + return false + } + + if len(allPods.Items) == 0 { + return false + } + + headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) + if err != nil { + logger.Error(err, "Failed to generate head template hash") + return false + } + + workerHashMap := make(map[string]string) + for _, workerGroup := range instance.Spec.WorkerGroupSpecs { + hash, err := common.GeneratePodTemplateHash(workerGroup.Template) + if err != nil { + logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName) + continue + } + workerHashMap[workerGroup.GroupName] = hash + } + + // Check each pod to see if its template hash matches the current spec + for _, pod := range allPods.Items { + nodeType := pod.Labels[utils.RayNodeTypeLabelKey] + actualHash := pod.Annotations[utils.PodTemplateHashKey] + + var expectedHash string + switch rayv1.RayNodeType(nodeType) { + case rayv1.HeadNode: + expectedHash = headHash + case rayv1.WorkerNode: + groupName := pod.Labels[utils.RayNodeGroupLabelKey] + var ok bool + expectedHash, ok = workerHashMap[groupName] + if !ok { + logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName) + continue + } + default: + continue + } + + if actualHash != expectedHash { + logger.Info("Pod template has changed, will recreate all pods", + "rayCluster", instance.Name) + return true + } + } + return false +} + // shouldDeletePod returns whether the Pod should be deleted and the reason // // @param pod: The Pod to be checked. 
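The reconciler change above only deletes Pods when a RayCluster explicitly opts into the `Recreate` strategy introduced by this diff. Below is a minimal sketch of how a Go client could set the new field when building a RayCluster spec; the cluster name, namespace, and resource layout are illustrative assumptions rather than values taken from this change.

```go
package main

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// newRecreateCluster builds a RayCluster that opts into the Recreate upgrade strategy.
// When any pod template in the spec later changes, the operator deletes all existing
// Pods and recreates them from the new templates; RayClusterUpgradeNone (or omitting
// upgradeStrategy entirely) keeps the previous behavior.
func newRecreateCluster() *rayv1.RayCluster {
	return &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "raycluster-recreate-example", // illustrative name
			Namespace: "default",
		},
		Spec: rayv1.RayClusterSpec{
			UpgradeStrategy: &rayv1.RayClusterUpgradeStrategy{
				Type: ptr.To(rayv1.Recreate),
			},
			HeadGroupSpec: rayv1.HeadGroupSpec{
				RayStartParams: map[string]string{},
				Template: corev1.PodTemplateSpec{
					Spec: corev1.PodSpec{
						Containers: []corev1.Container{
							{Name: "ray-head", Image: "rayproject/ray:latest"},
						},
					},
				},
			},
		},
	}
}

func main() { _ = newRecreateCluster() }
```

Note that, per `ValidateRayClusterUpgradeOptions` below, this field is rejected on RayClusters originated from a RayJob or RayService; it is meant for standalone RayClusters.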
diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 9c1ac017023..901d4cf8165 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3643,3 +3643,125 @@ func TestReconcile_PodsWithAuthToken(t *testing.T) { assert.True(t, authModeEnvFound, "Auth mode env vars not found") } } + +func TestShouldRecreatePodsForUpgrade(t *testing.T) { + setupTest(t) + ctx := context.Background() + + // Calculate template hashes for matching pods + headHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.HeadGroupSpec.Template) + require.NoError(t, err, "Failed to generate head template hash") + workerHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.WorkerGroupSpecs[0].Template) + require.NoError(t, err, "Failed to generate worker template hash") + + // Helper function to create a pod with specific template hash + createPodWithHash := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespaceStr, + Labels: map[string]string{ + utils.RayNodeLabelKey: "yes", + utils.RayClusterLabelKey: instanceName, + utils.RayNodeTypeLabelKey: string(nodeType), + utils.RayNodeGroupLabelKey: groupName, + }, + Annotations: map[string]string{ + utils.PodTemplateHashKey: templateHash, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + } + + tests := []struct { + name string + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + pods []runtime.Object + expectedRecreate bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is nil", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{Type: nil}, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is None", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Recreate strategy but no pods exist", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{}, + expectedRecreate: false, + }, + { + name: "Recreate strategy with matching template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: false, + }, + { + name: "Recreate strategy with mismatched head template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash"), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: true, + }, + { + name: "Recreate strategy with mismatched worker template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + 
createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash"), + }, + expectedRecreate: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cluster := testRayCluster.DeepCopy() + cluster.Spec.UpgradeStrategy = tc.upgradeStrategy + + fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects(tc.pods...).Build() + testRayClusterReconciler := &RayClusterReconciler{ + Client: fakeClient, + Scheme: scheme.Scheme, + Recorder: &record.FakeRecorder{}, + } + + result := testRayClusterReconciler.shouldRecreatePodsForUpgrade(ctx, cluster) + assert.Equal(t, tc.expectedRecreate, result) + }) + } +} diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 4818fbd6729..e40c4bd554b 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -1262,20 +1262,20 @@ func TestIsZeroDowntimeUpgradeEnabled(t *testing.T) { expected: true, }, { - name: "upgrade strategy is set to None, and env var is not set", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is not set", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to true", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to true", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "true", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to false", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to false", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "false", expected: false, }, diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 3e0e86d1096..6980306d463 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -26,6 +26,7 @@ const ( HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete" NumWorkerGroupsKey = "ray.io/num-worker-groups" KubeRayVersion = "ray.io/kuberay-version" + PodTemplateHashKey = "ray.io/pod-template-hash" // Labels for feature RayMultihostIndexing // diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 1bcda9af903..9c5b8170031 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -37,6 +37,19 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { return nil } +func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { + strategy := instance.Spec.UpgradeStrategy + if strategy == nil || strategy.Type == nil || *strategy.Type == rayv1.RayClusterUpgradeNone { + return nil + } + + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return 
fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) + } + return nil +} + // validateRayGroupResources checks for conflicting resource definitions. func validateRayGroupResources(groupName string, rayStartParams, resources map[string]string) error { hasRayStartResources := rayStartParams["num-cpus"] != "" || @@ -369,10 +382,10 @@ func ValidateRayServiceSpec(rayService *rayv1.RayService) error { // only NewClusterWithIncrementalUpgrade, NewCluster, and None are valid upgradeType if rayService.Spec.UpgradeStrategy != nil && rayService.Spec.UpgradeStrategy.Type != nil && - *rayService.Spec.UpgradeStrategy.Type != rayv1.None && + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceUpgradeNone && *rayService.Spec.UpgradeStrategy.Type != rayv1.NewCluster && *rayService.Spec.UpgradeStrategy.Type != rayv1.NewClusterWithIncrementalUpgrade { - return fmt.Errorf("The RayService spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.None) + return fmt.Errorf("The RayService spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.RayServiceUpgradeNone) } if rayService.Spec.RayClusterDeletionDelaySeconds != nil && diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 014d0917d68..c130a789ba3 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1882,3 +1882,92 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { }) } } + +func TestValidateRayClusterUpgradeOptions(t *testing.T) { + tests := []struct { + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + name string + originatedFromCRD string + errorMessage string + expectError bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + expectError: false, + }, + { + name: "Upgrade strategy set None for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayJob", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayJobCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayJob", + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayService", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayServiceCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayService", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + Labels: map[string]string{}, + }, + Spec: rayv1.RayClusterSpec{ + UpgradeStrategy: tt.upgradeStrategy, + HeadGroupSpec: 
rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + }, + }, + }, + } + + if tt.originatedFromCRD != "" { + cluster.Labels[RayOriginatedFromCRDLabelKey] = tt.originatedFromCRD + } + + err := ValidateRayClusterUpgradeOptions(cluster) + + if tt.expectError { + require.Error(t, err, "Expected error for test case: %s", tt.name) + if tt.errorMessage != "" { + assert.Contains(t, err.Error(), tt.errorMessage, "Error message mismatch for test case: %s", tt.name) + } + } else { + require.NoError(t, err, "Unexpected error for test case: %s", tt.name) + } + }) + } +} diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go index 14cb592037f..144ff1812d5 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go @@ -5,16 +5,17 @@ package v1 // RayClusterSpecApplyConfiguration represents a declarative configuration of the RayClusterSpec type for use // with apply. type RayClusterSpecApplyConfiguration struct { - AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` - Suspend *bool `json:"suspend,omitempty"` - ManagedBy *string `json:"managedBy,omitempty"` - AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` - HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` - EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` - GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` - HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` - RayVersion *string `json:"rayVersion,omitempty"` - WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` + UpgradeStrategy *RayClusterUpgradeStrategyApplyConfiguration `json:"upgradeStrategy,omitempty"` + AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + ManagedBy *string `json:"managedBy,omitempty"` + AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` + HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` + EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` + GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` + HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` + RayVersion *string `json:"rayVersion,omitempty"` + WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` } // RayClusterSpecApplyConfiguration constructs a declarative configuration of the RayClusterSpec type for use with @@ -23,6 +24,14 @@ func RayClusterSpec() *RayClusterSpecApplyConfiguration { return &RayClusterSpecApplyConfiguration{} } +// WithUpgradeStrategy sets the UpgradeStrategy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UpgradeStrategy field is set to the value of the last call. 
+func (b *RayClusterSpecApplyConfiguration) WithUpgradeStrategy(value *RayClusterUpgradeStrategyApplyConfiguration) *RayClusterSpecApplyConfiguration { + b.UpgradeStrategy = value + return b +} + // WithAuthOptions sets the AuthOptions field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the AuthOptions field is set to the value of the last call. diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go new file mode 100644 index 00000000000..f1a1aa39064 --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go @@ -0,0 +1,27 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +import ( + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" +) + +// RayClusterUpgradeStrategyApplyConfiguration represents a declarative configuration of the RayClusterUpgradeStrategy type for use +// with apply. +type RayClusterUpgradeStrategyApplyConfiguration struct { + Type *rayv1.RayClusterUpgradeType `json:"type,omitempty"` +} + +// RayClusterUpgradeStrategyApplyConfiguration constructs a declarative configuration of the RayClusterUpgradeStrategy type for use with +// apply. +func RayClusterUpgradeStrategy() *RayClusterUpgradeStrategyApplyConfiguration { + return &RayClusterUpgradeStrategyApplyConfiguration{} +} + +// WithType sets the Type field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Type field is set to the value of the last call. 
+func (b *RayClusterUpgradeStrategyApplyConfiguration) WithType(value rayv1.RayClusterUpgradeType) *RayClusterUpgradeStrategyApplyConfiguration { + b.Type = &value + return b +} diff --git a/ray-operator/pkg/client/applyconfiguration/utils.go b/ray-operator/pkg/client/applyconfiguration/utils.go index 9ad6513e0c8..16a92835cf5 100644 --- a/ray-operator/pkg/client/applyconfiguration/utils.go +++ b/ray-operator/pkg/client/applyconfiguration/utils.go @@ -44,6 +44,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &rayv1.RayClusterSpecApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayClusterStatus"): return &rayv1.RayClusterStatusApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayClusterUpgradeStrategy"): + return &rayv1.RayClusterUpgradeStrategyApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJob"): return &rayv1.RayJobApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJobSpec"): diff --git a/ray-operator/test/e2e/raycluster_test.go b/ray-operator/test/e2e/raycluster_test.go index 3c26221c46b..719ee651f32 100644 --- a/ray-operator/test/e2e/raycluster_test.go +++ b/ray-operator/test/e2e/raycluster_test.go @@ -208,3 +208,60 @@ func TestRayClusterScalingDown(t *testing.T) { g.Expect(err).NotTo(HaveOccurred(), "Failed to remove finalizer from pod %s/%s", namespace.Name, pod.Name) } } + +func TestRayClusterUpgradeStrategy(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + + rayClusterAC := rayv1ac.RayCluster("raycluster-upgrade-recreate", namespace.Name).WithSpec(NewRayClusterSpec()) + rayClusterAC.Spec.UpgradeStrategy = rayv1ac.RayClusterUpgradeStrategy().WithType(rayv1.Recreate) + + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", namespace.Name, rayCluster.Name) + + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", namespace.Name, rayCluster.Name) + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + initialHeadPodName := headPod.Name + + workerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(workerPods).To(HaveLen(1)) + + LogWithTimestamp(test.T(), "Updating RayCluster %s/%s pod template to trigger upgrade", rayCluster.Namespace, rayCluster.Name) + // Update head pod template spec to trigger pod template hash change + // Add an annotation to change the pod spec + rayClusterAC.Spec.HeadGroupSpec.Template. 
+ WithAnnotations(map[string]string{"upgrade-trigger": "test"}) + rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Updated RayCluster pod template") + + LogWithTimestamp(test.T(), "Waiting for new head pod to be running after recreate") + g.Eventually(func() bool { + newHeadPod, err := GetHeadPod(test, rayCluster) + if err != nil { + return false + } + return newHeadPod.Name != initialHeadPodName && newHeadPod.Status.Phase == corev1.PodRunning + }, TestTimeoutMedium).Should(BeTrue()) + + // Wait for cluster to become ready + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready again", rayCluster.Namespace, rayCluster.Name) + g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + newHeadPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newHeadPod.Name).NotTo(Equal(initialHeadPodName)) + + newWorkerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newWorkerPods).To(HaveLen(1)) +} diff --git a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go index e38da613bda..461d1c270e8 100644 --- a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go +++ b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go @@ -182,7 +182,7 @@ func TestUpdateServeConfigAndRayClusterSpecWithUpgradeDisabled(t *testing.T) { rayServiceName := "rayservice-sample" rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration(). - WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.None))) + WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.RayServiceUpgradeNone))) rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) g.Expect(err).NotTo(HaveOccurred())
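For context on the detection mechanism used throughout this change, the sketch below exercises the exported `common.GeneratePodTemplateHash` helper and the `ray.io/pod-template-hash` annotation key in isolation, mirroring in simplified form what `shouldRecreatePodsForUpgrade` does per Pod. The template contents and the stale hash value are illustrative assumptions.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// podTemplateChanged mirrors the per-Pod core of shouldRecreatePodsForUpgrade: hash the
// template currently in the RayCluster spec and compare it to the ray.io/pod-template-hash
// annotation stamped onto the Pod when it was created.
func podTemplateChanged(specTemplate corev1.PodTemplateSpec, pod corev1.Pod) (bool, error) {
	expectedHash, err := common.GeneratePodTemplateHash(specTemplate)
	if err != nil {
		return false, err
	}
	return pod.Annotations[utils.PodTemplateHashKey] != expectedHash, nil
}

func main() {
	// Illustrative inputs; in the operator these come from the RayCluster spec and from
	// listing the cluster's Pods.
	template := corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "ray-head", Image: "rayproject/ray:latest"}},
		},
	}
	pod := corev1.Pod{}
	pod.Annotations = map[string]string{utils.PodTemplateHashKey: "stale-hash"}

	changed, err := podTemplateChanged(template, pod)
	fmt.Println(changed, err) // prints "true <nil>": under the Recreate strategy all Pods would be deleted
}
```

In the operator itself the expected hash is computed from the template as it appears in the spec, before `DefaultHeadPodTemplate` and `DefaultWorkerPodTemplate` apply their modifications, so only user-visible template edits trigger recreation.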