diff --git a/README.md b/README.md index 63aa36e..f3f82b8 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ operation (leasing/releasing many connections at once). ## Gradle ```groovy -compile group: 'com.github.akurilov', name: 'netty-connection-pool', version: '1.0.3' +compile group: 'com.github.akurilov', name: 'netty-connection-pool', version: '1.0.9' ``` ## Code Example diff --git a/build.gradle b/build.gradle index 2fbf526..5f27421 100644 --- a/build.gradle +++ b/build.gradle @@ -11,12 +11,12 @@ apply plugin: "maven" apply plugin: "signing" group = "com.github.akurilov" -version = "1.0.3" +version = "1.0.9" ext { moduleName = "${group}.netty.connection.pool" depVersion = [ - javaCommons: "[2.1.2,)", + javaCommons: "2.1.6", netty: "4.1.25.Final", ] diff --git a/src/main/java/com/github/akurilov/netty/connection/pool/BasicMultiNodeConnPool.java b/src/main/java/com/github/akurilov/netty/connection/pool/MultiNodeConnPoolImpl.java similarity index 61% rename from src/main/java/com/github/akurilov/netty/connection/pool/BasicMultiNodeConnPool.java rename to src/main/java/com/github/akurilov/netty/connection/pool/MultiNodeConnPoolImpl.java index 99da7a1..6d87b0b 100644 --- a/src/main/java/com/github/akurilov/netty/connection/pool/BasicMultiNodeConnPool.java +++ b/src/main/java/com/github/akurilov/netty/connection/pool/MultiNodeConnPoolImpl.java @@ -13,14 +13,15 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; @@ -31,17 +32,19 @@ * The provided semaphore limits the count of the simultaneously used connections. * Based on netty. */ -public class BasicMultiNodeConnPool +public class MultiNodeConnPoolImpl implements NonBlockingConnPool { - private final static Logger LOG = Logger.getLogger(BasicMultiNodeConnPool.class.getName()); + private final static Logger LOG = Logger.getLogger(MultiNodeConnPoolImpl.class.getName()); private final Semaphore concurrencyThrottle; private final String nodes[]; private final int n; private final int connAttemptsLimit; + private final long connectTimeOut; + private final TimeUnit connectTimeUnit; private final Map bootstraps; - private final Map> allConns; + private final Map> allConns; private final Map> availableConns; private final Object2IntMap connCounts; private final Object2IntMap failedConnAttemptCounts; @@ -49,54 +52,57 @@ public class BasicMultiNodeConnPool /** * @param concurrencyThrottle the throttle for the concurrency level control - * @param nodes the array of the endpoint nodes, any element may contain the port (followed after ":") to override the defaultPort argument - * @param bootstrap Netty's bootstrap instance - * @param connPoolHandler channel pool handler instance being notified upon new connection is created - * @param defaultPort default port used to connect (any node address from the nodes set may override this) - * @param connAttemptsLimit the max count of the subsequent connection failures to the node before the node will be excluded from the pool, 0 means no limit + * @param nodes the array of the endpoint nodes, any element may contain the port (followed after ":") to override the defaultPort argument + * @param bootstrap Netty's bootstrap instance + * @param connPoolHandler channel pool handler instance being notified upon new connection is created + * @param defaultPort default port used to connect (any node address from the nodes set may override this) + * @param connAttemptsLimit the max count of the subsequent connection failures to the node before the node will be excluded from the pool, 0 means no limit */ - public BasicMultiNodeConnPool( - final Semaphore concurrencyThrottle, final String nodes[], final Bootstrap bootstrap, - final ChannelPoolHandler connPoolHandler, final int defaultPort, final int connAttemptsLimit + public MultiNodeConnPoolImpl( + final Semaphore concurrencyThrottle, final String nodes[], final Bootstrap bootstrap, + final ChannelPoolHandler connPoolHandler, final int defaultPort, final int connAttemptsLimit, + final long connectTimeOut, final TimeUnit connectTimeUnit ) { this.concurrencyThrottle = concurrencyThrottle; - if (nodes.length == 0) { + if(nodes.length == 0) { throw new IllegalArgumentException("Empty nodes array argument"); } this.nodes = nodes; this.connAttemptsLimit = connAttemptsLimit; + this.connectTimeOut = connectTimeOut; + this.connectTimeUnit = connectTimeUnit; this.n = nodes.length; bootstraps = new HashMap<>(n); - allConns = new HashMap<>(n); - availableConns = new HashMap<>(n); + allConns = new ConcurrentHashMap<>(n); + availableConns = new ConcurrentHashMap<>(n); connCounts = new Object2IntOpenHashMap<>(n); failedConnAttemptCounts = new Object2IntOpenHashMap<>(n); - for (final String node : nodes) { + for(final String node : nodes) { final InetSocketAddress nodeAddr; - if (node.contains(":")) { + if(node.contains(":")) { final String addrParts[] = node.split(":"); nodeAddr = new InetSocketAddress(addrParts[0], Integer.parseInt(addrParts[1])); } else { nodeAddr = new InetSocketAddress(node, defaultPort); } bootstraps.put( - node, - bootstrap - .clone() - .remoteAddress(nodeAddr) - .handler( - new ChannelInitializer() { - @Override - protected final void initChannel(final Channel conn) - throws Exception { - if (!conn.eventLoop().inEventLoop()) { - throw new AssertionError(); - } - connPoolHandler.channelCreated(conn); - } - } - ) + node, + bootstrap + .clone() + .remoteAddress(nodeAddr) + .handler( + new ChannelInitializer() { + @Override + protected final void initChannel(final Channel conn) + throws Exception { + if(!conn.eventLoop().inEventLoop()) { + throw new AssertionError(); + } + connPoolHandler.channelCreated(conn); + } + } + ) ); availableConns.put(node, new ConcurrentLinkedQueue<>()); connCounts.put(node, 0); @@ -105,20 +111,18 @@ protected final void initChannel(final Channel conn) } @Override - public void preCreateConnections(final int count) + public void preConnect(final int count) throws ConnectException, IllegalArgumentException { - if (count > 0) { - for (int i = 0; i < count; i++) { + if(count > 0) { + for(int i = 0; i < count; i ++) { final Channel conn = connectToAnyNode(); - if (conn == null) { - throw new ConnectException( - "Failed to pre-create the connections to the target nodes" - ); + if(conn == null) { + throw new ConnectException("Failed to pre-create the connections to the target nodes"); } final String nodeAddr = conn.attr(ATTR_KEY_NODE).get(); - if (conn.isActive()) { + if(conn.isActive()) { final Queue connQueue = availableConns.get(nodeAddr); - if (connQueue != null) { + if(connQueue != null) { connQueue.add(conn); } } else { @@ -144,18 +148,18 @@ private CloseChannelListener(final String nodeAddr, final Channel conn) { @Override public final void operationComplete(final ChannelFuture future) - throws Exception { + throws Exception { LOG.fine("Connection to " + nodeAddr + " closed"); closeLock.lock(); try { - synchronized (connCounts) { - if (connCounts.containsKey(nodeAddr)) { + synchronized(connCounts) { + if(connCounts.containsKey(nodeAddr)) { connCounts.put(nodeAddr, connCounts.getInt(nodeAddr) - 1); } } - synchronized (allConns) { - final List nodeConns = allConns.get(nodeAddr); - if (nodeConns != null) { + synchronized(allConns) { + final Queue nodeConns = allConns.get(nodeAddr); + if(nodeConns != null) { nodeConns.remove(conn); } } @@ -177,56 +181,50 @@ private Channel connectToAnyNode() int minConnsCount = Integer.MAX_VALUE; int nextConnsCount = 0; final int i = ThreadLocalRandom.current().nextInt(n); - for (int j = i; j < n; j++) { + for(int j = i; j < n; j ++) { nextNodeAddr = nodes[j % n]; nextConnsCount = connCounts.getInt(nextNodeAddr); - if (nextConnsCount == 0) { + if(nextConnsCount == 0) { nodeAddr = nextNodeAddr; break; - } else if (nextConnsCount < minConnsCount) { + } else if(nextConnsCount < minConnsCount) { minConnsCount = nextConnsCount; nodeAddr = nextNodeAddr; } } - if (nodeAddr != null) { + if(nodeAddr != null) { // connect to the selected endpoint node LOG.fine("New connection to \"" + nodeAddr + "\""); try { conn = connect(nodeAddr); - } catch (final Exception e) { - LOG.warning( - "Failed to create a new connection to " + nodeAddr + ": " + e.toString() - ); - if (connAttemptsLimit > 0) { - final int selectedNodeFailedConnAttemptsCount = failedConnAttemptCounts - .getInt(nodeAddr) + 1; - failedConnAttemptCounts.put( - nodeAddr, selectedNodeFailedConnAttemptsCount - ); - if (selectedNodeFailedConnAttemptsCount > connAttemptsLimit) { + } catch(final Exception e) { + LOG.warning("Failed to create a new connection to " + nodeAddr + ": " + e.toString()); + if(connAttemptsLimit > 0) { + final int selectedNodeFailedConnAttemptsCount = failedConnAttemptCounts.getInt(nodeAddr) + 1; + failedConnAttemptCounts.put(nodeAddr, selectedNodeFailedConnAttemptsCount); + if(selectedNodeFailedConnAttemptsCount > connAttemptsLimit) { LOG.warning( - "Failed to connect to the node \"" + nodeAddr + "\" " - + selectedNodeFailedConnAttemptsCount + " times successively, " - + "excluding the node from the connection pool forever" + "Failed to connect to the node \"" + nodeAddr + "\" " + selectedNodeFailedConnAttemptsCount + + " times successively, excluding the node from the connection pool forever" ); // the node having virtually Integer.MAX_VALUE established connections // will never be selected by the algorithm connCounts.put(nodeAddr, Integer.MAX_VALUE); boolean allNodesExcluded = true; - for (final String node : nodes) { - if (connCounts.getInt(node) < Integer.MAX_VALUE) { + for(final String node : nodes) { + if(connCounts.getInt(node) < Integer.MAX_VALUE) { allNodesExcluded = false; break; } } - if (allNodesExcluded) { + if(allNodesExcluded) { LOG.severe("No endpoint nodes left in the connection pool!"); } } } - if (e instanceof ConnectException) { + if(e instanceof ConnectException) { throw (ConnectException) e; } else { throw new ConnectException(e.getMessage()); @@ -234,14 +232,14 @@ private Channel connectToAnyNode() } } - if (conn != null) { + if(conn != null) { conn.closeFuture().addListener(new CloseChannelListener(nodeAddr, conn)); conn.attr(ATTR_KEY_NODE).set(nodeAddr); - allConns.computeIfAbsent(nodeAddr, na -> new ArrayList<>()).add(conn); - synchronized (connCounts) { + allConns.computeIfAbsent(nodeAddr, na -> new ConcurrentLinkedQueue<>()).add(conn); + synchronized(connCounts) { connCounts.put(nodeAddr, connCounts.getInt(nodeAddr) + 1); } - if (connAttemptsLimit > 0) { + if(connAttemptsLimit > 0) { // reset the connection failures counter if connected successfully failedConnAttemptCounts.put(nodeAddr, 0); } @@ -253,27 +251,35 @@ private Channel connectToAnyNode() protected Channel connect(final String addr) throws Exception { + Channel conn = null; final Bootstrap bootstrap = bootstraps.get(addr); - if (bootstrap != null) { - return bootstrap.connect().sync().channel(); + if(bootstrap != null) { + final ChannelFuture connFuture = bootstrap.connect(); + if(connectTimeOut > 0) { + if(connFuture.await(connectTimeOut, connectTimeUnit)) { + conn = connFuture.channel(); + } + } else { + conn = connFuture.sync().channel(); + } } - return null; + return conn; } protected Channel poll() { final int i = ThreadLocalRandom.current().nextInt(n); Queue connQueue; - Channel conn; - for (int j = i; j < i + n; j++) { + Channel conn = null; + for(int j = i; j < i + n; j ++) { connQueue = availableConns.get(nodes[j % n]); - if (connQueue != null) { + if(connQueue != null) { conn = connQueue.poll(); - if (conn != null && conn.isActive()) { - return conn; + if(conn != null && conn.isOpen()) { + break; } } } - return null; + return conn; } @Override @@ -281,45 +287,45 @@ public final Channel lease() throws ConnectException { Channel conn = null; if (concurrencyThrottle.tryAcquire()) { - if (null == (conn = poll())) { + if(null == (conn = poll())) { try { conn = connectToAnyNode(); - } catch (final ConnectException e) { + } catch(final ConnectException e) { concurrencyThrottle.release(); throw e; } } - if (conn == null) { + if(conn == null) { concurrencyThrottle.release(); throw new ConnectException(); } } return conn; } - + @Override public final int lease(final List conns, final int maxCount) throws ConnectException { int availableCount = concurrencyThrottle.drainPermits(); - if (availableCount == 0) { + if(availableCount == 0) { return availableCount; } - if (availableCount > maxCount) { + if(availableCount > maxCount) { concurrencyThrottle.release(availableCount - maxCount); availableCount = maxCount; } - + Channel conn; - for (int i = 0; i < availableCount; i++) { - if (null == (conn = poll())) { + for(int i = 0; i < availableCount; i++) { + if(null == (conn = poll())) { try { conn = connectToAnyNode(); - } catch (final ConnectException e) { + } catch(final ConnectException e) { concurrencyThrottle.release(); throw e; } } - if (conn == null) { + if(conn == null) { concurrencyThrottle.release(availableCount - i); throw new ConnectException(); } else { @@ -332,9 +338,9 @@ public final int lease(final List conns, final int maxCount) @Override public final void release(final Channel conn) { final String nodeAddr = conn.attr(ATTR_KEY_NODE).get(); - if (conn.isActive()) { + if(conn.isActive()) { final Queue connQueue = availableConns.get(nodeAddr); - if (connQueue != null) { + if(connQueue != null) { connQueue.add(conn); } concurrencyThrottle.release(); @@ -342,14 +348,14 @@ public final void release(final Channel conn) { conn.close(); } } - + @Override public final void release(final List conns) { String nodeAddr; Queue connQueue; - for (final Channel conn : conns) { + for(final Channel conn : conns) { nodeAddr = conn.attr(ATTR_KEY_NODE).get(); - if (conn.isActive()) { + if(conn.isActive()) { connQueue = availableConns.get(nodeAddr); connQueue.add(conn); concurrencyThrottle.release(); @@ -364,26 +370,30 @@ public void close() throws IOException { closeLock.lock(); int closedConnCount = 0; - for (final String nodeAddr : availableConns.keySet()) { - for (final Channel conn : availableConns.get(nodeAddr)) { - if (conn.isOpen()) { - conn.close(); - closedConnCount++; + try { + for(final String nodeAddr: availableConns.keySet()) { + for(final Channel conn: availableConns.get(nodeAddr)) { + if(conn.isOpen()) { + conn.close(); + closedConnCount ++; + } } } - } - availableConns.clear(); - for (final String nodeAddr : allConns.keySet()) { - for (final Channel conn : allConns.get(nodeAddr)) { - if (conn.isOpen()) { - conn.close(); - closedConnCount++; + availableConns.clear(); + for(final String nodeAddr: allConns.keySet()) { + for(final Channel conn: allConns.get(nodeAddr)) { + if(conn.isOpen()) { + conn.close(); + closedConnCount ++; + } } } + allConns.clear(); + bootstraps.clear(); + connCounts.clear(); + } finally { + closeLock.unlock(); } - allConns.clear(); - bootstraps.clear(); - connCounts.clear(); LOG.fine("Closed " + closedConnCount + " connections"); } } diff --git a/src/main/java/com/github/akurilov/netty/connection/pool/NonBlockingConnPool.java b/src/main/java/com/github/akurilov/netty/connection/pool/NonBlockingConnPool.java index db8275a..8b996ed 100644 --- a/src/main/java/com/github/akurilov/netty/connection/pool/NonBlockingConnPool.java +++ b/src/main/java/com/github/akurilov/netty/connection/pool/NonBlockingConnPool.java @@ -20,7 +20,7 @@ public interface NonBlockingConnPool @throws ConnectException if failed to connect @throws IllegalArgumentException if count is less than 1 */ - void preCreateConnections(final int count) + void preConnect(final int count) throws ConnectException, IllegalArgumentException; /** diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/util/DummyChannelPoolHandler.java b/src/main/java/com/github/akurilov/netty/connection/pool/mock/DummyChannelPoolHandler.java similarity index 88% rename from src/test/java/com/github/akurilov/netty/connection/pool/test/util/DummyChannelPoolHandler.java rename to src/main/java/com/github/akurilov/netty/connection/pool/mock/DummyChannelPoolHandler.java index 1bca902..25b06a8 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/util/DummyChannelPoolHandler.java +++ b/src/main/java/com/github/akurilov/netty/connection/pool/mock/DummyChannelPoolHandler.java @@ -1,4 +1,4 @@ -package com.github.akurilov.netty.connection.pool.test.util; +package com.github.akurilov.netty.connection.pool.mock; import io.netty.channel.Channel; import io.netty.channel.pool.ChannelPoolHandler; diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/util/DummyClientChannelHandler.java b/src/main/java/com/github/akurilov/netty/connection/pool/mock/DummyClientChannelHandler.java similarity index 89% rename from src/test/java/com/github/akurilov/netty/connection/pool/test/util/DummyClientChannelHandler.java rename to src/main/java/com/github/akurilov/netty/connection/pool/mock/DummyClientChannelHandler.java index c43be73..340b7bd 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/util/DummyClientChannelHandler.java +++ b/src/main/java/com/github/akurilov/netty/connection/pool/mock/DummyClientChannelHandler.java @@ -1,4 +1,4 @@ -package com.github.akurilov.netty.connection.pool.test.util; +package com.github.akurilov.netty.connection.pool.mock; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/util/BasicMultiNodeConnPoolMock.java b/src/main/java/com/github/akurilov/netty/connection/pool/mock/MultiNodeConnPoolMock.java similarity index 67% rename from src/test/java/com/github/akurilov/netty/connection/pool/test/util/BasicMultiNodeConnPoolMock.java rename to src/main/java/com/github/akurilov/netty/connection/pool/mock/MultiNodeConnPoolMock.java index b89a6b6..a3bb476 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/util/BasicMultiNodeConnPoolMock.java +++ b/src/main/java/com/github/akurilov/netty/connection/pool/mock/MultiNodeConnPoolMock.java @@ -1,29 +1,30 @@ -package com.github.akurilov.netty.connection.pool.test.util; +package com.github.akurilov.netty.connection.pool.mock; -import com.github.akurilov.netty.connection.pool.BasicMultiNodeConnPool; +import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl; import com.github.akurilov.netty.connection.pool.NonBlockingConnPool; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.pool.ChannelPoolHandler; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** Created by andrey on 12.05.17. */ -public final class BasicMultiNodeConnPoolMock -extends BasicMultiNodeConnPool +public final class MultiNodeConnPoolMock +extends MultiNodeConnPoolImpl implements NonBlockingConnPool { - public BasicMultiNodeConnPoolMock( + public MultiNodeConnPoolMock( final Semaphore concurrencyThrottle, final String[] nodes, final Bootstrap bootstrap, - final ChannelPoolHandler connPoolHandler, final int defaultPort, - final int connFailSeqLenLimit + final ChannelPoolHandler connPoolHandler, final int defaultPort, final int connFailSeqLenLimit ) { super( - concurrencyThrottle, nodes, bootstrap, connPoolHandler, defaultPort, - connFailSeqLenLimit + concurrencyThrottle, nodes, bootstrap, connPoolHandler, defaultPort, connFailSeqLenLimit, 0, + TimeUnit.SECONDS ); } diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnDropTest.java b/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnDropTest.java index c73d05a..f981dfb 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnDropTest.java +++ b/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnDropTest.java @@ -1,9 +1,9 @@ package com.github.akurilov.netty.connection.pool.test; -import com.github.akurilov.netty.connection.pool.BasicMultiNodeConnPool; +import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl; import com.github.akurilov.netty.connection.pool.NonBlockingConnPool; -import com.github.akurilov.netty.connection.pool.test.util.DummyChannelPoolHandler; -import com.github.akurilov.netty.connection.pool.test.util.DummyClientChannelHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyClientChannelHandler; import com.github.akurilov.netty.connection.pool.test.util.EpollConnDroppingServer; import io.netty.bootstrap.Bootstrap; @@ -73,10 +73,10 @@ protected final void initChannel(final SocketChannel conn) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true); - connPool = new BasicMultiNodeConnPool( - concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0 + connPool = new MultiNodeConnPoolImpl( + concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0, 0, TimeUnit.SECONDS ); - connPool.preCreateConnections(CONCURRENCY); + connPool.preConnect(CONCURRENCY); } @After diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnLeakTest.java b/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnLeakTest.java index 2b688bb..99dc8a6 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnLeakTest.java +++ b/src/test/java/com/github/akurilov/netty/connection/pool/test/EpollConnLeakTest.java @@ -1,9 +1,9 @@ package com.github.akurilov.netty.connection.pool.test; -import com.github.akurilov.netty.connection.pool.BasicMultiNodeConnPool; +import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl; import com.github.akurilov.netty.connection.pool.NonBlockingConnPool; -import com.github.akurilov.netty.connection.pool.test.util.DummyChannelPoolHandler; -import com.github.akurilov.netty.connection.pool.test.util.DummyClientChannelHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyClientChannelHandler; import com.github.akurilov.netty.connection.pool.test.util.EpollConnDroppingServer; import com.github.akurilov.netty.connection.pool.test.util.PortTools; @@ -70,10 +70,10 @@ protected final void initChannel(final SocketChannel conn) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true); - connPool = new BasicMultiNodeConnPool( - concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0 + connPool = new MultiNodeConnPoolImpl( + concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0, 0, TimeUnit.SECONDS ); - connPool.preCreateConnections(CONCURRENCY); + connPool.preConnect(CONCURRENCY); // use final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY); diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/BasicMultiNodeConnPoolTest.java b/src/test/java/com/github/akurilov/netty/connection/pool/test/MultiNodeConnPoolImplTest.java similarity index 91% rename from src/test/java/com/github/akurilov/netty/connection/pool/test/BasicMultiNodeConnPoolTest.java rename to src/test/java/com/github/akurilov/netty/connection/pool/test/MultiNodeConnPoolImplTest.java index 47031d8..fddd64f 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/BasicMultiNodeConnPoolTest.java +++ b/src/test/java/com/github/akurilov/netty/connection/pool/test/MultiNodeConnPoolImplTest.java @@ -3,8 +3,8 @@ import static com.github.akurilov.netty.connection.pool.NonBlockingConnPool.ATTR_KEY_NODE; import com.github.akurilov.netty.connection.pool.NonBlockingConnPool; -import com.github.akurilov.netty.connection.pool.test.util.BasicMultiNodeConnPoolMock; -import com.github.akurilov.netty.connection.pool.test.util.DummyChannelPoolHandler; +import com.github.akurilov.netty.connection.pool.mock.MultiNodeConnPoolMock; +import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -30,7 +30,7 @@ Created by andrey on 12.05.17. */ @RunWith(Parameterized.class) -public class BasicMultiNodeConnPoolTest { +public class MultiNodeConnPoolImplTest { private static final int TEST_STEP_TIME_SECONDS = 50; private static final int BATCH_SIZE = 0x1000; @@ -51,7 +51,7 @@ public static Collection generateData() { ); } - public BasicMultiNodeConnPoolTest(final int concurrencyLevel, final int nodeCount) { + public MultiNodeConnPoolImplTest(final int concurrencyLevel, final int nodeCount) { this.concurrencyLevel = concurrencyLevel; this.nodeCount = nodeCount; final String[] nodes = new String[nodeCount]; @@ -59,7 +59,7 @@ public BasicMultiNodeConnPoolTest(final int concurrencyLevel, final int nodeCoun nodes[i] = Integer.toString(i); } try( - final NonBlockingConnPool connPool = new BasicMultiNodeConnPoolMock( + final NonBlockingConnPool connPool = new MultiNodeConnPoolMock( new Semaphore(concurrencyLevel), nodes, new Bootstrap(), new DummyChannelPoolHandler(), 12345, 0 ) diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/NioConnDropTest.java b/src/test/java/com/github/akurilov/netty/connection/pool/test/NioConnDropTest.java index 4542332..2d5cbd8 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/NioConnDropTest.java +++ b/src/test/java/com/github/akurilov/netty/connection/pool/test/NioConnDropTest.java @@ -1,9 +1,9 @@ package com.github.akurilov.netty.connection.pool.test; -import com.github.akurilov.netty.connection.pool.BasicMultiNodeConnPool; +import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl; import com.github.akurilov.netty.connection.pool.NonBlockingConnPool; -import com.github.akurilov.netty.connection.pool.test.util.DummyChannelPoolHandler; -import com.github.akurilov.netty.connection.pool.test.util.DummyClientChannelHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyClientChannelHandler; import com.github.akurilov.netty.connection.pool.test.util.NioConnDroppingServer; import io.netty.bootstrap.Bootstrap; @@ -72,10 +72,10 @@ protected final void initChannel(final SocketChannel conn) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true); - connPool = new BasicMultiNodeConnPool( - concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0 + connPool = new MultiNodeConnPoolImpl( + concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0, 0, TimeUnit.SECONDS ); - connPool.preCreateConnections(CONCURRENCY); + connPool.preConnect(CONCURRENCY); } @After @@ -112,9 +112,8 @@ public void test() } ); } - executor.shutdown(); - executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS); - assertTrue(executor.isTerminated()); + TimeUnit.SECONDS.sleep(TEST_TIME_SECONDS); + executor.shutdownNow(); assertEquals( CONCURRENCY * CONN_ATTEMPTS, connCounter.sum(), 2 * CONCURRENCY * CONN_ATTEMPTS / FAIL_EVERY_CONN_ATTEMPT diff --git a/src/test/java/com/github/akurilov/netty/connection/pool/test/NioReconnectionTest.java b/src/test/java/com/github/akurilov/netty/connection/pool/test/NioReconnectionTest.java index 703e98f..d255928 100644 --- a/src/test/java/com/github/akurilov/netty/connection/pool/test/NioReconnectionTest.java +++ b/src/test/java/com/github/akurilov/netty/connection/pool/test/NioReconnectionTest.java @@ -1,9 +1,9 @@ package com.github.akurilov.netty.connection.pool.test; -import com.github.akurilov.netty.connection.pool.BasicMultiNodeConnPool; +import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl; import com.github.akurilov.netty.connection.pool.NonBlockingConnPool; -import com.github.akurilov.netty.connection.pool.test.util.DummyChannelPoolHandler; -import com.github.akurilov.netty.connection.pool.test.util.DummyClientChannelHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyChannelPoolHandler; +import com.github.akurilov.netty.connection.pool.mock.DummyClientChannelHandler; import com.github.akurilov.netty.connection.pool.test.util.NioConnDroppingServer; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; @@ -31,121 +31,101 @@ import static org.junit.Assert.assertTrue; /** - * Created by andrey on 16.11.17. + Created by andrey on 16.11.17. */ public class NioReconnectionTest { - private static final int CONCURRENCY = 10; - private static final String[] NODES = new String[]{"127.0.0.1"}; - private static final ChannelPoolHandler CPH = new DummyChannelPoolHandler(); - private static final int DEFAULT_PORT = 12_345; - private static final long TEST_TIME_SECONDS = 100; - private static final int CONN_ATTEMPTS = 10_000; - private static final int FAIL_EVERY_CONN_ATTEMPT = Integer.MAX_VALUE; - private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000); - - Closeable serverMock; - NonBlockingConnPool connPool; - EventLoopGroup group; - - @Before - public void setUp() - throws Exception { - - serverMock = new NioConnDroppingServer(DEFAULT_PORT, FAIL_EVERY_CONN_ATTEMPT); - - final Semaphore concurrencyThrottle = new Semaphore(CONCURRENCY); - group = new NioEventLoopGroup(); - final Bootstrap bootstrap = new Bootstrap() - .group(group) - .channel(NioSocketChannel.class) - .handler( - new ChannelInitializer() { - @Override - protected final void initChannel(final SocketChannel conn) - throws Exception { - conn.pipeline().addLast(new DummyClientChannelHandler()); - } - } - ) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.TCP_NODELAY, true); - connPool = new BasicMultiNodeConnPool( - concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0 - ); - connPool.preCreateConnections(CONCURRENCY); - } - - @After - public void tearDown() - throws Exception { - connPool.close(); - group.shutdownGracefully(); - serverMock.close(); - } - - @Test - public void test() - throws Exception { - final LongAdder connCounter = new LongAdder(); - final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY); - for (int i = 0; i < CONCURRENCY / 2; ++i) { - executor.submit( - () -> { - Channel conn; - for (int j = 0; j < CONN_ATTEMPTS; j++) { - try { - while (null == (conn = connPool.lease())) { - Thread.sleep(1); - } - conn.writeAndFlush(PAYLOAD.retain()).sync(); - connPool.release(conn); - //connCounter.increment(); - } catch (final InterruptedException e) { - break; - } catch (final Throwable cause) { - cause.printStackTrace(System.err); - } - } - } - ); - } - - serverMock.close(); - serverMock = new NioConnDroppingServer(DEFAULT_PORT, FAIL_EVERY_CONN_ATTEMPT); - - for (int i = CONCURRENCY / 2; i < CONCURRENCY; ++i) { - executor.submit( - () -> { - Channel conn; - for (int j = 0; j < CONN_ATTEMPTS; j++) { - try { - while (null == (conn = connPool.lease())) { - Thread.sleep(1); - } - conn.writeAndFlush(PAYLOAD.retain()).sync(); - connPool.release(conn); - connCounter.increment(); - } catch (final InterruptedException e) { - break; - } catch (final Throwable cause) { - cause.printStackTrace(System.err); - } - } - } - ); - } - - executor.shutdown(); - executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS); - - System.out.println("Expected conn: " + CONCURRENCY * CONN_ATTEMPTS / 2); - System.out.println("Created conn: " + connCounter.sum()); - - assertTrue(executor.isTerminated()); - assertEquals( - CONCURRENCY * CONN_ATTEMPTS / 2, connCounter.sum(),0 - ); - } + private static final int CONCURRENCY = 10; + private static final String[] NODES = new String[] { "127.0.0.1" }; + private static final ChannelPoolHandler CPH = new DummyChannelPoolHandler(); + private static final int DEFAULT_PORT = 12_345; + private static final long TEST_TIME_SECONDS = 100; + private static final int CONN_ATTEMPTS = 10_000; + private static final int FAIL_EVERY_CONN_ATTEMPT = Integer.MAX_VALUE; + private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000); + private Closeable serverMock; + private NonBlockingConnPool connPool; + private EventLoopGroup group; + + @Before + public void setUp() + throws Exception { + serverMock = new NioConnDroppingServer(DEFAULT_PORT, FAIL_EVERY_CONN_ATTEMPT); + final Semaphore concurrencyThrottle = new Semaphore(CONCURRENCY); + group = new NioEventLoopGroup(); + final Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler( + new ChannelInitializer() { + @Override + protected final void initChannel(final SocketChannel conn) + throws Exception { + conn.pipeline().addLast(new DummyClientChannelHandler()); + } + }).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_REUSEADDR, true).option( + ChannelOption.TCP_NODELAY, true); + connPool = new MultiNodeConnPoolImpl( + concurrencyThrottle, NODES, bootstrap, CPH, DEFAULT_PORT, 0, 0, TimeUnit.SECONDS + ); + connPool.preConnect(CONCURRENCY); + } + + @After + public void tearDown() + throws Exception { + connPool.close(); + group.shutdownGracefully(); + serverMock.close(); + } + + @Test + public void test() + throws Exception { + final LongAdder connCounter = new LongAdder(); + final ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY); + for(int i = 0; i < CONCURRENCY / 2; ++ i) { + executor.submit(() -> { + Channel conn; + for(int j = 0; j < CONN_ATTEMPTS; j++) { + try { + while(null == (conn = connPool.lease())) { + Thread.sleep(1); + } + conn.writeAndFlush(PAYLOAD.retain()).sync(); + connPool.release(conn); + //connCounter.increment(); + } catch(final InterruptedException e) { + break; + } catch(final Throwable cause) { + cause.printStackTrace(System.err); + } + } + }); + } + serverMock.close(); + serverMock = new NioConnDroppingServer(DEFAULT_PORT, FAIL_EVERY_CONN_ATTEMPT); + for(int i = CONCURRENCY / 2; i < CONCURRENCY; ++ i) { + executor.submit(() -> { + Channel conn; + for(int j = 0; j < CONN_ATTEMPTS; j++) { + try { + while(null == (conn = connPool.lease())) { + Thread.sleep(1); + } + conn.writeAndFlush(PAYLOAD.retain()).sync(); + connPool.release(conn); + connCounter.increment(); + } catch(final InterruptedException e) { + break; + } catch(final Throwable cause) { + cause.printStackTrace(System.err); + } + } + }); + } + executor.shutdown(); + executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS); + System.out.println("Expected conn: " + CONCURRENCY * CONN_ATTEMPTS / 2); + System.out.println("Created conn: " + connCounter.sum()); + assertTrue(executor.isTerminated()); + assertEquals(CONCURRENCY * CONN_ATTEMPTS / 2, connCounter.sum(), 0); + } }