Skip to content

Commit

Permalink
Merge branch 'master' into ale/eng-6219-make-router-tests-less-flaky
Browse files Browse the repository at this point in the history
# Conflicts:
#	v2/pkg/engine/resolve/resolve.go
  • Loading branch information
alepane21 committed Jan 28, 2025
2 parents 0055b6f + 3cbb4b5 commit 32d6911
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 86 deletions.
2 changes: 1 addition & 1 deletion release-please-manifest.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"v2": "2.0.0-rc.142",
"v2": "2.0.0-rc.145",
"execution": "1.2.0"
}
21 changes: 21 additions & 0 deletions v2/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# Changelog

## [2.0.0-rc.145](https://github.com/wundergraph/graphql-go-tools/compare/v2.0.0-rc.144...v2.0.0-rc.145) (2025-01-27)


### Features

* add normalizedQuery to query plan and request info to trace ([#1045](https://github.com/wundergraph/graphql-go-tools/issues/1045)) ([e75a1dd](https://github.com/wundergraph/graphql-go-tools/commit/e75a1dd24d5255b6cc990269c5c7922f851f4fc1))

## [2.0.0-rc.144](https://github.com/wundergraph/graphql-go-tools/compare/v2.0.0-rc.143...v2.0.0-rc.144) (2025-01-23)


### Bug Fixes

* remove semaphore from ResolveGraphQLSubscription ([#1043](https://github.com/wundergraph/graphql-go-tools/issues/1043)) ([76d644e](https://github.com/wundergraph/graphql-go-tools/commit/76d644eb2316bfc71ae3a09cd4a5614998f26f43))

## [2.0.0-rc.143](https://github.com/wundergraph/graphql-go-tools/compare/v2.0.0-rc.142...v2.0.0-rc.143) (2025-01-23)


### Bug Fixes

* delete leftover heartbeat connections ([#1033](https://github.com/wundergraph/graphql-go-tools/issues/1033)) ([f7492d3](https://github.com/wundergraph/graphql-go-tools/commit/f7492d39b044f4901f695fb1e7718c9fe912504c))

## [2.0.0-rc.142](https://github.com/wundergraph/graphql-go-tools/compare/v2.0.0-rc.141...v2.0.0-rc.142) (2025-01-19)


Expand Down
15 changes: 15 additions & 0 deletions v2/pkg/engine/resolve/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ type PhaseStats struct {
DurationSinceStartPretty string `json:"duration_since_start_pretty"`
}

type requestContextKey struct{}

func SetTraceStart(ctx context.Context, predictableDebugTimings bool) context.Context {
info := &TraceInfo{}
if predictableDebugTimings {
Expand Down Expand Up @@ -267,3 +269,16 @@ func SetPlannerStats(ctx context.Context, stats PhaseStats) {
}
info.PlannerStats = SetDebugStats(info, stats, 4)
}

func GetRequest(ctx context.Context) *RequestData {
// The context might not have trace info, in that case we return nil
req, ok := ctx.Value(requestContextKey{}).(*RequestData)
if !ok {
return nil
}
return req
}

func SetRequest(ctx context.Context, r *RequestData) context.Context {
return context.WithValue(ctx, requestContextKey{}, r)
}
21 changes: 12 additions & 9 deletions v2/pkg/engine/resolve/fetchtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
type FetchTreeNode struct {
Kind FetchTreeNodeKind `json:"kind"`
// Only set for subscription
Trigger *FetchTreeNode `json:"trigger"`
Item *FetchItem `json:"item"`
ChildNodes []*FetchTreeNode `json:"child_nodes"`
Trigger *FetchTreeNode `json:"trigger"`
Item *FetchItem `json:"item"`
ChildNodes []*FetchTreeNode `json:"child_nodes"`
NormalizedQuery string `json:"normalized_query"`
}

type FetchTreeNodeKind string
Expand Down Expand Up @@ -147,11 +148,12 @@ func (n *FetchTreeNode) Trace() *FetchTreeTraceNode {
}

type FetchTreeQueryPlanNode struct {
Version string `json:"version,omitempty"`
Kind FetchTreeNodeKind `json:"kind"`
Trigger *FetchTreeQueryPlan `json:"trigger,omitempty"`
Children []*FetchTreeQueryPlanNode `json:"children,omitempty"`
Fetch *FetchTreeQueryPlan `json:"fetch,omitempty"`
Version string `json:"version,omitempty"`
Kind FetchTreeNodeKind `json:"kind"`
Trigger *FetchTreeQueryPlan `json:"trigger,omitempty"`
Children []*FetchTreeQueryPlanNode `json:"children,omitempty"`
Fetch *FetchTreeQueryPlan `json:"fetch,omitempty"`
NormalizedQuery string `json:"normalizedQuery,omitempty"`
}

type FetchTreeQueryPlan struct {
Expand Down Expand Up @@ -194,7 +196,8 @@ func (n *FetchTreeNode) queryPlan() *FetchTreeQueryPlanNode {
return nil
}
queryPlan := &FetchTreeQueryPlanNode{
Kind: n.Kind,
Kind: n.Kind,
NormalizedQuery: n.NormalizedQuery,
}
switch n.Kind {
case FetchTreeNodeKindSingle:
Expand Down
117 changes: 48 additions & 69 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Resolver struct {
maxConcurrency chan struct{}

triggers map[uint64]*trigger
heartbeatSubLock *sync.Mutex
heartbeatSubscriptions map[*Context]*sub
events chan subscriptionEvent
triggerEventsSem *semaphore.Weighted
Expand Down Expand Up @@ -189,6 +190,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
propagateSubgraphStatusCodes: options.PropagateSubgraphStatusCodes,
events: make(chan subscriptionEvent),
triggers: make(map[uint64]*trigger),
heartbeatSubLock: &sync.Mutex{},
heartbeatSubscriptions: make(map[*Context]*sub),
reporter: options.Reporter,
asyncErrorWriter: options.AsyncErrorWriter,
Expand Down Expand Up @@ -407,6 +409,9 @@ func (r *Resolver) handleEvent(event subscriptionEvent) {
}

func (r *Resolver) handleHeartbeat(data []byte) {
r.heartbeatSubLock.Lock()
defer r.heartbeatSubLock.Unlock()

if r.options.Debug {
fmt.Printf("resolver:heartbeat:%d\n", len(r.heartbeatSubscriptions))
}
Expand All @@ -417,7 +422,7 @@ func (r *Resolver) handleHeartbeat(data []byte) {
s.mux.Lock()
skipHeartbeat := now.Sub(s.lastWrite) < r.multipartSubHeartbeatInterval
s.mux.Unlock()
if skipHeartbeat {
if skipHeartbeat || (c.Context().Err() != nil && errors.Is(c.Context().Err(), context.Canceled)) {
continue
}

Expand All @@ -427,6 +432,12 @@ func (r *Resolver) handleHeartbeat(data []byte) {

s.mux.Lock()
if _, err := s.writer.Write(data); err != nil {
if errors.Is(err, context.Canceled) {
// client disconnected
s.mux.Unlock()
_ = r.AsyncUnsubscribeSubscription(s.id)
return
}
r.asyncErrorWriter.WriteError(c, err, nil, s.writer)
}
err := s.writer.Flush()
Expand Down Expand Up @@ -466,30 +477,7 @@ func (r *Resolver) handleTriggerInitialized(triggerID uint64) {
}

func (r *Resolver) handleTriggerDone(triggerID uint64) {
trig, ok := r.triggers[triggerID]
if !ok {
return
}
isInitialized := trig.initialized
wg := trig.inFlight
subscriptionCount := len(trig.subscriptions)

delete(r.triggers, triggerID)

go func() {
if wg != nil {
wg.Wait()
}
for _, s := range trig.subscriptions {
s.writer.Complete()
}
if r.reporter != nil {
r.reporter.SubscriptionCountDec(subscriptionCount)
if isInitialized {
r.reporter.TriggerCountDec(1)
}
}
}()
r.shutdownTrigger(triggerID)
}

func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) {
Expand All @@ -508,7 +496,9 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
executor: add.executor,
}
if add.ctx.ExecutionOptions.SendHeartbeat {
r.heartbeatSubLock.Lock()
r.heartbeatSubscriptions[add.ctx] = s
r.heartbeatSubLock.Unlock()
}
trig, ok := r.triggers[triggerID]
if ok {
Expand Down Expand Up @@ -634,20 +624,9 @@ func (r *Resolver) handleRemoveSubscription(id SubscriptionIdentifier) {
removed := 0
for u := range r.triggers {
trig := r.triggers[u]
for ctx, s := range trig.subscriptions {
if s.id == id {

if ctx.Context().Err() == nil {
s.writer.Complete()
}
delete(r.heartbeatSubscriptions, ctx)
delete(trig.subscriptions, ctx)
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:removed:%d:%d\n", trig.id, id.SubscriptionID)
}
removed++
}
}
removed += r.shutdownTriggerSubscriptions(u, func(sID SubscriptionIdentifier) bool {
return sID == id
})
if len(trig.subscriptions) == 0 {
r.shutdownTrigger(trig.id)
}
Expand All @@ -663,20 +642,9 @@ func (r *Resolver) handleRemoveClient(id int64) {
}
removed := 0
for u := range r.triggers {
for c, s := range r.triggers[u].subscriptions {
if s.id.ConnectionID == id && !s.id.internal {

if c.Context().Err() == nil {
s.writer.Complete()
}

delete(r.triggers[u].subscriptions, c)
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:done:%d:%d\n", u, s.id.SubscriptionID)
}
removed++
}
}
removed += r.shutdownTriggerSubscriptions(u, func(sID SubscriptionIdentifier) bool {
return sID.ConnectionID == id && !sID.internal
})
if len(r.triggers[u].subscriptions) == 0 {
r.shutdownTrigger(r.triggers[u].id)
}
Expand Down Expand Up @@ -737,30 +705,46 @@ func (r *Resolver) shutdownTrigger(id uint64) {
return
}
count := len(trig.subscriptions)
r.shutdownTriggerSubscriptions(id, nil)
trig.cancel()
delete(r.triggers, id)
if r.options.Debug {
fmt.Printf("resolver:trigger:done:%d\n", trig.id)
}
if r.reporter != nil {
r.reporter.SubscriptionCountDec(count)
if trig.initialized {
r.reporter.TriggerCountDec(1)
}
}
}

func (r *Resolver) shutdownTriggerSubscriptions(id uint64, shutdownMatcher func(a SubscriptionIdentifier) bool) int {
trig, ok := r.triggers[id]
if !ok {
return 0
}
removed := 0
for c, s := range trig.subscriptions {
if shutdownMatcher != nil && !shutdownMatcher(s.id) {
continue
}
if c.Context().Err() == nil {
s.writer.Complete()
}
if s.completed != nil {
close(s.completed)
}
r.heartbeatSubLock.Lock()
delete(r.heartbeatSubscriptions, c)
r.heartbeatSubLock.Unlock()
delete(trig.subscriptions, c)
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:done:%d:%d\n", trig.id, s.id.SubscriptionID)
}
removed++
}
trig.cancel()
delete(r.triggers, id)
if r.options.Debug {
fmt.Printf("resolver:trigger:done:%d\n", trig.id)
}
if r.reporter != nil {
r.reporter.SubscriptionCountDec(count)
if trig.initialized {
r.reporter.TriggerCountDec(1)
}
}
return removed
}

func (r *Resolver) handleShutdown() {
Expand Down Expand Up @@ -819,11 +803,6 @@ func (r *Resolver) AsyncUnsubscribeClient(connectionID int64) error {
}

func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQLSubscription, writer SubscriptionResponseWriter) error {
if err := r.triggerEventsSem.Acquire(r.ctx, 1); err != nil {
return err
}
defer r.triggerEventsSem.Release(1)

if subscription.Trigger.Source == nil {
return errors.New("no data source found")
}
Expand Down
Loading

0 comments on commit 32d6911

Please sign in to comment.