diff --git a/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java b/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
index 9ea11e48eef8..df1bd027a9af 100644
--- a/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
@@ -26,6 +26,7 @@
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.junit.Assert;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -73,6 +74,7 @@ public class DimensionRangeShardSpecBenchmark
   // Initial segment (null -> values)
   private final DimensionRangeShardSpec shardSpec0 = new DimensionRangeShardSpec(
       Arrays.asList("country", "city"),
+      VirtualColumns.EMPTY,
       new StringTuple(new String[]{null, null}),
       new StringTuple(new String[]{"Germany", "Munich"}),
       0,
@@ -82,6 +84,7 @@ public class DimensionRangeShardSpecBenchmark
   // Middle segment (values -> other values)
   private final DimensionRangeShardSpec shardSpec1 = new DimensionRangeShardSpec(
       Arrays.asList("country", "city"),
+      VirtualColumns.EMPTY,
       new StringTuple(new String[]{"Germany", "Munich"}),
       new StringTuple(new String[]{"United States", "New York"}),
       1,
@@ -91,6 +94,7 @@ public class DimensionRangeShardSpecBenchmark
   // End segment (values -> null)
   private final DimensionRangeShardSpec shardSpec2 = new DimensionRangeShardSpec(
       Arrays.asList("country", "city"),
+      VirtualColumns.EMPTY,
       new StringTuple(new String[]{"United States", "New York"}),
       new StringTuple(new String[]{null, null}),
       2,
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index bc9f72365309..838bb5599893 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -22,6 +22,7 @@
 import org.apache.druid.catalog.guice.CatalogClientModule;
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -51,6 +52,7 @@
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.NotDimFilter;
@@ -93,6 +95,7 @@
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -150,22 +153,6 @@ public EmbeddedDruidCluster createCluster()
         .addServer(new EmbeddedRouter());
   }
-
-  private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
-  {
-    final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
-        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
-            1.0,
-            100,
-            policy,
-            true,
-            compactionEngine,
-            true
-        ))
-    );
-    Assertions.assertTrue(updateResponse.isSuccess());
-  }
-
   @MethodSource("getEngine")
   @ParameterizedTest(name = "compactionEngine={0}")
   public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(
@@ -307,41 +294,6 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
     Assertions.assertEquals(4000, getTotalRowCount());
   }
 
-  protected void ingest1kRecords()
-  {
-    final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
-    List<byte[]> records = streamGenerator.generateEvents(2);
-
-    final InlineInputSource input = new InlineInputSource(
-        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
-    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
-        input,
-        new JsonInputFormat(null, null, null, null, null),
-        true,
-        null
-    );
-    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
-                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
-                  .build(),
-        ioConfig,
-        TuningConfigBuilder.forParallelIndexTask().build()
-    );
-    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
-    final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
-        taskId,
-        null,
-        null,
-        indexIngestionSpec,
-        null
-    );
-    cluster.callApi().submitTask(task);
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
-  }
-
   @MethodSource("getEngine")
   @ParameterizedTest(name = "compactionEngine={0}")
   public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
@@ -575,6 +527,184 @@ public void test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrec
     Assertions.assertTrue(count > 0);
   }
 
+  @Test
+  public void test_compaction_cluster_by_virtualcolumn()
+  {
+    // Virtual columns on nested data are only supported with the MSQ compaction engine right now.
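+    // This test ingests seven rows, compacts them with a 'range' partitionsSpec on the virtual column
+    // v0 = json_value(obj, '$.a'), and expects two range-partitioned segments whose shard specs carry
+    // the defining virtual column alongside the shard column name.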
+    CompactionEngine compactionEngine = CompactionEngine.MSQ;
+    configureCompaction(compactionEngine, null);
+
+    String jsonDataWithNestedColumn =
+        """
+        {"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "ll"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "mm"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "nn"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "oo"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "pp"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "qq"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "rr"}}
+        """;
+
+    final TaskBuilder.Index task = TaskBuilder
+        .ofTypeIndex()
+        .dataSource(dataSource)
+        .jsonInputFormat()
+        .inlineInputSourceWithData(jsonDataWithNestedColumn)
+        .isoTimestampColumn("timestamp")
+        .schemaDiscovery()
+        .granularitySpec("DAY", null, false);
+
+    cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    Assertions.assertEquals(7, getTotalRowCount());
+
+    VirtualColumns virtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("v0", "json_value(obj, '$.a')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+
+    InlineSchemaDataSourceCompactionConfig config =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withTransformSpec(
+                new CompactionTransformSpec(
+                    null,
+                    virtualColumns
+                )
+            )
+            .withTuningConfig(
+                UserCompactionTaskQueryTuningConfig
+                    .builder()
+                    .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
+                    .build()
+            )
+            .build();
+
+    runCompactionWithSpec(config);
+    waitForAllCompactionTasksToFinish();
+
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
+    Assertions.assertEquals(2, segments.size());
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v0"),
+            virtualColumns,
+            null,
+            StringTuple.create("oo"),
+            0,
+            2
+        ),
+        segments.get(0).getShardSpec()
+    );
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v0"),
+            virtualColumns,
+            StringTuple.create("oo"),
+            null,
+            1,
+            2
+        ),
+        segments.get(1).getShardSpec()
+    );
+  }
+
+  @Test
+  public void test_compaction_cluster_by_virtualcolumn_rollup()
+  {
+    // Virtual columns on nested data are only supported with the MSQ compaction engine right now.
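+    // Same flow as test_compaction_cluster_by_virtualcolumn, but the initial ingestion rolls up with a
+    // count aggregator at MINUTE query granularity, so compaction exercises the group-by (rollup) path.
+    // The expected shard specs are identical to the non-rollup case.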
+ CompactionEngine compactionEngine = CompactionEngine.MSQ; + configureCompaction(compactionEngine, null); + + String jsonDataWithNestedColumn = + """ + {"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "ll"}} + {"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "mm"}} + {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "nn"}} + {"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "oo"}} + {"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "pp"}} + {"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "qq"}} + {"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "rr"}} + """; + + final TaskBuilder.Index task = TaskBuilder + .ofTypeIndex() + .dataSource(dataSource) + .jsonInputFormat() + .inlineInputSourceWithData(jsonDataWithNestedColumn) + .isoTimestampColumn("timestamp") + .schemaDiscovery() + .dataSchema(builder -> builder.withAggregators(new CountAggregatorFactory("count"))) + .granularitySpec("DAY", "MINUTE", true); + + cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + Assertions.assertEquals(7, getTotalRowCount()); + + VirtualColumns virtualColumns = VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "json_value(obj, '$.a')", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ) + ); + + InlineSchemaDataSourceCompactionConfig config = + InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withSkipOffsetFromLatest(Period.seconds(0)) + .withTransformSpec( + new CompactionTransformSpec( + null, + virtualColumns + ) + ) + .withTuningConfig( + UserCompactionTaskQueryTuningConfig + .builder() + .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false)) + .build() + ) + .build(); + + runCompactionWithSpec(config); + waitForAllCompactionTasksToFinish(); + + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + List segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList(); + Assertions.assertEquals(2, segments.size()); + Assertions.assertEquals( + new DimensionRangeShardSpec( + List.of("v0"), + virtualColumns, + null, + StringTuple.create("oo"), + 0, + 2 + ), + segments.get(0).getShardSpec() + ); + Assertions.assertEquals( + new DimensionRangeShardSpec( + List.of("v0"), + virtualColumns, + StringTuple.create("oo"), + null, + 1, + 2 + ), + segments.get(1).getShardSpec() + ); + } + /** * Tests that when a compaction task filters out all rows using a transform spec, * tombstones are created to properly drop the old segments. 
This test covers both @@ -702,6 +832,56 @@ public void test_compaction_legacy_string_discovery_sparse_column( Assertions.assertEquals(1, segments.get(0).getDimensions().size()); } + private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy) + { + final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord( + o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig( + 1.0, + 100, + policy, + true, + compactionEngine, + true + )) + ); + Assertions.assertTrue(updateResponse.isSuccess()); + } + + protected void ingest1kRecords() + { + final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper()); + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100); + List records = streamGenerator.generateEvents(2); + + final InlineInputSource input = new InlineInputSource( + records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n"))); + final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( + input, + new JsonInputFormat(null, null, null, null, null), + true, + null + ); + final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( + DataSchema.builder() + .withDataSource(dataSource) + .withTimestamp(new TimestampSpec("timestamp", "iso", null)) + .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()) + .build(), + ioConfig, + TuningConfigBuilder.forParallelIndexTask().build() + ); + final String taskId = EmbeddedClusterApis.newTaskId(dataSource); + final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask( + taskId, + null, + null, + indexIngestionSpec, + null + ); + cluster.callApi().submitTask(task); + cluster.callApi().waitForTaskToSucceed(taskId, overlord); + } + private int getTotalRowCount() { return Numbers.parseInt(cluster.runSql("SELECT COUNT(*) as cnt FROM \"%s\"", dataSource)); @@ -723,7 +903,6 @@ private void verifyNoRowsWithNestedValue(String nestedColumn, String field, Stri ); } - private String generateEventsInInterval(Interval interval, int numEvents, long spacingMillis) { List events = new ArrayList<>(); diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java index d268bb8b897f..1d4ec613e6c3 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java @@ -19,10 +19,17 @@ package org.apache.druid.testing.embedded.msq; +import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.indexing.report.MSQResultsReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.http.SqlTaskStatus; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.http.GetQueryReportResponse; import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedCoordinator; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; @@ -32,6 +39,8 @@ import 
org.apache.druid.testing.embedded.indexing.MoreResources; import org.apache.druid.testing.embedded.indexing.Resources; import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.DimensionRangeShardSpec; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -40,10 +49,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; public class MultiStageQueryTest extends EmbeddedClusterTestBase { - private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedBroker broker = + new EmbeddedBroker().setServerMemory(200_000_000) + .addProperty("druid.msq.dart.controller.maxRetainedReportCount", "10") + .addProperty("druid.query.default.context.maxConcurrentStages", "1") + .addProperty("druid.sql.planner.enableSysQueriesTable", "true"); private final EmbeddedOverlord overlord = new EmbeddedOverlord(); private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); private final EmbeddedIndexer indexer = new EmbeddedIndexer() @@ -59,6 +73,7 @@ protected EmbeddedDruidCluster createCluster() return EmbeddedDruidCluster .withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() + .addCommonProperty("druid.msq.dart.enabled", "true") .addResource(exportDirectory) .addServer(overlord) .addServer(coordinator) @@ -152,4 +167,193 @@ public void testExport() actualResults ); } + + @Test + public void testClusterByVirtualColumn() + { + final String sqlTemplate = + """ + SET rowsPerSegment = 2; + SET groupByEnableMultiValueUnnesting = FALSE; + REPLACE INTO %s OVERWRITE ALL + WITH "ext" AS ( + SELECT * + FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}')) + EXTEND( + "timestamp" VARCHAR, + "added" BIGINT, + "delta" BIGINT, + "deleted" BIGINT, + "page" VARCHAR, + "city" VARCHAR, + "country" VARCHAR, + "user" VARCHAR + ) + ) + SELECT + TIME_PARSE("timestamp") AS __time, + added, + delta, + deleted, + page, + city, + country, + user + FROM "ext" + PARTITIONED BY DAY + CLUSTERED BY CONCAT(country, ':', city) + """; + final String sql = StringUtils.format( + sqlTemplate, + dataSource, + Resources.DataFile.tinyWiki1Json().getAbsolutePath() + ); + + final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql); + cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter()); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + assertClusterByVirtualColumnSegments(); + assertClusterByVirtualColumnQueries(); + } + + @Test + public void testClusterByVirtualColumnRollup() + { + final String sqlTemplate = + """ + SET rowsPerSegment = 2; + SET groupByEnableMultiValueUnnesting = FALSE; + REPLACE INTO %s OVERWRITE ALL + WITH "ext" AS ( + SELECT * + FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}')) + EXTEND( + "timestamp" VARCHAR, + "added" BIGINT, + "delta" BIGINT, + "deleted" BIGINT, + "page" VARCHAR, + "city" VARCHAR, + "country" VARCHAR, + "user" VARCHAR + ) + ) + SELECT + TIME_PARSE("timestamp") AS __time, + page, + city, + country, + user, + SUM(added) as added, + SUM(delta) as delta, + SUM(deleted) as deleted + FROM "ext" + GROUP BY TIME_PARSE("timestamp"), page, city, country, user, CONCAT(country, ':', city) + PARTITIONED BY DAY + CLUSTERED BY CONCAT(country, ':', city) + """; + final String sql = StringUtils.format( + sqlTemplate, + dataSource, + 
Resources.DataFile.tinyWiki1Json().getAbsolutePath() + ); + + final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql); + cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter()); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + assertClusterByVirtualColumnSegments(); + assertClusterByVirtualColumnQueries(); + } + + private void assertClusterByVirtualColumnSegments() + { + List segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList(); + Assertions.assertEquals(2, segments.size()); + VirtualColumns virtualColumns = VirtualColumns.create( + new ExpressionVirtualColumn("v1", "concat(\"country\",':',\"city\")", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ); + Assertions.assertEquals( + new DimensionRangeShardSpec( + List.of("v1"), + virtualColumns, + null, + StringTuple.create("Russia:Moscow"), + 0, + 2 + ), + segments.get(0).getShardSpec() + ); + Assertions.assertEquals( + new DimensionRangeShardSpec( + List.of("v1"), + virtualColumns, + StringTuple.create("Russia:Moscow"), + null, + 1, + 2 + ), + segments.get(1).getShardSpec() + ); + } + + private void assertClusterByVirtualColumnQueries() + { + String queryId = UUID.randomUUID().toString(); + cluster.callApi().verifySqlQuery( + "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT __time, country, city, page FROM %s ORDER BY __time", + dataSource, + """ + 2013-08-31T01:02:33.000Z,United States,San Francisco,Gypsy Danger + 2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka + 2013-08-31T07:11:21.000Z,Russia,Moscow,Cherno Alpha""" + ); + Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId)); + + queryId = UUID.randomUUID().toString(); + cluster.callApi().verifySqlQuery( + "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT __time, country, city, page FROM %s WHERE CONCAT(country, ':', city) <= 'Russia' ORDER BY __time", + dataSource, + """ + 2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka""" + ); + Assertions.assertEquals(1, getSegmentsScannedForDartQuery(queryId)); + + queryId = UUID.randomUUID().toString(); + cluster.callApi().verifySqlQuery( + "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT __time, country, city, page FROM %s WHERE CONCAT(country, ':', city) <= 'Russia:St. 
Petersburg' ORDER BY __time", + dataSource, + """ + 2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka + 2013-08-31T07:11:21.000Z,Russia,Moscow,Cherno Alpha""" + ); + Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId)); + } + + private long getSegmentsScannedForDartQuery(String sqlQueryId) + { + ChannelCounters.Snapshot segmentChannelCounters = getDartSegmentChannelCounters(sqlQueryId); + return segmentChannelCounters.getFiles()[0]; + } + + private ChannelCounters.Snapshot getDartSegmentChannelCounters(String sqlQueryId) + { + final GetQueryReportResponse reportResponse = msqApis.getDartQueryReport(sqlQueryId, broker); + + Assertions.assertNotNull(reportResponse, "Report response should not be null"); + ChannelCounters.Snapshot segmentChannelCounters = + (ChannelCounters.Snapshot) reportResponse.getReportMap() + .findReport("multiStageQuery") + .map(r -> + ((MSQTaskReportPayload) r.getPayload()).getCounters() + .snapshotForStage(0) + .get(0) + .getMap() + .get("input0") + ).orElse(null); + + Assertions.assertNotNull(segmentChannelCounters); + return segmentChannelCounters; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b18a2971e2ce..6a064c82c5dd 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -61,6 +61,7 @@ import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; @@ -174,6 +175,8 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.rowsandcols.serde.WireTransferableContext; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; @@ -1062,6 +1065,7 @@ private List generateSegmentIdsWithShardSpecs( final ClusterBy clusterBy, final RowKeyReader keyReader, final ClusterByPartitions partitionBoundaries, + final Map clusterByVirtualColumnMappings, final boolean mayHaveMultiValuedClusterByFields, @Nullable final Boolean isStageOutputEmpty ) throws IOException @@ -1073,6 +1077,7 @@ private List generateSegmentIdsWithShardSpecs( clusterBy, keyReader, partitionBoundaries, + clusterByVirtualColumnMappings, mayHaveMultiValuedClusterByFields, isStageOutputEmpty ); @@ -1240,6 +1245,7 @@ private List generateSegmentIdsWithShardSpecsForReplace( final ClusterBy clusterBy, final RowKeyReader keyReader, final ClusterByPartitions partitionBoundaries, + final Map clusterByVirtualColumnMappings, final boolean mayHaveMultiValuedClusterByFields, @Nullable final Boolean isStageOutputEmpty ) throws IOException @@ -1254,6 +1260,7 @@ private List generateSegmentIdsWithShardSpecsForReplace( signature, clusterBy, querySpec.getColumnMappings(), + clusterByVirtualColumnMappings, mayHaveMultiValuedClusterByFields ); final List shardColumns = shardReasonPair.lhs; @@ -1314,8 +1321,14 @@ private List 
generateSegmentIdsWithShardSpecsForReplace( segmentNumber == ranges.size() - 1 ? null : makeStringTuple(clusterBy, keyReader, range.getEnd(), shardColumns.size()); - - shardSpec = new DimensionRangeShardSpec(shardColumns, start, end, segmentNumber, ranges.size()); + shardSpec = new DimensionRangeShardSpec( + shardColumns, + VirtualColumns.create(clusterByVirtualColumnMappings.values()), + start, + end, + segmentNumber, + ranges.size() + ); } retVal[partitionNumber] = new SegmentIdWithShardSpec(destination.getDataSource(), interval, version, shardSpec); @@ -1710,19 +1723,13 @@ private void handleQueryResults( queryDef.getQueryId() ); } else { - DataSchema dataSchema = ((SegmentGeneratorStageProcessor) queryKernel - .getStageDefinition(finalStageId).getProcessor()).getDataSchema(); - - ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec(); - ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy(); + final ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec(); compactionStateAnnotateFunction = addCompactionStateToSegments( + queryDef, querySpec, context.jsonMapper(), - dataSchema, - shardSpec, - clusterBy, - queryDef.getQueryId() + shardSpec ); } } @@ -1761,17 +1768,21 @@ private void handleQueryResults( } private static Function, Set> addCompactionStateToSegments( + QueryDefinition queryDef, MSQSpec querySpec, ObjectMapper jsonMapper, - DataSchema dataSchema, - @Nullable ShardSpec shardSpec, - @Nullable ClusterBy clusterBy, - String queryId + @Nullable ShardSpec shardSpec ) { + final ClusterBy clusterBy = queryDef.getFinalStageDefinition().getClusterBy(); final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); - PartitionsSpec partitionSpec; + final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination(); + final SegmentGeneratorStageProcessor segmentProcessor = + (SegmentGeneratorStageProcessor) queryDef.getFinalStageDefinition().getProcessor(); + + final DataSchema dataSchema = segmentProcessor.getDataSchema(); + final PartitionsSpec partitionSpec; // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an // MSQControllerTask. 
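+    // The partitionsSpec recorded in the compaction state is reverse-engineered below from the type of
+    // the first segment's shard spec; shard spec types with no known mapping fail with an UnknownFault.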
if (shardSpec != null) { @@ -1793,7 +1804,7 @@ private static Function, Set> addCompactionStateTo UnknownFault.forMessage( StringUtils.format( "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].", - queryId, + queryDef.getQueryId(), shardSpec.getType() ))); } @@ -1811,27 +1822,40 @@ private static Function, Set> addCompactionStateTo partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE); } - Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination()) - .getSegmentGranularity(); + Granularity segmentGranularity = destination.getSegmentGranularity(); GranularitySpec granularitySpec = new UniformGranularitySpec( segmentGranularity, - querySpec.getContext() - .getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper), + querySpec.getContext().getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper), dataSchema.getGranularitySpec().isRollup(), // Not using dataSchema.getGranularitySpec().inputIntervals() as that always has ETERNITY - ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks() + destination.getReplaceTimeChunks() ); DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec(); - CompactionTransformSpec transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec()) - ? null - : CompactionTransformSpec.of(dataSchema.getTransformSpec()); + + // if the clustered by requires virtual columns, preserve them here so that we can rebuild during compaction + CompactionTransformSpec transformSpec; + final Map clusterByVirtualColumnMappings = + segmentProcessor.getClusterByVirtualColumnMappings(); + + // only range partitioning can have virtual columns + if (clusterByVirtualColumnMappings.isEmpty() || !SecondaryPartitionType.RANGE.equals(partitionSpec.getType())) { + transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec()) + ? null + : CompactionTransformSpec.of(dataSchema.getTransformSpec()); + } else { + transformSpec = new CompactionTransformSpec( + dataSchema.getTransformSpec().getFilter(), + VirtualColumns.create(clusterByVirtualColumnMappings.values()) + ); + } + List metricsSpec = buildMSQCompactionMetrics(querySpec, dataSchema); IndexSpec indexSpec = tuningConfig.getIndexSpec(); - log.info("Query[%s] storing compaction state in segments.", queryId); + log.info("Query[%s] storing compaction state in segments.", queryDef.getQueryId()); return CompactionState.addCompactionStateToSegments( partitionSpec, @@ -1909,6 +1933,7 @@ private static Pair, String> computeShardColumns( final RowSignature signature, final ClusterBy clusterBy, final ColumnMappings columnMappings, + final Map clusterByVirtualColumns, boolean mayHaveMultiValuedClusterByFields ) { @@ -1959,19 +1984,24 @@ private static Pair, String> computeShardColumns( ); } - // DimensionRangeShardSpec only handles columns that appear as-is in the output. 
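+      // The lookup below first tries the query's output-column mappings and then falls back to the
+      // CLUSTERED BY virtual columns; if neither resolves, only a prefix of the CLUSTERED BY fields is
+      // used for the 'range' shard spec.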
+ // DimensionRangeShardSpec columns may either be explicitly in the table or defined as virtual columns if (outputColumns.isEmpty()) { - return Pair.of( - shardColumns, - StringUtils.format( - "Using only[%d] CLUSTERED BY columns for 'range' shard specs, since the next column was not mapped to " - + "an output column.", - shardColumns.size() - ) - ); + final VirtualColumn vc = clusterByVirtualColumns.get(column.columnName()); + if (vc != null) { + shardColumns.add(vc.getOutputName()); + } else { + return Pair.of( + shardColumns, + StringUtils.format( + "Using only[%d] CLUSTERED BY columns for 'range' shard specs, since the next column was not mapped to " + + "an output column or virtual column.", + shardColumns.size() + ) + ); + } + } else { + shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0))); } - - shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0))); } return Pair.of(shardColumns, "Using 'range' shard specs with all CLUSTERED BY fields."); @@ -1997,7 +2027,7 @@ private static StringTuple makeStringTuple( final int shardFieldCount ) { - final String[] array = new String[clusterBy.getColumns().size() - clusterBy.getBucketByCount()]; + final String[] array = new String[shardFieldCount]; for (int i = 0; i < shardFieldCount; i++) { final Object val = keyReader.read(key, clusterBy.getBucketByCount() + i); @@ -2633,6 +2663,8 @@ private void populateSegmentsToGenerate() throws IOException final boolean mayHaveMultiValuedClusterByFields = !shuffleStageDef.mustGatherResultKeyStatistics() || queryKernel.hasStageCollectorEncounteredAnyMultiValueField(shuffleStageId); + final SegmentGeneratorStageProcessor segmentGeneratorStageProcessor = + (SegmentGeneratorStageProcessor) queryDef.getFinalStageDefinition().getProcessor(); segmentsToGenerate = generateSegmentIdsWithShardSpecs( (DataSourceMSQDestination) querySpec.getDestination(), @@ -2640,6 +2672,7 @@ private void populateSegmentsToGenerate() throws IOException shuffleStageDef.getClusterBy(), shuffleStageDef.getClusterBy().keyReader(shuffleStageDef.getSignature(), rowBasedFrameType), partitionBoundaries, + segmentGeneratorStageProcessor.getClusterByVirtualColumnMappings(), mayHaveMultiValuedClusterByFields, isShuffleStageOutputEmpty ); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index 7195303e4ef4..8b4cd39879b2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -30,6 +30,7 @@ import org.apache.druid.client.indexing.ClientCompactionRunnerInfo; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -67,8 +68,10 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.policy.PolicyEnforcer; import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; 
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
@@ -76,6 +79,7 @@
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.segment.virtual.VirtualizedColumnInspector;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizationResult;
@@ -172,7 +176,10 @@ public CompactionConfigValidationResult validateCompactionTask(
     validationResults.add(
         ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
             compactionTask.getTuningConfig().getPartitionsSpec(),
-            dataSchema.getDimensionsSpec().getDimensions()
+            dataSchema.getDimensionsSpec().getDimensions(),
+            compactionTask.getTransformSpec() == null
+            ? VirtualColumns.EMPTY
+            : compactionTask.getTransformSpec().getVirtualColumns()
         )
     );
     validationResults.add(
@@ -409,42 +416,73 @@ private DataSource getInputDataSource(String name)
   private static List<DimensionSpec> getAggregateDimensions(
       DataSchema dataSchema,
-      Map<String, VirtualColumn> inputColToVirtualCol
+      Map<String, VirtualColumn> inputColToVirtualCol,
+      List<OrderByColumnSpec> orderBy
   )
   {
-    List<DimensionSpec> dimensionSpecs = new ArrayList<>();
+    List<DimensionSpec> dimensions = new ArrayList<>();
+
+    // Build a RowSignature from the non-virtual dimensions of the dataSchema, used to resolve virtual column types,
+    RowSignature.Builder baseBuilder = RowSignature.builder().addTimeColumn();
+    for (DimensionSchema schema : dataSchema.getDimensionsSpec().getDimensions()) {
+      if (inputColToVirtualCol.containsKey(schema.getName())) {
+        continue;
+      }
+      baseBuilder.add(schema.getName(), schema.getColumnType());
+    }
+    final RowSignature baseSignature = baseBuilder.build();
+    // and a virtualized inspector on top of that base signature.
+    final ColumnInspector inspector = new VirtualizedColumnInspector(
+        baseSignature,
+        VirtualColumns.create(inputColToVirtualCol.values())
+    );
 
-    // if schema is not time-sorted, the time column mapping would already be in inputColToVirtualCol
-    if (!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME)) {
+    // If the schema is not time-sorted, the time column will be in the dimensions list; otherwise add the time dimension first.
+    if (dataSchema.getDimensionsSpec().getSchema(ColumnHolder.TIME_COLUMN_NAME) == null) {
       if (isQueryGranularityEmptyOrNone(dataSchema)) {
         // Dimensions in group-by aren't allowed to have time column name as the output name.
-        dimensionSpecs.add(new DefaultDimensionSpec(
-            ColumnHolder.TIME_COLUMN_NAME,
-            TIME_VIRTUAL_COLUMN,
-            ColumnType.LONG
-        ));
+        dimensions.add(new DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
       } else {
         // The changed granularity would result in a new virtual column that needs to be aggregated upon.
-        dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+        dimensions.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+      }
+    }
+
+    // If dimensions point to virtual columns, replace dimension column names with virtual column names.
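+    // Output types are taken from the virtual column itself: an ExpressionVirtualColumn reports its
+    // declared output type directly, while other implementations are asked for column capabilities via
+    // the virtualized inspector built above.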
+    for (DimensionSchema schema : dataSchema.getDimensionsSpec().getDimensions()) {
+      String dimension = schema.getName();
+      ColumnType colType = schema.getColumnType();
+      VirtualColumn vc = inputColToVirtualCol.get(dimension);
+      if (vc != null) {
+        dimension = vc.getOutputName();
+        if (vc instanceof ExpressionVirtualColumn) {
+          colType = ((ExpressionVirtualColumn) vc).getOutputType();
+        } else {
+          colType = ColumnType.fromCapabilities(vc.capabilities(inspector, vc.getOutputName()));
+        }
       }
+      dimensions.add(new DefaultDimensionSpec(dimension, dimension, colType));
     }
-    // If virtual columns are created from dimensions, replace dimension columns names with virtual column names.
-    dimensionSpecs.addAll(
-        dataSchema.getDimensionsSpec().getDimensions().stream()
-                  .map(dim -> {
-                    String dimension = dim.getName();
-                    ColumnType colType = dim.getColumnType();
-                    if (inputColToVirtualCol.containsKey(dim.getName())) {
-                      VirtualColumn virtualColumn = inputColToVirtualCol.get(dimension);
-                      dimension = virtualColumn.getOutputName();
-                      if (virtualColumn instanceof ExpressionVirtualColumn) {
-                        colType = ((ExpressionVirtualColumn) virtualColumn).getOutputType();
-                      }
-                    }
-                    return new DefaultDimensionSpec(dimension, dimension, colType);
-                  })
-                  .collect(Collectors.toList()));
-    return dimensionSpecs;
+
+    // If any ORDER BY column refers to a virtual column that was not explicitly a dimension, add it to the list.
+    // This is not really optimal, but it works without requiring a conversion between virtual columns and
+    // post-aggregators, which does not exist here.
+    for (OrderByColumnSpec order : orderBy) {
+      if (dataSchema.getDimensionsSpec().getSchema(order.getDimension()) != null) {
+        continue;
+      }
+      VirtualColumn vc = inputColToVirtualCol.get(order.getDimension());
+      if (vc != null) {
+        dimensions.add(
+            new DefaultDimensionSpec(
+                vc.getOutputName(),
+                order.getDimension(),
+                ColumnType.fromCapabilities(vc.capabilities(baseSignature, vc.getOutputName()))
+            )
+        );
+      }
+    }
+    return dimensions;
   }
 
   private static ColumnMappings getColumnMappings(DataSchema dataSchema)
@@ -469,11 +507,13 @@
                    .map(dim -> dim.getName().equals(ColumnHolder.TIME_COLUMN_NAME) ?
timeColumnMapping : new ColumnMapping(dim.getName(), dim.getName())) - .collect(Collectors.toList()) + .toList() + ); + columnMappings.addAll( + Arrays.stream(dataSchema.getAggregators()) + .map(agg -> new ColumnMapping(agg.getName(), agg.getName())) + .toList() ); - columnMappings.addAll(Arrays.stream(dataSchema.getAggregators()) - .map(agg -> new ColumnMapping(agg.getName(), agg.getName())) - .collect(Collectors.toList())); return new ColumnMappings(columnMappings); } @@ -508,31 +548,44 @@ private Query buildScanQuery( Map inputColToVirtualCol ) { - RowSignature rowSignature = getRowSignature(dataSchema); - VirtualColumns virtualColumns = VirtualColumns.create(inputColToVirtualCol.values()); + RowSignature baseRowSignature = getRowSignature(dataSchema); + final List columns = new ArrayList<>(baseRowSignature.getColumnNames()); + final List orderBys; + + RowSignature.Builder rowSignatureWithOrderByBuilder = RowSignature.builder().addAll(baseRowSignature); + + // when clustering by a virtual column, we might need to add the virtual column to columns list and row signature + if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { + List orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()); + orderBys = new ArrayList<>(); + for (OrderByColumnSpec spec : orderByColumnSpecs) { + orderBys.add(new OrderBy(spec.getDimension(), Order.fromString(spec.getDirection().toString()))); + + final VirtualColumn vc = inputColToVirtualCol.get(spec.getDimension()); + if (vc != null) { + columns.add(spec.getDimension()); + final ColumnCapabilities capabilities = vc.capabilities(baseRowSignature, vc.getOutputName()); + DruidException.conditionalDefensive( + capabilities != null, + "virtual column[%s] has null capabilities, cannot determine output type", + vc.getOutputName() + ); + rowSignatureWithOrderByBuilder.add(spec.getDimension(), capabilities.toColumnType()); + } + } + } else { + orderBys = null; + } + Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder() .dataSource(getInputDataSource(dataSchema.getDataSource())) - .columns(rowSignature.getColumnNames()) - .virtualColumns(virtualColumns) - .columnTypes(rowSignature.getColumnTypes()) .intervals(segmentSpec) .filters(dataSchema.getTransformSpec().getFilter()) + .virtualColumns(VirtualColumns.create(inputColToVirtualCol.values())) + .columns(columns) + .columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes()) + .orderBy(orderBys) .context(buildQueryContext(compactionTask.getContext(), dataSchema)); - - if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { - List orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()); - - scanQueryBuilder.orderBy( - orderByColumnSpecs - .stream() - .map(orderByColumnSpec -> - new OrderBy( - orderByColumnSpec.getDimension(), - Order.fromString(orderByColumnSpec.getDirection().toString()) - )) - .collect(Collectors.toList()) - ); - } return scanQueryBuilder.build(); } @@ -647,7 +700,6 @@ private Query buildGroupByQuery( ) { DimFilter dimFilter = dataSchema.getTransformSpec().getFilter(); - VirtualColumns virtualColumns = VirtualColumns.create(inputColToVirtualCol.values()); // Convert MVDs converted to arrays back to MVDs, with the same name as the input column. 
@@ -668,20 +720,25 @@ private Query buildGroupByQuery( ) .collect(Collectors.toList()); + final List orderBy; + if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { + orderBy = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()); + } else { + orderBy = List.of(); + } + GroupByQuery.Builder builder = new GroupByQuery.Builder() .setDataSource(getInputDataSource(compactionTask.getDataSource())) - .setVirtualColumns(virtualColumns) - .setDimFilter(dimFilter) + .setQuerySegmentSpec(segmentSpec) .setGranularity(new AllGranularity()) - .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol)) - .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators())) + .setDimFilter(dimFilter) + .setVirtualColumns(virtualColumns) + .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol, orderBy)) + .setAggregatorSpecs(dataSchema.getAggregators()) .setPostAggregatorSpecs(postAggregators) - .setContext(buildQueryContext(compactionTask.getContext(), dataSchema)) - .setQuerySegmentSpec(segmentSpec); + .setOrderByColumns(orderBy) + .setContext(buildQueryContext(compactionTask.getContext(), dataSchema)); - if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { - getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()).forEach(builder::addOrderByColumn); - } return builder.build(); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java index 269a118f6d88..7898cd8ab6a9 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java @@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.indexing.processor.SegmentGeneratorStageProcessor; @@ -36,6 +37,10 @@ import org.apache.druid.msq.kernel.StageDefinitionBuilder; import org.apache.druid.msq.kernel.controller.WorkerInputs; import org.apache.druid.query.Query; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -43,7 +48,9 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; public class SegmentGenerationStageSpec implements TerminalStageSpec { @@ -70,19 +77,31 @@ public StageDefinitionBuilder constructFinalStage(QueryDefinition queryDef, MSQS final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy(); // Add a segment-generation stage. 
- final DataSchema dataSchema = - SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper, query); + final DataSchema dataSchema = SegmentGenerationUtils.makeDataSchemaForIngestion( + querySpec, + querySignature, + queryClusterBy, + columnMappings, + jsonMapper, + query + ); - return StageDefinition.builder(queryDef.getNextStageNumber()) - .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) - .maxWorkerCount(tuningConfig.getMaxNumWorkers()) - .processor( - new SegmentGeneratorStageProcessor( - dataSchema, - columnMappings, - tuningConfig - ) + final Map clusterByVirtualColumnMappings = getClusterByVirtualColumnMappings( + query, + queryClusterBy ); + + return StageDefinition.builder(queryDef.getNextStageNumber()) + .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) + .maxWorkerCount(tuningConfig.getMaxNumWorkers()) + .processor( + new SegmentGeneratorStageProcessor( + dataSchema, + columnMappings, + clusterByVirtualColumnMappings, + tuningConfig + ) + ); } public Int2ObjectMap> getWorkerInfo( @@ -113,4 +132,32 @@ public Int2ObjectMap> getWorkerInfo( return retVal; } + + private static Map getClusterByVirtualColumnMappings(Query query, ClusterBy queryClusterBy) + { + final Map clusterByVirtualColumns = new LinkedHashMap<>(); + if (query instanceof GroupByQuery groupByQuery) { + final Map outputToVc = new LinkedHashMap<>(); + for (DimensionSpec spec : groupByQuery.getDimensions()) { + final VirtualColumn vc = groupByQuery.getVirtualColumns().getVirtualColumn(spec.getDimension()); + if (vc != null) { + outputToVc.put(spec.getOutputName(), vc); + } + } + for (KeyColumn column : queryClusterBy.getColumns()) { + final VirtualColumn vc = outputToVc.get(column.columnName()); + if (vc != null) { + clusterByVirtualColumns.put(column.columnName(), vc); + } + } + } else if (query instanceof ScanQuery scanQuery) { + for (KeyColumn column : queryClusterBy.getColumns()) { + final VirtualColumn vc = scanQuery.getVirtualColumns().getVirtualColumn(column.columnName()); + if (vc != null) { + clusterByVirtualColumns.put(column.columnName(), vc); + } + } + } + return clusterByVirtualColumns; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java index fe9418ee3fb5..c80c03268fc3 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.indexing.processor; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.type.TypeReference; @@ -51,6 +52,7 @@ import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ReadableInput; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.incremental.AppendableIndexSpec; @@ -72,6 +74,7 @@ import java.io.File; import java.util.HashSet; import java.util.List; 
+import java.util.Map; import java.util.Objects; import java.util.Set; @@ -80,17 +83,20 @@ public class SegmentGeneratorStageProcessor implements StageProcessor clusterByVirtualColumnMappings; private final MSQTuningConfig tuningConfig; @JsonCreator public SegmentGeneratorStageProcessor( @JsonProperty("dataSchema") final DataSchema dataSchema, @JsonProperty("columnMappings") final ColumnMappings columnMappings, + @JsonProperty("clusterByVirtualColumnMappings") @Nullable final Map clusterByVirtualColumnMappings, @JsonProperty("tuningConfig") final MSQTuningConfig tuningConfig ) { this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); this.columnMappings = Preconditions.checkNotNull(columnMappings, "columnMappings"); + this.clusterByVirtualColumnMappings = clusterByVirtualColumnMappings == null ? Map.of() : clusterByVirtualColumnMappings; this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); } @@ -106,6 +112,13 @@ public ColumnMappings getColumnMappings() return columnMappings; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public Map getClusterByVirtualColumnMappings() + { + return clusterByVirtualColumnMappings; + } + @JsonProperty public MSQTuningConfig getTuningConfig() { @@ -255,13 +268,14 @@ public boolean equals(Object o) SegmentGeneratorStageProcessor that = (SegmentGeneratorStageProcessor) o; return Objects.equals(dataSchema, that.dataSchema) && Objects.equals(columnMappings, that.columnMappings) + && Objects.equals(clusterByVirtualColumnMappings, that.clusterByVirtualColumnMappings) && Objects.equals(tuningConfig, that.tuningConfig); } @Override public int hashCode() { - return Objects.hash(dataSchema, columnMappings, tuningConfig); + return Objects.hash(dataSchema, columnMappings, clusterByVirtualColumnMappings, tuningConfig); } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 0032a6051ac2..c8b9546f0177 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -211,9 +211,12 @@ public static LegacyMSQSpec makeLegacyMSQSpec( return querySpec; } - private static MSQDestination buildMSQDestination(final IngestDestination targetDataSource, - final ColumnMappings columnMappings, final PlannerContext plannerContext, - final MSQTerminalStageSpecFactory terminalStageSpecFactory) + private static MSQDestination buildMSQDestination( + final IngestDestination targetDataSource, + final ColumnMappings columnMappings, + final PlannerContext plannerContext, + final MSQTerminalStageSpecFactory terminalStageSpecFactory + ) { final QueryContext sqlQueryContext = plannerContext.queryContext(); final Object segmentGranularity = getSegmentGranularity(plannerContext); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java index b18ee8dde537..07a4ad9769ca 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java @@ -46,6 +46,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.filter.EqualityFilter; import 
org.apache.druid.query.filter.FilterSegmentPruner; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; @@ -105,6 +106,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest .interval(Intervals.of("2000/2001")) .shardSpec(new DimensionRangeShardSpec( ImmutableList.of(PARTITION_DIM), + VirtualColumns.EMPTY, null, new StringTuple(new String[]{"foo"}), 0, @@ -119,6 +121,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest .interval(Intervals.of("2000/2001")) .shardSpec(new DimensionRangeShardSpec( ImmutableList.of(PARTITION_DIM), + VirtualColumns.EMPTY, new StringTuple(new String[]{"foo"}), null, 1, @@ -449,6 +452,7 @@ public void test_sliceStatic_dimensionFilter_twoSlices() ); final FilterSegmentPruner pruner = new FilterSegmentPruner( new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null), + null, null ); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 9d4ddde70ca4..fdf58b3e8747 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -56,13 +56,16 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.QueryContext; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.NotDimFilter; import org.apache.druid.query.filter.NullFilter; import org.apache.druid.query.filter.RangeFilter; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.transform.CompactionTransformSpec; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -328,7 +331,7 @@ public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName SegmentId.of("foo", Intervals.ETERNITY, "test", 0) ) ) - .setExpectedShardSpec(NumberedShardSpec.class) + .setExpectedShardSpec(DimensionRangeShardSpec.class) .setExpectedResultRows( ImmutableList.of( new Object[]{946684800000L, "", 1.0f}, @@ -347,7 +350,7 @@ public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName .setExpectedLastCompactionState( expectedCompactionState( context, - Collections.emptyList(), + List.of("v0"), DimensionsSpec.builder() .setDimensions( ImmutableList.of( @@ -357,13 +360,24 @@ public void testReplaceOnFooWithAllClusteredByDimExplicitSort(String contextName ) .setDimensionExclusions(Collections.singletonList("__time")) .build(), + new CompactionTransformSpec( + null, + VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "lower(\"dim1\")", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ) + ) + ), GranularityType.ALL, Intervals.ETERNITY ) ) .verifyResults(); } - + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testReplaceOnFooWithAllClusteredByExpression(String contextName, Map context) @@ -2813,6 +2827,25 @@ private CompactionState 
expectedCompactionState( GranularityType segmentGranularity, Interval interval ) + { + return expectedCompactionState( + context, + partitionDimensions, + dimensions, + null, + segmentGranularity, + interval + ); + } + + private CompactionState expectedCompactionState( + Map context, + List partitionDimensions, + List dimensions, + CompactionTransformSpec transformSpec, + GranularityType segmentGranularity, + Interval interval + ) { return expectedCompactionState( context, @@ -2821,6 +2854,7 @@ private CompactionState expectedCompactionState( .setDimensions(dimensions) .setDimensionExclusions(Collections.singletonList("__time")) .build(), + transformSpec, segmentGranularity, interval ); @@ -2833,6 +2867,25 @@ private CompactionState expectedCompactionState( GranularityType segmentGranularity, Interval interval ) + { + return expectedCompactionState( + context, + partitionDimensions, + dimensionsSpec, + null, + segmentGranularity, + interval + ); + } + + private CompactionState expectedCompactionState( + Map context, + List partitionDimensions, + DimensionsSpec dimensionsSpec, + CompactionTransformSpec transformSpec, + GranularityType segmentGranularity, + Interval interval + ) { if (!context.containsKey(Tasks.STORE_COMPACTION_STATE_KEY) || !((Boolean) context.get(Tasks.STORE_COMPACTION_STATE_KEY))) { @@ -2866,7 +2919,7 @@ private CompactionState expectedCompactionState( partitionsSpec, dimensionsSpec, metricsSpec, - null, + transformSpec, indexSpec, granularitySpec, null diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index fa124f44da35..1375ec64054d 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -232,7 +232,7 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec", + "MSQ: Non-string partition dimension[long_dim] of type[LONG] not supported with 'range' partition spec", validationResult.getReason() ); } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java new file mode 100644 index 000000000000..7a06c419dd68 --- /dev/null +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.msq.exec.StageProcessor; +import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.indexing.MSQTuningConfig; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +class SegmentGeneratorStageProcessorTest +{ + @Test + public void testSerde() throws JsonProcessingException + { + ObjectMapper mapper = TestHelper.makeJsonMapper(); + mapper.registerModules(new MSQIndexingModule().getJacksonModules()); + SegmentGeneratorStageProcessor processor = new SegmentGeneratorStageProcessor( + DataSchema.builder() + .withDataSource("test") + .withTimestamp(new TimestampSpec("timestamp", "auto", null)) + .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null)) + .withDimensions(new StringDimensionSchema("a"), new StringDimensionSchema("b")) + .withAggregators(new CountAggregatorFactory("cnt")) + .build(), + new ColumnMappings( + List.of( + new ColumnMapping("d0", "__time"), + new ColumnMapping("d1", "a"), + new ColumnMapping("d2", "b"), + new ColumnMapping("a0", "cnt") + ) + ), + Map.of( + "v0", + new ExpressionVirtualColumn("v0", "concat(\"a\",'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ), + MSQTuningConfig.defaultConfig() + ); + + Assertions.assertEquals(processor, mapper.readValue(mapper.writeValueAsString(processor), StageProcessor.class)); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(SegmentGeneratorStageProcessor.class).usingGetClass().verify(); + } +} diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index 55ba95a9d98f..61fadac716a7 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -32,6 +32,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.EqualityFilter; import org.apache.druid.query.filter.FilterSegmentPruner; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; @@ -60,6 +61,7 @@ public class IndexerTableInputSpecSlicerTest 
extends InitializedNullHandlingTest Collections.emptyList(), new DimensionRangeShardSpec( ImmutableList.of("dim"), + VirtualColumns.EMPTY, null, new StringTuple(new String[]{"foo"}), 0, @@ -79,6 +81,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest Collections.emptyList(), new DimensionRangeShardSpec( ImmutableList.of("dim"), + VirtualColumns.EMPTY, new StringTuple(new String[]{"foo"}), null, 1, @@ -276,6 +279,7 @@ public void test_sliceStatic_dimFilter() ); final FilterSegmentPruner pruner = new FilterSegmentPruner( new EqualityFilter("dim", ColumnType.STRING, "bar", null), + null, null ); @@ -309,7 +313,8 @@ public void test_sliceStatic_dimFilterNotUsed() ); final FilterSegmentPruner segmentPruner = new FilterSegmentPruner( new EqualityFilter("dim", ColumnType.STRING, "bar", null), - Collections.emptySet() + Collections.emptySet(), + null ); Assert.assertEquals( @@ -350,6 +355,7 @@ public void test_sliceStatic_intervalAndDimFilter() ); final FilterSegmentPruner segmentPruner = new FilterSegmentPruner( new EqualityFilter("dim", ColumnType.STRING, "bar", null), + null, null ); diff --git a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java index cb0b88ea0790..aef51e2c05d2 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java +++ b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java @@ -21,6 +21,8 @@ import com.google.common.collect.RangeSet; import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.ShardSpec; @@ -44,15 +46,18 @@ public class FilterSegmentPruner implements SegmentPruner { private final DimFilter filter; private final Set<String> filterFields; + private final VirtualColumns virtualColumns; private final Map<String, Optional<RangeSet<String>>> rangeCache; public FilterSegmentPruner( DimFilter filter, - @Nullable Set<String> filterFields + @Nullable Set<String> filterFields, + @Nullable VirtualColumns virtualColumns ) { this.filter = InvalidInput.notNull(filter, "filter"); this.filterFields = filterFields == null ? filter.getRequiredColumns() : filterFields; + this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns; this.rangeCache = new HashMap<>(); } @@ -88,10 +93,26 @@ public Collection prune(Iterable input, Function conve Map<String, RangeSet<String>> filterDomain = new HashMap<>(); List<String> dimensions = shard.getDomainDimensions(); for (String dimension : dimensions) { - if (filterFields == null || filterFields.contains(dimension)) { + final VirtualColumn shardVirtualColumn = shard.getDomainVirtualColumns().getVirtualColumn(dimension); + if (shardVirtualColumn != null) { + final VirtualColumn queryEquivalent = virtualColumns.findEquivalent(shardVirtualColumn); + if (queryEquivalent != null) { + if (filterFields == null || filterFields.contains(queryEquivalent.getOutputName())) { + Optional<RangeSet<String>> optFilterRangeSet = rangeCache + .computeIfAbsent( + queryEquivalent.getOutputName(), + d -> Optional.ofNullable(filter.getDimensionRangeSet(d)) + ); + optFilterRangeSet.ifPresent(stringRangeSet -> filterDomain.put( + shardVirtualColumn.getOutputName(), + stringRangeSet + )); + } + } + } else if (filterFields == null || filterFields.contains(dimension)) { Optional<RangeSet<String>> optFilterRangeSet = rangeCache.computeIfAbsent(dimension, d -> Optional.ofNullable(filter.getDimensionRangeSet(d))); optFilterRangeSet.ifPresent(stringRangeSet -> filterDomain.put(dimension, stringRangeSet)); } } @@ -114,13 +134,15 @@ public boolean equals(Object o) return false; } FilterSegmentPruner that = (FilterSegmentPruner) o; - return Objects.equals(filter, that.filter) && Objects.equals(filterFields, that.filterFields); + return Objects.equals(filter, that.filter) && + Objects.equals(filterFields, that.filterFields) && + Objects.equals(virtualColumns, that.virtualColumns); } @Override public int hashCode() { - return Objects.hash(filter, filterFields); + return Objects.hash(filter, filterFields, virtualColumns); } @Override @@ -129,7 +151,7 @@ public String toString() return "FilterSegmentPruner{" + "filter=" + filter + ", filterFields=" + filterFields + - ", rangeCache=" + rangeCache + + ", virtualColumns=" + virtualColumns + '}'; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index 9faf0226bcb7..d31f130f6654 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -1088,6 +1088,14 @@ public Builder addOrderByColumn(OrderByColumnSpec columnSpec) return this; } + public Builder setOrderByColumns(List<OrderByColumnSpec> columnSpec) + { + ensureExplicitLimitSpecNotSet(); + this.orderByColumnSpecs = new ArrayList<>(columnSpec); + this.postProcessingFn = null; + return this; + } + public Builder setLimitSpec(LimitSpec limitSpec) { Preconditions.checkNotNull(limitSpec); diff --git a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java index d5baa72dd084..64a974771a20 100644 --- a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java +++ b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java @@ -232,7 +232,8 @@ public SegmentPruner getSegmentPruner() return new FilterSegmentPruner( topQuery.getFilter(), - baseFields + baseFields, + topQuery.getVirtualColumns() ); } diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java index
9d5abd6f76e6..b1e0d01c7476 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java @@ -24,6 +24,7 @@ import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.segment.VirtualColumns; import javax.annotation.Nullable; import java.util.Arrays; @@ -33,6 +34,7 @@ public abstract class BaseDimensionRangeShardSpec implements ShardSpec { protected final List dimensions; + protected final VirtualColumns virtualColumns; @Nullable protected final StringTuple start; @Nullable @@ -40,11 +42,13 @@ public abstract class BaseDimensionRangeShardSpec implements ShardSpec protected BaseDimensionRangeShardSpec( List dimensions, + @Nullable VirtualColumns virtualColumns, @Nullable StringTuple start, @Nullable StringTuple end ) { this.dimensions = dimensions; + this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns; this.start = start; this.end = end; } diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java index 308e36474a6b..c15f58370a78 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.segment.VirtualColumns; import javax.annotation.Nullable; import java.util.List; @@ -109,6 +110,7 @@ public DimensionRangeShardSpec convert(int numCorePartitions) numCorePartitions ) : new DimensionRangeShardSpec( dimensions, + VirtualColumns.EMPTY, start, end, partitionId, diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java index c37ea1400572..fc39f02b02c9 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.segment.VirtualColumns; import javax.annotation.Nullable; import java.util.List; @@ -51,7 +52,7 @@ public DimensionRangeBucketShardSpec( @JsonProperty("end") @Nullable StringTuple end ) { - super(dimensions, start, end); + super(dimensions, VirtualColumns.EMPTY, start, end); // Verify that the tuple sizes and number of dimensions are the same Preconditions.checkArgument( start == null || start.size() == dimensions.size(), diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java index a0a850dae08a..fea3d82cb5b3 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java +++ 
b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java @@ -20,12 +20,14 @@ package org.apache.druid.timeline.partition; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.segment.VirtualColumns; import javax.annotation.Nullable; import java.util.Collections; @@ -53,13 +55,14 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec @JsonCreator public DimensionRangeShardSpec( @JsonProperty("dimensions") List dimensions, + @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns, @JsonProperty("start") @Nullable StringTuple start, @JsonProperty("end") @Nullable StringTuple end, @JsonProperty("partitionNum") int partitionNum, @JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility ) { - super(dimensions, start, end); + super(dimensions, virtualColumns, start, end); Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0"); Preconditions.checkArgument( dimensions != null && !dimensions.isEmpty(), @@ -76,6 +79,13 @@ public List getDimensions() return dimensions; } + @JsonProperty("virtualColumns") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public VirtualColumns getVirtualColumns() + { + return virtualColumns; + } + @Nullable @JsonProperty("start") public StringTuple getStartTuple() @@ -107,13 +117,13 @@ public int getNumCorePartitions() @Override public ShardSpec withPartitionNum(int partitionNum) { - return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, numCorePartitions); + return new DimensionRangeShardSpec(dimensions, virtualColumns, start, end, partitionNum, numCorePartitions); } @Override public ShardSpec withCorePartitions(int partitions) { - return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, partitions); + return new DimensionRangeShardSpec(dimensions, virtualColumns, start, end, partitionNum, partitions); } public boolean isNumCorePartitionsUnknown() @@ -127,6 +137,12 @@ public List getDomainDimensions() return Collections.unmodifiableList(dimensions); } + @Override + public VirtualColumns getDomainVirtualColumns() + { + return virtualColumns; + } + /** * Set[:i] is the cartesian product of Set[0],...,Set[i - 1] * EffectiveDomain[:i] is defined as QueryDomain[:i] INTERSECTION SegmentRange[:i] @@ -289,6 +305,7 @@ public boolean equals(Object o) return partitionNum == shardSpec.partitionNum && numCorePartitions == shardSpec.numCorePartitions && Objects.equals(dimensions, shardSpec.dimensions) && + Objects.equals(virtualColumns, shardSpec.virtualColumns) && Objects.equals(start, shardSpec.start) && Objects.equals(end, shardSpec.end); } @@ -296,7 +313,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(dimensions, start, end, partitionNum, numCorePartitions); + return Objects.hash(dimensions, virtualColumns, start, end, partitionNum, numCorePartitions); } @Override @@ -304,6 +321,7 @@ public String toString() { return "DimensionRangeShardSpec{" + "dimensions='" + dimensions + '\'' + + ", virtualColumns=" + virtualColumns + ", start='" + start + '\'' + ", end='" + end + '\'' + ", partitionNum=" + partitionNum + diff --git 
a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java index b91c96387a9d..da1c046465bf 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.collect.RangeSet; import org.apache.druid.error.DruidException; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; import java.util.List; import java.util.Map; @@ -136,13 +138,28 @@ default short getAtomicUpdateGroupSize() ShardSpecLookup getLookup(List shardSpecs); /** - * Get dimensions who have possible range for the rows this shard contains. + * Get the dimensions that have a known possible value range for the rows this shard contains. These columns might + * be physical columns stored in the shard, or computed expressions, in which case the manner in which they were + * computed is available in {@link #getDomainVirtualColumns()}. * - * @return list of dimensions who has its possible range. Dimensions with unknown possible range are not listed + * @return list of dimensions that each have a known possible range. Dimensions with an unknown possible range are not listed. */ @JsonIgnore List<String> getDomainDimensions(); + /** + * If any of the columns in {@link #getDomainDimensions()} were computed with an expression rather than stored, the + * {@link org.apache.druid.segment.VirtualColumn} which computes them is available here. This allows filter ranges to + * be matched even when the value is not stored in the shard, so long as the query declares an equivalent virtual column that {@link VirtualColumns#findEquivalent(VirtualColumn)} can locate. + * + * @return {@link VirtualColumns} associated with columns listed in {@link #getDomainDimensions()}. + */ + @JsonIgnore + default VirtualColumns getDomainVirtualColumns() + { + return VirtualColumns.EMPTY; + } + /** * if given domain ranges are not possible in this shard, return false; otherwise return true; * diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java index e47b5aafaeae..ae9ea1fb416e 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.segment.VirtualColumns; import javax.annotation.Nullable; import java.util.Collections; @@ -52,6 +53,7 @@ public SingleDimensionRangeBucketShardSpec( { super( dimension == null ? Collections.emptyList() : Collections.singletonList(dimension), + VirtualColumns.EMPTY, start == null ? null : StringTuple.create(start), end == null ?
null : StringTuple.create(end) ); diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java index 59ef1be3d6ac..aa346adabbf4 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java @@ -25,6 +25,7 @@ import com.google.common.collect.Range; import com.google.common.collect.RangeSet; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.segment.VirtualColumns; import javax.annotation.Nullable; import java.util.Collections; @@ -62,6 +63,7 @@ public SingleDimensionShardSpec( { super( dimension == null ? Collections.emptyList() : Collections.singletonList(dimension), + VirtualColumns.EMPTY, start == null ? null : StringTuple.create(start), end == null ? null : StringTuple.create(end), partitionNum, diff --git a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java index 131337ad4683..a1a1dba0c281 100644 --- a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java +++ b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java @@ -24,7 +24,9 @@ import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; @@ -46,7 +48,7 @@ void testNullFilter() { Throwable t = Assertions.assertThrows( DruidException.class, - () -> new FilterSegmentPruner(null, null) + () -> new FilterSegmentPruner(null, null, null) ); Assertions.assertEquals("filter must not be null", t.getMessage()); } @@ -70,13 +72,73 @@ void testPrune() List segs = List.of(seg1, seg2, seg3, seg4, seg5, seg6, seg7); - FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null); - FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet()); - FilterSegmentPruner prunerExpression = new FilterSegmentPruner(expression_b, null); + FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null, null); + FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), null); + FilterSegmentPruner prunerExpression = new FilterSegmentPruner(expression_b, null, null); + // prune twice to exercise cache Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), prunerRange.prune(segs, Function.identity())); + Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), prunerRange.prune(segs, Function.identity())); + Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, Function.identity())); Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, Function.identity())); Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity())); + Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity())); + } + + @Test + void testPruneVirtualColumn() + { + VirtualColumns shardVirtualColumns = 
VirtualColumns.create( + new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ); + VirtualColumns shardVirtualColumnsDifferentName = VirtualColumns.create( + new ExpressionVirtualColumn("vdifferentname", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ); + + String interval1 = "2026-02-18T00:00:00Z/2026-02-19T00:00:00Z"; + + DataSegment seg1 = makeDataSegment( + interval1, + makeRange(List.of("vdim1"), shardVirtualColumns, 0, null, StringTuple.create("abcfoo")) + ); + DataSegment seg2 = makeDataSegment( + interval1, + makeRange(List.of("vdim1"), shardVirtualColumns, 1, StringTuple.create("abcfoo"), StringTuple.create("lmnfoo")) + ); + // same virtual column with a different name in this segment + DataSegment seg3 = makeDataSegment( + interval1, + makeRange(List.of("vdifferentname"), shardVirtualColumnsDifferentName, 2, StringTuple.create("lmnfoo"), null) + ); + + List segs = List.of(seg1, seg2, seg3); + + // same expression, same name + VirtualColumns queryVirtualColumns = VirtualColumns.create( + new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ); + DimFilter range_a = new RangeFilter("vdim1", ColumnType.STRING, null, "aaa", null, null, null); + FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns); + FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), queryVirtualColumns); + // prune twice to exercise cache + Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity())); + Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity())); + Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity())); + Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity())); + + // same expression, different name + queryVirtualColumns = VirtualColumns.create( + new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ); + range_a = new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, null, null); + prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns); + prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), queryVirtualColumns); + + // prune twice to exercise cache + Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity())); + Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity())); + Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity())); + Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity())); } @Test @@ -106,9 +168,27 @@ private ShardSpec makeRange( @Nullable StringTuple start, @Nullable StringTuple end ) + { + return makeRange( + columns, + null, + partitionNumber, + start, + end + ); + } + + private ShardSpec makeRange( + List columns, + VirtualColumns virtualColumns, + int partitionNumber, + @Nullable StringTuple start, + @Nullable StringTuple end + ) { return new DimensionRangeShardSpec( columns, + virtualColumns, start, end, partitionNumber, diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java index b8099e227a4f..6f7af3fecb8b 100644 --- 
a/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.segment.VirtualColumns; import org.junit.Assert; import org.junit.Test; @@ -39,6 +40,7 @@ public void testConvert() Assert.assertEquals( new DimensionRangeShardSpec( Arrays.asList("dim1", "dim2"), + VirtualColumns.EMPTY, StringTuple.create("start1", "start2"), StringTuple.create("end1", "end2"), 5, diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java index f4a2d88b5cd9..7eb607ecffb8 100644 --- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java @@ -198,6 +198,9 @@ public void testInvalidEndTupleSize() @Test public void testEquals() { - EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class).usingGetClass().verify(); + EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class) + .usingGetClass() + .withIgnoredFields("virtualColumns") + .verify(); } } diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java index ade40e6ae38f..0f62da3518b1 100644 --- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.VirtualColumns; import org.junit.Assert; import org.junit.Test; @@ -50,15 +51,30 @@ public void testShardSpecLookup() setDimensions("dim1", "dim2"); final List shardSpecs = ImmutableList.of( - new DimensionRangeShardSpec(dimensions, null, StringTuple.create("India", "Delhi"), 1, 1), new DimensionRangeShardSpec( dimensions, + VirtualColumns.EMPTY, + null, + StringTuple.create("India", "Delhi"), + 1, + 1 + ), + new DimensionRangeShardSpec( + dimensions, + VirtualColumns.EMPTY, StringTuple.create("India", "Delhi"), StringTuple.create("Spain", "Valencia"), 2, 1 ), - new DimensionRangeShardSpec(dimensions, StringTuple.create("Spain", "Valencia"), null, 3, 1) + new DimensionRangeShardSpec( + dimensions, + VirtualColumns.EMPTY, + StringTuple.create("Spain", "Valencia"), + null, + 3, + 1 + ) ); final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs); final long currentTime = DateTimes.nowUtc().getMillis(); @@ -143,6 +159,7 @@ public void testShardSpecLookupWithNull() final DimensionRangeShardSpec shard0 = new DimensionRangeShardSpec( dimensions, + VirtualColumns.EMPTY, null, StringTuple.create("India", null), 1, @@ -151,6 +168,7 @@ public void testShardSpecLookupWithNull() final DimensionRangeShardSpec shard1 = new DimensionRangeShardSpec( dimensions, + VirtualColumns.EMPTY, StringTuple.create("India", null), StringTuple.create("Spain", "Valencia"), 10, @@ -159,6 +177,7 @@ public void 
testShardSpecLookupWithNull() final DimensionRangeShardSpec shard2 = new DimensionRangeShardSpec( dimensions, + VirtualColumns.EMPTY, StringTuple.create("Spain", "Valencia"), StringTuple.create("Tokyo", null), 10, @@ -167,6 +186,7 @@ public void testShardSpecLookupWithNull() final DimensionRangeShardSpec shard3 = new DimensionRangeShardSpec( dimensions, + VirtualColumns.EMPTY, StringTuple.create("Tokyo", null), null, 100, @@ -202,7 +222,7 @@ public void testPossibleInDomain_withNullStart() final RangeSet universalSet = TreeRangeSet.create(); universalSet.add(Range.all()); - ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null); + ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null); Map> domain = new HashMap<>(); // {Mars} * {Zoo, Zuu} * {Blah, Random} @@ -265,7 +285,7 @@ public void testPossibleInDomain_withNullValues() final RangeSet universalSet = TreeRangeSet.create(); universalSet.add(Range.all()); - ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null); + ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null); Map> domain = new HashMap<>(); // (-INF, INF) * (-INF, INF) * (-INF, INF) @@ -345,7 +365,7 @@ public void testPossibleInDomain_nonNullValues_acceptanceScenarios() final RangeSet universalSet = TreeRangeSet.create(); universalSet.add(Range.all()); - ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null); + ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null); Map> domain = new HashMap<>(); // (-INF, INF) * (-INF, INF) * (-INF, INF) @@ -442,7 +462,7 @@ public void testPossibleInDomain_nonNullValues_pruningScenarios() final RangeSet universalSet = TreeRangeSet.create(); universalSet.add(Range.all()); - ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null); + ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null); Map> domain = new HashMap<>(); // (-INF, Earth) U (Earth, INF) * (-INF, INF) * (-INF, INF) @@ -504,7 +524,7 @@ public void testPossibleInDomain_falsePruning() final RangeSet universalSet = TreeRangeSet.create(); universalSet.add(Range.all()); - ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null); + ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null); Map> domain = new HashMap<>(); // {Earth} U {Mars} * (USA, INF) * (-INF, INF) diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java index 2c3442645d26..daad7aafc94f 100644 --- a/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java @@ -24,6 +24,7 @@ import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Test; @@ -86,6 +87,7 @@ public static Iterable constructorFeeder() ImmutableList.of( new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, null, StringTuple.create("aaa"), 0, @@ -93,6 +95,7 @@ public static 
Iterable constructorFeeder() ), new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, StringTuple.create("ttt"), StringTuple.create("zzz"), 2, @@ -100,6 +103,7 @@ public static Iterable constructorFeeder() ), new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, StringTuple.create("bbb"), StringTuple.create("fff"), 1, @@ -116,6 +120,7 @@ public static Iterable constructorFeeder() ImmutableList.of( new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, StringTuple.create("bbb"), StringTuple.create("fff"), 1, @@ -123,6 +128,7 @@ public static Iterable constructorFeeder() ), new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, StringTuple.create("fff"), null, 2, @@ -130,6 +136,7 @@ public static Iterable constructorFeeder() ), new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, null, StringTuple.create("bbb"), 0, diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index 532c71977189..c66def987934 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -28,7 +28,12 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.query.QueryContext; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.virtual.VirtualizedColumnInspector; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -38,9 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; /** @@ -119,7 +121,8 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn if (newConfig.getTuningConfig() != null) { validationResults.add(validatePartitionsSpecForMSQ( newConfig.getTuningConfig().getPartitionsSpec(), - newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions() + newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions(), + newConfig.getTransformSpec() == null ? 
VirtualColumns.EMPTY : newConfig.getTransformSpec().getVirtualColumns() )); } if (newConfig.getGranularitySpec() != null) { @@ -142,7 +145,8 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn */ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ( @Nullable PartitionsSpec partitionsSpec, - @Nullable List dimensionSchemas + @Nullable List dimensionSchemas, + VirtualColumns virtualColumns ) { if (partitionsSpec == null) { @@ -165,19 +169,36 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ( ); } if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) { - Map dimensionSchemaMap = dimensionSchemas.stream().collect( - Collectors.toMap(DimensionSchema::getName, Function.identity()) - ); - Optional nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec) - .getPartitionDimensions() - .stream() - .filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType())) - .findAny(); - if (nonStringDimension.isPresent()) { + RowSignature.Builder baseSignatureBuilder = RowSignature.builder().addTimeColumn(); + for (DimensionSchema dimensionSchema : dimensionSchemas) { + baseSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType()); + } + final RowSignature baseSignature = baseSignatureBuilder.build(); + final ColumnInspector inspector = new VirtualizedColumnInspector(baseSignature, virtualColumns); + + String nonString = null; + ColumnType nonStringType = null; + for (String dim : ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()) { + ColumnType partitionType = baseSignature.getColumnType(dim).orElse(null); + if (partitionType == null) { + VirtualColumn virtualColumn = virtualColumns.getVirtualColumn(dim); + if (virtualColumn != null) { + partitionType = ColumnType.fromCapabilities( + virtualColumn.capabilities(inspector, virtualColumn.getOutputName()) + ); + } + } + if (!ColumnType.STRING.equals(partitionType)) { + nonString = dim; + nonStringType = partitionType; + break; + } + } + if (nonString != null) { return CompactionConfigValidationResult.failure( "MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec", - nonStringDimension.get(), - dimensionSchemaMap.get(nonStringDimension.get()).getTypeName() + nonString, + nonStringType ); } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index f0d35ef324b3..2730c98899e6 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -161,7 +161,7 @@ public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: Non-string partition dimension[partitionDim] of type[long] not supported with 'range' partition spec", + "MSQ: Non-string partition dimension[partitionDim] of type[LONG] not supported with 'range' partition spec", validationResult.getReason() ); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 8e166dbfcf91..91963e3a96d1 100644 --- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -46,6 +46,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.TestDataSource; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.metadata.FingerprintGenerator; import org.apache.druid.segment.metadata.HeapMemoryIndexingStateStorage; @@ -670,7 +671,7 @@ public void testCommitReplaceSegmentsWithUpdatedCorePartitions() ImmutableMap.of("path", "a-" + i), ImmutableList.of("dim1"), ImmutableList.of("m1"), - new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8), + new DimensionRangeShardSpec(List.of("dim1"), null, null, null, i - 1, 8), 9, 100 ); @@ -695,7 +696,7 @@ public void testCommitReplaceSegmentsWithUpdatedCorePartitions() ImmutableMap.of("path", "b-" + i), ImmutableList.of("dim1"), ImmutableList.of("m1"), - new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8), + new DimensionRangeShardSpec(List.of("dim1"), null, null, null, i - 1, 8), 9, 100 ); @@ -3622,6 +3623,7 @@ public void testAddNumberedShardSpecAfterMultiDimensionsShardSpecWithUnknownCore metrics, new DimensionRangeShardSpec( Collections.singletonList("dim"), + VirtualColumns.EMPTY, i == 0 ? null : StringTuple.create(String.valueOf(i - 1)), i == 5 ? null : StringTuple.create(String.valueOf(i)), i, @@ -4316,7 +4318,7 @@ public void testRetrieveUsedSegmentsForSegmentAllocation() loadspec, dimensions, metrics, - new DimensionRangeShardSpec(dimensions, null, null, 0, 1), + new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, null, null, 0, 1), 0, 100 );
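
Reviewer note, not part of the patch: the sketch below stitches the pieces above together end to end, using the new DimensionRangeShardSpec constructor argument, ShardSpec.getDomainVirtualColumns(), and the extra FilterSegmentPruner parameter. Only APIs that appear in this diff are used; the datasource name, interval, sizes, and the DataSegment.builder() calls are illustrative assumptions mirroring existing test usage, not taken from this change.

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.FilterSegmentPruner;
import org.apache.druid.query.filter.RangeFilter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;

import java.util.Collection;
import java.util.List;
import java.util.function.Function;

class VirtualColumnPruningSketch
{
  public static void main(String[] args)
  {
    // A shard whose range domain is a computed expression rather than a stored column.
    final VirtualColumns shardVirtualColumns = VirtualColumns.create(
        new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
    );
    final ShardSpec shard = new DimensionRangeShardSpec(
        ImmutableList.of("vdim1"),    // domain dimension is the virtual column's output name
        shardVirtualColumns,          // new constructor argument; null normalizes to VirtualColumns.EMPTY
        null,                         // start (open-ended)
        StringTuple.create("abcfoo"), // end
        0,                            // partitionNum
        1                             // numCorePartitions
    );
    final List<DataSegment> segments = ImmutableList.of(
        DataSegment.builder()
                   .dataSource("wiki")                              // illustrative
                   .interval(Intervals.of("2026-02-18/2026-02-19")) // illustrative
                   .version("v1")
                   .shardSpec(shard)
                   .size(1)
                   .build()
    );

    // The query computes the same expression under a different output name. The pruner
    // matches it to the shard's virtual column via VirtualColumns.findEquivalent, then
    // intersects the filter's range with the shard's range for the computed value.
    final VirtualColumns queryVirtualColumns = VirtualColumns.create(
        new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
    );
    final DimFilter filter = new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, null, null);
    final FilterSegmentPruner pruner = new FilterSegmentPruner(filter, null, queryVirtualColumns);

    // The shard covering (-inf, "abcfoo") intersects the filter's (-inf, "aaa"] and is
    // kept; a shard covering only values above "aaa" would be pruned.
    final Collection<DataSegment> pruned = pruner.prune(segments, Function.identity());
    System.out.println(pruned.size()); // 1
  }
}

Note the prune loop keys the domain map by the shard's own output name while looking up the filter range under the query's name, which is what makes the rename-tolerant matching exercised by testPruneVirtualColumn work.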
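
A second sketch for the compaction-config side: range partitioning on a virtual column now validates by resolving the column's type through a VirtualizedColumnInspector instead of only consulting the physical dimension schemas. The spec values and target-rows figure below are illustrative assumptions, as is the DimensionRangePartitionsSpec constructor order (targetRowsPerSegment, maxRowsPerSegment, partitionDimensions, assumeGrouped).

import com.google.common.collect.ImmutableList;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;

class RangePartitionValidationSketch
{
  public static void main(String[] args)
  {
    // "v0" is not a physical dimension; before this change the validator could not
    // resolve its type. Now it also consults the transform spec's virtual columns.
    final CompactionConfigValidationResult result = ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
        new DimensionRangePartitionsSpec(1_000_000, null, ImmutableList.of("v0"), false),
        ImmutableList.of(new StringDimensionSchema("dim1")),
        VirtualColumns.create(
            new ExpressionVirtualColumn("v0", "lower(\"dim1\")", ColumnType.STRING, TestExprMacroTable.INSTANCE)
        )
    );
    System.out.println(result.isValid()); // true: "v0" resolves to STRING
    // A LONG-typed partition dimension would instead fail with:
    // "MSQ: Non-string partition dimension[...] of type[LONG] not supported with 'range' partition spec"
  }
}

This also explains the test-message updates above: the failure now prints the resolved ColumnType (for example type[LONG]) rather than the lowercase schema type name.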