diff --git a/router-tests/timeout_test.go b/router-tests/timeout_test.go index c96d97303c..418a9f38a0 100644 --- a/router-tests/timeout_test.go +++ b/router-tests/timeout_test.go @@ -1,13 +1,15 @@ package integration import ( + "net/http" + "testing" + "time" + "github.com/stretchr/testify/require" + "github.com/wundergraph/cosmo/router-tests/testenv" "github.com/wundergraph/cosmo/router/core" "github.com/wundergraph/cosmo/router/pkg/config" - "net/http" - "testing" - "time" ) func TestTimeouts(t *testing.T) { @@ -30,36 +32,73 @@ func TestTimeouts(t *testing.T) { } }` - t.Run("applies RequestTimeout", func(t *testing.T) { + t.Run("Per subgraph timeouts", func(t *testing.T) { t.Parallel() - hobbySubgraphSleep := testenv.SubgraphsConfig{ - Hobbies: testenv.SubgraphConfig{ - Middleware: func(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - time.Sleep(5 * time.Millisecond) // Slow response - w.Write([]byte("Hello, world!")) - }) + subgraphSleep := func(hobbies, employees time.Duration) testenv.SubgraphsConfig { + return testenv.SubgraphsConfig{ + Hobbies: testenv.SubgraphConfig{ + Middleware: func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(hobbies) + handler.ServeHTTP(w, r) + }) + }, }, - }, + Employees: testenv.SubgraphConfig{ + Middleware: func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(employees) // Slow response + handler.ServeHTTP(w, r) + }) + }, + }, + } } trafficConfig := config.TrafficShapingRules{ All: config.GlobalSubgraphRequestRule{ - RequestTimeout: 500 * time.Millisecond, + RequestTimeout: 200 * time.Millisecond, }, Subgraphs: map[string]*config.GlobalSubgraphRequestRule{ "hobbies": { - RequestTimeout: 3 * time.Millisecond, + RequestTimeout: 300 * time.Millisecond, }, }, } - t.Run("applied subgraph timeout to request", func(t *testing.T) { + + t.Run("no timeout on hobbies subgraph", func(t *testing.T) { + t.Parallel() + + hobbiesDelay := 200 * time.Millisecond // 200ms is lower than the hobbies 300ms timeout + employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout + + testenv.Run(t, &testenv.Config{ + Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay), + RouterOptions: []core.Option{ + core.WithSubgraphTransportOptions( + core.NewSubgraphTransportOptions(trafficConfig)), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: queryEmployeeWithHobby, + }) + + // It can also result in invalid JSON, but we don't care about that here + require.NotContains(t, res.Body, "Failed to fetch from Subgraph 'hobbies'") + + require.Equal(t, `{"data":{"employee":{"id":1,"hobbies":[{},{"name":"Counter Strike"},{},{},{}]}}}`, res.Body) + }) + }) + + t.Run("timeout on hobbies request", func(t *testing.T) { t.Parallel() + hobbiesDelay := 500 * time.Millisecond // 500 is bigger than hobbies 300ms timeout + employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout + testenv.Run(t, &testenv.Config{ - Subgraphs: hobbySubgraphSleep, + Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay), RouterOptions: []core.Option{ core.WithSubgraphTransportOptions( core.NewSubgraphTransportOptions(trafficConfig)), @@ -74,11 +113,14 @@ func TestTimeouts(t *testing.T) { }) }) - t.Run("Subgraph timeout options don't affect unrelated subgraph", func(t *testing.T) { + t.Run("no timeout on employees subgraph", func(t *testing.T) { t.Parallel() + hobbiesDelay := 500 * time.Millisecond // hobbies delay doesn't matter in this test case + employeesDelay := 100 * time.Millisecond // 100ms is lower than the global 200ms timeout + testenv.Run(t, &testenv.Config{ - Subgraphs: hobbySubgraphSleep, + Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay), RouterOptions: []core.Option{ core.WithSubgraphTransportOptions( core.NewSubgraphTransportOptions(trafficConfig)), @@ -90,6 +132,27 @@ func TestTimeouts(t *testing.T) { require.Equal(t, `{"data":{"employee":{"id":1}}}`, res.Body) }) }) + + t.Run("timeout on employees subgraph", func(t *testing.T) { + t.Parallel() + + hobbiesDelay := 500 * time.Millisecond // 500 is bigger than hobbies 300ms timeout + employeesDelay := 300 * time.Millisecond // 300ms is bigger than the global 200ms timeout + + testenv.Run(t, &testenv.Config{ + Subgraphs: subgraphSleep(hobbiesDelay, employeesDelay), + RouterOptions: []core.Option{ + core.WithSubgraphTransportOptions( + core.NewSubgraphTransportOptions(trafficConfig)), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: queryEmployeeWithNoHobby, + }) + + require.Contains(t, res.Body, "Failed to fetch from Subgraph 'employees'") + }) + }) }) t.Run("ResponseHeaderTimeout exceeded", func(t *testing.T) { diff --git a/router/core/factoryresolver.go b/router/core/factoryresolver.go index f69b701cec..6ed55a1479 100644 --- a/router/core/factoryresolver.go +++ b/router/core/factoryresolver.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" "slices" - "time" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates" @@ -41,7 +40,6 @@ type FactoryResolver interface { type ApiTransportFactory interface { RoundTripper(enableSingleFlight bool, transport http.RoundTripper) http.RoundTripper - DefaultTransportTimeout() time.Duration DefaultHTTPProxyURL() *url.URL } @@ -73,7 +71,6 @@ func NewDefaultFactoryResolver( ) *DefaultFactoryResolver { defaultHttpClient := &http.Client{ - Timeout: transportFactory.DefaultTransportTimeout(), Transport: transportFactory.RoundTripper(enableSingleFlight, baseTransport), } streamingClient := &http.Client{ diff --git a/router/core/timeout_transport.go b/router/core/timeout_transport.go index 7289f67075..18a41fd6d3 100644 --- a/router/core/timeout_transport.go +++ b/router/core/timeout_transport.go @@ -2,8 +2,9 @@ package core import ( "context" - "go.uber.org/zap" "net/http" + + "go.uber.org/zap" ) type TimeoutTransport struct { @@ -40,14 +41,24 @@ func (tt *TimeoutTransport) RoundTrip(req *http.Request) (*http.Response, error) return nil, nil } subgraph := rq.ActiveSubgraph(req) + if subgraph != nil && subgraph.Name != "" && tt.subgraphTrippers[subgraph.Name] != nil { timeout := tt.opts.SubgraphMap[subgraph.Name].RequestTimeout if timeout > 0 { ctx, cancel := context.WithTimeout(req.Context(), timeout) defer cancel() + return tt.subgraphTrippers[subgraph.Name].RoundTrip(req.WithContext(ctx)) } return tt.subgraphTrippers[subgraph.Name].RoundTrip(req) } + + if tt.opts.RequestTimeout > 0 { + ctx, cancel := context.WithTimeout(req.Context(), tt.opts.RequestTimeout) + defer cancel() + + return tt.defaultTransport.RoundTrip(req.WithContext(ctx)) + } + return tt.defaultTransport.RoundTrip(req) } diff --git a/router/core/timeout_transport_test.go b/router/core/timeout_transport_test.go index b0a3d864c6..b1bd5396ba 100644 --- a/router/core/timeout_transport_test.go +++ b/router/core/timeout_transport_test.go @@ -3,13 +3,14 @@ package core import ( "context" "crypto/tls" - "github.com/stretchr/testify/require" - "go.uber.org/zap" "net/http" "net/http/httptest" "os" "testing" "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestTimeoutTransport(t *testing.T) { diff --git a/router/core/transport.go b/router/core/transport.go index 2efe07a5ee..40a55c0bbe 100644 --- a/router/core/transport.go +++ b/router/core/transport.go @@ -3,14 +3,14 @@ package core import ( "bytes" "fmt" - "go.opentelemetry.io/otel/propagation" "io" "net/http" "net/url" "sort" "strconv" "sync" - "time" + + "go.opentelemetry.io/otel/propagation" otelmetric "go.opentelemetry.io/otel/metric" @@ -18,22 +18,20 @@ import ( "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "github.com/wundergraph/cosmo/router/pkg/metric" "github.com/wundergraph/cosmo/router/pkg/otel" "github.com/wundergraph/cosmo/router/pkg/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.21.0" - "github.com/wundergraph/cosmo/router/internal/docker" - "github.com/wundergraph/cosmo/router/internal/retrytransport" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "github.com/wundergraph/graphql-go-tools/v2/pkg/pool" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" otrace "go.opentelemetry.io/otel/trace" "go.uber.org/zap" -) -var ( - defaultTimeout = 60 * time.Second + "github.com/wundergraph/cosmo/router/internal/docker" + "github.com/wundergraph/cosmo/router/internal/retrytransport" ) type TransportPreHandler func(req *http.Request, ctx RequestContext) (*http.Request, *http.Response) @@ -397,13 +395,6 @@ func (t TransportFactory) RoundTripper(enableSingleFlight bool, baseTransport ht return tp } -func (t TransportFactory) DefaultTransportTimeout() time.Duration { - if t.subgraphTransportOptions != nil { - return t.subgraphTransportOptions.RequestTimeout - } - return defaultTimeout -} - func (t TransportFactory) DefaultHTTPProxyURL() *url.URL { return nil }