From cc1ce8de7423215c354d65a7254ce591e6ea5b5a Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Wed, 5 Nov 2025 22:31:57 -0500 Subject: [PATCH 1/5] Add e2e test for on-success-sink Signed-off-by: Vaibhav Tiwari --- test/e2e/functional_test.go | 17 ++++++++++ test/e2e/testdata/simple-on-success.yaml | 41 ++++++++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 test/e2e/testdata/simple-on-success.yaml diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index f0049b3e0d..bc5e95755e 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -321,6 +321,23 @@ func (s *FunctionalSuite) TestFallbackSink() { w.Expect().RedisSinkContains("simple-fallback-output", "fallback-message") } +func (s *FunctionalSuite) TestOnSuccessSink() { + + w := s.Given().Pipeline("@testdata/simple-on-success.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "simple-on-success" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + // send a message to the pipeline + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("on-success-message"))) + + w.Expect().RedisSinkContains("simple-on-success-output", "on-success-message") +} + func (s *FunctionalSuite) TestExponentialBackoffRetryStrategyForPipeline() { w := s.Given().Pipeline("@testdata/simple-pipeline-with-retry-strategy.yaml"). When(). diff --git a/test/e2e/testdata/simple-on-success.yaml b/test/e2e/testdata/simple-on-success.yaml new file mode 100644 index 0000000000..771b546a15 --- /dev/null +++ b/test/e2e/testdata/simple-on-success.yaml @@ -0,0 +1,41 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-on-success +spec: + vertices: + - name: in + limits: + readBatchSize: 1 + source: + http: {} + - name: udf + scale: + min: 1 + udf: + container: + image: quay.io/numaio/numaflow-go/map-cat:stable # A UDF which simply cats the message + imagePullPolicy: IfNotPresent + - name: output + scale: + min: 1 + sink: + udsink: + container: + image: quay.io/numaio/numaflow-go/on-success-log:stable + imagePullPolicy: IfNotPresent + onSuccess: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/examples/sinker/redis_sink + image: quay.io/numaio/numaflow-go/redis-sink:stable + imagePullPolicy: IfNotPresent + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-on-success-output" + edges: + - from: in + to: udf + - from: udf + to: output From 7505efe1c8b34b9f735f323484b45b6713b6293d Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 6 Nov 2025 10:41:46 -0500 Subject: [PATCH 2/5] Get pod logs when it fails to come up Signed-off-by: Vaibhav Tiwari --- test/fixtures/util.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/fixtures/util.go b/test/fixtures/util.go index dd71011352..e70c1ea0f6 100644 --- a/test/fixtures/util.go +++ b/test/fixtures/util.go @@ -326,6 +326,28 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp for { select { case <-ctx.Done(): + + // TODO: ADDED FOR DEBUGGING, REMOVE BEFORE MERGING + podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) + if err != nil { + return fmt.Errorf("error getting vertex pod name: %w", err) + } + for _, p := range podList.Items { + stream, err := kubeClient.CoreV1().Pods(namespace).GetLogs(p.Name, &corev1.PodLogOptions{Follow: true}).Stream(ctx) + if err != nil { + return fmt.Errorf("timeout after %v waiting for vertex pod running. Error getting logs: %s", timeout, err) + } + + scanner := bufio.NewScanner(stream) + for scanner.Scan() { + log.Println(scanner.Text()) + } + + err = stream.Close() + if err != nil { + log.Println("Error closing stream: ", err) + } + } return fmt.Errorf("timeout after %v waiting for vertex pod running", timeout) default: } From 051c5d53f2da878423cb923102c63bd66ce4ebd5 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 6 Nov 2025 10:54:48 -0500 Subject: [PATCH 3/5] Get numa container logs for pod which fails to come up Signed-off-by: Vaibhav Tiwari --- test/fixtures/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures/util.go b/test/fixtures/util.go index e70c1ea0f6..6fa0e06f8b 100644 --- a/test/fixtures/util.go +++ b/test/fixtures/util.go @@ -333,7 +333,7 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp return fmt.Errorf("error getting vertex pod name: %w", err) } for _, p := range podList.Items { - stream, err := kubeClient.CoreV1().Pods(namespace).GetLogs(p.Name, &corev1.PodLogOptions{Follow: true}).Stream(ctx) + stream, err := kubeClient.CoreV1().Pods(namespace).GetLogs(p.Name, &corev1.PodLogOptions{Follow: false, Container: "numa"}).Stream(ctx) if err != nil { return fmt.Errorf("timeout after %v waiting for vertex pod running. Error getting logs: %s", timeout, err) } From b5d1bbc047b3b48277708ef6197279172f8c1e56 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 6 Nov 2025 11:41:17 -0500 Subject: [PATCH 4/5] Get numa container logs for pod which fails to come up Signed-off-by: Vaibhav Tiwari --- test/fixtures/util.go | 39 +++++++++++++++++---------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/test/fixtures/util.go b/test/fixtures/util.go index 6fa0e06f8b..7a195dd50b 100644 --- a/test/fixtures/util.go +++ b/test/fixtures/util.go @@ -326,28 +326,6 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp for { select { case <-ctx.Done(): - - // TODO: ADDED FOR DEBUGGING, REMOVE BEFORE MERGING - podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) - if err != nil { - return fmt.Errorf("error getting vertex pod name: %w", err) - } - for _, p := range podList.Items { - stream, err := kubeClient.CoreV1().Pods(namespace).GetLogs(p.Name, &corev1.PodLogOptions{Follow: false, Container: "numa"}).Stream(ctx) - if err != nil { - return fmt.Errorf("timeout after %v waiting for vertex pod running. Error getting logs: %s", timeout, err) - } - - scanner := bufio.NewScanner(stream) - for scanner.Scan() { - log.Println(scanner.Text()) - } - - err = stream.Close() - if err != nil { - log.Println("Error closing stream: ", err) - } - } return fmt.Errorf("timeout after %v waiting for vertex pod running", timeout) default: } @@ -364,6 +342,23 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp for _, p := range podList.Items { log.Println("Checking for vertex pod ready: ", p.Name) ok = ok && isPodReady(p) + // TODO: ADDED FOR DEBUGGING, REMOVE BEFORE MERGING + if !ok { + stream, err := kubeClient.CoreV1().Pods(namespace).GetLogs(p.Name, &corev1.PodLogOptions{Follow: false, Container: "numa"}).Stream(ctx) + if err != nil { + return fmt.Errorf("timeout after %v waiting for vertex pod running. Error getting logs: %s", timeout, err) + } + + scanner := bufio.NewScanner(stream) + for scanner.Scan() { + log.Println(scanner.Text()) + } + + err = stream.Close() + if err != nil { + log.Println("Error closing stream: ", err) + } + } } if ok { return nil From 7de8a67ac2fbeafeec5e122dd662731ad074bcab Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Thu, 6 Nov 2025 12:12:16 -0500 Subject: [PATCH 5/5] Revert log emission for debugging Signed-off-by: Vaibhav Tiwari --- test/fixtures/util.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/test/fixtures/util.go b/test/fixtures/util.go index 7a195dd50b..dd71011352 100644 --- a/test/fixtures/util.go +++ b/test/fixtures/util.go @@ -342,23 +342,6 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp for _, p := range podList.Items { log.Println("Checking for vertex pod ready: ", p.Name) ok = ok && isPodReady(p) - // TODO: ADDED FOR DEBUGGING, REMOVE BEFORE MERGING - if !ok { - stream, err := kubeClient.CoreV1().Pods(namespace).GetLogs(p.Name, &corev1.PodLogOptions{Follow: false, Container: "numa"}).Stream(ctx) - if err != nil { - return fmt.Errorf("timeout after %v waiting for vertex pod running. Error getting logs: %s", timeout, err) - } - - scanner := bufio.NewScanner(stream) - for scanner.Scan() { - log.Println(scanner.Text()) - } - - err = stream.Close() - if err != nil { - log.Println("Error closing stream: ", err) - } - } } if ok { return nil