Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.junit.Assert;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class DimensionRangeShardSpecBenchmark
// Initial segment (null -> values)
private final DimensionRangeShardSpec shardSpec0 = new DimensionRangeShardSpec(
Arrays.asList("country", "city"),
VirtualColumns.EMPTY,
new StringTuple(new String[]{null, null}),
new StringTuple(new String[]{"Germany", "Munich"}),
0,
Expand All @@ -82,6 +84,7 @@ public class DimensionRangeShardSpecBenchmark
// Middle segment (values -> other values)
private final DimensionRangeShardSpec shardSpec1 = new DimensionRangeShardSpec(
Arrays.asList("country", "city"),
VirtualColumns.EMPTY,
new StringTuple(new String[]{"Germany", "Munich"}),
new StringTuple(new String[]{"United States", "New York"}),
1,
Expand All @@ -91,6 +94,7 @@ public class DimensionRangeShardSpecBenchmark
// End segment (values -> null)
private final DimensionRangeShardSpec shardSpec2 = new DimensionRangeShardSpec(
Arrays.asList("country", "city"),
VirtualColumns.EMPTY,
new StringTuple(new String[]{"United States", "New York"}),
new StringTuple(new String[]{null, null}),
2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.NotDimFilter;
Expand Down Expand Up @@ -93,6 +95,7 @@
import org.apache.druid.testing.tools.StreamGenerator;
import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -150,22 +153,6 @@ public EmbeddedDruidCluster createCluster()
.addServer(new EmbeddedRouter());
}


/**
 * Pushes a cluster-level compaction config to the leader Overlord and asserts
 * that the update was accepted.
 *
 * @param compactionEngine engine (native or MSQ) that auto-compaction should use
 * @param policy           optional candidate-search policy; {@code null} keeps the default
 */
private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
{
  final ClusterCompactionConfig clusterConfig = new ClusterCompactionConfig(
      1.0,
      100,
      policy,
      true,
      compactionEngine,
      true
  );
  final UpdateResponse response =
      cluster.callApi().onLeaderOverlord(o -> o.updateClusterCompactionConfig(clusterConfig));
  Assertions.assertTrue(response.isSuccess());
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(
Expand Down Expand Up @@ -307,41 +294,6 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
Assertions.assertEquals(4000, getTotalRowCount());
}

/**
 * Ingests 1000 generated wikipedia-like events into {@link #dataSource} by
 * serializing them into an inline input source, submitting a native
 * {@code ParallelIndexSupervisorTask}, and blocking until the task succeeds.
 *
 * <p>NOTE(review): the generator args (500, 100) and {@code generateEvents(2)}
 * are presumed to produce 1000 records total — confirm against
 * {@code WikipediaStreamEventStreamGenerator}.
 */
protected void ingest1kRecords()
{
  final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
  final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
  List<byte[]> records = streamGenerator.generateEvents(2);

  // Join the serialized events into newline-delimited text for inline ingestion.
  final InlineInputSource input = new InlineInputSource(
      records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
  final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
      input,
      new JsonInputFormat(null, null, null, null, null),  // all-default JSON parsing
      true,  // NOTE(review): positional boolean, presumably appendToExisting — confirm
      null
  );
  final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
      DataSchema.builder()
          .withDataSource(dataSource)
          .withTimestamp(new TimestampSpec("timestamp", "iso", null))  // ISO "timestamp" field
          .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())  // auto-discover dimensions
          .build(),
      ioConfig,
      TuningConfigBuilder.forParallelIndexTask().build()
  );
  final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
  final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
      taskId,
      null,
      null,
      indexIngestionSpec,
      null
  );
  cluster.callApi().submitTask(task);
  cluster.callApi().waitForTaskToSucceed(taskId, overlord);
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
Expand Down Expand Up @@ -575,6 +527,184 @@ public void test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrec
Assertions.assertTrue(count > 0);
}

/**
 * Verifies that auto-compaction can range-partition ("cluster by") on a virtual
 * column computed from nested data: {@code v0 = json_value(obj, '$.a')}.
 *
 * <p>Ingests 7 rows in a single DAY bucket, compacts with a
 * {@code DimensionRangePartitionsSpec} targeting 4 rows per segment on v0, and
 * asserts the result is exactly 2 segments whose {@link DimensionRangeShardSpec}s
 * carry the virtual column and split the v0 key space at "oo".
 */
@Test
public void test_compaction_cluster_by_virtualcolumn()
{
  // Virtual Columns on nested data is only supported with MSQ compaction engine right now.
  CompactionEngine compactionEngine = CompactionEngine.MSQ;
  configureCompaction(compactionEngine, null);

  // 7 rows, same timestamp; nested field obj.a takes the distinct values ll..rr.
  String jsonDataWithNestedColumn =
      """
      {"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "ll"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "mm"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "nn"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "oo"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "pp"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "qq"}}
      {"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "rr"}}
      """;

  final TaskBuilder.Index task = TaskBuilder
      .ofTypeIndex()
      .dataSource(dataSource)
      .jsonInputFormat()
      .inlineInputSourceWithData(jsonDataWithNestedColumn)
      .isoTimestampColumn("timestamp")
      .schemaDiscovery()
      .granularitySpec("DAY", null, false);  // DAY segments, no rollup

  cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
  cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

  Assertions.assertEquals(7, getTotalRowCount());

  // Virtual column that compaction will range-partition on.
  VirtualColumns virtualColumns = VirtualColumns.create(
      new ExpressionVirtualColumn("v0", "json_value(obj, '$.a')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
  );

  InlineSchemaDataSourceCompactionConfig config =
      InlineSchemaDataSourceCompactionConfig
          .builder()
          .forDataSource(dataSource)
          .withSkipOffsetFromLatest(Period.seconds(0))  // compact everything, including the latest interval
          .withTransformSpec(
              new CompactionTransformSpec(
                  null,  // no row filter
                  virtualColumns
              )
          )
          .withTuningConfig(
              UserCompactionTaskQueryTuningConfig
                  .builder()
                  // Range partitioning on v0, targeting 4 rows per segment.
                  .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
                  .build()
          )
          .build();

  runCompactionWithSpec(config);
  waitForAllCompactionTasksToFinish();

  cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

  // 7 rows at ~4 rows/segment => 2 segments; shard specs must retain the virtual
  // column and split at "oo" (NOTE(review): boundary presumed from sorted v0 values).
  List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
  Assertions.assertEquals(2, segments.size());
  Assertions.assertEquals(
      new DimensionRangeShardSpec(
          List.of("v0"),
          virtualColumns,
          null,  // open lower bound
          StringTuple.create("oo"),
          0,
          2
      ),
      segments.get(0).getShardSpec()
  );
  Assertions.assertEquals(
      new DimensionRangeShardSpec(
          List.of("v0"),
          virtualColumns,
          StringTuple.create("oo"),
          null,  // open upper bound
          1,
          2
      ),
      segments.get(1).getShardSpec()
  );
}

/**
 * Same scenario as {@code test_compaction_cluster_by_virtualcolumn}, but with a
 * rollup datasource (MINUTE query granularity, {@code count} aggregator):
 * auto-compaction must still range-partition on the virtual column
 * {@code v0 = json_value(obj, '$.a')} and produce 2 segments split at "oo".
 */
@Test
public void test_compaction_cluster_by_virtualcolumn_rollup()
{
  // Virtual Columns on nested data is only supported with MSQ compaction engine right now.
  CompactionEngine compactionEngine = CompactionEngine.MSQ;
  configureCompaction(compactionEngine, null);

  // 7 rows, same timestamp; obj.a values are all distinct so rollup keeps 7 rows.
  String jsonDataWithNestedColumn =
      """
      {"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "ll"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "mm"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "nn"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "oo"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "pp"}}
      {"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "qq"}}
      {"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "rr"}}
      """;

  final TaskBuilder.Index task = TaskBuilder
      .ofTypeIndex()
      .dataSource(dataSource)
      .jsonInputFormat()
      .inlineInputSourceWithData(jsonDataWithNestedColumn)
      .isoTimestampColumn("timestamp")
      .schemaDiscovery()
      .dataSchema(builder -> builder.withAggregators(new CountAggregatorFactory("count")))
      .granularitySpec("DAY", "MINUTE", true);  // DAY segments, MINUTE query granularity, rollup enabled

  cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
  cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

  Assertions.assertEquals(7, getTotalRowCount());

  // Virtual column that compaction will range-partition on.
  VirtualColumns virtualColumns = VirtualColumns.create(
      new ExpressionVirtualColumn(
          "v0",
          "json_value(obj, '$.a')",
          ColumnType.STRING,
          TestExprMacroTable.INSTANCE
      )
  );

  InlineSchemaDataSourceCompactionConfig config =
      InlineSchemaDataSourceCompactionConfig
          .builder()
          .forDataSource(dataSource)
          .withSkipOffsetFromLatest(Period.seconds(0))  // compact everything, including the latest interval
          .withTransformSpec(
              new CompactionTransformSpec(
                  null,  // no row filter
                  virtualColumns
              )
          )
          .withTuningConfig(
              UserCompactionTaskQueryTuningConfig
                  .builder()
                  // Range partitioning on v0, targeting 4 rows per segment.
                  .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
                  .build()
          )
          .build();

  runCompactionWithSpec(config);
  waitForAllCompactionTasksToFinish();

  cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

  // As in the non-rollup test: 2 segments with the virtual column in the shard
  // spec, split at "oo" (NOTE(review): boundary presumed from sorted v0 values).
  List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
  Assertions.assertEquals(2, segments.size());
  Assertions.assertEquals(
      new DimensionRangeShardSpec(
          List.of("v0"),
          virtualColumns,
          null,  // open lower bound
          StringTuple.create("oo"),
          0,
          2
      ),
      segments.get(0).getShardSpec()
  );
  Assertions.assertEquals(
      new DimensionRangeShardSpec(
          List.of("v0"),
          virtualColumns,
          StringTuple.create("oo"),
          null,  // open upper bound
          1,
          2
      ),
      segments.get(1).getShardSpec()
  );
}

/**
* Tests that when a compaction task filters out all rows using a transform spec,
* tombstones are created to properly drop the old segments. This test covers both
Expand Down Expand Up @@ -702,6 +832,56 @@ public void test_compaction_legacy_string_discovery_sparse_column(
Assertions.assertEquals(1, segments.get(0).getDimensions().size());
}

/**
 * Pushes a cluster-level compaction config to the leader Overlord and asserts
 * that the update was accepted.
 *
 * @param compactionEngine engine (native or MSQ) that auto-compaction should use
 * @param policy           optional candidate-search policy; {@code null} keeps the default
 */
private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
{
  final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
      o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
          1.0,  // NOTE(review): positional arg, presumably compactionTaskSlotRatio — confirm
          100,  // NOTE(review): presumably maxCompactionTaskSlots — confirm
          policy,
          true,
          compactionEngine,
          true
      ))
  );
  Assertions.assertTrue(updateResponse.isSuccess());
}

/**
 * Ingests 1000 generated wikipedia-like events into {@link #dataSource} by
 * serializing them into an inline input source, submitting a native
 * {@code ParallelIndexSupervisorTask}, and blocking until the task succeeds.
 */
protected void ingest1kRecords()
{
  final EventSerializer eventSerializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
  final StreamGenerator generator = new WikipediaStreamEventStreamGenerator(eventSerializer, 500, 100);
  final List<byte[]> rawEvents = generator.generateEvents(2);

  // Decode each serialized event and join them into newline-delimited inline data.
  final List<String> eventLines = new ArrayList<>();
  for (byte[] rawEvent : rawEvents) {
    eventLines.add(new String(rawEvent, StandardCharsets.UTF_8));
  }
  final InlineInputSource inputSource = new InlineInputSource(String.join("\n", eventLines));

  final ParallelIndexIOConfig indexIoConfig = new ParallelIndexIOConfig(
      inputSource,
      new JsonInputFormat(null, null, null, null, null),
      true,
      null
  );
  final DataSchema schema = DataSchema.builder()
      .withDataSource(dataSource)
      .withTimestamp(new TimestampSpec("timestamp", "iso", null))
      .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
      .build();
  final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
      schema,
      indexIoConfig,
      TuningConfigBuilder.forParallelIndexTask().build()
  );

  final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
  cluster.callApi().submitTask(new ParallelIndexSupervisorTask(taskId, null, null, ingestionSpec, null));
  cluster.callApi().waitForTaskToSucceed(taskId, overlord);
}

private int getTotalRowCount()
{
return Numbers.parseInt(cluster.runSql("SELECT COUNT(*) as cnt FROM \"%s\"", dataSource));
Expand All @@ -723,7 +903,6 @@ private void verifyNoRowsWithNestedValue(String nestedColumn, String field, Stri
);
}


private String generateEventsInInterval(Interval interval, int numEvents, long spacingMillis)
{
List<String> events = new ArrayList<>();
Expand Down
Loading
Loading