Skip to content

Commit

Permalink
feat: supports meta join in agg (#794)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Feb 2, 2024
1 parent 940482e commit 93b546c
Show file tree
Hide file tree
Showing 17 changed files with 352 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import com.google.common.base.Preconditions;

import io.holoinsight.server.agg.v1.core.data.DataAccessor;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
Expand Down Expand Up @@ -79,10 +78,4 @@ public void fixDefaultValue() {
}
}

public void fillTagValuesFromDataAccessor(String[] values, DataAccessor da) {
Preconditions.checkArgument(values.length == items.size());
for (int i = 0; i < items.size(); i++) {
values[i] = items.get(i).get(da);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
@Data
public class GroupByItem {
public static final String DEFAULT_VALUE = "-";

/**
* tag to group by
*/
Expand All @@ -39,6 +40,12 @@ public class GroupByItem {
@Nullable
private String as;

/**
* Generate group tag through joining metadata
*/
@Nullable
private JoinMeta joinMeta;

public GroupByItem() {}

GroupByItem(String tag, String defaultValue, String as) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.core.conf;

import java.util.Map;

import lombok.Data;

/**
* <p>
* created at 2024/1/10
*
* @author xzchaoo
*/
@Data
public class JoinMeta {
private String metaTable;

/**
* <code>
* {
* "app": {
* "type": "const",
* "value: "foobar"
* },
* "env": {
* "type": "const",
* "value": "PROD"
* },
* "ip": {
* "type": "tag",
* "value": "ip"
* }
* }
* </code>
*/
private Map<String, Value> condition;

/**
* When no metadata is joined, whether to discard the current data
*/
private boolean discardIfUnmatch;

@Data
public static class Value {
/**
* <ul>
* <li>const: {@link #value} is a const</li>
* <li>tag: {@link #value} is a tag name</li>
* </ul>
*/
private String type;
private String value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,35 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import lombok.extern.slf4j.Slf4j;

/**
* created at 2023/9/21
*
* @author xzchaoo
*/
@Slf4j
public class Dict {
private static final Map<String, String> DICT = new ConcurrentHashMap<>();

static {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AGG-DICT-%d").build());
scheduler.scheduleWithFixedDelay(Dict::clearDict, 3, 3, TimeUnit.MINUTES);
}

public static void clearDict() {
log.info("[agg-dict] clear dict {}", DICT.size());
DICT.clear();
}

public static Map<String, String> get(Map<String, String> m) {
if (m == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.commons.lang3.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

Expand Down Expand Up @@ -148,4 +149,13 @@ public String getTagValue(String key, String defaultValue) {

return defaultValue;
}

@VisibleForTesting
public FixedSizeTags with(String key, String value) {
String[] keys = Arrays.copyOf(this.keys, this.keys.length + 1);
String[] values = Arrays.copyOf(this.values, this.keys.length + 1);
keys[keys.length - 1] = key;
values[values.length - 1] = value;
return new FixedSizeTags(keys, values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class WindowStat {
private int discardKeyLimit;
private int error;
private int filterWhere;
private int discardMetaUnmatched;
private int matchMultiMeta;

public void incInput() {
input++;
Expand All @@ -40,4 +42,12 @@ public void incFilterWhere() {
public String toString() {
return JSON.toJSONString(this);
}

public void incDiscardMetaUnmatched() {
discardMetaUnmatched++;
}

public void incMatchMultiMeta() {
matchMultiMeta++;
}
}
8 changes: 5 additions & 3 deletions server/agg/agg-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>net.agkn</groupId>
Expand All @@ -55,7 +53,11 @@
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>io.holoinsight.server</groupId>
<artifactId>meta-facade</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.holoinsight.server.agg.v1.core.AggProperties;
import io.holoinsight.server.agg.v1.core.mapper.AggOffsetV1DOMapper;
import io.holoinsight.server.agg.v1.executor.executor.AggMetaService;
import io.holoinsight.server.agg.v1.executor.executor.AggMetaServiceImpl;
import io.holoinsight.server.agg.v1.executor.executor.ExecutorConfig;
import io.holoinsight.server.agg.v1.executor.executor.ExecutorManager;
import io.holoinsight.server.agg.v1.executor.executor.FaultToleranceConfig;
Expand All @@ -34,6 +36,7 @@
import io.holoinsight.server.common.dao.CommonDaoConfiguration;
import io.holoinsight.server.common.springboot.ConditionalOnRole;
import io.holoinsight.server.common.threadpool.ThreadPoolConfiguration;
import io.holoinsight.server.meta.facade.service.DataClientService;

/**
* <p>
Expand Down Expand Up @@ -81,7 +84,7 @@ public AsyncOutput asyncOutput() {
@Bean
public ExecutorManager executorManager(DataSource dataSource, IAggTaskService aggTaskService,
PartitionStateStore stateStore, CompletenessService completenessService, AsyncOutput output,
AggExecutorConfig aggExecutorConfig) {
AggMetaService aggMetaService) {
ExecutorConfig config = new ExecutorConfig();
config.setTopic(aggProperties.getTopic());
config.setKafkaBootstrapServers(aggProperties.getKafkaBootstrapServers());
Expand All @@ -98,11 +101,11 @@ public ExecutorManager executorManager(DataSource dataSource, IAggTaskService ag

config.getFaultToleranceConfig().setStateConfig(stateConfig);
ExecutorManager m = new ExecutorManager(config);
m.setAggExecutorConfig(aggExecutorConfig);
m.setAggTaskService(aggTaskService);
m.setStateStore(stateStore);
m.setCompletenessService(completenessService);
m.setOutput(output);
m.setAggMetaService(aggMetaService);
return m;
}

Expand All @@ -120,4 +123,9 @@ public ExecutorInitRunner executorInitRunner() {
public XOutput xConsoleOutput() {
return new XConsoleOutput();
}

@Bean
public AggMetaService aggMetaService(DataClientService dataClientService) {
return new AggMetaServiceImpl(dataClientService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.executor.executor;

import java.util.List;
import java.util.Map;

/**
* <p>
* created at 2024/1/10
*
* @author xzchaoo
*/
public interface AggMetaService {
/**
*
* @param metaTable
* @param condition
* @return
*/
List<Map<String, Object>> find(String metaTable, Map<String, Object> condition);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.executor.executor;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.holoinsight.server.meta.common.model.QueryExample;
import io.holoinsight.server.meta.facade.service.DataClientService;
import lombok.Data;

/**
* <p>
* created at 2024/1/11
*
* @author xiangfeng.xzc
*/
public class AggMetaServiceImpl implements AggMetaService {
private DataClientService dataClientService;

private Cache<CacheKey, List<Map<String, Object>>> cache = Caffeine.newBuilder() //
.expireAfterWrite(Duration.ofMinutes(1)) //
.softValues() //
.build();

public AggMetaServiceImpl(DataClientService dataClientService) {
this.dataClientService = Objects.requireNonNull(dataClientService);
}

@Override
public List<Map<String, Object>> find(String metaTable, Map<String, Object> condition) {
CacheKey cacheKey = new CacheKey(metaTable, condition);
List<Map<String, Object>> cached = cache.getIfPresent(cacheKey);
if (cached != null) {
return cached;
}
QueryExample example = new QueryExample();
example.setParams(condition);
List<Map<String, Object>> ret = dataClientService.queryByExample(metaTable, example);
cache.put(cacheKey, ret);
return ret;
}

@Data
private static final class CacheKey {
private final String metaTable;
private final Map<String, Object> condition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class AggTaskExecutor {
private final AggTaskState state;
private final CompletenessService completenessService;
private final AsyncOutput output;
private final AggMetaService aggMetaService;

/**
* AggTaskExecutor should ignore data whose aligned ts < ignoredMinWatermark
Expand All @@ -82,10 +83,11 @@ public class AggTaskExecutor {
transient XAggTask lastUsedAggTask;

public AggTaskExecutor(AggTaskState state, CompletenessService completenessService,
AsyncOutput output) {
AsyncOutput output, AggMetaService aggMetaService) {
this.state = Objects.requireNonNull(state);
this.completenessService = Objects.requireNonNull(completenessService);
this.output = Objects.requireNonNull(output);
this.aggMetaService = Objects.requireNonNull(aggMetaService);
}

/**
Expand All @@ -100,17 +102,16 @@ public void process(XAggTask latestAggTask, AggProtos.AggTaskValue aggTaskValue)
processCompletenessInfo(latestAggTask, aggTaskValue);
break;
default:
processData(latestAggTask, aggTaskValue);
processData2(latestAggTask, aggTaskValue);
if (aggTaskValue.hasDataTable()) {
processData2(latestAggTask, aggTaskValue);
} else {
processData(latestAggTask, aggTaskValue);
}
break;
}
}

private void processData2(XAggTask latestAggTask, AggProtos.AggTaskValue aggTaskValue) {
if (!aggTaskValue.hasDataTable()) {
return;
}

long now = System.currentTimeMillis();
AggWindowState lastWindowState = null;
AggProtos.Table table = aggTaskValue.getDataTable();
Expand Down Expand Up @@ -171,10 +172,8 @@ private void processData2(XAggTask latestAggTask, AggProtos.AggTaskValue aggTask
CompletenessUtils.processCompletenessInfoInData(w, da);

// decide which group to use
Group g = w.getOrCreateGroup(da, this::onGroupCreate);
Group g = w.getOrCreateGroup(da, aggMetaService, this::onGroupCreate);
if (g == null) {
w.getStat().incDiscardKeyLimit();
log.error("[agg] [{}] group key limit exceeded", key());
continue;
}

Expand Down Expand Up @@ -248,9 +247,8 @@ private void processData(XAggTask latestAggTask, AggProtos.AggTaskValue aggTaskV
CompletenessUtils.processCompletenessInfoInData(w, da);

// decide which group to use
Group g = w.getOrCreateGroup(da, this::onGroupCreate);
Group g = w.getOrCreateGroup(da, aggMetaService, this::onGroupCreate);
if (g == null) {
w.getStat().incDiscardKeyLimit();
continue;
}

Expand Down
Loading

0 comments on commit 93b546c

Please sign in to comment.