Merged
34 commits
b0172bc
[historyserver] Fix getJobID for job event collection
Future-Outlier Jan 5, 2026
eeb7a7f
add jia-wei as co-author, since he debugged this with me
Future-Outlier Jan 5, 2026
0ff5bcf
remove unused code
Future-Outlier Jan 5, 2026
5aea305
update rueian's advice
Future-Outlier Jan 5, 2026
5f547a2
add task profile event example
Future-Outlier Jan 5, 2026
0177160
revert back to the oneof solution
Future-Outlier Jan 5, 2026
420ce9a
add task profile event
Future-Outlier Jan 5, 2026
0621ba4
test: Test event type coverage in happy path
JiangJiaWei1103 Jan 5, 2026
07403f7
Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types
JiangJiaWei1103 Jan 6, 2026
de153d1
refactor: Remove redundant code
JiangJiaWei1103 Jan 6, 2026
d1c2b18
test: Verify event type coverage of aggregated node and job events
JiangJiaWei1103 Jan 6, 2026
14a9b52
test: Force kill worker container and verify event coverage
JiangJiaWei1103 Jan 6, 2026
598dbfd
refactor: Create a WorkerPods adapter and remove redundancy
JiangJiaWei1103 Jan 7, 2026
7492761
test: Check both head and worker logs
JiangJiaWei1103 Jan 7, 2026
9611a42
refactor: Use eventually to wrap coverage check
JiangJiaWei1103 Jan 9, 2026
86e73da
test: Check raylet.out and gcs_server.out
JiangJiaWei1103 Jan 12, 2026
ecf3648
docs: Correct docs
JiangJiaWei1103 Jan 12, 2026
9f70a21
refactor: List subdirs of job_events rather than hardcoding
JiangJiaWei1103 Jan 13, 2026
84519bc
fix: Wait for async job events flushing on worker
JiangJiaWei1103 Jan 13, 2026
c796bfc
test: Consolidate tests by checking non-empty list
JiangJiaWei1103 Jan 13, 2026
c3f73b1
fix: Aggregate all event files not just the first file obj
JiangJiaWei1103 Jan 13, 2026
302d903
fix: Avoid redundant appends
JiangJiaWei1103 Jan 13, 2026
6237c07
list job
Future-Outlier Jan 14, 2026
5c8fc59
fix: Explicitly close content body to avoid resource leaks
JiangJiaWei1103 Jan 14, 2026
10237ec
docs: Remove redundant notes
JiangJiaWei1103 Jan 14, 2026
0e28fa2
fix: Close content on failure to prevent resource leak
JiangJiaWei1103 Jan 14, 2026
48681ab
docs: Update helper usage
JiangJiaWei1103 Jan 14, 2026
5c5c878
Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types
JiangJiaWei1103 Jan 14, 2026
e640798
test: Test log file existence only
JiangJiaWei1103 Jan 14, 2026
706199c
style: Remove trailing slash
JiangJiaWei1103 Jan 14, 2026
6f47105
docs: State why we use sleep
JiangJiaWei1103 Jan 15, 2026
3a2bc7a
Merge branch 'epic-4274/e2e-test-coverage-of-event-types' into epic-4…
JiangJiaWei1103 Jan 15, 2026
4204a5c
Merge branch 'my-master' into epic-4274/e2e-test-head-worker-logs
JiangJiaWei1103 Jan 19, 2026
4529c9f
refactor: Wrap event type coverage assertion in eventually
JiangJiaWei1103 Jan 19, 2026
143 changes: 70 additions & 73 deletions historyserver/test/e2e/collector_test.go
@@ -107,9 +107,11 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev
// Submit a Ray job to the existing cluster.
_ = applyRayJobAndWaitForCompletion(test, g, namespace)

// Define variables for constructing S3 object prefix.
clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID)
sessionID := getSessionIDFromHeadPod(test, g, rayCluster)
nodeID := getNodeIDFromHeadPod(test, g, rayCluster)
headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID)

// Delete the Ray cluster to trigger log uploading and event flushing on deletion.
@@ -123,7 +125,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev
}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))

// Verify logs, node_events, and job_events are successfully uploaded to S3.
verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)
verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID)

// Delete S3 bucket to ensure test isolation.
deleteS3Bucket(test, g, s3Client)
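The HeadPod and FirstWorkerPod arguments passed to getNodeIDFromPod above act as pod-getter adapters; their definitions live outside this diff. A minimal sketch of what such an adapter could look like, assuming it simply wraps the existing GetHeadPod helper (the names and signature here are illustrative, not the PR's verbatim code):

// Hypothetical adapter sketch: wrap GetHeadPod so the result matches the
// func() (*corev1.Pod, error) parameter expected by getNodeIDFromPod.
func HeadPod(t Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) {
	return func() (*corev1.Pod, error) {
		return GetHeadPod(t, rayCluster)
	}
}

FirstWorkerPod would follow the same shape, returning the first pod of the cluster's worker group.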
@@ -155,7 +157,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1

clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID)
sessionID := getSessionIDFromHeadPod(test, g, rayCluster)
nodeID := getNodeIDFromHeadPod(test, g, rayCluster)
headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID)

// NOTE: We use `kill 1` to simulate Kubernetes OOMKilled behavior.
@@ -183,7 +186,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
}

// Verify logs, node_events, and job_events are successfully uploaded to S3.
verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)
verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID)

deleteS3Bucket(test, g, s3Client)
}
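The NOTE above mentions `kill 1` to simulate Kubernetes OOMKilled behavior, but the exec call itself is collapsed in this hunk. A minimal sketch of how that step might be issued with the helpers already used in this file (the function name and the "ray-head" container target are assumptions):

// Illustrative sketch only: kill PID 1 (the container's main process) in the
// head container so Kubernetes restarts it, as described in the NOTE above.
func simulateHeadContainerKill(test Test, g *WithT, rayCluster *rayv1.RayCluster) {
	headPod, err := GetHeadPod(test, rayCluster)
	g.Expect(err).NotTo(HaveOccurred())
	// The exec session dies together with PID 1, so the returned error is ignored.
	ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", "kill 1"})
}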
@@ -499,83 +502,56 @@ func applyRayJobAndWaitForCompletion(test Test, g *WithT, namespace *corev1.Name
// verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3.
// There are two phases of verification:
// 1. Verify file contents in logs/ directory
// - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
// - TODO(jwj): Complete docs.
// - For the head node, verify raylet.out, gcs_server.out, and monitor.out exist
// - For the worker node, verify raylet.out exists
//
// 2. Verify event type coverage in node_events/ and job_events/ directories
// - Aggregate all events from node_events/ and job_events/ directories
// - Verify that all potential event types are present in the aggregated events
//
// NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage.
func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) {
// TODO(jwj): Separate verification for logs and events.
dirs := []string{"logs"}
for _, dir := range dirs {
dirPrefix := sessionPrefix + dir + "/"

g.Eventually(func(gg Gomega) {
// Verify the directory has at least one object.
objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(s3BucketName),
Prefix: aws.String(dirPrefix),
MaxKeys: aws.Int64(10),
})
gg.Expect(err).NotTo(HaveOccurred())
keyCount := aws.Int64Value(objects.KeyCount)
gg.Expect(keyCount).To(BeNumerically(">", 0))
LogWithTimestamp(test.T(), "Verified directory %s under %s has at least one object", dir, sessionPrefix)

// Find the first file object for content verification.
var fileObj *s3.Object
for _, obj := range objects.Contents {
if !strings.HasSuffix(aws.StringValue(obj.Key), "/") {
fileObj = obj
break
}
}
gg.Expect(fileObj).NotTo(BeNil(), "No file object found in directory %s", dirPrefix)

// Verify the file has content by checking file size.
fileKey := *fileObj.Key
LogWithTimestamp(test.T(), "Checking file: %s", fileKey)
obj, err := s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s3BucketName),
Key: aws.String(fileKey),
})
gg.Expect(err).NotTo(HaveOccurred())
fileSize := aws.Int64Value(obj.ContentLength)
gg.Expect(fileSize).To(BeNumerically(">", 0))
LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize)
}, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix)
func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, headNodeID string, workerNodeID string) {
// Verify file contents in logs/ directory.
headLogDirPrefix := fmt.Sprintf("%slogs/%s", sessionPrefix, headNodeID)
workerLogDirPrefix := fmt.Sprintf("%slogs/%s", sessionPrefix, workerNodeID)

LogWithTimestamp(test.T(), "Verifying raylet.out, gcs_server.out, and monitor.out exist in head log directory %s", headLogDirPrefix)
for _, fileName := range []string{"raylet.out", "gcs_server.out", "monitor.out"} {
assertFileExist(test, g, s3Client, headLogDirPrefix, fileName)
}

LogWithTimestamp(test.T(), "Verifying raylet.out exists in worker log directory %s", workerLogDirPrefix)
assertFileExist(test, g, s3Client, workerLogDirPrefix, "raylet.out")

// Verify event type coverage in node_events/ and job_events/ directories.
LogWithTimestamp(test.T(), "Verifying all %d event types are covered, except for EVENT_TYPE_UNSPECIFIED: %v", len(types.AllEventTypes)-1, types.AllEventTypes)
uploadedEvents := []rayEvent{}
g.Eventually(func(gg Gomega) {
uploadedEvents := []rayEvent{}

// Load events from node_events directory.
nodeEventsPrefix := sessionPrefix + "node_events/"
nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix)
g.Expect(err).NotTo(HaveOccurred())
uploadedEvents = append(uploadedEvents, nodeEvents...)
LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents))
// Load events from node_events directory.
nodeEventsPrefix := sessionPrefix + "node_events/"
nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix)
gg.Expect(err).NotTo(HaveOccurred())
uploadedEvents = append(uploadedEvents, nodeEvents...)
LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents))

// Dynamically discover and load events from job_events directories.
jobEventsPrefix := sessionPrefix + "job_events/"
jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(jobDirs).NotTo(BeEmpty())
LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs)

for _, jobDir := range jobDirs {
jobDirPrefix := jobEventsPrefix + jobDir + "/"
jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix)
g.Expect(err).NotTo(HaveOccurred())
uploadedEvents = append(uploadedEvents, jobEvents...)
LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir)
}
// Dynamically discover and load events from job_events directories.
jobEventsPrefix := sessionPrefix + "job_events/"
jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(jobDirs).NotTo(BeEmpty())
LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs)

for _, jobDir := range jobDirs {
jobDirPrefix := jobEventsPrefix + jobDir + "/"
jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix)
gg.Expect(err).NotTo(HaveOccurred())
uploadedEvents = append(uploadedEvents, jobEvents...)
LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir)
}

assertAllEventTypesCovered(test, g, uploadedEvents)
assertAllEventTypesCovered(test, gg, uploadedEvents)
}, TestTimeoutMedium).Should(Succeed())
}

// getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink
@@ -601,9 +577,9 @@ fi`
return sessionID
}
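The body of getSessionIDFromHeadPod is collapsed above; per its doc comment it reads a symlink on the head pod. A sketch of what the collapsed shell snippet could look like, assuming Ray's /tmp/ray/session_latest symlink convention (test and headPod come from the enclosing helper; the PR's actual command may differ):

// Hypothetical sketch of the collapsed command: resolve the session directory
// name from the /tmp/ray/session_latest symlink on the head pod.
getSessionIDCmd := `if [ -L "/tmp/ray/session_latest" ]; then
basename "$(readlink /tmp/ray/session_latest)"
else
echo "session_latest not found"
exit 1
fi`
output, _ := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", getSessionIDCmd})
sessionID := strings.TrimSpace(output.String())
// The enclosing helper then returns sessionID, as shown in the visible tail above.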

// getNodeIDFromHeadPod retrieves the nodeID from the Ray head pod by reading /tmp/ray/raylet_node_id.
func getNodeIDFromHeadPod(test Test, g *WithT, rayCluster *rayv1.RayCluster) string {
headPod, err := GetHeadPod(test, rayCluster)
// getNodeIDFromPod retrieves the nodeID from the Ray head or worker pod by reading /tmp/ray/raylet_node_id.
func getNodeIDFromPod(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) string {
pod, err := getPod()
g.Expect(err).NotTo(HaveOccurred())

getNodeIDCmd := `if [ -f "/tmp/ray/raylet_node_id" ]; then
Expand All @@ -612,7 +588,7 @@ else
echo "raylet_node_id not found"
exit 1
fi`
output, _ := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", getNodeIDCmd})
output, _ := ExecPodCmd(test, pod, containerName, []string{"sh", "-c", getNodeIDCmd})

// Parse output to extract the nodeID.
nodeID := strings.TrimSpace(output.String())
@@ -742,16 +718,37 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv
return events, nil
}

// assertFileExist verifies that a file object exists under the given log directory prefix.
// For a Ray cluster with one head node and one worker node, there are two log directories to verify:
// - logs/<headNodeID>/
// - logs/<workerNodeID>/
func assertFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) {
fileKey := fmt.Sprintf("%s/%s", nodeLogDirPrefix, fileName)
LogWithTimestamp(test.T(), "Verifying file %s exists", fileKey)
g.Eventually(func(gg Gomega) {
_, err := s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s3BucketName),
Key: aws.String(fileKey),
})
gg.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Verified file %s exists", fileKey)
}, TestTimeoutMedium).Should(Succeed(), "Failed to verify file %s exists", fileKey)
}
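Putting the prefixes together, assertFileExist ends up probing S3 keys of the following shape, where the placeholders stand for the values computed earlier in the tests:

log/<rayClusterName>_<rayClusterID>/<sessionID>/logs/<headNodeID>/raylet.out
log/<rayClusterName>_<rayClusterID>/<sessionID>/logs/<workerNodeID>/raylet.out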

// assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3.
// NOTE: EVENT_TYPE_UNSPECIFIED is excluded from verification.
func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) {
//
// This function accepts Gomega (not *WithT) because it's called from within g.Eventually() callbacks,
// which provide a nested Gomega instance for assertions.
func assertAllEventTypesCovered(test Test, g Gomega, events []rayEvent) {
foundEventTypes := map[string]bool{}
for _, event := range events {
foundEventTypes[event.EventType] = true
}

for _, eventType := range types.AllEventTypes {
if eventType == types.EVENT_TYPE_UNSPECIFIED {
LogWithTimestamp(test.T(), "Skipping verification for EVENT_TYPE_UNSPECIFIED")
continue
}
g.Expect(foundEventTypes[string(eventType)]).To(BeTrue(), "Event type %s not found", eventType)