feat: add control for max concurrency to engine (#706)
jensneuse authored Dec 20, 2023
1 parent 97595b4 commit c1449c3
Showing 5 changed files with 54 additions and 24 deletions.
4 changes: 3 additions & 1 deletion v2/doc.go
@@ -513,7 +513,9 @@ This plan can now be executed by using the Resolver.

func ExampleExecuteOperation() {
	var preparedPlan plan.Plan
-	resolver := resolve.New(context.Background())
+	resolver := resolve.New(context.Background(), resolve.ResolverOptions{
+		MaxConcurrency: 1024,
+	})

	ctx := resolve.NewContext(context.Background())

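The updated doc.go example above shows the new constructor signature. For reference, a minimal standalone sketch of wiring the option; the import path and the 1024 value come from this diff, while the comment about 0 disabling the limit paraphrases the ResolverOptions documentation further down:

package main

import (
	"context"

	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)

func main() {
	// A single Resolver is typically shared by all requests, so MaxConcurrency
	// acts as a process-wide cap on in-flight resolve operations.
	resolver := resolve.New(context.Background(), resolve.ResolverOptions{
		MaxConcurrency: 1024, // 0 would disable the limit entirely
	})
	_ = resolver // plan preparation and execution are elided, as in the doc.go example
}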
4 changes: 1 addition & 3 deletions v2/pkg/engine/resolve/resolvable.go
@@ -32,9 +32,7 @@ type Resolvable struct {

func NewResolvable() *Resolvable {
	return &Resolvable{
-		storage: &astjson.JSON{
-			Nodes: make([]astjson.Node, 0, 4096),
-		},
+		storage: &astjson.JSON{},
	}
}

56 changes: 41 additions & 15 deletions v2/pkg/engine/resolve/resolve.go
@@ -3,29 +3,41 @@
package resolve

import (
+	"bytes"
	"context"
	"io"
	"sync"

	"github.com/buger/jsonparser"
	"github.com/pkg/errors"
	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/pool"
)

type Resolver struct {
-	ctx      context.Context
-	toolPool sync.Pool
+	ctx                 context.Context
+	toolPool            sync.Pool
+	limitMaxConcurrency bool
+	maxConcurrency      chan struct{}
}

type tools struct {
	resolvable *Resolvable
	loader     *Loader
}

+type ResolverOptions struct {
+	// MaxConcurrency limits the number of concurrent resolve operations
+	// if set to 0, no limit is applied
+	// It is advised to set this to a reasonable value to prevent excessive memory usage
+	// Each concurrent resolve operation allocates ~50kb of memory
+	// In addition, there's a limit of how many concurrent requests can be efficiently resolved
+	// This depends on the number of CPU cores available, the complexity of the operations, and the origin services
+	MaxConcurrency int
+}
+
// New returns a new Resolver, ctx.Done() is used to cancel all active subscriptions & streams
-func New(ctx context.Context) *Resolver {
-	return &Resolver{
+func New(ctx context.Context, options ResolverOptions) *Resolver {
+	resolver := &Resolver{
		ctx: ctx,
		toolPool: sync.Pool{
			New: func() interface{} {
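The MaxConcurrency doc comment above estimates roughly 50kb of memory per concurrent resolve operation, which makes the limit easy to size: the 1024 used throughout this commit corresponds to about 1024 × 50kb ≈ 50MB of resolver state. A tiny illustrative calculation; only the ~50kb figure comes from the comment, the budget value is hypothetical:

package main

import "fmt"

func main() {
	const perOperationBytes = 50 * 1024        // ~50kb per in-flight resolve operation (from the doc comment)
	const memoryBudgetBytes = 64 * 1024 * 1024 // hypothetical 64MB budget for resolver state
	fmt.Println(memoryBudgetBytes / perOperationBytes) // prints 1310, a candidate MaxConcurrency
}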
@@ -36,9 +48,21 @@ func New(ctx context.Context) *Resolver {
			},
		},
	}
+	if options.MaxConcurrency > 0 {
+		semaphore := make(chan struct{}, options.MaxConcurrency)
+		for i := 0; i < options.MaxConcurrency; i++ {
+			semaphore <- struct{}{}
+		}
+		resolver.limitMaxConcurrency = true
+		resolver.maxConcurrency = semaphore
+	}
+	return resolver
}

func (r *Resolver) getTools() *tools {
+	if r.limitMaxConcurrency {
+		<-r.maxConcurrency
+	}
	t := r.toolPool.Get().(*tools)
	return t
}
@@ -47,6 +71,9 @@ func (r *Resolver) putTools(t *tools) {
	t.loader.Free()
	t.resolvable.Reset()
	r.toolPool.Put(t)
+	if r.limitMaxConcurrency {
+		r.maxConcurrency <- struct{}{}
+	}
}

func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLResponse, data []byte, writer io.Writer) (err error) {
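Taken together, the hunks above form a counting semaphore: New pre-fills a buffered channel with MaxConcurrency tokens, getTools takes a token before handing out a tool set from the pool, and putTools returns the token once the tool set goes back. A self-contained sketch of the same pattern outside the resolver; the limiter type and the worker loop are illustrative, not part of the codebase:

package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter mirrors the resolver's approach: a buffered channel pre-filled with
// tokens acts as a counting semaphore around access to a shared resource.
type limiter struct {
	tokens chan struct{}
}

func newLimiter(max int) *limiter {
	l := &limiter{tokens: make(chan struct{}, max)}
	for i := 0; i < max; i++ {
		l.tokens <- struct{}{}
	}
	return l
}

func (l *limiter) acquire() { <-l.tokens }             // blocks while max holders exist
func (l *limiter) release() { l.tokens <- struct{}{} } // frees a slot for the next caller

func main() {
	l := newLimiter(2) // at most two units of work in flight, like MaxConcurrency
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			l.acquire()
			defer l.release()
			fmt.Println("resolving", n)
			time.Sleep(50 * time.Millisecond) // stand-in for resolving a response
		}(i)
	}
	wg.Wait()
}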
@@ -79,14 +106,12 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
		return writeAndFlush(writer, msg)
	}

-	buf := pool.BytesBuffer.Get()
-	defer pool.BytesBuffer.Put(buf)
-	if err := subscription.Trigger.InputTemplate.Render(ctx, nil, buf); err != nil {
+	buf := bytes.NewBuffer(nil)
+	err = subscription.Trigger.InputTemplate.Render(ctx, nil, buf)
+	if err != nil {
		return err
	}
-	rendered := buf.Bytes()
-	subscriptionInput := make([]byte, len(rendered))
-	copy(subscriptionInput, rendered)
+	subscriptionInput := buf.Bytes()

	if len(ctx.InitialPayload) > 0 {
		subscriptionInput, err = jsonparser.Set(subscriptionInput, ctx.InitialPayload, "initial_payload")
@@ -115,9 +140,6 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
		return err
	}

-	t := r.getTools()
-	defer r.putTools(t)
-
	for {
		select {
		case <-resolverDone:
@@ -126,17 +148,21 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
			if !ok {
				return nil
			}
-			t.resolvable.Reset()
+			t := r.getTools()
			if err := t.resolvable.InitSubscription(ctx, data, subscription.Trigger.PostProcessing); err != nil {
+				r.putTools(t)
				return err
			}
			if err := t.loader.LoadGraphQLResponseData(ctx, subscription.Response, t.resolvable); err != nil {
+				r.putTools(t)
				return err
			}
			if err := t.resolvable.Resolve(ctx.ctx, subscription.Response.Data, writer); err != nil {
+				r.putTools(t)
				return err
			}
			writer.Flush()
+			r.putTools(t)
		}
	}
}
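The last two hunks also change when tools are checked out on the subscription path: instead of acquiring them once before the loop and holding them, and now a concurrency slot, for the lifetime of the subscription, the loop acquires a tool set per incoming event and returns it before waiting for the next one, so idle subscriptions do not occupy a MaxConcurrency slot. A hedged illustration of the two strategies, with a buffered channel standing in for the resolver's semaphore; all names here are invented for the sketch:

package main

import "time"

// consumeHoldingSlot models the old shape: the slot is taken once and kept
// until the subscription ends, even while the loop is idle between events.
func consumeHoldingSlot(sem chan struct{}, events <-chan []byte, handle func([]byte)) {
	<-sem
	defer func() { sem <- struct{}{} }()
	for ev := range events {
		handle(ev)
	}
}

// consumePerEvent models the new shape: the slot is held only while a single
// event is actually being resolved.
func consumePerEvent(sem chan struct{}, events <-chan []byte, handle func([]byte)) {
	for ev := range events {
		<-sem
		handle(ev)
		sem <- struct{}{}
	}
}

func main() {
	sem := make(chan struct{}, 1)
	sem <- struct{}{} // pre-filled, mirroring New()

	events := make(chan []byte, 2)
	events <- []byte(`{"data":{"counter":1}}`)
	events <- []byte(`{"data":{"counter":2}}`)
	close(events)

	consumePerEvent(sem, events, func([]byte) { time.Sleep(10 * time.Millisecond) })
	_ = consumeHoldingSlot // kept only for contrast
}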
4 changes: 3 additions & 1 deletion v2/pkg/engine/resolve/resolve_test.go
@@ -55,7 +55,9 @@ func fakeDataSourceWithInputCheck(t TestingTB, input []byte, data []byte) *_fake
}

func newResolver(ctx context.Context) *Resolver {
-	return New(ctx)
+	return New(ctx, ResolverOptions{
+		MaxConcurrency: 1024,
+	})
}

type customResolver struct{}
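The helper above simply opts every existing test into a limit of 1024. Below is a hypothetical sketch of a test that would exercise the limit directly; it is not part of this commit, assumes the test file's usual context/testing/time imports, and relies on the unexported getTools/putTools being reachable from the in-package test, as the unqualified call to New above suggests they are:

// Hypothetical test sketch: verifies that at most MaxConcurrency tool sets can
// be checked out at once. Timings are illustrative and intentionally generous.
func TestResolverMaxConcurrency(t *testing.T) {
	r := New(context.Background(), ResolverOptions{MaxConcurrency: 2})

	a := r.getTools()
	b := r.getTools()

	third := make(chan struct{})
	go func() {
		c := r.getTools() // should block until a slot is released
		r.putTools(c)
		close(third)
	}()

	select {
	case <-third:
		t.Fatal("third getTools returned while both slots were held")
	case <-time.After(50 * time.Millisecond):
	}

	r.putTools(a) // release one slot; the blocked goroutine can now proceed
	select {
	case <-third:
	case <-time.After(time.Second):
		t.Fatal("expected getTools to return after a slot was released")
	}
	r.putTools(b)
}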
10 changes: 6 additions & 4 deletions v2/pkg/graphql/execution_engine_v2.go
@@ -201,10 +201,12 @@ func NewExecutionEngineV2(ctx context.Context, logger abstractlogger.Logger, eng
	}

	return &ExecutionEngineV2{
-		logger:   logger,
-		config:   engineConfig,
-		planner:  plan.NewPlanner(ctx, engineConfig.plannerConfig),
-		resolver: resolve.New(ctx),
+		logger:   logger,
+		config:   engineConfig,
+		planner:  plan.NewPlanner(ctx, engineConfig.plannerConfig),
+		resolver: resolve.New(ctx, resolve.ResolverOptions{
+			MaxConcurrency: 1024,
+		}),
		internalExecutionContextPool: sync.Pool{
			New: func() interface{} {
				return newInternalExecutionContext()
