Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync skywalking upstream version and reduce Cassandra storage size #3586

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion skywalking
Submodule skywalking updated 156 files
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static void start() {
RunningMode.setMode(mode);

ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
ModuleManager manager = new ModuleManager("Zipkin Server");
try {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class CassandraClient implements Client {
static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);

public static final String RECORD_UNIQUE_UUID_COLUMN = "uuid_unique";
public static final String ZIPKIN_SPAN_ANNOTATION_QUERY_COLUMN = "annotation_query";

private final CassandraConfig config;
private final DelegatedHealthChecker healthChecker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
package zipkin.server.storage.cassandra.dao;

import com.datastax.oss.driver.api.core.cql.Row;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.SQLDatabaseModelExtension;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.TableHelper;
Expand All @@ -44,8 +44,10 @@
import java.util.stream.Stream;

import static org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller.ID_COLUMN;
import static zipkin2.internal.RecyclableBuffers.SHORT_STRING_LENGTH;

public class CassandraCqlExecutor {
private static final JsonObject EMPTY_JSON_OBJECT = new JsonObject();

protected <T extends StorageData> List<StorageData> getByIDs(CassandraClient client,
String modelName,
Expand Down Expand Up @@ -74,20 +76,6 @@ protected <T extends StorageData>CQLExecutor getInsertExecutor(Model model, T me
});
CQLExecutor sqlExecutor = buildInsertExecutor(
model, metrics, timeBucket, mainEntity, callback);
//build additional table cql
for (final SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension().getAdditionalTables().values()) {
Map<String, Object> additionalEntity = new HashMap<>();
additionalTable.getColumns().forEach(column -> {
additionalEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName()));
});

List<CQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor(
model, additionalTable.getName(), additionalTable.getColumns(), metrics,
timeBucket, additionalEntity, callback
);
sqlExecutor.appendAdditionalCQLs(additionalSQLExecutors);
}

// extension the span tables for query
if (metrics instanceof ZipkinSpanRecord) {
sqlExecutor.appendAdditionalCQLs(CassandraTableExtension.buildExtensionsForSpan((ZipkinSpanRecord) metrics, callback));
Expand Down Expand Up @@ -125,64 +113,6 @@ protected <T extends StorageData> CQLExecutor getUpdateExecutor(Model model, T m
return new CQLExecutor(sqlBuilder.toString(), param, callback, null);
}

/**
 * Builds the INSERT statement(s) for one additional (secondary lookup) table of the model.
 *
 * <p>Scalar columns are bound once. If a column holds a {@code List}, the statement is
 * fanned out: one {@link CQLExecutor} per element, with that element bound in the list
 * column's parameter slot. {@link StorageDataComplexObject} values are bound in their
 * serialized string form.
 *
 * @param model      the storage model the additional table belongs to
 * @param tableName  physical table to insert into
 * @param columns    columns of the additional table
 * @param metrics    the record being persisted (unused directly; kept for signature parity)
 * @param timeBucket time bucket of the record (unused directly; kept for signature parity)
 * @param objectMap  column name → value map extracted from {@code metrics}
 * @param callback   session-cache callback attached to every generated executor
 * @return the executors to run; empty when the table is {@code zipkin_query} and there is
 *         no query data to index
 */
private <T extends StorageData> List<CQLExecutor> buildAdditionalInsertExecutor(Model model, String tableName,
                                                                                List<ModelColumn> columns,
                                                                                T metrics,
                                                                                long timeBucket,
                                                                                Map<String, Object> objectMap,
                                                                                SessionCacheCallback callback) {
    final List<CQLExecutor> sqlExecutors = new ArrayList<>();
    final List<String> columnNames = new ArrayList<>();
    final List<String> values = new ArrayList<>();
    final List<Object> param = new ArrayList<>();
    final StringBuilder cqlBuilder = new StringBuilder("INSERT INTO ").append(tableName);

    // 1-based parameter slot of the list-typed column, 0 when none was seen.
    // NOTE(review): if several List columns existed, only the last one would fan out —
    // presumably additional tables carry at most one; confirm against the model definitions.
    int listParamPosition = 0;
    List<?> listValues = new ArrayList<>();
    for (int i = 0; i < columns.size(); i++) {
        final ModelColumn column = columns.get(i);
        columnNames.add(column.getColumnName().getStorageName());
        values.add("?");
        if (List.class.isAssignableFrom(column.getType())) {
            listValues = (List<?>) objectMap.get(column.getColumnName().getName());
            param.add(null); // placeholder — replaced per element below
            listParamPosition = i + 1;
        } else {
            final Object value = objectMap.get(column.getColumnName().getName());
            param.add(value instanceof StorageDataComplexObject
                ? ((StorageDataComplexObject) value).toStorageData()
                : value);
        }
    }

    cqlBuilder.append("(").append(String.join(", ", columnNames)).append(")")
        .append(" VALUES (").append(String.join(", ", values)).append(")");
    final String sql = cqlBuilder.toString();
    if (!CollectionUtils.isEmpty(listValues)) {
        // Fan out: one INSERT per element of the list column.
        for (final Object element : listValues) {
            final List<Object> paramCopy = new ArrayList<>(param);
            paramCopy.set(listParamPosition - 1, element);
            sqlExecutors.add(new CQLExecutor(sql, paramCopy, callback, null));
        }
    } else {
        // The query table exists only for trace lookup; with no query data there is
        // nothing useful to index, so the insert is skipped entirely.
        if ("zipkin_query".equals(tableName)) {
            return sqlExecutors;
        }
        sqlExecutors.add(new CQLExecutor(sql, param, callback, null));
    }

    return sqlExecutors;
}

private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
T metrics,
long timeBucket,
Expand All @@ -199,8 +129,9 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
.map(ModelColumn::getColumnName)
.map(ColumnName::getStorageName))
.collect(Collectors.toList());
if (model.isRecord()) {
if (metrics instanceof ZipkinSpanRecord) {
columnNames.add(CassandraClient.RECORD_UNIQUE_UUID_COLUMN);
columnNames.add(CassandraClient.ZIPKIN_SPAN_ANNOTATION_QUERY_COLUMN);
}
cqlBuilder.append(columnNames.stream().collect(Collectors.joining(",", "(", ")")));
cqlBuilder.append(" VALUES ");
Expand All @@ -220,8 +151,9 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
return it;
}))
.collect(Collectors.toList());
if (model.isRecord()) {
if (metrics instanceof ZipkinSpanRecord) {
params.add(UUID.randomUUID().toString());
params.add(annotationQuery((ZipkinSpanRecord) metrics));
}

return new CQLExecutor(cqlBuilder.toString(), params, onCompleteCallback, null);
Expand All @@ -242,4 +174,35 @@ private static String getModelTables(CassandraClient client, String modelName) {
final Model model = TableMetaInfo.get(modelName);
return TableHelper.getTableName(model);
}

/**
 * Flattens a span's annotations and tags into a single '░'-delimited token string for the
 * {@code annotation_query} SASI index. Annotation values are indexed on their own; each tag
 * contributes two tokens — the bare key (so a search by key alone matches) and
 * {@code key=value}. Values longer than {@code SHORT_STRING_LENGTH} are skipped.
 *
 * @param span the span whose annotations/tags are indexed
 * @return the delimited token string, or {@code null} when nothing was indexable
 */
private String annotationQuery(ZipkinSpanRecord span) {
    final JsonObject annotations = jsonCheck(span.getAnnotations());
    final JsonObject tags = jsonCheck(span.getTags());
    if (annotations.size() == 0 && tags.size() == 0) {
        return null;
    }

    // '░' is extremely unlikely to appear in real values; it must match the
    // DelimiterAnalyzer delimiter configured on the annotation_query index.
    final char delimiter = '░';
    final StringBuilder tokens = new StringBuilder().append(delimiter);

    for (Map.Entry<String, JsonElement> entry : annotations.entrySet()) {
        final String value = entry.getValue().getAsString();
        if (value.length() <= SHORT_STRING_LENGTH) {
            tokens.append(value).append(delimiter);
        }
    }

    for (Map.Entry<String, JsonElement> entry : tags.entrySet()) {
        final String value = entry.getValue().getAsString();
        if (value.length() > SHORT_STRING_LENGTH) {
            continue;
        }
        tokens.append(entry.getKey()).append(delimiter)
            .append(entry.getKey()).append('=').append(value).append(delimiter);
    }

    // Only the leading delimiter present means nothing qualified for indexing.
    return tokens.length() == 1 ? null : tokens.toString();
}

/**
 * Normalizes a possibly-null JSON object to a shared empty instance so callers can
 * iterate without null checks.
 *
 * @param json the object to normalize, may be {@code null}
 * @return {@code json} itself, or the shared empty object when {@code json} is {@code null}
 */
private JsonObject jsonCheck(JsonObject json) {
    return json == null ? EMPTY_JSON_OBJECT : json;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public List<List<Span>> getTraces(QueryRequest request, Duration duration) throw
if (CollectionUtils.isNotEmpty(request.annotationQuery())) {
for (Map.Entry<String, String> entry : request.annotationQuery().entrySet()) {
completionTraceIds.add(traceIDByAnnotationQueryExecutor.asyncGet(
entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(),
duration.getStartTimeBucket(), duration.getEndTimeBucket()
request.serviceName(), entry.getValue().isEmpty() ? entry.getKey() : entry.getKey() + "=" + entry.getValue(),
duration.getStartTimestamp() * 1000, duration.getEndTimestamp() * 1000, request.limit()
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package zipkin.server.storage.cassandra.dao.executor;

import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import zipkin.server.storage.cassandra.CassandraClient;
import zipkin.server.storage.cassandra.CassandraTableHelper;

Expand All @@ -23,19 +24,28 @@

public class TraceIDByAnnotationQueryExecutor extends BaseQueryExecutor {
private final Query<String> query;
private final Query<String> queryWithService;
public TraceIDByAnnotationQueryExecutor(CassandraClient client, CassandraTableHelper tableHelper) {
super(client, tableHelper);
this.query = buildQuery(
() -> "select " + ZipkinSpanRecord.TRACE_ID +
" from " + ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE +
" where " + ZipkinSpanRecord.QUERY + " = ?" +
" and " + ZipkinSpanRecord.TIME_BUCKET + " >= ?" +
" and " + ZipkinSpanRecord.TIME_BUCKET + " <= ?",
String querySuffix = "annotation_query LIKE ?"
+ " AND " + ZipkinSpanRecord.TIMESTAMP + ">=?"
+ " AND " + ZipkinSpanRecord.TIMESTAMP + "<=?"
+ " LIMIT ?"
+ " ALLOW FILTERING";

this.query = buildQuery(() -> "select trace_id from " + ZipkinSpanRecord.INDEX_NAME + " where " + querySuffix,
row -> row.getString(ZipkinSpanRecord.TRACE_ID)
);
this.queryWithService = buildQuery(() -> "select trace_id from " + ZipkinSpanRecord.INDEX_NAME + " where " +
ZipkinSpanRecord.LOCAL_ENDPOINT_SERVICE_NAME + " = ? and " + querySuffix,
row -> row.getString(ZipkinSpanRecord.TRACE_ID)
);
}

public CompletionStage<List<String>> asyncGet(String query, long startTimeBucket, long endTimeBucket) {
return executeAsync(this.query, query, startTimeBucket, endTimeBucket);
/**
 * Asynchronously resolves trace IDs matching an annotation-query token, optionally scoped
 * to a service.
 *
 * @param serviceName     service to scope the lookup to; when empty the unscoped query runs
 * @param query           annotation-query token to match (LIKE pattern on annotation_query)
 * @param startTimeBucket lower time bound — NOTE(review): callers appear to pass epoch
 *                        timestamps here despite the name; confirm against call sites
 * @param endTimeBucket   upper time bound (same caveat as above)
 * @param size            maximum number of trace IDs to return
 * @return completion stage yielding the matching trace IDs
 */
public CompletionStage<List<String>> asyncGet(String serviceName, String query, long startTimeBucket, long endTimeBucket, int size) {
    return StringUtil.isNotEmpty(serviceName)
        ? executeAsync(this.queryWithService, serviceName, query, startTimeBucket, endTimeBucket, size)
        : executeAsync(this.query, query, startTimeBucket, endTimeBucket, size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS zipkin_span (
debug INT,
shared INT,
uuid_unique text,
annotation_query text,
PRIMARY KEY(trace_id, uuid_unique)
)
WITH CLUSTERING ORDER BY (uuid_unique DESC)
Expand All @@ -34,6 +35,15 @@ CREATE TABLE IF NOT EXISTS zipkin_span (
AND speculative_retry = '95percentile'
AND comment = 'Primary table for holding trace data';

CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_span (local_endpoint_service_name) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {'mode': 'PREFIX'};
CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin_span (annotation_query) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {
'mode': 'PREFIX',
'analyzed': 'true',
'analyzer_class':'org.apache.cassandra.index.sasi.analyzer.DelimiterAnalyzer',
'delimiter': '░'};

CREATE TABLE IF NOT EXISTS zipkin_service_relation_traffic (
id text,
table_name text,
Expand Down Expand Up @@ -83,21 +93,6 @@ CREATE TABLE IF NOT EXISTS zipkin_service_traffic (
AND speculative_retry = '95percentile'
AND comment = 'Secondary table for looking up all services';

CREATE TABLE IF NOT EXISTS zipkin_query (
trace_id text,
query text,
time_bucket BIGINT,
PRIMARY KEY(query, time_bucket, trace_id)
)
WITH CLUSTERING ORDER BY (time_bucket DESC, trace_id DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
AND default_time_to_live = 604800
AND gc_grace_seconds = 3600
AND read_repair_chance = 0
AND dclocal_read_repair_chance = 0
AND speculative_retry = '95percentile'
AND comment = 'Secondary table for looking up traces by annotation query';

CREATE TABLE IF NOT EXISTS tag_autocomplete (
id text,
table_name text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ITCassandraStorage {
@RegisterExtension
CassandraExtension cassandra = new CassandraExtension();

private final ModuleManager moduleManager = new ModuleManager();
private final ModuleManager moduleManager = new ModuleManager("Zipkin Server");
private SpanForwardService forward;
private ITagAutoCompleteQueryDAO tagAutoCompleteQueryDAO;
private IZipkinQueryDAO zipkinQueryDAO;
Expand Down Expand Up @@ -92,6 +92,7 @@ public void test() throws InterruptedException, IOException {
// search traces
final QueryRequest query = QueryRequest.newBuilder()
.lookback(86400000L)
.serviceName("frontend")
.endTs(System.currentTimeMillis())
.minDuration(1000L)
.annotationQuery(Collections.singletonMap("http.path", "/api"))
Expand Down
Loading