From 759cc69bc5b2f6c5e2f9b04910fcfe7c0f0fc857 Mon Sep 17 00:00:00 2001 From: xzchaoo Date: Wed, 13 Mar 2024 14:58:42 +0800 Subject: [PATCH] feat: support discarding intermediate data when aggtask is updated (#818) --- .../server/agg/v1/core/conf/Extension.java | 6 ++++++ .../executor/executor/PartitionProcessor.java | 20 +++++++++++++++++-- .../agg/v1/executor/executor/XAggTask.java | 4 ++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java index a8dca59a1..33679da57 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java @@ -18,4 +18,10 @@ public class Extension { * Whether to enter debug mode */ private boolean debug; + + /** + * When an AggTask update is found, whether to discard the intermediate calculation results of the + * current cycle + */ + private boolean discardWhenUpdate; } diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PartitionProcessor.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PartitionProcessor.java index 3a160c5d3..e2aaff081 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PartitionProcessor.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PartitionProcessor.java @@ -126,7 +126,6 @@ private void processRecords(List rt = new RecursiveTask() { @Override @@ -165,8 +164,25 @@ protected Long compute() { state.updateMaxEventTimestamp(partitionMET); } + private void removeAggTaskExecutor(AggTaskKey key) { + aggTaskExecutors.remove(key); + state.getAggTaskStates().remove(key); + } + private AggTaskExecutor getOrCreateAggTaskExecutor(AggTaskKey key, XAggTask aggTask) { AggTaskExecutor e = aggTaskExecutors.get(key); + + // When an AggTask update is found, discard the intermediate calculation results of + // the current cycle. Doing so allows you to immediately update the day-level tasks instead of + // waiting until the next day to take effect. But the disadvantage is that the intermediate + // state will be lost and the data will be calculated from zero. + if (e != null && e.lastUsedAggTask != null + && aggTask.getInner().getExtension().isDiscardWhenUpdate() + && !e.lastUsedAggTask.hasSameVersion(aggTask)) { + removeAggTaskExecutor(key); + e = null; + } + if (e == null) { AggTaskState s = new AggTaskState(key); long watermark = state.getMaxEventTimestamp() // @@ -178,10 +194,10 @@ private AggTaskExecutor getOrCreateAggTaskExecutor(AggTaskKey key, XAggTask aggT s.setWatermark(watermark); e = new AggTaskExecutor(s, completenessService, output, aggMetaService); e.ignoredMinWatermark = watermark; - e.lastUsedAggTask = aggTask; aggTaskExecutors.put(key, e); state.put(s); } + e.lastUsedAggTask = aggTask; return e; } diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/XAggTask.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/XAggTask.java index a7e7a39ef..9a5a0337d 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/XAggTask.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/XAggTask.java @@ -33,4 +33,8 @@ public XAggTask(AggTask inner, XSelect select, XWhere where) { this.select = select; this.where = where; } + + public boolean hasSameVersion(XAggTask aggTask) { + return inner.getVersion() == aggTask.getInner().getVersion(); + } }