Skip to content

Commit

Permalink
actually closing the connection instead of just creating the observab…
Browse files Browse the repository at this point in the history
…le without any subscription to it.

Added test that fails without this fix
  • Loading branch information
fnxrassmate committed Jan 22, 2019
1 parent cdf30e4 commit 73d1081
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ protected void connectCloseToChannelClose() {
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
close(false); // Close this connection when the channel is closed.
closeNow(); // Close this connection when the channel is closed.
}
});
nettyChannel.attr(CONNECTION_ATTRIBUTE_KEY).set(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void testMetricEventCallback() throws Throwable {
assertThat("Unexpected release attempted count.", eventsListener.getReleaseAttemptedCount(), is(2L));
assertThat("Unexpected release succeeded count.", eventsListener.getReleaseSucceededCount(), is(2L));
assertThat("Unexpected release failed count.", eventsListener.getReleaseFailedCount(), is(0L));
assertThat("Unexpected connection eviction count.", eventsListener.getEvictionCount(), is(1L));
assertThat("Unexpected connection eviction count.", eventsListener.getEvictionCount(), is(2L));
}

private PooledConnection<String, String> _testRelease() throws Exception {
Expand Down Expand Up @@ -379,4 +379,4 @@ public void call(Subscriber<? super Connection<String, String>> s) {
public @interface MaxConnections {
int value() default DEFAULT_MAX_CONNECTIONS;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,20 @@
import io.netty.channel.embedded.EmbeddedChannel;
import io.reactivex.netty.client.pool.PooledConnection;
import org.junit.Rule;
import org.junit.Test;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.AssertableSubscriber;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static rx.Observable.fromCallable;
import static rx.Observable.just;

/**
* This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task
Expand All @@ -49,4 +60,83 @@ public void testReuse() throws Exception {

assertThat("Connection is not reused.", connection2, is(connection));
}

@Test
/**
*
* Load test to prove concurrency issues mainly seen on heavy load.
*
*/
public void assertPermitsAreReleasedWhenMergingObservablesWithExceptions() {
clientRule.startServer(10, true);

MockTcpClientEventListener listener = new MockTcpClientEventListener();
clientRule.getClient().subscribe(listener);

int number_of_iterations = 1;
int numberOfRequests = 3;

makeRequests(number_of_iterations, numberOfRequests);

sleep(clientRule.getPoolConfig().getMaxIdleTimeMillis());

assertThat("Permits should be 10", clientRule.getPoolConfig().getPoolLimitDeterminationStrategy().getAvailablePermits(), equalTo(10));
}

private void sleep(long i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void makeRequests(int number_of_iterations, int numberOfRequests) {
for (int j = 0; j < number_of_iterations; j++) {

//List<Observable<String>> results = new ArrayList<>();

sleep(100);

List<Observable<String>> results = new ArrayList<>();

//Just giving the client some time to recover
sleep(100);

for (int i = 0; i < numberOfRequests; i++) {
results.add(
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() {
@Override
public PooledConnection<ByteBuf, ByteBuf> call() {
return clientRule.connect();
}
})
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() {
@Override
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) {
return connection.writeStringAndFlushOnEach(just("Hello"))
.toCompletable()
.<ByteBuf>toObservable()
.concatWith(connection.getInput())
.take(1)
.single()
.map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf byteBuf) {
try {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
return new String(bytes);
} finally {
byteBuf.release();
}
}
});
}
}));
}
AssertableSubscriber<String> test = Observable.merge(results).test();
test.awaitTerminalEvent();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.client.ConnectionProvider;
import io.reactivex.netty.client.ConnectionProviderFactory;
import io.reactivex.netty.client.Host;
import io.reactivex.netty.client.HostConnector;
import io.reactivex.netty.client.pool.PoolConfig;
import io.reactivex.netty.client.pool.PooledConnection;
import io.reactivex.netty.client.pool.PooledConnectionProvider;
import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
Expand All @@ -35,13 +32,14 @@

import java.net.InetSocketAddress;

import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

public class TcpClientRule extends ExternalResource {

private TcpServer<ByteBuf, ByteBuf> server;
private TcpClient<ByteBuf, ByteBuf> client;
private TcpServer<ByteBuf, ByteBuf> server;
private TcpClient<ByteBuf, ByteBuf> client;
private PoolConfig<ByteBuf, ByteBuf> poolConfig;

@Override
public Statement apply(final Statement base, Description description) {
Expand All @@ -55,10 +53,18 @@ public void evaluate() throws Throwable {
}

public void startServer(int maxConnections) {
startServer(maxConnections, false);
}

public void startServer(int maxConnections, final boolean failing) {
server.start(new ConnectionHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(Connection<ByteBuf, ByteBuf> newConnection) {
return newConnection.writeAndFlushOnEach(newConnection.getInput());
if(failing) {
throw new RuntimeException("exception");
} else {
return newConnection.writeAndFlushOnEach(newConnection.getInput());
}
}
});
createClient(maxConnections);
Expand All @@ -85,8 +91,9 @@ public PooledConnection<ByteBuf, ByteBuf> connect() {

private void createClient(final int maxConnections) {
InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort());
client = TcpClient.newClient(SingleHostPoolingProviderFactory.<ByteBuf, ByteBuf>createBounded(maxConnections),
Observable.just(new Host(serverAddr)));
poolConfig = new PoolConfig<ByteBuf, ByteBuf>().maxConnections(maxConnections);
SingleHostPoolingProviderFactory<ByteBuf, ByteBuf> bounded = SingleHostPoolingProviderFactory.create(poolConfig);
client = TcpClient.newClient(bounded, Observable.just(new Host(serverAddr)));
}

public TcpServer<ByteBuf, ByteBuf> getServer() {
Expand All @@ -96,4 +103,8 @@ public TcpServer<ByteBuf, ByteBuf> getServer() {
public TcpClient<ByteBuf, ByteBuf> getClient() {
return client;
}

public PoolConfig<ByteBuf, ByteBuf> getPoolConfig() {
return poolConfig;
}
}

0 comments on commit 73d1081

Please sign in to comment.