From c1d2a4d92537d7e4731e4c79e4b85c5ae4d672e2 Mon Sep 17 00:00:00 2001 From: Mikko Kortelainen Date: Thu, 21 Nov 2024 17:11:28 +0200 Subject: [PATCH] Connection pool (#70) * add pool from lsh_01 * RenewableRelpConnection * move managed client code to com.teragrep.rlp_01.client * add PoolTest * improve PoolTest * extract Pool interface from UnboundPool * refactor RelpConnectionFactory to support SSLEngine provision * remove unused import from RelpConnectionFactory * SendMessageTest from rlp_03, it rather belongs here * test connection pooling * include cleanup to ManagedConnectionTest * change Period to Duration in RelpConfig * add testPooledRenewedConnections and testPooledReboundConnections * change rebound managed connections to disconnect cleanly by adding new method reconnect() to IManagedRelpConnection, improve tests * change assertAll -> assertDoesNotThrow. in SendMessageTest use AfterEach for messageList cleanup * remove Thread.sleep from trace-logging in SendMessageTest --- pom.xml | 12 + .../rlp_01/client/IManagedRelpConnection.java | 29 ++ .../rlp_01/client/IRelpConnection.java | 58 ++++ .../rlp_01/client/ManagedRelpConnection.java | 125 ++++++++ .../client/ManagedRelpConnectionStub.java | 52 +++ .../client/RebindableRelpConnection.java | 66 ++++ .../teragrep/rlp_01/client/RelpConfig.java | 23 ++ .../rlp_01/client/RelpConnectionFactory.java | 58 ++++ .../client/RelpConnectionWithConfig.java | 115 +++++++ .../client/RenewableRelpConnection.java | 55 ++++ .../rlp_01/client/SSLContextSupplier.java | 13 + .../client/SSLContextSupplierKeystore.java | 38 +++ .../rlp_01/client/SSLContextSupplierStub.java | 16 + .../java/com/teragrep/rlp_01/pool/Pool.java | 27 ++ .../com/teragrep/rlp_01/pool/Poolable.java | 23 ++ .../com/teragrep/rlp_01/pool/Stubable.java | 22 ++ .../com/teragrep/rlp_01/pool/UnboundPool.java | 100 ++++++ .../com/teragrep/rlp_01/SendMessageTest.java | 208 ++++++++++++ .../rlp_01/client/ManagedConnectionTest.java | 303 ++++++++++++++++++ .../com/teragrep/rlp_01/pool/PoolTest.java | 109 +++++++ 20 files changed, 1452 insertions(+) create mode 100644 src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/RelpConfig.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java create mode 100644 src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java create mode 100644 src/main/java/com/teragrep/rlp_01/pool/Pool.java create mode 100644 src/main/java/com/teragrep/rlp_01/pool/Poolable.java create mode 100644 src/main/java/com/teragrep/rlp_01/pool/Stubable.java create mode 100644 src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java create mode 100644 src/test/java/com/teragrep/rlp_01/SendMessageTest.java create mode 100644 src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java create mode 100644 src/test/java/com/teragrep/rlp_01/pool/PoolTest.java diff --git a/pom.xml b/pom.xml index dc850a3..21541b3 100644 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,18 @@ test ${junit.version} + + com.teragrep + rlp_03 + 9.0.0 + test + + + org.slf4j + slf4j-simple + 2.0.13 + test + diff --git a/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java new file mode 100644 index 0000000..07bc935 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java @@ -0,0 +1,29 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.pool.Poolable; + +import java.io.IOException; + +public interface IManagedRelpConnection extends Poolable { + void reconnect(); + + void connect() throws IOException; + void forceReconnect(); + void ensureSent(byte[] bytes); +} diff --git a/src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java new file mode 100644 index 0000000..ed0171c --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java @@ -0,0 +1,58 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.RelpBatch; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +// TODO refactor RelpConnection into an interface and RelpConnectionImpl and remove this +public interface IRelpConnection { + + int getReadTimeout(); + + void setReadTimeout(int readTimeout); + + int getWriteTimeout(); + + void setWriteTimeout(int writeTimeout); + + int getConnectionTimeout(); + + void setConnectionTimeout(int timeout); + + void setKeepAlive(boolean on); + + int getRxBufferSize(); + + void setRxBufferSize(int size); + + int getTxBufferSize(); + + void setTxBufferSize(int size); + + boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException; + + void tearDown(); + + boolean disconnect() throws IOException, IllegalStateException, TimeoutException; + + void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException; + + RelpConfig relpConfig(); +} diff --git a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java new file mode 100644 index 0000000..456eff6 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java @@ -0,0 +1,125 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.RelpBatch; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class ManagedRelpConnection implements IManagedRelpConnection { + + private final IRelpConnection relpConnection; + private boolean hasConnected; + + + public ManagedRelpConnection(IRelpConnection relpConnection) { + this.relpConnection = relpConnection; + this.hasConnected = false; + } + + @Override + public void forceReconnect() { + tearDown(); + connect(); + } + + @Override + public void reconnect() { + close(); + connect(); + } + + @Override + public void connect() { + boolean connected = false; + while (!connected) { + try { + this.hasConnected = true; + connected = relpConnection + .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort); + } + catch (Exception e) { + System.err.println( + "Failed to connect to relp server <["+relpConnection.relpConfig().relpTarget+"]>:<["+relpConnection.relpConfig().relpPort+"]>: <"+e.getMessage()+">"); + + try { + Thread.sleep(relpConnection.relpConfig().relpReconnectInterval); + } + catch (InterruptedException exception) { + System.err.println("Reconnection timer interrupted, reconnecting now"); + } + } + } + } + + private void tearDown() { + /* + TODO remove: wouldn't need a check hasConnected but there is a bug in RLP-01 tearDown() + see https://github.com/teragrep/rlp_01/issues/63 for further info + */ + if (hasConnected) { + relpConnection.tearDown(); + } + } + + @Override + public void ensureSent(byte[] bytes) { + // avoid unnecessary exception for fresh connections + if (!hasConnected) { + connect(); + } + + final RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(bytes); + boolean notSent = true; + while (notSent) { + try { + relpConnection.commit(relpBatch); + } + catch (IllegalStateException | IOException | TimeoutException e) { + System.err.println("Exception <"+e.getMessage()+"> while sending relpBatch. Will retry"); + } + if (!relpBatch.verifyTransactionAll()) { + relpBatch.retryAllFailed(); + this.tearDown(); + this.connect(); + } + else { + notSent = false; + } + } + } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() { + try { + this.relpConnection.disconnect(); + } + catch (IllegalStateException | IOException | TimeoutException e) { + System.err.println("Forcefully closing connection due to exception <"+e.getMessage()+">"); + } + finally { + tearDown(); + } + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java new file mode 100644 index 0000000..4d24793 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java @@ -0,0 +1,52 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import java.io.IOException; + +public class ManagedRelpConnectionStub implements IManagedRelpConnection { + + @Override + public void reconnect() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public void connect() throws IOException { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public void forceReconnect() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public void ensureSent(byte[] bytes) { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public boolean isStub() { + return true; + } + + @Override + public void close() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java new file mode 100644 index 0000000..25acd74 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java @@ -0,0 +1,66 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import java.io.IOException; + +public class RebindableRelpConnection implements IManagedRelpConnection { + private final IManagedRelpConnection managedRelpConnection; + private int recordsSent; + private final int rebindRequestAmount; + + public RebindableRelpConnection(IManagedRelpConnection managedRelpConnection, int rebindRequestAmount) { + this.managedRelpConnection = managedRelpConnection; + this.recordsSent = 0; + this.rebindRequestAmount = rebindRequestAmount; + } + + @Override + public void reconnect() { + managedRelpConnection.reconnect(); + } + + @Override + public void connect() throws IOException { + managedRelpConnection.connect(); + } + + @Override + public void forceReconnect() { + managedRelpConnection.forceReconnect(); + } + + @Override + public void ensureSent(byte[] bytes) { + if (recordsSent >= rebindRequestAmount) { + reconnect(); + recordsSent = 0; + } + managedRelpConnection.ensureSent(bytes); + recordsSent++; + } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() throws IOException { + managedRelpConnection.close(); + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/RelpConfig.java b/src/main/java/com/teragrep/rlp_01/client/RelpConfig.java new file mode 100644 index 0000000..fb85d9e --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/RelpConfig.java @@ -0,0 +1,23 @@ +package com.teragrep.rlp_01.client; + +import java.time.Duration; + +public class RelpConfig { + public final String relpTarget; + public final int relpPort; + public final int relpReconnectInterval; + public final int rebindRequestAmount; + public final boolean rebindEnabled; + public final Duration maxIdle; + public final boolean maxIdleEnabled; + + public RelpConfig(String relpTarget, int relpPort, int relpReconnectInterval, int rebindRequestAmount, boolean rebindEnabled, Duration maxIdle, boolean maxIdleEnabled) { + this.relpTarget = relpTarget; + this.relpPort = relpPort; + this.relpReconnectInterval = relpReconnectInterval; + this.rebindRequestAmount = rebindRequestAmount; + this.rebindEnabled = rebindEnabled; + this.maxIdle = maxIdle; + this.maxIdleEnabled = maxIdleEnabled; + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java new file mode 100644 index 0000000..e35a106 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java @@ -0,0 +1,58 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.RelpConnection; + +import java.util.function.Supplier; + +public class RelpConnectionFactory implements Supplier { + + private final RelpConfig relpConfig; + private final SSLContextSupplier sslContextSupplier; + + public RelpConnectionFactory(RelpConfig relpConfig) { + this(relpConfig, new SSLContextSupplierStub()); + } + public RelpConnectionFactory(RelpConfig relpConfig, SSLContextSupplier sslContextSupplier) { + this.relpConfig = relpConfig; + this.sslContextSupplier = sslContextSupplier; + } + + @Override + public IManagedRelpConnection get() { + IRelpConnection relpConnection; + if (sslContextSupplier.isStub()) { + relpConnection = new RelpConnectionWithConfig(new RelpConnection(), relpConfig); + } + else { + relpConnection = new RelpConnectionWithConfig(new RelpConnection(() -> sslContextSupplier.get().createSSLEngine()), relpConfig); + } + + IManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection); + + if (relpConfig.rebindEnabled) { + managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount); + } + + if (relpConfig.maxIdleEnabled) { + managedRelpConnection = new RenewableRelpConnection(managedRelpConnection, relpConfig.maxIdle); + } + + return managedRelpConnection; + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java new file mode 100644 index 0000000..b20264a --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java @@ -0,0 +1,115 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.RelpBatch; +import com.teragrep.rlp_01.RelpConnection; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class RelpConnectionWithConfig implements IRelpConnection { + + private final RelpConnection relpConnection; + private final RelpConfig relpConfig; + + public RelpConnectionWithConfig(RelpConnection relpConnection, RelpConfig relpConfig) { + this.relpConnection = relpConnection; + this.relpConfig = relpConfig; + } + + @Override + public int getReadTimeout() { + return relpConnection.getReadTimeout(); + } + + @Override + public void setReadTimeout(int readTimeout) { + relpConnection.setReadTimeout(readTimeout); + } + + @Override + public int getWriteTimeout() { + return relpConnection.getWriteTimeout(); + } + + @Override + public void setWriteTimeout(int writeTimeout) { + relpConnection.setWriteTimeout(writeTimeout); + } + + @Override + public int getConnectionTimeout() { + return relpConnection.getConnectionTimeout(); + } + + @Override + public void setConnectionTimeout(int timeout) { + relpConnection.setConnectionTimeout(timeout); + } + + @Override + public void setKeepAlive(boolean on) { + relpConnection.setKeepAlive(on); + } + + @Override + public int getRxBufferSize() { + return relpConnection.getRxBufferSize(); + } + + @Override + public void setRxBufferSize(int size) { + relpConnection.setRxBufferSize(size); + } + + @Override + public int getTxBufferSize() { + return relpConnection.getTxBufferSize(); + } + + @Override + public void setTxBufferSize(int size) { + relpConnection.setTxBufferSize(size); + } + + @Override + public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException { + return relpConnection.connect(hostname, port); + } + + @Override + public void tearDown() { + relpConnection.tearDown(); + } + + @Override + public boolean disconnect() throws IOException, IllegalStateException, TimeoutException { + return relpConnection.disconnect(); + } + + @Override + public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException { + relpConnection.commit(relpBatch); + } + + @Override + public RelpConfig relpConfig() { + return relpConfig; + } + +} diff --git a/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java new file mode 100644 index 0000000..9d73bcd --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java @@ -0,0 +1,55 @@ +package com.teragrep.rlp_01.client; + +import java.io.IOException; +import java.time.Instant; +import java.time.Duration; + +public class RenewableRelpConnection implements IManagedRelpConnection { + + private final IManagedRelpConnection managedRelpConnection; + private final Duration maxIdle; + private Instant lastAccess; + + public RenewableRelpConnection(IManagedRelpConnection managedRelpConnection, Duration maxIdle) { + this.managedRelpConnection = managedRelpConnection; + this.maxIdle = maxIdle; + this.lastAccess = Instant.ofEpochSecond(0); + } + + @Override + public void reconnect() { + lastAccess = Instant.now(); + managedRelpConnection.reconnect(); + } + + @Override + public void connect() throws IOException { + lastAccess = Instant.now(); + managedRelpConnection.connect(); + } + + @Override + public void forceReconnect() { + lastAccess = Instant.now(); + managedRelpConnection.forceReconnect(); + } + + @Override + public void ensureSent(byte[] bytes) { + if (lastAccess.plus(maxIdle).isBefore(Instant.now())) { + forceReconnect(); + } + lastAccess = Instant.now(); + managedRelpConnection.ensureSent(bytes); + } + + @Override + public boolean isStub() { + return managedRelpConnection.isStub(); + } + + @Override + public void close() throws IOException { + managedRelpConnection.close(); + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java new file mode 100644 index 0000000..b3ed7df --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java @@ -0,0 +1,13 @@ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.pool.Stubable; + +import javax.net.ssl.SSLContext; +import java.util.function.Supplier; + +/** + * Wrapper for SSLContext because such do not have stubness property + */ +public interface SSLContextSupplier extends Supplier, Stubable { + SSLContext get(); +} diff --git a/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java new file mode 100644 index 0000000..277f1a3 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java @@ -0,0 +1,38 @@ +package com.teragrep.rlp_01.client; + +import com.teragrep.rlp_01.SSLContextFactory; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.security.GeneralSecurityException; + +/** + * Wrapper for existing SSLContextFactory for providing stubness + */ +public class SSLContextSupplierKeystore implements SSLContextSupplier { + private final String keystorePath; + private final String keystorePassword; + private final String protocol; + + public SSLContextSupplierKeystore(String keystorePath, String keystorePassword, String protocol) { + this.keystorePath = keystorePath; + this.keystorePassword = keystorePassword; + this.protocol = protocol; + } + + @Override + public boolean isStub() { + return false; + } + + @Override + public SSLContext get() { + try { + // TODO refactor static method authenticatedContext to non-static, this is just for version compatibility + return SSLContextFactory.authenticatedContext(keystorePath, keystorePassword, protocol); + } + catch (GeneralSecurityException | IOException exception) { + throw new RuntimeException(exception); + } + } +} diff --git a/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java new file mode 100644 index 0000000..4d3439f --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java @@ -0,0 +1,16 @@ +package com.teragrep.rlp_01.client; + +import javax.net.ssl.SSLContext; + +public class SSLContextSupplierStub implements SSLContextSupplier { + + @Override + public SSLContext get() { + throw new UnsupportedOperationException("stub does not support this."); + } + + @Override + public boolean isStub() { + return true; + } +} diff --git a/src/main/java/com/teragrep/rlp_01/pool/Pool.java b/src/main/java/com/teragrep/rlp_01/pool/Pool.java new file mode 100644 index 0000000..0cc9386 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/pool/Pool.java @@ -0,0 +1,27 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.pool; + +import java.util.function.Supplier; + +public interface Pool extends AutoCloseable, Supplier { + T get(); + + void offer(T object); + + void close(); +} diff --git a/src/main/java/com/teragrep/rlp_01/pool/Poolable.java b/src/main/java/com/teragrep/rlp_01/pool/Poolable.java new file mode 100644 index 0000000..5573206 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/pool/Poolable.java @@ -0,0 +1,23 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.pool; + +import java.io.Closeable; + +public interface Poolable extends Stubable, Closeable { + +} diff --git a/src/main/java/com/teragrep/rlp_01/pool/Stubable.java b/src/main/java/com/teragrep/rlp_01/pool/Stubable.java new file mode 100644 index 0000000..0708179 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/pool/Stubable.java @@ -0,0 +1,22 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.pool; + +public interface Stubable { + + boolean isStub(); +} diff --git a/src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java b/src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java new file mode 100644 index 0000000..a40f344 --- /dev/null +++ b/src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java @@ -0,0 +1,100 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.pool; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +public class UnboundPool implements Pool { + + private final Supplier supplier; + + private final ConcurrentLinkedQueue queue; + + private final T stub; + + private final Lock lock = new ReentrantLock(); + + private final AtomicBoolean close; + + public UnboundPool(final Supplier supplier, T stub) { + this.supplier = supplier; + this.queue = new ConcurrentLinkedQueue<>(); + this.stub = stub; + this.close = new AtomicBoolean(); + } + + @Override + public T get() { + T object; + if (close.get()) { + object = stub; + } + else { + // get or create + object = queue.poll(); + if (object == null) { + object = supplier.get(); + } + } + + return object; + } + + @Override + public void offer(T object) { + if (!object.isStub()) { + queue.add(object); + } + + if (close.get()) { + while (queue.peek() != null) { + if (lock.tryLock()) { + while (true) { + T pooled = queue.poll(); + if (pooled == null) { + break; + } + else { + try { + pooled.close(); + } + catch (Exception exception) { + System.err.println("Exception <" + exception.getMessage() + "> while closing poolable <"+ pooled +">"); + } + } + } + lock.unlock(); + } + else { + break; + } + } + } + } + + @Override + public void close() { + close.set(true); + + // close all that are in the pool right now + offer(stub); + } +} diff --git a/src/test/java/com/teragrep/rlp_01/SendMessageTest.java b/src/test/java/com/teragrep/rlp_01/SendMessageTest.java new file mode 100644 index 0000000..5f44036 --- /dev/null +++ b/src/test/java/com/teragrep/rlp_01/SendMessageTest.java @@ -0,0 +1,208 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01; + +import com.teragrep.net_01.channel.socket.PlainFactory; +import com.teragrep.net_01.eventloop.EventLoop; +import com.teragrep.net_01.eventloop.EventLoopFactory; +import com.teragrep.rlp_03.frame.FrameDelegationClockFactory; +import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; +import com.teragrep.net_01.server.ServerFactory; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * These are a copy from rlp_03 test suite + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class SendMessageTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SendMessageTest.class); + + private final String hostname = "localhost"; + private EventLoop eventLoop; + private Thread eventLoopThread; + + private ExecutorService executorService; + private final int port = 1236; + + private final List messageList = new LinkedList<>(); + + @BeforeAll + public void init() { + EventLoopFactory eventLoopFactory = new EventLoopFactory(); + Assertions.assertDoesNotThrow(() -> eventLoop = eventLoopFactory.create()); + + eventLoopThread = new Thread(eventLoop); + eventLoopThread.start(); + + executorService = Executors.newSingleThreadExecutor(); + ServerFactory serverFactory = new ServerFactory( + eventLoop, + executorService, + new PlainFactory(), + new FrameDelegationClockFactory(() -> new DefaultFrameDelegate((frame) -> messageList.add(frame.relpFrame().payload().toBytes()))) + ); + Assertions.assertDoesNotThrow(() -> serverFactory.create(port)); + } + + @AfterAll + public void cleanup() { + eventLoop.stop(); + executorService.shutdown(); + Assertions.assertDoesNotThrow(() -> eventLoopThread.join()); + } + + @AfterEach + public void clearMessageList() { + // clear received list + messageList.clear(); + } + + @Test + public void testSendMessage() { + RelpConnection relpSession = new RelpConnection(); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + String msg = "<14>1 2020-05-15T13:24:03.603Z CFE-16 capsulated - - [CFE-16-metadata@48577 authentication_token=\"AUTH_TOKEN_11111\" channel=\"CHANNEL_11111\" time_source=\"generated\"][CFE-16-origin@48577] \"Hello, world!\"\n"; + byte[] data = msg.getBytes(StandardCharsets.UTF_8); + RelpBatch batch = new RelpBatch(); + long reqId = batch.insert(data); + Assertions.assertDoesNotThrow(() -> relpSession.commit(batch)); + // verify successful transaction + Assertions.assertTrue(batch.verifyTransaction(reqId)); + Assertions.assertDoesNotThrow(relpSession::disconnect); + + // message must equal to what was send + Assertions.assertEquals(msg, new String(messageList.get(0))); + } + + @Test + public void testSendSmallMessage() { + RelpConnection relpSession = new RelpConnection(); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + String msg = "<167>Mar 1 01:00:00 1um:\n"; + byte[] data = msg.getBytes(StandardCharsets.UTF_8); + RelpBatch batch = new RelpBatch(); + long reqId = batch.insert(data); + Assertions.assertDoesNotThrow(() -> relpSession.commit(batch)); + // verify successful transaction + Assertions.assertTrue(batch.verifyTransaction(reqId)); + Assertions.assertDoesNotThrow(relpSession::disconnect); + + // message must equal to what was send + Assertions.assertEquals(msg, new String(messageList.get(0))); + } + + @Test + public void testOpenAndCloseSession() { + RelpConnection relpSession = new RelpConnection(); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + Assertions.assertDoesNotThrow(relpSession::disconnect); + } + + @Test + public void testSessionCloseTwice() { + RelpConnection relpSession = new RelpConnection(); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + Assertions.assertDoesNotThrow(relpSession::disconnect); + Assertions.assertThrows(IllegalStateException.class, relpSession::disconnect); + + } + + @Test + public void clientTestOpenSendClose() { + RelpConnection relpSession = new RelpConnection(); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + String msg = "clientTestOpenSendClose"; + byte[] data = msg.getBytes(StandardCharsets.UTF_8); + RelpBatch batch = new RelpBatch(); + batch.insert(data); + Assertions.assertDoesNotThrow(() -> relpSession.commit(batch)); + Assertions.assertTrue(batch.verifyTransactionAll()); + Assertions.assertDoesNotThrow(relpSession::disconnect); + + // message must equal to what was send + Assertions.assertEquals(msg, new String(messageList.get(0))); + } + + @Test + public void clientTestSendTwo() { + RelpConnection relpSession = new RelpConnection(); + relpSession.setConnectionTimeout(5000); + relpSession.setReadTimeout(5000); + relpSession.setWriteTimeout(5000); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("test> Connected"); + } + String msg1 = "clientTestOpenSendClose 1"; + byte[] data1 = msg1.getBytes(StandardCharsets.UTF_8); + RelpBatch batch1 = new RelpBatch(); + batch1.insert(data1); + Assertions.assertDoesNotThrow(() -> relpSession.commit(batch1)); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("test> Committed"); + } + Assertions.assertTrue(batch1.verifyTransactionAll()); + + String msg2 = "clientTestOpenSendClose 2"; + byte[] data2 = msg2.getBytes(StandardCharsets.UTF_8); + RelpBatch batch2 = new RelpBatch(); + batch2.insert(data2); + Assertions.assertDoesNotThrow(() -> relpSession.commit(batch2)); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("test> Committed second"); + } + Assertions.assertTrue(batch1.verifyTransactionAll()); + Assertions.assertDoesNotThrow(relpSession::disconnect); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("test> Disconnected"); + } + + // messages must equal to what was send + Assertions.assertEquals(msg1, new String(messageList.get(0))); + Assertions.assertEquals(msg2, new String(messageList.get(1))); + } + + @Test + public void testSendBatch() { + RelpConnection relpSession = new RelpConnection(); + Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port)); + String msg = "Hello, world!"; + byte[] data = msg.getBytes(StandardCharsets.UTF_8); + int n = 50; + RelpBatch batch = new RelpBatch(); + for (int i = 0; i < n; i++) { + batch.insert(data); + } + Assertions.assertDoesNotThrow(() -> relpSession.commit(batch)); + Assertions.assertTrue(batch.verifyTransactionAll()); + Assertions.assertDoesNotThrow(relpSession::disconnect); + + for (int i = 0; i < n; i++) { + Assertions.assertEquals(msg, new String(messageList.get(i))); + } + } + +} diff --git a/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java b/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java new file mode 100644 index 0000000..0ed9ee4 --- /dev/null +++ b/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java @@ -0,0 +1,303 @@ +package com.teragrep.rlp_01.client; + +import com.teragrep.net_01.channel.socket.PlainFactory; +import com.teragrep.net_01.eventloop.EventLoop; +import com.teragrep.net_01.eventloop.EventLoopFactory; +import com.teragrep.net_01.server.ServerFactory; +import com.teragrep.rlp_01.pool.Pool; +import com.teragrep.rlp_01.pool.UnboundPool; +import com.teragrep.rlp_03.frame.FrameDelegationClockFactory; +import com.teragrep.rlp_03.frame.delegate.EventDelegate; +import com.teragrep.rlp_03.frame.delegate.FrameContext; +import com.teragrep.rlp_03.frame.delegate.FrameDelegate; +import com.teragrep.rlp_03.frame.delegate.event.RelpEvent; +import com.teragrep.rlp_03.frame.delegate.event.RelpEventClose; +import com.teragrep.rlp_03.frame.delegate.event.RelpEventOpen; +import com.teragrep.rlp_03.frame.delegate.event.RelpEventSyslog; +import org.junit.jupiter.api.*; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class ManagedConnectionTest { + private final String hostname = "localhost"; + private final int port = 33601; + + private EventLoop eventLoop; + private Thread eventLoopThread; + + private ExecutorService executorService; + + private final ConcurrentLinkedDeque messageList = new ConcurrentLinkedDeque<>(); + + private final AtomicLong connectionOpenCount = new AtomicLong(); + private final AtomicLong connectionCleanCloseCount = new AtomicLong(); + + @BeforeAll + public void init() { + EventLoopFactory eventLoopFactory = new EventLoopFactory(); + Assertions.assertDoesNotThrow(() -> eventLoop = eventLoopFactory.create()); + + eventLoopThread = new Thread(eventLoop); + eventLoopThread.start(); + + executorService = Executors.newSingleThreadExecutor(); + + Supplier frameDelegateSupplier = () -> { + + Map relpCommandConsumerMap = new HashMap<>(); + + relpCommandConsumerMap.put("close", new RelpEventCloseCounting(connectionCleanCloseCount)); + + relpCommandConsumerMap.put("open", new RelpEventOpenCounting(connectionOpenCount)); + + relpCommandConsumerMap.put("syslog", new RelpEventSyslog((frame) -> messageList.add(frame.relpFrame().payload().toBytes()))); + + return new EventDelegate(relpCommandConsumerMap); + }; + + ServerFactory serverFactory = new ServerFactory( + eventLoop, + executorService, + new PlainFactory(), + new FrameDelegationClockFactory(frameDelegateSupplier) + ); + Assertions.assertDoesNotThrow(() -> serverFactory.create(port)); + } + + @AfterAll + public void cleanup() { + eventLoop.stop(); + executorService.shutdown(); + Assertions.assertDoesNotThrow(() -> eventLoopThread.join()); + } + + + private static class RelpEventCloseCounting extends RelpEvent { + private final AtomicLong closeCount; + private final RelpEventClose relpEventClose; + + RelpEventCloseCounting(AtomicLong closeCount) { + this.closeCount = closeCount; + this.relpEventClose = new RelpEventClose(); + } + @Override + public void accept(FrameContext frameContext) { + relpEventClose.accept(frameContext); + closeCount.incrementAndGet(); + } + } + + private static class RelpEventOpenCounting extends RelpEvent { + private final AtomicLong openCount; + private final RelpEventOpen eventOpen; + + RelpEventOpenCounting(AtomicLong openCount) { + this.openCount = openCount; + this.eventOpen = new RelpEventOpen(); + } + + @Override + public void accept(FrameContext frameContext) { + eventOpen.accept(frameContext); + openCount.incrementAndGet(); + } + } + + + @Test + public void testFactoryProvisionedConnection() { + RelpConfig relpConfig = new RelpConfig( + hostname, + port, + 500, + 0, + false, + Duration.ZERO, + false + ); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + + IManagedRelpConnection relpConnection = relpConnectionFactory.get(); + + Assertions.assertDoesNotThrow(relpConnection::connect); + + String heyRelp = "hey this is relp"; + + relpConnection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8)); + + Assertions.assertDoesNotThrow(relpConnection::close); + + Assertions.assertEquals(heyRelp, new String(messageList.remove(), StandardCharsets.UTF_8)); + } + + @Test + public void testPooledConnections() { + RelpConfig relpConfig = new RelpConfig( + hostname, + port, + 500, + 0, + false, + Duration.ZERO, + false + ); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + + Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + int testCycles = 1_000; + CountDownLatch countDownLatch = new CountDownLatch(testCycles); + + for (int i = 0; i < testCycles; i++) { + final String heyRelp = "hey this is relp " + i; + ForkJoinPool.commonPool().submit(() -> { + IManagedRelpConnection connection = relpConnectionPool.get(); + + connection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8)); + relpConnectionPool.offer(connection); + countDownLatch.countDown(); + }); + } + + Assertions.assertDoesNotThrow(() -> countDownLatch.await()); + + relpConnectionPool.close(); + + Assertions.assertEquals(testCycles, messageList.size()); + + Pattern heyPattern = Pattern.compile("hey this is relp \\d+"); + while(!messageList.isEmpty()) { + byte[] payload = messageList.removeFirst(); + Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches()); + } + + Assertions.assertTrue(connectionOpenCount.get() > 1); + Assertions.assertEquals(connectionOpenCount.get(), connectionCleanCloseCount.get()); + connectionOpenCount.set(0); + connectionCleanCloseCount.set(0); + } + + @Test + public void testPooledRenewedConnections() { + RelpConfig relpConfig = new RelpConfig( + hostname, + port, + 500, + 0, + false, + Duration.of(5, ChronoUnit.MILLIS), + true + ); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + + Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + int testCycles = 20; + CountDownLatch countDownLatch = new CountDownLatch(testCycles); + + for (int i = 0; i < testCycles; i++) { + final String heyRelp = "hey this is renewed relp " + i; + ForkJoinPool.commonPool().submit(() -> { + IManagedRelpConnection connection = relpConnectionPool.get(); + + // will set timer to 5 millis + connection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8)); + // exceed 5 millis + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + relpConnectionPool.offer(connection); + countDownLatch.countDown(); + }); + } + + + Assertions.assertDoesNotThrow(() -> countDownLatch.await()); + + relpConnectionPool.close(); + + Assertions.assertEquals(testCycles, messageList.size()); + + Pattern heyPattern = Pattern.compile("hey this is renewed relp \\d+"); + while(!messageList.isEmpty()) { + byte[] payload = messageList.removeFirst(); + Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches()); + } + + Assertions.assertTrue(connectionOpenCount.get() > 1); + // renewable uses forceReconnect + Assertions.assertTrue(connectionCleanCloseCount.get() < connectionOpenCount.get()); + connectionOpenCount.set(0); + connectionCleanCloseCount.set(0); + } + + @Test + public void testPooledReboundConnections() { + RelpConfig relpConfig = new RelpConfig( + hostname, + port, + 500, + 1, + true, + Duration.ZERO, + false + ); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + + Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + int testCycles = 20; + CountDownLatch countDownLatch = new CountDownLatch(testCycles); + + for (int i = 0; i < testCycles; i++) { + final String heyRelp = "hey this is rebound relp " + i; + ForkJoinPool.commonPool().submit(() -> { + IManagedRelpConnection connection = relpConnectionPool.get(); + + // will set timer to 5 millis + connection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8)); + // exceed 5 millis + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + relpConnectionPool.offer(connection); + countDownLatch.countDown(); + }); + } + + + Assertions.assertDoesNotThrow(() -> countDownLatch.await()); + + relpConnectionPool.close(); + + Assertions.assertEquals(testCycles, messageList.size()); + + Pattern heyPattern = Pattern.compile("hey this is rebound relp \\d+"); + while(!messageList.isEmpty()) { + byte[] payload = messageList.removeFirst(); + Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches()); + } + + Assertions.assertTrue(connectionOpenCount.get() > 1); + Assertions.assertEquals(connectionOpenCount.get(), connectionCleanCloseCount.get()); + connectionOpenCount.set(0); + connectionCleanCloseCount.set(0); + } +} diff --git a/src/test/java/com/teragrep/rlp_01/pool/PoolTest.java b/src/test/java/com/teragrep/rlp_01/pool/PoolTest.java new file mode 100644 index 0000000..137d20b --- /dev/null +++ b/src/test/java/com/teragrep/rlp_01/pool/PoolTest.java @@ -0,0 +1,109 @@ +/* + Java Reliable Event Logging Protocol Library RLP-01 + Copyright (C) 2021-2024 Suomen Kanuuna Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package com.teragrep.rlp_01.pool; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicLong; + +public class PoolTest { + + @Test + public void testUnboundPool() { + AtomicLong report = new AtomicLong(); + + Pool pool = new UnboundPool<>(() -> new TestPoolableImpl(report), new TestPoolableStub()); + + final int testCycles = 1_000_000; + CountDownLatch countDownLatch = new CountDownLatch(testCycles); + + for (int i = 0; i < testCycles; i++) { + ForkJoinPool.commonPool().submit(() -> { + TestPoolable testPoolable = pool.get(); + testPoolable.increment(); + pool.offer(testPoolable); + countDownLatch.countDown(); + }); + } + + Assertions.assertAll(countDownLatch::await); + + pool.close(); + + Assertions.assertEquals(testCycles, report.get()); + } + + private interface TestPoolable extends Poolable { + void increment(); + } + + private static class TestPoolableImpl implements TestPoolable { + + private final AtomicLong report; + private final List counterList; + + TestPoolableImpl(AtomicLong report) { + this.report = report; + this.counterList = new ArrayList<>(1); + int counter = 0; + this.counterList.add(counter); + } + + @Override + public void increment() { + // unsynchronized list access here to test concurrent modification + int counter = counterList.remove(0); + counter = counter + 1; + counterList.add(counter); + } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() throws IOException { + int counter = counterList.get(0); + report.addAndGet(counter); + } + } + + private static class TestPoolableStub implements TestPoolable { + + @Override + public boolean isStub() { + return true; + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("stub does not support this"); + } + + @Override + public void increment() { + throw new UnsupportedOperationException("stub does not support this"); + } + } +}