Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/origin/patch-3' into patch-5
Browse files Browse the repository at this point in the history
# Conflicts:
#	rxnetty-tcp/src/test/java/io/reactivex/netty/protocol/tcp/client/PoolingWithRealChannelTest.java
  • Loading branch information
fnxrassmate committed Jan 22, 2019
2 parents 73d1081 + 5d37d86 commit d67b6b0
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ public void onNext(PooledConnection<R, W> conn) {
onNextArrived = true;
_terminated = terminated;
_error = error;
delegate.onNext(conn);
}

delegate.onNext(conn);

if (_terminated) {
if (null != error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.reactivex.netty.client.pool.PooledConnection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.AssertableSubscriber;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.AssertableSubscriber;

import java.util.ArrayList;
import java.util.List;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -34,6 +44,10 @@
import static org.hamcrest.Matchers.is;
import static rx.Observable.fromCallable;
import static rx.Observable.just;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static rx.Observable.fromCallable;
import static rx.Observable.just;

/**
* This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task
Expand Down Expand Up @@ -61,6 +75,79 @@ public void testReuse() throws Exception {
assertThat("Connection is not reused.", connection2, is(connection));
}

@Test
/**
*
* Load test to prove concurrency issues mainly seen on heavy load.
*
*/
public void testLoad() {
clientRule.startServer(1000);

MockTcpClientEventListener listener = new MockTcpClientEventListener();
clientRule.getClient().subscribe(listener);


int number_of_iterations = 300;
int numberOfRequests = 10;

for(int j = 0; j < number_of_iterations; j++) {

List<Observable<String>> results = new ArrayList<>();

//Just giving the client some time to recover
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

for (int i = 0; i < numberOfRequests; i++) {
results.add(
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() {
@Override
public PooledConnection<ByteBuf, ByteBuf> call() {
return clientRule.connectWithCheck();
}
})
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() {
@Override
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) {
return connection.writeStringAndFlushOnEach(just("Hello"))
.toCompletable()
.<ByteBuf>toObservable()
.concatWith(connection.getInput())
.take(1)
.single()
.map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf byteBuf) {
try {

byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String result = new String(bytes);
return result;
} finally {
byteBuf.release();
}
}
}).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Assert.fail("Did not expect exception: " + throwable.getMessage());
throwable.printStackTrace();
}
});
}
}));
}
AssertableSubscriber<String> test = Observable.merge(results).test();
test.awaitTerminalEvent();
test.assertNoErrors();
}
}

@Test
/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import rx.Observable;
import rx.Observer;
import rx.functions.Func0;
import rx.observers.TestSubscriber;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -89,6 +93,48 @@ public PooledConnection<ByteBuf, ByteBuf> connect() {
return (PooledConnection<ByteBuf, ByteBuf>) cSub.getOnNextEvents().get(0);
}

public PooledConnection<ByteBuf, ByteBuf> connectWithCheck() {

final AtomicBoolean gotOnNext = new AtomicBoolean(false);

Observable<Connection<ByteBuf, ByteBuf>> got_no_connection = client.createConnectionRequest()
.doOnEach(new Observer<Connection<ByteBuf, ByteBuf>>() {
@Override
public void onCompleted() {
if(!gotOnNext.get()) {
//A PooledConnection could sometimes send onCompleted before the onNext event occurred.
Assert.fail("Should not get onCompletedBefore onNext");
}
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Connection<ByteBuf, ByteBuf> byteBufByteBufConnection) {
gotOnNext.set(true);
}
})
.switchIfEmpty(Observable.defer(new Func0<Observable<PooledConnection<ByteBuf, ByteBuf>>>() {
@Override
public Observable<PooledConnection<ByteBuf, ByteBuf>> call() {
return Observable.empty();
}
}));

TestSubscriber<Connection<ByteBuf, ByteBuf>> cSub = new TestSubscriber<>();
got_no_connection.subscribe(cSub);

cSub.awaitTerminalEvent();

cSub.assertNoErrors();

assertThat("No connection received.", cSub.getOnNextEvents(), hasSize(1));

return (PooledConnection<ByteBuf, ByteBuf>) cSub.getOnNextEvents().get(0);
}

private void createClient(final int maxConnections) {
InetSocketAddress serverAddr = new InetSocketAddress("127.0.0.1", server.getServerPort());
poolConfig = new PoolConfig<ByteBuf, ByteBuf>().maxConnections(maxConnections);
Expand Down

0 comments on commit d67b6b0

Please sign in to comment.