@@ -20,6 +20,7 @@ import (
2020
2121 rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2222 "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
23+ "github.com/ray-project/kuberay/ray-operator/pkg/features"
2324)
2425
2526var testMemoryLimit = resource .MustParse ("1Gi" )
@@ -681,7 +682,7 @@ func TestBuildPod(t *testing.T) {
681682 worker := cluster .Spec .WorkerGroupSpecs [0 ]
682683 podName = cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
683684 fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
684- podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
685+ podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
685686 pod = BuildPod (ctx , podTemplateSpec , rayv1 .WorkerNode , worker .RayStartParams , "6379" , false , utils .GetCRDType ("" ), fqdnRayIP )
686687
687688 // Check resources
@@ -752,7 +753,7 @@ func TestBuildPod_WithNoCPULimits(t *testing.T) {
752753 worker := cluster .Spec .WorkerGroupSpecs [0 ]
753754 podName = cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
754755 fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
755- podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
756+ podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
756757 pod = BuildPod (ctx , podTemplateSpec , rayv1 .WorkerNode , worker .RayStartParams , "6379" , false , utils .GetCRDType ("" ), fqdnRayIP )
757758 expectedCommandArg = splitAndSort ("ulimit -n 65536; ray start --block --dashboard-agent-listen-port=52365 --memory=1073741824 --num-cpus=2 --num-gpus=3 --address=raycluster-sample-head-svc.default.svc.cluster.local:6379 --port=6379 --metrics-export-port=8080" )
758759 actualCommandArg = splitAndSort (pod .Spec .Containers [0 ].Args [0 ])
@@ -783,7 +784,7 @@ func TestBuildPod_WithOverwriteCommand(t *testing.T) {
783784 worker := cluster .Spec .WorkerGroupSpecs [0 ]
784785 podName = cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
785786 fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
786- podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
787+ podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
787788 workerPod := BuildPod (ctx , podTemplateSpec , rayv1 .WorkerNode , worker .RayStartParams , "6379" , false , utils .GetCRDType ("" ), fqdnRayIP )
788789 workerContainer := workerPod .Spec .Containers [utils .RayContainerIndex ]
789790 assert .Equal (t , []string {"I am worker" }, workerContainer .Command )
@@ -838,7 +839,7 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) {
838839 worker := cluster .Spec .WorkerGroupSpecs [0 ]
839840 podName = cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
840841 fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
841- podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
842+ podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
842843 pod = BuildPod (ctx , podTemplateSpec , rayv1 .WorkerNode , worker .RayStartParams , "6379" , false , utils .RayServiceCRD , fqdnRayIP )
843844
844845 val , ok = pod .Labels [utils .RayClusterServingServiceLabelKey ]
@@ -894,7 +895,7 @@ func TestBuildPod_WithLoginBash(t *testing.T) {
894895 worker := cluster .Spec .WorkerGroupSpecs [0 ]
895896 podName = cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
896897 fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
897- podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
898+ podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
898899 workerPod := BuildPod (ctx , podTemplateSpec , rayv1 .WorkerNode , worker .RayStartParams , "6379" , false , utils .RayServiceCRD , fqdnRayIP )
899900
900901 // Verify worker container command
@@ -1157,11 +1158,35 @@ func TestDefaultWorkerPodTemplateWithName(t *testing.T) {
11571158 expectedWorker := * worker .DeepCopy ()
11581159
11591160 // Pass a deep copy of worker (*worker.DeepCopy()) to prevent "worker" from updating.
1160- podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" )
1161+ podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" , "" , 0 )
11611162 assert .Empty (t , podTemplateSpec .ObjectMeta .Name )
11621163 assert .Equal (t , expectedWorker , worker )
11631164}
11641165
1166+ func TestDeafultWorkerPodTemplateWithReplicaGrpAndIndex (t * testing.T ) {
1167+ ctx := context .Background ()
1168+
1169+ cluster := instance .DeepCopy ()
1170+
1171+ fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
1172+ worker := cluster .Spec .WorkerGroupSpecs [0 ]
1173+
1174+ multihostIndexingEnabled := features .Enabled (features .RayMulithostIndexing ) && worker .NumOfHosts > 1
1175+ if multihostIndexingEnabled {
1176+ worker .Template .ObjectMeta .Name = "ray-worker-test"
1177+ podName := cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
1178+ groupReplicaName := utils .GenerateRayWorkerReplicaGroupName (worker .GroupName )
1179+
1180+ // Pass a deep copy of worker (*worker.DeepCopy()) to prevent "worker" from updating.
1181+ podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" , groupReplicaName , 2 )
1182+ assert .Empty (t , podTemplateSpec .ObjectMeta .Name )
1183+ assert .Equal (t , podTemplateSpec .Labels [utils .RayWorkerReplicaIndexKey ], groupReplicaName )
1184+ assert .Equal (t , "2" , podTemplateSpec .Labels [utils .RayHostIndexKey ])
1185+ } else {
1186+ t .Log ("Test skipped because RayMulithostIndexing feature is not enabled" )
1187+ }
1188+ }
1189+
11651190func containerPortExists (ports []corev1.ContainerPort , containerPort int32 ) error {
11661191 name := utils .MetricsPortName
11671192 for _ , port := range ports {
@@ -1204,7 +1229,7 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) {
12041229 worker := cluster .Spec .WorkerGroupSpecs [0 ]
12051230 podName := cluster .Name + utils .DashSymbol + string (rayv1 .WorkerNode ) + utils .DashSymbol + worker .GroupName + utils .DashSymbol + utils .FormatInt32 (0 )
12061231 fqdnRayIP := utils .GenerateFQDNServiceName (ctx , * cluster , cluster .Namespace )
1207- podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
1232+ podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
12081233 // DefaultWorkerPodTemplate will add the default metrics port if user doesn't specify it.
12091234 // Verify the default metrics port exists.
12101235 require .NoError (t , containerPortExists (podTemplateSpec .Spec .Containers [0 ].Ports , int32 (utils .DefaultMetricsPort )))
@@ -1214,7 +1239,7 @@ func TestDefaultWorkerPodTemplateWithConfigurablePorts(t *testing.T) {
12141239 ContainerPort : customMetricsPort ,
12151240 }
12161241 cluster .Spec .WorkerGroupSpecs [0 ].Template .Spec .Containers [0 ].Ports = []corev1.ContainerPort {metricsPort }
1217- podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" )
1242+ podTemplateSpec = DefaultWorkerPodTemplate (ctx , * cluster , worker , podName , fqdnRayIP , "6379" , "" , 0 )
12181243 // Verify the custom metrics port exists.
12191244 require .NoError (t , containerPortExists (podTemplateSpec .Spec .Containers [0 ].Ports , customMetricsPort ))
12201245}
@@ -1253,7 +1278,7 @@ func TestDefaultWorkerPodTemplate_Autoscaling(t *testing.T) {
12531278
12541279 for name , tc := range tests {
12551280 t .Run (name , func (t * testing.T ) {
1256- podTemplateSpec := DefaultWorkerPodTemplate (ctx , tc .cluster , tc .cluster .Spec .WorkerGroupSpecs [0 ], podName , fqdnRayIP , "6379" )
1281+ podTemplateSpec := DefaultWorkerPodTemplate (ctx , tc .cluster , tc .cluster .Spec .WorkerGroupSpecs [0 ], podName , fqdnRayIP , "6379" , "" , 0 )
12571282 assert .Equal (t , tc .expectedRestartPolicy , podTemplateSpec .Spec .RestartPolicy )
12581283 })
12591284 }
@@ -1269,7 +1294,7 @@ func TestDefaultInitContainer(t *testing.T) {
12691294 expectedResult := len (cluster .Spec .WorkerGroupSpecs [0 ].Template .Spec .InitContainers ) + 1
12701295
12711296 // Pass a deep copy of worker (*worker.DeepCopy()) to prevent "worker" from updating.
1272- podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" )
1297+ podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" , "" , 0 )
12731298 numInitContainers := len (podTemplateSpec .Spec .InitContainers )
12741299 assert .Equal (t , expectedResult , numInitContainers , "A default init container is expected to be added." )
12751300
@@ -1328,7 +1353,7 @@ func TestDefaultInitContainerImagePullPolicy(t *testing.T) {
13281353 // set ray container imagePullPolicy
13291354 worker .Template .Spec .Containers [utils .RayContainerIndex ].ImagePullPolicy = tc .imagePullPolicy
13301355
1331- podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" )
1356+ podTemplateSpec := DefaultWorkerPodTemplate (ctx , * cluster , * worker .DeepCopy (), podName , fqdnRayIP , "6379" , "" , 0 )
13321357
13331358 healthCheckContainer := podTemplateSpec .Spec .InitContainers [len (podTemplateSpec .Spec .InitContainers )- 1 ]
13341359 assert .Equal (t , tc .expectedPullPolicy , healthCheckContainer .ImagePullPolicy , "The ImagePullPolicy of the init container should be the same as the Ray container." )
0 commit comments