diff --git a/README.md b/README.md index ff5d4db..892f826 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ There are a couple of things which seem to be an absolute must and which were mi * aggregation: ability to sum similar metrics from several servers * blacklisting: ability to omit storing metrics which match a certain pattern. Makes not much sense by itself but is quite handy when you have previous item * caching incoming paths: this may really reduce the load on Elasticsearch cluster -* some minimal set of own metrics (received count, write count, etc) +* some minimal set of own metrics (received count, write count, etc.) * true multitenancy The other thing is performance. **Disthene** is being developed with one major requirement in mind - performance. @@ -123,7 +123,7 @@ stats: Configuration is straight forward as per log4j ##### Blacklist configuration in blacklist.yaml -This is a list of regular expressions per tenant. Matching metrics will NOT be store but they still WILL be aggregated (see below) +This is a list of regular expressions per tenant. Matching metrics will NOT be store, but they still WILL be aggregated (see below) ##### Whitelist configuration in whitelist.yaml This is a list of regular expressions per tenant. Matching metrics will override blacklist rules. diff --git a/pom.xml b/pom.xml index 8aedbd0..04900af 100644 --- a/pom.xml +++ b/pom.xml @@ -1,10 +1,10 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 net.iponweb.disthene disthene jar - 2.0.1 + 2.0.2 disthene @@ -21,22 +21,22 @@ org.apache.logging.log4j log4j-core - 2.15.0 + 2.17.0 org.apache.logging.log4j log4j-api - 2.15.0 + 2.17.0 org.apache.logging.log4j log4j-slf4j-impl - 2.15.0 + 2.17.0 io.netty netty-all - 4.1.71.Final + 4.1.72.Final joda-time @@ -71,7 +71,7 @@ org.lz4 lz4-java - 1.7.1 + 1.8.0 net.engio diff --git a/src/main/java/net/iponweb/disthene/Disthene.java b/src/main/java/net/iponweb/disthene/Disthene.java index 38f76cc..a1958ac 100644 --- a/src/main/java/net/iponweb/disthene/Disthene.java +++ b/src/main/java/net/iponweb/disthene/Disthene.java @@ -105,7 +105,7 @@ private void run() { } BlackListConfiguration blackListConfiguration = new BlackListConfiguration(blacklistRules, whitelistRules); - logger.debug("Running with the following blacklist: " + blackListConfiguration.toString()); + logger.debug("Running with the following blacklist: " + blackListConfiguration); blacklistService = new BlacklistService(blackListConfiguration); logger.info("Creating metric service"); @@ -133,7 +133,7 @@ private void run() { in = Files.newInputStream(Paths.get(aggregationConfigLocation)); AggregationConfiguration aggregationConfiguration = new AggregationConfiguration(yaml.load(in)); in.close(); - logger.debug("Running with the following aggregation rule set: " + aggregationConfiguration.toString()); + logger.debug("Running with the following aggregation rule set: " + aggregationConfiguration); logger.info("Creating sum aggregator"); sumService = new SumService(bus, distheneConfiguration, aggregationConfiguration, blacklistService); @@ -233,7 +233,7 @@ public void handle(Signal signal) { blacklistService.setRules(blackListConfiguration); - logger.debug("Reloaded blacklist: " + blackListConfiguration.toString()); + logger.debug("Reloaded blacklist: " + blackListConfiguration); } catch (Exception e) { logger.error("Reloading blacklists failed"); logger.error(e); @@ -247,7 +247,7 @@ public void handle(Signal signal) { in.close(); sumService.setAggregationConfiguration(aggregationConfiguration); - logger.debug("Reloaded aggregation rules: " + aggregationConfiguration.toString()); + logger.debug("Reloaded aggregation rules: " + aggregationConfiguration); } catch (Exception e) { logger.error("Reloading aggregation rules failed"); logger.error(e); @@ -281,7 +281,7 @@ public void handle(Signal signal) { logger.info("Shutting down carbon server"); carbonServer.shutdown(); - // We will probably loose some last stats here. But leaving it to run will complicate things + // We will probably lose some last stats here. But leaving it to run will complicate things logger.info("Shutting down stats service"); statsService.shutdown(); diff --git a/src/main/java/net/iponweb/disthene/service/index/IndexService.java b/src/main/java/net/iponweb/disthene/service/index/IndexService.java index 6d2f0ba..251af9a 100644 --- a/src/main/java/net/iponweb/disthene/service/index/IndexService.java +++ b/src/main/java/net/iponweb/disthene/service/index/IndexService.java @@ -84,6 +84,7 @@ public void handle(MetricStoreEvent metricStoreEvent) { if (indexConfiguration.isCache()) { handleWithCache(metricStoreEvent.getMetric()); } else { + //noinspection ResultOfMethodCallIgnored metrics.offer(metricStoreEvent.getMetric()); } } @@ -93,6 +94,7 @@ private void handleWithCache(Metric metric) { Long lastSeen = tenantPaths.put(metric.getPath(), System.currentTimeMillis() / 1000L); if (lastSeen == null) { + //noinspection ResultOfMethodCallIgnored metrics.offer(metric); } } @@ -126,11 +128,6 @@ public synchronized void invalidateCache() { public void shutdown() { scheduler.shutdown(); indexThread.shutdown(); - logger.info("Sleeping for 10 seconds to allow leftovers to be written"); - try { - Thread.sleep(10000); - } catch (InterruptedException ignored) { - } logger.info("Closing ES client"); try { diff --git a/src/main/java/net/iponweb/disthene/service/index/IndexThread.java b/src/main/java/net/iponweb/disthene/service/index/IndexThread.java index 96db340..857c5d6 100644 --- a/src/main/java/net/iponweb/disthene/service/index/IndexThread.java +++ b/src/main/java/net/iponweb/disthene/service/index/IndexThread.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; /** * @author Andrei Ivanov @@ -82,16 +83,14 @@ public void run() { Metric metric = metrics.take(); addToBatch(metric); } catch (Exception e) { - logger.warn("Encountered error in busy loop: ", e); + if (!shutdown) logger.error("Encountered error in busy loop: ", e); } } - if (request.size() > 0) { - try { - flush(); - } catch (Exception e) { - logger.warn("Encountered error in busy loop: ", e); - } + try { + flush(); + } catch (Exception e) { + logger.error("Encountered error in busy loop: ", e); } } @@ -104,7 +103,9 @@ private void addToBatch(Metric metric) throws IOException { } } - private void flush() throws IOException { + private synchronized void flush() throws IOException { + if (request.size() <= 0) return; + MultiGetResponse multiGetResponse = client.mget(request, RequestOptions.DEFAULT); for (MultiGetItemResponse response : multiGetResponse.getResponses()) { @@ -123,7 +124,7 @@ private void flush() throws IOException { } sb.append(parts[i]); try { - bulkProcessor.add(new IndexRequest(index).id(metric.getTenant() + "_" + sb.toString()).source( + bulkProcessor.add(new IndexRequest(index).id(metric.getTenant() + "_" + sb).source( XContentFactory.jsonBuilder().startObject() .field("tenant", metric.getTenant()) .field("path", sb.toString()) @@ -146,6 +147,21 @@ private void flush() throws IOException { public void shutdown() { shutdown = true; + + logger.info("Flushing leftovers"); + try { + flush(); + logger.info("Finished flushing leftovers"); + } catch (IOException e) { + logger.error("Flush failed", e); + } + + logger.info("Waiting for bulk processor to complete"); + try { + bulkProcessor.awaitClose(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + logger.error("AwaitClose interrupted", e); + } this.interrupt(); } @@ -154,7 +170,6 @@ private static class MetricMultiGetRequest extends MultiGetRequest { private final String index; final Map metrics = new HashMap<>(); - public MetricMultiGetRequest(String index) { this.index = index; } diff --git a/src/main/java/net/iponweb/disthene/service/store/BatchWriterThread.java b/src/main/java/net/iponweb/disthene/service/store/BatchWriterThread.java index 7485bec..c65faba 100644 --- a/src/main/java/net/iponweb/disthene/service/store/BatchWriterThread.java +++ b/src/main/java/net/iponweb/disthene/service/store/BatchWriterThread.java @@ -55,7 +55,7 @@ public void run() { flush(); } } catch (InterruptedException e) { - logger.error("Thread interrupted", e); + if (!shutdown) logger.error("Thread interrupted", e); this.interrupt(); } } @@ -83,16 +83,18 @@ private void addToBatch(Metric metric) { } } - private void flush() { + private synchronized void flush() { List>> batches = splitByToken(); for (List> batchStatements : batches) { BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED, batchStatements); final int batchSize = batchStatements.size(); + requestsInFlight.incrementAndGet(); session .executeAsync(batch) .whenComplete((version, error) -> { + requestsInFlight.decrementAndGet(); if (error != null) { bus.post(new StoreErrorEvent(batchSize)).now(); logger.error(error); @@ -117,4 +119,24 @@ private List>> splitByToken() { return new ArrayList<>(batches.values()); } + + @Override + public void shutdown() { + shutdown = true; + + logger.info("Flushing leftovers"); + flush(); + + while (requestsInFlight.get() > 0) { + logger.info("Requests in flight: " + requestsInFlight.get()); + try { + //noinspection BusyWait + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.error("Wait interrupted", e); + } + } + + this.interrupt(); + } } diff --git a/src/main/java/net/iponweb/disthene/service/store/CassandraService.java b/src/main/java/net/iponweb/disthene/service/store/CassandraService.java index 349a06b..fc2167f 100644 --- a/src/main/java/net/iponweb/disthene/service/store/CassandraService.java +++ b/src/main/java/net/iponweb/disthene/service/store/CassandraService.java @@ -23,9 +23,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -57,6 +55,7 @@ public CassandraService(StoreConfiguration storeConfiguration, MBassador getContactPoints(StoreConfiguration storeConfiguration) { return storeConfiguration.getCluster().stream().map(s -> s + ":" + storeConfiguration.getPort()).collect(Collectors.toList()); } - @SuppressWarnings("unused") + @SuppressWarnings({"unused"}) @Handler public void handle(MetricStoreEvent metricStoreEvent) { + //noinspection ResultOfMethodCallIgnored metrics.offer(metricStoreEvent.getMetric()); } @@ -128,6 +128,7 @@ public void shutdown() { logger.info("Closing C* session"); session.close(); + logger.info("C* session closed"); } } diff --git a/src/main/java/net/iponweb/disthene/service/store/CustomDriverOption.java b/src/main/java/net/iponweb/disthene/service/store/CustomDriverOption.java new file mode 100644 index 0000000..9be4e2d --- /dev/null +++ b/src/main/java/net/iponweb/disthene/service/store/CustomDriverOption.java @@ -0,0 +1,21 @@ +package net.iponweb.disthene.service.store; + +import com.datastax.oss.driver.api.core.config.DriverOption; +import edu.umd.cs.findbugs.annotations.NonNull; + +public enum CustomDriverOption implements DriverOption { + CUSTOM_NUMBER_OF_RETRIES("custom.retry-policy.max-retries"), + ; + + private final String path; + + CustomDriverOption(String path) { + this.path = path; + } + + @NonNull + @Override + public String getPath() { + return path; + } +} \ No newline at end of file diff --git a/src/main/java/net/iponweb/disthene/service/store/CustomRetryPolicy.java b/src/main/java/net/iponweb/disthene/service/store/CustomRetryPolicy.java new file mode 100644 index 0000000..9b79870 --- /dev/null +++ b/src/main/java/net/iponweb/disthene/service/store/CustomRetryPolicy.java @@ -0,0 +1,43 @@ +package net.iponweb.disthene.service.store; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.retry.RetryDecision; +import com.datastax.oss.driver.api.core.servererrors.WriteType; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CustomRetryPolicy extends DefaultRetryPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultRetryPolicy.class); + + private final String logPrefix; + private final int maxRetries; + + private static final int DEFAULT_MAX_RETRIES = 10; + + public CustomRetryPolicy(DriverContext context, String profileName) { + super(context, profileName); + this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName; + + maxRetries = context != null ? context.getConfig().getProfile(profileName).getInt(CustomDriverOption.CUSTOM_NUMBER_OF_RETRIES, DEFAULT_MAX_RETRIES) : DEFAULT_MAX_RETRIES; + } + + @SuppressWarnings("deprecation") + @Override + public RetryDecision onWriteTimeout(@NonNull Request request, @NonNull ConsistencyLevel cl, @NonNull WriteType writeType, int blockFor, int received, int retryCount) { + RetryDecision decision = + (retryCount <= maxRetries) + ? RetryDecision.RETRY_NEXT + : RetryDecision.RETHROW; + + if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) { + LOG.info( + RETRYING_ON_WRITE_TIMEOUT, logPrefix, cl, writeType, blockFor, received, retryCount); + } + return decision; + } +} \ No newline at end of file diff --git a/src/main/java/net/iponweb/disthene/service/store/SingleWriterThread.java b/src/main/java/net/iponweb/disthene/service/store/SingleWriterThread.java index 37c36f2..d62a221 100644 --- a/src/main/java/net/iponweb/disthene/service/store/SingleWriterThread.java +++ b/src/main/java/net/iponweb/disthene/service/store/SingleWriterThread.java @@ -32,7 +32,7 @@ public void run() { store(metric); } } catch (InterruptedException e) { - logger.error("Thread interrupted", e); + if (!shutdown) logger.error("Thread interrupted", e); this.interrupt(); } } @@ -44,20 +44,39 @@ private void store(Metric metric) { return; } - session.executeAsync( - statement.bind( - metric.getRollup() * metric.getPeriod(), - Collections.singletonList(metric.getValue()), - metric.getPath(), - metric.getTimestamp() - )).whenComplete((version, error) -> { - if (error != null) { - bus.post(new StoreErrorEvent(1)).now(); - logger.error(error); - } else { - bus.post(new StoreSuccessEvent(1)).now(); - } - }); + requestsInFlight.incrementAndGet(); + session + .executeAsync( + statement.bind( + metric.getRollup() * metric.getPeriod(), + Collections.singletonList(metric.getValue()), + metric.getPath(), + metric.getTimestamp() + )).whenComplete((version, error) -> { + requestsInFlight.decrementAndGet(); + if (error != null) { + bus.post(new StoreErrorEvent(1)).now(); + logger.error(error); + } else { + bus.post(new StoreSuccessEvent(1)).now(); + } + }); } + @Override + public void shutdown() { + shutdown = true; + + while (requestsInFlight.get() > 0) { + logger.info("Requests in flight: " + requestsInFlight.get()); + try { + //noinspection BusyWait + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.error("Wait interrupted", e); + } + } + + this.interrupt(); + } } diff --git a/src/main/java/net/iponweb/disthene/service/store/WriterThread.java b/src/main/java/net/iponweb/disthene/service/store/WriterThread.java index 9173b56..ddccaa0 100644 --- a/src/main/java/net/iponweb/disthene/service/store/WriterThread.java +++ b/src/main/java/net/iponweb/disthene/service/store/WriterThread.java @@ -7,6 +7,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; /** * @author Andrei Ivanov @@ -24,6 +25,8 @@ public abstract class WriterThread extends Thread { protected final Executor executor; + protected final AtomicInteger requestsInFlight = new AtomicInteger(0); + public WriterThread(String name, MBassador bus, CqlSession session, TablesRegistry tablesRegistry, BlockingQueue metrics, Executor executor) { super(name); this.bus = bus;