diff --git a/pom.xml b/pom.xml
index 61082e7..d0e5ec8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,7 +62,7 @@
com.teragrep
rlp_01
- 4.0.3
+ 4.1.0
com.teragrep
@@ -72,7 +72,7 @@
com.teragrep
rlp_03
- 8.0.1
+ 10.0.0
test
@@ -180,6 +180,18 @@
parsson
1.1.7
+
+
+
+ com.azure
+ azure-security-keyvault-jca
+ 2.10.0
+
+
+ org.apache.httpcomponents.core5
+ httpcore5
+ 5.3.1
+
${project.basedir}/target
diff --git a/src/main/java/com/teragrep/aer_02/DefaultOutput.java b/src/main/java/com/teragrep/aer_02/DefaultOutput.java
index 55b0f5b..0c7b90c 100644
--- a/src/main/java/com/teragrep/aer_02/DefaultOutput.java
+++ b/src/main/java/com/teragrep/aer_02/DefaultOutput.java
@@ -46,161 +46,72 @@
package com.teragrep.aer_02;
import com.codahale.metrics.*;
-import com.teragrep.aer_02.config.RelpConfig;
+import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.rlp_01.RelpBatch;
-import com.teragrep.rlp_01.RelpConnection;
+import com.teragrep.rlp_01.client.*;
+import com.teragrep.rlp_01.pool.Pool;
+import com.teragrep.rlp_01.pool.UnboundPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.concurrent.TimeoutException;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-// TODO unify, this is a copy from cfe_35 which is a copy from rlo_10 with FIXES
final class DefaultOutput implements Output {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class);
- private final RelpConnection relpConnection;
+ private final Pool relpConnectionPool;
private final String relpAddress;
private final int relpPort;
- private final int reconnectInterval;
-
- // metrics
- private final Counter records;
- private final Counter bytes;
- private final Counter resends;
- private final Counter connects;
- private final Counter retriedConnects;
- private final Timer sendLatency;
- private final Timer connectLatency;
-
- DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry) {
- this(name, relpConfig, metricRegistry, new RelpConnection());
- }
-
- DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry, RelpConnection relpConnection) {
- this(
- name,
- relpConfig,
- metricRegistry,
- relpConnection,
- new SlidingWindowReservoir(10000),
- new SlidingWindowReservoir(10000)
- );
- }
DefaultOutput(
String name,
- RelpConfig relpConfig,
+ RelpConnectionConfig relpConnectionConfig,
MetricRegistry metricRegistry,
- RelpConnection relpConnection,
- Reservoir sendReservoir,
- Reservoir connectReservoir
+ SSLContextSupplier sslContextSupplier
) {
- this.relpAddress = relpConfig.relpAddress();
- this.relpPort = relpConfig.relpPort();
- this.reconnectInterval = relpConfig.reconnectInterval();
-
- this.relpConnection = relpConnection;
- this.relpConnection.setConnectionTimeout(relpConfig.connectTimeout());
- this.relpConnection.setReadTimeout(relpConfig.readTimeout());
- this.relpConnection.setWriteTimeout(relpConfig.writeTimeout());
-
- this.records = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "records"));
- this.bytes = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "bytes"));
- this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends"));
- this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects"));
- this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects"));
- this.sendLatency = metricRegistry
- .timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"), () -> new Timer(sendReservoir));
- this.connectLatency = metricRegistry
- .timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"), () -> new Timer(connectReservoir));
+ this(
+ relpConnectionConfig,
+ new UnboundPool<>(
+ new ManagedRelpConnectionWithMetricsFactory(
+ relpConnectionConfig.asRelpConfig(),
+ name,
+ metricRegistry,
+ relpConnectionConfig.asSocketConfig(),
+ sslContextSupplier
+ ),
+ new ManagedRelpConnectionStub()
+ )
+ );
+ }
- connect();
+ DefaultOutput(String name, RelpConnectionConfig relpConnectionConfig, MetricRegistry metricRegistry) {
+ this(
+ relpConnectionConfig,
+ new UnboundPool<>(
+ new ManagedRelpConnectionWithMetricsFactory(
+ relpConnectionConfig.asRelpConfig(),
+ name,
+ metricRegistry,
+ relpConnectionConfig.asSocketConfig()
+ ),
+ new ManagedRelpConnectionStub()
+ )
+ );
}
- private void connect() {
- boolean connected = false;
- while (!connected) {
- final Timer.Context context = connectLatency.time(); // reset the time (new context)
- try {
- // coverity[leaked_resource]
- connected = this.relpConnection.connect(relpAddress, relpPort);
- /*
- Not closing the context in case of an exception thrown in .connect() will leave the timer.context
- for garbage collector to remove. This will happen even if the context is closed because of how
- the Timer is implemented.
- */
- context.close(); // manually close here, so the timer is only updated if no exceptions were thrown
- connects.inc();
- }
- catch (IOException | TimeoutException e) {
- LOGGER.error("Exception while connecting to <[{}]>:<[{}]>", relpAddress, relpPort, e);
- }
- catch (UnresolvedAddressException e) {
- LOGGER.error("Can't resolve address of target <[{}]>", relpAddress, e);
- }
+ DefaultOutput(RelpConnectionConfig relpConnectionConfig, Pool relpConnectionPool) {
+ this.relpAddress = relpConnectionConfig.relpAddress();
+ this.relpPort = relpConnectionConfig.relpPort();
- if (!connected) {
- try {
- Thread.sleep(reconnectInterval);
- retriedConnects.inc();
- }
- catch (InterruptedException e) {
- LOGGER
- .warn(
- "Sleep interrupted while waiting for reconnectInterval <[{}]> on <[{}]>:<[{}]>",
- reconnectInterval, relpAddress, relpPort, e
- );
- }
- }
- }
+ this.relpConnectionPool = relpConnectionPool;
}
@Override
public void accept(byte[] syslogMessage) {
- try (final Timer.Context context = sendLatency.time()) {
- RelpBatch batch = new RelpBatch();
- batch.insert(syslogMessage);
-
- boolean allSent = false;
- while (!allSent) {
- try {
- this.relpConnection.commit(batch);
-
- // metrics
- // NOTICE these if batch size changes
- records.inc(1);
- bytes.inc(syslogMessage.length);
-
- }
- catch (IllegalStateException | IOException | TimeoutException e) {
- LOGGER.error("Exception while committing a batch to <[{}]>:<[{}]>", relpAddress, relpPort, e);
- }
- // Check if everything has been sent, retry and reconnect if not.
- if (!batch.verifyTransactionAll()) {
- batch.retryAllFailed();
-
- // metrics
- // NOTICE this if batch size changes
- resends.inc(1);
- relpConnection.tearDown();
- try {
- Thread.sleep(reconnectInterval);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- connect();
- }
- else {
- allSent = true;
- }
- }
- }
+ RelpBatch batch = new RelpBatch();
+ batch.insert(syslogMessage);
+ IManagedRelpConnection connection = relpConnectionPool.get();
+ connection.ensureSent(syslogMessage);
+ relpConnectionPool.offer(connection);
}
@Override
@@ -208,15 +119,8 @@ public String toString() {
return "DefaultOutput{" + "relpAddress='" + relpAddress + '\'' + ", relpPort=" + relpPort + '}';
}
+ @Override
public void close() {
- try {
- relpConnection.disconnect();
- }
- catch (IOException | TimeoutException e) {
- LOGGER.warn("Exception while disconnecting from <[{}]>:<[{}]>", relpAddress, relpPort, e);
- }
- finally {
- relpConnection.tearDown();
- }
+ relpConnectionPool.close();
}
}
diff --git a/src/main/java/com/teragrep/aer_02/EventDataConsumer.java b/src/main/java/com/teragrep/aer_02/EventDataConsumer.java
index 7eea8d5..9573977 100644
--- a/src/main/java/com/teragrep/aer_02/EventDataConsumer.java
+++ b/src/main/java/com/teragrep/aer_02/EventDataConsumer.java
@@ -47,13 +47,14 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
-import com.teragrep.aer_02.config.RelpConfig;
+import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.SyslogConfig;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.SDElement;
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;
+import com.teragrep.rlp_01.client.SSLContextSupplier;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
@@ -71,10 +72,22 @@ final class EventDataConsumer implements AutoCloseable {
private final SyslogConfig syslogConfig;
private final MetricRegistry metricRegistry;
+ EventDataConsumer(
+ final Sourceable configSource,
+ final String hostname,
+ final MetricRegistry metricRegistry,
+ final SSLContextSupplier sslContextSupplier
+ ) {
+ this(
+ configSource,
+ new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry, sslContextSupplier), hostname, metricRegistry
+ );
+ }
+
EventDataConsumer(final Sourceable configSource, final String hostname, final MetricRegistry metricRegistry) {
this(
configSource,
- new DefaultOutput("defaultOutput", new RelpConfig(configSource), metricRegistry),
+ new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry),
hostname,
metricRegistry
);
diff --git a/src/main/java/com/teragrep/aer_02/ManagedRelpConnectionWithMetrics.java b/src/main/java/com/teragrep/aer_02/ManagedRelpConnectionWithMetrics.java
new file mode 100644
index 0000000..94ef9c4
--- /dev/null
+++ b/src/main/java/com/teragrep/aer_02/ManagedRelpConnectionWithMetrics.java
@@ -0,0 +1,212 @@
+/*
+ * Teragrep Eventhub Reader as an Azure Function
+ * Copyright (C) 2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.aer_02;
+
+import com.codahale.metrics.*;
+import com.teragrep.rlp_01.RelpBatch;
+import com.teragrep.rlp_01.client.IManagedRelpConnection;
+import com.teragrep.rlp_01.client.IRelpConnection;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+public class ManagedRelpConnectionWithMetrics implements IManagedRelpConnection {
+
+ private final IRelpConnection relpConnection;
+ private boolean hasConnected;
+
+ // metrics
+ private final Counter records;
+ private final Counter bytesCnt;
+ private final Counter resends;
+ private final Counter connects;
+ private final Counter retriedConnects;
+ private final Timer sendLatency;
+ private final Timer connectLatency;
+
+ public ManagedRelpConnectionWithMetrics(
+ IRelpConnection relpConnection,
+ String name,
+ MetricRegistry metricRegistry
+ ) {
+ this(
+ relpConnection,
+ name,
+ metricRegistry,
+ new SlidingWindowReservoir(10000),
+ new SlidingWindowReservoir(10000)
+ );
+ }
+
+ public ManagedRelpConnectionWithMetrics(
+ IRelpConnection relpConnection,
+ String name,
+ MetricRegistry metricRegistry,
+ Reservoir sendReservoir,
+ Reservoir connectReservoir
+ ) {
+ this.relpConnection = relpConnection;
+
+ this.records = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "records"));
+ this.bytesCnt = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "bytes"));
+ this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends"));
+ this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects"));
+ this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects"));
+ this.sendLatency = metricRegistry
+ .timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"), () -> new Timer(sendReservoir));
+ this.connectLatency = metricRegistry
+ .timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"), () -> new Timer(connectReservoir));
+
+ this.hasConnected = false;
+ }
+
+ @Override
+ public void forceReconnect() {
+ tearDown();
+ connect();
+ }
+
+ @Override
+ public void reconnect() {
+ close();
+ connect();
+ }
+
+ @Override
+ public void connect() {
+ boolean connected = false;
+ while (!connected) {
+ final Timer.Context context = connectLatency.time();
+ try {
+ this.hasConnected = true;
+ connected = relpConnection
+ .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort);
+
+ context.close();
+ connects.inc();
+ }
+ catch (Exception e) {
+ System.err
+ .println(
+ "Failed to connect to relp server <[" + relpConnection.relpConfig().relpTarget + "]>:<["
+ + relpConnection.relpConfig().relpPort + "]>: <" + e.getMessage() + ">"
+ );
+
+ try {
+ Thread.sleep(relpConnection.relpConfig().relpReconnectInterval);
+ retriedConnects.inc();
+ }
+ catch (InterruptedException exception) {
+ System.err.println("Reconnection timer interrupted, reconnecting now");
+ }
+ }
+ }
+ }
+
+ private void tearDown() {
+ /*
+ TODO remove: wouldn't need a check hasConnected but there is a bug in RLP-01 tearDown()
+ see https://github.com/teragrep/rlp_01/issues/63 for further info
+ */
+ if (hasConnected) {
+ relpConnection.tearDown();
+ }
+ }
+
+ @Override
+ public void ensureSent(byte[] bytes) {
+ try (final Timer.Context context = sendLatency.time()) {
+ // avoid unnecessary exception for fresh connections
+ if (!hasConnected) {
+ connect();
+ }
+
+ final RelpBatch relpBatch = new RelpBatch();
+ relpBatch.insert(bytes);
+ boolean notSent = true;
+ while (notSent) {
+ try {
+ relpConnection.commit(relpBatch);
+
+ records.inc(1);
+ bytesCnt.inc(bytes.length);
+ }
+ catch (IllegalStateException | IOException | TimeoutException e) {
+ System.err.println("Exception <" + e.getMessage() + "> while sending relpBatch. Will retry");
+ }
+ if (!relpBatch.verifyTransactionAll()) {
+ relpBatch.retryAllFailed();
+ resends.inc(1);
+ this.tearDown();
+ this.connect();
+ }
+ else {
+ notSent = false;
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ try {
+ this.relpConnection.disconnect();
+ }
+ catch (IllegalStateException | IOException | TimeoutException e) {
+ System.err.println("Forcefully closing connection due to exception <" + e.getMessage() + ">");
+ }
+ finally {
+ tearDown();
+ }
+ }
+}
diff --git a/src/main/java/com/teragrep/aer_02/ManagedRelpConnectionWithMetricsFactory.java b/src/main/java/com/teragrep/aer_02/ManagedRelpConnectionWithMetricsFactory.java
new file mode 100644
index 0000000..15f304a
--- /dev/null
+++ b/src/main/java/com/teragrep/aer_02/ManagedRelpConnectionWithMetricsFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Teragrep Eventhub Reader as an Azure Function
+ * Copyright (C) 2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.aer_02;
+
+import com.codahale.metrics.MetricRegistry;
+import com.teragrep.rlp_01.RelpConnection;
+import com.teragrep.rlp_01.client.*;
+
+import java.util.function.Supplier;
+
+public class ManagedRelpConnectionWithMetricsFactory implements Supplier {
+
+ private final RelpConfig relpConfig;
+ private final SocketConfig socketConfig;
+ private final SSLContextSupplier sslContextSupplier;
+ private final String name;
+ private final MetricRegistry metricRegistry;
+
+ public ManagedRelpConnectionWithMetricsFactory(String name, MetricRegistry metricRegistry, RelpConfig relpConfig) {
+ this(relpConfig, name, metricRegistry, new SocketConfigDefault());
+ }
+
+ public ManagedRelpConnectionWithMetricsFactory(
+ RelpConfig relpConfig,
+ String name,
+ MetricRegistry metricRegistry,
+ SocketConfig socketConfig
+ ) {
+ this(relpConfig, name, metricRegistry, socketConfig, new SSLContextSupplierStub());
+ }
+
+ public ManagedRelpConnectionWithMetricsFactory(
+ RelpConfig relpConfig,
+ String name,
+ MetricRegistry metricRegistry,
+ SSLContextSupplier sslContextSupplier
+ ) {
+ this(relpConfig, name, metricRegistry, new SocketConfigDefault(), sslContextSupplier);
+ }
+
+ public ManagedRelpConnectionWithMetricsFactory(
+ RelpConfig relpConfig,
+ String name,
+ MetricRegistry metricRegistry,
+ SocketConfig socketConfig,
+ SSLContextSupplier sslContextSupplier
+ ) {
+ this.relpConfig = relpConfig;
+ this.name = name;
+ this.metricRegistry = metricRegistry;
+ this.socketConfig = socketConfig;
+ this.sslContextSupplier = sslContextSupplier;
+ }
+
+ @Override
+ public IManagedRelpConnection get() {
+ IRelpConnection relpConnection;
+ if (sslContextSupplier.isStub()) {
+ relpConnection = new RelpConnectionWithConfig(new RelpConnection(), relpConfig);
+ }
+ else {
+ relpConnection = new RelpConnectionWithConfig(
+ new RelpConnection(() -> sslContextSupplier.get().createSSLEngine()),
+ relpConfig
+ );
+ }
+
+ relpConnection.setReadTimeout(socketConfig.readTimeout());
+ relpConnection.setWriteTimeout(socketConfig.writeTimeout());
+ relpConnection.setConnectionTimeout(socketConfig.connectTimeout());
+ relpConnection.setKeepAlive(socketConfig.keepAlive());
+
+ IManagedRelpConnection managedRelpConnection = new ManagedRelpConnectionWithMetrics(
+ relpConnection,
+ name,
+ metricRegistry
+ );
+
+ if (relpConfig.rebindEnabled) {
+ managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount);
+ }
+
+ if (relpConfig.maxIdleEnabled) {
+ managedRelpConnection = new RenewableRelpConnection(managedRelpConnection, relpConfig.maxIdle);
+ }
+
+ return managedRelpConnection;
+ }
+}
diff --git a/src/main/java/com/teragrep/aer_02/SyslogBridge.java b/src/main/java/com/teragrep/aer_02/SyslogBridge.java
index 4f5601e..a3b1ef3 100644
--- a/src/main/java/com/teragrep/aer_02/SyslogBridge.java
+++ b/src/main/java/com/teragrep/aer_02/SyslogBridge.java
@@ -49,19 +49,18 @@
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.teragrep.aer_02.config.source.EnvironmentSource;
+import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.aer_02.json.JsonRecords;
import com.teragrep.aer_02.metrics.JmxReport;
import com.teragrep.aer_02.metrics.PrometheusReport;
import com.teragrep.aer_02.metrics.Report;
import com.teragrep.aer_02.metrics.Slf4jReport;
+import com.teragrep.aer_02.tls.AzureSSLContextSupplier;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.common.TextFormat;
-import java.io.IOException;
-import java.io.StringWriter;
-import java.io.UncheckedIOException;
-import java.io.Writer;
+import java.io.*;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
@@ -134,11 +133,15 @@ public void eventHubTriggerToSyslog(
}
if (consumer == null) {
- consumer = new EventDataConsumer(
- new EnvironmentSource(),
- new Hostname("localhost").hostname(),
- metricRegistry
- );
+ final Sourceable configSource = new EnvironmentSource();
+ final String hostname = new Hostname("localhost").hostname();
+
+ if (configSource.source("relp.tls.mode", "none").equals("keyVault")) {
+ consumer = new EventDataConsumer(configSource, hostname, metricRegistry, new AzureSSLContextSupplier());
+ }
+ else {
+ consumer = new EventDataConsumer(configSource, hostname, metricRegistry);
+ }
}
for (int index = 0; index < events.length; index++) {
diff --git a/src/main/java/com/teragrep/aer_02/config/RelpConfig.java b/src/main/java/com/teragrep/aer_02/config/RelpConnectionConfig.java
similarity index 61%
rename from src/main/java/com/teragrep/aer_02/config/RelpConfig.java
rename to src/main/java/com/teragrep/aer_02/config/RelpConnectionConfig.java
index e80b11d..33bad1f 100644
--- a/src/main/java/com/teragrep/aer_02/config/RelpConfig.java
+++ b/src/main/java/com/teragrep/aer_02/config/RelpConnectionConfig.java
@@ -46,8 +46,13 @@
package com.teragrep.aer_02.config;
import com.teragrep.aer_02.config.source.Sourceable;
+import com.teragrep.rlp_01.client.RelpConfig;
+import com.teragrep.rlp_01.client.SocketConfig;
+import com.teragrep.rlp_01.client.SocketConfigImpl;
-public final class RelpConfig {
+import java.time.Duration;
+
+public final class RelpConnectionConfig {
private final int connectTimeout;
private final int readTimeout;
@@ -55,25 +60,41 @@ public final class RelpConfig {
private final int reconnectInterval;
private final int port;
private final String address;
+ private final int rebindRequestAmount;
+ private final boolean rebindEnabled;
+ private final Duration maxIdle;
+ private final boolean maxIdleEnabled;
+ private final boolean keepAlive;
- public RelpConfig(final Sourceable configSource) {
+ public RelpConnectionConfig(final Sourceable configSource) {
this(
- Integer.parseInt(configSource.source("relp.connection.timeout", "5000")),
- Integer.parseInt(configSource.source("relp.transaction.read.timeout", "5000")),
- Integer.parseInt(configSource.source("relp.transaction.write.timeout", "5000")),
- Integer.parseInt(configSource.source("relp.connection.retry.interval", "5000")),
+ Integer.parseInt(configSource.source("relp.connection.timeout", "2500")),
+ Integer.parseInt(configSource.source("relp.transaction.read.timeout", "1500")),
+ Integer.parseInt(configSource.source("relp.transaction.write.timeout", "1500")),
+ Integer.parseInt(configSource.source("relp.connection.retry.interval", "500")),
Integer.parseInt(configSource.source("relp.connection.port", "601")),
- configSource.source("relp.connection.address", "localhost")
+ configSource.source("relp.connection.address", "localhost"),
+ Integer.parseInt(configSource.source("relp.rebind.request.amount", "100000")),
+ Boolean.parseBoolean(configSource.source("relp.rebind.enabled", "true")),
+ Duration.parse(configSource.source("relp.max.idle.duration", Duration.ofMillis(150000L).toString())),
+ Boolean.parseBoolean(configSource.source("relp.max.idle.enabled", "false")),
+ Boolean.parseBoolean(configSource.source("relp.connection.keepalive", "true"))
+
);
}
- public RelpConfig(
+ public RelpConnectionConfig(
final int connectTimeout,
final int readTimeout,
final int writeTimeout,
final int reconnectInt,
final int port,
- final String addr
+ final String addr,
+ final int rebindRequestAmount,
+ final boolean rebindEnabled,
+ final Duration maxIdle,
+ final boolean maxIdleEnabled,
+ final boolean keepAlive
) {
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
@@ -81,6 +102,11 @@ public RelpConfig(
this.reconnectInterval = reconnectInt;
this.port = port;
this.address = addr;
+ this.rebindRequestAmount = rebindRequestAmount;
+ this.rebindEnabled = rebindEnabled;
+ this.maxIdle = maxIdle;
+ this.maxIdleEnabled = maxIdleEnabled;
+ this.keepAlive = keepAlive;
}
/**
@@ -124,4 +150,40 @@ public int relpPort() {
public String relpAddress() {
return address;
}
+
+ public boolean maxIdleEnabled() {
+ return maxIdleEnabled;
+ }
+
+ public Duration maxIdle() {
+ return maxIdle;
+ }
+
+ public boolean rebindEnabled() {
+ return rebindEnabled;
+ }
+
+ public int rebindRequestAmount() {
+ return rebindRequestAmount;
+ }
+
+ public boolean keepAlive() {
+ return keepAlive;
+ }
+
+ public RelpConfig asRelpConfig() {
+ return new RelpConfig(
+ relpAddress(),
+ relpPort(),
+ reconnectInterval(),
+ rebindRequestAmount(),
+ rebindEnabled(),
+ maxIdle(),
+ maxIdleEnabled()
+ );
+ }
+
+ public SocketConfig asSocketConfig() {
+ return new SocketConfigImpl(readTimeout(), writeTimeout(), connectTimeout(), keepAlive());
+ }
}
diff --git a/src/main/java/com/teragrep/aer_02/tls/AzureSSLContextSupplier.java b/src/main/java/com/teragrep/aer_02/tls/AzureSSLContextSupplier.java
new file mode 100644
index 0000000..64b6235
--- /dev/null
+++ b/src/main/java/com/teragrep/aer_02/tls/AzureSSLContextSupplier.java
@@ -0,0 +1,86 @@
+/*
+ * Teragrep Eventhub Reader as an Azure Function
+ * Copyright (C) 2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.aer_02.tls;
+
+import com.teragrep.rlp_01.client.SSLContextSupplier;
+import com.azure.security.keyvault.jca.*;
+import org.apache.hc.core5.ssl.SSLContexts;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.security.*;
+import java.security.cert.CertificateException;
+
+public class AzureSSLContextSupplier implements SSLContextSupplier {
+
+ @Override
+ public SSLContext get() {
+ KeyVaultJcaProvider jca = new KeyVaultJcaProvider();
+ Security.addProvider(jca);
+ KeyStore keyStore;
+ try {
+ keyStore = KeyVaultKeyStore.getKeyVaultKeyStoreBySystemProperty();
+ }
+ catch (CertificateException | KeyStoreException | NoSuchAlgorithmException | IOException e) {
+ throw new RuntimeException("Error retrieving KeyStore from KeyVault: ", e);
+ }
+
+ SSLContext sslContext;
+ try {
+ sslContext = SSLContexts.custom().loadTrustMaterial(keyStore, (chain, authType) -> true).build();
+ }
+ catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
+ throw new RuntimeException("Error creating SSLContext: ", e);
+ }
+
+ return sslContext;
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+}
diff --git a/src/test/java/com/teragrep/aer_02/ConfigTest.java b/src/test/java/com/teragrep/aer_02/ConfigTest.java
index 6ad5a5c..4f69900 100644
--- a/src/test/java/com/teragrep/aer_02/ConfigTest.java
+++ b/src/test/java/com/teragrep/aer_02/ConfigTest.java
@@ -45,7 +45,7 @@
*/
package com.teragrep.aer_02;
-import com.teragrep.aer_02.config.RelpConfig;
+import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.SyslogConfig;
import com.teragrep.aer_02.config.source.EnvironmentSource;
import com.teragrep.aer_02.config.source.PropertySource;
@@ -65,7 +65,7 @@ public void testConfigFromProperty() {
@Test
public void testConfigFallback() {
- RelpConfig relpConfig = new RelpConfig(new EnvironmentSource());
- Assertions.assertEquals(1601, relpConfig.relpPort(), "Expected to get fallback value");
+ RelpConnectionConfig relpConnectionConfig = new RelpConnectionConfig(new EnvironmentSource());
+ Assertions.assertEquals(1601, relpConnectionConfig.relpPort(), "Expected to get fallback value");
}
}
diff --git a/src/test/java/com/teragrep/aer_02/DefaultOutputTest.java b/src/test/java/com/teragrep/aer_02/DefaultOutputTest.java
index 1a4200d..1b2cedc 100644
--- a/src/test/java/com/teragrep/aer_02/DefaultOutputTest.java
+++ b/src/test/java/com/teragrep/aer_02/DefaultOutputTest.java
@@ -48,15 +48,18 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
-import com.teragrep.aer_02.config.RelpConfig;
+import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.source.PropertySource;
import com.teragrep.aer_02.fakes.ConnectionlessRelpConnectionFake;
+import com.teragrep.rlp_01.client.ManagedRelpConnectionStub;
import com.teragrep.aer_02.fakes.RelpConnectionFake;
import com.teragrep.aer_02.fakes.ThrowingRelpConnectionFake;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;
-import com.teragrep.rlp_01.RelpConnection;
+import com.teragrep.rlp_01.client.IManagedRelpConnection;
+import com.teragrep.rlp_01.client.RelpConnectionWithConfig;
+import com.teragrep.rlp_01.pool.UnboundPool;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -82,9 +85,22 @@ public void testSendLatencyMetricIsCapped() { // Should only keep information on
MetricRegistry metricRegistry = new MetricRegistry();
SlidingWindowReservoir sendReservoir = new SlidingWindowReservoir(measurementLimit);
SlidingWindowReservoir connectReservoir = new SlidingWindowReservoir(measurementLimit);
- try (
- DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()), metricRegistry, new RelpConnectionFake(), sendReservoir, connectReservoir)
- ) {
+
+ UnboundPool pool = new UnboundPool<>(
+ () -> new ManagedRelpConnectionWithMetrics(
+ new RelpConnectionWithConfig(
+ new RelpConnectionFake(),
+ new RelpConnectionConfig(new PropertySource()).asRelpConfig()
+ ),
+ "defaultOutput",
+ metricRegistry,
+ sendReservoir,
+ connectReservoir
+ ),
+ new ManagedRelpConnectionStub()
+ );
+
+ try (DefaultOutput output = new DefaultOutput(new RelpConnectionConfig(new PropertySource()), pool)) {
for (int i = 0; i < measurementLimit + 100; i++) { // send more messages than the limit is
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
@@ -112,10 +128,15 @@ public void testConnectionLatencyMetricIsCapped() { // Should take information o
MetricRegistry metricRegistry = new MetricRegistry();
SlidingWindowReservoir sendReservoir = new SlidingWindowReservoir(measurementLimit);
SlidingWindowReservoir connectReservoir = new SlidingWindowReservoir(measurementLimit);
- RelpConnection relpConnection = new ConnectionlessRelpConnectionFake(reconnections); // use a fake that forces reconnects
- try (
- DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()), metricRegistry, relpConnection, sendReservoir, connectReservoir)
- ) {
+
+ UnboundPool pool = new UnboundPool<>(
+ () -> new ManagedRelpConnectionWithMetrics(
+ new RelpConnectionWithConfig(new ConnectionlessRelpConnectionFake(reconnections), new RelpConnectionConfig(new PropertySource()).asRelpConfig()), "defaultOutput", metricRegistry, sendReservoir, connectReservoir
+ ),
+ new ManagedRelpConnectionStub()
+ );
+
+ try (DefaultOutput output = new DefaultOutput(new RelpConnectionConfig(new PropertySource()), pool)) {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}
@@ -136,13 +157,16 @@ public void testConnectionLatencyMetricWithException() { // should not update va
.withMsg("test");
final int reconnections = 10;
-
// set up DefaultOutput
MetricRegistry metricRegistry = new MetricRegistry();
- RelpConnection relpConnection = new ThrowingRelpConnectionFake(reconnections); // use a fake that throws exceptions when connecting
- try (
- DefaultOutput output = new DefaultOutput("defaultOutput", new RelpConfig(new PropertySource()), metricRegistry, relpConnection)
- ) {
+ UnboundPool pool = new UnboundPool<>(
+ () -> new ManagedRelpConnectionWithMetrics(
+ new RelpConnectionWithConfig(new ThrowingRelpConnectionFake(reconnections), new RelpConnectionConfig(new PropertySource()).asRelpConfig()), "defaultOutput", metricRegistry
+ ),
+ new ManagedRelpConnectionStub()
+ );
+
+ try (DefaultOutput output = new DefaultOutput(new RelpConnectionConfig(new PropertySource()), pool)) {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}
diff --git a/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java b/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
new file mode 100644
index 0000000..fa8c62a
--- /dev/null
+++ b/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
@@ -0,0 +1,246 @@
+/*
+ * Teragrep Eventhub Reader as an Azure Function
+ * Copyright (C) 2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.aer_02;
+
+import com.codahale.metrics.MetricRegistry;
+import com.teragrep.aer_02.config.source.EnvironmentSource;
+import com.teragrep.aer_02.fakes.PartitionContextFake;
+import com.teragrep.aer_02.fakes.SystemPropsFake;
+import com.teragrep.net_01.channel.socket.TLSFactory;
+import com.teragrep.net_01.eventloop.EventLoop;
+import com.teragrep.net_01.eventloop.EventLoopFactory;
+import com.teragrep.net_01.server.Server;
+import com.teragrep.net_01.server.ServerFactory;
+import com.teragrep.rlo_06.RFC5424Frame;
+import com.teragrep.rlp_01.SSLContextFactory;
+import com.teragrep.rlp_01.client.SSLContextSupplier;
+import com.teragrep.rlp_03.frame.FrameDelegationClockFactory;
+import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
+import com.teragrep.rlp_03.frame.delegate.FrameContext;
+import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.time.ZonedDateTime;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class EventDataConsumerTlsTest {
+
+ private Server server;
+ private EventLoop eventLoop;
+ private Thread eventLoopThread;
+ private ExecutorService executorService;
+ private final List messages = Collections.synchronizedList(new ArrayList<>());
+
+ @BeforeEach
+ void setUp() {
+ messages.clear();
+ this.executorService = Executors.newFixedThreadPool(1);
+ Consumer syslogConsumer = new Consumer<>() {
+
+ @Override
+ public synchronized void accept(FrameContext frameContext) {
+ messages.add(frameContext.relpFrame().payload().toString());
+ }
+ };
+
+ Supplier frameDelegateSupplier = () -> new DefaultFrameDelegate(syslogConsumer);
+
+ // SSL
+ SSLContext sslContext = Assertions
+ .assertDoesNotThrow(
+ () -> SSLContextFactory
+ .authenticatedContext("src/test/resources/keystore-server.jks", "changeit", "TLSv1.3")
+ );
+
+ Function sslEngineFunction = sslContext1 -> {
+ SSLEngine sslEngine = sslContext1.createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ return sslEngine;
+ };
+
+ EventLoopFactory eventLoopFactory = new EventLoopFactory();
+ this.eventLoop = Assertions.assertDoesNotThrow(eventLoopFactory::create);
+
+ this.eventLoopThread = new Thread(eventLoop);
+ eventLoopThread.start();
+
+ ServerFactory serverFactory = new ServerFactory(
+ eventLoop,
+ executorService,
+ new TLSFactory(sslContext, sslEngineFunction),
+ new FrameDelegationClockFactory(frameDelegateSupplier)
+ );
+ this.server = Assertions.assertDoesNotThrow(() -> serverFactory.create(1601));
+ }
+
+ @AfterEach
+ void tearDown() {
+ eventLoop.stop();
+ Assertions.assertDoesNotThrow(() -> eventLoopThread.join());
+ executorService.shutdown();
+ Assertions.assertDoesNotThrow(() -> server.close());
+ }
+
+ @Test
+ void testSyslogBridgeTls() {
+ final EventDataConsumer edc = new EventDataConsumer(
+ new EnvironmentSource(),
+ "localhost",
+ new MetricRegistry(),
+ new SSLContextSupplier() {
+
+ @Override
+ public SSLContext get() {
+ SSLContext rv;
+ try {
+ rv = InternalSSLContextFactory
+ .authenticatedContext(
+ "src/test/resources/keystore-client.jks",
+ "src/test/resources/truststore.jks", "changeit", "changeit", "TLSv1.3"
+ );
+ }
+ catch (GeneralSecurityException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ return rv;
+ }
+
+ @Override
+ public boolean isStub() {
+ return false;
+ }
+ }
+ );
+
+ // Fake data
+ PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0");
+ Map props = new HashMap<>();
+ List eventDatas = Arrays.asList("event0", "event1", "event2");
+ Map[] propsArray = new Map[] {
+ props, props, props
+ };
+ Map[] sysPropsArray = new Map[] {
+ new SystemPropsFake("0").asMap(), new SystemPropsFake("1").asMap(), new SystemPropsFake("2").asMap()
+ };
+ List offsets = Arrays.asList("0", "1", "2");
+ List enqueuedArray = Arrays.asList("2010-01-01T00:00:00", "2010-01-02T00:00:00", "2010-01-03T00:00:00");
+
+ for (int i = 0; i < eventDatas.size(); i++) {
+ edc
+ .accept(eventDatas.get(i), pcf.asMap(), ZonedDateTime.parse(enqueuedArray.get(i) + "Z"), offsets.get(i), propsArray[i], sysPropsArray[i]);
+ }
+
+ Assertions.assertEquals(3, messages.size());
+
+ int loops = 0;
+ for (String message : messages) {
+ final RFC5424Frame frame = new RFC5424Frame(false);
+ frame.load(new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8)));
+ Assertions.assertTrue(Assertions.assertDoesNotThrow(frame::next));
+ Assertions.assertEquals("localhost.localdomain", frame.hostname.toString());
+ Assertions.assertEquals("aer-02", frame.appName.toString());
+ Assertions.assertEquals(String.valueOf(loops), frame.msgId.toString());
+ loops++;
+ }
+
+ Assertions.assertEquals(3, loops);
+ Assertions.assertDoesNotThrow(edc::close);
+ }
+
+ private static class InternalSSLContextFactory {
+
+ public static SSLContext authenticatedContext(
+ String keystorePath,
+ String truststorePath,
+ String keystorePassword,
+ String truststorePassword,
+ String protocol
+ ) throws GeneralSecurityException, IOException {
+
+ SSLContext sslContext = SSLContext.getInstance(protocol);
+ KeyStore ks = KeyStore.getInstance("JKS");
+ KeyStore ts = KeyStore.getInstance("JKS");
+
+ File ksFile = new File(keystorePath);
+ File tsFile = new File(truststorePath);
+
+ try (FileInputStream ksFileIS = new FileInputStream(ksFile)) {
+ try (FileInputStream tsFileIS = new FileInputStream(tsFile)) {
+ ts.load(tsFileIS, truststorePassword.toCharArray());
+ TrustManagerFactory tmf = TrustManagerFactory
+ .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(ts);
+
+ ks.load(ksFileIS, keystorePassword.toCharArray());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, keystorePassword.toCharArray());
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+ return sslContext;
+ }
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java b/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
index f16e324..bc4f014 100644
--- a/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
+++ b/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
@@ -84,7 +84,7 @@ public final class SyslogBridgeTest {
private EventLoop eventLoop;
private Thread eventLoopThread;
private ExecutorService executorService;
- private final List messages = new ArrayList<>();
+ private final List messages = Collections.synchronizedList(new ArrayList<>());
@BeforeEach
void setup() {
diff --git a/src/test/java/com/teragrep/aer_02/fakes/RelpConnectionFake.java b/src/test/java/com/teragrep/aer_02/fakes/RelpConnectionFake.java
index 41222d8..4ded90b 100644
--- a/src/test/java/com/teragrep/aer_02/fakes/RelpConnectionFake.java
+++ b/src/test/java/com/teragrep/aer_02/fakes/RelpConnectionFake.java
@@ -52,21 +52,61 @@
public class RelpConnectionFake extends RelpConnection {
+ @Override
+ public int getReadTimeout() {
+ return 0;
+ }
+
@Override
public void setReadTimeout(int readTimeout) {
// no-op in fake
}
+ @Override
+ public int getWriteTimeout() {
+ return 0;
+ }
+
@Override
public void setWriteTimeout(int writeTimeout) {
// no-op in fake
}
+ @Override
+ public int getConnectionTimeout() {
+ return 0;
+ }
+
@Override
public void setConnectionTimeout(int timeout) {
// no-op in fake
}
+ @Override
+ public void setKeepAlive(boolean b) {
+ // no-op in fake
+ }
+
+ @Override
+ public int getRxBufferSize() {
+ return 0;
+ }
+
+ @Override
+ public void setRxBufferSize(int i) {
+ // no-op in fake
+ }
+
+ @Override
+ public int getTxBufferSize() {
+ return 0;
+ }
+
+ @Override
+ public void setTxBufferSize(int i) {
+ // no-op in fake
+ }
+
@Override
public boolean connect(String hostname, int port) throws IOException {
return true;
diff --git a/src/test/resources/create-certs.sh b/src/test/resources/create-certs.sh
new file mode 100644
index 0000000..6d759fa
--- /dev/null
+++ b/src/test/resources/create-certs.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+# Create CA cert
+openssl genrsa -out ca-key.pem 2048
+openssl req -new -x509 -nodes -days 100000 -key ca-key.pem -out ca-cert.pem -batch -subj '/CN=test-ca.example.com/C=FI'
+
+# truststore, for both client and server
+/usr/lib/jvm/jre-1.8.0-openjdk/bin/keytool -import -trustcacerts -alias "CA" -file ca-cert.pem -keystore truststore.jks -deststorepass changeit --noprompt
+
+# ---------------- SERVER ----------------
+
+# Server certificate
+openssl req -newkey rsa:2048 -nodes -keyout server-key.pem -out server-req.pem -batch -subj '/CN=test-server.example.com/C=FI'
+openssl x509 -req -days 10000 -in server-req.pem -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server-cert.pem
+rm -f server-req.pem
+
+# server keystore: combine pems
+openssl pkcs12 -export -out combined-server.pfx -inkey server-key.pem -in server-cert.pem -passout pass:changeit
+
+# server keystore: import combined pems
+/usr/lib/jvm/jre-1.8.0-openjdk/bin/keytool -importkeystore -srckeystore combined-server.pfx -srcstoretype PKCS12 -srcstorepass changeit -deststorepass changeit -destkeypass changeit -destkeystore keystore-server.jks
+rm -f combined-server.pfx
+
+# ---------------- CLIENT ----------------
+
+# client certificate
+openssl req -newkey rsa:2048 -days 10000 -nodes -keyout client-key.pem -out client-req.pem -batch -subj '/CN=test-client.example.com/C=FI'
+openssl x509 -req -days 10000 -in client-req.pem -CA ca-cert.pem -CAkey ca-key.pem -CAserial ca-cert.srl -out client-cert.pem
+rm -f client-req.pem
+
+# client keystore: combine pems
+openssl pkcs12 -export -out combined-client.pfx -inkey client-key.pem -in client-cert.pem -passout pass:changeit
+
+# client keystore: import combined pems
+/usr/lib/jvm/jre-1.8.0-openjdk/bin/keytool -importkeystore -srckeystore combined-client.pfx -srcstoretype PKCS12 -srcstorepass changeit -deststorepass changeit -destkeypass changeit -destkeystore keystore-client.jks
+rm -f combined-client.pfx