From 6affe013decf4682e51db3a0fab8f619be3dd746 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Tue, 11 Nov 2025 15:48:57 +0000 Subject: [PATCH 1/2] CNDB-15946: Revamp SAI metrics for fetched/returned keys/partitions/rows/tombstones The metrics partitionsRead, rowsFiltered, rowsPreFiltered and shadowedPrimaryKeyCount, which didn't always work as expected, are replaced by these metrics: * keysFetched: Number of partition/row keys that will be used to fetch rows from the base table. * partitionsFetched: Number of live partitions fetched from the base table, before post-filtering and sorting. Note that currently ANN fetches one partition per index key, without any grouping of same-partition clusterings. * partitionsReturned: Number of live partitions returned to the coordinator, after post-filtering and sorting. * partitionTombstonesFetched: Number of deleted partitions found when reading the base table. * rowsFetched: Number of live rows fetched from the base table, before post-filtering and sorting. * rowsReturned: Number of live rows returned to the coordinator, after post-filtering and sorting. * rowTombstonesFetched: Number of deleted rows or row ranges found when reading the base table. StorageAttachedIndexSearcher is modified to use the command timestamp, rather than getting the current time every time a query timestamp is needed, which was possibly buggy and inefficient. 
--- .../cassandra/index/sai/QueryContext.java | 154 ++- .../index/sai/metrics/TableQueryMetrics.java | 72 +- .../sai/plan/CountFetchedTransformation.java | 81 ++ .../sai/plan/CountReturnedTransformation.java | 77 ++ .../plan/QueryMonitorableExecutionInfo.java | 11 +- .../plan/StorageAttachedIndexSearcher.java | 75 +- .../test/sai/SlowSAIQueryLoggerTest.java | 118 +- .../cassandra/index/sai/QueryContextTest.java | 1100 +++++++++++++++++ .../index/sai/metrics/QueryMetricsTest.java | 118 +- 9 files changed, 1638 insertions(+), 168 deletions(-) create mode 100644 src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java create mode 100644 src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java create mode 100644 test/unit/org/apache/cassandra/index/sai/QueryContextTest.java diff --git a/src/java/org/apache/cassandra/index/sai/QueryContext.java b/src/java/org/apache/cassandra/index/sai/QueryContext.java index 341555ceeddd..aa78a6985f1f 100644 --- a/src/java/org/apache/cassandra/index/sai/QueryContext.java +++ b/src/java/org/apache/cassandra/index/sai/QueryContext.java @@ -44,9 +44,31 @@ public class QueryContext private final LongAdder sstablesHit = new LongAdder(); private final LongAdder segmentsHit = new LongAdder(); - private final LongAdder partitionsRead = new LongAdder(); - private final LongAdder rowsPreFiltered = new LongAdder(); - private final LongAdder rowsFiltered = new LongAdder(); + + /** + * The partition/row keys that will be used to fetch rows from the base table. + * They will be either partition keys in AA, or row keys in the later row-aware disk formats. + */ + private final LongAdder keysFetched = new LongAdder(); + + /** The number of live partitions fetched from the storage engine, before post-filtering. */ + private final LongAdder partitionsFetched = new LongAdder(); + + /** The number of live partitions returned to the coordinator, after post-filtering. 
*/ + private final LongAdder partitionsReturned = new LongAdder(); + + /** The number of deleted partitions that are fetched. */ + private final LongAdder partitionTombstonesFetched = new LongAdder(); + + /** The number of live rows fetched from the storage engine, before post-filtering. */ + private final LongAdder rowsFetched = new LongAdder(); + + /** The number of live rows returned to the coordinator, after post-filtering. */ + private final LongAdder rowsReturned = new LongAdder(); + + /** The number of deleted individual rows or ranges of rows that are fetched. */ + private final LongAdder rowTombstonesFetched = new LongAdder(); + private final LongAdder trieSegmentsHit = new LongAdder(); private final LongAdder bkdPostingListsHit = new LongAdder(); @@ -64,8 +86,6 @@ public class QueryContext private float annRerankFloor = 0.0f; // only called from single-threaded setup code - private final LongAdder shadowedPrimaryKeyCount = new LongAdder(); - // Determines the order of using indexes for filtering and sorting. // Null means the query execution order hasn't been decided yet. 
private FilterSortOrder filterSortOrder = null; @@ -92,49 +112,81 @@ public void addSstablesHit(long val) { sstablesHit.add(val); } + public void addSegmentsHit(long val) { segmentsHit.add(val); } - public void addPartitionsRead(long val) + + public void addKeysFetched(long val) + { + keysFetched.add(val); + } + + public void addPartitionsFetched(long val) + { + partitionsFetched.add(val); + } + + public void addPartitionsReturned(long val) { - partitionsRead.add(val); + partitionsReturned.add(val); } - public void addRowsFiltered(long val) + + public void addPartitionTombstonesFetched(long val) { - rowsFiltered.add(val); + partitionTombstonesFetched.add(val); } - public void addRowsPreFiltered(long val) + + public void addRowsFetched(long val) { - rowsPreFiltered.add(val); + rowsFetched.add(val); } + + public void addRowsReturned(long val) + { + rowsReturned.add(val); + } + + public void addRowTombstonesFetched(long val) + { + rowTombstonesFetched.add(val); + } + public void addTrieSegmentsHit(long val) { trieSegmentsHit.add(val); } + public void addBkdPostingListsHit(long val) { bkdPostingListsHit.add(val); } + public void addBkdSegmentsHit(long val) { bkdSegmentsHit.add(val); } + public void addBkdPostingsSkips(long val) { bkdPostingsSkips.add(val); } + public void addBkdPostingsDecodes(long val) { bkdPostingsDecodes.add(val); } + public void addTriePostingsSkips(long val) { triePostingsSkips.add(val); } + public void addTriePostingsDecodes(long val) { triePostingsDecodes.add(val); } + public void addQueryTimeouts(long val) { queryTimeouts.add(val); @@ -156,53 +208,86 @@ public long sstablesHit() { return sstablesHit.longValue(); } + public long segmentsHit() { return segmentsHit.longValue(); } - public long partitionsRead() + + public long keysFetched() { - return partitionsRead.longValue(); + return keysFetched.longValue(); } - public long rowsFiltered() + + public long partitionsFetched() { - return rowsFiltered.longValue(); + return 
partitionsFetched.longValue(); } - public long rowsPreFiltered() + + public long partitionsReturned() + { + return partitionsReturned.longValue(); + } + + public long partitionTombstonesFetched() + { + return partitionTombstonesFetched.longValue(); + } + + public long rowsFetched() + { + return rowsFetched.longValue(); + } + + public long rowsReturned() + { + return rowsReturned.longValue(); + } + + public long rowTombstonesFetched() { - return rowsPreFiltered.longValue(); + return rowTombstonesFetched.longValue(); } + public long trieSegmentsHit() { return trieSegmentsHit.longValue(); } + public long bkdPostingListsHit() { return bkdPostingListsHit.longValue(); } + public long bkdSegmentsHit() { return bkdSegmentsHit.longValue(); } + public long bkdPostingsSkips() { return bkdPostingsSkips.longValue(); } + public long bkdPostingsDecodes() { return bkdPostingsDecodes.longValue(); } + public long triePostingsSkips() { return triePostingsSkips.longValue(); } + public long triePostingsDecodes() { return triePostingsDecodes.longValue(); } + public long queryTimeouts() { return queryTimeouts.longValue(); } + public long annGraphSearchLatency() { return annGraphSearchLatency.longValue(); @@ -222,19 +307,6 @@ public void checkpoint() } } - public void addShadowed(long count) - { - shadowedPrimaryKeyCount.add(count); - } - - /** - * @return shadowed primary keys, in ascending order - */ - public long getShadowedPrimaryKeyCount() - { - return shadowedPrimaryKeyCount.longValue(); - } - public float getAnnRerankFloor() { return annRerankFloor; @@ -277,9 +349,13 @@ public static class Snapshot public final long totalQueryTimeNs; public final long sstablesHit; public final long segmentsHit; - public final long partitionsRead; - public final long rowsFiltered; - public final long rowsPreFiltered; + public final long keysFetched; + public final long partitionsFetched; + public final long partitionsReturned; + public final long partitionTombstonesFetched; + public final long 
rowsFetched; + public final long rowsReturned; + public final long rowTombstonesFetched; public final long trieSegmentsHit; public final long bkdPostingListsHit; public final long bkdSegmentsHit; @@ -289,7 +365,6 @@ public static class Snapshot public final long triePostingsDecodes; public final long queryTimeouts; public final long annGraphSearchLatency; - public final long shadowedPrimaryKeyCount; public final FilterSortOrder filterSortOrder; /** @@ -302,9 +377,13 @@ private Snapshot(QueryContext context) totalQueryTimeNs = context.totalQueryTimeNs(); sstablesHit = context.sstablesHit(); segmentsHit = context.segmentsHit(); - partitionsRead = context.partitionsRead(); - rowsFiltered = context.rowsFiltered(); - rowsPreFiltered = context.rowsPreFiltered(); + keysFetched = context.keysFetched(); + partitionsFetched = context.partitionsFetched(); + partitionsReturned = context.partitionsReturned(); + partitionTombstonesFetched = context.partitionTombstonesFetched(); + rowsFetched = context.rowsFetched(); + rowsReturned = context.rowsReturned(); + rowTombstonesFetched = context.rowTombstonesFetched(); trieSegmentsHit = context.trieSegmentsHit(); bkdPostingListsHit = context.bkdPostingListsHit(); bkdSegmentsHit = context.bkdSegmentsHit(); @@ -314,7 +393,6 @@ private Snapshot(QueryContext context) triePostingsDecodes = context.triePostingsDecodes(); queryTimeouts = context.queryTimeouts(); annGraphSearchLatency = context.annGraphSearchLatency(); - shadowedPrimaryKeyCount = context.getShadowedPrimaryKeyCount(); filterSortOrder = context.filterSortOrder(); } } diff --git a/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java b/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java index b21bc09dd758..e94c00d6e975 100644 --- a/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java +++ b/src/java/org/apache/cassandra/index/sai/metrics/TableQueryMetrics.java @@ -121,9 +121,9 @@ public void record(QueryContext context, 
ReadCommand command) "post-filtered {} in {}, and took {} microseconds.", pluralize(snapshot.sstablesHit, "SSTable index", "es"), pluralize(snapshot.segmentsHit, "segment", "s"), - pluralize(snapshot.rowsPreFiltered, "row", "s"), - pluralize(snapshot.rowsFiltered, "row", "s"), - pluralize(snapshot.partitionsRead, "partition", "s"), + pluralize(snapshot.rowsFetched, "row", "s"), + pluralize(snapshot.rowsReturned, "row", "s"), + pluralize(snapshot.partitionsReturned, "partition", "s"), queryLatencyMicros); } else @@ -132,8 +132,8 @@ public void record(QueryContext context, ReadCommand command) "and took {} microseconds.", pluralize(snapshot.sstablesHit, "SSTable index", "es"), pluralize(snapshot.segmentsHit, "segment", "s"), - pluralize(snapshot.rowsFiltered, "row", "s"), - pluralize(snapshot.partitionsRead, "partition", "s"), + pluralize(snapshot.rowsReturned, "row", "s"), + pluralize(snapshot.partitionsReturned, "partition", "s"), queryLatencyMicros); } } @@ -190,8 +190,13 @@ public static class PerTable extends AbstractQueryMetrics public static final String METRIC_TYPE = "TableQueryMetrics"; public final Counter totalQueryTimeouts; - public final Counter totalPartitionReads; - public final Counter totalRowsFiltered; + public final Counter totalKeysFetched; + public final Counter totalPartitionsFetched; + public final Counter totalPartitionsReturned; + public final Counter totalPartitionTombstonesFetched; + public final Counter totalRowsFetched; + public final Counter totalRowsReturned; + public final Counter totalRowTombstonesFetched; public final Counter totalQueriesCompleted; public final Counter sortThenFilterQueriesCompleted; @@ -206,8 +211,13 @@ public PerTable(TableMetadata table, QueryKind queryKind, Predicate { super(table.keyspace, table.name, METRIC_TYPE, queryKind, filter); - totalPartitionReads = Metrics.counter(createMetricName("TotalPartitionReads")); - totalRowsFiltered = Metrics.counter(createMetricName("TotalRowsFiltered")); + totalKeysFetched = 
Metrics.counter(createMetricName("TotalKeysFetched")); + totalPartitionsFetched = Metrics.counter(createMetricName("TotalPartitionsFetched")); + totalPartitionsReturned = Metrics.counter(createMetricName("TotalPartitionsReturned")); + totalPartitionTombstonesFetched = Metrics.counter(createMetricName("TotalPartitionTombstonesFetched")); + totalRowsFetched = Metrics.counter(createMetricName("TotalRowsFetched")); + totalRowsReturned = Metrics.counter(createMetricName("TotalRowsReturned")); + totalRowTombstonesFetched = Metrics.counter(createMetricName("TotalRowTombstonesFetched")); totalQueriesCompleted = Metrics.counter(createMetricName("TotalQueriesCompleted")); totalQueryTimeouts = Metrics.counter(createMetricName("TotalQueryTimeouts")); @@ -225,8 +235,13 @@ public void record(QueryContext.Snapshot snapshot) } totalQueriesCompleted.inc(); - totalPartitionReads.inc(snapshot.partitionsRead); - totalRowsFiltered.inc(snapshot.rowsFiltered); + totalKeysFetched.inc(snapshot.keysFetched); + totalPartitionsFetched.inc(snapshot.partitionsFetched); + totalPartitionsReturned.inc(snapshot.partitionsReturned); + totalPartitionTombstonesFetched.inc(snapshot.partitionTombstonesFetched); + totalRowsFetched.inc(snapshot.rowsFetched); + totalRowsReturned.inc(snapshot.rowsReturned); + totalRowTombstonesFetched.inc(snapshot.rowTombstonesFetched); if (snapshot.filterSortOrder == QueryContext.FilterSortOrder.SCAN_THEN_FILTER) sortThenFilterQueriesCompleted.inc(); @@ -249,8 +264,13 @@ public static class PerQuery extends AbstractQueryMetrics */ public final Histogram sstablesHit; public final Histogram segmentsHit; - public final Histogram partitionReads; - public final Histogram rowsFiltered; + public final Histogram keysFetched; + public final Histogram partitionsFetched; + public final Histogram partitionsReturned; + public final Histogram partitionTombstonesFetched; + public final Histogram rowsFetched; + public final Histogram rowsReturned; + public final Histogram 
rowTombstonesFetched; /** * BKD index metrics. @@ -262,9 +282,6 @@ public static class PerQuery extends AbstractQueryMetrics public final Histogram kdTreePostingsSkips; public final Histogram kdTreePostingsDecodes; - /** Shadowed keys scan metrics **/ - public final Histogram shadowedKeysScannedHistogram; - /** * Trie index posting lists metrics. */ @@ -289,6 +306,13 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate sstablesHit = Metrics.histogram(createMetricName("SSTableIndexesHit"), false); segmentsHit = Metrics.histogram(createMetricName("IndexSegmentsHit"), false); + keysFetched = Metrics.histogram(createMetricName("KeysFetched"), false); + partitionsFetched = Metrics.histogram(createMetricName("PartitionsFetched"), false); + partitionsReturned = Metrics.histogram(createMetricName("PartitionsReturned"), false); + partitionTombstonesFetched = Metrics.histogram(createMetricName("PartitionTombstonesFetched"), false); + rowsFetched = Metrics.histogram(createMetricName("RowsFetched"), false); + rowsReturned = Metrics.histogram(createMetricName("RowsReturned"), false); + rowTombstonesFetched = Metrics.histogram(createMetricName("RowTombstonesFetched"), false); kdTreePostingsSkips = Metrics.histogram(createMetricName("KDTreePostingsSkips"), true); kdTreePostingsNumPostings = Metrics.histogram(createMetricName("KDTreePostingsNumPostings"), false); @@ -297,11 +321,6 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate postingsSkips = Metrics.histogram(createMetricName("PostingsSkips"), true); postingsDecodes = Metrics.histogram(createMetricName("PostingsDecodes"), false); - partitionReads = Metrics.histogram(createMetricName("PartitionReads"), false); - rowsFiltered = Metrics.histogram(createMetricName("RowsFiltered"), false); - - shadowedKeysScannedHistogram = Metrics.histogram(createMetricName("ShadowedKeysScannedHistogram"), false); - // Key vector metrics that translate to performance annGraphSearchLatency = 
Metrics.timer(createMetricName("ANNGraphSearchLatency")); } @@ -312,8 +331,13 @@ public void record(QueryContext.Snapshot snapshot) queryLatency.update(snapshot.totalQueryTimeNs, TimeUnit.NANOSECONDS); sstablesHit.update(snapshot.sstablesHit); segmentsHit.update(snapshot.segmentsHit); - partitionReads.update(snapshot.partitionsRead); - rowsFiltered.update(snapshot.rowsFiltered); + keysFetched.update(snapshot.keysFetched); + partitionsFetched.update(snapshot.partitionsFetched); + partitionsReturned.update(snapshot.partitionsReturned); + partitionTombstonesFetched.update(snapshot.partitionTombstonesFetched); + rowsFetched.update(snapshot.rowsFetched); + rowsReturned.update(snapshot.rowsReturned); + rowTombstonesFetched.update(snapshot.rowTombstonesFetched); // Record string index cache metrics. if (snapshot.trieSegmentsHit > 0) @@ -338,8 +362,6 @@ public void record(QueryContext.Snapshot snapshot) { annGraphSearchLatency.update(snapshot.annGraphSearchLatency, TimeUnit.NANOSECONDS); } - - shadowedKeysScannedHistogram.update(snapshot.shadowedPrimaryKeyCount); } } } diff --git a/src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java b/src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java new file mode 100644 index 000000000000..c0cdef99d255 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/plan/CountFetchedTransformation.java @@ -0,0 +1,81 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.plan; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.index.sai.QueryContext; + +/** + * Counts the number of partitions, rows and tombstones fetched by an index query, before post-filtering and sorting. + */ +class CountFetchedTransformation extends Transformation +{ + private final QueryContext queryContext; + private final int nowInSec; + + CountFetchedTransformation(QueryContext queryContext, int nowInSec) + { + this.queryContext = queryContext; + this.nowInSec = nowInSec; + } + + /** + * Updates the query context metrics about the number of fetched partitions, rows and tombstones + * with the contents of the provided partition iterator. 
+ * + * @param partition the results of querying the base table with the indexed keys, before applying post-filtering and sorting + * @return a copy of the provided row iterator, which will populate the query context as it is consumed + */ + UnfilteredRowIterator apply(UnfilteredRowIterator partition) + { + return Transformation.apply(partition, this); + } + + @Override + protected DeletionTime applyToDeletion(DeletionTime deletionTime) + { + queryContext.checkpoint(); + if (deletionTime.deletes(nowInSec)) + queryContext.addPartitionTombstonesFetched(1); + else + queryContext.addPartitionsFetched(1); + return deletionTime; + } + + @Override + protected Row applyToRow(Row row) + { + queryContext.checkpoint(); + if (row.hasLiveData(nowInSec, false)) + queryContext.addRowsFetched(1); + else + queryContext.addRowTombstonesFetched(1); + return row; + } + + @Override + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + queryContext.checkpoint(); + queryContext.addRowTombstonesFetched(1); + return marker; + } +} diff --git a/src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java b/src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java new file mode 100644 index 000000000000..d442e93a4b84 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/plan/CountReturnedTransformation.java @@ -0,0 +1,77 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.index.sai.plan; + +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.index.sai.QueryContext; + +/** + * Counts the final number of partitions and rows returned by a query to the coordinator, after post-filtering and sorting. + * Tombstones are not counted because they are not returned to the coordinator. + */ +class CountReturnedTransformation extends Transformation +{ + private final QueryContext queryContext; + private final Runnable onClose; + private final Transformation rowCounter; + + private CountReturnedTransformation(QueryContext queryContext, Runnable onClose) + { + this.queryContext = queryContext; + this.onClose = onClose; + rowCounter = new Transformation<>() { + @Override + protected Row applyToRow(Row row) + { + queryContext.checkpoint(); + queryContext.addRowsReturned(1); + return row; + } + }; + } + + /** + * Updates the query context metrics about the number of partitions and rows returned to the coordinator + * with the contents of the provided partition iterator. 
+ * + * @param partition the partition iterator containing the final results to return to the coordinator + * @param queryContext the query context to update with the metrics + * @param onClose a callback to run when the transformation is closed + * @return a copy of the provided partition iterator, which will populate the query context as it is consumed + */ + static UnfilteredPartitionIterator apply(UnfilteredPartitionIterator partition, QueryContext queryContext, Runnable onClose) + { + return Transformation.apply(partition, new CountReturnedTransformation(queryContext, onClose)); + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + queryContext.checkpoint(); + queryContext.addPartitionsReturned(1); + return Transformation.apply(partition, rowCounter); + } + + @Override + protected void onClose() + { + onClose.run(); + } +} diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java index b072825286ee..3a7d87113d46 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryMonitorableExecutionInfo.java @@ -72,9 +72,13 @@ public String toLogString(boolean unique) sb.append(sectionNamePrefix).append("metrics:\n"); appendMetric(sb, "sstablesHit", metrics.sstablesHit); appendMetric(sb, "segmentsHit", metrics.segmentsHit); - appendMetric(sb, "partitionsRead", metrics.partitionsRead); - appendMetric(sb, "rowsFiltered", metrics.rowsFiltered); - appendMetric(sb, "rowsPreFiltered", metrics.rowsPreFiltered); + appendMetric(sb, "keysFetched", metrics.keysFetched); + appendMetric(sb, "partitionsFetched", metrics.partitionsFetched); + appendMetric(sb, "partitionsReturned", metrics.partitionsReturned); + appendMetric(sb, "partitionTombstonesFetched", metrics.partitionTombstonesFetched); + appendMetric(sb, "rowsFetched", 
metrics.rowsFetched); + appendMetric(sb, "rowsReturned", metrics.rowsReturned); + appendMetric(sb, "rowTombstonesFetched", metrics.rowTombstonesFetched); appendMetric(sb, "trieSegmentsHit", metrics.trieSegmentsHit); appendMetric(sb, "bkdPostingListsHit", metrics.bkdPostingListsHit); appendMetric(sb, "bkdSegmentsHit", metrics.bkdSegmentsHit); @@ -83,7 +87,6 @@ public String toLogString(boolean unique) appendMetric(sb, "triePostingsSkips", metrics.triePostingsSkips); appendMetric(sb, "triePostingsDecodes", metrics.triePostingsDecodes); appendMetric(sb, "annGraphSearchLatencyNanos", metrics.annGraphSearchLatency); - appendMetric(sb, "shadowedPrimaryKeyCount", metrics.shadowedPrimaryKeyCount); // append the plan sb.append(sectionNamePrefix).append("plan:\n").append(plan); diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java index 36f13f7f748c..7c9bba9d5359 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java @@ -76,7 +76,6 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.btree.BTree; public class StorageAttachedIndexSearcher implements Index.Searcher @@ -117,6 +116,12 @@ public ReadCommand command() return command; } + @VisibleForTesting + public QueryContext queryContext() + { + return queryContext; + } + @VisibleForTesting public final Set plannedIndexes() { @@ -162,19 +167,25 @@ public UnfilteredPartitionIterator search(ReadExecutionController executionContr Iterator keysIterator = controller.buildIterator(plan); // Can't check for `command.isTopK()` because the planner could optimize sorting out + UnfilteredPartitionIterator result; 
Orderer ordering = plan.ordering(); if (ordering == null) { assert keysIterator instanceof KeyRangeIterator; - return new ResultRetriever((KeyRangeIterator) keysIterator, filterTree, controller, executionController, queryContext); + result = new ResultRetriever((KeyRangeIterator) keysIterator, filterTree, controller, executionController, queryContext); } - - assert !(keysIterator instanceof KeyRangeIterator); - var scoredKeysIterator = (CloseableIterator) keysIterator; - var result = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, controller, - executionController, queryContext, command.limits().count(), - ordering.context.getDefinition()); - return new TopKProcessor(command).filter(result); + else + { + assert !(keysIterator instanceof KeyRangeIterator); + var scoredKeysIterator = (CloseableIterator) keysIterator; + var retriever = new ScoreOrderedResultRetriever(scoredKeysIterator, filterTree, controller, + executionController, queryContext, + command.nowInSec(), + command.limits().count(), + ordering.context.getDefinition()); + result = new TopKProcessor(command).filter(retriever); + } + return CountReturnedTransformation.apply(result, queryContext, controller::finish); } catch (QueryView.Builder.MissingIndexException e) { @@ -237,6 +248,7 @@ private class ResultRetriever extends AbstractIterator im private final QueryContext queryContext; private final PrimaryKey.Factory keyFactory; private final int partitionRowBatchSize; + private final CountFetchedTransformation fetchedRowsCounter; private PrimaryKey lastKey; @@ -255,6 +267,7 @@ private ResultRetriever(KeyRangeIterator operation, this.executionController = executionController; this.queryContext = queryContext; this.keyFactory = controller.primaryKeyFactory(); + this.fetchedRowsCounter = new CountFetchedTransformation(queryContext, command.nowInSec()); this.firstPrimaryKey = controller.firstPrimaryKey(); @@ -510,9 +523,10 @@ public void close() public UnfilteredRowIterator apply(List keys) { 
UnfilteredRowIterator partition = controller.getPartition(keys, executionController); - queryContext.addPartitionsRead(1); + UnfilteredRowIterator counted = fetchedRowsCounter.apply(partition); queryContext.checkpoint(); - return applyIndexFilter(partition, filterTree, queryContext); + queryContext.addKeysFetched(keys.size()); + return applyIndexFilter(counted, filterTree); } @Override @@ -525,7 +539,6 @@ public TableMetadata metadata() public void close() { FileUtils.closeQuietly(operation); - controller.finish(); } } @@ -549,6 +562,8 @@ public static class ScoreOrderedResultRetriever extends AbstractIterator processedKeys; private final Queue pendingRows; @@ -569,6 +584,7 @@ private ScoreOrderedResultRetriever(CloseableIterator sco QueryController controller, ReadExecutionController executionController, QueryContext queryContext, + int nowInSec, int limit, ColumnMetadata orderedColumn) { @@ -582,6 +598,8 @@ private ScoreOrderedResultRetriever(CloseableIterator sco this.controller = controller; this.executionController = executionController; this.queryContext = queryContext; + this.nowInSec = nowInSec; + this.fetchedRowsCounter = new CountFetchedTransformation(queryContext, nowInSec); this.processedKeys = new HashSet<>(limit); this.pendingRows = new ArrayDeque<>(limit); @@ -718,14 +736,15 @@ public UnfilteredRowIterator readAndValidatePartition(PrimaryKey pk, List BB.queryDelay.updateAndGet(x -> x / 4)); // restore the query delay // disable execution info logging and verify they are not logged @@ -353,6 +380,17 @@ public void testSlowSAIQueryLogger() throws Throwable assertLogsContain(mark, node, "was slow 2 times", "WHERE n = ?", "SAI slowest query metrics:"); assertLogsContain(mark, node, "was slow 3 times", "WHERE n > ?", "SAI slowest query metrics:"); assertLogsDoNotContain(mark, node, "WHERE n = 1", "WHERE n = 2", "WHERE n > 1", "WHERE n > 2", "WHERE n > 3"); + + // test some partition and row deletions + coordinator.execute(withKeyspace("DELETE FROM %s.t 
WHERE k = 1 AND c = 1"), ConsistencyLevel.ONE); + coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k = 1 AND c = 2"), ConsistencyLevel.ONE); + coordinator.execute(withKeyspace("DELETE FROM %s.t WHERE k = 2"), ConsistencyLevel.ONE); + node.flush(KEYSPACE); + String selectAllQuery = withKeyspace("SELECT * FROM %s.t WHERE n >= 0"); + coordinator.execute(selectAllQuery, ConsistencyLevel.ONE); + assertLogsContain(mark, node, + "partitionTombstonesFetched: 1", + "rowTombstonesFetched: 2"); } } diff --git a/test/unit/org/apache/cassandra/index/sai/QueryContextTest.java b/test/unit/org/apache/cassandra/index/sai/QueryContextTest.java new file mode 100644 index 000000000000..68fbf17a03a5 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/QueryContextTest.java @@ -0,0 +1,1100 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.index.sai; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.index.sai.disk.format.Version; +import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan; +import org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher; + +import static org.junit.Assert.assertEquals; + +public class QueryContextTest extends SAITester.Versioned +{ + @Test + public void testSkinnyTable() + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)"); + createIndex("CREATE CUSTOM INDEX ON %s(a) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, a, b) VALUES (0, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (1, 1, 1)"); + execute("INSERT INTO %s (k, a, b) VALUES (2, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (3, 1, 1)"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(1, 1, 1), + row(2, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(4, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a < 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, 
snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows, different value + snapshot = queryContext("SELECT * FROM %s WHERE a = 1 ALLOW FILTERING", + row(1, 1, 1), + row(3, 1, 1)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b = 1 ALLOW FILTERING"); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, 
snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition/primary key query + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k = 0 ALLOW FILTERING", + row(0, 0, 0)); + assertEquals(1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(1, snapshot.rowsFetched); + assertEquals(1, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition/primary key filtering + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k != 1 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(3, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a partition/row + execute("DELETE FROM %s WHERE k = 1"); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(3, snapshot.partitionsFetched); + assertEquals(3, 
snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(3, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete an indexed cell + execute("DELETE a FROM %s WHERE k = 2"); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(3, 1, 1)); + assertEquals(4, snapshot.keysFetched); + assertEquals(3, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(3, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(3, 1, 1)); + assertEquals(2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, a, b) VALUES (0, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (1, 1, 1) USING TTL 1"); + execute("INSERT INTO %s (k, a, b) VALUES (2, 0, 0)"); + execute("INSERT INTO %s (k, a, b) VALUES (3, 1, 1) USING TTL 1"); + flush(); + 
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0), + row(2, 0, 0)); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testWideTableWithoutStatics() + { + createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(a) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 3, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 3, 1, 1)"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0", + row(0, 0, 0, 0), + row(0, 1, 1, 1), + row(0, 2, 0, 0), + row(0, 3, 1, 1), + row(1, 0, 0, 0), + row(1, 1, 1, 1), + row(1, 2, 0, 0), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 
8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a < 0"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows, different value + snapshot = queryContext("SELECT * FROM %s WHERE a = 1", + row(0, 1, 1, 1), + row(0, 3, 1, 1), + row(1, 1, 1, 1), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 
4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b >= 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts no rows + snapshot = queryContext("SELECT * FROM %s WHERE a = 0 AND b < 0 ALLOW FILTERING"); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts some rows + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key query + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k = 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 1, 1, 1), + row(0, 2, 0, 0), + row(0, 3, 1, 1)); + assertEquals(isRowAware() ? 
4 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // primary key query + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k = 0 AND c IN (0, 2) ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0)); + assertEquals(isRowAware() ? 2 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key filtering + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND k != 1 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 1, 1, 1), + row(0, 2, 0, 0), + row(0, 3, 1, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // clustering key filtering + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 AND c != 1 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(0, 3, 1, 1), + row(1, 0, 0, 0), + row(1, 2, 0, 0), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 
6 : 2, snapshot.keysFetched); // the clustering key filter is applied to indexed keys + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a row + execute("DELETE FROM %s WHERE k = 0 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 7, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a cell + execute("DELETE a FROM %s WHERE k = 1 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(0, 2, 0, 0), + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 7, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a partition + execute("DELETE FROM %s WHERE k = 0"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0", + row(1, 2, 0, 0)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 
2 : 4, snapshot.rowsFetched); + assertEquals(1, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete all the rows in a partition + execute("DELETE FROM %s WHERE k = 1 AND c = 0"); + execute("DELETE FROM %s WHERE k = 1 AND c = 2"); + snapshot = queryContext("SELECT * FROM %s WHERE a = 0"); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 0 : 2, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0", + row(1, 1, 1, 1), + row(1, 3, 1, 1)); + assertEquals(isRowAware() ? 2 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(isRowAware() ? 
0 : 2, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1) USING TTL 1"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0) USING TTL 1"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING", + row(0, 0, 0, 0), + row(0, 2, 0, 0), + row(1, 0, 0, 0), + row(1, 1, 1, 1)); + assertEquals(isRowAware() ? 
6 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testWideTableScoreOrdered() + { + Assume.assumeTrue(version.onOrAfter(Version.JVECTOR_EARLIEST)); + + createTable("CREATE TABLE %s (k int, c int, n int, v vector<float, 2>, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(n) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex' WITH OPTIONS = {'similarity_function' : 'euclidean'}"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 0, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 1, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 3, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 0, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 1, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 3, 1, [1, 1])"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 0, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, 
snapshot.rowTombstonesFetched); + + // index filtering that accepts limited rows + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 4", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 0, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0))); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows, different value + snapshot = queryContext("SELECT * FROM %s WHERE n = 1 ORDER BY v ANN OF [0, 0] LIMIT 10 ALLOW FILTERING", + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key query + snapshot = queryContext("SELECT * FROM %s WHERE k = 1 ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1))); + assertEquals(4, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a row + execute("DELETE FROM %s WHERE k = 0 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, vector(0, 0)), + row(1, 2, 0, vector(0, 0)), 
+ row(0, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(7, snapshot.rowsFetched); + assertEquals(7, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a cell + execute("DELETE v FROM %s WHERE k = 1 AND c = 0"); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 2, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(7, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a partition + execute("DELETE FROM %s WHERE k = 0"); + flush(); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 2, 0, vector(0, 0)), + row(1, 1, 1, vector(1, 1)), + row(1, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(4, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete all the rows in a partition with a range tombstone + execute("DELETE FROM %s WHERE k = 1 AND c >= 1"); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10"); + assertEquals(8, snapshot.keysFetched); + assertEquals(4, snapshot.partitionsFetched); + 
assertEquals(0, snapshot.partitionsReturned); + assertEquals(4, snapshot.partitionTombstonesFetched); + assertEquals(1, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(6, snapshot.rowTombstonesFetched); // 3 index entries with start/end bounds each + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 0, 0, [0, 0]) USING TTL 1"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 1, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (0, 3, 1, [1, 1])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 0, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 1, 1, [1, 1]) USING TTL 1"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 2, 0, [0, 0])"); + execute("INSERT INTO %s (k, c, n, v) VALUES (1, 3, 1, [1, 1])"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT * FROM %s ORDER BY v ANN OF [0, 0] LIMIT 10", + row(1, 0, 0, 
vector(0, 0)), + row(1, 2, 0, vector(0, 0)), + row(0, 2, 0, vector(0, 0)), + row(1, 3, 1, vector(1, 1)), + row(0, 1, 1, vector(1, 1)), + row(0, 3, 1, vector(1, 1))); + assertEquals(8, snapshot.keysFetched); + assertEquals(8, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testWideTableWithStatics() + { + createTable("CREATE TABLE %s (k int, c int, a int, b int, s int static, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(a) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, c, a, b, s) VALUES (0, 0, 0, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 3, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b, s) VALUES (1, 0, 0, 0, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 3, 1, 1)"); + flush(); + QueryContext.Snapshot snapshot; + + // index filtering that accepts all rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, 1), + row(1, 1, 1, 1, 1), + row(1, 2, 0, 0, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 
8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts no rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a < 0"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 0", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // index filtering that accepts some, different value + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 1", + row(0, 1, 1, 1, 0), + row(0, 3, 1, 1, 0), + row(1, 1, 1, 1, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 
4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts all rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 0 AND b >= 0 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1)); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts no rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a = 0 AND b < 0 ALLOW FILTERING"); + assertEquals(isRowAware() ? 4 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 4 : 8, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // not-indexed column filtering that accepts some rows + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND b = 0 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1)); + assertEquals(isRowAware() ? 
8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key query + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND k = 0 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0)); + assertEquals(isRowAware() ? 4 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // primary key query + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND k = 0 AND c IN (0, 2) ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0)); + assertEquals(isRowAware() ? 2 : 1, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(2, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // partition key filtering + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND k != 1 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0)); + assertEquals(isRowAware() ? 
8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // clustering key filtering + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND c != 1 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, 1), + row(1, 2, 0, 0, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 6 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // static row filtering + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND s != 1 ALLOW FILTERING", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // static row cell deletion + execute("DELETE s FROM %s WHERE k = 1"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 0, 0, 0, 0), + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, null), + row(1, 1, 1, 1, null), + row(1, 2, 0, 0, null), + row(1, 3, 1, 1, null)); + assertEquals(isRowAware() ? 
8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(8, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // static row cell filtering using the deleted value + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0 AND s = 1 ALLOW FILTERING"); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(8, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete a row + execute("DELETE FROM %s WHERE k = 0 AND c = 0"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 1, 1, 1, 0), + row(0, 2, 0, 0, 0), + row(0, 3, 1, 1, 0), + row(1, 0, 0, 0, null), + row(1, 1, 1, 1, null), + row(1, 2, 0, 0, null), + row(1, 3, 1, 1, null)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(7, snapshot.rowsFetched); + assertEquals(7, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + + // delete a partition + execute("DELETE FROM %s WHERE k = 0"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(1, 0, 0, 0, null), + row(1, 1, 1, 1, null), + row(1, 2, 0, 0, null), + row(1, 3, 1, 1, null)); + assertEquals(isRowAware() ? 
8 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(4, snapshot.rowsFetched); + assertEquals(4, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // delete all the rows in a partition with a range tombstone + execute("DELETE FROM %s WHERE k = 1 AND c >= 0"); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0"); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(isRowAware() ? 8 : 2, snapshot.rowTombstonesFetched); + + // compact to rebuild the index, and verify that tombstones are gone + flush(); + compact(); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // truncate the table + truncate(false); + snapshot = queryContext("SELECT * FROM %s WHERE a >= 0 ALLOW FILTERING"); + assertEquals(0, snapshot.keysFetched); + assertEquals(0, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(0, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + + // insert some data using TTLs + execute("INSERT INTO %s (k, c, a, b, s) VALUES (0, 0, 0, 0, 0) USING TTL 1"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 1, 1)"); + execute("INSERT INTO 
%s (k, c, a, b) VALUES (0, 2, 0, 0)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (0, 3, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b, s) VALUES (1, 0, 0, 0, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 2, 0, 0) USING TTL 1"); + execute("INSERT INTO %s (k, c, a, b) VALUES (1, 3, 1, 1)"); + flush(); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + snapshot = queryContext("SELECT k,c,a,b,s FROM %s WHERE a >= 0", + row(0, 1, 1, 1, null), + row(0, 2, 0, 0, null), + row(0, 3, 1, 1, null), + row(1, 0, 0, 0, 1), + row(1, 1, 1, 1, 1), + row(1, 3, 1, 1, 1)); + assertEquals(isRowAware() ? 8 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(6, snapshot.rowsFetched); + assertEquals(6, snapshot.rowsReturned); + assertEquals(2, snapshot.rowTombstonesFetched); + } + + @Test + public void testCollections() + { + createTable("CREATE TABLE %s (k int, c int, l list, s set, m map, PRIMARY KEY(k, c))"); + createIndex("CREATE CUSTOM INDEX ON %s(l) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(s) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(keys(m)) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(values(m)) USING 'StorageAttachedIndex'"); + createIndex("CREATE CUSTOM INDEX ON %s(entries(m)) USING 'StorageAttachedIndex'"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (1, 1, [1, 2, 3], {1, 2, 3}, {1:10, 2:20, 3:30})"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (1, 2, [2, 3, 4], {2, 3, 4}, {2:20, 3:30, 4:40})"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (2, 1, [3, 4, 5], {3, 4, 5}, {3:30, 4:40, 5:50})"); + execute("INSERT INTO %s (k, c, l, s, m) VALUES (2, 2, [5 ,6, 7], {5 ,6, 7}, {5:50, 6:60, 7:70})"); + flush(); + QueryContext.Snapshot snapshot; + + List 
queries = Arrays.asList("SELECT k, c FROM %s WHERE l CONTAINS 3", + "SELECT k, c FROM %s WHERE s CONTAINS 3", + "SELECT k, c FROM %s WHERE m CONTAINS KEY 3", + "SELECT k, c FROM %s WHERE m CONTAINS 30", + "SELECT k, c FROM %s WHERE m[3] = 30"); + + for (String query : queries) + { + snapshot = queryContext(query, row(1, 1), row(1, 2), row(2, 1)); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 4, snapshot.rowsFetched); + assertEquals(3, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + } + + // delete a cell + execute("UPDATE %s SET l = l - [3], s = s - {3}, m = m - {3} WHERE k = 1 AND c = 1"); + for (String query : queries) + { + snapshot = queryContext(query, row(1, 2), row(2, 1)); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(2, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 3 : 4, snapshot.rowsFetched); + assertEquals(2, snapshot.rowsReturned); + assertEquals(0, snapshot.rowTombstonesFetched); + } + + // delete a row + execute("DELETE FROM %s WHERE k = 1 AND c = 2"); + for (String query : queries) + { + snapshot = queryContext(query, row(2, 1)); + assertEquals(isRowAware() ? 3 : 2, snapshot.keysFetched); + assertEquals(2, snapshot.partitionsFetched); + assertEquals(1, snapshot.partitionsReturned); + assertEquals(0, snapshot.partitionTombstonesFetched); + assertEquals(isRowAware() ? 2 : 3, snapshot.rowsFetched); + assertEquals(1, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + } + + // delete a partition + execute("DELETE FROM %s WHERE k = 2"); + for (String query : queries) + { + snapshot = queryContext(query); + assertEquals(isRowAware() ? 
3 : 2, snapshot.keysFetched); + assertEquals(1, snapshot.partitionsFetched); + assertEquals(0, snapshot.partitionsReturned); + assertEquals(1, snapshot.partitionTombstonesFetched); + assertEquals(1, snapshot.rowsFetched); + assertEquals(0, snapshot.rowsReturned); + assertEquals(1, snapshot.rowTombstonesFetched); + } + } + + private boolean isRowAware() + { + return version.after(Version.AA); + } + + private QueryContext.Snapshot queryContext(String query, Object[]... rows) + { + // First execute the query with the normal test path to validate the results + assertRowsIgnoringOrder(execute(query), rows); + + // Get an index searcher for the query + PartitionRangeReadCommand command = (PartitionRangeReadCommand) parseReadCommand(query); + StorageAttachedIndexQueryPlan plan = (StorageAttachedIndexQueryPlan) command.indexQueryPlan(); + Assert.assertNotNull(plan); + StorageAttachedIndexSearcher searcher = plan.searcherFor(command); + + // Execute the search for the query and consume the results to populate the query context + try (ReadExecutionController executionController = command.executionController(); + UnfilteredPartitionIterator partitions = searcher.search(executionController)) + { + while (partitions.hasNext()) + { + try (UnfilteredRowIterator partition = partitions.next()) + { + while (partition.hasNext()) + { + partition.next(); + } + } + } + } + + // Return the query context snapshot, which should be populated + return searcher.queryContext().snapshot(); + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java b/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java index c93d38155559..b40c539e0303 100644 --- a/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java +++ b/test/unit/org/apache/cassandra/index/sai/metrics/QueryMetricsTest.java @@ -238,7 +238,7 @@ public void testKDTreeQueryMetricsWithSingleIndex() waitForGreaterThanZero(objectNameNoIndex("QueryLatency", keyspace, table, 
PER_QUERY_METRIC_TYPE)); - waitForEquals(objectNameNoIndex("TotalPartitionReads", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); + waitForEquals(objectNameNoIndex("TotalPartitionsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); waitForEquals(objectName("KDTreeIntersectionLatency", keyspace, table, index, GLOBAL_METRIC_TYPE), queryCounter); } @@ -355,7 +355,7 @@ public void testInvertedIndexQueryMetricsWithSingleIndex() waitForGreaterThanZero(objectNameNoIndex("QueryLatency", keyspace, table, PER_QUERY_METRIC_TYPE)); - waitForEquals(objectNameNoIndex("TotalPartitionReads", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); + waitForEquals(objectNameNoIndex("TotalPartitionsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), resultCounter); } @Test @@ -384,9 +384,9 @@ public void testKDTreePartitionsReadAndRowsFiltered() assertEquals(3, actualRows); // This is 2 due to partition read batching. - waitForEquals(objectNameNoIndex("TotalPartitionReads", keyspace, table, TABLE_QUERY_METRIC_TYPE), 2); - waitForHistogramCountEquals(objectNameNoIndex("RowsFiltered", keyspace, table, PER_QUERY_METRIC_TYPE), 1); - waitForEquals(objectNameNoIndex("TotalRowsFiltered", keyspace, table, TABLE_QUERY_METRIC_TYPE), 3); + waitForEquals(objectNameNoIndex("TotalPartitionsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), 2); + waitForHistogramCountEquals(objectNameNoIndex("RowsFetched", keyspace, table, PER_QUERY_METRIC_TYPE), 1); + waitForEquals(objectNameNoIndex("TotalRowsFetched", keyspace, table, TABLE_QUERY_METRIC_TYPE), 3); } @Test @@ -494,6 +494,14 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) for (int c = 0; c < numRowsPerPartition; c++) execute("INSERT INTO %s (k, c, n, v) VALUES (?, ?, 1, [1, 1])", k, c); + // add a partition tombstone + execute("INSERT INTO %s (k, c, n, v) VALUES (?, ?, 1, [1, 1])", numPartitions, numRowsPerPartition); + execute("DELETE FROM %s WHERE k = ?", numPartitions); + + // add a row 
range tombstone + execute("INSERT INTO %s (k, c, n, v) VALUES (?, ?, 1, [1, 1])", numPartitions + 1, numRowsPerPartition); + execute("DELETE FROM %s WHERE k = ? AND c > 0", numPartitions + 1); + // filter query (goes to the general, filter and range query metrics) UntypedResultSet rows = execute("SELECT k, c FROM %s WHERE n = 1"); assertEquals(numRows, rows.size()); @@ -528,18 +536,58 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 1); - // Verify counters for total partition reads. - name = "TotalPartitionReads"; - waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 1 + numPartitions + numRowsPerPartition + numRows + numRowsPerPartition + numRows); + // Verify counters for total keys fetched. + name = "TotalKeysFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 3L * (numRowsPerPartition + numRows + 2)); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numRows + 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows + 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows + 2); + + // Verify counters for total partitions fetched. 
+ name = "TotalPartitionsFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 1 + numPartitions + 1 + numRowsPerPartition + numRows + 1 + numRowsPerPartition + numRows + 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 1); - waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numPartitions + 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), numRowsPerPartition); // single-partition top-k issues a partition access per each row - waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows + 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); // single-partition top-k issues a partition access per each row + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows + 1); + + // Verify counters for total partitions returned. + name = "TotalPartitionsReturned"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 1 + numPartitions + 1 + numPartitions + 1 + numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numPartitions); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numPartitions); + + // Verify counters for total partition tombstones fetched. 
+ name = "TotalPartitionTombstonesFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 3); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), 1); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 1); + + // Verify counters for total rows fetched. + name = "TotalRowsFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), numRowsPerPartition + numRows + numRowsPerPartition + numRows + numRowsPerPartition + numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), numRowsPerPartition); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), numRows); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows); - // Verify counters for total rows filtered. - name = "TotalRowsFiltered"; + // Verify counters for total rows returned. 
+ name = "TotalRowsReturned"; waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), numRowsPerPartition + numRows + numRowsPerPartition + numRows + numRowsPerPartition + numRows); waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), numRowsPerPartition); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), numRows); @@ -548,6 +596,16 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), numRowsPerPartition); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), numRows); + // Verify counters for total row tombstones fetched. + name = "TotalRowTombstonesFetched"; + waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 6); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_FILTER_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_FILTER_QUERY_METRIC_TYPE), 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_TOPK_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_TOPK_QUERY_METRIC_TYPE), 2); + waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 0); + waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 2); + // Verify counters for timeouts. name = "TotalQueryTimeouts"; waitForEquals(objectName(name, TABLE_QUERY_METRIC_TYPE), 0); @@ -573,25 +631,25 @@ private void testQueryKindMetrics(boolean perTable, boolean perQuery) waitForEqualsIfExists(perTable, objectName(name, TABLE_SP_HYBRID_QUERY_METRIC_TYPE), 1); waitForEqualsIfExists(perTable, objectName(name, TABLE_MP_HYBRID_QUERY_METRIC_TYPE), 1); - // Verify histograms for partitions reads per query. 
- name = "PartitionReads"; - waitForHistogramCountEquals(objectName(name, PER_QUERY_METRIC_TYPE), 6); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_HYBRID_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_HYBRID_QUERY_METRIC_TYPE), 1); - - // Verify histograms for rows filtered per query. - name = "RowsFiltered"; + // Verify histograms + verifyHistogramCount("KeysFetched", perQuery); + verifyHistogramCount("PartitionsFetched", perQuery); + verifyHistogramCount("PartitionsReturned", perQuery); + verifyHistogramCount("PartitionTombstonesFetched", perQuery); + verifyHistogramCount("RowsFetched", perQuery); + verifyHistogramCount("RowsReturned", perQuery); + verifyHistogramCount("RowTombstonesFetched", perQuery); + } + + private void verifyHistogramCount(String name, boolean hasPerQueryKindMetrics) + { waitForHistogramCountEquals(objectName(name, PER_QUERY_METRIC_TYPE), 6); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_FILTER_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_TOPK_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_SP_HYBRID_QUERY_METRIC_TYPE), 1); - waitForHistogramCountEqualsIfExists(perQuery, objectName(name, PER_MP_HYBRID_QUERY_METRIC_TYPE), 1); + 
waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_SP_FILTER_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_MP_FILTER_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_SP_TOPK_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_MP_TOPK_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_SP_HYBRID_QUERY_METRIC_TYPE), 1); + waitForHistogramCountEqualsIfExists(hasPerQueryKindMetrics, objectName(name, PER_MP_HYBRID_QUERY_METRIC_TYPE), 1); } private ObjectName objectName(String name, String type) From ae536999925c1339509ee31e250aa6173d2c9311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20de=20la=20Pe=C3=B1a?= Date: Wed, 26 Nov 2025 14:56:41 +0000 Subject: [PATCH 2/2] Empty-Commit to trigger CI