diff --git a/core/src/main/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategy.java b/core/src/main/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategy.java index e755020..5aaa397 100644 --- a/core/src/main/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategy.java +++ b/core/src/main/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategy.java @@ -20,6 +20,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static java.util.Collections.emptyList; import static java.util.stream.Collectors.toList; public class ReactiveExecutionStrategy extends ExecutionStrategy { @@ -167,6 +168,8 @@ protected ExecutionResultImpl complexChangesFlow( .combineLatest((Iterable>>) subFlows::iterator, resultCombiner) // Take one, then start producing changes .take(1) + // Fallback to empty array if subFlows are empty + .singleElement().toSingle(emptyList()).toFlowable() .doOnNext(__ -> initialized.set(true)) .map(it -> new Change(context, it)) .mergeWith(Flowable diff --git a/core/src/test/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategyTest.java b/core/src/test/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategyTest.java index 3ea0d8f..65603a3 100644 --- a/core/src/test/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategyTest.java +++ b/core/src/test/java/com/github/bsideup/graphql/reactive/ReactiveExecutionStrategyTest.java @@ -15,10 +15,12 @@ import reactor.test.StepVerifier; import java.time.Duration; -import java.util.Arrays; +import java.util.ArrayList; import java.util.function.Consumer; import static graphql.schema.GraphQLObjectType.newObject; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -156,7 +158,7 @@ public void testArray() throws Exception { subscriber .assertChanges(it -> it.containsExactly( - tuple("01:800", "", ImmutableMap.of("a", Arrays.asList(ImmutableMap.of("b", "0 0"), ImmutableMap.of("b", "0 1"), ImmutableMap.of("b", "0 2")))), + tuple("01:800", "", ImmutableMap.of("a", asList(ImmutableMap.of("b", "0 0"), ImmutableMap.of("b", "0 1"), ImmutableMap.of("b", "0 2")))), tuple("02:100", "a[0].b", "1 0"), tuple("02:100", "a[1].b", "1 1"), @@ -173,6 +175,42 @@ public void testArray() throws Exception { .assertComplete(); } + @Test + public void testArrayStartsWith() throws Exception { + GraphQLObjectType innerType = newObject() + .name("Inner") + .field(newStringField("b").staticValue("foo")) + .build(); + + GraphQLSchema schema = newQuerySchema(it -> it + .field( + newField("a", new GraphQLList(innerType)) + .dataFetcher(env -> Flowable + .just(asList(true, true, true)) + .delay(1, SECONDS, scheduler) + .startWith(new ArrayList()) + ) + ) + ); + + ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }"); + + Flowable.fromPublisher((Publisher) executionResult.getData()).timestamp(scheduler).subscribe(subscriber); + + scheduler.advanceTimeBy(2, SECONDS); + + subscriber + .assertChanges(it -> it.containsExactly( + tuple("00:000", "", ImmutableMap.of("a", emptyList())), + + tuple("01:000", "a", asList(ImmutableMap.of("b", "foo"), ImmutableMap.of("b", "foo"), ImmutableMap.of("b", "foo"))), + // FIXME this values are duplicated. For now, let's call it "at least once" delivery :D + tuple("01:000", "a[1]", ImmutableMap.of("b", "foo")), + tuple("01:000", "a[2]", ImmutableMap.of("b", "foo")) + )) + .assertComplete(); + } + @Test public void testAnyPublisher() throws Exception { Duration tick = Duration.ofSeconds(1);