Skip to content

Commit

Permalink
remove kafka globals
Browse files Browse the repository at this point in the history
  • Loading branch information
endigma committed Feb 11, 2025
1 parent 4f8f17e commit 5c8e500
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 36 deletions.
16 changes: 1 addition & 15 deletions router-tests/events/kafka_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

var subscriptionOne struct {
Expand Down Expand Up @@ -115,7 +114,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

var subscriptionOne struct {
Expand Down Expand Up @@ -201,7 +199,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

var subscriptionOne struct {
Expand Down Expand Up @@ -268,7 +265,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

var subscriptionOne struct {
Expand Down Expand Up @@ -364,7 +360,6 @@ func TestKafkaEvents(t *testing.T) {
engineExecutionConfiguration.WebSocketClientReadTimeout = time.Millisecond * 100
},
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

var subscriptionOne struct {
Expand Down Expand Up @@ -419,7 +414,7 @@ func TestKafkaEvents(t *testing.T) {
t.Run("multipart", func(t *testing.T) {
t.Parallel()

var multipartHeartbeatInterval = time.Second * 5
multipartHeartbeatInterval := time.Second * 5

t.Run("subscribe sync", func(t *testing.T) {
t.Parallel()
Expand All @@ -433,7 +428,6 @@ func TestKafkaEvents(t *testing.T) {
core.WithMultipartHeartbeatInterval(multipartHeartbeatInterval),
},
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

subscribePayload := []byte(`{"query":"subscription { employeeUpdatedMyKafka(employeeID: 1) { id details { forename surname } }}"}`)
Expand Down Expand Up @@ -529,7 +523,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

subscribePayload := []byte(`{"query":"subscription { employeeUpdatedMyKafka(employeeID: 1) { id details { forename surname } }}"}`)
Expand Down Expand Up @@ -565,7 +558,6 @@ func TestKafkaEvents(t *testing.T) {
line, _, gErr := reader.ReadLine()
require.NoError(t, gErr)
require.Equal(t, "", string(line))

}()

xEnv.WaitForSubscriptionCount(1, KafkaWaitTimeout)
Expand All @@ -590,7 +582,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

subscribePayload := []byte(`{"query":"subscription { employeeUpdatedMyKafka(employeeID: 1) { id details { forename surname } }}"}`)
Expand Down Expand Up @@ -694,7 +685,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

type subscriptionPayload struct {
Expand Down Expand Up @@ -808,7 +798,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

type subscriptionPayload struct {
Expand Down Expand Up @@ -922,7 +911,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

type subscriptionPayload struct {
Expand Down Expand Up @@ -990,7 +978,6 @@ func TestKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

var subscriptionOne struct {
Expand Down Expand Up @@ -1075,7 +1062,6 @@ func TestFlakyKafkaEvents(t *testing.T) {
RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate,
EnableKafka: true,
}, func(t *testing.T, xEnv *testenv.Environment) {

ensureTopicExists(t, xEnv, topics...)

type subscriptionPayload struct {
Expand Down
14 changes: 1 addition & 13 deletions router-tests/testenv/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
)

var (
kafkaMux sync.Mutex
kafkaData *KafkaData
)

type KafkaData struct {
Brokers []string
}
Expand All @@ -44,14 +39,7 @@ func getHostPort(resource *dockertest.Resource, id string) string {
}

func setupKafkaServers(t testing.TB) (*KafkaData, error) {
kafkaMux.Lock()
defer kafkaMux.Unlock()

if kafkaData != nil {
return kafkaData, nil
}

kafkaData = &KafkaData{}
kafkaData := &KafkaData{}

dockerPool, err := dockertest.NewPool("")
require.NoError(t, err, "could not connect to docker")
Expand Down
10 changes: 2 additions & 8 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func Bench(b *testing.B, cfg *Config, f func(b *testing.B, xEnv *Environment)) {
assertCacheMetrics(b, env, v, ff)
}
}

}

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
Expand Down Expand Up @@ -407,9 +406,7 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {

ctx, cancel := context.WithCancelCause(context.Background())

var (
logObserver *observer.ObservedLogs
)
var logObserver *observer.ObservedLogs

if oc := cfg.LogObservation; oc.Enabled {
var zCore zapcore.Core
Expand Down Expand Up @@ -441,9 +438,7 @@ func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
Countries: atomic.NewInt64(0),
}

var (
requiredPorts = 2
)
requiredPorts := 2

ports := freeport.GetN(t, requiredPorts)

Expand Down Expand Up @@ -1663,7 +1658,6 @@ func (e *Environment) ReadSSE(ctx context.Context, body io.ReadCloser, handler f
handler(data)
}
}

}
}

Expand Down

0 comments on commit 5c8e500

Please sign in to comment.