Skip to content

Commit

Permalink
fix: subgraph timeout can't be bigger than global timeout (#1548)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsergiy authored Jan 31, 2025
1 parent b99e0bc commit 92b69a3
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 40 deletions.
101 changes: 82 additions & 19 deletions router-tests/timeout_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package integration

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"net/http"
"testing"
"time"
)

func TestTimeouts(t *testing.T) {
Expand All @@ -30,36 +32,73 @@ func TestTimeouts(t *testing.T) {
}
}`

t.Run("applies RequestTimeout", func(t *testing.T) {
t.Run("Per subgraph timeouts", func(t *testing.T) {
t.Parallel()

hobbySubgraphSleep := testenv.SubgraphsConfig{
Hobbies: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
time.Sleep(5 * time.Millisecond) // Slow response
w.Write([]byte("Hello, world!"))
})
subgraphSleep := func(hobbies, employees time.Duration) testenv.SubgraphsConfig {
return testenv.SubgraphsConfig{
Hobbies: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(hobbies)
handler.ServeHTTP(w, r)
})
},
},
},
Employees: testenv.SubgraphConfig{
Middleware: func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(employees) // Slow response
handler.ServeHTTP(w, r)
})
},
},
}
}

trafficConfig := config.TrafficShapingRules{
All: config.GlobalSubgraphRequestRule{
RequestTimeout: 500 * time.Millisecond,
RequestTimeout: 200 * time.Millisecond,
},
Subgraphs: map[string]*config.GlobalSubgraphRequestRule{
"hobbies": {
RequestTimeout: 3 * time.Millisecond,
RequestTimeout: 300 * time.Millisecond,
},
},
}
t.Run("applied subgraph timeout to request", func(t *testing.T) {

t.Run("no timeout on hobbies subgraph", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 200 * time.Millisecond // 200ms is lower than the hobbies 300ms timeout
employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithHobby,
})

// It can also result in invalid JSON, but we don't care about that here
require.NotContains(t, res.Body, "Failed to fetch from Subgraph 'hobbies'")

require.Equal(t, `{"data":{"employee":{"id":1,"hobbies":[{},{"name":"Counter Strike"},{},{},{}]}}}`, res.Body)
})
})

t.Run("timeout on hobbies request", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 500 * time.Millisecond // 500 is bigger than hobbies 300ms timeout
employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
Expand All @@ -74,11 +113,14 @@ func TestTimeouts(t *testing.T) {
})
})

t.Run("Subgraph timeout options don't affect unrelated subgraph", func(t *testing.T) {
t.Run("no timeout on employees subgraph", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 500 * time.Millisecond // hobbies delay doesn't matter in this test case
employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: hobbySubgraphSleep,
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
Expand All @@ -90,6 +132,27 @@ func TestTimeouts(t *testing.T) {
require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body)
})
})

t.Run("timeout on employees subgraph", func(t *testing.T) {
t.Parallel()

hobbiesDelay := 500 * time.Millisecond // 500 is bigger than hobbies 300ms timeout
employeesDelay := 300 * time.Millisecond // 300ms is bigger than the global 200ms timeout

testenv.Run(t, &testenv.Config{
Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay),
RouterOptions: []core.Option{
core.WithSubgraphTransportOptions(
core.NewSubgraphTransportOptions(trafficConfig)),
},
}, func(t *testing.T, xEnv *testenv.Environment) {
res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
Query: queryEmployeeWithNoHobby,
})

require.Contains(t, res.Body, "Failed to fetch from Subgraph 'employees'")
})
})
})

t.Run("ResponseHeaderTimeout exceeded", func(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions router/core/factoryresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"net/url"
"slices"
"time"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates"

Expand Down Expand Up @@ -41,7 +40,6 @@ type FactoryResolver interface {

type ApiTransportFactory interface {
RoundTripper(enableSingleFlight bool, transport http.RoundTripper) http.RoundTripper
DefaultTransportTimeout() time.Duration
DefaultHTTPProxyURL() *url.URL
}

Expand Down Expand Up @@ -73,7 +71,6 @@ func NewDefaultFactoryResolver(
) *DefaultFactoryResolver {

defaultHttpClient := &http.Client{
Timeout: transportFactory.DefaultTransportTimeout(),
Transport: transportFactory.RoundTripper(enableSingleFlight, baseTransport),
}
streamingClient := &http.Client{
Expand Down
13 changes: 12 additions & 1 deletion router/core/timeout_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package core

import (
"context"
"go.uber.org/zap"
"net/http"

"go.uber.org/zap"
)

type TimeoutTransport struct {
Expand Down Expand Up @@ -40,14 +41,24 @@ func (tt *TimeoutTransport) RoundTrip(req *http.Request) (*http.Response, error)
return nil, nil
}
subgraph := rq.ActiveSubgraph(req)

if subgraph != nil && subgraph.Name != "" && tt.subgraphTrippers[subgraph.Name] != nil {
timeout := tt.opts.SubgraphMap[subgraph.Name].RequestTimeout
if timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()

return tt.subgraphTrippers[subgraph.Name].RoundTrip(req.WithContext(ctx))
}
return tt.subgraphTrippers[subgraph.Name].RoundTrip(req)
}

if tt.opts.RequestTimeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), tt.opts.RequestTimeout)
defer cancel()

return tt.defaultTransport.RoundTrip(req.WithContext(ctx))
}

return tt.defaultTransport.RoundTrip(req)
}
5 changes: 3 additions & 2 deletions router/core/timeout_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package core
import (
"context"
"crypto/tls"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestTimeoutTransport(t *testing.T) {
Expand Down
21 changes: 6 additions & 15 deletions router/core/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,35 @@ package core
import (
"bytes"
"fmt"
"go.opentelemetry.io/otel/propagation"
"io"
"net/http"
"net/url"
"sort"
"strconv"
"sync"
"time"

"go.opentelemetry.io/otel/propagation"

otelmetric "go.opentelemetry.io/otel/metric"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/wundergraph/cosmo/router/pkg/metric"
"github.com/wundergraph/cosmo/router/pkg/otel"
"github.com/wundergraph/cosmo/router/pkg/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/wundergraph/cosmo/router/internal/docker"
"github.com/wundergraph/cosmo/router/internal/retrytransport"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/pool"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
otrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var (
defaultTimeout = 60 * time.Second
"github.com/wundergraph/cosmo/router/internal/docker"
"github.com/wundergraph/cosmo/router/internal/retrytransport"
)

type TransportPreHandler func(req *http.Request, ctx RequestContext) (*http.Request, *http.Response)
Expand Down Expand Up @@ -397,13 +395,6 @@ func (t TransportFactory) RoundTripper(enableSingleFlight bool, baseTransport ht
return tp
}

func (t TransportFactory) DefaultTransportTimeout() time.Duration {
if t.subgraphTransportOptions != nil {
return t.subgraphTransportOptions.RequestTimeout
}
return defaultTimeout
}

func (t TransportFactory) DefaultHTTPProxyURL() *url.URL {
return nil
}
Expand Down

0 comments on commit 92b69a3

Please sign in to comment.