Skip to content

Commit

Permalink
Merge pull request #310 from salesforce/subscription-ordering
Browse files Browse the repository at this point in the history
Don't call gRPC until the caller executes subscribe()
  • Loading branch information
rmichela authored Apr 11, 2023
2 parents 5f3e1be + 10a8a9a commit ae3f588
Show file tree
Hide file tree
Showing 9 changed files with 597 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<canteen.plugin.version>1.1.0</canteen.plugin.version>

<!-- Dependency Versions -->
<reactive.streams.version>1.0.3</reactive.streams.version>
<reactive.streams.version>1.0.4</reactive.streams.version>
<grpc.version>1.42.1</grpc.version>
<protoc.version>3.19.1</protoc.version> <!-- Same version as grpc-proto -->
<jprotoc.version>1.2.0</jprotoc.version>
Expand Down
13 changes: 0 additions & 13 deletions reactor/reactor-grpc-stub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public static <TRequest, TResponse> Mono<TResponse> manyToOne(
s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) s),
subscriberAndGRPCProducer::cancel
);
delegate.apply(observerAndPublisher);

return Flux.from(observerAndPublisher)
.singleOrEmpty();
.doOnSubscribe(s -> delegate.apply(observerAndPublisher))
.singleOrEmpty();
} catch (Throwable throwable) {
return Mono.error(throwable);
}
Expand All @@ -134,9 +134,8 @@ public static <TRequest, TResponse> Flux<TResponse> manyToMany(
s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) s),
subscriberAndGRPCProducer::cancel, prefetch, lowTide
);
delegate.apply(observerAndPublisher);

return Flux.from(observerAndPublisher);
return Flux.from(observerAndPublisher).doOnSubscribe(s -> delegate.apply(observerAndPublisher));
} catch (Throwable throwable) {
return Flux.error(throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
package com.salesforce.reactorgrpc.stub;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
Expand All @@ -22,7 +20,6 @@


public class ReactorClientStreamObserverAndPublisherTest {
private static final Logger log = LoggerFactory.getLogger(ReactorClientStreamObserverAndPublisherTest.class.getName());

private static final int DEFAULT_CHUNK_SIZE = 512;
private static final int PART_OF_CHUNK = DEFAULT_CHUNK_SIZE * 2 / 3;
Expand Down Expand Up @@ -76,18 +73,12 @@ public void discardQueueTest() {
AtomicBoolean firstHandled = new AtomicBoolean();
Flux<Integer> consumer =
Flux.from(processor)
.doOnDiscard(Integer.class, i -> {
log.info("Processor: discarding {}", i);
discardedByObserverAndPublisher.add(i);
})
.doOnDiscard(Integer.class, discardedByObserverAndPublisher::add)
.log("processor")
.limitRate(1)
.publishOn(Schedulers.parallel())
.limitRate(1)
.doOnDiscard(Integer.class, i -> {
log.info("publishOn: discarding {}", i);
discardedByPublishOn.add(i);
})
.doOnDiscard(Integer.class, discardedByPublishOn::add)
.<Integer>handle((i, sink) -> {
if (firstHandled.compareAndSet(false, true)) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright (c) 2019, Salesforce.com, Inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/

package com.salesforce.reactorgrpc;

import io.grpc.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;

import static org.assertj.core.api.Assertions.assertThat;

@SuppressWarnings("Duplicates")
@RunWith(Parameterized.class)
public class DoNotCallUntilSubscribeIntegrationTest {
private Server server;
private ManagedChannel channel;
private WasCalledInterceptor interceptor;

@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ new TestService(), false },
{ new FusedTestService(), true }
});
}

private final ReactorGreeterGrpc.GreeterImplBase service;
private final boolean expectFusion;

public DoNotCallUntilSubscribeIntegrationTest(ReactorGreeterGrpc.GreeterImplBase service, boolean expectFusion) {
this.service = service;
this.expectFusion = expectFusion;
}

private static class WasCalledInterceptor implements ServerInterceptor {
private boolean wasCalled = false;
private boolean didRespond = false;

public boolean wasCalled() {
return wasCalled;
}

public boolean didRespond() {
return didRespond;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
didRespond = true;
super.sendMessage(message);
}
}, headers)) {
@Override
public void onMessage(ReqT message) {
wasCalled = true;
super.onMessage(message);
}
};
}
}

@Before
public void setupServer() throws Exception {
interceptor = new WasCalledInterceptor();
server = ServerBuilder.forPort(9000).addService(service).intercept(interceptor).build().start();
channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
}

@After
public void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
channel.shutdown();

server = null;
channel = null;
}

@Test
public void oneToOne() throws Exception {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Mono<HelloResponse> resp = req.transform(stub::sayHello);

Thread.sleep(100);
assertThat(interceptor.wasCalled()).isFalse();
assertThat(interceptor.didRespond()).isFalse();
}

@Test
public void oneToMany() throws Exception {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactorjava").build());
Flux<HelloResponse> resp = req.as(stub::sayHelloRespStream);

Thread.sleep(100);
assertThat(interceptor.wasCalled()).isFalse();
assertThat(interceptor.didRespond()).isFalse();
}

@Test
public void manyToOne() throws Exception {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Flux<HelloRequest> req = Flux.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());

if (!expectFusion) {
req = req.hide();
}

Mono<HelloResponse> resp = req.as(stub::sayHelloReqStream);

Thread.sleep(100);
assertThat(interceptor.wasCalled()).isFalse();
assertThat(interceptor.didRespond()).isFalse();
}

@Test
public void manyToMany() throws Exception {
ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(channel);
Flux<HelloRequest> req = Flux.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build(),
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());

if (!expectFusion) {
req = req.hide();
}

Flux<HelloResponse> resp = req.transform(stub::sayHelloBothStream);

Thread.sleep(100);
assertThat(interceptor.wasCalled()).isFalse();
assertThat(interceptor.didRespond()).isFalse();
}

static class TestService extends ReactorGreeterGrpc.GreeterImplBase {

@Override
public Mono<HelloResponse> sayHello(HelloRequest protoRequest) {
return Mono.fromCallable(() -> greet("Hello", protoRequest));
}

@Override
public Flux<HelloResponse> sayHelloRespStream(HelloRequest protoRequest) {
return Flux.just(
greet("Hello", protoRequest),
greet("Hi", protoRequest),
greet("Greetings", protoRequest));
}

@Override
public Mono<HelloResponse> sayHelloReqStream(Flux<HelloRequest> reactorRequest) {
return reactorRequest
.hide()
.map(HelloRequest::getName)
.collectList()
.map(names -> greet("Hello", String.join(" and ", names)))
.hide();
}

@Override
public Flux<HelloResponse> sayHelloBothStream(Flux<HelloRequest> reactorRequest) {
return reactorRequest
.hide()
.map(HelloRequest::getName)
.buffer(2)
.map(names -> greet("Hello", String.join(" and ", names)))
.hide();
}

private HelloResponse greet(String greeting, HelloRequest request) {
return greet(greeting, request.getName());
}

private HelloResponse greet(String greeting, String name) {
return HelloResponse.newBuilder().setMessage(greeting + " " + name).build();
}
}

static class FusedTestService extends ReactorGreeterGrpc.GreeterImplBase {

@Override
public Mono<HelloResponse> sayHello(Mono<HelloRequest> reactorRequest) {
return reactorRequest.map(protoRequest -> greet("Hello", protoRequest));
}

@Override
public Flux<HelloResponse> sayHelloRespStream(Mono<HelloRequest> reactorRequest) {
return reactorRequest.flatMapMany(protoRequest -> Flux.just(
greet("Hello", protoRequest),
greet("Hi", protoRequest),
greet("Greetings", protoRequest)));
}

@Override
public Mono<HelloResponse> sayHelloReqStream(Flux<HelloRequest> reactorRequest) {
return reactorRequest
.map(HelloRequest::getName)
.collectList()
.map(names -> greet("Hello", String.join(" and ", names)));
}

@Override
public Flux<HelloResponse> sayHelloBothStream(Flux<HelloRequest> reactorRequest) {
return reactorRequest
.map(HelloRequest::getName)
.buffer(2)
.map(names -> greet("Hello", String.join(" and ", names)));
}

private HelloResponse greet(String greeting, HelloRequest request) {
return greet(greeting, request.getName());
}

private HelloResponse greet(String greeting, String name) {
return HelloResponse.newBuilder().setMessage(greeting + " " + name).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ public void run() {
}
}
);
delegate.apply(observerAndPublisher);

return Flowable.fromPublisher(observerAndPublisher)
.singleOrError();
.doOnSubscribe(s -> delegate.apply(observerAndPublisher))
.singleOrError();
} catch (Throwable throwable) {
return Single.error(throwable);
}
Expand Down Expand Up @@ -175,9 +175,8 @@ public void run() {
}
},
prefetch, lowTide);
delegate.apply(observerAndPublisher);

return Flowable.fromPublisher(observerAndPublisher);
return Flowable.fromPublisher(observerAndPublisher).doOnSubscribe(s -> delegate.apply(observerAndPublisher));
} catch (Throwable throwable) {
return Flowable.error(throwable);
}
Expand Down
Loading

0 comments on commit ae3f588

Please sign in to comment.