Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sql query #606

Merged
merged 2 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.holoinsight.server.extension.ceresdbx;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -18,6 +19,9 @@

import io.ceresdb.common.parser.SqlParser;
import io.ceresdb.common.parser.SqlParserFactoryProvider;
import io.holoinsight.server.common.J;
import io.holoinsight.server.extension.model.DetailResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,6 +61,7 @@
* @author jiwliu
* @date 2023/1/13
*/
@Slf4j
public class CeresdbxMetricStorage implements MetricStorage {

public static final int DEFAULT_BATCH_RECORDS = 200;
Expand Down Expand Up @@ -214,6 +219,85 @@ public List<Result> pqlRangeQuery(PqlParam pqlParam) {
return pqlQueryService.queryRange(pqlParam);
}

@Override
public DetailResult queryDetail(QueryParam queryParam) {
String sql = queryParam.getQl();
List<String> tables = null;
if (StringUtils.isNotBlank(sql)) {
SqlParser parser = SqlParserFactoryProvider.getSqlParserFactory().getParser(sql);
tables = parser.tableNames();
} else {
log.warn("sql is empty for {}", J.toJson(queryParam));
return DetailResult.empty();
}
if (CollectionUtils.isEmpty(tables)) {
log.warn("tables is empty for {}", J.toJson(queryParam));
return DetailResult.empty();
}
LOGGER.info("queryDetail queryParam:{}, sql:{}", J.toJson(queryParam), sql);
final SqlQueryRequest queryRequest =
SqlQueryRequest.newBuilder().forTables(tables.toArray(new String[0])).sql(sql).build();
long begin = System.currentTimeMillis();
try {
CompletableFuture<io.ceresdb.models.Result<SqlQueryOk, Err>> qf = Context.ROOT.call(
() -> ceresdbxClientManager.getClient(queryParam.getTenant()).sqlQuery(queryRequest));
final io.ceresdb.models.Result<SqlQueryOk, Err> qr = qf.get();
if (!qr.isOk()) {
LOGGER.error("[CERESDBX_QUERY_DETAIL] failed to exec sql:{}, qr:{}, error:{}", sql, qr,
qr.getErr().getError());
throw new RuntimeException(qr.getErr().getError());
}
final List<Row> rows = qr.getOk().getRowList();
if (CollectionUtils.isEmpty(rows)) {
return DetailResult.empty();
}
String[] header = getHeader(rows.get(0));
return transToDetailResult(queryParam, header, rows, tables);
} catch (Exception e) {
LOGGER.error("[CERESDBX_QUERY_DETAIL] failed to exec sql:{}, cost:{}", sql,
System.currentTimeMillis() - begin, e);
return DetailResult.empty();
}
}

private DetailResult transToDetailResult(QueryParam queryParam, String[] header, List<Row> rows,
List<String> tables) {
DetailResult detailResult = DetailResult.empty();
if (header == null || header.length == 0) {
return detailResult;
}
detailResult.setHeaders(Arrays.asList(header));
detailResult.setSql(queryParam.getQl());
detailResult.setTables(tables);
for (Row row : rows) {
DetailResult.DetailRow detailRow = new DetailResult.DetailRow();
for (String name : header) {
Column column = row.getColumn(name);
Value value = column.getValue();
switch (value.getDataType()) {
case String:
detailRow.addStringValue(value.getString());
break;
case Timestamp:
detailRow.addTimestampValue(value.getTimestamp());
break;
case Boolean:
detailRow.addBooleanValue(value.getBoolean());
break;
case Varbinary:
log.info("reject unknown data type {}", value.getObject());
break;
default:
detailRow.addNumValue(value.getObject());
break;
}
}
detailResult.add(detailRow);
}
log.info("detailResult result {} rows {}", J.toJson(detailResult), J.toJson(rows));
return detailResult;
}

private void doBatchInsert(Set<String> metrics, String tenant,
List<io.ceresdb.models.Point> oneBatch) {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -307,6 +391,9 @@ private Map<Map<String, String>, Result> getTagsWithResults(QueryParam queryPara
}
tags.put(name, column.getValue().getString());
}
if (point.getTimestamp() == null) {
point.setTimestamp(0L);
}
Result existResult = tagsToResult.get(tags);
if (existResult != null) {
existResult.getPoints().add(point);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package io.holoinsight.server.extension;

import java.util.List;
import java.util.Map;

import io.holoinsight.server.extension.model.DetailResult;
import io.holoinsight.server.extension.model.QueryMetricsParam;
import io.holoinsight.server.extension.model.QueryResult.Result;
import io.holoinsight.server.extension.model.WriteMetricsParam;
Expand Down Expand Up @@ -35,4 +35,6 @@ public interface MetricStorage {

List<Result> pqlRangeQuery(PqlParam pqlParam);

DetailResult queryDetail(QueryParam queryParam);

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.Map;

import io.holoinsight.server.extension.model.DetailResult;
import io.holoinsight.server.extension.model.PqlParam;
import io.holoinsight.server.extension.model.QueryMetricsParam;
import io.holoinsight.server.extension.model.QueryParam;
Expand Down Expand Up @@ -60,4 +61,9 @@ public List<QueryResult.Result> pqlInstantQuery(PqlParam pqlParam) {
public List<QueryResult.Result> pqlRangeQuery(PqlParam pqlParam) {
return Collections.emptyList();
}

@Override
public DetailResult queryDetail(QueryParam queryParam) {
return DetailResult.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.extension.model;

import lombok.Data;

import java.util.ArrayList;
import java.util.List;

/**
* @author masaimu
* @date 2023/8/11
*/
@Data
public class DetailResult {

List<String> tables;
String sql;
List<String> headers;
List<DetailRow> points;

public static DetailResult empty() {
return new DetailResult();
}

public boolean isEmpty() {
return this.points == null || this.points.size() == 0;
}

public void add(DetailRow detailRow) {
if (points == null) {
points = new ArrayList<>();
}
points.add(detailRow);
}

@Data
public static class DetailRow {
List<Value> values = new ArrayList<>();

public void addStringValue(String string) {
Value value = new Value(DetailDataType.String, string);
values.add(value);
}

public void addTimestampValue(long timestamp) {
Value value = new Value(DetailDataType.Timestamp, timestamp);
values.add(value);
}

public void addBooleanValue(boolean aBoolean) {
Value value = new Value(DetailDataType.Boolean, aBoolean);
values.add(value);
}

public void addNumValue(Object object) {
if (object instanceof Number) {
Value value = new Value(DetailDataType.Double, ((Number) object).doubleValue());
values.add(value);
}
}
}

@Data
public static class Value {
private final DetailDataType type;
private final Object value;
}

public enum DetailDataType {
String(String.class), //
Boolean(Boolean.class), //
Double(Double.class), //
Timestamp(Long.class); //

private final Class<?> javaType;

DetailDataType(Class<?> javaType) {
this.javaType = javaType;
}

public Class<?> getJavaType() {
return javaType;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.holoinsight.server.apm.common.model.specification.sw.Trace;
import io.holoinsight.server.common.J;
import io.holoinsight.server.home.common.service.query.KeyResult;
import io.holoinsight.server.home.common.service.query.QueryDetailResponse;
import io.holoinsight.server.home.common.service.query.QueryDetailResponse.DetailResult;
import io.holoinsight.server.home.common.service.query.QueryResponse;
import io.holoinsight.server.home.common.service.query.QuerySchemaResponse;
import io.holoinsight.server.home.common.service.query.Result;
Expand Down Expand Up @@ -89,6 +91,70 @@ public QueryResponse query(QueryProto.QueryRequest request) {
return response;
}

public QueryDetailResponse queryDetail(QueryProto.QueryRequest request) {
Debugger.print("QueryService", "query, request: " + J.toJson(request));
long start = System.currentTimeMillis();

QueryProto.QueryDetailResponse res = queryServiceBlockingStub.queryDetailData(request);

log.info("step4 result {}", J.toJson(res));

int pointSize = getPointSizeFromResp(res);
log.info("HOME_QUERY_STAT from[API_DETAIL] invoke[1], cost[{}], pointSize[{}]",
System.currentTimeMillis() - start, pointSize);

Debugger.print("QueryService", "query, response: " + J.toJson(res));

QueryDetailResponse response = new QueryDetailResponse();
List<DetailResult> results = new ArrayList<>();

Map<String, Datasource> map = new HashMap<>();
for (Datasource datasource : request.getDatasourcesList()) {
map.put(datasource.getMetric(), datasource);
}

for (QueryProto.DetailResult result : res.getResultsList()) {
DetailResult r = new DetailResult();
r.setTables(result.getTablesList());
r.setSql(result.getSql());
r.setHeaders(result.getHeadersList());

List<Object[]> values = new ArrayList<>();
if (CollectionUtils.isEmpty(r.getHeaders())) {
log.warn("headers is empty for {}", J.toJson(result));
continue;
}
int column = r.getHeaders().size();
for (QueryProto.DetailRow row : result.getRowsList()) {
Object[] value = new Object[column];
for (int i = 0; i < column; i++) {
QueryProto.DetailValue detailValue = row.getValues(i);
switch (detailValue.getType()) {
case "String":
value[i] = detailValue.getStrValue();
break;
case "Timestamp":
value[i] = detailValue.getTimestampValue();
break;
case "Double":
value[i] = detailValue.getDoubleValue();
break;
case "Boolean":
value[i] = detailValue.getBoolValue();
break;
}
}
values.add(value);
}
r.setValues(values);
results.add(r);
}

response.setResults(results);

return response;
}

public QueryResponse queryTags(QueryProto.QueryRequest request) {
Debugger.print("QueryService", "queryTags, request: " + J.toJson(request));

Expand Down Expand Up @@ -394,4 +460,19 @@ private int getPointSizeFromResp(QueryProto.QueryResponse response) {
}
return size;
}

private int getPointSizeFromResp(QueryProto.QueryDetailResponse response) {
int size = 0;
if (response == null || response.getResultsCount() == 0) {
return size;
}

List<QueryProto.DetailResult> results = response.getResultsList();
for (QueryProto.DetailResult result : results) {
for (QueryProto.DetailRow row : result.getRowsList()) {
size += row.getValuesCount();
}
}
return size;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.home.common.service.query;

import lombok.Data;
import java.util.List;

/**
* @author masaimu
* @version 2023-08-15 12:57:00
*/
@Data
public class QueryDetailResponse {

private List<DetailResult> results;

@Data
public static class DetailResult {
private List<String> tables;
private String sql;
private List<String> headers;
private List<Object[]> values;

}
}
Loading