Skip to content

Commit

Permalink
Merge pull request #185 from scalecube/develop
Browse files Browse the repository at this point in the history
Prepare new release
  • Loading branch information
artem-v authored Jul 10, 2018
2 parents 1adf297 + c798245 commit a43cce0
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<jackson.version>2.9.0</jackson.version>
<scalecube.version>1.0.9</scalecube.version>
<scalecube-benchmarks.version>1.1.1</scalecube-benchmarks.version>
<reactor.version>3.1.6.RELEASE</reactor.version>
<reactor.version>3.1.8.RELEASE</reactor.version>
<rxjava.version>1.3.8</rxjava.version>
<metrics.version>3.1.2</metrics.version>
<rsocket.version>0.11.3</rsocket.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ public RSocketServiceClientAdapter(Mono<RSocket> rSocket, ServiceMessageCodec co
@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage message) {
return rSocket
.flatMap(rSocket -> rSocket.requestResponse(toPayload(message)).or(listenConnectionClose(rSocket)))
.flatMap(rSocket -> rSocket.requestResponse(toPayload(message))
.takeUntilOther(listenConnectionClose(rSocket)))
.map(this::toMessage);
}

@Override
public Flux<ServiceMessage> requestStream(ServiceMessage message) {
return rSocket
.flatMapMany(rSocket -> rSocket.requestStream(toPayload(message)).or(listenConnectionClose(rSocket)))
.flatMapMany(rSocket -> rSocket.requestStream(toPayload(message))
.takeUntilOther(listenConnectionClose(rSocket)))
.map(this::toMessage);
}

Expand All @@ -43,7 +45,7 @@ public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher)
return rSocket
.flatMapMany(rSocket -> rSocket
.requestChannel(Flux.from(publisher).map(this::toPayload))
.or(listenConnectionClose(rSocket)))
.takeUntilOther(listenConnectionClose(rSocket)))
.map(this::toMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import reactor.core.Disposable;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.Disposable;

public class ServiceTransportTest {

private static AtomicInteger port = new AtomicInteger(6000);
Expand All @@ -29,6 +29,8 @@ public class ServiceTransportTest {
ServiceMessage.builder().qualifier(QuoteService.NAME, "justNever").build();
private static final ServiceMessage JUST_MANY_NEVER =
ServiceMessage.builder().qualifier(QuoteService.NAME, "justManyNever").build();
private static final ServiceMessage ONLY_ONE_AND_THEN_NEVER =
ServiceMessage.builder().qualifier(QuoteService.NAME, "onlyOneAndThenNever").build();

private Microservices gateway;
private Microservices serviceNode;
Expand Down Expand Up @@ -63,7 +65,7 @@ public void cleanUp() {
}

@Test
public void test_remote_node_died_mono() throws Exception {
public void test_remote_node_died_mono_never() throws Exception {
int batchSize = 1;

final CountDownLatch latch1 = new CountDownLatch(batchSize);
Expand Down Expand Up @@ -92,15 +94,44 @@ public void test_remote_node_died_mono() throws Exception {
}

@Test
public void test_remote_node_died_flux() throws Exception {
public void test_remote_node_died_many_never() throws Exception {
int batchSize = 1;

final CountDownLatch latch1 = new CountDownLatch(batchSize);
AtomicReference<Disposable> sub1 = new AtomicReference<>(null);
AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);

ServiceCall serviceCall = gateway.call().create();
sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).log("test_remote_node_died_many_never")
.doOnError(exceptionHolder::set)
.subscribe());

gateway.cluster().listenMembership()
.filter(MembershipEvent::isRemoved)
.subscribe(onNext -> latch1.countDown(), System.err::println);

// service node goes down
TimeUnit.SECONDS.sleep(3);
serviceNode.shutdown().block(Duration.ofSeconds(6));

latch1.await(20, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}

@Test
public void test_remote_node_died_many_then_never() throws Exception {
int batchSize = 1;

final CountDownLatch latch1 = new CountDownLatch(batchSize);
AtomicReference<Disposable> sub1 = new AtomicReference<>(null);
AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);

ServiceCall serviceCall = gateway.call().create();
sub1.set(serviceCall.requestMany(JUST_MANY_NEVER).log("test_remote_node_died_flux")
sub1.set(serviceCall.requestMany(ONLY_ONE_AND_THEN_NEVER).log("test_remote_node_died_many_only_one_and_then_never")
.doOnError(exceptionHolder::set)
.subscribe());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ public interface QuoteService {

@ServiceMethod
Flux<String> justManyNever();

@ServiceMethod
Flux<String> onlyOneAndThenNever();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public Mono<String> justNever() {
public Flux<String> justManyNever() {
return Flux.never();
}

@Override
public Flux<String> onlyOneAndThenNever() {
return Flux.merge(Mono.just("only first"), Mono.never());
}
}

0 comments on commit a43cce0

Please sign in to comment.