diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index ba2ed93f8831..2effce78592e 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -68,6 +68,7 @@ jobs: with: name: collector-binary path: bin/ + - run: chmod +x bin/* - name: Build Docker Image run: | make docker-otelcontribcol diff --git a/internal/k8stest/go.mod b/internal/k8stest/go.mod index b989b2948e75..17879954fcc0 100644 --- a/internal/k8stest/go.mod +++ b/internal/k8stest/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/docker/docker v24.0.7+incompatible github.com/stretchr/testify v1.8.4 + k8s.io/api v0.28.4 k8s.io/apimachinery v0.28.4 k8s.io/client-go v0.28.4 ) @@ -20,7 +21,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/text v0.2.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -39,7 +39,6 @@ require ( golang.org/x/tools v0.15.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.31.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/k8stest/go.sum b/internal/k8stest/go.sum index 046c73d6b153..3b2a5b65ba42 100644 --- a/internal/k8stest/go.sum +++ b/internal/k8stest/go.sum @@ -1,7 +1,6 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -38,12 +37,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= @@ -125,7 +120,6 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -136,6 +130,7 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= k8s.io/api v0.28.4 h1:8ZBrLjwosLl/NYgv1P7EQLqoO8MGQApnbgH8tu3BMzY= +k8s.io/api v0.28.4/go.mod h1:axWTGrY88s/5YE+JSt4uUi6NMM+gur1en2REMR7IRj0= k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8= k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY= diff --git a/internal/k8stest/k8s_collector.go b/internal/k8stest/k8s_collector.go index ebfe5035b28e..0530e9caeac3 100644 --- a/internal/k8stest/k8s_collector.go +++ b/internal/k8stest/k8s_collector.go @@ -13,8 +13,10 @@ import ( "time" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" ) @@ -36,7 +38,8 @@ func CreateCollectorObjects(t *testing.T, client *dynamic.DynamicClient, testID })) obj, err := CreateObject(client, manifest.Bytes()) require.NoErrorf(t, err, "failed to create collector object from manifest %s", manifestFile.Name()) - if obj.GetKind() == "Deployment" { + objKind := obj.GetKind() + if objKind == "Deployment" || objKind == "DaemonSet" { podNamespace = obj.GetNamespace() selector := obj.Object["spec"].(map[string]any)["selector"] podLabels = selector.(map[string]any)["matchLabels"].(map[string]any) @@ -53,15 +56,49 @@ func WaitForCollectorToStart(t *testing.T, client *dynamic.DynamicClient, podNam podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"} listOptions := metav1.ListOptions{LabelSelector: SelectorFromMap(podLabels).String()} podTimeoutMinutes := 3 - var podPhase string - require.Eventually(t, func() bool { + t.Logf("waiting for collector pods to be ready") + require.Eventuallyf(t, func() bool { list, err := client.Resource(podGVR).Namespace(podNamespace).List(context.Background(), listOptions) require.NoError(t, err, "failed to list collector pods") - if len(list.Items) == 0 { + podsNotReady := len(list.Items) + if podsNotReady == 0 { + t.Log("did not find collector pods") return false } - podPhase = list.Items[0].Object["status"].(map[string]any)["phase"].(string) - return podPhase == "Running" - }, time.Duration(podTimeoutMinutes)*time.Minute, 50*time.Millisecond, - "collector pod haven't started within %d minutes, latest pod phase is %s", podTimeoutMinutes, podPhase) + + var pods v1.PodList + err = runtime.DefaultUnstructuredConverter.FromUnstructured(list.UnstructuredContent(), &pods) + require.NoError(t, err, "failed to convert unstructured to podList") + + for _, pod := range pods.Items { + podReady := false + if pod.Status.Phase != v1.PodRunning { + t.Logf("pod %v is not running, current phase: %v", pod.Name, pod.Status.Phase) + continue + } + for _, cond := range pod.Status.Conditions { + if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { + podsNotReady-- + podReady = true + } + } + // Add some debug logs for crashing pods + if !podReady { + for _, cs := range pod.Status.ContainerStatuses { + restartCount := cs.RestartCount + if restartCount > 0 && cs.LastTerminationState.Terminated != nil { + t.Logf("restart count = %d for container %s in pod %s, last terminated reason: %s", restartCount, cs.Name, pod.Name, cs.LastTerminationState.Terminated.Reason) + t.Logf("termination message: %s", cs.LastTerminationState.Terminated.Message) + } + } + } + } + if podsNotReady == 0 { + t.Logf("collector pods are ready") + return true + } + return false + + }, time.Duration(podTimeoutMinutes)*time.Minute, 2*time.Second, + "collector pods were not ready within %d minutes", podTimeoutMinutes) } diff --git a/internal/k8stest/k8s_objects.go b/internal/k8stest/k8s_objects.go index e3e4455e9ba8..bbbff8d6e56b 100644 --- a/internal/k8stest/k8s_objects.go +++ b/internal/k8stest/k8s_objects.go @@ -36,5 +36,12 @@ func DeleteObject(client *dynamic.DynamicClient, obj *unstructured.Unstructured) Version: gvk.Version, Resource: strings.ToLower(gvk.Kind + "s"), } - return client.Resource(gvr).Namespace(obj.GetNamespace()).Delete(context.Background(), obj.GetName(), metav1.DeleteOptions{}) + + options := metav1.DeleteOptions{} + policy := metav1.DeletePropagationBackground + if gvk.Kind == "Job" { + options.PropagationPolicy = &policy + } + + return client.Resource(gvr).Namespace(obj.GetNamespace()).Delete(context.Background(), obj.GetName(), options) } diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index fbc41f47a310..2f4a8cc4901b 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -57,13 +57,18 @@ func newExpectedValue(mode int, value string) *expectedValue { // make docker-otelcontribcol // KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest func TestE2E(t *testing.T) { - t.Skip("skipping flaky test see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29892") kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig) require.NoError(t, err) dynamicClient, err := dynamic.NewForConfig(kubeConfig) require.NoError(t, err) + metricsConsumer := new(consumertest.MetricsSink) + tracesConsumer := new(consumertest.TracesSink) + logsConsumer := new(consumertest.LogsSink) + shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer) + defer shutdownSinks() + testID := uuid.NewString()[:8] collectorObjs := k8stest.CreateCollectorObjects(t, dynamicClient, testID) telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, dynamicClient, testID) @@ -77,9 +82,6 @@ func TestE2E(t *testing.T) { k8stest.WaitForTelemetryGenToStart(t, dynamicClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType) } - metricsConsumer := new(consumertest.MetricsSink) - tracesConsumer := new(consumertest.TracesSink) - logsConsumer := new(consumertest.LogsSink) wantEntries := 128 // Minimal number of metrics/traces/logs to wait for. waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer) @@ -486,7 +488,7 @@ func resourceHasAttributes(resource pcommon.Resource, kvs map[string]*expectedVa return err } -func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink) { +func startUpSinks(t *testing.T, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink) func() { f := otlpreceiver.NewFactory() cfg := f.CreateDefaultConfig().(*otlpreceiver.Config) @@ -497,10 +499,12 @@ func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink, tc rcvr, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, lc) require.NoError(t, err, "failed creating logs receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) - defer func() { + return func() { assert.NoError(t, rcvr.Shutdown(context.Background())) - }() + } +} +func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink) { timeoutMinutes := 3 require.Eventuallyf(t, func() bool { return len(mc.AllMetrics()) > entriesNum && len(tc.AllTraces()) > entriesNum && len(lc.AllLogs()) > entriesNum diff --git a/processor/k8sattributesprocessor/testdata/e2e/collector/deployment.yaml b/processor/k8sattributesprocessor/testdata/e2e/collector/deployment.yaml index f7f21c5d4856..841c472b04f4 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/collector/deployment.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/collector/deployment.yaml @@ -37,10 +37,12 @@ spec: httpGet: path: / port: 13133 + initialDelaySeconds: 3 readinessProbe: httpGet: path: / port: 13133 + initialDelaySeconds: 3 resources: limits: cpu: 128m diff --git a/receiver/k8sclusterreceiver/e2e_test.go b/receiver/k8sclusterreceiver/e2e_test.go index a9723932d737..d9216b494e23 100644 --- a/receiver/k8sclusterreceiver/e2e_test.go +++ b/receiver/k8sclusterreceiver/e2e_test.go @@ -39,7 +39,6 @@ const testKubeConfig = "/tmp/kube-config-otelcol-e2e-testing" // make docker-otelcontribcol // KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest func TestE2E(t *testing.T) { - t.Skip("skipping flaky test see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29892") var expected pmetric.Metrics expectedFile := filepath.Join("testdata", "e2e", "expected.yaml") @@ -50,6 +49,10 @@ func TestE2E(t *testing.T) { dynamicClient, err := dynamic.NewForConfig(kubeConfig) require.NoError(t, err) + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + testID := uuid.NewString()[:8] collectorObjs := k8stest.CreateCollectorObjects(t, dynamicClient, testID) @@ -59,7 +62,6 @@ func TestE2E(t *testing.T) { } }() - metricsConsumer := new(consumertest.MetricsSink) wantEntries := 10 // Minimal number of metrics to wait for. waitForData(t, wantEntries, metricsConsumer) @@ -110,17 +112,19 @@ func TestE2E(t *testing.T) { ) } -func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink) { +func startUpSink(t *testing.T, mc *consumertest.MetricsSink) func() { f := otlpreceiver.NewFactory() cfg := f.CreateDefaultConfig().(*otlpreceiver.Config) rcvr, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, mc) require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, err, "failed creating metrics receiver") - defer func() { + return func() { assert.NoError(t, rcvr.Shutdown(context.Background())) - }() + } +} +func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink) { timeoutMinutes := 3 require.Eventuallyf(t, func() bool { return len(mc.AllMetrics()) > entriesNum diff --git a/receiver/k8sclusterreceiver/testdata/e2e/collector/deployment.yaml b/receiver/k8sclusterreceiver/testdata/e2e/collector/deployment.yaml index f7f21c5d4856..841c472b04f4 100644 --- a/receiver/k8sclusterreceiver/testdata/e2e/collector/deployment.yaml +++ b/receiver/k8sclusterreceiver/testdata/e2e/collector/deployment.yaml @@ -37,10 +37,12 @@ spec: httpGet: path: / port: 13133 + initialDelaySeconds: 3 readinessProbe: httpGet: path: / port: 13133 + initialDelaySeconds: 3 resources: limits: cpu: 128m diff --git a/receiver/k8sobjectsreceiver/e2e_test.go b/receiver/k8sobjectsreceiver/e2e_test.go index 0e21adbcf0a6..c4ef205aee89 100644 --- a/receiver/k8sobjectsreceiver/e2e_test.go +++ b/receiver/k8sobjectsreceiver/e2e_test.go @@ -43,7 +43,6 @@ const ( ) func TestE2E(t *testing.T) { - t.Skip("skipping flaky test see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29892") kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig) require.NoError(t, err) diff --git a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml index 1220124e949c..642bda8b3482 100644 --- a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml +++ b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events.yaml @@ -16,7 +16,7 @@ resourceLogs: values: - key: note value: - stringValue: test event + stringValue: test event of type warning - key: eventTime value: stringValue: "2023-01-01T00:00:00.000000Z" @@ -28,7 +28,7 @@ resourceLogs: stringValue: test - key: reason value: - stringValue: Test event of type warning + stringValue: TestEvent - key: regarding value: kvlistValue: diff --git a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml index 755b3f5cf59f..a8d432c0d829 100644 --- a/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml +++ b/receiver/k8sobjectsreceiver/testdata/e2e/expected/watch_events_core.yaml @@ -29,7 +29,7 @@ resourceLogs: stringValue: test - key: message value: - stringValue: test event + stringValue: test event of type warning - key: source value: kvlistValue: {} @@ -112,7 +112,7 @@ resourceLogs: stringValue: Warning - key: reason value: - stringValue: Test event of type warning + stringValue: TestEvent - key: kind value: stringValue: Event diff --git a/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event.yaml b/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event.yaml index 7f25edfdacd7..7bb741666186 100644 --- a/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event.yaml +++ b/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event.yaml @@ -5,8 +5,8 @@ kind: Event metadata: name: test-k8sobjects-receiver namespace: default -note: test event -reason: Test event of type warning +note: test event of type warning +reason: TestEvent reportingController: e2etest reportingInstance: k8sobjectsreceiver regarding: diff --git a/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event_core.yaml b/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event_core.yaml index ba286c7a8fe4..7a2c2f3138fa 100644 --- a/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event_core.yaml +++ b/receiver/k8sobjectsreceiver/testdata/e2e/testobjects/event_core.yaml @@ -5,8 +5,8 @@ kind: Event metadata: name: test-k8sobjects-receiver-events-core namespace: default -note: test event -reason: Test event of type warning +note: test event of type warning +reason: TestEvent reportingController: e2etest-events-core reportingInstance: k8sobjectsreceiver regarding: diff --git a/receiver/kubeletstatsreceiver/e2e_test.go b/receiver/kubeletstatsreceiver/e2e_test.go index dd204ffcba1b..1cb44b0fa3f1 100644 --- a/receiver/kubeletstatsreceiver/e2e_test.go +++ b/receiver/kubeletstatsreceiver/e2e_test.go @@ -31,7 +31,6 @@ import ( const testKubeConfig = "/tmp/kube-config-otelcol-e2e-testing" func TestE2E(t *testing.T) { - t.Skip("skipping flaky test see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29892") var expected pmetric.Metrics expectedFile := filepath.Join("testdata", "e2e", "expected.yaml") @@ -42,6 +41,10 @@ func TestE2E(t *testing.T) { dynamicClient, err := dynamic.NewForConfig(kubeConfig) require.NoError(t, err) + metricsConsumer := new(consumertest.MetricsSink) + shutdownSink := startUpSink(t, metricsConsumer) + defer shutdownSink() + testID := uuid.NewString()[:8] collectorObjs := k8stest.CreateCollectorObjects(t, dynamicClient, testID) @@ -51,7 +54,6 @@ func TestE2E(t *testing.T) { } }() - metricsConsumer := new(consumertest.MetricsSink) wantEntries := 10 // Minimal number of metrics to wait for. waitForData(t, wantEntries, metricsConsumer) @@ -68,17 +70,19 @@ func TestE2E(t *testing.T) { ) } -func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink) { +func startUpSink(t *testing.T, mc *consumertest.MetricsSink) func() { f := otlpreceiver.NewFactory() cfg := f.CreateDefaultConfig().(*otlpreceiver.Config) rcvr, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, mc) require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, err, "failed creating metrics receiver") - defer func() { + return func() { assert.NoError(t, rcvr.Shutdown(context.Background())) - }() + } +} +func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink) { timeoutMinutes := 3 require.Eventuallyf(t, func() bool { return len(mc.AllMetrics()) > entriesNum