|
4 | 4 | "testing" |
5 | 5 |
|
6 | 6 | . "github.com/onsi/gomega" |
| 7 | + corev1 "k8s.io/api/core/v1" |
7 | 8 | k8serrors "k8s.io/apimachinery/pkg/api/errors" |
8 | 9 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
9 | 10 | corev1ac "k8s.io/client-go/applyconfigurations/core/v1" |
@@ -198,4 +199,122 @@ env_vars: |
198 | 199 | g.Expect(err).NotTo(HaveOccurred()) |
199 | 200 | LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) |
200 | 201 | }) |
| 202 | + |
| 203 | + test.T().Run("Successful RayJob in Sidecar mode with auth token", func(_ *testing.T) { |
| 204 | + rayJobAC := rayv1ac.RayJob("counter-auth", namespace.Name). |
| 205 | + WithSpec(rayv1ac.RayJobSpec(). |
| 206 | + WithSubmissionMode(rayv1.SidecarMode). |
| 207 | + WithEntrypoint("python /home/ray/jobs/counter.py"). |
| 208 | + WithRuntimeEnvYAML(` |
| 209 | +env_vars: |
| 210 | + counter_name: test_counter |
| 211 | +`). |
| 212 | + WithShutdownAfterJobFinishes(true). |
| 213 | + WithRayClusterSpec(NewRayClusterSpec( |
| 214 | + MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")). |
| 215 | + WithAuthOptions(rayv1ac.AuthOptions().WithMode(rayv1.AuthModeToken)))) |
| 216 | + |
| 217 | + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) |
| 218 | + g.Expect(err).NotTo(HaveOccurred()) |
| 219 | + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully with auth token", rayJob.Namespace, rayJob.Name) |
| 220 | + |
| 221 | + // Wait for RayCluster name to be populated |
| 222 | + LogWithTimestamp(test.T(), "Waiting for RayCluster to be created") |
| 223 | + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). |
| 224 | + Should(WithTransform(RayJobClusterName, Not(BeEmpty()))) |
| 225 | + |
| 226 | + // Get RayCluster name |
| 227 | + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) |
| 228 | + g.Expect(err).NotTo(HaveOccurred()) |
| 229 | + rayClusterName := rayJob.Status.RayClusterName |
| 230 | + |
| 231 | + // Wait for RayCluster to become ready |
| 232 | + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", namespace.Name, rayClusterName) |
| 233 | + g.Eventually(RayCluster(test, namespace.Name, rayClusterName), TestTimeoutMedium). |
| 234 | + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) |
| 235 | + |
| 236 | + // Get RayCluster and verify auth token environment variables |
| 237 | + rayCluster, err := GetRayCluster(test, namespace.Name, rayClusterName) |
| 238 | + g.Expect(err).NotTo(HaveOccurred()) |
| 239 | + |
| 240 | + headPod, err := GetHeadPod(test, rayCluster) |
| 241 | + g.Expect(err).NotTo(HaveOccurred()) |
| 242 | + g.Expect(headPod).NotTo(BeNil()) |
| 243 | + |
| 244 | + // Verify Ray container has auth token env vars |
| 245 | + verifyContainerAuthTokenEnvVars(test.T(), rayCluster, headPod, utils.RayContainerIndex) |
| 246 | + |
| 247 | + // Verify submitter container has auth token env vars |
| 248 | + submitterContainerIndex := -1 |
| 249 | + for i, container := range headPod.Spec.Containers { |
| 250 | + if container.Name == utils.SubmitterContainerName { |
| 251 | + submitterContainerIndex = i |
| 252 | + break |
| 253 | + } |
| 254 | + } |
| 255 | + g.Expect(submitterContainerIndex).NotTo(Equal(-1), "submitter container should be present in head pod") |
| 256 | + verifyContainerAuthTokenEnvVars(test.T(), rayCluster, headPod, submitterContainerIndex) |
| 257 | + |
| 258 | + // Verify worker pods have auth token env vars |
| 259 | + workerPods, err := GetWorkerPods(test, rayCluster) |
| 260 | + g.Expect(err).NotTo(HaveOccurred()) |
| 261 | + g.Expect(workerPods).ToNot(BeEmpty()) |
| 262 | + for _, workerPod := range workerPods { |
| 263 | + verifyContainerAuthTokenEnvVars(test.T(), rayCluster, &workerPod, utils.RayContainerIndex) |
| 264 | + } |
| 265 | + |
| 266 | + LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) |
| 267 | + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). |
| 268 | + Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal))) |
| 269 | + |
| 270 | + // Assert the RayJob has completed successfully |
| 271 | + g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). |
| 272 | + To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded))) |
| 273 | + |
| 274 | + // And the RayJob deployment status is updated accordingly |
| 275 | + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)). |
| 276 | + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete))) |
| 277 | + |
| 278 | + LogWithTimestamp(test.T(), "RayJob %s/%s completed successfully with auth token", rayJob.Namespace, rayJob.Name) |
| 279 | + }) |
| 280 | +} |
| 281 | + |
| 282 | +// verifyContainerAuthTokenEnvVars verifies that the specified container has the correct auth token environment variables. |
| 283 | +func verifyContainerAuthTokenEnvVars(t *testing.T, rayCluster *rayv1.RayCluster, pod *corev1.Pod, containerIndex int) { |
| 284 | + g := NewWithT(t) |
| 285 | + |
| 286 | + g.Expect(len(pod.Spec.Containers)).To(BeNumerically(">", containerIndex), |
| 287 | + "Container index %d should exist in pod", containerIndex) |
| 288 | + |
| 289 | + container := pod.Spec.Containers[containerIndex] |
| 290 | + |
| 291 | + var rayAuthModeEnvVar *corev1.EnvVar |
| 292 | + for _, envVar := range container.Env { |
| 293 | + if envVar.Name == utils.RAY_AUTH_MODE_ENV_VAR { |
| 294 | + rayAuthModeEnvVar = &envVar |
| 295 | + break |
| 296 | + } |
| 297 | + } |
| 298 | + g.Expect(rayAuthModeEnvVar).NotTo(BeNil(), |
| 299 | + "RAY_AUTH_MODE environment variable should be set in container %s", container.Name) |
| 300 | + g.Expect(rayAuthModeEnvVar.Value).To(Equal(string(rayv1.AuthModeToken)), |
| 301 | + "RAY_AUTH_MODE should be %s in container %s", rayv1.AuthModeToken, container.Name) |
| 302 | + |
| 303 | + var rayAuthTokenEnvVar *corev1.EnvVar |
| 304 | + for _, envVar := range container.Env { |
| 305 | + if envVar.Name == utils.RAY_AUTH_TOKEN_ENV_VAR { |
| 306 | + rayAuthTokenEnvVar = &envVar |
| 307 | + break |
| 308 | + } |
| 309 | + } |
| 310 | + g.Expect(rayAuthTokenEnvVar).NotTo(BeNil(), |
| 311 | + "RAY_AUTH_TOKEN environment variable should be set for AuthModeToken in container %s", container.Name) |
| 312 | + g.Expect(rayAuthTokenEnvVar.ValueFrom).NotTo(BeNil(), |
| 313 | + "RAY_AUTH_TOKEN should be populated from a secret in container %s", container.Name) |
| 314 | + g.Expect(rayAuthTokenEnvVar.ValueFrom.SecretKeyRef).NotTo(BeNil(), |
| 315 | + "RAY_AUTH_TOKEN should be populated from a secret key ref in container %s", container.Name) |
| 316 | + g.Expect(rayAuthTokenEnvVar.ValueFrom.SecretKeyRef.Name).To(ContainSubstring(rayCluster.Name), |
| 317 | + "Secret name should contain RayCluster name in container %s", container.Name) |
| 318 | + g.Expect(rayAuthTokenEnvVar.ValueFrom.SecretKeyRef.Key).To(Equal(utils.RAY_AUTH_TOKEN_SECRET_KEY), |
| 319 | + "Secret key should be %s in container %s", utils.RAY_AUTH_TOKEN_SECRET_KEY, container.Name) |
201 | 320 | } |
0 commit comments