diff --git a/pom.xml b/pom.xml
index dc850a3..21541b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,18 @@
test
${junit.version}
+
+ com.teragrep
+ rlp_03
+ 9.0.0
+ test
+
+
+ org.slf4j
+ slf4j-simple
+ 2.0.13
+ test
+
diff --git a/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java
new file mode 100644
index 0000000..07bc935
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java
@@ -0,0 +1,29 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.pool.Poolable;
+
+import java.io.IOException;
+
+public interface IManagedRelpConnection extends Poolable {
+ void reconnect();
+
+ void connect() throws IOException;
+ void forceReconnect();
+ void ensureSent(byte[] bytes);
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java
new file mode 100644
index 0000000..ed0171c
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/IRelpConnection.java
@@ -0,0 +1,58 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.RelpBatch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+// TODO refactor RelpConnection into an interface and RelpConnectionImpl and remove this
+public interface IRelpConnection {
+
+ int getReadTimeout();
+
+ void setReadTimeout(int readTimeout);
+
+ int getWriteTimeout();
+
+ void setWriteTimeout(int writeTimeout);
+
+ int getConnectionTimeout();
+
+ void setConnectionTimeout(int timeout);
+
+ void setKeepAlive(boolean on);
+
+ int getRxBufferSize();
+
+ void setRxBufferSize(int size);
+
+ int getTxBufferSize();
+
+ void setTxBufferSize(int size);
+
+ boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException;
+
+ void tearDown();
+
+ boolean disconnect() throws IOException, IllegalStateException, TimeoutException;
+
+ void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException;
+
+ RelpConfig relpConfig();
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java
new file mode 100644
index 0000000..456eff6
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java
@@ -0,0 +1,125 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.RelpBatch;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class ManagedRelpConnection implements IManagedRelpConnection {
+
+ private final IRelpConnection relpConnection;
+ private boolean hasConnected;
+
+
+ public ManagedRelpConnection(IRelpConnection relpConnection) {
+ this.relpConnection = relpConnection;
+ this.hasConnected = false;
+ }
+
+ @Override
+ public void forceReconnect() {
+ tearDown();
+ connect();
+ }
+
+ @Override
+ public void reconnect() {
+ close();
+ connect();
+ }
+
+ @Override
+ public void connect() {
+ boolean connected = false;
+ while (!connected) {
+ try {
+ this.hasConnected = true;
+ connected = relpConnection
+ .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort);
+ }
+ catch (Exception e) {
+ System.err.println(
+ "Failed to connect to relp server <["+relpConnection.relpConfig().relpTarget+"]>:<["+relpConnection.relpConfig().relpPort+"]>: <"+e.getMessage()+">");
+
+ try {
+ Thread.sleep(relpConnection.relpConfig().relpReconnectInterval);
+ }
+ catch (InterruptedException exception) {
+ System.err.println("Reconnection timer interrupted, reconnecting now");
+ }
+ }
+ }
+ }
+
+ private void tearDown() {
+ /*
+ TODO remove: wouldn't need a check hasConnected but there is a bug in RLP-01 tearDown()
+ see https://github.com/teragrep/rlp_01/issues/63 for further info
+ */
+ if (hasConnected) {
+ relpConnection.tearDown();
+ }
+ }
+
+ @Override
+ public void ensureSent(byte[] bytes) {
+ // avoid unnecessary exception for fresh connections
+ if (!hasConnected) {
+ connect();
+ }
+
+ final RelpBatch relpBatch = new RelpBatch();
+ relpBatch.insert(bytes);
+ boolean notSent = true;
+ while (notSent) {
+ try {
+ relpConnection.commit(relpBatch);
+ }
+ catch (IllegalStateException | IOException | TimeoutException e) {
+ System.err.println("Exception <"+e.getMessage()+"> while sending relpBatch. Will retry");
+ }
+ if (!relpBatch.verifyTransactionAll()) {
+ relpBatch.retryAllFailed();
+ this.tearDown();
+ this.connect();
+ }
+ else {
+ notSent = false;
+ }
+ }
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ try {
+ this.relpConnection.disconnect();
+ }
+ catch (IllegalStateException | IOException | TimeoutException e) {
+ System.err.println("Forcefully closing connection due to exception <"+e.getMessage()+">");
+ }
+ finally {
+ tearDown();
+ }
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java
new file mode 100644
index 0000000..4d24793
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java
@@ -0,0 +1,52 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import java.io.IOException;
+
+public class ManagedRelpConnectionStub implements IManagedRelpConnection {
+
+ @Override
+ public void reconnect() {
+ throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
+ }
+
+ @Override
+ public void connect() throws IOException {
+ throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
+ }
+
+ @Override
+ public void forceReconnect() {
+ throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
+ }
+
+ @Override
+ public void ensureSent(byte[] bytes) {
+ throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
+ }
+
+ @Override
+ public boolean isStub() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java
new file mode 100644
index 0000000..25acd74
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java
@@ -0,0 +1,66 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import java.io.IOException;
+
+public class RebindableRelpConnection implements IManagedRelpConnection {
+ private final IManagedRelpConnection managedRelpConnection;
+ private int recordsSent;
+ private final int rebindRequestAmount;
+
+ public RebindableRelpConnection(IManagedRelpConnection managedRelpConnection, int rebindRequestAmount) {
+ this.managedRelpConnection = managedRelpConnection;
+ this.recordsSent = 0;
+ this.rebindRequestAmount = rebindRequestAmount;
+ }
+
+ @Override
+ public void reconnect() {
+ managedRelpConnection.reconnect();
+ }
+
+ @Override
+ public void connect() throws IOException {
+ managedRelpConnection.connect();
+ }
+
+ @Override
+ public void forceReconnect() {
+ managedRelpConnection.forceReconnect();
+ }
+
+ @Override
+ public void ensureSent(byte[] bytes) {
+ if (recordsSent >= rebindRequestAmount) {
+ reconnect();
+ recordsSent = 0;
+ }
+ managedRelpConnection.ensureSent(bytes);
+ recordsSent++;
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ managedRelpConnection.close();
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/RelpConfig.java b/src/main/java/com/teragrep/rlp_01/client/RelpConfig.java
new file mode 100644
index 0000000..fb85d9e
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/RelpConfig.java
@@ -0,0 +1,23 @@
+package com.teragrep.rlp_01.client;
+
+import java.time.Duration;
+
+public class RelpConfig {
+ public final String relpTarget;
+ public final int relpPort;
+ public final int relpReconnectInterval;
+ public final int rebindRequestAmount;
+ public final boolean rebindEnabled;
+ public final Duration maxIdle;
+ public final boolean maxIdleEnabled;
+
+ public RelpConfig(String relpTarget, int relpPort, int relpReconnectInterval, int rebindRequestAmount, boolean rebindEnabled, Duration maxIdle, boolean maxIdleEnabled) {
+ this.relpTarget = relpTarget;
+ this.relpPort = relpPort;
+ this.relpReconnectInterval = relpReconnectInterval;
+ this.rebindRequestAmount = rebindRequestAmount;
+ this.rebindEnabled = rebindEnabled;
+ this.maxIdle = maxIdle;
+ this.maxIdleEnabled = maxIdleEnabled;
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java
new file mode 100644
index 0000000..e35a106
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionFactory.java
@@ -0,0 +1,58 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.RelpConnection;
+
+import java.util.function.Supplier;
+
+public class RelpConnectionFactory implements Supplier {
+
+ private final RelpConfig relpConfig;
+ private final SSLContextSupplier sslContextSupplier;
+
+ public RelpConnectionFactory(RelpConfig relpConfig) {
+ this(relpConfig, new SSLContextSupplierStub());
+ }
+ public RelpConnectionFactory(RelpConfig relpConfig, SSLContextSupplier sslContextSupplier) {
+ this.relpConfig = relpConfig;
+ this.sslContextSupplier = sslContextSupplier;
+ }
+
+ @Override
+ public IManagedRelpConnection get() {
+ IRelpConnection relpConnection;
+ if (sslContextSupplier.isStub()) {
+ relpConnection = new RelpConnectionWithConfig(new RelpConnection(), relpConfig);
+ }
+ else {
+ relpConnection = new RelpConnectionWithConfig(new RelpConnection(() -> sslContextSupplier.get().createSSLEngine()), relpConfig);
+ }
+
+ IManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection);
+
+ if (relpConfig.rebindEnabled) {
+ managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount);
+ }
+
+ if (relpConfig.maxIdleEnabled) {
+ managedRelpConnection = new RenewableRelpConnection(managedRelpConnection, relpConfig.maxIdle);
+ }
+
+ return managedRelpConnection;
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java
new file mode 100644
index 0000000..b20264a
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/RelpConnectionWithConfig.java
@@ -0,0 +1,115 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.RelpBatch;
+import com.teragrep.rlp_01.RelpConnection;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+public class RelpConnectionWithConfig implements IRelpConnection {
+
+ private final RelpConnection relpConnection;
+ private final RelpConfig relpConfig;
+
+ public RelpConnectionWithConfig(RelpConnection relpConnection, RelpConfig relpConfig) {
+ this.relpConnection = relpConnection;
+ this.relpConfig = relpConfig;
+ }
+
+ @Override
+ public int getReadTimeout() {
+ return relpConnection.getReadTimeout();
+ }
+
+ @Override
+ public void setReadTimeout(int readTimeout) {
+ relpConnection.setReadTimeout(readTimeout);
+ }
+
+ @Override
+ public int getWriteTimeout() {
+ return relpConnection.getWriteTimeout();
+ }
+
+ @Override
+ public void setWriteTimeout(int writeTimeout) {
+ relpConnection.setWriteTimeout(writeTimeout);
+ }
+
+ @Override
+ public int getConnectionTimeout() {
+ return relpConnection.getConnectionTimeout();
+ }
+
+ @Override
+ public void setConnectionTimeout(int timeout) {
+ relpConnection.setConnectionTimeout(timeout);
+ }
+
+ @Override
+ public void setKeepAlive(boolean on) {
+ relpConnection.setKeepAlive(on);
+ }
+
+ @Override
+ public int getRxBufferSize() {
+ return relpConnection.getRxBufferSize();
+ }
+
+ @Override
+ public void setRxBufferSize(int size) {
+ relpConnection.setRxBufferSize(size);
+ }
+
+ @Override
+ public int getTxBufferSize() {
+ return relpConnection.getTxBufferSize();
+ }
+
+ @Override
+ public void setTxBufferSize(int size) {
+ relpConnection.setTxBufferSize(size);
+ }
+
+ @Override
+ public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException {
+ return relpConnection.connect(hostname, port);
+ }
+
+ @Override
+ public void tearDown() {
+ relpConnection.tearDown();
+ }
+
+ @Override
+ public boolean disconnect() throws IOException, IllegalStateException, TimeoutException {
+ return relpConnection.disconnect();
+ }
+
+ @Override
+ public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException {
+ relpConnection.commit(relpBatch);
+ }
+
+ @Override
+ public RelpConfig relpConfig() {
+ return relpConfig;
+ }
+
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java
new file mode 100644
index 0000000..9d73bcd
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java
@@ -0,0 +1,55 @@
+package com.teragrep.rlp_01.client;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.Duration;
+
+public class RenewableRelpConnection implements IManagedRelpConnection {
+
+ private final IManagedRelpConnection managedRelpConnection;
+ private final Duration maxIdle;
+ private Instant lastAccess;
+
+ public RenewableRelpConnection(IManagedRelpConnection managedRelpConnection, Duration maxIdle) {
+ this.managedRelpConnection = managedRelpConnection;
+ this.maxIdle = maxIdle;
+ this.lastAccess = Instant.ofEpochSecond(0);
+ }
+
+ @Override
+ public void reconnect() {
+ lastAccess = Instant.now();
+ managedRelpConnection.reconnect();
+ }
+
+ @Override
+ public void connect() throws IOException {
+ lastAccess = Instant.now();
+ managedRelpConnection.connect();
+ }
+
+ @Override
+ public void forceReconnect() {
+ lastAccess = Instant.now();
+ managedRelpConnection.forceReconnect();
+ }
+
+ @Override
+ public void ensureSent(byte[] bytes) {
+ if (lastAccess.plus(maxIdle).isBefore(Instant.now())) {
+ forceReconnect();
+ }
+ lastAccess = Instant.now();
+ managedRelpConnection.ensureSent(bytes);
+ }
+
+ @Override
+ public boolean isStub() {
+ return managedRelpConnection.isStub();
+ }
+
+ @Override
+ public void close() throws IOException {
+ managedRelpConnection.close();
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java
new file mode 100644
index 0000000..b3ed7df
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplier.java
@@ -0,0 +1,13 @@
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.pool.Stubable;
+
+import javax.net.ssl.SSLContext;
+import java.util.function.Supplier;
+
+/**
+ * Wrapper for SSLContext because such do not have stubness property
+ */
+public interface SSLContextSupplier extends Supplier, Stubable {
+ SSLContext get();
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java
new file mode 100644
index 0000000..277f1a3
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierKeystore.java
@@ -0,0 +1,38 @@
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.rlp_01.SSLContextFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+/**
+ * Wrapper for existing SSLContextFactory for providing stubness
+ */
+public class SSLContextSupplierKeystore implements SSLContextSupplier {
+ private final String keystorePath;
+ private final String keystorePassword;
+ private final String protocol;
+
+ public SSLContextSupplierKeystore(String keystorePath, String keystorePassword, String protocol) {
+ this.keystorePath = keystorePath;
+ this.keystorePassword = keystorePassword;
+ this.protocol = protocol;
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+
+ @Override
+ public SSLContext get() {
+ try {
+ // TODO refactor static method authenticatedContext to non-static, this is just for version compatibility
+ return SSLContextFactory.authenticatedContext(keystorePath, keystorePassword, protocol);
+ }
+ catch (GeneralSecurityException | IOException exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java
new file mode 100644
index 0000000..4d3439f
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/client/SSLContextSupplierStub.java
@@ -0,0 +1,16 @@
+package com.teragrep.rlp_01.client;
+
+import javax.net.ssl.SSLContext;
+
+public class SSLContextSupplierStub implements SSLContextSupplier {
+
+ @Override
+ public SSLContext get() {
+ throw new UnsupportedOperationException("stub does not support this.");
+ }
+
+ @Override
+ public boolean isStub() {
+ return true;
+ }
+}
diff --git a/src/main/java/com/teragrep/rlp_01/pool/Pool.java b/src/main/java/com/teragrep/rlp_01/pool/Pool.java
new file mode 100644
index 0000000..0cc9386
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/pool/Pool.java
@@ -0,0 +1,27 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.pool;
+
+import java.util.function.Supplier;
+
+public interface Pool extends AutoCloseable, Supplier {
+ T get();
+
+ void offer(T object);
+
+ void close();
+}
diff --git a/src/main/java/com/teragrep/rlp_01/pool/Poolable.java b/src/main/java/com/teragrep/rlp_01/pool/Poolable.java
new file mode 100644
index 0000000..5573206
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/pool/Poolable.java
@@ -0,0 +1,23 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.pool;
+
+import java.io.Closeable;
+
+public interface Poolable extends Stubable, Closeable {
+
+}
diff --git a/src/main/java/com/teragrep/rlp_01/pool/Stubable.java b/src/main/java/com/teragrep/rlp_01/pool/Stubable.java
new file mode 100644
index 0000000..0708179
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/pool/Stubable.java
@@ -0,0 +1,22 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.pool;
+
+public interface Stubable {
+
+ boolean isStub();
+}
diff --git a/src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java b/src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java
new file mode 100644
index 0000000..a40f344
--- /dev/null
+++ b/src/main/java/com/teragrep/rlp_01/pool/UnboundPool.java
@@ -0,0 +1,100 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.pool;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+public class UnboundPool implements Pool {
+
+ private final Supplier supplier;
+
+ private final ConcurrentLinkedQueue queue;
+
+ private final T stub;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final AtomicBoolean close;
+
+ public UnboundPool(final Supplier supplier, T stub) {
+ this.supplier = supplier;
+ this.queue = new ConcurrentLinkedQueue<>();
+ this.stub = stub;
+ this.close = new AtomicBoolean();
+ }
+
+ @Override
+ public T get() {
+ T object;
+ if (close.get()) {
+ object = stub;
+ }
+ else {
+ // get or create
+ object = queue.poll();
+ if (object == null) {
+ object = supplier.get();
+ }
+ }
+
+ return object;
+ }
+
+ @Override
+ public void offer(T object) {
+ if (!object.isStub()) {
+ queue.add(object);
+ }
+
+ if (close.get()) {
+ while (queue.peek() != null) {
+ if (lock.tryLock()) {
+ while (true) {
+ T pooled = queue.poll();
+ if (pooled == null) {
+ break;
+ }
+ else {
+ try {
+ pooled.close();
+ }
+ catch (Exception exception) {
+ System.err.println("Exception <" + exception.getMessage() + "> while closing poolable <"+ pooled +">");
+ }
+ }
+ }
+ lock.unlock();
+ }
+ else {
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ close.set(true);
+
+ // close all that are in the pool right now
+ offer(stub);
+ }
+}
diff --git a/src/test/java/com/teragrep/rlp_01/SendMessageTest.java b/src/test/java/com/teragrep/rlp_01/SendMessageTest.java
new file mode 100644
index 0000000..5f44036
--- /dev/null
+++ b/src/test/java/com/teragrep/rlp_01/SendMessageTest.java
@@ -0,0 +1,208 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01;
+
+import com.teragrep.net_01.channel.socket.PlainFactory;
+import com.teragrep.net_01.eventloop.EventLoop;
+import com.teragrep.net_01.eventloop.EventLoopFactory;
+import com.teragrep.rlp_03.frame.FrameDelegationClockFactory;
+import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
+import com.teragrep.net_01.server.ServerFactory;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * These are a copy from rlp_03 test suite
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SendMessageTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SendMessageTest.class);
+
+ private final String hostname = "localhost";
+ private EventLoop eventLoop;
+ private Thread eventLoopThread;
+
+ private ExecutorService executorService;
+ private final int port = 1236;
+
+ private final List messageList = new LinkedList<>();
+
+ @BeforeAll
+ public void init() {
+ EventLoopFactory eventLoopFactory = new EventLoopFactory();
+ Assertions.assertDoesNotThrow(() -> eventLoop = eventLoopFactory.create());
+
+ eventLoopThread = new Thread(eventLoop);
+ eventLoopThread.start();
+
+ executorService = Executors.newSingleThreadExecutor();
+ ServerFactory serverFactory = new ServerFactory(
+ eventLoop,
+ executorService,
+ new PlainFactory(),
+ new FrameDelegationClockFactory(() -> new DefaultFrameDelegate((frame) -> messageList.add(frame.relpFrame().payload().toBytes())))
+ );
+ Assertions.assertDoesNotThrow(() -> serverFactory.create(port));
+ }
+
+ @AfterAll
+ public void cleanup() {
+ eventLoop.stop();
+ executorService.shutdown();
+ Assertions.assertDoesNotThrow(() -> eventLoopThread.join());
+ }
+
+ @AfterEach
+ public void clearMessageList() {
+ // clear received list
+ messageList.clear();
+ }
+
+ @Test
+ public void testSendMessage() {
+ RelpConnection relpSession = new RelpConnection();
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ String msg = "<14>1 2020-05-15T13:24:03.603Z CFE-16 capsulated - - [CFE-16-metadata@48577 authentication_token=\"AUTH_TOKEN_11111\" channel=\"CHANNEL_11111\" time_source=\"generated\"][CFE-16-origin@48577] \"Hello, world!\"\n";
+ byte[] data = msg.getBytes(StandardCharsets.UTF_8);
+ RelpBatch batch = new RelpBatch();
+ long reqId = batch.insert(data);
+ Assertions.assertDoesNotThrow(() -> relpSession.commit(batch));
+ // verify successful transaction
+ Assertions.assertTrue(batch.verifyTransaction(reqId));
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+
+ // message must equal to what was send
+ Assertions.assertEquals(msg, new String(messageList.get(0)));
+ }
+
+ @Test
+ public void testSendSmallMessage() {
+ RelpConnection relpSession = new RelpConnection();
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ String msg = "<167>Mar 1 01:00:00 1um:\n";
+ byte[] data = msg.getBytes(StandardCharsets.UTF_8);
+ RelpBatch batch = new RelpBatch();
+ long reqId = batch.insert(data);
+ Assertions.assertDoesNotThrow(() -> relpSession.commit(batch));
+ // verify successful transaction
+ Assertions.assertTrue(batch.verifyTransaction(reqId));
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+
+ // message must equal to what was send
+ Assertions.assertEquals(msg, new String(messageList.get(0)));
+ }
+
+ @Test
+ public void testOpenAndCloseSession() {
+ RelpConnection relpSession = new RelpConnection();
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+ }
+
+ @Test
+ public void testSessionCloseTwice() {
+ RelpConnection relpSession = new RelpConnection();
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+ Assertions.assertThrows(IllegalStateException.class, relpSession::disconnect);
+
+ }
+
+ @Test
+ public void clientTestOpenSendClose() {
+ RelpConnection relpSession = new RelpConnection();
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ String msg = "clientTestOpenSendClose";
+ byte[] data = msg.getBytes(StandardCharsets.UTF_8);
+ RelpBatch batch = new RelpBatch();
+ batch.insert(data);
+ Assertions.assertDoesNotThrow(() -> relpSession.commit(batch));
+ Assertions.assertTrue(batch.verifyTransactionAll());
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+
+ // message must equal to what was send
+ Assertions.assertEquals(msg, new String(messageList.get(0)));
+ }
+
+ @Test
+ public void clientTestSendTwo() {
+ RelpConnection relpSession = new RelpConnection();
+ relpSession.setConnectionTimeout(5000);
+ relpSession.setReadTimeout(5000);
+ relpSession.setWriteTimeout(5000);
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("test> Connected");
+ }
+ String msg1 = "clientTestOpenSendClose 1";
+ byte[] data1 = msg1.getBytes(StandardCharsets.UTF_8);
+ RelpBatch batch1 = new RelpBatch();
+ batch1.insert(data1);
+ Assertions.assertDoesNotThrow(() -> relpSession.commit(batch1));
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("test> Committed");
+ }
+ Assertions.assertTrue(batch1.verifyTransactionAll());
+
+ String msg2 = "clientTestOpenSendClose 2";
+ byte[] data2 = msg2.getBytes(StandardCharsets.UTF_8);
+ RelpBatch batch2 = new RelpBatch();
+ batch2.insert(data2);
+ Assertions.assertDoesNotThrow(() -> relpSession.commit(batch2));
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("test> Committed second");
+ }
+ Assertions.assertTrue(batch1.verifyTransactionAll());
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("test> Disconnected");
+ }
+
+ // messages must equal to what was send
+ Assertions.assertEquals(msg1, new String(messageList.get(0)));
+ Assertions.assertEquals(msg2, new String(messageList.get(1)));
+ }
+
+ @Test
+ public void testSendBatch() {
+ RelpConnection relpSession = new RelpConnection();
+ Assertions.assertDoesNotThrow(() -> relpSession.connect(hostname, port));
+ String msg = "Hello, world!";
+ byte[] data = msg.getBytes(StandardCharsets.UTF_8);
+ int n = 50;
+ RelpBatch batch = new RelpBatch();
+ for (int i = 0; i < n; i++) {
+ batch.insert(data);
+ }
+ Assertions.assertDoesNotThrow(() -> relpSession.commit(batch));
+ Assertions.assertTrue(batch.verifyTransactionAll());
+ Assertions.assertDoesNotThrow(relpSession::disconnect);
+
+ for (int i = 0; i < n; i++) {
+ Assertions.assertEquals(msg, new String(messageList.get(i)));
+ }
+ }
+
+}
diff --git a/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java b/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java
new file mode 100644
index 0000000..0ed9ee4
--- /dev/null
+++ b/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java
@@ -0,0 +1,303 @@
+package com.teragrep.rlp_01.client;
+
+import com.teragrep.net_01.channel.socket.PlainFactory;
+import com.teragrep.net_01.eventloop.EventLoop;
+import com.teragrep.net_01.eventloop.EventLoopFactory;
+import com.teragrep.net_01.server.ServerFactory;
+import com.teragrep.rlp_01.pool.Pool;
+import com.teragrep.rlp_01.pool.UnboundPool;
+import com.teragrep.rlp_03.frame.FrameDelegationClockFactory;
+import com.teragrep.rlp_03.frame.delegate.EventDelegate;
+import com.teragrep.rlp_03.frame.delegate.FrameContext;
+import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
+import com.teragrep.rlp_03.frame.delegate.event.RelpEvent;
+import com.teragrep.rlp_03.frame.delegate.event.RelpEventClose;
+import com.teragrep.rlp_03.frame.delegate.event.RelpEventOpen;
+import com.teragrep.rlp_03.frame.delegate.event.RelpEventSyslog;
+import org.junit.jupiter.api.*;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class ManagedConnectionTest {
+ private final String hostname = "localhost";
+ private final int port = 33601;
+
+ private EventLoop eventLoop;
+ private Thread eventLoopThread;
+
+ private ExecutorService executorService;
+
+ private final ConcurrentLinkedDeque messageList = new ConcurrentLinkedDeque<>();
+
+ private final AtomicLong connectionOpenCount = new AtomicLong();
+ private final AtomicLong connectionCleanCloseCount = new AtomicLong();
+
+ @BeforeAll
+ public void init() {
+ EventLoopFactory eventLoopFactory = new EventLoopFactory();
+ Assertions.assertDoesNotThrow(() -> eventLoop = eventLoopFactory.create());
+
+ eventLoopThread = new Thread(eventLoop);
+ eventLoopThread.start();
+
+ executorService = Executors.newSingleThreadExecutor();
+
+ Supplier frameDelegateSupplier = () -> {
+
+ Map relpCommandConsumerMap = new HashMap<>();
+
+ relpCommandConsumerMap.put("close", new RelpEventCloseCounting(connectionCleanCloseCount));
+
+ relpCommandConsumerMap.put("open", new RelpEventOpenCounting(connectionOpenCount));
+
+ relpCommandConsumerMap.put("syslog", new RelpEventSyslog((frame) -> messageList.add(frame.relpFrame().payload().toBytes())));
+
+ return new EventDelegate(relpCommandConsumerMap);
+ };
+
+ ServerFactory serverFactory = new ServerFactory(
+ eventLoop,
+ executorService,
+ new PlainFactory(),
+ new FrameDelegationClockFactory(frameDelegateSupplier)
+ );
+ Assertions.assertDoesNotThrow(() -> serverFactory.create(port));
+ }
+
+ @AfterAll
+ public void cleanup() {
+ eventLoop.stop();
+ executorService.shutdown();
+ Assertions.assertDoesNotThrow(() -> eventLoopThread.join());
+ }
+
+
+ private static class RelpEventCloseCounting extends RelpEvent {
+ private final AtomicLong closeCount;
+ private final RelpEventClose relpEventClose;
+
+ RelpEventCloseCounting(AtomicLong closeCount) {
+ this.closeCount = closeCount;
+ this.relpEventClose = new RelpEventClose();
+ }
+ @Override
+ public void accept(FrameContext frameContext) {
+ relpEventClose.accept(frameContext);
+ closeCount.incrementAndGet();
+ }
+ }
+
+ private static class RelpEventOpenCounting extends RelpEvent {
+ private final AtomicLong openCount;
+ private final RelpEventOpen eventOpen;
+
+ RelpEventOpenCounting(AtomicLong openCount) {
+ this.openCount = openCount;
+ this.eventOpen = new RelpEventOpen();
+ }
+
+ @Override
+ public void accept(FrameContext frameContext) {
+ eventOpen.accept(frameContext);
+ openCount.incrementAndGet();
+ }
+ }
+
+
+ @Test
+ public void testFactoryProvisionedConnection() {
+ RelpConfig relpConfig = new RelpConfig(
+ hostname,
+ port,
+ 500,
+ 0,
+ false,
+ Duration.ZERO,
+ false
+ );
+
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+
+ IManagedRelpConnection relpConnection = relpConnectionFactory.get();
+
+ Assertions.assertDoesNotThrow(relpConnection::connect);
+
+ String heyRelp = "hey this is relp";
+
+ relpConnection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8));
+
+ Assertions.assertDoesNotThrow(relpConnection::close);
+
+ Assertions.assertEquals(heyRelp, new String(messageList.remove(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testPooledConnections() {
+ RelpConfig relpConfig = new RelpConfig(
+ hostname,
+ port,
+ 500,
+ 0,
+ false,
+ Duration.ZERO,
+ false
+ );
+
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+
+ Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
+
+ int testCycles = 1_000;
+ CountDownLatch countDownLatch = new CountDownLatch(testCycles);
+
+ for (int i = 0; i < testCycles; i++) {
+ final String heyRelp = "hey this is relp " + i;
+ ForkJoinPool.commonPool().submit(() -> {
+ IManagedRelpConnection connection = relpConnectionPool.get();
+
+ connection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8));
+ relpConnectionPool.offer(connection);
+ countDownLatch.countDown();
+ });
+ }
+
+ Assertions.assertDoesNotThrow(() -> countDownLatch.await());
+
+ relpConnectionPool.close();
+
+ Assertions.assertEquals(testCycles, messageList.size());
+
+ Pattern heyPattern = Pattern.compile("hey this is relp \\d+");
+ while(!messageList.isEmpty()) {
+ byte[] payload = messageList.removeFirst();
+ Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches());
+ }
+
+ Assertions.assertTrue(connectionOpenCount.get() > 1);
+ Assertions.assertEquals(connectionOpenCount.get(), connectionCleanCloseCount.get());
+ connectionOpenCount.set(0);
+ connectionCleanCloseCount.set(0);
+ }
+
+ @Test
+ public void testPooledRenewedConnections() {
+ RelpConfig relpConfig = new RelpConfig(
+ hostname,
+ port,
+ 500,
+ 0,
+ false,
+ Duration.of(5, ChronoUnit.MILLIS),
+ true
+ );
+
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+
+ Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
+
+ int testCycles = 20;
+ CountDownLatch countDownLatch = new CountDownLatch(testCycles);
+
+ for (int i = 0; i < testCycles; i++) {
+ final String heyRelp = "hey this is renewed relp " + i;
+ ForkJoinPool.commonPool().submit(() -> {
+ IManagedRelpConnection connection = relpConnectionPool.get();
+
+ // will set timer to 5 millis
+ connection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8));
+ // exceed 5 millis
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ relpConnectionPool.offer(connection);
+ countDownLatch.countDown();
+ });
+ }
+
+
+ Assertions.assertDoesNotThrow(() -> countDownLatch.await());
+
+ relpConnectionPool.close();
+
+ Assertions.assertEquals(testCycles, messageList.size());
+
+ Pattern heyPattern = Pattern.compile("hey this is renewed relp \\d+");
+ while(!messageList.isEmpty()) {
+ byte[] payload = messageList.removeFirst();
+ Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches());
+ }
+
+ Assertions.assertTrue(connectionOpenCount.get() > 1);
+ // renewable uses forceReconnect
+ Assertions.assertTrue(connectionCleanCloseCount.get() < connectionOpenCount.get());
+ connectionOpenCount.set(0);
+ connectionCleanCloseCount.set(0);
+ }
+
+ @Test
+ public void testPooledReboundConnections() {
+ RelpConfig relpConfig = new RelpConfig(
+ hostname,
+ port,
+ 500,
+ 1,
+ true,
+ Duration.ZERO,
+ false
+ );
+
+ RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);
+
+ Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub());
+
+ int testCycles = 20;
+ CountDownLatch countDownLatch = new CountDownLatch(testCycles);
+
+ for (int i = 0; i < testCycles; i++) {
+ final String heyRelp = "hey this is rebound relp " + i;
+ ForkJoinPool.commonPool().submit(() -> {
+ IManagedRelpConnection connection = relpConnectionPool.get();
+
+ // will set timer to 5 millis
+ connection.ensureSent(heyRelp.getBytes(StandardCharsets.UTF_8));
+ // exceed 5 millis
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ relpConnectionPool.offer(connection);
+ countDownLatch.countDown();
+ });
+ }
+
+
+ Assertions.assertDoesNotThrow(() -> countDownLatch.await());
+
+ relpConnectionPool.close();
+
+ Assertions.assertEquals(testCycles, messageList.size());
+
+ Pattern heyPattern = Pattern.compile("hey this is rebound relp \\d+");
+ while(!messageList.isEmpty()) {
+ byte[] payload = messageList.removeFirst();
+ Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches());
+ }
+
+ Assertions.assertTrue(connectionOpenCount.get() > 1);
+ Assertions.assertEquals(connectionOpenCount.get(), connectionCleanCloseCount.get());
+ connectionOpenCount.set(0);
+ connectionCleanCloseCount.set(0);
+ }
+}
diff --git a/src/test/java/com/teragrep/rlp_01/pool/PoolTest.java b/src/test/java/com/teragrep/rlp_01/pool/PoolTest.java
new file mode 100644
index 0000000..137d20b
--- /dev/null
+++ b/src/test/java/com/teragrep/rlp_01/pool/PoolTest.java
@@ -0,0 +1,109 @@
+/*
+ Java Reliable Event Logging Protocol Library RLP-01
+ Copyright (C) 2021-2024 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package com.teragrep.rlp_01.pool;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PoolTest {
+
+ @Test
+ public void testUnboundPool() {
+ AtomicLong report = new AtomicLong();
+
+ Pool pool = new UnboundPool<>(() -> new TestPoolableImpl(report), new TestPoolableStub());
+
+ final int testCycles = 1_000_000;
+ CountDownLatch countDownLatch = new CountDownLatch(testCycles);
+
+ for (int i = 0; i < testCycles; i++) {
+ ForkJoinPool.commonPool().submit(() -> {
+ TestPoolable testPoolable = pool.get();
+ testPoolable.increment();
+ pool.offer(testPoolable);
+ countDownLatch.countDown();
+ });
+ }
+
+ Assertions.assertAll(countDownLatch::await);
+
+ pool.close();
+
+ Assertions.assertEquals(testCycles, report.get());
+ }
+
+ private interface TestPoolable extends Poolable {
+ void increment();
+ }
+
+ private static class TestPoolableImpl implements TestPoolable {
+
+ private final AtomicLong report;
+ private final List counterList;
+
+ TestPoolableImpl(AtomicLong report) {
+ this.report = report;
+ this.counterList = new ArrayList<>(1);
+ int counter = 0;
+ this.counterList.add(counter);
+ }
+
+ @Override
+ public void increment() {
+ // unsynchronized list access here to test concurrent modification
+ int counter = counterList.remove(0);
+ counter = counter + 1;
+ counterList.add(counter);
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ int counter = counterList.get(0);
+ report.addAndGet(counter);
+ }
+ }
+
+ private static class TestPoolableStub implements TestPoolable {
+
+ @Override
+ public boolean isStub() {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException("stub does not support this");
+ }
+
+ @Override
+ public void increment() {
+ throw new UnsupportedOperationException("stub does not support this");
+ }
+ }
+}