Skip to content

Commit

Permalink
- Made topology awareness optional
Browse files Browse the repository at this point in the history
- Upgraded some dependencies
  • Loading branch information
EinsamHauer committed Oct 4, 2022
1 parent 352e8f4 commit 335dcc1
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 52 deletions.
1 change: 1 addition & 0 deletions config/disthene.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ store:
readTimeout: 10
connectTimeout: 10
batch: true
topologyAware: false
batchSize: 500
pool: 2
index:
Expand Down
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>net.iponweb.disthene</groupId>
<artifactId>disthene</artifactId>
<packaging>jar</packaging>
<version>2.0.2</version>
<version>2.0.3</version>
<name>disthene</name>

<properties>
Expand All @@ -21,7 +21,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.0</version>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand All @@ -36,7 +36,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.72.Final</version>
<version>4.1.77.Final</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
Expand All @@ -46,7 +46,7 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.29</version>
<version>1.33</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand All @@ -66,7 +66,7 @@
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.13.0</version>
<version>4.15.0</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/net/iponweb/disthene/config/StoreConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class StoreConfiguration {
private int maxConcurrentRequests = 1024;
private int maxQueueSize = 1024*1024;
private boolean batch;
private boolean topologyAware = false;
private int batchSize;
private int pool;
private String tableTemplate = "metric_%s_%d"; //%s - tenant, %d rollup
Expand Down Expand Up @@ -123,6 +124,14 @@ public void setBatch(boolean batch) {
this.batch = batch;
}

/**
 * Whether the store should use the topology-aware batch writer
 * (batches grouped per Cassandra node). Defaults to {@code false}.
 *
 * @return {@code true} if topology-aware batching is enabled
 */
public boolean isTopologyAware() {
return topologyAware;
}

/**
 * Enables or disables the topology-aware batch writer.
 * Populated from the {@code topologyAware} key of the store section in disthene.yaml.
 *
 * @param topologyAware {@code true} to group batches per Cassandra node
 */
public void setTopologyAware(boolean topologyAware) {
this.topologyAware = topologyAware;
}

public int getBatchSize() {
return batchSize;
}
Expand Down Expand Up @@ -169,6 +178,7 @@ public String toString() {
", maxConcurrentRequests=" + maxConcurrentRequests +
", maxQueueSize=" + maxQueueSize +
", batch=" + batch +
", topologyAware=" + topologyAware +
", batchSize=" + batchSize +
", pool=" + pool +
", tableTemplate='" + tableTemplate + '\'' +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package net.iponweb.disthene.service.store;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import net.engio.mbassy.bus.MBassador;
import net.iponweb.disthene.bean.Metric;
import net.iponweb.disthene.events.DistheneEvent;
Expand All @@ -16,7 +13,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

Expand All @@ -31,16 +30,13 @@ class BatchWriterThread extends WriterThread {

private final int batchSize;

private final List<BoundStatement> statements = new LinkedList<>();
private final List<BatchableStatement<?>> statements = new LinkedList<>();

private long lastFlushTimestamp = System.currentTimeMillis();

private final TokenMap tokenMap;

BatchWriterThread(String name, MBassador<DistheneEvent> bus, CqlSession session, TablesRegistry tablesRegistry, BlockingQueue<Metric> metrics, Executor executor, int batchSize) {
super(name, bus, session, tablesRegistry, metrics, executor);
this.batchSize = batchSize;
this.tokenMap = session.getMetadata().getTokenMap().orElse(null);
}

@Override
Expand All @@ -67,14 +63,13 @@ private void addToBatch(Metric metric) {
return;
}

Token token = tokenMap != null ? tokenMap.newToken(TypeCodecs.TEXT.encode(metric.getPath(), ProtocolVersion.DEFAULT)) : null;
statements.add(
statement.bind(
metric.getRollup() * metric.getPeriod(),
Collections.singletonList(metric.getValue()),
metric.getPath(),
metric.getTimestamp()
).setRoutingToken(token)
)
);

if (statements.size() >= batchSize || (lastFlushTimestamp < System.currentTimeMillis() - INTERVAL)) {
Expand All @@ -84,42 +79,24 @@ private void addToBatch(Metric metric) {
}

private synchronized void flush() {
List<List<BatchableStatement<?>>> batches = splitByToken();

for (List<BatchableStatement<?>> batchStatements : batches) {
BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED, batchStatements);
final int batchSize = batchStatements.size();

requestsInFlight.incrementAndGet();
session
.executeAsync(batch)
.whenComplete((version, error) -> {
requestsInFlight.decrementAndGet();
if (error != null) {
bus.post(new StoreErrorEvent(batchSize)).now();
logger.error(error);
} else {
bus.post(new StoreSuccessEvent(batchSize)).now();
}
});
}
BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED, statements);

requestsInFlight.incrementAndGet();
session
.executeAsync(batch)
.whenComplete((version, error) -> {
requestsInFlight.decrementAndGet();
if (error != null) {
bus.post(new StoreErrorEvent(statements.size())).now();
logger.error(error);
} else {
bus.post(new StoreSuccessEvent(statements.size())).now();
}
});

statements.clear();
}

private List<List<BatchableStatement<?>>> splitByToken() {
Map<Optional<Node>, List<BatchableStatement<?>>> batches = new HashMap<>();
for (BoundStatement statement : statements) {
Queue<Node> nodes = session.getContext().getLoadBalancingPolicy(DriverExecutionProfile.DEFAULT_NAME).newQueryPlan(statement, session);

Optional<Node> primaryNode = nodes.isEmpty() ? Optional.empty() : Optional.of(nodes.poll());

batches.computeIfAbsent(primaryNode, node -> new ArrayList<>()).add(statement);
}

return new ArrayList<>(batches.values());
}

@Override
public void shutdown() {
shutdown = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public CassandraService(StoreConfiguration storeConfiguration, MBassador<Disthen
TablesRegistry tablesRegistry = new TablesRegistry(session, storeConfiguration);

// Creating writers
if (storeConfiguration.isBatch()) {
if (storeConfiguration.isBatch() && !storeConfiguration.isTopologyAware()) {
for (int i = 0; i < storeConfiguration.getPool(); i++) {
WriterThread writerThread = new BatchWriterThread(
"distheneCassandraBatchWriter" + i,
Expand All @@ -90,6 +90,21 @@ public CassandraService(StoreConfiguration storeConfiguration, MBassador<Disthen
storeConfiguration.getBatchSize()
);

writerThreads.add(writerThread);
writerThread.start();
}
} else if (storeConfiguration.isBatch() && storeConfiguration.isTopologyAware()) {
for (int i = 0; i < storeConfiguration.getPool(); i++) {
WriterThread writerThread = new TopologyAwareBatchWriterThread(
"distheneCassandraBatchWriter" + i,
bus,
session,
tablesRegistry,
metrics,
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()),
storeConfiguration.getBatchSize()
);

writerThreads.add(writerThread);
writerThread.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package net.iponweb.disthene.service.store;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import net.engio.mbassy.bus.MBassador;
import net.iponweb.disthene.bean.Metric;
import net.iponweb.disthene.events.DistheneEvent;
import net.iponweb.disthene.events.StoreErrorEvent;
import net.iponweb.disthene.events.StoreSuccessEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

/**
 * Writer thread that stores metrics in Cassandra using token-aware batching:
 * each bound statement is tagged with the routing token of its metric path,
 * and buffered statements are grouped by the first node of the driver's query
 * plan so every UNLOGGED batch targets a single node instead of fanning out
 * through a coordinator.
 *
 * @author Andrei Ivanov
 */
class TopologyAwareBatchWriterThread extends WriterThread {
    //todo: interval via config?
    // Upper bound on the time between flushes, even when the batch is not full.
    private static final long INTERVAL = 60_000;

    private static final Logger logger = LogManager.getLogger(TopologyAwareBatchWriterThread.class);

    // Number of buffered statements that forces a flush.
    private final int batchSize;

    // Statements accumulated since the last flush. Guarded by "this": mutated by
    // the consumer thread in addToBatch() and drained by flush(), which is also
    // invoked by shutdown() from a different thread.
    private final List<BoundStatement> statements = new LinkedList<>();

    private long lastFlushTimestamp = System.currentTimeMillis();

    // Cluster token map, or null when token metadata is unavailable; in that case
    // statements carry no routing token and splitByToken() yields a single group.
    private final TokenMap tokenMap;

    /**
     * @param name           thread name
     * @param bus            event bus for store success/error events
     * @param session        shared Cassandra session
     * @param tablesRegistry registry resolving tenant/rollup to a prepared statement
     * @param metrics        queue of metrics to persist
     * @param executor       executor handed to the base writer thread
     * @param batchSize      number of statements per flush threshold
     */
    TopologyAwareBatchWriterThread(String name, MBassador<DistheneEvent> bus, CqlSession session, TablesRegistry tablesRegistry, BlockingQueue<Metric> metrics, Executor executor, int batchSize) {
        super(name, bus, session, tablesRegistry, metrics, executor);
        this.batchSize = batchSize;
        this.tokenMap = session.getMetadata().getTokenMap().orElse(null);
    }

    @Override
    public void run() {
        try {
            // Consume metrics until shutdown; take() blocks, so termination is
            // normally driven by the interrupt issued from shutdown().
            while (!shutdown) {
                Metric metric = metrics.take();
                addToBatch(metric);
            }

            if (statements.size() > 0) {
                flush();
            }
        } catch (InterruptedException e) {
            if (!shutdown) logger.error("Thread interrupted", e);
            this.interrupt();
        }
    }

    /**
     * Binds the metric against its tenant/rollup prepared statement, attaches a
     * routing token derived from the metric path and buffers the statement.
     * Flushes when the batch is full or the flush interval has elapsed.
     *
     * Synchronized so the buffer cannot be mutated while a concurrent shutdown()
     * is draining it via flush() — flush() alone being synchronized did not
     * protect this write path (race on the LinkedList).
     */
    private synchronized void addToBatch(Metric metric) {
        PreparedStatement statement = tablesRegistry.getStatement(metric.getTenant(), metric.getRollup());
        if (statement == null) {
            logger.error("Unable to store metric " + metric + ". Can't get the statement");
            return;
        }

        // Route by the partition key (path), encoded the same way the driver would.
        Token token = tokenMap != null ? tokenMap.newToken(TypeCodecs.TEXT.encode(metric.getPath(), ProtocolVersion.DEFAULT)) : null;
        statements.add(
                statement.bind(
                        metric.getRollup() * metric.getPeriod(),
                        Collections.singletonList(metric.getValue()),
                        metric.getPath(),
                        metric.getTimestamp()
                ).setRoutingToken(token)
        );

        if (statements.size() >= batchSize || (lastFlushTimestamp < System.currentTimeMillis() - INTERVAL)) {
            lastFlushTimestamp = System.currentTimeMillis();
            flush();
        }
    }

    /**
     * Sends the buffered statements as one UNLOGGED batch per target node and
     * clears the buffer. Success/error counts are reported asynchronously on the
     * bus as the driver futures complete.
     */
    private synchronized void flush() {
        List<List<BatchableStatement<?>>> batches = splitByToken();

        for (List<BatchableStatement<?>> batchStatements : batches) {
            BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED, batchStatements);
            final int batchSize = batchStatements.size();

            requestsInFlight.incrementAndGet();
            session
                    .executeAsync(batch)
                    .whenComplete((version, error) -> {
                        requestsInFlight.decrementAndGet();
                        if (error != null) {
                            bus.post(new StoreErrorEvent(batchSize)).now();
                            logger.error(error);
                        } else {
                            bus.post(new StoreSuccessEvent(batchSize)).now();
                        }
                    });
        }

        statements.clear();
    }

    /**
     * Groups buffered statements by the first node of the driver's query plan
     * for each statement (with a token-aware load balancing policy this is the
     * node owning the statement's routing token). Statements with no resolvable
     * node are grouped under Optional.empty().
     */
    private List<List<BatchableStatement<?>>> splitByToken() {
        Map<Optional<Node>, List<BatchableStatement<?>>> batches = new HashMap<>();
        for (BoundStatement statement : statements) {
            Queue<Node> nodes = session.getContext().getLoadBalancingPolicy(DriverExecutionProfile.DEFAULT_NAME).newQueryPlan(statement, session);

            Optional<Node> primaryNode = nodes.isEmpty() ? Optional.empty() : Optional.of(nodes.poll());

            batches.computeIfAbsent(primaryNode, node -> new ArrayList<>()).add(statement);
        }

        return new ArrayList<>(batches.values());
    }

    @Override
    public void shutdown() {
        shutdown = true;

        logger.info("Flushing leftovers");
        flush();

        // Wait for all in-flight async writes to complete before interrupting
        // the consumer thread.
        while (requestsInFlight.get() > 0) {
            logger.info("Requests in flight: " + requestsInFlight.get());
            try {
                //noinspection BusyWait
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Wait interrupted", e);
            }
        }

        this.interrupt();
    }
}

0 comments on commit 335dcc1

Please sign in to comment.