Skip to content

Commit ce77736

Browse files
authored
Revert "feat: refactor subscriptions (#714)" (#720)
This reverts commit 0e3626c.
1 parent 1808812 commit ce77736

30 files changed

+437
-1150
lines changed

v2/examples/federation/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ require (
3232
github.com/imdario/mergo v0.3.8 // indirect
3333
github.com/jensneuse/byte-template v0.0.0-20200214152254-4f3cf06e5c68 // indirect
3434
github.com/jensneuse/pipeline v0.0.0-20200117120358-9fb4de085cd6 // indirect
35-
github.com/klauspost/compress v1.17.0 // indirect
35+
github.com/klauspost/compress v1.14.4 // indirect
3636
github.com/logrusorgru/aurora/v3 v3.0.0 // indirect
3737
github.com/mattn/go-colorable v0.1.13 // indirect
3838
github.com/mattn/go-isatty v0.0.19 // indirect

v2/examples/federation/go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
8787
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
8888
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
8989
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
90-
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
9190
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
9291
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
9392
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

v2/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ require (
3434

3535
require (
3636
github.com/agnivade/levenshtein v1.1.1 // indirect
37-
github.com/alitto/pond v1.8.3 // indirect
3837
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
3938
github.com/gobwas/pool v0.2.0 // indirect
4039
github.com/hashicorp/errwrap v1.0.0 // indirect

v2/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi
55
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
66
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
77
github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo=
8-
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
9-
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
108
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
119
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
1210
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q=

v2/internal/pkg/xcontext/xcontext.go

Lines changed: 0 additions & 23 deletions
This file was deleted.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"slices"
1212

1313
"github.com/buger/jsonparser"
14-
"github.com/cespare/xxhash/v2"
1514
"github.com/jensneuse/abstractlogger"
1615
"github.com/tidwall/sjson"
1716

@@ -1754,8 +1753,7 @@ func (s *Source) Load(ctx context.Context, input []byte, writer io.Writer) (err
17541753
}
17551754

17561755
type GraphQLSubscriptionClient interface {
1757-
Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error
1758-
UniqueRequestID(ctx *resolve.Context, options GraphQLSubscriptionOptions, hash *xxhash.Digest) (err error)
1756+
Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, next chan<- []byte) error
17591757
}
17601758

17611759
type GraphQLSubscriptionOptions struct {
@@ -1780,7 +1778,7 @@ type SubscriptionSource struct {
17801778
client GraphQLSubscriptionClient
17811779
}
17821780

1783-
func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
1781+
func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, next chan<- []byte) error {
17841782
var options GraphQLSubscriptionOptions
17851783
err := json.Unmarshal(input, &options)
17861784
if err != nil {
@@ -1789,22 +1787,5 @@ func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, updater r
17891787
if options.Body.Query == "" {
17901788
return resolve.ErrUnableToResolve
17911789
}
1792-
return s.client.Subscribe(ctx, options, updater)
1793-
}
1794-
1795-
var (
1796-
dataSouceName = []byte("graphql")
1797-
)
1798-
1799-
func (s *SubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) (err error) {
1800-
_, err = xxh.Write(dataSouceName)
1801-
if err != nil {
1802-
return err
1803-
}
1804-
var options GraphQLSubscriptionOptions
1805-
err = json.Unmarshal(input, &options)
1806-
if err != nil {
1807-
return err
1808-
}
1809-
return s.client.UniqueRequestID(ctx, options, xxh)
1790+
return s.client.Subscribe(ctx, options, next)
18101791
}

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go

Lines changed: 41 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@ import (
99
"io"
1010
"net/http"
1111
"net/http/httptest"
12-
"sync"
1312
"testing"
1413
"time"
1514

16-
"github.com/cespare/xxhash/v2"
1715
"github.com/stretchr/testify/assert"
1816
"github.com/stretchr/testify/require"
1917

@@ -8076,70 +8074,10 @@ var errSubscriptionClientFail = errors.New("subscription client fail error")
80768074

80778075
type FailingSubscriptionClient struct{}
80788076

8079-
func (f *FailingSubscriptionClient) Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
8077+
func (f FailingSubscriptionClient) Subscribe(_ *resolve.Context, _ GraphQLSubscriptionOptions, _ chan<- []byte) error {
80808078
return errSubscriptionClientFail
80818079
}
80828080

8083-
func (f *FailingSubscriptionClient) UniqueRequestID(ctx *resolve.Context, options GraphQLSubscriptionOptions, hash *xxhash.Digest) (err error) {
8084-
return errSubscriptionClientFail
8085-
}
8086-
8087-
type testSubscriptionUpdater struct {
8088-
updates []string
8089-
done bool
8090-
mux sync.Mutex
8091-
}
8092-
8093-
func (t *testSubscriptionUpdater) AwaitUpdates(tt *testing.T, timeout time.Duration, count int) {
8094-
ticker := time.NewTicker(timeout)
8095-
defer ticker.Stop()
8096-
for {
8097-
time.Sleep(10 * time.Millisecond)
8098-
select {
8099-
case <-ticker.C:
8100-
tt.Fatalf("timed out waiting for updates")
8101-
default:
8102-
t.mux.Lock()
8103-
if len(t.updates) == count {
8104-
t.mux.Unlock()
8105-
return
8106-
}
8107-
t.mux.Unlock()
8108-
}
8109-
}
8110-
}
8111-
8112-
func (t *testSubscriptionUpdater) AwaitDone(tt *testing.T, timeout time.Duration) {
8113-
ticker := time.NewTicker(timeout)
8114-
defer ticker.Stop()
8115-
for {
8116-
time.Sleep(10 * time.Millisecond)
8117-
select {
8118-
case <-ticker.C:
8119-
tt.Fatalf("timed out waiting for done")
8120-
default:
8121-
t.mux.Lock()
8122-
if t.done {
8123-
t.mux.Unlock()
8124-
return
8125-
}
8126-
t.mux.Unlock()
8127-
}
8128-
}
8129-
}
8130-
8131-
func (t *testSubscriptionUpdater) Update(data []byte) {
8132-
t.mux.Lock()
8133-
defer t.mux.Unlock()
8134-
t.updates = append(t.updates, string(data))
8135-
}
8136-
8137-
func (t *testSubscriptionUpdater) Done() {
8138-
t.mux.Lock()
8139-
defer t.mux.Unlock()
8140-
t.done = true
8141-
}
8142-
81438081
func TestSubscriptionSource_Start(t *testing.T) {
81448082
chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
81458083
defer chatServer.Close()
@@ -8182,86 +8120,84 @@ func TestSubscriptionSource_Start(t *testing.T) {
81828120
}
81838121

81848122
t.Run("should return error when input is invalid", func(t *testing.T) {
8185-
source := SubscriptionSource{client: &FailingSubscriptionClient{}}
8123+
source := SubscriptionSource{client: FailingSubscriptionClient{}}
81868124
err := source.Start(resolve.NewContext(context.Background()), []byte(`{"url": "", "body": "", "header": null}`), nil)
81878125
assert.Error(t, err)
81888126
})
81898127

81908128
t.Run("should return error when subscription client returns an error", func(t *testing.T) {
8191-
source := SubscriptionSource{client: &FailingSubscriptionClient{}}
8129+
source := SubscriptionSource{client: FailingSubscriptionClient{}}
81928130
err := source.Start(resolve.NewContext(context.Background()), []byte(`{"url": "", "body": {}, "header": null}`), nil)
81938131
assert.Error(t, err)
81948132
assert.Equal(t, resolve.ErrUnableToResolve, err)
81958133
})
81968134

81978135
t.Run("invalid json: should stop before sending to upstream", func(t *testing.T) {
8136+
next := make(chan []byte)
81988137
ctx := resolve.NewContext(context.Background())
81998138
defer ctx.Context().Done()
82008139

8201-
updater := &testSubscriptionUpdater{}
8202-
82038140
source := newSubscriptionSource(ctx.Context())
82048141
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: "#test") { text createdBy } }"}`)
8205-
err := source.Start(ctx, chatSubscriptionOptions, updater)
8142+
err := source.Start(ctx, chatSubscriptionOptions, next)
82068143
require.ErrorIs(t, err, resolve.ErrUnableToResolve)
82078144
})
82088145

82098146
t.Run("invalid syntax (roomNam)", func(t *testing.T) {
8147+
next := make(chan []byte)
82108148
ctx := resolve.NewContext(context.Background())
82118149
defer ctx.Context().Done()
82128150

8213-
updater := &testSubscriptionUpdater{}
8214-
82158151
source := newSubscriptionSource(ctx.Context())
82168152
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomNam: \"#test\") { text createdBy } }"}`)
8217-
err := source.Start(ctx, chatSubscriptionOptions, updater)
8153+
err := source.Start(ctx, chatSubscriptionOptions, next)
82188154
require.NoError(t, err)
8219-
updater.AwaitUpdates(t, time.Second, 1)
8220-
assert.Len(t, updater.updates, 1)
8221-
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, updater.updates[0])
8222-
assert.Equal(t, true, updater.done)
8155+
8156+
msg, ok := <-next
8157+
assert.True(t, ok)
8158+
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, string(msg))
8159+
_, ok = <-next
8160+
assert.False(t, ok)
82238161
})
82248162

82258163
t.Run("should close connection on stop message", func(t *testing.T) {
8164+
next := make(chan []byte)
82268165
subscriptionLifecycle, cancelSubscription := context.WithCancel(context.Background())
82278166
resolverLifecycle, cancelResolver := context.WithCancel(context.Background())
82288167
defer cancelResolver()
82298168

8230-
updater := &testSubscriptionUpdater{}
8231-
82328169
source := newSubscriptionSource(resolverLifecycle)
82338170
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8234-
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, updater)
8171+
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, next)
82358172
require.NoError(t, err)
82368173

82378174
username := "myuser"
82388175
message := "hello world!"
82398176
go sendChatMessage(t, username, message)
82408177

8241-
updater.AwaitUpdates(t, time.Second, 1)
8178+
nextBytes := <-next
8179+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
82428180
cancelSubscription()
8243-
updater.AwaitDone(t, time.Second*5)
8244-
assert.Len(t, updater.updates, 1)
8245-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
8181+
_, ok := <-next
8182+
assert.False(t, ok)
82468183
})
82478184

82488185
t.Run("should successfully subscribe with chat example", func(t *testing.T) {
8186+
next := make(chan []byte)
82498187
ctx := resolve.NewContext(context.Background())
82508188
defer ctx.Context().Done()
82518189

8252-
updater := &testSubscriptionUpdater{}
8253-
82548190
source := newSubscriptionSource(ctx.Context())
82558191
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8256-
err := source.Start(ctx, chatSubscriptionOptions, updater)
8192+
err := source.Start(ctx, chatSubscriptionOptions, next)
82578193
require.NoError(t, err)
82588194

82598195
username := "myuser"
82608196
message := "hello world!"
82618197
go sendChatMessage(t, username, message)
8262-
updater.AwaitUpdates(t, time.Second, 1)
8263-
assert.Len(t, updater.updates, 1)
8264-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
8198+
8199+
nextBytes := <-next
8200+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
82658201
})
82668202
}
82678203

@@ -8309,65 +8245,60 @@ func TestSubscription_GTWS_SubProtocol(t *testing.T) {
83098245
}
83108246

83118247
t.Run("invalid syntax (roomNam)", func(t *testing.T) {
8248+
next := make(chan []byte)
83128249
ctx := resolve.NewContext(context.Background())
83138250
defer ctx.Context().Done()
83148251

8315-
updater := &testSubscriptionUpdater{}
8316-
83178252
source := newSubscriptionSource(ctx.Context())
83188253
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomNam: \"#test\") { text createdBy } }"}`)
8319-
err := source.Start(ctx, chatSubscriptionOptions, updater)
8254+
err := source.Start(ctx, chatSubscriptionOptions, next)
83208255
require.NoError(t, err)
83218256

8322-
updater.AwaitUpdates(t, time.Second, 1)
8323-
assert.Len(t, updater.updates, 1)
8324-
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, updater.updates[0])
8325-
updater.AwaitDone(t, time.Second)
8326-
assert.Equal(t, true, updater.done)
8257+
msg, ok := <-next
8258+
assert.True(t, ok)
8259+
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, string(msg))
8260+
_, ok = <-next
8261+
assert.False(t, ok)
83278262
})
83288263

83298264
t.Run("should close connection on stop message", func(t *testing.T) {
8265+
next := make(chan []byte)
83308266
subscriptionLifecycle, cancelSubscription := context.WithCancel(context.Background())
83318267
resolverLifecycle, cancelResolver := context.WithCancel(context.Background())
83328268
defer cancelResolver()
83338269

8334-
updater := &testSubscriptionUpdater{}
8335-
83368270
source := newSubscriptionSource(resolverLifecycle)
83378271
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8338-
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, updater)
8272+
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, next)
83398273
require.NoError(t, err)
83408274

83418275
username := "myuser"
83428276
message := "hello world!"
83438277
go sendChatMessage(t, username, message)
83448278

8345-
updater.AwaitUpdates(t, time.Second, 1)
8346-
assert.Len(t, updater.updates, 1)
8347-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
8279+
nextBytes := <-next
8280+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
83488281
cancelSubscription()
8349-
updater.AwaitDone(t, time.Second*5)
8350-
assert.Equal(t, true, updater.done)
8282+
_, ok := <-next
8283+
assert.False(t, ok)
83518284
})
83528285

83538286
t.Run("should successfully subscribe with chat example", func(t *testing.T) {
8287+
next := make(chan []byte)
83548288
ctx := resolve.NewContext(context.Background())
83558289
defer ctx.Context().Done()
83568290

8357-
updater := &testSubscriptionUpdater{}
8358-
83598291
source := newSubscriptionSource(ctx.Context())
83608292
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8361-
err := source.Start(ctx, chatSubscriptionOptions, updater)
8293+
err := source.Start(ctx, chatSubscriptionOptions, next)
83628294
require.NoError(t, err)
83638295

83648296
username := "myuser"
83658297
message := "hello world!"
83668298
go sendChatMessage(t, username, message)
83678299

8368-
updater.AwaitUpdates(t, time.Second, 1)
8369-
assert.Len(t, updater.updates, 1)
8370-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
8300+
nextBytes := <-next
8301+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
83718302
})
83728303
}
83738304

0 commit comments

Comments
 (0)