diff --git a/v2/doc.go b/v2/doc.go
index 7b86fb0ac..0c91b0d9e 100644
--- a/v2/doc.go
+++ b/v2/doc.go
@@ -513,7 +513,9 @@ This plan can now be executed by using the Resolver.
 
 	func ExampleExecuteOperation() {
 		var preparedPlan plan.Plan
 
-		resolver := resolve.New(context.Background())
+		resolver := resolve.New(context.Background(), resolve.ResolverOptions{
+			MaxConcurrency: 1024,
+		})
 
 		ctx := resolve.NewContext(context.Background())
diff --git a/v2/pkg/engine/resolve/resolvable.go b/v2/pkg/engine/resolve/resolvable.go
index 0873a8b22..f00e0848e 100644
--- a/v2/pkg/engine/resolve/resolvable.go
+++ b/v2/pkg/engine/resolve/resolvable.go
@@ -32,9 +32,7 @@ type Resolvable struct {
 
 func NewResolvable() *Resolvable {
 	return &Resolvable{
-		storage: &astjson.JSON{
-			Nodes: make([]astjson.Node, 0, 4096),
-		},
+		storage: &astjson.JSON{},
 	}
 }
 
diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go
index 6d3d5899b..9f8f64488 100644
--- a/v2/pkg/engine/resolve/resolve.go
+++ b/v2/pkg/engine/resolve/resolve.go
@@ -3,6 +3,7 @@
 package resolve
 
 import (
+	"bytes"
 	"context"
 	"io"
 	"sync"
@@ -10,12 +11,13 @@ import (
 	"github.com/buger/jsonparser"
 	"github.com/pkg/errors"
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/pool"
 )
 
 type Resolver struct {
-	ctx      context.Context
-	toolPool sync.Pool
+	ctx                 context.Context
+	toolPool            sync.Pool
+	limitMaxConcurrency bool
+	maxConcurrency      chan struct{}
 }
 
 type tools struct {
@@ -23,9 +25,19 @@ type tools struct {
 	loader     *Loader
 }
 
+type ResolverOptions struct {
+	// MaxConcurrency limits the number of concurrent resolve operations
+	// if set to 0, no limit is applied
+	// It is advised to set this to a reasonable value to prevent excessive memory usage
+	// Each concurrent resolve operation allocates ~50kb of memory
+	// In addition, there's a limit of how many concurrent requests can be efficiently resolved
+	// This depends on the number of CPU cores available, the complexity of the operations, and the origin services
+	MaxConcurrency int
+}
+
 // New returns a new Resolver, ctx.Done() is used to cancel all active subscriptions & streams
-func New(ctx context.Context) *Resolver {
-	return &Resolver{
+func New(ctx context.Context, options ResolverOptions) *Resolver {
+	resolver := &Resolver{
 		ctx: ctx,
 		toolPool: sync.Pool{
 			New: func() interface{} {
@@ -36,9 +48,21 @@ func New(ctx context.Context) *Resolver {
 			},
 		},
 	}
+	if options.MaxConcurrency > 0 {
+		semaphore := make(chan struct{}, options.MaxConcurrency)
+		for i := 0; i < options.MaxConcurrency; i++ {
+			semaphore <- struct{}{}
+		}
+		resolver.limitMaxConcurrency = true
+		resolver.maxConcurrency = semaphore
+	}
+	return resolver
 }
 
 func (r *Resolver) getTools() *tools {
+	if r.limitMaxConcurrency {
+		<-r.maxConcurrency
+	}
 	t := r.toolPool.Get().(*tools)
 	return t
 }
@@ -47,6 +71,9 @@ func (r *Resolver) putTools(t *tools) {
 	t.loader.Free()
 	t.resolvable.Reset()
 	r.toolPool.Put(t)
+	if r.limitMaxConcurrency {
+		r.maxConcurrency <- struct{}{}
+	}
 }
 
 func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLResponse, data []byte, writer io.Writer) (err error) {
@@ -79,14 +106,12 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
 		return writeAndFlush(writer, msg)
 	}
 
-	buf := pool.BytesBuffer.Get()
-	defer pool.BytesBuffer.Put(buf)
-	if err := subscription.Trigger.InputTemplate.Render(ctx, nil, buf); err != nil {
+	buf := bytes.NewBuffer(nil)
+	err = subscription.Trigger.InputTemplate.Render(ctx, nil, buf)
+	if err != nil {
 		return err
 	}
-	rendered := buf.Bytes()
-	subscriptionInput := make([]byte, len(rendered))
-	copy(subscriptionInput, rendered)
+	subscriptionInput := buf.Bytes()
 
 	if len(ctx.InitialPayload) > 0 {
 		subscriptionInput, err = jsonparser.Set(subscriptionInput, ctx.InitialPayload, "initial_payload")
@@ -115,9 +140,6 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
 		return err
 	}
 
-	t := r.getTools()
-	defer r.putTools(t)
-
 	for {
 		select {
 		case <-resolverDone:
@@ -126,17 +148,21 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
 			if !ok {
 				return nil
 			}
-			t.resolvable.Reset()
+			t := r.getTools()
 			if err := t.resolvable.InitSubscription(ctx, data, subscription.Trigger.PostProcessing); err != nil {
+				r.putTools(t)
 				return err
 			}
 			if err := t.loader.LoadGraphQLResponseData(ctx, subscription.Response, t.resolvable); err != nil {
+				r.putTools(t)
 				return err
 			}
 			if err := t.resolvable.Resolve(ctx.ctx, subscription.Response.Data, writer); err != nil {
+				r.putTools(t)
 				return err
 			}
 			writer.Flush()
+			r.putTools(t)
 		}
 	}
 }
diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go
index 1073a8461..9d411fed9 100644
--- a/v2/pkg/engine/resolve/resolve_test.go
+++ b/v2/pkg/engine/resolve/resolve_test.go
@@ -55,7 +55,9 @@ func fakeDataSourceWithInputCheck(t TestingTB, input []byte, data []byte) *_fake
 }
 
 func newResolver(ctx context.Context) *Resolver {
-	return New(ctx)
+	return New(ctx, ResolverOptions{
+		MaxConcurrency: 1024,
+	})
 }
 
 type customResolver struct{}
diff --git a/v2/pkg/graphql/execution_engine_v2.go b/v2/pkg/graphql/execution_engine_v2.go
index 97d7c39ea..9a2f7ccb4 100644
--- a/v2/pkg/graphql/execution_engine_v2.go
+++ b/v2/pkg/graphql/execution_engine_v2.go
@@ -201,10 +201,12 @@ func NewExecutionEngineV2(ctx context.Context, logger abstractlogger.Logger, eng
 	}
 
 	return &ExecutionEngineV2{
-		logger:   logger,
-		config:   engineConfig,
-		planner:  plan.NewPlanner(ctx, engineConfig.plannerConfig),
-		resolver: resolve.New(ctx),
+		logger:  logger,
+		config:  engineConfig,
+		planner: plan.NewPlanner(ctx, engineConfig.plannerConfig),
+		resolver: resolve.New(ctx, resolve.ResolverOptions{
+			MaxConcurrency: 1024,
+		}),
 		internalExecutionContextPool: sync.Pool{
 			New: func() interface{} {
 				return newInternalExecutionContext()
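
Note: the concurrency limit introduced above is a counting semaphore built from a buffered channel. New() pre-fills the channel with MaxConcurrency tokens, getTools() blocks until it can take one, and putTools() returns it. The following standalone sketch illustrates that pattern only; the names Limiter, Acquire, and Release are hypothetical and not part of this change:

	package main

	import "fmt"

	// Limiter is a hypothetical stand-in for the Resolver's semaphore:
	// a buffered channel pre-filled with max tokens.
	type Limiter struct {
		tokens chan struct{}
	}

	func NewLimiter(max int) *Limiter {
		l := &Limiter{tokens: make(chan struct{}, max)}
		for i := 0; i < max; i++ {
			l.tokens <- struct{}{} // pre-fill, mirroring New() in the diff
		}
		return l
	}

	// Acquire blocks until a token is available, like getTools().
	func (l *Limiter) Acquire() { <-l.tokens }

	// Release puts a token back, like putTools().
	func (l *Limiter) Release() { l.tokens <- struct{}{} }

	func main() {
		l := NewLimiter(2)
		l.Acquire()
		l.Acquire()
		// A third Acquire() would now block until Release() is called.
		l.Release()
		fmt.Println("one token released; capacity available again")
	}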