Enhance the query performance and storage size of the Cassandra storage in v3 server #3584

Merged · 1 commit · Nov 7, 2023
@@ -20,8 +20,10 @@
 import com.datastax.oss.driver.api.core.config.DriverOption;
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.Statement;
 import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
 import com.datastax.oss.driver.internal.core.auth.ProgrammaticPlainTextAuthProvider;
@@ -77,11 +79,23 @@ public int getDefaultTtl(String table) {
     }
 
     public <T> List<T> executeQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
+        return executeQuery(cqlSession.prepare(cql), resultHandler, params);
+    }
+
+    public <T> CompletionStage<List<T>> executeAsyncQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
+        return executeAsyncQuery(cqlSession.prepare(cql), resultHandler, params);
+    }
+
+    public PreparedStatement prepare(String cql) {
+        return cqlSession.prepare(cql);
+    }
+
+    public <T> List<T> executeQuery(PreparedStatement statement, ResultHandler<T> resultHandler, Object... params) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Executing CQL: {}", cql);
+            LOG.debug("Executing CQL: {}", statement.getQuery());
             LOG.debug("CQL parameters: {}", Arrays.toString(params));
         }
-        final BoundStatement stmt = cqlSession.prepare(cql).bind(params);
+        final BoundStatement stmt = statement.bind(params);
         final ResultSet resultSet = cqlSession.execute(stmt);
         healthChecker.health();
         if (resultHandler != null) {
@@ -91,12 +105,23 @@ public <T> List<T> executeQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
         return null;
     }
 
-    public <T> CompletionStage<List<T>> executeAsyncQuery(String cql, ResultHandler<T> resultHandler, Object... params) {
+    public <T> CompletionStage<List<T>> executeAsyncQuery(PreparedStatement statement, ResultHandler<T> resultHandler, Object... params) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Executing CQL: {}", cql);
+            LOG.debug("Executing CQL: {}", statement.getQuery());
             LOG.debug("CQL parameters: {}", Arrays.toString(params));
         }
-        final BoundStatement stmt = cqlSession.prepare(cql).bind(params);
+        final BoundStatement stmt = statement.bind(params);
+        return executeAsyncQuery0(stmt, resultHandler);
+    }
+
+    public <T> CompletionStage<List<T>> executeAsyncQueryWithCustomBind(PreparedStatement original, Statement statement, ResultHandler<T> resultHandler) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Executing Custom Bind CQL: {}", original.getQuery());
+        }
+        return executeAsyncQuery0(statement, resultHandler);
+    }
+
+    private <T> CompletionStage<List<T>> executeAsyncQuery0(Statement stmt, ResultHandler<T> resultHandler) {
         final CompletionStage<AsyncResultSet> resultSet = cqlSession.executeAsync(stmt);
         healthChecker.health();
         if (resultHandler != null) {
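The client change above splits preparation from execution: callers can now prepare a CQL string once, cache the resulting PreparedStatement, and bind fresh parameters per query instead of re-preparing on every call (each prepare() is a round trip to the cluster). A minimal sketch of the prepare-once, bind-many pattern against the DataStax Java driver 4.x; the table and query here are hypothetical, not from this PR:

```java
// Minimal sketch of the prepare-once, bind-many pattern this refactor enables.
// Table and CQL are hypothetical; only the driver API usage mirrors the PR.
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;

class TraceQueries {
    private final CqlSession session;
    private final PreparedStatement byTraceId; // prepared once, reused for every query

    TraceQueries(CqlSession session) {
        this.session = session;
        // prepare() round-trips to the cluster; the old code paid this cost per query.
        this.byTraceId = session.prepare("SELECT * FROM zipkin_span WHERE trace_id = ?");
    }

    ResultSet query(String traceId) {
        // bind() is local and cheap; only the bound values travel with the request.
        BoundStatement bound = byTraceId.bind(traceId);
        return session.execute(bound);
    }
}
```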
@@ -14,10 +14,13 @@
 
 package zipkin.server.storage.cassandra;
 
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.zipkin.ZipkinSpanRecord;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInstaller;
 
 public class CassandraTableInstaller implements ModelCreator.CreatingListener {
     private final CassandraClient client;
@@ -43,6 +46,17 @@ public void start() {
 
     @Override
     public void whenCreating(Model model) throws StorageException {
+        if (ZipkinSpanRecord.INDEX_NAME.equals(model.getName())) {
+            // remove unnecessary columns
+            for (int i = model.getColumns().size() - 1; i >= 0; i--) {
+                final String columnName = model.getColumns().get(i).getColumnName().getStorageName();
+                if (StorageData.TIME_BUCKET.equals(columnName) ||
+                    ZipkinSpanRecord.TIMESTAMP_MILLIS.equals(columnName) ||
+                    JDBCTableInstaller.TABLE_COLUMN.equals(columnName)) {
Member:
CassandraTableInstaller is referring to the JDBC implementation? Where does this TABLE_COLUMN come from?

Contributor Author:
> CassandraTableInstaller is referring to the JDBC implementation?

Nope. For now, the table installer only caches the table metadata.

> Where does this TABLE_COLUMN come from?

There are definitions in skywalking core that are not necessary for the query.
+                    model.getColumns().remove(i);
+                }
+            }
+        }
         TableMetaInfo.addModel(model);
     }
 }
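The pruning above runs only for the Zipkin span model and iterates backwards so that remove(i) never shifts an index that is still to be visited. A self-contained sketch of the same idea; the literal column names are assumptions standing in for the StorageData.TIME_BUCKET, ZipkinSpanRecord.TIMESTAMP_MILLIS, and JDBCTableInstaller.TABLE_COLUMN constants:

```java
// Self-contained sketch of the pruning idea: drop bookkeeping columns that
// Zipkin queries never read, before the model is registered.
// The literal names below are assumptions standing in for the real constants.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ColumnPruner {
    private static final Set<String> UNUSED =
        Set.of("time_bucket", "timestamp_millis", "table_name");

    static List<String> prune(List<String> columns) {
        List<String> kept = new ArrayList<>(columns);
        // Iterate backwards so remove(i) does not shift indices still to visit,
        // mirroring the reverse loop in whenCreating() above.
        for (int i = kept.size() - 1; i >= 0; i--) {
            if (UNUSED.contains(kept.get(i))) {
                kept.remove(i);
            }
        }
        return kept;
    }
}
```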
@@ -14,6 +14,7 @@
 package zipkin.server.storage.cassandra.dao;
 
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
@@ -29,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class CassandraBatchDAO implements IBatchDAO {
     static final Logger LOG = LoggerFactory.getLogger(CassandraBatchDAO.class);
@@ -37,6 +39,8 @@ public class CassandraBatchDAO implements IBatchDAO {
     private final DataCarrier<PrepareRequest> dataCarrier;
     private final int maxBatchSqlSize;
 
+    private static final ConcurrentHashMap<String, PreparedStatement> PREPARED_STATEMENT_MAP = new ConcurrentHashMap<>();
+
     public CassandraBatchDAO(CassandraClient client, int maxBatchCqlSize, int asyncBatchPersistentPoolSize) {
         this.client = client;
         String name = "CASSANDRA_ASYNCHRONOUS_BATCH_PERSISTENT";
@@ -76,7 +80,7 @@ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         }
 
         final long start = System.currentTimeMillis();
-        boolean success = true;
+        final boolean isInsert = cqls.get(0) instanceof InsertRequest;
         try {
             for (PrepareRequest cql : cqls) {
                 final CQLExecutor executor = (CQLExecutor) cql;
@@ -85,28 +89,26 @@ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
                     LOG.debug("CQL parameters: {}", executor.getParams());
                 }
 
-                final BoundStatement stmt = client.getSession().prepare(executor.getCql())
-                        .bind(((CQLExecutor) cql).getParams().toArray());
-                client.getSession().execute(stmt);
+                final PreparedStatement stmt = PREPARED_STATEMENT_MAP.computeIfAbsent(executor.getCql(), e -> client.getSession().prepare(e));
+
+                final BoundStatement boundStatement = stmt.bind(((CQLExecutor) cql).getParams().toArray());
+                client.getSession().executeAsync(boundStatement).thenAccept(result -> {
+                    if (isInsert) {
+                        ((InsertRequest) executor).onInsertCompleted();
+                    } else if (!result.wasApplied()) {
+                        ((UpdateRequest) executor).onUpdateFailure();
+                    };
+                });
             }
         } catch (Exception e) {
             // Just to avoid one execution failure makes the rest of batch failure.
             LOG.error(e.getMessage(), e);
-            success = false;
         }
 
-        final boolean isInsert = cqls.get(0) instanceof InsertRequest;
-        for (PrepareRequest executor : cqls) {
-            if (isInsert) {
-                ((InsertRequest) executor).onInsertCompleted();
-            } else if (!success) {
-                ((UpdateRequest) executor).onUpdateFailure();
-            }
-        }
         if (LOG.isDebugEnabled()) {
             long end = System.currentTimeMillis();
             long cost = end - start;
-            LOG.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}, cost:{}ms", prepareRequests.size(), maxBatchSqlSize, cost);
+            LOG.debug("execute sync sql statements done, data size: {}, maxBatchSqlSize: {}, cost:{}ms", prepareRequests.size(), maxBatchSqlSize, cost);
         }
         return CompletableFuture.completedFuture(null);
     }
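Two changes combine in flush(): PreparedStatements are cached per CQL string via computeIfAbsent, and execution becomes asynchronous with a per-statement completion callback, so one failed write no longer flags the entire batch. A hedged sketch of that pattern; the class and method names here are illustrative, not from the PR:

```java
// Hedged sketch of the flush() changes: cache PreparedStatements by CQL text
// and write asynchronously, completing each request from the driver callback.
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import java.util.concurrent.ConcurrentHashMap;

class BatchWriter {
    private final CqlSession session;
    // One entry per distinct CQL string; prepare() round-trips only on first use.
    private final ConcurrentHashMap<String, PreparedStatement> cache = new ConcurrentHashMap<>();

    BatchWriter(CqlSession session) {
        this.session = session;
    }

    void write(String cql, Runnable onSuccess, Runnable onFailure, Object... params) {
        PreparedStatement stmt = cache.computeIfAbsent(cql, session::prepare);
        BoundStatement bound = stmt.bind(params);
        // executeAsync frees the flushing thread; the callback fires per statement,
        // so one failure no longer marks the whole batch as failed.
        session.executeAsync(bound).thenAccept(rs -> {
            // wasApplied() is only meaningful for conditional (LWT) statements;
            // plain INSERTs always report true.
            if (rs.wasApplied()) {
                onSuccess.run();
            } else {
                onFailure.run();
            }
        });
    }
}
```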
@@ -14,7 +14,6 @@
 
 package zipkin.server.storage.cassandra.dao;
 
-import com.datastax.oss.driver.api.core.CqlIdentifier;
 import com.datastax.oss.driver.api.core.cql.Row;
 import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
@@ -35,6 +34,8 @@
 import zipkin.server.storage.cassandra.CassandraClient;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,10 +138,6 @@ private <T extends StorageData> List<CQLExecutor> buildAdditionalInsertExecutor(
         List<Object> param = new ArrayList<>();
         final StringBuilder cqlBuilder = new StringBuilder("INSERT INTO ").append(tableName);
 
-        columnNames.add(ID_COLUMN);
-        values.add("?");
-        param.add(TableHelper.generateId(model, metrics.id().build()));
-
         int position = 0;
         List valueList = new ArrayList();
         for (int i = 0; i < columns.size(); i++) {
@@ -172,7 +169,7 @@ private <T extends StorageData> List<CQLExecutor> buildAdditionalInsertExecutor(
         if (!CollectionUtils.isEmpty(valueList)) {
             for (Object object : valueList) {
                 List<Object> paramCopy = new ArrayList<>(param);
-                paramCopy.set(position, object);
+                paramCopy.set(position - 1, object);
                 sqlExecutors.add(new CQLExecutor(sql, paramCopy, callback, null));
             }
         } else {
@@ -196,7 +193,7 @@ private <T extends StorageData> CQLExecutor buildInsertExecutor(Model model,
         final List<ModelColumn> columns = model.getColumns();
         final List<String> columnNames =
             Stream.concat(
-                Stream.of(ID_COLUMN, JDBCTableInstaller.TABLE_COLUMN),
+                (model.isRecord() ? Collections.<String>emptyList() : Arrays.asList(ID_COLUMN, JDBCTableInstaller.TABLE_COLUMN)).stream(),
                 columns
                     .stream()
                     .map(ModelColumn::getColumnName)
@@ -210,7 +207,7 @@
         cqlBuilder.append(columnNames.stream().map(it -> "?").collect(Collectors.joining(",", "(", ")")));
 
         final List<Object> params = Stream.concat(
-            Stream.of(TableHelper.generateId(model, metrics.id().build()), model.getName()),
+            (model.isRecord() ? Collections.<String>emptyList() : Arrays.asList(TableHelper.generateId(model, metrics.id().build()), model.getName())).stream(),
             columns
                 .stream()
                 .map(ModelColumn::getColumnName)
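The two Stream.concat changes make the synthetic id and table_name columns conditional: record models such as spans skip them (shrinking every stored row), while metrics models keep them. A tiny illustration of the conditional column assembly; the literal column names and the helper are hypothetical stand-ins for ID_COLUMN and JDBCTableInstaller.TABLE_COLUMN:

```java
// Tiny illustration of the conditional column assembly above.
// "id" and "table_name" stand in for ID_COLUMN and TABLE_COLUMN.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InsertColumns {
    static List<String> names(boolean isRecord, List<String> modelColumns) {
        return Stream.concat(
            // records (e.g. spans) skip the synthetic columns; metrics keep them
            (isRecord ? Collections.<String>emptyList() : Arrays.asList("id", "table_name")).stream(),
            modelColumns.stream()
        ).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(names(true, List.of("trace_id", "span_id")));  // [trace_id, span_id]
        System.out.println(names(false, List.of("trace_id", "span_id"))); // [id, table_name, trace_id, span_id]
    }
}
```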
@@ -65,7 +65,7 @@ public static List<CQLExecutor> buildExtensionsForSpan(ZipkinSpanRecord span, Se
 
     public static int durationIndexBucket(long ts_milli) {
         // if the window constant has microsecond precision, the division produces negative values
-        return (int) (ts_milli / (DURATION_INDEX_BUCKET_WINDOW_SECONDS));
+        return (int) (ts_milli / (DURATION_INDEX_BUCKET_WINDOW_SECONDS)) / 1000;
     }
 
     private static CQLExecutor buildServiceSpan(String service, String span, int bucket, UUID ts, String trace_id, long durationMillis,
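The one-line fix above is easiest to verify with concrete numbers: ts_milli is in milliseconds while the window constant is in seconds, so the old expression produced bucket indexes 1000 times too large. A worked example, assuming a hypothetical 3-hour (10,800 s) window; the real value lives behind DURATION_INDEX_BUCKET_WINDOW_SECONDS:

```java
// Worked example of the durationIndexBucket fix.
// WINDOW_SECONDS is an assumption standing in for DURATION_INDEX_BUCKET_WINDOW_SECONDS.
public class BucketExample {
    static final long WINDOW_SECONDS = 10_800L; // hypothetical 3-hour bucket

    public static void main(String[] args) {
        long tsMillis = 1_699_000_000_000L;            // an instant in early Nov 2023, in ms
        long before = tsMillis / WINDOW_SECONDS;       // 157_314_814: ms/s unit mismatch, 1000x too large
        long after = tsMillis / WINDOW_SECONDS / 1000; // 157_314: one bucket per 3-hour window
        System.out.println(before + " -> " + after);
    }
}
```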