Skip to content

Commit

Permalink
feat: support discarding intermediate data when aggtask is updated (#818
Browse files Browse the repository at this point in the history
)
  • Loading branch information
xzchaoo authored Mar 13, 2024
1 parent 5293a11 commit 759cc69
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ public class Extension {
* Whether to enter debug mode
*/
private boolean debug;

/**
* When an AggTask update is found, whether to discard the intermediate calculation results of the
* current cycle
*/
private boolean discardWhenUpdate;
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ private void processRecords(List<ConsumerRecord<AggTaskKey, AggProtos.AggTaskVal
}

AggTaskExecutor executor = getOrCreateAggTaskExecutor(key, latestAggTask);
executor.lastUsedAggTask = latestAggTask;

RecursiveTask<Long> rt = new RecursiveTask<Long>() {
@Override
Expand Down Expand Up @@ -165,8 +164,25 @@ protected Long compute() {
state.updateMaxEventTimestamp(partitionMET);
}

private void removeAggTaskExecutor(AggTaskKey key) {
aggTaskExecutors.remove(key);
state.getAggTaskStates().remove(key);
}

private AggTaskExecutor getOrCreateAggTaskExecutor(AggTaskKey key, XAggTask aggTask) {
AggTaskExecutor e = aggTaskExecutors.get(key);

// When an AggTask update is found, discard the intermediate calculation results of
// the current cycle. Doing so allows you to immediately update the day-level tasks instead of
// waiting until the next day to take effect. But the disadvantage is that the intermediate
// state will be lost and the data will be calculated from zero.
if (e != null && e.lastUsedAggTask != null
&& aggTask.getInner().getExtension().isDiscardWhenUpdate()
&& !e.lastUsedAggTask.hasSameVersion(aggTask)) {
removeAggTaskExecutor(key);
e = null;
}

if (e == null) {
AggTaskState s = new AggTaskState(key);
long watermark = state.getMaxEventTimestamp() //
Expand All @@ -178,10 +194,10 @@ private AggTaskExecutor getOrCreateAggTaskExecutor(AggTaskKey key, XAggTask aggT
s.setWatermark(watermark);
e = new AggTaskExecutor(s, completenessService, output, aggMetaService);
e.ignoredMinWatermark = watermark;
e.lastUsedAggTask = aggTask;
aggTaskExecutors.put(key, e);
state.put(s);
}
e.lastUsedAggTask = aggTask;
return e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ public XAggTask(AggTask inner, XSelect select, XWhere where) {
this.select = select;
this.where = where;
}

public boolean hasSameVersion(XAggTask aggTask) {
return inner.getVersion() == aggTask.getInner().getVersion();
}
}

0 comments on commit 759cc69

Please sign in to comment.