Skip to content

Commit

Permalink
- A slightly more gentle shutdown
Browse files Browse the repository at this point in the history
- log4j upgrade
- Retry policy
  • Loading branch information
EinsamHauer committed Feb 17, 2022
1 parent 00bd3b3 commit 352e8f4
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 50 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ There are a couple of things which seem to be an absolute must and which were mi
* aggregation: ability to sum similar metrics from several servers
* blacklisting: ability to omit storing metrics which match a certain pattern. This does not make much sense by itself but is quite handy in combination with the previous item (aggregation)
* caching incoming paths: this may really reduce the load on Elasticsearch cluster
* some minimal set of own metrics (received count, write count, etc)
* some minimal set of own metrics (received count, write count, etc.)
* true multitenancy

The other thing is performance. **Disthene** is being developed with one major requirement in mind - performance.
Expand Down Expand Up @@ -123,7 +123,7 @@ stats:
Configuration is straightforward, as per log4j

##### Blacklist configuration in blacklist.yaml
This is a list of regular expressions per tenant. Matching metrics will NOT be store but they still WILL be aggregated (see below)
This is a list of regular expressions per tenant. Matching metrics will NOT be stored, but they still WILL be aggregated (see below)

##### Whitelist configuration in whitelist.yaml
This is a list of regular expressions per tenant. Matching metrics will override blacklist rules.
Expand Down
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.iponweb.disthene</groupId>
<artifactId>disthene</artifactId>
<packaging>jar</packaging>
<version>2.0.1</version>
<version>2.0.2</version>
<name>disthene</name>

<properties>
Expand All @@ -21,22 +21,22 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.15.0</version>
<version>2.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.15.0</version>
<version>2.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.15.0</version>
<version>2.17.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.71.Final</version>
<version>4.1.72.Final</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
Expand Down Expand Up @@ -71,7 +71,7 @@
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.7.1</version>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>net.engio</groupId>
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/net/iponweb/disthene/Disthene.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private void run() {
}

BlackListConfiguration blackListConfiguration = new BlackListConfiguration(blacklistRules, whitelistRules);
logger.debug("Running with the following blacklist: " + blackListConfiguration.toString());
logger.debug("Running with the following blacklist: " + blackListConfiguration);
blacklistService = new BlacklistService(blackListConfiguration);

logger.info("Creating metric service");
Expand Down Expand Up @@ -133,7 +133,7 @@ private void run() {
in = Files.newInputStream(Paths.get(aggregationConfigLocation));
AggregationConfiguration aggregationConfiguration = new AggregationConfiguration(yaml.load(in));
in.close();
logger.debug("Running with the following aggregation rule set: " + aggregationConfiguration.toString());
logger.debug("Running with the following aggregation rule set: " + aggregationConfiguration);
logger.info("Creating sum aggregator");
sumService = new SumService(bus, distheneConfiguration, aggregationConfiguration, blacklistService);

Expand Down Expand Up @@ -233,7 +233,7 @@ public void handle(Signal signal) {

blacklistService.setRules(blackListConfiguration);

logger.debug("Reloaded blacklist: " + blackListConfiguration.toString());
logger.debug("Reloaded blacklist: " + blackListConfiguration);
} catch (Exception e) {
logger.error("Reloading blacklists failed");
logger.error(e);
Expand All @@ -247,7 +247,7 @@ public void handle(Signal signal) {
in.close();

sumService.setAggregationConfiguration(aggregationConfiguration);
logger.debug("Reloaded aggregation rules: " + aggregationConfiguration.toString());
logger.debug("Reloaded aggregation rules: " + aggregationConfiguration);
} catch (Exception e) {
logger.error("Reloading aggregation rules failed");
logger.error(e);
Expand Down Expand Up @@ -281,7 +281,7 @@ public void handle(Signal signal) {
logger.info("Shutting down carbon server");
carbonServer.shutdown();

// We will probably loose some last stats here. But leaving it to run will complicate things
// We will probably lose some last stats here. But leaving it to run will complicate things
logger.info("Shutting down stats service");
statsService.shutdown();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void handle(MetricStoreEvent metricStoreEvent) {
if (indexConfiguration.isCache()) {
handleWithCache(metricStoreEvent.getMetric());
} else {
//noinspection ResultOfMethodCallIgnored
metrics.offer(metricStoreEvent.getMetric());
}
}
Expand All @@ -93,6 +94,7 @@ private void handleWithCache(Metric metric) {
Long lastSeen = tenantPaths.put(metric.getPath(), System.currentTimeMillis() / 1000L);

if (lastSeen == null) {
//noinspection ResultOfMethodCallIgnored
metrics.offer(metric);
}
}
Expand Down Expand Up @@ -126,11 +128,6 @@ public synchronized void invalidateCache() {
public void shutdown() {
scheduler.shutdown();
indexThread.shutdown();
logger.info("Sleeping for 10 seconds to allow leftovers to be written");
try {
Thread.sleep(10000);
} catch (InterruptedException ignored) {
}
logger.info("Closing ES client");

try {
Expand Down
35 changes: 25 additions & 10 deletions src/main/java/net/iponweb/disthene/service/index/IndexThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Andrei Ivanov
Expand Down Expand Up @@ -82,16 +83,14 @@ public void run() {
Metric metric = metrics.take();
addToBatch(metric);
} catch (Exception e) {
logger.warn("Encountered error in busy loop: ", e);
if (!shutdown) logger.error("Encountered error in busy loop: ", e);
}
}

if (request.size() > 0) {
try {
flush();
} catch (Exception e) {
logger.warn("Encountered error in busy loop: ", e);
}
try {
flush();
} catch (Exception e) {
logger.error("Encountered error in busy loop: ", e);
}
}

Expand All @@ -104,7 +103,9 @@ private void addToBatch(Metric metric) throws IOException {
}
}

private void flush() throws IOException {
private synchronized void flush() throws IOException {
if (request.size() <= 0) return;

MultiGetResponse multiGetResponse = client.mget(request, RequestOptions.DEFAULT);

for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
Expand All @@ -123,7 +124,7 @@ private void flush() throws IOException {
}
sb.append(parts[i]);
try {
bulkProcessor.add(new IndexRequest(index).id(metric.getTenant() + "_" + sb.toString()).source(
bulkProcessor.add(new IndexRequest(index).id(metric.getTenant() + "_" + sb).source(
XContentFactory.jsonBuilder().startObject()
.field("tenant", metric.getTenant())
.field("path", sb.toString())
Expand All @@ -146,6 +147,21 @@ private void flush() throws IOException {

/**
 * Stops this indexing thread.
 *
 * <p>Sets the shutdown flag, flushes whatever is still pending in the current
 * batch, waits up to one minute for the bulk processor to finish, and finally
 * interrupts this thread so that its run loop can exit.
 *
 * <p>Failures during the final flush are logged and do not abort the rest of
 * the shutdown sequence.
 */
public void shutdown() {
    shutdown = true;

    logger.info("Flushing leftovers");
    try {
        flush();
        logger.info("Finished flushing leftovers");
    } catch (IOException e) {
        logger.error("Flush failed", e);
    }

    logger.info("Waiting for bulk processor to complete");
    try {
        // awaitClose reports whether everything completed within the timeout;
        // make a timeout visible instead of silently dropping the result.
        boolean completed = bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
        if (!completed) {
            logger.warn("Bulk processor did not complete within the timeout");
        }
    } catch (InterruptedException e) {
        logger.error("AwaitClose interrupted", e);
        // Restore the interrupt status of the calling thread so that code
        // further up the shutdown sequence can still observe the interruption.
        Thread.currentThread().interrupt();
    }
    this.interrupt();
}

Expand All @@ -154,7 +170,6 @@ private static class MetricMultiGetRequest extends MultiGetRequest {
private final String index;
final Map<String, Metric> metrics = new HashMap<>();


public MetricMultiGetRequest(String index) {
this.index = index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() {
flush();
}
} catch (InterruptedException e) {
logger.error("Thread interrupted", e);
if (!shutdown) logger.error("Thread interrupted", e);
this.interrupt();
}
}
Expand Down Expand Up @@ -83,16 +83,18 @@ private void addToBatch(Metric metric) {
}
}

private void flush() {
private synchronized void flush() {
List<List<BatchableStatement<?>>> batches = splitByToken();

for (List<BatchableStatement<?>> batchStatements : batches) {
BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED, batchStatements);
final int batchSize = batchStatements.size();

requestsInFlight.incrementAndGet();
session
.executeAsync(batch)
.whenComplete((version, error) -> {
requestsInFlight.decrementAndGet();
if (error != null) {
bus.post(new StoreErrorEvent(batchSize)).now();
logger.error(error);
Expand All @@ -117,4 +119,24 @@ private List<List<BatchableStatement<?>>> splitByToken() {

return new ArrayList<>(batches.values());
}

/**
 * Stops this writer thread.
 *
 * <p>Sets the shutdown flag, flushes the pending batch, and then polls once per
 * second until no asynchronous requests remain in flight. Finally interrupts
 * this thread so that its run loop can exit.
 *
 * <p>If the calling thread is interrupted while waiting, the wait is abandoned:
 * the interrupt status is restored and the shutdown proceeds immediately.
 */
@Override
public void shutdown() {
    shutdown = true;

    logger.info("Flushing leftovers");
    flush();

    while (requestsInFlight.get() > 0) {
        logger.info("Requests in flight: " + requestsInFlight.get());
        try {
            //noinspection BusyWait
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            logger.error("Wait interrupted", e);
            // Do not swallow the interruption: restore the flag for the caller
            // and stop waiting, otherwise the shutdown could never be cancelled.
            Thread.currentThread().interrupt();
            break;
        }
    }

    this.interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -57,6 +55,7 @@ public CassandraService(StoreConfiguration storeConfiguration, MBassador<Disthen
.withClass(DefaultDriverOption.REQUEST_THROTTLER_CLASS, ConcurrencyLimitingRequestThrottler.class)
.withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS, storeConfiguration.getMaxConcurrentRequests())
.withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, storeConfiguration.getMaxQueueSize())
.withClass(DefaultDriverOption.RETRY_POLICY_CLASS, CustomRetryPolicy.class)
.build();

CqlSessionBuilder builder = CqlSession.builder()
Expand Down Expand Up @@ -115,9 +114,10 @@ private List<String> getContactPoints(StoreConfiguration storeConfiguration) {
return storeConfiguration.getCluster().stream().map(s -> s + ":" + storeConfiguration.getPort()).collect(Collectors.toList());
}

@SuppressWarnings("unused")
@SuppressWarnings({"unused"})
@Handler
public void handle(MetricStoreEvent metricStoreEvent) {
//noinspection ResultOfMethodCallIgnored
metrics.offer(metricStoreEvent.getMetric());
}

Expand All @@ -128,6 +128,7 @@ public void shutdown() {

logger.info("Closing C* session");
session.close();
logger.info("C* session closed");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.iponweb.disthene.service.store;

import com.datastax.oss.driver.api.core.config.DriverOption;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * Additional driver configuration options defined by this project, on top of
 * the options the driver itself provides.
 */
public enum CustomDriverOption implements DriverOption {
    /** Configuration path holding the retry limit read by {@code CustomRetryPolicy}. */
    CUSTOM_NUMBER_OF_RETRIES("custom.retry-policy.max-retries");

    /** Path of this option inside the driver configuration. */
    private final String configPath;

    CustomDriverOption(String configPath) {
        this.configPath = configPath;
    }

    /**
     * @return the configuration path of this option
     */
    @NonNull
    @Override
    public String getPath() {
        return this.configPath;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package net.iponweb.disthene.service.store;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy;
import edu.umd.cs.findbugs.annotations.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Retry policy that behaves like {@link DefaultRetryPolicy} except for write
 * timeouts, which are retried on the next node up to a configurable number of
 * times regardless of the write type.
 *
 * <p>The limit is read from {@link CustomDriverOption#CUSTOM_NUMBER_OF_RETRIES}
 * in the execution profile and falls back to {@value #DEFAULT_MAX_RETRIES} when
 * the option is absent or no driver context is supplied.
 */
public class CustomRetryPolicy extends DefaultRetryPolicy {

    // Log under this class' own name so the policy can be configured separately
    // from the driver's built-in policy.
    private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

    private static final int DEFAULT_MAX_RETRIES = 10;

    private final String logPrefix;
    private final int maxRetries;

    /**
     * @param context     driver context used to look up the session name and the
     *                    configured retry limit; may be {@code null}, in which case
     *                    the default limit is used
     * @param profileName name of the execution profile this policy is created for
     */
    public CustomRetryPolicy(DriverContext context, String profileName) {
        super(context, profileName);
        this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
        this.maxRetries = context != null
                ? context.getConfig().getProfile(profileName)
                        .getInt(CustomDriverOption.CUSTOM_NUMBER_OF_RETRIES, DEFAULT_MAX_RETRIES)
                : DEFAULT_MAX_RETRIES;
    }

    /**
     * Decides what to do after a write timeout.
     *
     * @param retryCount number of retries already attempted for this request
     *                   (per the driver's retry policy contract)
     * @return {@link RetryDecision#RETRY_NEXT} while fewer than the configured
     *         maximum number of retries have been made, otherwise
     *         {@link RetryDecision#RETHROW}
     */
    @SuppressWarnings("deprecation")
    @Override
    public RetryDecision onWriteTimeout(@NonNull Request request, @NonNull ConsistencyLevel cl, @NonNull WriteType writeType, int blockFor, int received, int retryCount) {
        // retryCount counts retries already made, so the limit is reached once it
        // equals maxRetries; "<=" would allow one retry more than configured.
        if (retryCount >= maxRetries) {
            return RetryDecision.RETHROW;
        }

        // Guard at the same level the message is emitted at; guarding an INFO
        // message with isTraceEnabled() hid it unless TRACE was switched on.
        if (LOG.isInfoEnabled()) {
            LOG.info(
                    RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, writeType, blockFor, received, retryCount);
        }
        return RetryDecision.RETRY_NEXT;
    }
}
Loading

0 comments on commit 352e8f4

Please sign in to comment.