Skip to content

Commit

Permalink
[RayCluster] Make headpod name deterministic (#3028)
Browse files Browse the repository at this point in the history
  • Loading branch information
owenowenisme authored Feb 14, 2025
1 parent a860884 commit b9d8b1a
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 36 deletions.
6 changes: 3 additions & 3 deletions kubectl-plugin/test/e2e/kubectl_ray_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {

It("succeed in retrieving all ray cluster logs", func() {
expectedDirPath := "./raycluster-kuberay"
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head-\w+\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`

cmd := exec.Command("kubectl", "ray", "log", "--namespace", namespace, "raycluster-kuberay", "--node-type", "all")
output, err := cmd.CombinedOutput()
Expand Down Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {

It("succeed in retrieving ray cluster head logs", func() {
expectedDirPath := "./raycluster-kuberay"
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve only head node logs\.\nDownloading log for Ray Node raycluster-kuberay-head-\w+`
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve only head node logs\.\nDownloading log for Ray Node raycluster-kuberay-head`

cmd := exec.Command("kubectl", "ray", "log", "--namespace", namespace, "raycluster-kuberay", "--node-type", "head")
output, err := cmd.CombinedOutput()
Expand Down Expand Up @@ -191,7 +191,7 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {

It("succeed in retrieving ray cluster logs within designated directory", func() {
expectedDirPath := "./temporary-directory"
expectedOutputStringFormat := `Command set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head-\w+\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`
expectedOutputStringFormat := `Command set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`

err := os.MkdirAll(expectedDirPath, 0o755)
Expect(err).NotTo(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head
// headPort is passed into setMissingRayStartParams but unused there for the head pod.
// To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic.
podTemplate := headSpec.Template
podTemplate.GenerateName = podName
podTemplate.Name = podName
// Pods created by RayCluster should be restricted to the namespace of the RayCluster.
// This ensures privilege of KubeRay users are contained within the namespace of the RayCluster.
podTemplate.ObjectMeta.Namespace = instance.Namespace
Expand Down
32 changes: 13 additions & 19 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,24 +733,18 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return errstd.Join(utils.ErrFailedCreateHeadPod, err)
}
common.SuccessfulClustersCounterInc(instance.Namespace)
} else if len(headPods.Items) > 1 {
logger.Info("reconcilePods: Found more than one head Pods; deleting extra head Pods.", "nHeadPods", len(headPods.Items))
// TODO (kevin85421): In-place update may not be a good idea.
itemLength := len(headPods.Items)
for index := 0; index < itemLength; index++ {
if headPods.Items[index].Status.Phase == corev1.PodRunning || headPods.Items[index].Status.Phase == corev1.PodPending {
headPods.Items[index] = headPods.Items[len(headPods.Items)-1] // Replace healthy pod at index i with the last element from the list of pods to delete.
headPods.Items = headPods.Items[:len(headPods.Items)-1] // Truncate slice.
itemLength--
}
}
// delete all the extra head pod pods
for _, extraHeadPodToDelete := range headPods.Items {
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
return errstd.Join(utils.ErrFailedDeleteHeadPod, err)
}
r.rayClusterScaleExpectation.ExpectScalePod(extraHeadPodToDelete.Namespace, instance.Name, expectations.HeadGroup, extraHeadPodToDelete.Name, expectations.Delete)
} else if len(headPods.Items) > 1 { // This should never happen. This protects against the case that users manually create headpod.
correctHeadPodName := instance.Name + "-head"
headPodNames := make([]string, len(headPods.Items))
for i, pod := range headPods.Items {
headPodNames[i] = pod.Name
}

logger.Info("Multiple head pods found, it should only exist one head pod. Please delete extra head pods.",
"found pods", headPodNames,
"should only leave", correctHeadPodName,
)
return fmt.Errorf("%d head pods found %v. Please delete extra head pods and leave only the head pod with name %s", len(headPods.Items), headPodNames, correctHeadPodName)
}

// Reconcile worker pods now
Expand Down Expand Up @@ -1092,7 +1086,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
// Build head instance pod(s).
func (r *RayClusterReconciler) buildHeadPod(ctx context.Context, instance rayv1.RayCluster) corev1.Pod {
logger := ctrl.LoggerFrom(ctx)
podName := utils.PodGenerateName(instance.Name, rayv1.HeadNode)
podName := utils.PodName(instance.Name, rayv1.HeadNode, false)
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) // Fully Qualified Domain Name
// The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.)
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
Expand All @@ -1119,7 +1113,7 @@ func getCreatorCRDType(instance rayv1.RayCluster) utils.CRDType {
// Build worker instance pods.
func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec) corev1.Pod {
logger := ctrl.LoggerFrom(ctx)
podName := utils.PodGenerateName(fmt.Sprintf("%s-%s", instance.Name, worker.GroupName), rayv1.WorkerNode)
podName := utils.PodName(fmt.Sprintf("%s-%s", instance.Name, worker.GroupName), rayv1.WorkerNode, true)
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) // Fully Qualified Domain Name

// The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.)
Expand Down
12 changes: 8 additions & 4 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func CheckRouteName(ctx context.Context, s string, n string) string {
return CheckName(s)
}

// PodGenerateName returns the value that should be used for a Pod's generateName
// PodName returns the value that should be used for a Pod's Name or GenerateName
// based on the RayCluster name and node type (head or worker).
func PodGenerateName(prefix string, nodeType rayv1.RayNodeType) string {
maxPrefixLength := 50 // 63 - (max(8,6) + 5 ) // 6 to 8 char are consumed at the end with "-head-" or -worker- + 5 generated.
func PodName(prefix string, nodeType rayv1.RayNodeType, isGenerateName bool) string {
maxPrefixLength := 50 // 63 - ( 8 + 5 ) // 8 char are consumed at the end with "-worker-" + 5 generated.

var podPrefix string
if len(prefix) <= maxPrefixLength {
Expand All @@ -177,7 +177,11 @@ func PodGenerateName(prefix string, nodeType rayv1.RayNodeType) string {
podPrefix = prefix[:maxPrefixLength]
}

return strings.ToLower(podPrefix + DashSymbol + string(nodeType) + DashSymbol)
result := strings.ToLower(podPrefix + DashSymbol + string(nodeType))
if isGenerateName {
result += DashSymbol
}
return result
}

// CheckName makes sure the name does not start with a numeric value and the total length is < 63 char
Expand Down
11 changes: 6 additions & 5 deletions ray-operator/controllers/ray/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCheckAllPodsRunning(t *testing.T) {
}
}

func TestPodGenerateName(t *testing.T) {
func TestPodName(t *testing.T) {
tests := []struct {
name string
prefix string
Expand All @@ -114,7 +114,7 @@ func TestPodGenerateName(t *testing.T) {
name: "short cluster name, head pod",
prefix: "ray-cluster-01",
nodeType: rayv1.HeadNode,
expected: "ray-cluster-01-head-",
expected: "ray-cluster-01-head",
},
{
name: "short cluster name, worker pod",
Expand All @@ -126,7 +126,7 @@ func TestPodGenerateName(t *testing.T) {
name: "long cluster name, head pod",
prefix: "ray-cluster-0000000000000000000000011111111122222233333333333333",
nodeType: rayv1.HeadNode,
expected: "ray-cluster-00000000000000000000000111111111222222-head-",
expected: "ray-cluster-00000000000000000000000111111111222222-head",
},
{
name: "long cluster name, worker pod",
Expand All @@ -138,11 +138,12 @@ func TestPodGenerateName(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
str := PodGenerateName(test.prefix, test.nodeType)
isPodNameGenerated := test.nodeType == rayv1.WorkerNode // HeadPod name is now fixed
str := PodName(test.prefix, test.nodeType, isPodNameGenerated)
if str != test.expected {
t.Logf("expected: %q", test.expected)
t.Logf("actual: %q", str)
t.Error("PodGenerateName returned an unexpected string")
t.Error("PodName returned an unexpected string")
}

// 63 (max pod name length) - 5 random hexadecimal characters from generateName
Expand Down
8 changes: 5 additions & 3 deletions ray-operator/test/e2e/raycluster_gcs_ft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ func TestRayClusterGCSFaultTolerence(t *testing.T) {
err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())

testPodNameChanged := func(p *corev1.Pod) bool { return p.Name != headPod.Name }
PodUID := func(p *corev1.Pod) string { return string(p.UID) }
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
Should(WithTransform(testPodNameChanged, Equal(true)))
ShouldNot(WithTransform(PodUID, Equal(string(headPod.UID)))) // Use UID to check if the new head pod is created.

g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
Should(WithTransform(PodState, Equal("Running")))

headPod, _ = GetHeadPod(test, rayCluster)
headPod, err = GetHeadPod(test, rayCluster) // Replace the old head pod
g.Expect(err).NotTo(HaveOccurred())

expectedOutput = "4"

ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_2.py", rayNamespace, expectedOutput})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRedeployRayServe(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(endpoints.Subsets).To(HaveLen(1))
g.Expect(endpoints.Subsets[0].Addresses).To(HaveLen(1))
g.Expect(endpoints.Subsets[0].Addresses[0].TargetRef.Name).NotTo(Equal(oldHeadPod.Name))
g.Expect(endpoints.Subsets[0].Addresses[0].TargetRef.UID).NotTo(Equal(oldHeadPod.UID))
}, TestTimeoutMedium).Should(Succeed())

LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to running", rayService.Namespace, rayService.Name)
Expand Down

0 comments on commit b9d8b1a

Please sign in to comment.