diff --git a/test/rekt/features/kafka_source.go b/test/rekt/features/kafka_source.go index a4065d31e0..f52e27afec 100644 --- a/test/rekt/features/kafka_source.go +++ b/test/rekt/features/kafka_source.go @@ -22,8 +22,6 @@ import ( "fmt" "strings" - "knative.dev/eventing/test/rekt/features/featureflags" - cloudevents "github.com/cloudevents/sdk-go/v2" . "github.com/cloudevents/sdk-go/v2/test" cetest "github.com/cloudevents/sdk-go/v2/test" @@ -32,6 +30,7 @@ import ( "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" + "knative.dev/eventing/test/rekt/features/featureflags" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -45,11 +44,10 @@ import ( "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/resources/service" - "knative.dev/eventing/test/rekt/features/source" - testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/rekt/features/featuressteps" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" + "knative.dev/eventing/test/rekt/features/source" internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" @@ -287,13 +285,9 @@ type kafkaSinkConfig struct { opts []manifest.CfgFn } -func kafkaSourceFeature(name string, +func KafkaSourceFeatureSetup(f *feature.Feature, kafkaSourceCfg kafkaSourceConfig, - kafkaSinkCfg kafkaSinkConfig, - senderOpts []eventshub.EventsHubOption, - matcher cetest.EventMatcher) *feature.Feature { - - f := feature.NewFeatureNamed(name) + kafkaSinkCfg kafkaSinkConfig) (string, string) { if kafkaSourceCfg.topic == "" { kafkaSourceCfg.topic = feature.MakeRandomK8sName("topic") @@ -308,7 +302,7 @@ func kafkaSourceFeature(name string, } receiver := feature.MakeRandomK8sName("eventshub-receiver") - sender := feature.MakeRandomK8sName("eventshub-sender") + secretName := feature.MakeRandomK8sName("secret") f.Setup("install kafka topic", kafkatopic.Install(kafkaSourceCfg.topic)) @@ -355,15 +349,22 @@ func kafkaSourceFeature(name string, f.Setup("install kafka source", kafkasource.Install(kafkaSourceCfg.sourceName, kafkaSourceOpts...)) f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSourceCfg.sourceName)) + return kafkaSinkCfg.sinkName, receiver +} + +func KafkaSourceFeatureAssert(f *feature.Feature, kafkaSink, receiver string, customizeFunc CustomizeEventFunc) { + sender := feature.MakeRandomK8sName("eventshub-sender") options := []eventshub.EventsHubOption{ - eventshub.StartSenderToResource(kafkasink.GVR(), kafkaSinkCfg.sinkName), + eventshub.StartSenderToResource(kafkasink.GVR(), kafkaSink), } + + senderOpts, matcher := customizeFunc() + options = append(options, senderOpts...) + f.Requirement("install eventshub sender", eventshub.Install(sender, options...)) f.Assert("eventshub receiver gets event", matchEvent(receiver, matcher)) - - return f } func matchEvent(sink string, matcher EventMatcher) feature.StepFn { @@ -372,63 +373,86 @@ func matchEvent(sink string, matcher EventMatcher) feature.StepFn { } } -func KafkaSourceBinaryEvent() *feature.Feature { - senderOptions := []eventshub.EventsHubOption{ - eventshub.InputHeader("ce-specversion", cloudevents.VersionV1), - eventshub.InputHeader("ce-type", "com.github.pull.create"), - eventshub.InputHeader("ce-source", "github.com/cloudevents/spec/pull"), - eventshub.InputHeader("ce-subject", "123"), - eventshub.InputHeader("ce-id", "A234-1234-1234"), - eventshub.InputHeader("content-type", "application/json"), - eventshub.InputHeader("ce-comexampleextension1", "value"), - eventshub.InputHeader("ce-comexampleothervalue", "5"), - eventshub.InputBody(marshalJSON(map[string]string{ - "hello": "Francesco", - })), +// CustomizeEventFunc creates a pair of eventshub options that customize the event +// and corresponding event matcher that will match the respective event. +type CustomizeEventFunc func() ([]eventshub.EventsHubOption, EventMatcher) + +func KafkaSourceBinaryEventCustomizeFunc() CustomizeEventFunc { + return func() ([]eventshub.EventsHubOption, EventMatcher) { + id := feature.MakeRandomK8sName("id") + senderOptions := []eventshub.EventsHubOption{ + eventshub.InputHeader("ce-specversion", cloudevents.VersionV1), + eventshub.InputHeader("ce-type", "com.github.pull.create"), + eventshub.InputHeader("ce-source", "github.com/cloudevents/spec/pull"), + eventshub.InputHeader("ce-subject", "123"), + eventshub.InputHeader("ce-id", id), + eventshub.InputHeader("content-type", "application/json"), + eventshub.InputHeader("ce-comexampleextension1", "value"), + eventshub.InputHeader("ce-comexampleothervalue", "5"), + eventshub.InputBody(marshalJSON(map[string]string{ + "hello": "Francesco", + })), + } + matcher := AllOf( + HasSpecVersion(cloudevents.VersionV1), + HasType("com.github.pull.create"), + HasSource("github.com/cloudevents/spec/pull"), + HasSubject("123"), + HasId(id), + HasDataContentType("application/json"), + HasData([]byte(`{"hello":"Francesco"}`)), + HasExtension("comexampleextension1", "value"), + HasExtension("comexampleothervalue", "5"), + ) + return senderOptions, matcher } - matcher := AllOf( - HasSpecVersion(cloudevents.VersionV1), - HasType("com.github.pull.create"), - HasSource("github.com/cloudevents/spec/pull"), - HasSubject("123"), - HasId("A234-1234-1234"), - HasDataContentType("application/json"), - HasData([]byte(`{"hello":"Francesco"}`)), - HasExtension("comexampleextension1", "value"), - HasExtension("comexampleothervalue", "5"), - ) +} - return kafkaSourceFeature("KafkaSourceBinaryEvent", +func KafkaSourceBinaryEvent() *feature.Feature { + f := feature.NewFeatureNamed("KafkaSourceBinaryEvent") + + kafkaSink, receiver := KafkaSourceBinaryEventFeatureSetup(f) + KafkaSourceFeatureAssert(f, kafkaSink, receiver, KafkaSourceBinaryEventCustomizeFunc()) + + return f +} + +func KafkaSourceBinaryEventFeatureSetup(f *feature.Feature) (string, string) { + return KafkaSourceFeatureSetup(f, kafkaSourceConfig{ authMech: PlainMech, }, kafkaSinkConfig{}, - senderOptions, - matcher, ) } func KafkaSourceBinaryEventWithExtensions() *feature.Feature { - senderOptions := []eventshub.EventsHubOption{ - eventshub.InputHeader("ce-specversion", cloudevents.VersionV1), - eventshub.InputHeader("ce-type", "com.github.pull.create"), - eventshub.InputHeader("ce-source", "github.com/cloudevents/spec/pull"), - eventshub.InputHeader("ce-subject", "123"), - eventshub.InputHeader("ce-id", "A234-1234-1234"), - eventshub.InputHeader("content-type", "application/json"), + customizeFunc := func() ([]eventshub.EventsHubOption, EventMatcher) { + id := feature.MakeRandomK8sName("id") + senderOptions := []eventshub.EventsHubOption{ + eventshub.InputHeader("ce-specversion", cloudevents.VersionV1), + eventshub.InputHeader("ce-type", "com.github.pull.create"), + eventshub.InputHeader("ce-source", "github.com/cloudevents/spec/pull"), + eventshub.InputHeader("ce-subject", "123"), + eventshub.InputHeader("ce-id", id), + eventshub.InputHeader("content-type", "application/json"), + } + matcher := AllOf( + HasSpecVersion(cloudevents.VersionV1), + HasType("com.github.pull.create"), + HasSource("github.com/cloudevents/spec/pull"), + HasSubject("123"), + HasId(id), + HasDataContentType("application/json"), + HasExtension("comexampleextension1", "value"), + HasExtension("comexampleothervalue", "5"), + ) + return senderOptions, matcher } - matcher := AllOf( - HasSpecVersion(cloudevents.VersionV1), - HasType("com.github.pull.create"), - HasSource("github.com/cloudevents/spec/pull"), - HasSubject("123"), - HasId("A234-1234-1234"), - HasDataContentType("application/json"), - HasExtension("comexampleextension1", "value"), - HasExtension("comexampleothervalue", "5"), - ) - return kafkaSourceFeature("KafkaSourceBinaryEvent", + f := feature.NewFeatureNamed("KafkaSourceBinaryEventWithExtensions") + + kafkaSink, receiver := KafkaSourceFeatureSetup(f, kafkaSourceConfig{ authMech: PlainMech, opts: []manifest.CfgFn{ @@ -438,74 +462,93 @@ func KafkaSourceBinaryEventWithExtensions() *feature.Feature { })}, }, kafkaSinkConfig{}, - senderOptions, - matcher, ) + KafkaSourceFeatureAssert(f, kafkaSink, receiver, customizeFunc) + + return f } -func KafkaSourceStructuredEvent() *feature.Feature { - eventTime, _ := cetypes.ParseTime("2018-04-05T17:31:00Z") - senderOptions := []eventshub.EventsHubOption{ - eventshub.InputHeader("content-type", "application/cloudevents+json"), - eventshub.InputBody(marshalJSON(map[string]interface{}{ - "specversion": cloudevents.VersionV1, - "type": "com.github.pull.create", - "source": "https://github.com/cloudevents/spec/pull", - "subject": "123", - "id": "A234-1234-1234", - "time": "2018-04-05T17:31:00Z", - "datacontenttype": "application/json", - "data": map[string]string{ - "hello": "Francesco", - }, - "comexampleextension1": "value", - "comexampleothervalue": 5, - })), +func KafkaSourceStructuredEventCustomizeFunc() CustomizeEventFunc { + return func() ([]eventshub.EventsHubOption, EventMatcher) { + id := feature.MakeRandomK8sName("id") + eventTime, _ := cetypes.ParseTime("2018-04-05T17:31:00Z") + senderOptions := []eventshub.EventsHubOption{ + eventshub.InputHeader("content-type", "application/cloudevents+json"), + eventshub.InputBody(marshalJSON(map[string]interface{}{ + "specversion": cloudevents.VersionV1, + "type": "com.github.pull.create", + "source": "https://github.com/cloudevents/spec/pull", + "subject": "123", + "id": id, + "time": "2018-04-05T17:31:00Z", + "datacontenttype": "application/json", + "data": map[string]string{ + "hello": "Francesco", + }, + "comexampleextension1": "value", + "comexampleothervalue": 5, + })), + } + matcher := AllOf( + HasSpecVersion(cloudevents.VersionV1), + HasType("com.github.pull.create"), + HasSource("https://github.com/cloudevents/spec/pull"), + HasSubject("123"), + HasId(id), + HasTime(eventTime), + HasDataContentType("application/json"), + HasData([]byte(`{"hello":"Francesco"}`)), + HasExtension("comexampleextension1", "value"), + HasExtension("comexampleothervalue", "5"), + ) + return senderOptions, matcher } - matcher := AllOf( - HasSpecVersion(cloudevents.VersionV1), - HasType("com.github.pull.create"), - HasSource("https://github.com/cloudevents/spec/pull"), - HasSubject("123"), - HasId("A234-1234-1234"), - HasTime(eventTime), - HasDataContentType("application/json"), - HasData([]byte(`{"hello":"Francesco"}`)), - HasExtension("comexampleextension1", "value"), - HasExtension("comexampleothervalue", "5"), - ) +} + +func KafkaSourceStructuredEvent() *feature.Feature { + f := feature.NewFeatureNamed("KafkaSourceStructuredEvent") + + kafkaSink, receiver := KafkaSourceBinaryEventFeatureSetup(f) + KafkaSourceFeatureAssert(f, kafkaSink, receiver, KafkaSourceStructuredEventCustomizeFunc()) - return kafkaSourceFeature("KafkaSourceStructuredEvent", + return f +} + +func KafkaSourceStructuredEventFeatureSetup(f *feature.Feature) (string, string) { + return KafkaSourceFeatureSetup(f, kafkaSourceConfig{ authMech: PlainMech, }, - kafkaSinkConfig{ - opts: []manifest.CfgFn{kafkasink.WithContentMode("structured")}, - }, - senderOptions, - matcher, + kafkaSinkConfig{}, ) } func KafkaSourceWithExtensions() *feature.Feature { - senderOptions := []eventshub.EventsHubOption{ - eventshub.InputHeader("content-type", "application/cloudevents+json"), - eventshub.InputBody(marshalJSON(map[string]interface{}{ - "specversion": cloudevents.VersionV1, - "type": "com.github.pull.create", - "source": "https://github.com/cloudevents/spec/pull", - "id": "A234-1234-1234", - })), + customizeFunc := func() ([]eventshub.EventsHubOption, EventMatcher) { + id := feature.MakeRandomK8sName("id") + senderOptions := []eventshub.EventsHubOption{ + eventshub.InputHeader("content-type", "application/cloudevents+json"), + eventshub.InputBody(marshalJSON(map[string]interface{}{ + "specversion": cloudevents.VersionV1, + "type": "com.github.pull.create", + "source": "https://github.com/cloudevents/spec/pull", + "id": id, + })), + } + matcher := AllOf( + HasSpecVersion(cloudevents.VersionV1), + HasId(id), + HasType("com.github.pull.create"), + HasSource("https://github.com/cloudevents/spec/pull"), + HasExtension("comexampleextension1", "value"), + HasExtension("comexampleothervalue", "5"), + ) + return senderOptions, matcher } - matcher := AllOf( - HasSpecVersion(cloudevents.VersionV1), - HasType("com.github.pull.create"), - HasSource("https://github.com/cloudevents/spec/pull"), - HasExtension("comexampleextension1", "value"), - HasExtension("comexampleothervalue", "5"), - ) - return kafkaSourceFeature("KafkaSourceWithExtensions", + f := feature.NewFeatureNamed("KafkaSourceWithExtensions") + + kafkaSink, receiver := KafkaSourceFeatureSetup(f, kafkaSourceConfig{ authMech: PlainMech, opts: []manifest.CfgFn{ @@ -517,19 +560,30 @@ func KafkaSourceWithExtensions() *feature.Feature { kafkaSinkConfig{ opts: []manifest.CfgFn{kafkasink.WithContentMode("structured")}, }, - senderOptions, - matcher, ) + KafkaSourceFeatureAssert(f, kafkaSink, receiver, customizeFunc) + + return f } func KafkaSourceTLS(kafkaSource, kafkaSink, topic string) *feature.Feature { - e := cetest.FullEvent() - senderOptions := []eventshub.EventsHubOption{ - eventshub.InputEvent(e), + customizeFunc := func() ([]eventshub.EventsHubOption, EventMatcher) { + id := feature.MakeRandomK8sName("id") + e := cetest.FullEvent() + e.SetID(id) + senderOptions := []eventshub.EventsHubOption{ + eventshub.InputEvent(e), + } + matcher := AllOf( + HasData(e.Data()), + HasId(id), + ) + return senderOptions, matcher } - matcher := HasData(e.Data()) - return kafkaSourceFeature("KafkaSourceTLS", + f := feature.NewFeatureNamed("KafkaSourceTLS") + + kafkaSink, receiver := KafkaSourceFeatureSetup(f, kafkaSourceConfig{ authMech: TLSMech, topic: topic, @@ -539,9 +593,10 @@ func KafkaSourceTLS(kafkaSource, kafkaSink, topic string) *feature.Feature { kafkaSinkConfig{ sinkName: kafkaSink, }, - senderOptions, - matcher, ) + KafkaSourceFeatureAssert(f, kafkaSink, receiver, customizeFunc) + + return f } func KafkaSourceTLSSink() *feature.Feature { @@ -652,20 +707,31 @@ func KafkaSourceTLSSinkTrustBundle() *feature.Feature { } func KafkaSourceSASL() *feature.Feature { - e := cetest.FullEvent() - senderOptions := []eventshub.EventsHubOption{ - eventshub.InputEvent(e), + customizeFunc := func() ([]eventshub.EventsHubOption, EventMatcher) { + id := feature.MakeRandomK8sName("id") + e := cetest.FullEvent() + e.SetID(id) + senderOptions := []eventshub.EventsHubOption{ + eventshub.InputEvent(e), + } + matcher := AllOf( + HasData(e.Data()), + HasId(id), + ) + return senderOptions, matcher } - matcher := HasData(e.Data()) - return kafkaSourceFeature("KafkaSourceSASL", + f := feature.NewFeatureNamed("KafkaSourceSASL") + + kafkaSink, receiver := KafkaSourceFeatureSetup(f, kafkaSourceConfig{ authMech: SASLMech, }, kafkaSinkConfig{}, - senderOptions, - matcher, ) + KafkaSourceFeatureAssert(f, kafkaSink, receiver, customizeFunc) + + return f } func marshalJSON(val interface{}) string { diff --git a/test/upgrade/postdowngrade.go b/test/upgrade/postdowngrade.go deleted file mode 100644 index 834ccae17a..0000000000 --- a/test/upgrade/postdowngrade.go +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2021 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package upgrade - -import ( - pkgupgrade "knative.dev/pkg/test/upgrade" - "knative.dev/reconciler-test/pkg/environment" - - eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - "knative.dev/eventing-kafka-broker/test/e2e_sink" -) - -// BrokerPostDowngradeTest tests channel basic channel operations after -// downgrade. -func BrokerPostDowngradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("BrokerPostDowngradeTest", func(c pkgupgrade.Context) { - runBrokerSmokeTest(c.T, kafka.BrokerClass) - }) -} - -// NamespacedBrokerPostDowngradeTest tests basic namespaced broker operations after -// downgrade. -func NamespacedBrokerPostDowngradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("NamespacedBrokerPostDowngradeTest", func(c pkgupgrade.Context) { - runBrokerSmokeTest(c.T, kafka.NamespacedBrokerClass) - }) -} - -// ChannelPostDowngradeTest tests channel basic channel operations after -// downgrade. -func ChannelPostDowngradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("ChannelPostDowngradeTest", - func(c pkgupgrade.Context) { - runChannelSmokeTest(c.T) - }) -} - -// SinkPostDowngradeTest tests sink basic operations after downgrade. -func SinkPostDowngradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("SinkPostDowngradeTest", func(c pkgupgrade.Context) { - e2e_sink.RunTestKafkaSink(c.T, eventing.ModeBinary, nil) - }) -} - -// SourcePostDowngradeTest tests source operations after downgrade. -func SourcePostDowngradeTest(glob environment.GlobalEnvironment) pkgupgrade.Operation { - return pkgupgrade.NewOperation("SourcePostDowngradeTest", - func(c pkgupgrade.Context) { - runSourceSmokeTest(glob, c.T) - }) -} diff --git a/test/upgrade/postupgrade.go b/test/upgrade/postupgrade.go index 1fefbd07be..1a2ec29a75 100644 --- a/test/upgrade/postupgrade.go +++ b/test/upgrade/postupgrade.go @@ -29,68 +29,14 @@ import ( testlib "knative.dev/eventing/test/lib" "knative.dev/pkg/system" pkgupgrade "knative.dev/pkg/test/upgrade" - "knative.dev/reconciler-test/pkg/environment" - - eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - "knative.dev/eventing-kafka-broker/test/e2e_sink" ) -// BrokerPostUpgradeTest tests channel operations after upgrade. -func BrokerPostUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("BrokerPostUpgradeTest", func(c pkgupgrade.Context) { - c.T.Parallel() - c.T.Run("Verify post-install", func(t *testing.T) { - verifyPostInstall(t) - }) - c.T.Run("tests", func(t *testing.T) { - runBrokerSmokeTest(t, kafka.BrokerClass) - }) - }) -} - -// NamespacedBrokerPostUpgradeTest tests channel operations after upgrade. -func NamespacedBrokerPostUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("NamespacedBrokerPostUpgradeTest", func(c pkgupgrade.Context) { - c.T.Parallel() - c.T.Run("Verify post-install", func(t *testing.T) { - verifyPostInstall(t) - }) - c.T.Run("tests", func(t *testing.T) { - runBrokerSmokeTest(t, kafka.NamespacedBrokerClass) - }) +func VerifyPostInstallTest() pkgupgrade.Operation { + return pkgupgrade.NewOperation("VerifyPostInstallTest", func(c pkgupgrade.Context) { + verifyPostInstall(c.T) }) } -// ChannelPostUpgradeTest tests channel operations after upgrade. -func ChannelPostUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("ChannelPostUpgradeTest", - func(c pkgupgrade.Context) { - runChannelSmokeTest(c.T) - }) -} - -// SinkPostUpgradeTest tests sink basic operations post upgrade. -func SinkPostUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("SinkPostUpgradeTest", func(c pkgupgrade.Context) { - c.T.Parallel() - c.T.Run("Verify post-install", func(t *testing.T) { - verifyPostInstall(t) - }) - c.T.Run("tests", func(t *testing.T) { - e2e_sink.RunTestKafkaSink(t, eventing.ModeBinary, nil) - }) - }) -} - -// SourcePostUpgradeTest tests source operations after upgrade. -func SourcePostUpgradeTest(glob environment.GlobalEnvironment) pkgupgrade.Operation { - return pkgupgrade.NewOperation("SourcePostUpgradeTest", - func(c pkgupgrade.Context) { - runSourceSmokeTest(glob, c.T) - }) -} - func verifyPostInstall(t *testing.T) { t.Parallel() diff --git a/test/upgrade/preupgrade.go b/test/upgrade/preupgrade.go deleted file mode 100644 index 598678bf35..0000000000 --- a/test/upgrade/preupgrade.go +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2021 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package upgrade - -import ( - eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - "knative.dev/eventing-kafka-broker/test/e2e_sink" - pkgupgrade "knative.dev/pkg/test/upgrade" - "knative.dev/reconciler-test/pkg/environment" -) - -// BrokerPreUpgradeTest tests broker basic operations before upgrade. -func BrokerPreUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("BrokerPreUpgradeTest", func(c pkgupgrade.Context) { - runBrokerSmokeTest(c.T, kafka.BrokerClass) - }) -} - -// NamespacedBrokerPreUpgradeTest tests broker basic operations before upgrade. -func NamespacedBrokerPreUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("NamespacedBrokerPreUpgradeTest", func(c pkgupgrade.Context) { - runBrokerSmokeTest(c.T, kafka.NamespacedBrokerClass) - }) -} - -// ChannelPreUpgradeTest tests channel basic operations before upgrade. -func ChannelPreUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("ChannelPreUpgradeTest", - func(c pkgupgrade.Context) { - runChannelSmokeTest(c.T) - }) -} - -// SinkPreUpgradeTest tests sink basic operations pre upgrade. -func SinkPreUpgradeTest() pkgupgrade.Operation { - return pkgupgrade.NewOperation("SinkPreUpgradeTest", func(c pkgupgrade.Context) { - e2e_sink.RunTestKafkaSink(c.T, eventing.ModeBinary, nil) - }) -} - -// SourcePreUpgradeTest tests source operations before upgrade. -func SourcePreUpgradeTest(glob environment.GlobalEnvironment) pkgupgrade.Operation { - return pkgupgrade.NewOperation("SourcePreUpgradeTest", - func(c pkgupgrade.Context) { - runSourceSmokeTest(glob, c.T) - }) -} diff --git a/test/upgrade/smoke.go b/test/upgrade/smoke.go index 5521d13cf2..42d459ca61 100644 --- a/test/upgrade/smoke.go +++ b/test/upgrade/smoke.go @@ -18,77 +18,149 @@ package upgrade import ( "context" - "testing" + "sync" - cloudevents "github.com/cloudevents/sdk-go/v2" - pkgtesting "knative.dev/eventing-kafka-broker/test/pkg" - testbroker "knative.dev/eventing-kafka-broker/test/pkg/broker" + "github.com/google/uuid" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/test/rekt/features" - eventinghelpers "knative.dev/eventing/test/e2e/helpers" - testlib "knative.dev/eventing/test/lib" + brokerfeatures "knative.dev/eventing/test/rekt/features/broker" + channelfeatures "knative.dev/eventing/test/rekt/features/channel" + "knative.dev/eventing/test/rekt/features/knconf" + brokerresources "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/channel_impl" + "knative.dev/eventing/test/rekt/resources/subscription" + eventingupgrade "knative.dev/eventing/test/upgrade" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" + "knative.dev/reconciler-test/pkg/manifest" ) var ( - channelTestRunner testlib.ComponentsTestRunner + brokerConfigMux = &sync.Mutex{} + channelConfigMux = &sync.Mutex{} + opts = []environment.EnvOpts{ + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + } ) -func runBrokerSmokeTest(t *testing.T, class string) { - pkgtesting.RunMultiple(t, func(t *testing.T) { - eventinghelpers.EventTransformationForTriggerTestHelper( - context.Background(), - t, - /* broker version */ "v1", - /* trigger version */ "v1", - testbroker.CreatorForClass(class), - ) - }) +func KafkaChannelFeature(glob environment.GlobalEnvironment) *eventingupgrade.DurableFeature { + // Prevent race conditions on channel_impl.EnvCfg.ChannelGK when running tests in parallel. + channelConfigMux.Lock() + defer channelConfigMux.Unlock() + channel_impl.EnvCfg.ChannelGK = "KafkaChannel.messaging.knative.dev" + channel_impl.EnvCfg.ChannelV = "v1beta1" + + createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn { + return subscription.WithSubscriber(ref, uri, "") + } + + setupF := feature.NewFeature() + sink, ch := channelfeatures.ChannelChainSetup(setupF, 1, createSubscriberFn) + + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + channelfeatures.ChannelChainAssert(f, sink, ch) + return f + } + + return &eventingupgrade.DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} } -func runChannelSmokeTest(t *testing.T) { - cases := smokeTestCases() - ctx := context.Background() - for i := range cases { - tt := cases[i] - t.Run(tt.name, func(t *testing.T) { - eventinghelpers.SingleEventForChannelTestHelper( - ctx, t, tt.encoding, tt.version, - "", channelTestRunner, - ) - }) +func KafkaSinkSourceBinaryEventFeature(glob environment.GlobalEnvironment, +) *eventingupgrade.DurableFeature { + setupF := feature.NewFeature() + kafkaSink, receiver := features.KafkaSourceBinaryEventFeatureSetup(setupF) + + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + features.KafkaSourceFeatureAssert(f, kafkaSink, receiver, features.KafkaSourceBinaryEventCustomizeFunc()) + return f } + + return &eventingupgrade.DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} } -func runSourceSmokeTest(glob environment.GlobalEnvironment, t *testing.T) { - ctx, env := glob.Environment( - knative.WithKnativeNamespace(system.Namespace()), - knative.WithLoggingConfig, - knative.WithTracingConfig, - k8s.WithEventListener, - environment.Managed(t), - ) +func KafkaSinkSourceStructuredEventFeature(glob environment.GlobalEnvironment, +) *eventingupgrade.DurableFeature { + setupF := feature.NewFeature() + kafkaSink, receiver := features.KafkaSourceStructuredEventFeatureSetup(setupF) + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + features.KafkaSourceFeatureAssert(f, kafkaSink, receiver, features.KafkaSourceStructuredEventCustomizeFunc()) + return f + } - env.Test(ctx, t, features.KafkaSourceStructuredEvent()) - env.Test(ctx, t, features.KafkaSourceBinaryEvent()) + return &eventingupgrade.DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} } -type smokeTestCase struct { - name string - encoding cloudevents.Encoding - version eventinghelpers.SubscriptionVersion +func BrokerEventTransformationForTrigger(glob environment.GlobalEnvironment, +) *eventingupgrade.DurableFeature { + // Prevent race conditions on EnvCfg.BrokerClass when running tests in parallel. + brokerConfigMux.Lock() + defer brokerConfigMux.Unlock() + brokerresources.EnvCfg.BrokerClass = kafka.BrokerClass + + setupF := feature.NewFeature() + cfg := brokerfeatures.BrokerEventTransformationForTriggerSetup(setupF) + + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + brokerfeatures.BrokerEventTransformationForTriggerAssert(f, cfg) + return f + } + + return &eventingupgrade.DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} } -func smokeTestCases() []smokeTestCase { - return []smokeTestCase{{ - name: "BinaryV1", - encoding: cloudevents.EncodingBinary, - version: eventinghelpers.SubscriptionV1, - }, { - name: "StructuredV1", - encoding: cloudevents.EncodingStructured, - version: eventinghelpers.SubscriptionV1, - }} +func NamespacedBrokerEventTransformationForTrigger(glob environment.GlobalEnvironment, +) *eventingupgrade.DurableFeature { + // Prevent race conditions on EnvCfg.BrokerClass when running tests in parallel. + brokerConfigMux.Lock() + defer brokerConfigMux.Unlock() + brokerresources.EnvCfg.BrokerClass = kafka.NamespacedBrokerClass + + broker := "broker" + setupF := features.SetupNamespacedBroker(broker) + // Override name to match the enclosing function name. + setupF.Name = feature.NewFeature().Name + + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + brokerAcceptsBinaryContentModeAssert(f, broker) + return f + } + + return &eventingupgrade.DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} +} + +func brokerAcceptsBinaryContentModeAssert(f *feature.Feature, brokerName string) { + f.Assert("broker accepts binary content mode", func(ctx context.Context, t feature.T) { + source := feature.MakeRandomK8sName("source") + eventshub.Install(source, + eventshub.StartSenderToResource(brokerresources.GVR(), brokerName), + eventshub.InputHeader("ce-specversion", "1.0"), + eventshub.InputHeader("ce-type", "sometype"), + eventshub.InputHeader("ce-source", "200.request.sender.test.knative.dev"), + eventshub.InputHeader("ce-id", uuid.New().String()), + eventshub.InputHeader("content-type", "application/json"), + eventshub.InputBody("{}"), + eventshub.InputMethod("POST"), + )(ctx, t) + + store := eventshub.StoreFromContext(ctx, source) + events := knconf.Correlate(store.AssertAtLeast(ctx, t, 2, knconf.SentEventMatcher(""))) + for _, e := range events { + if e.Response.StatusCode < 200 || e.Response.StatusCode > 299 { + t.Errorf("Expected statuscode 2XX for sequence %d got %d", e.Response.Sequence, e.Response.StatusCode) + } + } + }) } diff --git a/test/upgrade/suite.go b/test/upgrade/suite.go index 335ef69b62..61271e5924 100644 --- a/test/upgrade/suite.go +++ b/test/upgrade/suite.go @@ -17,6 +17,9 @@ package upgrade import ( + "slices" + + "knative.dev/eventing/test/upgrade" pkgupgrade "knative.dev/pkg/test/upgrade" "knative.dev/reconciler-test/pkg/environment" @@ -25,30 +28,43 @@ import ( // Suite defines the whole upgrade test suite for Eventing Kafka. func Suite(glob environment.GlobalEnvironment) pkgupgrade.Suite { + g := upgrade.FeatureGroupWithUpgradeTests{ + // Features that will run the same test post-upgrade and post-downgrade. + upgrade.NewFeatureSmoke(KafkaSinkSourceBinaryEventFeature(glob)), + upgrade.NewFeatureSmoke(KafkaSinkSourceStructuredEventFeature(glob)), + upgrade.NewFeatureSmoke(BrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureSmoke(NamespacedBrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureSmoke(KafkaChannelFeature(glob)), + // Features that will be created pre-upgrade and verified/removed post-upgrade. + upgrade.NewFeatureOnlyUpgrade(KafkaSinkSourceBinaryEventFeature(glob)), + upgrade.NewFeatureOnlyUpgrade(KafkaSinkSourceStructuredEventFeature(glob)), + upgrade.NewFeatureOnlyUpgrade(BrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureOnlyUpgrade(NamespacedBrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureOnlyUpgrade(KafkaChannelFeature(glob)), + // Features that will be created pre-upgrade, verified post-upgrade, verified and removed post-downgrade. + upgrade.NewFeatureUpgradeDowngrade(KafkaSinkSourceBinaryEventFeature(glob)), + upgrade.NewFeatureUpgradeDowngrade(KafkaSinkSourceStructuredEventFeature(glob)), + upgrade.NewFeatureUpgradeDowngrade(BrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureUpgradeDowngrade(NamespacedBrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureUpgradeDowngrade(KafkaChannelFeature(glob)), + // Features that will be created post-upgrade, verified and removed post-downgrade. + upgrade.NewFeatureOnlyDowngrade(KafkaSinkSourceBinaryEventFeature(glob)), + upgrade.NewFeatureOnlyDowngrade(KafkaSinkSourceStructuredEventFeature(glob)), + upgrade.NewFeatureOnlyDowngrade(BrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureOnlyDowngrade(NamespacedBrokerEventTransformationForTrigger(glob)), + upgrade.NewFeatureOnlyDowngrade(KafkaChannelFeature(glob)), + } return pkgupgrade.Suite{ Tests: pkgupgrade.Tests{ - PreUpgrade: []pkgupgrade.Operation{ - BrokerPreUpgradeTest(), - NamespacedBrokerPreUpgradeTest(), - ChannelPreUpgradeTest(), - SinkPreUpgradeTest(), - SourcePreUpgradeTest(glob), - }, - PostUpgrade: []pkgupgrade.Operation{ - BrokerPostUpgradeTest(), - NamespacedBrokerPostUpgradeTest(), - ChannelPostUpgradeTest(), - SinkPostUpgradeTest(), - SourcePostUpgradeTest(glob), - }, - PostDowngrade: []pkgupgrade.Operation{ - BrokerPostDowngradeTest(), - NamespacedBrokerPostDowngradeTest(), - ChannelPostDowngradeTest(), - SinkPostDowngradeTest(), - SourcePostDowngradeTest(glob), - }, - Continual: ContinualTests(), + PreUpgrade: g.PreUpgradeTests(), + PostUpgrade: slices.Concat( + []pkgupgrade.Operation{ + VerifyPostInstallTest(), + }, + g.PostUpgradeTests(), + ), + PostDowngrade: g.PostDowngradeTests(), + Continual: ContinualTests(), }, Installations: pkgupgrade.Installations{ Base: []pkgupgrade.Operation{ diff --git a/vendor/knative.dev/eventing/test/upgrade/upgrade.go b/vendor/knative.dev/eventing/test/upgrade/upgrade.go new file mode 100644 index 0000000000..e56dde8d5a --- /dev/null +++ b/vendor/knative.dev/eventing/test/upgrade/upgrade.go @@ -0,0 +1,307 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "context" + "log" + "os" + "sync" + "testing" + + "knative.dev/eventing/test/rekt/features/channel" + "knative.dev/eventing/test/rekt/resources/channel_impl" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/system" + pkgupgrade "knative.dev/pkg/test/upgrade" + "knative.dev/pkg/test/zipkin" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" + "knative.dev/reconciler-test/pkg/manifest" +) + +var channelConfigMux = &sync.Mutex{} + +// RunMainTest expects flags to be already initialized. +// This function needs to be exposed, so that test cases in other repositories can call the upgrade +// main tests in eventing. +func RunMainTest(m *testing.M) { + os.Exit(func() int { + // Any tests may SetupZipkinTracing, it will only actually be done once. This should be the ONLY + // place that cleans it up. If an individual test calls this instead, then it will break other + // tests that need the tracing in place. + defer zipkin.CleanupZipkinTracingSetup(log.Printf) + return m.Run() + }()) +} + +// DurableFeature holds the setup and verify phase of a feature. The "setup" phase should set up +// the feature. The "verify" phase should only verify its function. This function should be +// idempotent. Calling this function multiple times should still properly verify the feature +// (e.g. one call after upgrade, one call after downgrade). +type DurableFeature struct { + SetupF *feature.Feature + // EnvOpts should never include environment.Managed or environment.Cleanup as these functions + // break the functionality. + EnvOpts []environment.EnvOpts + setupEnv environment.Environment + setupCtx context.Context + VerifyF func() *feature.Feature + Global environment.GlobalEnvironment +} + +func (fe *DurableFeature) Setup(label string) pkgupgrade.Operation { + return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) { + c.T.Parallel() + ctx, env := fe.Global.Environment( + fe.EnvOpts..., + // Not managed - namespace preserved. + ) + fe.setupEnv = env + fe.setupCtx = ctx + env.Test(ctx, c.T, fe.SetupF) + }) +} + +func (fe *DurableFeature) Verify(label string) pkgupgrade.Operation { + return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) { + c.T.Parallel() + fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF()) + }) +} + +func (fe *DurableFeature) VerifyAndTeardown(label string) pkgupgrade.Operation { + return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) { + c.T.Parallel() + fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF()) + // Ensures teardown of resources/namespace. + fe.setupEnv.Finish() + }) +} + +func (fe *DurableFeature) SetupVerifyAndTeardown(label string) pkgupgrade.Operation { + return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) { + c.T.Parallel() + ctx, env := fe.Global.Environment( + append(fe.EnvOpts, environment.Managed(c.T))..., + ) + env.Test(ctx, c.T, fe.SetupF) + env.Test(ctx, c.T, fe.VerifyF()) + }) +} + +type FeatureWithUpgradeTests interface { + PreUpgradeTests() []pkgupgrade.Operation + PostUpgradeTests() []pkgupgrade.Operation + PostDowngradeTests() []pkgupgrade.Operation +} + +// NewFeatureOnlyUpgrade decorates a feature with these actions: +// Pre-upgrade: Setup, Verify +// Post-upgrade: Verify, Teardown +// Post-downgrade: no-op. +func NewFeatureOnlyUpgrade(f *DurableFeature) FeatureWithUpgradeTests { + return featureOnlyUpgrade{ + label: "OnlyUpgrade", + feature: f, + } +} + +type featureOnlyUpgrade struct { + label string + feature *DurableFeature +} + +func (f featureOnlyUpgrade) PreUpgradeTests() []pkgupgrade.Operation { + return []pkgupgrade.Operation{ + f.feature.Setup(f.label), + } +} + +func (f featureOnlyUpgrade) PostUpgradeTests() []pkgupgrade.Operation { + return []pkgupgrade.Operation{ + f.feature.VerifyAndTeardown(f.label), + } +} + +func (f featureOnlyUpgrade) PostDowngradeTests() []pkgupgrade.Operation { + // No-op. Teardown was done post-upgrade. + return nil +} + +// NewFeatureUpgradeDowngrade decorates a feature with these actions: +// Pre-upgrade: Setup, Verify. +// Post-upgrade: Verify. +// Post-downgrade: Verify, Teardown. +func NewFeatureUpgradeDowngrade(f *DurableFeature) FeatureWithUpgradeTests { + return featureUpgradeDowngrade{ + label: "BothUpgradeDowngrade", + feature: f, + } +} + +type featureUpgradeDowngrade struct { + label string + feature *DurableFeature +} + +func (f featureUpgradeDowngrade) PreUpgradeTests() []pkgupgrade.Operation { + return []pkgupgrade.Operation{ + f.feature.Setup(f.label), + } +} + +func (f featureUpgradeDowngrade) PostUpgradeTests() []pkgupgrade.Operation { + // PostUpgrade only asserts existing resources. Teardown will be done post-downgrade. + return []pkgupgrade.Operation{ + f.feature.Verify(f.label), + } +} + +func (f featureUpgradeDowngrade) PostDowngradeTests() []pkgupgrade.Operation { + return []pkgupgrade.Operation{ + f.feature.VerifyAndTeardown(f.label), + } +} + +// NewFeatureOnlyDowngrade decorates a feature with these actions: +// Pre-upgrade: no-op. +// Post-upgrade: Setup, Verify. +// Post-downgrade: Verify, Teardown. +func NewFeatureOnlyDowngrade(f *DurableFeature) FeatureWithUpgradeTests { + return featureOnlyDowngrade{ + label: "OnlyDowngrade", + feature: f, + } +} + +type featureOnlyDowngrade struct { + label string + feature *DurableFeature +} + +func (f featureOnlyDowngrade) PreUpgradeTests() []pkgupgrade.Operation { + // No-op. Resources will be created post-upgrade. + return nil +} + +func (f featureOnlyDowngrade) PostUpgradeTests() []pkgupgrade.Operation { + // Resources created post-upgrade. + return []pkgupgrade.Operation{ + f.feature.Setup(f.label), + } +} + +func (f featureOnlyDowngrade) PostDowngradeTests() []pkgupgrade.Operation { + // Assert and Teardown is done post-downgrade. + return []pkgupgrade.Operation{ + f.feature.VerifyAndTeardown(f.label), + } +} + +// NewFeatureSmoke decorates a feature with these actions: +// Pre-upgrade: no-op. +// Post-upgrade: Setup, Verify, Teardown. +// Post-downgrade: Setup, Verify, Teardown. +func NewFeatureSmoke(f *DurableFeature) FeatureWithUpgradeTests { + return featureSmoke{ + label: "Smoke", + feature: f, + } +} + +type featureSmoke struct { + label string + feature *DurableFeature +} + +func (f featureSmoke) PreUpgradeTests() []pkgupgrade.Operation { + // No-op. No need to smoke test before upgrade. + return nil +} + +func (f featureSmoke) PostUpgradeTests() []pkgupgrade.Operation { + return []pkgupgrade.Operation{ + f.feature.SetupVerifyAndTeardown(f.label), + } +} + +func (f featureSmoke) PostDowngradeTests() []pkgupgrade.Operation { + return []pkgupgrade.Operation{ + f.feature.SetupVerifyAndTeardown(f.label), + } +} + +// FeatureGroupWithUpgradeTests aggregates tests across a group of features. +type FeatureGroupWithUpgradeTests []FeatureWithUpgradeTests + +func (fg FeatureGroupWithUpgradeTests) PreUpgradeTests() []pkgupgrade.Operation { + ops := make([]pkgupgrade.Operation, 0, len(fg)) + for _, ft := range fg { + ops = append(ops, ft.PreUpgradeTests()...) + } + return ops +} + +func (fg FeatureGroupWithUpgradeTests) PostUpgradeTests() []pkgupgrade.Operation { + ops := make([]pkgupgrade.Operation, 0, len(fg)) + for _, ft := range fg { + ops = append(ops, ft.PostUpgradeTests()...) + } + return ops +} + +func (fg FeatureGroupWithUpgradeTests) PostDowngradeTests() []pkgupgrade.Operation { + ops := make([]pkgupgrade.Operation, 0, len(fg)) + for _, ft := range fg { + ops = append(ops, ft.PostDowngradeTests()...) + } + return ops +} + +func InMemoryChannelFeature(glob environment.GlobalEnvironment) *DurableFeature { + // Prevent race conditions on channel_impl.EnvCfg.ChannelGK when running tests in parallel. + channelConfigMux.Lock() + defer channelConfigMux.Unlock() + channel_impl.EnvCfg.ChannelGK = "InMemoryChannel.messaging.knative.dev" + channel_impl.EnvCfg.ChannelV = "v1" + + createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn { + return subscription.WithSubscriber(ref, uri, "") + } + + setupF := feature.NewFeature() + sink, ch := channel.ChannelChainSetup(setupF, 1, createSubscriberFn) + + verifyF := func() *feature.Feature { + f := feature.NewFeature() + channel.ChannelChainAssert(f, sink, ch) + return f + } + + opts := []environment.EnvOpts{ + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + } + + return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} +}