30 changes: 30 additions & 0 deletions docs/reference/api.md
@@ -297,6 +297,7 @@ _Appears in:_

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `upgradeStrategy` _[RayClusterUpgradeStrategy](#rayclusterupgradestrategy)_ | UpgradeStrategy defines the strategy used when upgrading the RayCluster Pods | | |
| `authOptions` _[AuthOptions](#authoptions)_ | AuthOptions specifies the authentication options for the RayCluster. | | |
| `suspend` _boolean_ | Suspend indicates whether a RayCluster should be suspended.<br />A suspended RayCluster will have head pods and worker pods deleted. | | |
| `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayCluster.<br />The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.<br />The kuberay-operator reconciles a RayCluster which doesn't have this field at all or<br />the field value is the reserved string 'ray.io/kuberay-operator',<br />but delegates reconciling the RayCluster with 'kueue.x-k8s.io/multikueue' to the Kueue.<br />The field is immutable. | | |
@@ -309,6 +310,35 @@ _Appears in:_
| `workerGroupSpecs` _[WorkerGroupSpec](#workergroupspec) array_ | WorkerGroupSpecs are the specs for the worker pods | | |


#### RayClusterUpgradeStrategy

_Appears in:_
- [RayClusterSpec](#rayclusterspec)

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | |
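
For illustration only (not part of the generated reference), here is a minimal Go sketch of opting a cluster into the `Recreate` strategy using the types added in this PR; the object name, namespace, and the omitted head/worker group specs are placeholders. In manifest form this corresponds to setting `spec.upgradeStrategy.type: Recreate`.

```go
package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Opt into the Recreate strategy: on a Pod template change, the operator
	// deletes all existing Pods and recreates them from the updated spec.
	upgradeType := rayv1.Recreate
	cluster := rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "example-cluster", Namespace: "default"},
		Spec: rayv1.RayClusterSpec{
			UpgradeStrategy: &rayv1.RayClusterUpgradeStrategy{Type: &upgradeType},
			// HeadGroupSpec and WorkerGroupSpecs are filled in as usual.
		},
	}
	fmt.Println(*cluster.Spec.UpgradeStrategy.Type == rayv1.Recreate) // true
}
```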


#### RayClusterUpgradeType

_Underlying type:_ _string_

_Appears in:_
- [RayClusterUpgradeStrategy](#rayclusterupgradestrategy)



#### RayJob


5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -11,6 +11,9 @@ import (

// RayClusterSpec defines the desired state of RayCluster
type RayClusterSpec struct {
// UpgradeStrategy defines the strategy used when upgrading the RayCluster Pods
// +optional
UpgradeStrategy *RayClusterUpgradeStrategy `json:"upgradeStrategy,omitempty"`
// AuthOptions specifies the authentication options for the RayCluster.
// +optional
AuthOptions *AuthOptions `json:"authOptions,omitempty"`
@@ -49,6 +52,21 @@ type RayClusterSpec struct {
WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs,omitempty"`
}

type RayClusterUpgradeType string

const (
// During upgrade, Recreate strategy will delete all existing pods before creating new ones
Recreate RayClusterUpgradeType = "Recreate"
// No new pod will be created while the strategy is set to None
RayClusterUpgradeNone RayClusterUpgradeType = "None"
)

type RayClusterUpgradeStrategy struct {
// Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`.
// +optional
Type *RayClusterUpgradeType `json:"type,omitempty"`
}

// AuthMode describes the authentication mode for the Ray cluster.
type AuthMode string

2 changes: 1 addition & 1 deletion ray-operator/apis/ray/v1/rayservice_types.go
@@ -28,7 +28,7 @@ const (
// During upgrade, NewCluster strategy will create new upgraded cluster and switch to it when it becomes ready
NewCluster RayServiceUpgradeType = "NewCluster"
// No new cluster will be created while the strategy is set to None
None RayServiceUpgradeType = "None"
RayServiceUpgradeNone RayServiceUpgradeType = "None"
)

// These statuses should match Ray Serve's application statuses
25 changes: 25 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Some generated files are not rendered by default.

39 changes: 39 additions & 0 deletions ray-operator/controllers/ray/common/pod.go
@@ -158,11 +158,24 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra
}
}

func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) {
originalTemplate := template.DeepCopy()
return utils.GenerateJsonHash(*originalTemplate)
}
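
To make the intended use of this hash concrete: it is stamped onto each Pod as an annotation at creation time and later compared against a hash recomputed from the current spec template. A hypothetical comparison helper (not part of this PR, assuming the same package and imports as pod.go) might look like:

```go
// templateChanged reports whether a running Pod was built from an older template:
// it recomputes the hash of the current spec template and compares it with the
// value stored in the Pod's annotations when the Pod was created.
func templateChanged(pod corev1.Pod, current corev1.PodTemplateSpec) (bool, error) {
	expected, err := GeneratePodTemplateHash(current)
	if err != nil {
		return false, err
	}
	return pod.Annotations[utils.PodTemplateHashKey] != expected, nil
}
```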

// DefaultHeadPodTemplate sets the config values
func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec {
// TODO (Dmitri) The argument headPort is essentially unused;
// headPort is passed into setMissingRayStartParams but unused there for the head pod.
// To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic.

// Calculate the pod template hash before any modifications
// This ensures the hash reflects the original user-defined template for upgrade detection
templateHash := ""
if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil {
templateHash = hash
}
@400Ping (Contributor) commented on Nov 25, 2025:
Consider logging the error and returning it up the call stack so that reconciliation can be retried.
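
A sketch of what that suggestion could look like, assuming the hash computation is pulled into a small helper whose error is returned to the caller instead of silently falling back to an empty hash (the helper name is hypothetical, not part of this PR):

```go
// headTemplateHash illustrates the reviewer's suggestion: surface the hashing
// error so the caller can log it and let reconciliation retry, rather than
// quietly building the head Pod without the hash annotation.
func headTemplateHash(instance rayv1.RayCluster) (string, error) {
	hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template)
	if err != nil {
		return "", fmt.Errorf("failed to hash head Pod template for %s/%s: %w",
			instance.Namespace, instance.Name, err)
	}
	return hash, nil
}
```

DefaultHeadPodTemplate (and its callers) would then also need to return the error so the reconcile loop can requeue.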


podTemplate := headSpec.Template
if utils.IsDeterministicHeadPodNameEnabled() {
podTemplate.Name = podName
@@ -173,6 +186,13 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head
// This ensures privilege of KubeRay users are contained within the namespace of the RayCluster.
podTemplate.ObjectMeta.Namespace = instance.Namespace

if templateHash != "" {
if podTemplate.Annotations == nil {
podTemplate.Annotations = make(map[string]string)
}
podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash
}

// Update rayStartParams with top-level Resources for head group.
updateRayStartParamsResources(ctx, headSpec.RayStartParams, headSpec.Resources)

@@ -296,12 +316,31 @@ func getEnableProbesInjection() bool {

// DefaultWorkerPodTemplate sets the config values
func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec {
// Calculate the pod template hash before any modifications
// This ensures the hash reflects the original user-defined template for upgrade detection
templateHash := ""
for _, wg := range instance.Spec.WorkerGroupSpecs {
if wg.GroupName == workerSpec.GroupName {
if hash, err := GeneratePodTemplateHash(wg.Template); err == nil {
templateHash = hash
}
break
}
}

podTemplate := workerSpec.Template
podTemplate.GenerateName = podName

// Pods created by RayCluster should be restricted to the namespace of the RayCluster.
// This ensures privilege of KubeRay users are contained within the namespace of the RayCluster.
podTemplate.ObjectMeta.Namespace = instance.Namespace

if templateHash != "" {
if podTemplate.Annotations == nil {
podTemplate.Annotations = make(map[string]string)
}
podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash
}
// The Ray worker should only start once the GCS server is ready.
// only inject init container only when ENABLE_INIT_CONTAINER_INJECTION is true
enableInitContainerInjection := getEnableInitContainerInjection()
89 changes: 89 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -180,6 +180,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance
return ctrl.Result{}, nil
}

if err := utils.ValidateRayClusterUpgradeOptions(instance); err != nil {
logger.Error(err, "The RayCluster UpgradeStrategy is invalid")
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec),
"The RayCluster UpgradeStrategy is invalid %s/%s: %v", instance.Namespace, instance.Name, err)
return ctrl.Result{}, nil
}

if err := utils.ValidateRayClusterStatus(instance); err != nil {
logger.Error(err, "The RayCluster status is invalid")
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterStatus),
@@ -637,6 +644,21 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
}
}

// Check if pods need to be recreated with Recreate upgradeStrategy
if r.shouldRecreatePodsForUpgrade(ctx, instance) {
logger.Info("RayCluster spec changed with Recreate upgradeStrategy, deleting all pods")
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePodCollection),
"Failed deleting Pods due to spec change with Recreate upgradeStrategy for RayCluster %s/%s, %v",
instance.Namespace, instance.Name, err)
return errstd.Join(utils.ErrFailedDeleteAllPods, err)
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedPod),
"Deleted all Pods for RayCluster %s/%s due to spec change with Recreate upgradeStrategy",
instance.Namespace, instance.Name)
return nil
}

// check if all the pods exist
headPods := corev1.PodList{}
if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
@@ -1094,6 +1116,73 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context
return nil
}

// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on PodTemplateSpec changes
func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool {
logger := ctrl.LoggerFrom(ctx)

if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil {
return false
}
if *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate {
return false
}

allPods := corev1.PodList{}
if err := r.List(ctx, &allPods, common.RayClusterAllPodsAssociationOptions(instance).ToListOptions()...); err != nil {
logger.Error(err, "Failed to list pods for upgrade check")
return false
}

if len(allPods.Items) == 0 {
return false
}

headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template)
if err != nil {
logger.Error(err, "Failed to generate head template hash")
return false
}

workerHashMap := make(map[string]string)
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
hash, err := common.GeneratePodTemplateHash(workerGroup.Template)
if err != nil {
logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName)
continue
}
workerHashMap[workerGroup.GroupName] = hash
}

// Check each pod to see if its template hash matches the current spec
for _, pod := range allPods.Items {
nodeType := pod.Labels[utils.RayNodeTypeLabelKey]
actualHash := pod.Annotations[utils.PodTemplateHashKey]

var expectedHash string
switch rayv1.RayNodeType(nodeType) {
case rayv1.HeadNode:
expectedHash = headHash
case rayv1.WorkerNode:
groupName := pod.Labels[utils.RayNodeGroupLabelKey]
var ok bool
expectedHash, ok = workerHashMap[groupName]
if !ok {
logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName)
continue
}
default:
continue
}

if actualHash != expectedHash {
logger.Info("Pod template has changed, will recreate all pods",
"rayCluster", instance.Name)
return true
}
}
return false
}

// shouldDeletePod returns whether the Pod should be deleted and the reason
//
// @param pod: The Pod to be checked.