44
55//go:build integration && !agentbeat
66
7- package gcppubsub
7+ package gcppubsub_test
88
99import (
1010 "bytes"
@@ -20,6 +20,8 @@ import (
2020
2121 "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
2222 "github.com/elastic/beats/v7/libbeat/tests/integration"
23+ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub/testutil"
24+ "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol"
2325
2426 "github.com/elastic/elastic-agent-libs/testing/estools"
2527)
@@ -28,17 +30,17 @@ func TestGCPInputOTelE2E(t *testing.T) {
2830 integration .EnsureESIsRunning (t )
2931
3032 // Create pubsub client for setting up and communicating to emulator.
31- client , clientCancel := testSetup (t )
33+ client , clientCancel := testutil . TestSetup (t )
3234 defer func () {
3335 clientCancel ()
3436 client .Close ()
3537 }()
3638
37- createTopic (t , client )
38- createSubscription (t , "test-subscription-otel" , client )
39- createSubscription (t , "test-subscription-fb" , client )
39+ testutil . CreateTopic (t , client )
40+ testutil . CreateSubscription (t , "test-subscription-otel" , client )
41+ testutil . CreateSubscription (t , "test-subscription-fb" , client )
4042 const numMsgs = 10
41- publishMessages (t , client , numMsgs )
43+ testutil . PublishMessages (t , client , numMsgs )
4244
4345 host := integration .GetESURL (t , "http" )
4446 user := host .User .Username ()
@@ -59,7 +61,7 @@ func TestGCPInputOTelE2E(t *testing.T) {
5961 Subscription string
6062 }
6163
62- gcpConfig := `filebeat.inputs:
64+ gcpFilebeatConfig := `filebeat.inputs:
6365- type: gcp-pubsub
6466 project_id: test-project-id
6567 topic: test-topic-foo
@@ -83,13 +85,76 @@ processors:
8385 - add_kubernetes_metadata: ~
8486`
8587
86- // start filebeat in otel mode
87- filebeatOTel := integration .NewBeat (
88- t ,
89- "filebeat-otel" ,
90- "../../filebeat.test" ,
91- "otel" ,
92- )
88+ gcpOTelConfig := `exporters:
89+ elasticsearch:
90+ auth:
91+ authenticator: beatsauth
92+ compression: gzip
93+ compression_params:
94+ level: 1
95+ endpoints:
96+ - {{ .ESURL }}
97+ logs_dynamic_pipeline:
98+ enabled: true
99+ logs_index: logs-integration-{{ .Namespace }}
100+ mapping:
101+ mode: bodymap
102+ max_conns_per_host: 1
103+ password: {{ .Password }}
104+ retry:
105+ enabled: true
106+ initial_interval: 1s
107+ max_interval: 1m0s
108+ max_retries: 3
109+ sending_queue:
110+ batch:
111+ flush_timeout: 10s
112+ max_size: 1600
113+ min_size: 0
114+ sizer: items
115+ block_on_overflow: true
116+ enabled: true
117+ num_consumers: 1
118+ queue_size: 3200
119+ wait_for_result: true
120+ user: {{ .Username }}
121+ extensions:
122+ beatsauth:
123+ idle_connection_timeout: 3s
124+ proxy_disable: false
125+ timeout: 1m30s
126+ receivers:
127+ filebeatreceiver:
128+ filebeat:
129+ inputs:
130+ - credentials_file: "testdata/fake.json"
131+ project_id: test-project-id
132+ subscription:
133+ name: {{ .Subscription }}
134+ topic: test-topic-foo
135+ type: gcp-pubsub
136+ output:
137+ otelconsumer:
138+ processors:
139+ - add_host_metadata: ~
140+ - add_cloud_metadata: ~
141+ - add_docker_metadata: ~
142+ - add_kubernetes_metadata: ~
143+ queue.mem.flush.timeout: 0s
144+ setup.template.enabled: false
145+ service:
146+ extensions:
147+ - beatsauth
148+ pipelines:
149+ logs:
150+ exporters:
151+ - elasticsearch
152+ receivers:
153+ - filebeatreceiver
154+ telemetry:
155+ metrics:
156+ level: none
157+ `
93158
94159 optionsValue := options {
95160 ESURL : fmt .Sprintf ("%s://%s" , host .Scheme , host .Host ),
@@ -100,19 +165,16 @@ processors:
100165 var configBuffer bytes.Buffer
101166 optionsValue .Namespace = otelNamespace
102167 optionsValue .Subscription = "test-subscription-otel"
103- require .NoError (t , template .Must (template .New ("config" ).Parse (gcpConfig )).Execute (& configBuffer , optionsValue ))
104-
105- filebeatOTel .WriteConfigFile (configBuffer .String ())
168+ require .NoError (t , template .Must (template .New ("config" ).Parse (gcpOTelConfig )).Execute (& configBuffer , optionsValue ))
106169
107- filebeatOTel .Start ()
108- defer filebeatOTel .Stop ()
170+ oteltestcol .New (t , configBuffer .String ())
109171
110172 // reset buffer
111173 configBuffer .Reset ()
112174
113175 optionsValue .Namespace = fbNameSpace
114176 optionsValue .Subscription = "test-subscription-fb"
115- require .NoError (t , template .Must (template .New ("config" ).Parse (gcpConfig )).Execute (& configBuffer , optionsValue ))
177+ require .NoError (t , template .Must (template .New ("config" ).Parse (gcpFilebeatConfig )).Execute (& configBuffer , optionsValue ))
116178
117179 // start filebeat
118180 filebeat := integration .NewBeat (
@@ -181,5 +243,4 @@ processors:
181243 }
182244
183245 oteltest .AssertMapsEqual (t , filebeatDoc , otelDoc , ignoredFields , "expected documents to be equal" )
184-
185246}
0 commit comments