diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index f0049b3e0d..bc5e95755e 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -321,6 +321,23 @@ func (s *FunctionalSuite) TestFallbackSink() { w.Expect().RedisSinkContains("simple-fallback-output", "fallback-message") } +func (s *FunctionalSuite) TestOnSuccessSink() { + + w := s.Given().Pipeline("@testdata/simple-on-success.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "simple-on-success" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + // send a message to the pipeline + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("on-success-message"))) + + w.Expect().RedisSinkContains("simple-on-success-output", "on-success-message") +} + func (s *FunctionalSuite) TestExponentialBackoffRetryStrategyForPipeline() { w := s.Given().Pipeline("@testdata/simple-pipeline-with-retry-strategy.yaml"). When(). diff --git a/test/e2e/testdata/simple-on-success.yaml b/test/e2e/testdata/simple-on-success.yaml new file mode 100644 index 0000000000..771b546a15 --- /dev/null +++ b/test/e2e/testdata/simple-on-success.yaml @@ -0,0 +1,41 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-on-success +spec: + vertices: + - name: in + limits: + readBatchSize: 1 + source: + http: {} + - name: udf + scale: + min: 1 + udf: + container: + image: quay.io/numaio/numaflow-go/map-cat:stable # A UDF which simply cats the message + imagePullPolicy: IfNotPresent + - name: output + scale: + min: 1 + sink: + udsink: + container: + image: quay.io/numaio/numaflow-go/on-success-log:stable + imagePullPolicy: IfNotPresent + onSuccess: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/examples/sinker/redis_sink + image: quay.io/numaio/numaflow-go/redis-sink:stable + imagePullPolicy: IfNotPresent + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-on-success-output" + edges: + - from: in + to: udf + - from: udf + to: output