Skip to content

Commit 0e3626c

Browse files
authored
feat: refactor subscriptions (#714)
1 parent 67093cd commit 0e3626c

30 files changed

+1150
-437
lines changed

v2/examples/federation/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ require (
3232
github.com/imdario/mergo v0.3.8 // indirect
3333
github.com/jensneuse/byte-template v0.0.0-20200214152254-4f3cf06e5c68 // indirect
3434
github.com/jensneuse/pipeline v0.0.0-20200117120358-9fb4de085cd6 // indirect
35-
github.com/klauspost/compress v1.14.4 // indirect
35+
github.com/klauspost/compress v1.17.0 // indirect
3636
github.com/logrusorgru/aurora/v3 v3.0.0 // indirect
3737
github.com/mattn/go-colorable v0.1.13 // indirect
3838
github.com/mattn/go-isatty v0.0.19 // indirect

v2/examples/federation/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
8787
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
8888
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
8989
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
90+
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
9091
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
9192
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
9293
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

v2/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ require (
3434

3535
require (
3636
github.com/agnivade/levenshtein v1.1.1 // indirect
37+
github.com/alitto/pond v1.8.3 // indirect
3738
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
3839
github.com/gobwas/pool v0.2.0 // indirect
3940
github.com/hashicorp/errwrap v1.0.0 // indirect

v2/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi
55
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
66
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
77
github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo=
8+
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
9+
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
810
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
911
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
1012
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q=

v2/internal/pkg/xcontext/xcontext.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2019 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Package xcontext is a package to offer the extra functionality we need
6+
// from contexts that is not available from the standard context package.
7+
package xcontext
8+
9+
import (
10+
"context"
11+
"time"
12+
)
13+
14+
// Detach returns a context that keeps all the values of its parent context
15+
// but detaches from the cancellation and error handling.
16+
func Detach(ctx context.Context) context.Context { return detachedContext{ctx} }
17+
18+
type detachedContext struct{ parent context.Context }
19+
20+
func (v detachedContext) Deadline() (time.Time, bool) { return time.Time{}, false }
21+
func (v detachedContext) Done() <-chan struct{} { return nil }
22+
func (v detachedContext) Err() error { return nil }
23+
func (v detachedContext) Value(key interface{}) interface{} { return v.parent.Value(key) }

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"slices"
1212

1313
"github.com/buger/jsonparser"
14+
"github.com/cespare/xxhash/v2"
1415
"github.com/jensneuse/abstractlogger"
1516
"github.com/tidwall/sjson"
1617

@@ -1753,7 +1754,8 @@ func (s *Source) Load(ctx context.Context, input []byte, writer io.Writer) (err
17531754
}
17541755

17551756
type GraphQLSubscriptionClient interface {
1756-
Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, next chan<- []byte) error
1757+
Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error
1758+
UniqueRequestID(ctx *resolve.Context, options GraphQLSubscriptionOptions, hash *xxhash.Digest) (err error)
17571759
}
17581760

17591761
type GraphQLSubscriptionOptions struct {
@@ -1778,7 +1780,7 @@ type SubscriptionSource struct {
17781780
client GraphQLSubscriptionClient
17791781
}
17801782

1781-
func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, next chan<- []byte) error {
1783+
func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
17821784
var options GraphQLSubscriptionOptions
17831785
err := json.Unmarshal(input, &options)
17841786
if err != nil {
@@ -1787,5 +1789,22 @@ func (s *SubscriptionSource) Start(ctx *resolve.Context, input []byte, next chan
17871789
if options.Body.Query == "" {
17881790
return resolve.ErrUnableToResolve
17891791
}
1790-
return s.client.Subscribe(ctx, options, next)
1792+
return s.client.Subscribe(ctx, options, updater)
1793+
}
1794+
1795+
var (
1796+
dataSouceName = []byte("graphql")
1797+
)
1798+
1799+
func (s *SubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) (err error) {
1800+
_, err = xxh.Write(dataSouceName)
1801+
if err != nil {
1802+
return err
1803+
}
1804+
var options GraphQLSubscriptionOptions
1805+
err = json.Unmarshal(input, &options)
1806+
if err != nil {
1807+
return err
1808+
}
1809+
return s.client.UniqueRequestID(ctx, options, xxh)
17911810
}

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go

Lines changed: 110 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"io"
1010
"net/http"
1111
"net/http/httptest"
12+
"sync"
1213
"testing"
1314
"time"
1415

16+
"github.com/cespare/xxhash/v2"
1517
"github.com/stretchr/testify/assert"
1618
"github.com/stretchr/testify/require"
1719

@@ -7996,10 +7998,70 @@ var errSubscriptionClientFail = errors.New("subscription client fail error")
79967998

79977999
type FailingSubscriptionClient struct{}
79988000

7999-
func (f FailingSubscriptionClient) Subscribe(_ *resolve.Context, _ GraphQLSubscriptionOptions, _ chan<- []byte) error {
8001+
func (f *FailingSubscriptionClient) Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error {
80008002
return errSubscriptionClientFail
80018003
}
80028004

8005+
func (f *FailingSubscriptionClient) UniqueRequestID(ctx *resolve.Context, options GraphQLSubscriptionOptions, hash *xxhash.Digest) (err error) {
8006+
return errSubscriptionClientFail
8007+
}
8008+
8009+
type testSubscriptionUpdater struct {
8010+
updates []string
8011+
done bool
8012+
mux sync.Mutex
8013+
}
8014+
8015+
func (t *testSubscriptionUpdater) AwaitUpdates(tt *testing.T, timeout time.Duration, count int) {
8016+
ticker := time.NewTicker(timeout)
8017+
defer ticker.Stop()
8018+
for {
8019+
time.Sleep(10 * time.Millisecond)
8020+
select {
8021+
case <-ticker.C:
8022+
tt.Fatalf("timed out waiting for updates")
8023+
default:
8024+
t.mux.Lock()
8025+
if len(t.updates) == count {
8026+
t.mux.Unlock()
8027+
return
8028+
}
8029+
t.mux.Unlock()
8030+
}
8031+
}
8032+
}
8033+
8034+
func (t *testSubscriptionUpdater) AwaitDone(tt *testing.T, timeout time.Duration) {
8035+
ticker := time.NewTicker(timeout)
8036+
defer ticker.Stop()
8037+
for {
8038+
time.Sleep(10 * time.Millisecond)
8039+
select {
8040+
case <-ticker.C:
8041+
tt.Fatalf("timed out waiting for done")
8042+
default:
8043+
t.mux.Lock()
8044+
if t.done {
8045+
t.mux.Unlock()
8046+
return
8047+
}
8048+
t.mux.Unlock()
8049+
}
8050+
}
8051+
}
8052+
8053+
func (t *testSubscriptionUpdater) Update(data []byte) {
8054+
t.mux.Lock()
8055+
defer t.mux.Unlock()
8056+
t.updates = append(t.updates, string(data))
8057+
}
8058+
8059+
func (t *testSubscriptionUpdater) Done() {
8060+
t.mux.Lock()
8061+
defer t.mux.Unlock()
8062+
t.done = true
8063+
}
8064+
80038065
func TestSubscriptionSource_Start(t *testing.T) {
80048066
chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
80058067
defer chatServer.Close()
@@ -8042,84 +8104,86 @@ func TestSubscriptionSource_Start(t *testing.T) {
80428104
}
80438105

80448106
t.Run("should return error when input is invalid", func(t *testing.T) {
8045-
source := SubscriptionSource{client: FailingSubscriptionClient{}}
8107+
source := SubscriptionSource{client: &FailingSubscriptionClient{}}
80468108
err := source.Start(resolve.NewContext(context.Background()), []byte(`{"url": "", "body": "", "header": null}`), nil)
80478109
assert.Error(t, err)
80488110
})
80498111

80508112
t.Run("should return error when subscription client returns an error", func(t *testing.T) {
8051-
source := SubscriptionSource{client: FailingSubscriptionClient{}}
8113+
source := SubscriptionSource{client: &FailingSubscriptionClient{}}
80528114
err := source.Start(resolve.NewContext(context.Background()), []byte(`{"url": "", "body": {}, "header": null}`), nil)
80538115
assert.Error(t, err)
80548116
assert.Equal(t, resolve.ErrUnableToResolve, err)
80558117
})
80568118

80578119
t.Run("invalid json: should stop before sending to upstream", func(t *testing.T) {
8058-
next := make(chan []byte)
80598120
ctx := resolve.NewContext(context.Background())
80608121
defer ctx.Context().Done()
80618122

8123+
updater := &testSubscriptionUpdater{}
8124+
80628125
source := newSubscriptionSource(ctx.Context())
80638126
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: "#test") { text createdBy } }"}`)
8064-
err := source.Start(ctx, chatSubscriptionOptions, next)
8127+
err := source.Start(ctx, chatSubscriptionOptions, updater)
80658128
require.ErrorIs(t, err, resolve.ErrUnableToResolve)
80668129
})
80678130

80688131
t.Run("invalid syntax (roomNam)", func(t *testing.T) {
8069-
next := make(chan []byte)
80708132
ctx := resolve.NewContext(context.Background())
80718133
defer ctx.Context().Done()
80728134

8135+
updater := &testSubscriptionUpdater{}
8136+
80738137
source := newSubscriptionSource(ctx.Context())
80748138
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomNam: \"#test\") { text createdBy } }"}`)
8075-
err := source.Start(ctx, chatSubscriptionOptions, next)
8139+
err := source.Start(ctx, chatSubscriptionOptions, updater)
80768140
require.NoError(t, err)
8077-
8078-
msg, ok := <-next
8079-
assert.True(t, ok)
8080-
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, string(msg))
8081-
_, ok = <-next
8082-
assert.False(t, ok)
8141+
updater.AwaitUpdates(t, time.Second, 1)
8142+
assert.Len(t, updater.updates, 1)
8143+
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, updater.updates[0])
8144+
assert.Equal(t, true, updater.done)
80838145
})
80848146

80858147
t.Run("should close connection on stop message", func(t *testing.T) {
8086-
next := make(chan []byte)
80878148
subscriptionLifecycle, cancelSubscription := context.WithCancel(context.Background())
80888149
resolverLifecycle, cancelResolver := context.WithCancel(context.Background())
80898150
defer cancelResolver()
80908151

8152+
updater := &testSubscriptionUpdater{}
8153+
80918154
source := newSubscriptionSource(resolverLifecycle)
80928155
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8093-
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, next)
8156+
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, updater)
80948157
require.NoError(t, err)
80958158

80968159
username := "myuser"
80978160
message := "hello world!"
80988161
go sendChatMessage(t, username, message)
80998162

8100-
nextBytes := <-next
8101-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
8163+
updater.AwaitUpdates(t, time.Second, 1)
81028164
cancelSubscription()
8103-
_, ok := <-next
8104-
assert.False(t, ok)
8165+
updater.AwaitDone(t, time.Second*5)
8166+
assert.Len(t, updater.updates, 1)
8167+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
81058168
})
81068169

81078170
t.Run("should successfully subscribe with chat example", func(t *testing.T) {
8108-
next := make(chan []byte)
81098171
ctx := resolve.NewContext(context.Background())
81108172
defer ctx.Context().Done()
81118173

8174+
updater := &testSubscriptionUpdater{}
8175+
81128176
source := newSubscriptionSource(ctx.Context())
81138177
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8114-
err := source.Start(ctx, chatSubscriptionOptions, next)
8178+
err := source.Start(ctx, chatSubscriptionOptions, updater)
81158179
require.NoError(t, err)
81168180

81178181
username := "myuser"
81188182
message := "hello world!"
81198183
go sendChatMessage(t, username, message)
8120-
8121-
nextBytes := <-next
8122-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
8184+
updater.AwaitUpdates(t, time.Second, 1)
8185+
assert.Len(t, updater.updates, 1)
8186+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
81238187
})
81248188
}
81258189

@@ -8167,60 +8231,65 @@ func TestSubscription_GTWS_SubProtocol(t *testing.T) {
81678231
}
81688232

81698233
t.Run("invalid syntax (roomNam)", func(t *testing.T) {
8170-
next := make(chan []byte)
81718234
ctx := resolve.NewContext(context.Background())
81728235
defer ctx.Context().Done()
81738236

8237+
updater := &testSubscriptionUpdater{}
8238+
81748239
source := newSubscriptionSource(ctx.Context())
81758240
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomNam: \"#test\") { text createdBy } }"}`)
8176-
err := source.Start(ctx, chatSubscriptionOptions, next)
8241+
err := source.Start(ctx, chatSubscriptionOptions, updater)
81778242
require.NoError(t, err)
81788243

8179-
msg, ok := <-next
8180-
assert.True(t, ok)
8181-
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, string(msg))
8182-
_, ok = <-next
8183-
assert.False(t, ok)
8244+
updater.AwaitUpdates(t, time.Second, 1)
8245+
assert.Len(t, updater.updates, 1)
8246+
assert.Equal(t, `{"errors":[{"message":"Unknown argument \"roomNam\" on field \"Subscription.messageAdded\". Did you mean \"roomName\"?","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}},{"message":"Field \"messageAdded\" argument \"roomName\" of type \"String!\" is required, but it was not provided.","locations":[{"line":1,"column":29}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}`, updater.updates[0])
8247+
updater.AwaitDone(t, time.Second)
8248+
assert.Equal(t, true, updater.done)
81848249
})
81858250

81868251
t.Run("should close connection on stop message", func(t *testing.T) {
8187-
next := make(chan []byte)
81888252
subscriptionLifecycle, cancelSubscription := context.WithCancel(context.Background())
81898253
resolverLifecycle, cancelResolver := context.WithCancel(context.Background())
81908254
defer cancelResolver()
81918255

8256+
updater := &testSubscriptionUpdater{}
8257+
81928258
source := newSubscriptionSource(resolverLifecycle)
81938259
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8194-
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, next)
8260+
err := source.Start(resolve.NewContext(subscriptionLifecycle), chatSubscriptionOptions, updater)
81958261
require.NoError(t, err)
81968262

81978263
username := "myuser"
81988264
message := "hello world!"
81998265
go sendChatMessage(t, username, message)
82008266

8201-
nextBytes := <-next
8202-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
8267+
updater.AwaitUpdates(t, time.Second, 1)
8268+
assert.Len(t, updater.updates, 1)
8269+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
82038270
cancelSubscription()
8204-
_, ok := <-next
8205-
assert.False(t, ok)
8271+
updater.AwaitDone(t, time.Second*5)
8272+
assert.Equal(t, true, updater.done)
82068273
})
82078274

82088275
t.Run("should successfully subscribe with chat example", func(t *testing.T) {
8209-
next := make(chan []byte)
82108276
ctx := resolve.NewContext(context.Background())
82118277
defer ctx.Context().Done()
82128278

8279+
updater := &testSubscriptionUpdater{}
8280+
82138281
source := newSubscriptionSource(ctx.Context())
82148282
chatSubscriptionOptions := chatServerSubscriptionOptions(t, `{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
8215-
err := source.Start(ctx, chatSubscriptionOptions, next)
8283+
err := source.Start(ctx, chatSubscriptionOptions, updater)
82168284
require.NoError(t, err)
82178285

82188286
username := "myuser"
82198287
message := "hello world!"
82208288
go sendChatMessage(t, username, message)
82218289

8222-
nextBytes := <-next
8223-
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, string(nextBytes))
8290+
updater.AwaitUpdates(t, time.Second, 1)
8291+
assert.Len(t, updater.updates, 1)
8292+
assert.Equal(t, `{"data":{"messageAdded":{"text":"hello world!","createdBy":"myuser"}}}`, updater.updates[0])
82248293
})
82258294
}
82268295

0 commit comments

Comments
 (0)