Skip to content

Commit 80722aa

Browse files
committed
Using Vertx thread for RxJava
1 parent f9076b3 commit 80722aa

File tree

3 files changed

+41
-35
lines changed

3 files changed

+41
-35
lines changed

src/main/java/in/erail/service/RESTServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void start() throws InstantiationException, IllegalAccessException {
5050
.eventBus()
5151
.<JsonObject>consumer(getServiceUniqueId())
5252
.toFlowable()
53-
.subscribeOn(getScheduler())
53+
.observeOn(getScheduler())
5454
.flatMapSingle(this::handleRequest)
5555
.doOnSubscribe((s) -> getLog().info(() -> String.format("%s[%s] service started", getServiceUniqueId(), Thread.currentThread().getName())))
5656
.doOnTerminate(() -> getLog().info(() -> String.format("%s[%s] service stopped", getServiceUniqueId(), Thread.currentThread().getName())))

src/main/java/in/erail/service/SingletonServiceImpl.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import in.erail.glue.annotation.StartService;
44
import io.reactivex.Completable;
55
import io.reactivex.Single;
6-
import io.reactivex.schedulers.Schedulers;
76
import io.vertx.core.spi.cluster.ClusterManager;
87
import io.vertx.core.spi.cluster.NodeListener;
98
import io.vertx.reactivex.core.Vertx;
9+
import java.util.Optional;
1010
import org.apache.logging.log4j.Logger;
1111

1212
/**
@@ -29,31 +29,41 @@ public void start() {
2929
return;
3030
}
3131

32-
allowServiceToStart()
33-
.subscribeOn(Schedulers.io())
32+
Single
33+
.just(Optional.<String>empty())
34+
.flatMapCompletable(this::init)
35+
.subscribe();
36+
}
37+
38+
protected Completable init(Optional<String> pOldNodeID) {
39+
final String serviceName = getServiceName();
40+
final String thisNodeId = getClusterManager().getNodeID();
41+
42+
return Single
43+
.just(getServiceMapName())
44+
.flatMap(name -> getVertx().sharedData().<String, String>rxGetClusterWideMap(name))
45+
.flatMap(m -> {
46+
if (pOldNodeID.isPresent()) {
47+
return m
48+
.rxReplaceIfPresent(getServiceName(), pOldNodeID.get(), thisNodeId)
49+
.map(v -> v ? thisNodeId : "");
50+
}
51+
return m
52+
.rxPutIfAbsent(serviceName, thisNodeId)
53+
.switchIfEmpty(Single.just(thisNodeId));
54+
})
55+
.doOnSuccess(serviceOwnerId -> getLog().debug(() -> "Service Owner ID:" + serviceOwnerId + ", This Node ID:" + thisNodeId))
56+
.map(serviceOwnerId -> thisNodeId.equals(serviceOwnerId))
57+
.doOnSuccess(t -> getLog().debug(() -> "Service Start Decision:" + getServiceName() + ":" + t))
3458
.flatMapCompletable((success) -> {
3559
if (success) {
3660
getLog().info(String.format("Starting Service:[%s]", getServiceName()));
3761
return startService()
3862
.doOnComplete(() -> getLog().info(String.format("Service:[%s] started", getServiceName())));
3963
}
4064
return Completable.complete();
41-
})
42-
.blockingAwait();
43-
}
65+
});
4466

45-
protected Single<Boolean> allowServiceToStart() {
46-
final String serviceName = getServiceName();
47-
final String value = getClusterManager().getNodeID();
48-
49-
return getVertx()
50-
.sharedData()
51-
.<String, String>rxGetClusterWideMap(getServiceMapName())
52-
.flatMapMaybe(m -> m.rxPutIfAbsent(serviceName, value))
53-
.toSingle(value)
54-
.doOnSuccess(serviceOwnerId -> getLog().debug(() -> "Service Owner ID:" + serviceOwnerId + ", This Node ID:" + value))
55-
.map(serviceOwnerId -> getClusterManager().getNodeID().equals(serviceOwnerId))
56-
.doOnSuccess(t -> getLog().debug(() -> "Service Start Decision:" + getServiceName() + ":" + t));
5767
}
5868

5969
@Override
@@ -67,20 +77,10 @@ public void nodeLeft(String pNodeID) {
6777
return;
6878
}
6979

70-
getVertx()
71-
.sharedData()
72-
.<String, String>rxGetClusterWideMap(getServiceMapName())
73-
.subscribeOn(Schedulers.io())
74-
.flatMap((m) -> m.rxReplaceIfPresent(getServiceName(), pNodeID, getClusterManager().getNodeID()))
75-
.flatMapCompletable((success) -> {
76-
if (success) {
77-
getLog().info(String.format("Starting Service:[%s] becuase of cluster state update", getServiceName()));
78-
return startService()
79-
.doOnComplete(() -> getLog().info(String.format("Service:[%s] start complete because of cluster state update", getServiceName())));
80-
}
81-
return Completable.complete();
82-
})
83-
.blockingAwait();
80+
Single
81+
.just(Optional.of(pNodeID))
82+
.flatMapCompletable(this::init)
83+
.subscribe();
8484
}
8585

8686
public Vertx getVertx() {

src/main/java/io/vertx/core/VertxInstance.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.ExecutionException;
66
import in.erail.glue.annotation.StartService;
7+
import io.reactivex.plugins.RxJavaPlugins;
8+
import io.vertx.reactivex.core.RxHelper;
79

810
public class VertxInstance {
911

1012
private VertxOptions mVertxOptions;
1113
private boolean mClusterEnable = true;
12-
private CompletableFuture<Vertx> mVertx = new CompletableFuture<>();
14+
private final CompletableFuture<Vertx> mVertx = new CompletableFuture<>();
1315

1416
@StartService
1517
public void start() {
@@ -22,7 +24,11 @@ public void start() {
2224
}
2325

2426
public Vertx create() throws InterruptedException, ExecutionException {
25-
return mVertx.get();
27+
Vertx v = mVertx.get();
28+
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(v));
29+
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(v));
30+
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(v));
31+
return v;
2632
}
2733

2834
public VertxOptions getVertxOptions() {

0 commit comments

Comments
 (0)