diff --git a/rxnetty-common/src/main/java/io/reactivex/netty/channel/Connection.java b/rxnetty-common/src/main/java/io/reactivex/netty/channel/Connection.java index a71ff132..1aa80918 100644 --- a/rxnetty-common/src/main/java/io/reactivex/netty/channel/Connection.java +++ b/rxnetty-common/src/main/java/io/reactivex/netty/channel/Connection.java @@ -317,7 +317,7 @@ protected void connectCloseToChannelClose() { .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - close(false); // Close this connection when the channel is closed. + closeNow(); // Close this connection when the channel is closed. } }); nettyChannel.attr(CONNECTION_ATTRIBUTE_KEY).set(this); diff --git a/rxnetty-common/src/test/java/io/reactivex/netty/client/pool/PooledConnectionProviderImplTest.java b/rxnetty-common/src/test/java/io/reactivex/netty/client/pool/PooledConnectionProviderImplTest.java index 56b9055d..0d7bab74 100644 --- a/rxnetty-common/src/test/java/io/reactivex/netty/client/pool/PooledConnectionProviderImplTest.java +++ b/rxnetty-common/src/test/java/io/reactivex/netty/client/pool/PooledConnectionProviderImplTest.java @@ -213,7 +213,7 @@ public void testMetricEventCallback() throws Throwable { assertThat("Unexpected release attempted count.", eventsListener.getReleaseAttemptedCount(), is(2L)); assertThat("Unexpected release succeeded count.", eventsListener.getReleaseSucceededCount(), is(2L)); assertThat("Unexpected release failed count.", eventsListener.getReleaseFailedCount(), is(0L)); - assertThat("Unexpected connection eviction count.", eventsListener.getEvictionCount(), is(1L)); + assertThat("Unexpected connection eviction count.", eventsListener.getEvictionCount(), is(2L)); } private PooledConnection _testRelease() throws Exception { @@ -379,4 +379,4 @@ public void call(Subscriber> s) { public @interface MaxConnections { int value() default DEFAULT_MAX_CONNECTIONS; } -} \ No newline at end of file +} diff --git a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java index ad10f846..8713ced2 100644 --- a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java +++ b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java @@ -20,9 +20,20 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.reactivex.netty.client.pool.PooledConnection; import org.junit.Rule; +import org.junit.Test; +import rx.Observable; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.observers.AssertableSubscriber; -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.Matchers.*; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static rx.Observable.fromCallable; +import static rx.Observable.just; /** * This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task @@ -49,4 +60,83 @@ public void testReuse() throws Exception { assertThat("Connection is not reused.", connection2, is(connection)); } + + @Test + /** + * + * Load test to prove concurrency issues mainly seen on heavy load. + * + */ + public void assertPermitsAreReleasedWhenMergingObservablesWithExceptions() { + clientRule.startServer(10, true); + + MockTcpClientEventListener listener = new MockTcpClientEventListener(); + clientRule.getClient().subscribe(listener); + + int number_of_iterations = 1; + int numberOfRequests = 3; + + makeRequests(number_of_iterations, numberOfRequests); + + sleep(clientRule.getPoolConfig().getMaxIdleTimeMillis()); + + assertThat("Permits should be 10", clientRule.getPoolConfig().getPoolLimitDeterminationStrategy().getAvailablePermits(), equalTo(10)); + } + + private void sleep(long i) { + try { + Thread.sleep(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private void makeRequests(int number_of_iterations, int numberOfRequests) { + for (int j = 0; j < number_of_iterations; j++) { + + //List> results = new ArrayList<>(); + + sleep(100); + + List> results = new ArrayList<>(); + + //Just giving the client some time to recover + sleep(100); + + for (int i = 0; i < numberOfRequests; i++) { + results.add( + fromCallable(new Func0>() { + @Override + public PooledConnection call() { + return clientRule.connect(); + } + }) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(PooledConnection connection) { + return connection.writeStringAndFlushOnEach(just("Hello")) + .toCompletable() + .toObservable() + .concatWith(connection.getInput()) + .take(1) + .single() + .map(new Func1() { + @Override + public String call(ByteBuf byteBuf) { + try { + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + return new String(bytes); + } finally { + byteBuf.release(); + } + } + }); + } + })); + } + AssertableSubscriber test = Observable.merge(results).test(); + test.awaitTerminalEvent(); + } + } } diff --git a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java index 411c9b20..22fa3436 100644 --- a/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java +++ b/rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/TcpClientRule.java @@ -18,12 +18,9 @@ import io.netty.buffer.ByteBuf; import io.reactivex.netty.channel.Connection; -import io.reactivex.netty.client.ConnectionProvider; -import io.reactivex.netty.client.ConnectionProviderFactory; import io.reactivex.netty.client.Host; -import io.reactivex.netty.client.HostConnector; +import io.reactivex.netty.client.pool.PoolConfig; import io.reactivex.netty.client.pool.PooledConnection; -import io.reactivex.netty.client.pool.PooledConnectionProvider; import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory; import io.reactivex.netty.protocol.tcp.server.ConnectionHandler; import io.reactivex.netty.protocol.tcp.server.TcpServer; @@ -35,13 +32,14 @@ import java.net.InetSocketAddress; -import static org.hamcrest.MatcherAssert.*; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; public class TcpClientRule extends ExternalResource { - private TcpServer server; - private TcpClient client; + private TcpServer server; + private TcpClient client; + private PoolConfig poolConfig; @Override public Statement apply(final Statement base, Description description) { @@ -55,10 +53,18 @@ public void evaluate() throws Throwable { } public void startServer(int maxConnections) { + startServer(maxConnections, false); + } + + public void startServer(int maxConnections, final boolean failing) { server.start(new ConnectionHandler() { @Override public Observable handle(Connection newConnection) { - return newConnection.writeAndFlushOnEach(newConnection.getInput()); + if(failing) { + throw new RuntimeException("exception"); + } else { + return newConnection.writeAndFlushOnEach(newConnection.getInput()); + } } }); createClient(maxConnections); @@ -85,8 +91,9 @@ public PooledConnection connect() { private void createClient(final int maxConnections) { InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort()); - client = TcpClient.newClient(SingleHostPoolingProviderFactory.createBounded(maxConnections), - Observable.just(new Host(serverAddr))); + poolConfig = new PoolConfig().maxConnections(maxConnections); + SingleHostPoolingProviderFactory bounded = SingleHostPoolingProviderFactory.create(poolConfig); + client = TcpClient.newClient(bounded, Observable.just(new Host(serverAddr))); } public TcpServer getServer() { @@ -96,4 +103,8 @@ public TcpServer getServer() { public TcpClient getClient() { return client; } + + public PoolConfig getPoolConfig() { + return poolConfig; + } }