Skip to content

Commit

Permalink
Add TLS support (#46)
Browse files Browse the repository at this point in the history
* initial tls branch commit: rlp-01 upgrade

* Add "AzureSSLContextSupplier" to supply SSLContext with KeyVault KeyStore. Rename RelpConfig to RelpConnectionConfig. Add "create-certs.sh" script to generate certs and keystores for testing. Add azure-security-keyvault-jca and httpcore5 to pom.xml. Add additional relp configs introduced by new rlp-01 version.

* implement ManagedRelpConnectionWithMetrics, fix DefaultOutput tests

* cleanup pom.xml commented out dependencies

* remove ManagedRelpConnectionStub.java; change relp.rebind.request.amount to '100000'; use synchronized list for messages in tests

* fix remaining defaults in RelpConnectionConfig; add keepAlive to the config, was previously hardcoded to false.
  • Loading branch information
eemhu authored Dec 20, 2024
1 parent 6ed75ae commit e3522a9
Show file tree
Hide file tree
Showing 14 changed files with 951 additions and 181 deletions.
16 changes: 14 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlp_01</artifactId>
<version>4.0.3</version>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.teragrep</groupId>
Expand All @@ -72,7 +72,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlp_03</artifactId>
<version>8.0.1</version>
<version>10.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -180,6 +180,18 @@
<artifactId>parsson</artifactId>
<version>1.1.7</version>
</dependency>
<!-- Azure KeyVault -->
<!-- https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/keyvault/azure-security-keyvault-jca#client-side-ssl -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-jca</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.3.1</version>
</dependency>
</dependencies>
<build>
<directory>${project.basedir}/target</directory>
Expand Down
186 changes: 45 additions & 141 deletions src/main/java/com/teragrep/aer_02/DefaultOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,177 +46,81 @@
package com.teragrep.aer_02;

import com.codahale.metrics.*;
import com.teragrep.aer_02.config.RelpConfig;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
import com.teragrep.rlp_01.client.*;
import com.teragrep.rlp_01.pool.Pool;
import com.teragrep.rlp_01.pool.UnboundPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.TimeoutException;

import static com.codahale.metrics.MetricRegistry.name;

// TODO unify, this is a copy from cfe_35 which is a copy from rlo_10 with FIXES
final class DefaultOutput implements Output {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class);

private final RelpConnection relpConnection;
private final Pool<IManagedRelpConnection> relpConnectionPool;
private final String relpAddress;
private final int relpPort;
private final int reconnectInterval;

// metrics
private final Counter records;
private final Counter bytes;
private final Counter resends;
private final Counter connects;
private final Counter retriedConnects;
private final Timer sendLatency;
private final Timer connectLatency;

DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry) {
this(name, relpConfig, metricRegistry, new RelpConnection());
}

DefaultOutput(String name, RelpConfig relpConfig, MetricRegistry metricRegistry, RelpConnection relpConnection) {
this(
name,
relpConfig,
metricRegistry,
relpConnection,
new SlidingWindowReservoir(10000),
new SlidingWindowReservoir(10000)
);
}

DefaultOutput(
String name,
RelpConfig relpConfig,
RelpConnectionConfig relpConnectionConfig,
MetricRegistry metricRegistry,
RelpConnection relpConnection,
Reservoir sendReservoir,
Reservoir connectReservoir
SSLContextSupplier sslContextSupplier
) {
this.relpAddress = relpConfig.relpAddress();
this.relpPort = relpConfig.relpPort();
this.reconnectInterval = relpConfig.reconnectInterval();

this.relpConnection = relpConnection;
this.relpConnection.setConnectionTimeout(relpConfig.connectTimeout());
this.relpConnection.setReadTimeout(relpConfig.readTimeout());
this.relpConnection.setWriteTimeout(relpConfig.writeTimeout());

this.records = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "records"));
this.bytes = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "bytes"));
this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends"));
this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects"));
this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects"));
this.sendLatency = metricRegistry
.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"), () -> new Timer(sendReservoir));
this.connectLatency = metricRegistry
.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"), () -> new Timer(connectReservoir));
this(
relpConnectionConfig,
new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
relpConnectionConfig.asRelpConfig(),
name,
metricRegistry,
relpConnectionConfig.asSocketConfig(),
sslContextSupplier
),
new ManagedRelpConnectionStub()
)
);
}

connect();
DefaultOutput(String name, RelpConnectionConfig relpConnectionConfig, MetricRegistry metricRegistry) {
this(
relpConnectionConfig,
new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
relpConnectionConfig.asRelpConfig(),
name,
metricRegistry,
relpConnectionConfig.asSocketConfig()
),
new ManagedRelpConnectionStub()
)
);
}

private void connect() {
boolean connected = false;
while (!connected) {
final Timer.Context context = connectLatency.time(); // reset the time (new context)
try {
// coverity[leaked_resource]
connected = this.relpConnection.connect(relpAddress, relpPort);
/*
Not closing the context in case of an exception thrown in .connect() will leave the timer.context
for garbage collector to remove. This will happen even if the context is closed because of how
the Timer is implemented.
*/
context.close(); // manually close here, so the timer is only updated if no exceptions were thrown
connects.inc();
}
catch (IOException | TimeoutException e) {
LOGGER.error("Exception while connecting to <[{}]>:<[{}]>", relpAddress, relpPort, e);
}
catch (UnresolvedAddressException e) {
LOGGER.error("Can't resolve address of target <[{}]>", relpAddress, e);
}
DefaultOutput(RelpConnectionConfig relpConnectionConfig, Pool<IManagedRelpConnection> relpConnectionPool) {
this.relpAddress = relpConnectionConfig.relpAddress();
this.relpPort = relpConnectionConfig.relpPort();

if (!connected) {
try {
Thread.sleep(reconnectInterval);
retriedConnects.inc();
}
catch (InterruptedException e) {
LOGGER
.warn(
"Sleep interrupted while waiting for reconnectInterval <[{}]> on <[{}]>:<[{}]>",
reconnectInterval, relpAddress, relpPort, e
);
}
}
}
this.relpConnectionPool = relpConnectionPool;
}

@Override
public void accept(byte[] syslogMessage) {
try (final Timer.Context context = sendLatency.time()) {
RelpBatch batch = new RelpBatch();
batch.insert(syslogMessage);

boolean allSent = false;
while (!allSent) {
try {
this.relpConnection.commit(batch);

// metrics
// NOTICE these if batch size changes
records.inc(1);
bytes.inc(syslogMessage.length);

}
catch (IllegalStateException | IOException | TimeoutException e) {
LOGGER.error("Exception while committing a batch to <[{}]>:<[{}]>", relpAddress, relpPort, e);
}
// Check if everything has been sent, retry and reconnect if not.
if (!batch.verifyTransactionAll()) {
batch.retryAllFailed();

// metrics
// NOTICE this if batch size changes
resends.inc(1);
relpConnection.tearDown();
try {
Thread.sleep(reconnectInterval);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
connect();
}
else {
allSent = true;
}
}
}
RelpBatch batch = new RelpBatch();
batch.insert(syslogMessage);
IManagedRelpConnection connection = relpConnectionPool.get();
connection.ensureSent(syslogMessage);
relpConnectionPool.offer(connection);
}

@Override
public String toString() {
return "DefaultOutput{" + "relpAddress='" + relpAddress + '\'' + ", relpPort=" + relpPort + '}';
}

@Override
public void close() {
try {
relpConnection.disconnect();
}
catch (IOException | TimeoutException e) {
LOGGER.warn("Exception while disconnecting from <[{}]>:<[{}]>", relpAddress, relpPort, e);
}
finally {
relpConnection.tearDown();
}
relpConnectionPool.close();
}
}
17 changes: 15 additions & 2 deletions src/main/java/com/teragrep/aer_02/EventDataConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.teragrep.aer_02.config.RelpConfig;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.SyslogConfig;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.SDElement;
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;
import com.teragrep.rlp_01.client.SSLContextSupplier;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -71,10 +72,22 @@ final class EventDataConsumer implements AutoCloseable {
private final SyslogConfig syslogConfig;
private final MetricRegistry metricRegistry;

EventDataConsumer(
final Sourceable configSource,
final String hostname,
final MetricRegistry metricRegistry,
final SSLContextSupplier sslContextSupplier
) {
this(
configSource,
new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry, sslContextSupplier), hostname, metricRegistry
);
}

EventDataConsumer(final Sourceable configSource, final String hostname, final MetricRegistry metricRegistry) {
this(
configSource,
new DefaultOutput("defaultOutput", new RelpConfig(configSource), metricRegistry),
new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry),
hostname,
metricRegistry
);
Expand Down
Loading

0 comments on commit e3522a9

Please sign in to comment.