From bf8776437e11512e7898da42e933a60121f8196e Mon Sep 17 00:00:00 2001 From: win5923 Date: Tue, 18 Nov 2025 15:53:35 +0000 Subject: [PATCH 1/5] [Feature] Support recreate pods for RayCluster using RayClusterSpec Signed-off-by: win5923 --- docs/reference/api.md | 30 ++++++ .../crds/ray.io_rayclusters.yaml | 5 + .../kuberay-operator/crds/ray.io_rayjobs.yaml | 5 + .../crds/ray.io_rayservices.yaml | 5 + ray-operator/apis/ray/v1/raycluster_types.go | 18 ++++ ray-operator/apis/ray/v1/rayservice_types.go | 2 +- .../apis/ray/v1/zz_generated.deepcopy.go | 25 +++++ .../config/crd/bases/ray.io_rayclusters.yaml | 5 + .../config/crd/bases/ray.io_rayjobs.yaml | 5 + .../config/crd/bases/ray.io_rayservices.yaml | 5 + ray-operator/controllers/ray/common/pod.go | 39 ++++++++ .../controllers/ray/raycluster_controller.go | 93 +++++++++++++++++++ .../ray/rayservice_controller_unit_test.go | 12 +-- .../controllers/ray/utils/constant.go | 1 + .../controllers/ray/utils/validation.go | 14 ++- .../ray/v1/rayclusterspec.go | 29 ++++-- .../ray/v1/rayclusterupgradestrategy.go | 27 ++++++ .../pkg/client/applyconfiguration/utils.go | 2 + .../rayservice_in_place_update_test.go | 2 +- 19 files changed, 304 insertions(+), 20 deletions(-) create mode 100644 ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go diff --git a/docs/reference/api.md b/docs/reference/api.md index fb6a624645e..2a2c3bbf470 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -297,6 +297,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | +| `upgradeStrategy` _[RayClusterUpgradeStrategy](#rayclusterupgradestrategy)_ | UpgradeStrategy defines the scaling policy used when upgrading the RayCluster | | | | `authOptions` _[AuthOptions](#authoptions)_ | AuthOptions specifies the authentication options for the RayCluster. | | | | `suspend` _boolean_ | Suspend indicates whether a RayCluster should be suspended.
A suspended RayCluster will have head pods and worker pods deleted. | | | | `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayCluster.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayCluster that either omits this field or sets it
to the reserved string 'ray.io/kuberay-operator',
but delegates reconciling a RayCluster whose value is 'kueue.x-k8s.io/multikueue' to Kueue.
The field is immutable. | | | @@ -309,6 +310,35 @@ _Appears in:_ | `workerGroupSpecs` _[WorkerGroupSpec](#workergroupspec) array_ | WorkerGroupSpecs are the specs for the worker pods | | | +#### RayClusterUpgradeStrategy + + + + + + + +_Appears in:_ +- [RayClusterSpec](#rayclusterspec) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | | + + +#### RayClusterUpgradeType + +_Underlying type:_ _string_ + + + + + +_Appears in:_ +- [RayClusterUpgradeStrategy](#rayclusterupgradestrategy) + + + #### RayJob diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index 45f5406c411..e731ca0105f 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -4532,6 +4532,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 1f4c8432168..b5d7ce86490 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -4614,6 +4614,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index baec5cfe1f3..634b54b59de 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -4512,6 +4512,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index f67a80ba854..b3113d65fd1 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -11,6 +11,9 @@ import ( // RayClusterSpec defines the desired state of RayCluster type RayClusterSpec struct { + // UpgradeStrategy defines the scaling policy used when upgrading the RayCluster + // +optional + UpgradeStrategy *RayClusterUpgradeStrategy `json:"upgradeStrategy,omitempty"` // AuthOptions specifies the authentication options for the RayCluster. // +optional AuthOptions *AuthOptions `json:"authOptions,omitempty"` @@ -49,6 +52,21 @@ type RayClusterSpec struct { WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs,omitempty"` } +type RayClusterUpgradeType string + +const ( + // During upgrade, Recreate strategy will delete all existing pods before creating new ones + Recreate RayClusterUpgradeType = "Recreate" + // No new pod will be created while the strategy is set to None + RayClusterUpgradeNone RayClusterUpgradeType = "None" +) + +type RayClusterUpgradeStrategy struct { + // Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. + // +optional + Type *RayClusterUpgradeType `json:"type,omitempty"` +} + // AuthMode describes the authentication mode for the Ray cluster. 
type AuthMode string diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index 83b9775f377..d8b8304a806 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -28,7 +28,7 @@ const ( // During upgrade, NewCluster strategy will create new upgraded cluster and switch to it when it becomes ready NewCluster RayServiceUpgradeType = "NewCluster" // No new cluster will be created while the strategy is set to None - None RayServiceUpgradeType = "None" + RayServiceUpgradeNone RayServiceUpgradeType = "None" ) // These statuses should match Ray Serve's application statuses diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index cd710592d98..ec261a2c7d6 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -378,6 +378,11 @@ func (in *RayClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RayClusterSpec) DeepCopyInto(out *RayClusterSpec) { *out = *in + if in.UpgradeStrategy != nil { + in, out := &in.UpgradeStrategy, &out.UpgradeStrategy + *out = new(RayClusterUpgradeStrategy) + (*in).DeepCopyInto(*out) + } if in.AuthOptions != nil { in, out := &in.AuthOptions, &out.AuthOptions *out = new(AuthOptions) @@ -480,6 +485,26 @@ func (in *RayClusterStatus) DeepCopy() *RayClusterStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RayClusterUpgradeStrategy) DeepCopyInto(out *RayClusterUpgradeStrategy) { + *out = *in + if in.Type != nil { + in, out := &in.Type, &out.Type + *out = new(RayClusterUpgradeType) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RayClusterUpgradeStrategy. +func (in *RayClusterUpgradeStrategy) DeepCopy() *RayClusterUpgradeStrategy { + if in == nil { + return nil + } + out := new(RayClusterUpgradeStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *RayJob) DeepCopyInto(out *RayJob) { *out = *in diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index 45f5406c411..e731ca0105f 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4532,6 +4532,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 1f4c8432168..b5d7ce86490 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4614,6 +4614,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index baec5cfe1f3..634b54b59de 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4512,6 +4512,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index a42f9184912..6df0b750975 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -158,11 +158,24 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra } } +func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) { + originalTemplate := template.DeepCopy() + return utils.GenerateJsonHash(*originalTemplate) +} + // DefaultHeadPodTemplate sets the config values func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec { // TODO (Dmitri) The argument headPort is essentially unused; // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. + + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil { + templateHash = hash + } + podTemplate := headSpec.Template if utils.IsDeterministicHeadPodNameEnabled() { podTemplate.Name = podName @@ -173,6 +186,13 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } + // Update rayStartParams with top-level Resources for head group. 
updateRayStartParamsResources(ctx, headSpec.RayStartParams, headSpec.Resources) @@ -295,12 +315,31 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + for _, wg := range instance.Spec.WorkerGroupSpecs { + if wg.GroupName == workerSpec.GroupName { + if hash, err := GeneratePodTemplateHash(wg.Template); err == nil { + templateHash = hash + } + break + } + } + podTemplate := workerSpec.Template podTemplate.GenerateName = podName + // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } // The Ray worker should only start once the GCS server is ready. // only inject init container only when ENABLE_INIT_CONTAINER_INJECTION is true enableInitContainerInjection := getEnableInitContainerInjection() diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 7d4d8fab8f6..1e95c365343 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -180,6 +180,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance return ctrl.Result{}, nil } + if err := utils.ValidateRayClusterUpgradeOptions(instance); err != nil { + logger.Error(err, "The RayCluster UpgradeStrategy is invalid") + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec), + "The RayCluster UpgradeStrategy is invalid %s/%s: %v", instance.Namespace, instance.Name, err) + return ctrl.Result{}, nil + } + if err := utils.ValidateRayClusterStatus(instance); err != nil { logger.Error(err, "The RayCluster status is invalid") r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterStatus), @@ -637,6 +644,21 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv } } + // Check if pods need to be recreated with Recreate upgradeStrategy + if r.shouldRecreatePodsForUpgrade(ctx, instance) { + logger.Info("RayCluster spec changed with Recreate upgradeStrategy, deleting all pods") + if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePodCollection), + "Failed deleting Pods due to spec change with Recreate upgradeStrategy for RayCluster %s/%s, %v", + instance.Namespace, instance.Name, err) + return errstd.Join(utils.ErrFailedDeleteAllPods, err) + } + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedPod), + "Deleted all Pods for RayCluster %s/%s due to spec change with Recreate upgradeStrategy", + instance.Namespace, instance.Name) + return nil + } + // check if all the pods exist headPods := corev1.PodList{} if err := 
r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { @@ -1094,6 +1116,77 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context return nil } +// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on Head/Worker pod template changes +func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool { + logger := ctrl.LoggerFrom(ctx) + + if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil { + return false + } + if *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate { + return false + } + + allPods := corev1.PodList{} + if err := r.List(ctx, &allPods, common.RayClusterAllPodsAssociationOptions(instance).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list pods for upgrade check") + return false + } + + if len(allPods.Items) == 0 { + return false + } + + headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) + if err != nil { + logger.Error(err, "Failed to generate head template hash") + return false + } + + workerHashMap := make(map[string]string) + for _, workerGroup := range instance.Spec.WorkerGroupSpecs { + hash, err := common.GeneratePodTemplateHash(workerGroup.Template) + if err != nil { + logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName) + continue + } + workerHashMap[workerGroup.GroupName] = hash + } + + // Check each pod to see if its template hash matches the current spec + for _, pod := range allPods.Items { + nodeType := pod.Labels[utils.RayNodeTypeLabelKey] + actualHash := pod.Annotations[utils.PodTemplateHashKey] + + var expectedHash string + if nodeType == string(rayv1.HeadNode) { + expectedHash = headHash + } else if nodeType == string(rayv1.WorkerNode) { + groupName := pod.Labels[utils.RayNodeGroupLabelKey] + var ok bool + expectedHash, ok = workerHashMap[groupName] + if !ok { + logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName) + continue + } + } else { + continue + } + + if actualHash != expectedHash { + logger.Info("Pod template has changed, will recreate all pods", + "pod", pod.Name, + "nodeType", nodeType, + "actualHash", actualHash, + "expectedHash", expectedHash, + "upgradeStrategy", *instance.Spec.UpgradeStrategy.Type) + return true + } + } + + return false +} + // shouldDeletePod returns whether the Pod should be deleted and the reason // // @param pod: The Pod to be checked. 
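
For reference, this is roughly how a user opts a cluster into the behaviour implemented above: set spec.upgradeStrategy.type to Recreate, and any later edit to the head or worker pod templates makes the operator delete and recreate every pod. The sketch below uses the rayv1 Go types added in this patch; the cluster name, namespace, image, and the bare head template are placeholder values, and it is an illustration rather than code from the patch.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// buildRecreateCluster returns a RayCluster that opts into the Recreate
// upgrade strategy; once applied, any change to its pod templates causes
// the operator to delete all existing pods and create new ones.
func buildRecreateCluster() *rayv1.RayCluster {
	return &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "raycluster-recreate", Namespace: "default"},
		Spec: rayv1.RayClusterSpec{
			UpgradeStrategy: &rayv1.RayClusterUpgradeStrategy{
				Type: ptr.To(rayv1.Recreate),
			},
			HeadGroupSpec: rayv1.HeadGroupSpec{
				RayStartParams: map[string]string{},
				Template: corev1.PodTemplateSpec{
					Spec: corev1.PodSpec{
						Containers: []corev1.Container{
							// Placeholder image; use whatever Ray image the cluster normally runs.
							{Name: "ray-head", Image: "rayproject/ray:latest"},
						},
					},
				},
			},
		},
	}
}

func main() {
	cluster := buildRecreateCluster()
	fmt.Println("upgrade strategy:", string(*cluster.Spec.UpgradeStrategy.Type))
}
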
diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 4818fbd6729..e40c4bd554b 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -1262,20 +1262,20 @@ func TestIsZeroDowntimeUpgradeEnabled(t *testing.T) { expected: true, }, { - name: "upgrade strategy is set to None, and env var is not set", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is not set", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to true", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to true", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "true", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to false", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to false", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "false", expected: false, }, diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 3e0e86d1096..6980306d463 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -26,6 +26,7 @@ const ( HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete" NumWorkerGroupsKey = "ray.io/num-worker-groups" KubeRayVersion = "ray.io/kuberay-version" + PodTemplateHashKey = "ray.io/pod-template-hash" // Labels for feature RayMultihostIndexing // diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index a3e6da66e80..a9a2b69ac0a 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -37,6 +37,16 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { return nil } +func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { + if instance.Spec.UpgradeStrategy != nil { + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) + } + } + return nil +} + // validateRayGroupResources checks for conflicting resource definitions. 
func validateRayGroupResources(groupName string, rayStartParams, resources map[string]string) error { hasRayStartResources := rayStartParams["num-cpus"] != "" || @@ -369,10 +379,10 @@ func ValidateRayServiceSpec(rayService *rayv1.RayService) error { // only NewClusterWithIncrementalUpgrade, NewCluster, and None are valid upgradeType if rayService.Spec.UpgradeStrategy != nil && rayService.Spec.UpgradeStrategy.Type != nil && - *rayService.Spec.UpgradeStrategy.Type != rayv1.None && + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceUpgradeNone && *rayService.Spec.UpgradeStrategy.Type != rayv1.NewCluster && *rayService.Spec.UpgradeStrategy.Type != rayv1.NewClusterWithIncrementalUpgrade { - return fmt.Errorf("Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.None) + return fmt.Errorf("Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.RayServiceUpgradeNone) } if rayService.Spec.RayClusterDeletionDelaySeconds != nil && diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go index 14cb592037f..144ff1812d5 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go @@ -5,16 +5,17 @@ package v1 // RayClusterSpecApplyConfiguration represents a declarative configuration of the RayClusterSpec type for use // with apply. type RayClusterSpecApplyConfiguration struct { - AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` - Suspend *bool `json:"suspend,omitempty"` - ManagedBy *string `json:"managedBy,omitempty"` - AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` - HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` - EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` - GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` - HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` - RayVersion *string `json:"rayVersion,omitempty"` - WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` + UpgradeStrategy *RayClusterUpgradeStrategyApplyConfiguration `json:"upgradeStrategy,omitempty"` + AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + ManagedBy *string `json:"managedBy,omitempty"` + AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` + HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` + EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` + GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` + HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` + RayVersion *string `json:"rayVersion,omitempty"` + WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` } // RayClusterSpecApplyConfiguration constructs a declarative configuration of the RayClusterSpec type for use with @@ -23,6 +24,14 @@ func RayClusterSpec() *RayClusterSpecApplyConfiguration { 
return &RayClusterSpecApplyConfiguration{} } +// WithUpgradeStrategy sets the UpgradeStrategy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UpgradeStrategy field is set to the value of the last call. +func (b *RayClusterSpecApplyConfiguration) WithUpgradeStrategy(value *RayClusterUpgradeStrategyApplyConfiguration) *RayClusterSpecApplyConfiguration { + b.UpgradeStrategy = value + return b +} + // WithAuthOptions sets the AuthOptions field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the AuthOptions field is set to the value of the last call. diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go new file mode 100644 index 00000000000..f1a1aa39064 --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go @@ -0,0 +1,27 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +import ( + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" +) + +// RayClusterUpgradeStrategyApplyConfiguration represents a declarative configuration of the RayClusterUpgradeStrategy type for use +// with apply. +type RayClusterUpgradeStrategyApplyConfiguration struct { + Type *rayv1.RayClusterUpgradeType `json:"type,omitempty"` +} + +// RayClusterUpgradeStrategyApplyConfiguration constructs a declarative configuration of the RayClusterUpgradeStrategy type for use with +// apply. +func RayClusterUpgradeStrategy() *RayClusterUpgradeStrategyApplyConfiguration { + return &RayClusterUpgradeStrategyApplyConfiguration{} +} + +// WithType sets the Type field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Type field is set to the value of the last call. 
+func (b *RayClusterUpgradeStrategyApplyConfiguration) WithType(value rayv1.RayClusterUpgradeType) *RayClusterUpgradeStrategyApplyConfiguration { + b.Type = &value + return b +} diff --git a/ray-operator/pkg/client/applyconfiguration/utils.go b/ray-operator/pkg/client/applyconfiguration/utils.go index 9ad6513e0c8..16a92835cf5 100644 --- a/ray-operator/pkg/client/applyconfiguration/utils.go +++ b/ray-operator/pkg/client/applyconfiguration/utils.go @@ -44,6 +44,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &rayv1.RayClusterSpecApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayClusterStatus"): return &rayv1.RayClusterStatusApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayClusterUpgradeStrategy"): + return &rayv1.RayClusterUpgradeStrategyApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJob"): return &rayv1.RayJobApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJobSpec"): diff --git a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go index e38da613bda..461d1c270e8 100644 --- a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go +++ b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go @@ -182,7 +182,7 @@ func TestUpdateServeConfigAndRayClusterSpecWithUpgradeDisabled(t *testing.T) { rayServiceName := "rayservice-sample" rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration(). - WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.None))) + WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.RayServiceUpgradeNone))) rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) g.Expect(err).NotTo(HaveOccurred()) From 9359c5f87b4e409adacd2d84072fb14f1fb260e6 Mon Sep 17 00:00:00 2001 From: win5923 Date: Thu, 20 Nov 2025 14:12:35 +0000 Subject: [PATCH 2/5] Add test Signed-off-by: win5923 --- .../ray/raycluster_controller_unit_test.go | 122 ++++++++++++++++++ .../controllers/ray/utils/validation.go | 13 +- .../controllers/ray/utils/validation_test.go | 89 +++++++++++++ ray-operator/test/e2e/raycluster_test.go | 57 ++++++++ 4 files changed, 276 insertions(+), 5 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 9c1ac017023..901d4cf8165 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3643,3 +3643,125 @@ func TestReconcile_PodsWithAuthToken(t *testing.T) { assert.True(t, authModeEnvFound, "Auth mode env vars not found") } } + +func TestShouldRecreatePodsForUpgrade(t *testing.T) { + setupTest(t) + ctx := context.Background() + + // Calculate template hashes for matching pods + headHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.HeadGroupSpec.Template) + require.NoError(t, err, "Failed to generate head template hash") + workerHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.WorkerGroupSpecs[0].Template) + require.NoError(t, err, "Failed to generate worker template hash") + + // Helper function to create a pod with specific template hash + createPodWithHash := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
name, + Namespace: namespaceStr, + Labels: map[string]string{ + utils.RayNodeLabelKey: "yes", + utils.RayClusterLabelKey: instanceName, + utils.RayNodeTypeLabelKey: string(nodeType), + utils.RayNodeGroupLabelKey: groupName, + }, + Annotations: map[string]string{ + utils.PodTemplateHashKey: templateHash, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + } + + tests := []struct { + name string + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + pods []runtime.Object + expectedRecreate bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is nil", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{Type: nil}, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is None", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Recreate strategy but no pods exist", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{}, + expectedRecreate: false, + }, + { + name: "Recreate strategy with matching template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: false, + }, + { + name: "Recreate strategy with mismatched head template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash"), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: true, + }, + { + name: "Recreate strategy with mismatched worker template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash"), + }, + expectedRecreate: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cluster := testRayCluster.DeepCopy() + cluster.Spec.UpgradeStrategy = tc.upgradeStrategy + + fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects(tc.pods...).Build() + testRayClusterReconciler := &RayClusterReconciler{ + Client: fakeClient, + Scheme: scheme.Scheme, + Recorder: &record.FakeRecorder{}, + } + + result := testRayClusterReconciler.shouldRecreatePodsForUpgrade(ctx, cluster) + assert.Equal(t, tc.expectedRecreate, result) + }) + } +} diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index a9a2b69ac0a..aa8ce262517 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -38,11 +38,14 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { } func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { - if instance.Spec.UpgradeStrategy != nil { - creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) - if creatorCRDType == 
RayJobCRD || creatorCRDType == RayServiceCRD { - return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) - } + strategy := instance.Spec.UpgradeStrategy + if strategy == nil || strategy.Type == nil || *strategy.Type == rayv1.RayClusterUpgradeNone { + return nil + } + + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) } return nil } diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 494fe39c29c..d95b0f55a4d 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1887,3 +1887,92 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { }) } } + +func TestValidateRayClusterUpgradeOptions(t *testing.T) { + tests := []struct { + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + name string + originatedFromCRD string + errorMessage string + expectError bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + expectError: false, + }, + { + name: "Upgrade strategy set None for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayJob", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayJobCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayJob", + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayService", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayServiceCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayService", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + Labels: map[string]string{}, + }, + Spec: rayv1.RayClusterSpec{ + UpgradeStrategy: tt.upgradeStrategy, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + }, + }, + }, + } + + if tt.originatedFromCRD != "" { + cluster.Labels[RayOriginatedFromCRDLabelKey] = tt.originatedFromCRD + } + + err := ValidateRayClusterUpgradeOptions(cluster) + + if tt.expectError { + require.Error(t, err, "Expected error for test case: %s", tt.name) + if tt.errorMessage != "" { + assert.Contains(t, err.Error(), tt.errorMessage, "Error message mismatch for test case: %s", tt.name) + } + } else { + require.NoError(t, err, "Unexpected error for test case: %s", tt.name) + } + }) + } +} diff --git a/ray-operator/test/e2e/raycluster_test.go b/ray-operator/test/e2e/raycluster_test.go index 3c26221c46b..719ee651f32 100644 --- a/ray-operator/test/e2e/raycluster_test.go +++ 
b/ray-operator/test/e2e/raycluster_test.go @@ -208,3 +208,60 @@ func TestRayClusterScalingDown(t *testing.T) { g.Expect(err).NotTo(HaveOccurred(), "Failed to remove finalizer from pod %s/%s", namespace.Name, pod.Name) } } + +func TestRayClusterUpgradeStrategy(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + + rayClusterAC := rayv1ac.RayCluster("raycluster-upgrade-recreate", namespace.Name).WithSpec(NewRayClusterSpec()) + rayClusterAC.Spec.UpgradeStrategy = rayv1ac.RayClusterUpgradeStrategy().WithType(rayv1.Recreate) + + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", namespace.Name, rayCluster.Name) + + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", namespace.Name, rayCluster.Name) + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + initialHeadPodName := headPod.Name + + workerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(workerPods).To(HaveLen(1)) + + LogWithTimestamp(test.T(), "Updating RayCluster %s/%s pod template to trigger upgrade", rayCluster.Namespace, rayCluster.Name) + // Update head pod template spec to trigger pod template hash change + // Add an annotation to change the pod spec + rayClusterAC.Spec.HeadGroupSpec.Template. + WithAnnotations(map[string]string{"upgrade-trigger": "test"}) + rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Updated RayCluster pod template") + + LogWithTimestamp(test.T(), "Waiting for new head pod to be running after recreate") + g.Eventually(func() bool { + newHeadPod, err := GetHeadPod(test, rayCluster) + if err != nil { + return false + } + return newHeadPod.Name != initialHeadPodName && newHeadPod.Status.Phase == corev1.PodRunning + }, TestTimeoutMedium).Should(BeTrue()) + + // Wait for cluster to become ready + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready again", rayCluster.Namespace, rayCluster.Name) + g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium). 
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + newHeadPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newHeadPod.Name).NotTo(Equal(initialHeadPodName)) + + newWorkerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newWorkerPods).To(HaveLen(1)) +} From acd77391083378684989f1416e16bb2b1f1709e8 Mon Sep 17 00:00:00 2001 From: win5923 Date: Sat, 22 Nov 2025 10:44:47 +0000 Subject: [PATCH 3/5] improve readability Signed-off-by: win5923 --- .../controllers/ray/raycluster_controller.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 1e95c365343..358e75beec5 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1116,7 +1116,7 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context return nil } -// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on Head/Worker pod template changes +// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on PodTemplateSpec changes func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool { logger := ctrl.LoggerFrom(ctx) @@ -1159,9 +1159,10 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, actualHash := pod.Annotations[utils.PodTemplateHashKey] var expectedHash string - if nodeType == string(rayv1.HeadNode) { + switch rayv1.RayNodeType(nodeType) { + case rayv1.HeadNode: expectedHash = headHash - } else if nodeType == string(rayv1.WorkerNode) { + case rayv1.WorkerNode: groupName := pod.Labels[utils.RayNodeGroupLabelKey] var ok bool expectedHash, ok = workerHashMap[groupName] @@ -1169,21 +1170,16 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName) continue } - } else { + default: continue } if actualHash != expectedHash { logger.Info("Pod template has changed, will recreate all pods", - "pod", pod.Name, - "nodeType", nodeType, - "actualHash", actualHash, - "expectedHash", expectedHash, - "upgradeStrategy", *instance.Spec.UpgradeStrategy.Type) + "rayCluster", instance.Name) return true } } - return false } From 5fb6869ff2094554686f63af5b66b5d5089a3482 Mon Sep 17 00:00:00 2001 From: win5923 Date: Tue, 2 Dec 2025 14:37:29 +0000 Subject: [PATCH 4/5] Remove deepcopy in GeneratePodTemplateHash Signed-off-by: win5923 --- ray-operator/controllers/ray/common/pod.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index e9b304aaf14..25960c8e082 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -159,8 +159,7 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra } func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) { - originalTemplate := template.DeepCopy() - return utils.GenerateJsonHash(*originalTemplate) + return utils.GenerateJsonHash(template) } // DefaultHeadPodTemplate sets the config values From 808311e608fef769c2e85d83a9d0c94503da405e Mon Sep 17 00:00:00 2001 From: win5923 Date: Tue, 2 Dec 2025 15:07:31 +0000 Subject: [PATCH 5/5] 
Refactor ValidateRayClusterUpgradeOptions Signed-off-by: win5923 --- ray-operator/controllers/ray/common/pod.go | 6 ++++++ ray-operator/controllers/ray/utils/validation.go | 9 ++++++--- ray-operator/controllers/ray/utils/validation_test.go | 9 +++++++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 25960c8e082..235a96bb680 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -168,11 +168,14 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. + log := ctrl.LoggerFrom(ctx) // Calculate the pod template hash before any modifications // This ensures the hash reflects the original user-defined template for upgrade detection templateHash := "" if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil { templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for head group") } podTemplate := headSpec.Template @@ -315,6 +318,7 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + log := ctrl.LoggerFrom(ctx) // Calculate the pod template hash before any modifications // This ensures the hash reflects the original user-defined template for upgrade detection templateHash := "" @@ -322,6 +326,8 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo if wg.GroupName == workerSpec.GroupName { if hash, err := GeneratePodTemplateHash(wg.Template); err == nil { templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for worker group", "groupName", workerSpec.GroupName) } break } diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 9c5b8170031..c7e7a80cd5d 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -38,11 +38,14 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { } func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { - strategy := instance.Spec.UpgradeStrategy - if strategy == nil || strategy.Type == nil || *strategy.Type == rayv1.RayClusterUpgradeNone { - return nil + // only Recreate and None are valid upgradeType + if instance.Spec.UpgradeStrategy != nil && instance.Spec.UpgradeStrategy.Type != nil && + *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate && + *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterUpgradeNone { + return fmt.Errorf("The RayCluster spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s or %s", *instance.Spec.UpgradeStrategy.Type, rayv1.Recreate, rayv1.RayClusterUpgradeNone) } + // only allow UpgradeStrategy to be set when RayCluster is created directly by user creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by 
%s", creatorCRDType) diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index c130a789ba3..c1d93bb6063 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1930,6 +1930,15 @@ func TestValidateRayClusterUpgradeOptions(t *testing.T) { expectError: true, errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayService", }, + { + name: "Invalid upgrade strategy value", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeType("InvalidStrategy")), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: true, + errorMessage: "Spec.UpgradeStrategy.Type value InvalidStrategy is invalid, valid options are Recreate or None", + }, } for _, tt := range tests {