-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathsubscriptions.go
230 lines (188 loc) · 6.45 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package graphqlws
import (
"errors"
"github.com/graphql-go/graphql"
"github.com/graphql-go/graphql/gqlerrors"
"github.com/graphql-go/graphql/language/ast"
"github.com/graphql-go/graphql/language/parser"
log "github.com/sirupsen/logrus"
)
// ErrorsFromGraphQLErrors convert from GraphQL errors to regular errors.
func ErrorsFromGraphQLErrors(errors []gqlerrors.FormattedError) []error {
if len(errors) == 0 {
return nil
}
out := make([]error, len(errors))
for i := range errors {
out[i] = errors[i]
}
return out
}
// SubscriptionSendDataFunc is a function that sends updated data
// for a specific subscription to the corresponding subscriber.
type SubscriptionSendDataFunc func(*DataMessagePayload)
// Subscription holds all information about a GraphQL subscription
// made by a client, including a function to send data back to the
// client when there are updates to the subscription query result.
type Subscription struct {
ID string
Query string
Variables map[string]interface{}
OperationName string
Document *ast.Document
Fields []string
Connection Connection
SendData SubscriptionSendDataFunc
}
// MatchesField returns true if the subscription is for data that
// belongs to the given field.
func (s *Subscription) MatchesField(field string) bool {
if s.Document == nil || len(s.Fields) == 0 {
return false
}
// The subscription matches the field if any of the queries have
// the same name as the field
for _, name := range s.Fields {
if name == field {
return true
}
}
return false
}
// ConnectionSubscriptions defines a map of all subscriptions of
// a connection by their IDs.
type ConnectionSubscriptions map[string]*Subscription
// Subscriptions defines a map of connections to a map of
// subscription IDs to subscriptions.
type Subscriptions map[Connection]ConnectionSubscriptions
// SubscriptionManager provides a high-level interface to managing
// and accessing the subscriptions made by GraphQL WS clients.
type SubscriptionManager interface {
// Subscriptions returns all registered subscriptions, grouped
// by connection.
Subscriptions() Subscriptions
// AddSubscription adds a new subscription to the manager.
AddSubscription(Connection, *Subscription) []error
// RemoveSubscription removes a subscription from the manager.
RemoveSubscription(Connection, *Subscription)
// RemoveSubscriptions removes all subscriptions of a client connection.
RemoveSubscriptions(Connection)
}
/**
* The default implementation of the SubscriptionManager interface.
*/
type subscriptionManager struct {
subscriptions Subscriptions
schema *graphql.Schema
logger *log.Entry
}
func NewSubscriptionManagerWithLogger(schema *graphql.Schema, logger *log.Entry) SubscriptionManager {
return newSubscriptionManager(schema, logger)
}
// NewSubscriptionManager creates a new subscription manager.
func NewSubscriptionManager(schema *graphql.Schema) SubscriptionManager {
return newSubscriptionManager(schema, NewLogger("subscriptions"))
}
func newSubscriptionManager(schema *graphql.Schema, logger *log.Entry) SubscriptionManager {
manager := new(subscriptionManager)
manager.subscriptions = make(Subscriptions)
manager.logger = logger
manager.schema = schema
return manager
}
func (m *subscriptionManager) Subscriptions() Subscriptions {
return m.subscriptions
}
func (m *subscriptionManager) AddSubscription(
conn Connection,
subscription *Subscription,
) []error {
m.logger.WithFields(log.Fields{
"conn": conn.ID(),
"subscription": subscription.ID,
}).Info("Add subscription")
if errors := validateSubscription(subscription); len(errors) > 0 {
m.logger.WithField("errors", errors).Warn("Failed to add invalid subscription")
return errors
}
// Parse the subscription query
document, err := parser.Parse(parser.ParseParams{
Source: subscription.Query,
})
if err != nil {
m.logger.WithField("err", err).Warn("Failed to parse subscription query")
return []error{err}
}
// Validate the query document
validation := graphql.ValidateDocument(m.schema, document, nil)
if !validation.IsValid {
m.logger.WithFields(log.Fields{
"errors": validation.Errors,
}).Warn("Failed to validate subscription query")
return ErrorsFromGraphQLErrors(validation.Errors)
}
// Remember the query document for later
subscription.Document = document
// Extract query names from the document (typically, there should only be one)
subscription.Fields = subscriptionFieldNamesFromDocument(document)
// Allocate the connection's map of subscription IDs to
// subscriptions on demand
if m.subscriptions[conn] == nil {
m.subscriptions[conn] = make(ConnectionSubscriptions)
}
// Add the subscription if it hasn't already been added
if m.subscriptions[conn][subscription.ID] != nil {
m.logger.WithFields(log.Fields{
"conn": conn.ID(),
"subscription": subscription.ID,
}).Warn("Cannot register subscription twice")
return []error{errors.New("Cannot register subscription twice")}
}
m.subscriptions[conn][subscription.ID] = subscription
return nil
}
func (m *subscriptionManager) RemoveSubscription(
conn Connection,
subscription *Subscription,
) {
m.logger.WithFields(log.Fields{
"conn": conn.ID(),
"subscription": subscription.ID,
}).Info("Remove subscription")
// Remove the subscription from its connections' subscription map
delete(m.subscriptions[conn], subscription.ID)
// Remove the connection as well if there are no subscriptions left
if len(m.subscriptions[conn]) == 0 {
delete(m.subscriptions, conn)
}
}
func (m *subscriptionManager) RemoveSubscriptions(conn Connection) {
m.logger.WithFields(log.Fields{
"conn": conn.ID(),
}).Info("Remove subscriptions")
// Only remove subscriptions if we know the connection
if m.subscriptions[conn] != nil {
// Remove subscriptions one by one
for opID := range m.subscriptions[conn] {
m.RemoveSubscription(conn, m.subscriptions[conn][opID])
}
// Remove the connection's subscription map altogether
delete(m.subscriptions, conn)
}
}
func validateSubscription(s *Subscription) []error {
errs := []error{}
if s.ID == "" {
errs = append(errs, errors.New("Subscription ID is empty"))
}
if s.Connection == nil {
errs = append(errs, errors.New("Subscription is not associated with a connection"))
}
if s.Query == "" {
errs = append(errs, errors.New("Subscription query is empty"))
}
if s.SendData == nil {
errs = append(errs, errors.New("Subscription has no SendData function set"))
}
return errs
}