diff --git a/pom.xml b/pom.xml
index c6d79b151..8a82f5e8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
2.9.0
1.0.9
1.1.1
- 3.1.6.RELEASE
+ 3.1.8.RELEASE
1.3.8
3.1.2
0.11.3
diff --git a/rsocket-transport/src/main/java/io/scalecube/services/transport/rsocket/client/RSocketServiceClientAdapter.java b/rsocket-transport/src/main/java/io/scalecube/services/transport/rsocket/client/RSocketServiceClientAdapter.java
index 668330a93..d0e57ca94 100644
--- a/rsocket-transport/src/main/java/io/scalecube/services/transport/rsocket/client/RSocketServiceClientAdapter.java
+++ b/rsocket-transport/src/main/java/io/scalecube/services/transport/rsocket/client/RSocketServiceClientAdapter.java
@@ -27,14 +27,16 @@ public RSocketServiceClientAdapter(Mono rSocket, ServiceMessageCodec co
@Override
public Mono requestResponse(ServiceMessage message) {
return rSocket
- .flatMap(rSocket -> rSocket.requestResponse(toPayload(message)).or(listenConnectionClose(rSocket)))
+ .flatMap(rSocket -> rSocket.requestResponse(toPayload(message))
+ .takeUntilOther(listenConnectionClose(rSocket)))
.map(this::toMessage);
}
@Override
public Flux requestStream(ServiceMessage message) {
return rSocket
- .flatMapMany(rSocket -> rSocket.requestStream(toPayload(message)).or(listenConnectionClose(rSocket)))
+ .flatMapMany(rSocket -> rSocket.requestStream(toPayload(message))
+ .takeUntilOther(listenConnectionClose(rSocket)))
.map(this::toMessage);
}
@@ -43,7 +45,7 @@ public Flux requestChannel(Publisher publisher)
return rSocket
.flatMapMany(rSocket -> rSocket
.requestChannel(Flux.from(publisher).map(this::toPayload))
- .or(listenConnectionClose(rSocket)))
+ .takeUntilOther(listenConnectionClose(rSocket)))
.map(this::toMessage);
}
diff --git a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java
index 267422032..c338422d4 100644
--- a/services/src/test/java/io/scalecube/services/ServiceTransportTest.java
+++ b/services/src/test/java/io/scalecube/services/ServiceTransportTest.java
@@ -13,14 +13,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import reactor.core.Disposable;
-
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import reactor.core.Disposable;
+
public class ServiceTransportTest {
private static AtomicInteger port = new AtomicInteger(6000);
@@ -29,6 +29,8 @@ public class ServiceTransportTest {
ServiceMessage.builder().qualifier(QuoteService.NAME, "justNever").build();
private static final ServiceMessage JUST_MANY_NEVER =
ServiceMessage.builder().qualifier(QuoteService.NAME, "justManyNever").build();
+ private static final ServiceMessage ONLY_ONE_AND_THEN_NEVER =
+ ServiceMessage.builder().qualifier(QuoteService.NAME, "onlyOneAndThenNever").build();
private Microservices gateway;
private Microservices serviceNode;
@@ -63,7 +65,7 @@ public void cleanUp() {
}
@Test
- public void test_remote_node_died_mono() throws Exception {
+ public void test_remote_node_died_mono_never() throws Exception {
int batchSize = 1;
final CountDownLatch latch1 = new CountDownLatch(batchSize);
@@ -92,7 +94,36 @@ public void test_remote_node_died_mono() throws Exception {
}
@Test
- public void test_remote_node_died_flux() throws Exception {
+ public void test_remote_node_died_many_never() throws Exception {
+ int batchSize = 1;
+
+ final CountDownLatch latch1 = new CountDownLatch(batchSize);
+ AtomicReference sub1 = new AtomicReference<>(null);
+ AtomicReference exceptionHolder = new AtomicReference<>(null);
+
+ ServiceCall serviceCall = gateway.call().create();
+ sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).log("test_remote_node_died_many_never")
+ .doOnError(exceptionHolder::set)
+ .subscribe());
+
+ gateway.cluster().listenMembership()
+ .filter(MembershipEvent::isRemoved)
+ .subscribe(onNext -> latch1.countDown(), System.err::println);
+
+ // service node goes down
+ TimeUnit.SECONDS.sleep(3);
+ serviceNode.shutdown().block(Duration.ofSeconds(6));
+
+ latch1.await(20, TimeUnit.SECONDS);
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ assertEquals(0, latch1.getCount());
+ assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
+ assertTrue(sub1.get().isDisposed());
+ }
+
+ @Test
+ public void test_remote_node_died_many_then_never() throws Exception {
int batchSize = 1;
final CountDownLatch latch1 = new CountDownLatch(batchSize);
@@ -100,7 +131,7 @@ public void test_remote_node_died_flux() throws Exception {
AtomicReference exceptionHolder = new AtomicReference<>(null);
ServiceCall serviceCall = gateway.call().create();
- sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).log("test_remote_node_died_flux")
+ sub1.set(serviceCall.requestMany(ONLY_ONE_AND_THEN_NEVER).log("test_remote_node_died_many_only_one_and_then_never")
.doOnError(exceptionHolder::set)
.subscribe());
diff --git a/services/src/test/java/io/scalecube/services/sut/QuoteService.java b/services/src/test/java/io/scalecube/services/sut/QuoteService.java
index b51380dc2..c6a123484 100644
--- a/services/src/test/java/io/scalecube/services/sut/QuoteService.java
+++ b/services/src/test/java/io/scalecube/services/sut/QuoteService.java
@@ -28,4 +28,7 @@ public interface QuoteService {
@ServiceMethod
Flux justManyNever();
+
+ @ServiceMethod
+ Flux onlyOneAndThenNever();
}
diff --git a/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java b/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java
index e4b9f7090..cb95918e6 100644
--- a/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java
+++ b/services/src/test/java/io/scalecube/services/sut/SimpleQuoteService.java
@@ -42,4 +42,9 @@ public Mono justNever() {
public Flux justManyNever() {
return Flux.never();
}
+
+ @Override
+ public Flux onlyOneAndThenNever() {
+ return Flux.merge(Mono.just("only first"), Mono.never());
+ }
}