From e8504f00953abb18b3c163bd908ffb306f89992f Mon Sep 17 00:00:00 2001 From: Jonas Fredin Date: Tue, 22 Jan 2019 10:25:19 +0100 Subject: [PATCH 1/4] Making sure the idleConnectionSubscription is not unsubscribed and stops working --- .../pool/PooledConnectionProviderImpl.java | 50 ++++++++++++++----- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java index fcbb9508..0168dcdc 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java @@ -56,32 +56,23 @@ public final class PooledConnectionProviderImpl extends PooledConnectionPr private static final Logger logger = LoggerFactory.getLogger(PooledConnectionProviderImpl.class); - private final Subscription idleConnCleanupSubscription; + private Subscription idleConnCleanupSubscription; private final IdleConnectionsHolder idleConnectionsHolder; private final PoolLimitDeterminationStrategy limitDeterminationStrategy; private final long maxIdleTimeMillis; + private final PoolConfig poolConfig; private final HostConnector hostConnector; private volatile boolean isShutdown; public PooledConnectionProviderImpl(PoolConfig poolConfig, HostConnector hostConnector) { + this.poolConfig = poolConfig; this.hostConnector = hostConnector; idleConnectionsHolder = poolConfig.getIdleConnectionsHolder(); limitDeterminationStrategy = poolConfig.getPoolLimitDeterminationStrategy(); maxIdleTimeMillis = poolConfig.getMaxIdleTimeMillis(); // In case, there is no cleanup required, this observable should never give a tick. - idleConnCleanupSubscription = poolConfig.getIdleConnectionsCleanupTimer() - .doOnError(LogErrorAction.INSTANCE) - .retry() // Retry when there is an error in timer. - .concatMap(new IdleConnectionCleanupTask()) - .onErrorResumeNext(new Func1>() { - @Override - public Observable call(Throwable throwable) { - logger.error("Ignoring error cleaning up idle connections.", - throwable); - return Observable.empty(); - } - }).subscribe(Actions.empty()); // Errors are logged and ignored. + idleConnCleanupSubscription = getIdleConnectionCleanUpSubscription(); // Errors are logged and ignored. hostConnector.getHost() .getCloseNotifier() @@ -103,6 +94,39 @@ public Observable call(Throwable throwable) { .subscribe(Actions.empty()); } + /** + * Creates a cleanup subscription to clean up the idle connections + * @return + */ + private Subscription getIdleConnectionCleanUpSubscription() { + return poolConfig.getIdleConnCleanupTicker() + .doOnError(LogErrorAction.INSTANCE) + .retry() // Retry when there is an error in timer. + .concatMap(new IdleConnectionCleanupTask()) + .doOnUnsubscribe(new Action0() { + + /* + * the idle connection ticker is for some reason unsubscribed and + * To make the ticker work as expected by ticking every once in a while + * we reinitiate the cleanUp subscription + */ + @Override + public void call() { + if(!isShutdown) { + idleConnCleanupSubscription = getIdleConnectionCleanUpSubscription(); + } + } + }) + .onErrorResumeNext(new Func1>() { + @Override + public Observable call(Throwable throwable) { + logger.error("Ignoring error cleaning up idle connections.", + throwable); + return Observable.empty(); + } + }).subscribe(); + } + @Override public Observable> newConnectionRequest() { return Observable.create(new OnSubscribe>() { From 0859981bb7d269b3451b5c12f7d3a7bf209c7c54 Mon Sep 17 00:00:00 2001 From: Jonas Fredin Date: Tue, 22 Jan 2019 13:08:51 +0100 Subject: [PATCH 2/4] using interval instead of timer in idleConnectionCleanupTicker Timer will only tick once and then complete. Interval will tick once every specified millisecond. --- .../netty/client/pool/PoolConfig.java | 2 +- .../pool/PooledConnectionProviderImpl.java | 50 +++++-------------- 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java index 2f030784..2a11cce1 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java @@ -37,7 +37,7 @@ public class PoolConfig { public PoolConfig() { maxIdleTimeMillis = DEFAULT_MAX_IDLE_TIME_MILLIS; - idleConnCleanupTicker = Observable.timer(maxIdleTimeMillis, TimeUnit.MILLISECONDS); + idleConnCleanupTicker = Observable.interval(maxIdleTimeMillis, TimeUnit.MILLISECONDS); idleConnectionsHolder = new FIFOIdleConnectionsHolder<>(); limitDeterminationStrategy = UnboundedPoolLimitDeterminationStrategy.INSTANCE; } diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java index 0168dcdc..809c2255 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java @@ -56,23 +56,32 @@ public final class PooledConnectionProviderImpl extends PooledConnectionPr private static final Logger logger = LoggerFactory.getLogger(PooledConnectionProviderImpl.class); - private Subscription idleConnCleanupSubscription; + private final Subscription idleConnCleanupSubscription; private final IdleConnectionsHolder idleConnectionsHolder; private final PoolLimitDeterminationStrategy limitDeterminationStrategy; private final long maxIdleTimeMillis; - private final PoolConfig poolConfig; private final HostConnector hostConnector; private volatile boolean isShutdown; public PooledConnectionProviderImpl(PoolConfig poolConfig, HostConnector hostConnector) { - this.poolConfig = poolConfig; this.hostConnector = hostConnector; idleConnectionsHolder = poolConfig.getIdleConnectionsHolder(); limitDeterminationStrategy = poolConfig.getPoolLimitDeterminationStrategy(); maxIdleTimeMillis = poolConfig.getMaxIdleTimeMillis(); // In case, there is no cleanup required, this observable should never give a tick. - idleConnCleanupSubscription = getIdleConnectionCleanUpSubscription(); // Errors are logged and ignored. + idleConnCleanupSubscription = poolConfig.getIdleConnCleanupTicker() + .doOnError(LogErrorAction.INSTANCE) + .retry() // Retry when there is an error in timer. + .concatMap(new IdleConnectionCleanupTask()) + .onErrorResumeNext(new Func1>() { + @Override + public Observable call(Throwable throwable) { + logger.error("Ignoring error cleaning up idle connections.", + throwable); + return Observable.empty(); + } + }).subscribe(); hostConnector.getHost() .getCloseNotifier() @@ -94,39 +103,6 @@ public Observable call(Throwable throwable) { .subscribe(Actions.empty()); } - /** - * Creates a cleanup subscription to clean up the idle connections - * @return - */ - private Subscription getIdleConnectionCleanUpSubscription() { - return poolConfig.getIdleConnCleanupTicker() - .doOnError(LogErrorAction.INSTANCE) - .retry() // Retry when there is an error in timer. - .concatMap(new IdleConnectionCleanupTask()) - .doOnUnsubscribe(new Action0() { - - /* - * the idle connection ticker is for some reason unsubscribed and - * To make the ticker work as expected by ticking every once in a while - * we reinitiate the cleanUp subscription - */ - @Override - public void call() { - if(!isShutdown) { - idleConnCleanupSubscription = getIdleConnectionCleanUpSubscription(); - } - } - }) - .onErrorResumeNext(new Func1>() { - @Override - public Observable call(Throwable throwable) { - logger.error("Ignoring error cleaning up idle connections.", - throwable); - return Observable.empty(); - } - }).subscribe(); - } - @Override public Observable> newConnectionRequest() { return Observable.create(new OnSubscribe>() { From 615fe49ed9760610c4c1cc106bbe695dd08f4333 Mon Sep 17 00:00:00 2001 From: Jonas Fredin Date: Thu, 14 Feb 2019 08:55:03 +0100 Subject: [PATCH 3/4] change error handling to log on error but make sure the observable is subscribed by retrying --- .../netty/client/pool/PooledConnectionProviderImpl.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java index 809c2255..92f78ca7 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PooledConnectionProviderImpl.java @@ -74,14 +74,15 @@ public PooledConnectionProviderImpl(PoolConfig poolConfig, HostConnector>() { + .doOnError(new Action1() { @Override - public Observable call(Throwable throwable) { + public void call(Throwable throwable) { logger.error("Ignoring error cleaning up idle connections.", throwable); - return Observable.empty(); } - }).subscribe(); + }) + .retry() + .subscribe(); hostConnector.getHost() .getCloseNotifier() From 52b67b2f99592b75fb516e3fec51cd4ea38d3723 Mon Sep 17 00:00:00 2001 From: Jonas Fredin Date: Thu, 14 Feb 2019 11:36:22 +0100 Subject: [PATCH 4/4] Add initial delay to idleConnectionCleanup --- .../main/java/io/reactivex/netty/client/pool/PoolConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java index 2a11cce1..926748cb 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/client/pool/PoolConfig.java @@ -37,7 +37,7 @@ public class PoolConfig { public PoolConfig() { maxIdleTimeMillis = DEFAULT_MAX_IDLE_TIME_MILLIS; - idleConnCleanupTicker = Observable.interval(maxIdleTimeMillis, TimeUnit.MILLISECONDS); + idleConnCleanupTicker = Observable.interval(maxIdleTimeMillis, maxIdleTimeMillis, TimeUnit.MILLISECONDS); idleConnectionsHolder = new FIFOIdleConnectionsHolder<>(); limitDeterminationStrategy = UnboundedPoolLimitDeterminationStrategy.INSTANCE; }