Skip to content

Commit

Permalink
- Backported some logic from 2.0
Browse files Browse the repository at this point in the history
- code cleanup
  • Loading branch information
EinsamHauer committed Feb 19, 2022
1 parent 12a821a commit 91f5c06
Show file tree
Hide file tree
Showing 23 changed files with 162 additions and 141 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>net.iponweb.disthene</groupId>
<artifactId>disthene</artifactId>
<packaging>jar</packaging>
<version>1.0.13</version>
<version>1.0.14</version>
<name>disthene</name>
<url>http://maven.apache.org</url>
<dependencies>
Expand Down Expand Up @@ -121,8 +121,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
24 changes: 9 additions & 15 deletions src/main/java/net/iponweb/disthene/Disthene.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.bus.error.IPublicationErrorHandler;
import net.engio.mbassy.bus.error.PublicationError;
import net.iponweb.disthene.carbon.CarbonServer;
import net.iponweb.disthene.config.AggregationConfiguration;
import net.iponweb.disthene.config.BlackListConfiguration;
Expand Down Expand Up @@ -52,10 +51,10 @@ public class Disthene {
private static final String DEFAULT_AGGREGATION_CONFIG_LOCATION = "/etc/disthene/aggregator.yaml";
private static final String DEFAULT_LOG_CONFIG_LOCATION = "/etc/disthene/disthene-log4j.xml";

private String configLocation;
private String blacklistLocation;
private String whitelistLocation;
private String aggregationConfigLocation;
private final String configLocation;
private final String blacklistLocation;
private final String whitelistLocation;
private final String aggregationConfigLocation;

private MBassador<DistheneEvent> bus;
private BlacklistService blacklistService;
Expand Down Expand Up @@ -89,12 +88,7 @@ private void run() {
.addFeature(Feature.SyncPubSub.Default())
.addFeature(Feature.AsynchronousHandlerInvocation.Default())
.addFeature(dispatch)
.setProperty(Properties.Handler.PublicationError, new IPublicationErrorHandler() {
@Override
public void handleError(PublicationError error) {
logger.error(error);
}
})
.setProperty(Properties.Handler.PublicationError, (IPublicationErrorHandler) error -> logger.error(error))
);

logger.info("Loading blacklists & whitelists");
Expand All @@ -118,7 +112,7 @@ public void handleError(PublicationError error) {
}

BlackListConfiguration blackListConfiguration = new BlackListConfiguration(blacklistRules, whitelistRules);
logger.debug("Running with the following blacklist: " + blackListConfiguration.toString());
logger.debug("Running with the following blacklist: " + blackListConfiguration);
blacklistService = new BlacklistService(blackListConfiguration);

logger.info("Creating metric service");
Expand Down Expand Up @@ -148,7 +142,7 @@ public void handleError(PublicationError error) {
@SuppressWarnings("unchecked")
AggregationConfiguration aggregationConfiguration = new AggregationConfiguration((Map<String, Map<String, String>>) yaml.load(in));
in.close();
logger.debug("Running with the following aggregation rule set: " + aggregationConfiguration.toString());
logger.debug("Running with the following aggregation rule set: " + aggregationConfiguration);
logger.info("Creating sum aggregator");
sumService = new SumService(bus, distheneConfiguration, aggregationConfiguration, blacklistService);

Expand Down Expand Up @@ -241,7 +235,7 @@ public void handle(Signal signal) {

blacklistService.setRules(blackListConfiguration);

logger.debug("Reloaded blacklist: " + blackListConfiguration.toString());
logger.debug("Reloaded blacklist: " + blackListConfiguration);
} catch (Exception e) {
logger.error("Reloading blacklists failed");
logger.error(e);
Expand All @@ -256,7 +250,7 @@ public void handle(Signal signal) {
in.close();

sumService.setAggregationConfiguration(aggregationConfiguration);
logger.debug("Reloaded aggregation rules: " + aggregationConfiguration.toString());
logger.debug("Reloaded aggregation rules: " + aggregationConfiguration);
} catch (Exception e) {
logger.error("Reloading aggregation rules failed");
logger.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class AggregationRule {

private Pattern source;
private String destination;
private String prefix;
private final String prefix;

public AggregationRule(String sourceDefinition, String destination) {
this.destination = destination;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/iponweb/disthene/bean/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
public class Metric {

private MetricKey key;
private final MetricKey key;
private double value;

public Metric(String input, Rollup rollup) {
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/net/iponweb/disthene/bean/MetricKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
* @author Andrei Ivanov
*/
public class MetricKey {
private String tenant;
private String path;
private int rollup;
private int period;
private long timestamp;
private final String tenant;
private final String path;
private final int rollup;
private final int period;
private final long timestamp;

public MetricKey(String tenant, String path, int rollup, int period, long timestamp) {
this.tenant = tenant;
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/net/iponweb/disthene/carbon/CarbonServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import net.iponweb.disthene.service.auth.TenantService;
import org.apache.log4j.Logger;

import java.util.HashSet;

/**
* @author Andrei Ivanov
*/
Expand All @@ -23,14 +21,14 @@ public class CarbonServer {
private static final int MAX_FRAME_LENGTH = 8192 ;
private static final Logger logger = Logger.getLogger(CarbonServer.class);

private DistheneConfiguration configuration;
private final DistheneConfiguration configuration;

private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup = new NioEventLoopGroup();
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();

private MBassador<DistheneEvent> bus;
private final MBassador<DistheneEvent> bus;

private TenantService tenantService;
private final TenantService tenantService;

public CarbonServer(DistheneConfiguration configuration, MBassador<DistheneEvent> bus, TenantService tenantService) {
this.bus = bus;
Expand All @@ -45,7 +43,7 @@ public void run() throws InterruptedException {
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, false, Delimiters.lineDelimiter()));
p.addLast(new CarbonServerHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
*/
public class AggregationConfiguration {

private Map<String, List<AggregationRule>> rules = new HashMap<>();
private final Map<String, List<AggregationRule>> rules = new HashMap<>();

public AggregationConfiguration(Map<String, Map<String, String>> rulesDefinition) {
for(Map.Entry<String, Map<String, String>> entry : rulesDefinition.entrySet()) {
if (!rules.containsKey(entry.getKey())) {
rules.put(entry.getKey(), new ArrayList<AggregationRule>());
rules.put(entry.getKey(), new ArrayList<>());
}

for(Map.Entry<String, String> def : entry.getValue().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package net.iponweb.disthene.config;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
public abstract class AbstractMetricEvent implements DistheneEvent {

private Metric metric;
private final Metric metric;

public AbstractMetricEvent(Metric metric) {
this.metric = metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
public class StoreErrorEvent implements DistheneEvent {

private int count;
private final int count;

public StoreErrorEvent(int count) {
this.count = count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
public class StoreSuccessEvent implements DistheneEvent{

private int count;
private final int count;

public StoreSuccessEvent(int count) {
this.count = count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
* @author Andrei Ivanov
*/
@Listener(references= References.Strong)
@Listener(references = References.Strong)
public class RollupService {
private static final String SCHEDULER_NAME = "distheneRollupAggregatorFlusher";
private static final String SCHEDULER_NAME = "distheneRollupAggregatorFlusherScheduler";
private static final String FLUSHER_NAME = "distheneRollupAggregatorFlusher";
private static final int RATE = 60;
private volatile boolean shuttingDown = false;

private static final Logger logger = Logger.getLogger(RollupService.class);

private MBassador<DistheneEvent> bus;
private DistheneConfiguration distheneConfiguration;
private final MBassador<DistheneEvent> bus;
private final DistheneConfiguration distheneConfiguration;
private Rollup maxRollup;
private List<Rollup> rollups;
private final List<Rollup> rollups;

private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(SCHEDULER_NAME));
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(SCHEDULER_NAME));
private final ExecutorService flusher = Executors.newCachedThreadPool(new NamedThreadFactory(FLUSHER_NAME));

private final ConcurrentNavigableMap<Long, ConcurrentMap<MetricKey, AverageRecord>> accumulator = new ConcurrentSkipListMap<>();

Expand All @@ -50,21 +50,17 @@ public RollupService(MBassador<DistheneEvent> bus, DistheneConfiguration disthen
this.bus = bus;
bus.subscribe(this);

for(Rollup rollup : rollups) {
for (Rollup rollup : rollups) {
if (maxRollup == null || maxRollup.getRollup() < rollup.getRollup()) {
maxRollup = rollup;
}
}

scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
flush();
}
}, 60 - ((System.currentTimeMillis() / 1000L) % 60), RATE, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(this::flush, 60 - ((System.currentTimeMillis() / 1000L) % 60), RATE, TimeUnit.SECONDS);
}

@Handler(rejectSubtypes = false)
@SuppressWarnings("unused")
@Handler()
public void handle(MetricStoreEvent metricStoreEvent) {
if (rollups.size() > 0 && maxRollup.getRollup() > metricStoreEvent.getMetric().getRollup()) {
aggregate(metricStoreEvent.getMetric());
Expand Down Expand Up @@ -98,7 +94,7 @@ private AverageRecord getAverageRecord(ConcurrentMap<MetricKey, AverageRecord> m
}

private void aggregate(Metric metric) {
for(Rollup rollup : rollups) {
for (Rollup rollup : rollups) {
long timestamp = getRollupTimestamp(metric.getTimestamp(), rollup);
ConcurrentMap<MetricKey, AverageRecord> timestampMap = getTimestampMap(timestamp);
MetricKey destinationMetricKey = new MetricKey(
Expand All @@ -113,22 +109,44 @@ private void aggregate(Metric metric) {
private void flush() {
Collection<Metric> metricsToFlush = new ArrayList<>();

while(accumulator.size() > 0 && (accumulator.firstKey() < DateTime.now(DateTimeZone.UTC).getMillis() / 1000 - distheneConfiguration.getCarbon().getAggregatorDelay() * 2)) {
logger.debug("Adding rollup flush for time: " + (new DateTime(accumulator.firstKey() * 1000)) + " (current time is " + DateTime.now(DateTimeZone.UTC) + ")");
// Get timestamps to flush
Set<Long> timestampsToFlush = new HashSet<>(accumulator.headMap(DateTime.now(DateTimeZone.UTC).getMillis() / 1000 - distheneConfiguration.getCarbon().getAggregatorDelay() * 2L).keySet());

logger.trace("There are " + timestampsToFlush.size() + " timestamps to flush");

for (Long timestamp : timestampsToFlush) {
ConcurrentMap<MetricKey, AverageRecord> timestampMap = accumulator.remove(timestamp);

// Get the earliest map
ConcurrentMap<MetricKey, AverageRecord> timestampMap = accumulator.pollFirstEntry().getValue();
// double check just in case
if (timestampMap != null) {
logger.trace("Adding rollup flush for time: " + (new DateTime(timestamp * 1000)) + " (current time is " + DateTime.now(DateTimeZone.UTC) + ")");
logger.trace("Will flush " + timestampMap.size() + " metrics");

for(Map.Entry<MetricKey, AverageRecord> entry : timestampMap.entrySet()) {
metricsToFlush.add(new Metric(entry.getKey(), entry.getValue().getAverage()));
for (Map.Entry<MetricKey, AverageRecord> entry : timestampMap.entrySet()) {
metricsToFlush.add(new Metric(entry.getKey(), entry.getValue().getAverage()));
}
logger.trace("Done adding rollup flush for time: " + (new DateTime(timestamp * 1000)) + " (current time is " + DateTime.now(DateTimeZone.UTC) + ")");
}
}

// do the flush asynchronously
if (metricsToFlush.size() > 0) {
doFlush(metricsToFlush, getFlushRateLimiter(metricsToFlush.size()));
logger.trace("Flushing total of " + metricsToFlush.size() + " metrics");

CompletableFuture.supplyAsync((Supplier<Void>) () -> {
doFlush(metricsToFlush, getFlushRateLimiter(metricsToFlush.size()));
return null;
}, flusher).whenComplete((o, error) -> {
if (error != null) {
logger.error(error);
} else {
logger.trace("Done flushing total of " + metricsToFlush.size() + " metrics");
}
});
}
}

@SuppressWarnings("UnstableApiUsage")
private RateLimiter getFlushRateLimiter(int currentBatch) {
/*
The idea is that we'd like to be able to process ALL the contents of accumulator in 1/2 time till next rollup time.
Expand All @@ -146,11 +164,12 @@ private RateLimiter getFlushRateLimiter(int currentBatch) {
if (deadline - timestamp <= 0) return null;

// 100 is an arbitrary small number here
double rate = Math.max(100, 2 * currentBatch / (deadline - timestamp));
double rate = Math.max(100, 2L * currentBatch / (deadline - timestamp));

return RateLimiter.create(rate);
}

@SuppressWarnings("UnstableApiUsage")
private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter) {
// We'd like to feed metrics in a more gentle manner here but not allowing the queue to grow.

Expand All @@ -159,7 +178,7 @@ private void doFlush(Collection<Metric> metricsToFlush, RateLimiter rateLimiter)
logger.debug("QPS is limited to " + (long) rateLimiter.getRate());
}

for(Metric metric : metricsToFlush) {
for (Metric metric : metricsToFlush) {
if (!shuttingDown && rateLimiter != null) {
rateLimiter.acquire();
}
Expand All @@ -175,8 +194,8 @@ public synchronized void shutdown() {

Collection<Metric> metricsToFlush = new ArrayList<>();

for(Map.Entry<Long, ConcurrentMap<MetricKey, AverageRecord>> entry : accumulator.entrySet()) {
for(Map.Entry<MetricKey, AverageRecord> innerEntry : entry.getValue().entrySet()) {
for (Map.Entry<Long, ConcurrentMap<MetricKey, AverageRecord>> entry : accumulator.entrySet()) {
for (Map.Entry<MetricKey, AverageRecord> innerEntry : entry.getValue().entrySet()) {
metricsToFlush.add(new Metric(innerEntry.getKey(), innerEntry.getValue().getAverage()));
}
}
Expand All @@ -188,9 +207,9 @@ private static long getRollupTimestamp(long timestamp, Rollup rollup) {
return ((long) Math.ceil(timestamp / (double) rollup.getRollup())) * rollup.getRollup();
}

private class AverageRecord {
private AtomicDouble value = new AtomicDouble(0);
private AtomicInteger count = new AtomicInteger(0);
private static class AverageRecord {
private final AtomicDouble value = new AtomicDouble(0);
private final AtomicInteger count = new AtomicInteger(0);

void addValue(double value) {
this.value.addAndGet(value);
Expand Down
Loading

0 comments on commit 91f5c06

Please sign in to comment.