Add support for MSQ CLUSTERED BY expressions to be preserved in the segment shard spec as virtual columns #19061
Conversation
…egment shard spec as virtual columns

changes:
* `ShardSpec` interface has a new method, `getDomainVirtualColumns`, to provide the virtual column information for pruning
* `DimensionRangeShardSpec` stores `VirtualColumns` in segment metadata so they can be compared to query expressions and be used for pruning
* `FilterSegmentPruner` is virtual-column aware for segment pruning using the new methods
* `ClusterBy` now contains a map of key column to `VirtualColumn` alongside key columns, to support key columns being virtual columns
* `ControllerImpl` persists clustering virtual columns in compaction state in the transform spec
* `MSQCompactionRunner` handles virtual columns in order-by/cluster-by for compaction
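To make the shard-spec side of the change list concrete, here is a toy sketch of the idea, not the actual Druid API: `DomainShardSpec` and `expressionFor` are made-up names, and virtual column expressions are modeled as plain strings. The point is that a range shard spec remembers which of its clustering keys were produced by expressions, so a pruner can recover the expression behind a key name.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of a range shard spec that remembers which of its domain
// dimensions were produced by virtual column expressions. All names here
// are illustrative, not real Druid classes or methods.
public class DomainShardSpec
{
  private final List<String> dimensions;           // clustering key output names
  private final Map<String, String> virtualExprs;  // output name -> expression string

  public DomainShardSpec(List<String> dimensions, Map<String, String> virtualExprs)
  {
    this.dimensions = dimensions;
    this.virtualExprs = virtualExprs;
  }

  public List<String> getDomainDimensions()
  {
    return dimensions;
  }

  // similar in spirit to the PR's getDomainVirtualColumns(): expose the
  // expression behind a clustering key, if that key was a virtual column
  public Optional<String> expressionFor(String dimension)
  {
    return Optional.ofNullable(virtualExprs.get(dimension));
  }

  public static void main(String[] args)
  {
    DomainShardSpec spec = new DomainShardSpec(
        List.of("v0"),
        Map.of("v0", "concat(dim1, 'foo')")
    );
    // prints the expression behind the clustering key "v0"
    System.out.println(spec.expressionFor("v0").orElse("none"));
  }
}
```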
Force-pushed from 3bf2b31 to 814b4ec
pom.xml
Outdated
  <failsOnError>true</failsOnError>
  <excludes>
-   *com/fasterxml/jackson/databind/*,**/NestedDataFormatsTest.java
+   *com/fasterxml/jackson/databind/*,**/NestedDataFormatsTest.java,**/CompactionSupervisorTest.java,**/MultiStageQueryTest.java
I'm going to upgrade checkstyle in a follow-up so we can remove this stuff, now that #18977 is merged.
I am wondering if we have to push up a minor refactor of the cascading reindexing stuff from #18939 to support this for that type of MSQ compaction supervisor? Not saying it needs to be in this PR; in fact, it should be able to be done in parallel. The VCs in the tuning config rule that I talk about below would just be meaningless without the context of this PR. Off the top of my head, we'd also have to make sure the config optimizer doesn't nuke the VC for partitioning just because no dim filters in the deletes reference it.
Yea, there is some work to do to support the reindexing stuff; I haven't decided yet what the best way to do it is. Part of me thinks it would be kind of nice if all of the partitioning stuff was part of the same config, like maybe a …
capistrant left a comment:
I've got no objections if you want to merge this now. Regarding cascading reindexing template support, I hope to open a gh issue soon and will call out how we need to adapt that to work nicely with this before D37
@JsonCreator
public ClusterBy(
    @JsonProperty("columns") List<KeyColumn> columns,
    @JsonProperty("virtualColumnMap") @Nullable Map<String, VirtualColumn> virtualColumnMap,
Why does this need to be on the clusterBy? It seems to me like the wrong place to put it, since clusterBy is an MSQ framework concept and virtual columns are an ingestion & query concept.
I couldn't really find a clean way to get them to the code that creates the shard spec or compaction state, while the clusterBy is created from the query, so it seemed by far the easiest and least disruptive to just have the query that creates the object attach the additional information when those cluster keys were created from virtual columns.
Am I missing some clean way I could get the virtual columns at the time we are translating the clusterBy into the shard spec / compaction state?
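To make the trade-off being debated concrete, here is roughly the shape described, as a toy sketch: `ClusterKeySpec` is a hypothetical name, and expressions are plain strings, whereas the real `ClusterBy` holds `KeyColumn` and `VirtualColumn` objects. The side map rides along with the key columns so downstream code that turns the clustering into a shard spec or compaction state can tell virtual keys apart from physical ones.

```java
import java.util.List;
import java.util.Map;

// Toy ClusterBy-like holder: key columns plus a side map recording which
// keys came from virtual column expressions. Illustrative names only,
// not the actual MSQ classes.
public class ClusterKeySpec
{
  private final List<String> keyColumns;
  private final Map<String, String> virtualColumnMap; // key column -> expression

  public ClusterKeySpec(List<String> keyColumns, Map<String, String> virtualColumnMap)
  {
    this.keyColumns = keyColumns;
    // a null map means no virtual keys, matching the @Nullable parameter above
    this.virtualColumnMap = virtualColumnMap == null ? Map.of() : virtualColumnMap;
  }

  public List<String> getKeyColumns()
  {
    return keyColumns;
  }

  public Map<String, String> getVirtualColumnMap()
  {
    return virtualColumnMap;
  }

  public static void main(String[] args)
  {
    ClusterKeySpec clusterBy = new ClusterKeySpec(
        List.of("v0", "dim2"),
        Map.of("v0", "timestamp_floor(__time, 'PT1H')")
    );
    // a downstream shard-spec builder can now tell virtual v0 apart from physical dim2
    System.out.println(clusterBy.getVirtualColumnMap().containsKey("dim2")); // prints false
  }
}
```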
@JsonCreator
public DimensionRangeShardSpec(
    @JsonProperty("dimensions") List<String> dimensions,
    @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
Are there going to be issues with deserializing virtual columns on server types that haven't had to deal with them before (like the Coordinator)? I wonder if all expressions are registered there, or if some modules have narrower scopes.
Good question, I guess. I am not aware of any modules which conditionally register expressions or custom virtual column implementations, but I guess they could exist... All of the built-in expressions seemed fine at least, since everything has a macro table from the expression module, and using the JSON virtual column worked fine too.
IMO if someone is trying to partition by something that makes the Coordinator explode, then maybe we should fix that thing so that it can load on the Coordinator?
} else {
  transformSpec = new CompactionTransformSpec(
      dataSchema.getTransformSpec().getFilter(),
      VirtualColumns.create(clusterBy.getVirtualColumnMap().values())
Won't adding the virtual columns to the transformSpec make them become real columns? I don't think that's what we want.
> Won't adding the virtual columns to the transformSpec make them become real columns? I don't think that's what we want.
I don't think so, depending on what you mean by real columns. Virtual columns being here on the compaction transform config is a new MSQ-compaction-only thing that was added a couple of weeks ago for reindexing templates, to support filters on virtual columns (for the deletion rules). Native compaction explodes if they exist.
I would agree that it is kind of an odd and confusing place to define virtual columns available for MSQ compaction, especially since there is no such thing on the actual TransformSpec. They were added there in that PR, I believe, because there wasn't really a better existing place on the compaction config.
The virtual columns here are added as part of building the synthetic virtual columns, like the time_floor and mv_to_array stuff, which are then passed into the build query methods, which can add them as appropriate depending on how they are used: https://github.com/clintropolis/druid/blob/d5d63c753cf5c3216081ebd0dc9797deb8c72876/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java#L276.
By real columns I mean actual physically stored columns. I didn't catch the change that recently added virtualColumns to CompactionTransformSpec. I suppose I assumed they worked the same way as transforms on the regular TransformSpec, in that they actually create columns. It would be good to have javadocs on CompactionTransformSpec that explain that the virtualColumns are just for use by the filter.
But I guess in this patch you're adding a new use for them? How does the new use work?
The new use allows them to be used as intermediary columns to aid in the sorting/clustering, but not saved in the final segment; basically the MSQ compaction equivalent of writing SQL REPLACE queries like in this test: https://github.com/apache/druid/pull/19061/changes#diff-207e886c7791d20d886d23425945203683100878b45eb070547d23ff9ed516deR172
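A self-contained sketch of that "intermediary column" idea, outside of Druid entirely (hypothetical names; the derived clustering key is modeled as a plain function rather than a virtual column): rows are ordered by a computed key, but the key never appears in the rows that get written out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy version of "cluster by an expression without storing it":
// compute a derived key per row, sort on it, and emit rows that never
// contain the derived column. Illustrative only, not Druid code.
public class ClusterByExpressionDemo
{
  public static List<Map<String, Object>> clusterBy(
      List<Map<String, Object>> rows,
      Function<Map<String, Object>, String> keyExpr
  )
  {
    List<Map<String, Object>> sorted = new ArrayList<>(rows);
    // the derived key exists only during the sort, like a virtual column
    sorted.sort(Comparator.comparing(keyExpr));
    return sorted;
  }

  public static void main(String[] args)
  {
    List<Map<String, Object>> rows = List.of(
        Map.of("dim1", "zebra"),
        Map.of("dim1", "apple")
    );
    // cluster by upper(dim1) without ever materializing it in the rows
    List<Map<String, Object>> out =
        clusterBy(rows, r -> ((String) r.get("dim1")).toUpperCase());
    System.out.println(out.get(0).get("dim1")); // prints apple
  }
}
```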
.filters(dataSchema.getTransformSpec().getFilter())
.virtualColumns(VirtualColumns.create(inputColToVirtualCol.values()))
.columns(columns)
.columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes())
columnTypes, columns, and virtualColumns appear twice in this list
Oops, made a mistake resolving merge conflicts.
public SegmentGeneratorStageProcessor(
    @JsonProperty("dataSchema") final DataSchema dataSchema,
    @JsonProperty("columnMappings") final ColumnMappings columnMappings,
    @JsonProperty("clusterByVirtualColumnsMappings") @Nullable final Map<String, VirtualColumn> clusterByVirtualColumnMappings,
Does not match getClusterByVirtualColumnMappings() (Columns vs Column). Please add a serde test.
Fixed, and added a test.
{
  private final DataSchema dataSchema;
  private final ColumnMappings columnMappings;
  private final Map<String, VirtualColumn> clusterByVirtualColumnMappings;
This new field is missing from equals and hashCode. Please add an EqualsVerifier test.
This must not have mattered all that much; DataSchema also didn't implement equals and hashCode, and after adding it I see why it was skipped: it was kind of wonky with some lazy initialization of stuff. But between #19109 and #19166 we will soon be able to drop the parser stuff from it completely, so I went ahead and changed it to be eager for now, and added EqualsVerifier tests for both DataSchema and SegmentGeneratorStageProcessor... I may revert this if it has too much trouble in CI, since there were quite a lot of failures just in DataSchemaTest.
    sortingNeeded = true;
    break;
  }
  if (query.getVirtualColumns().getVirtualColumn(columnSpec.getDimension()) != null) {
This doesn't seem right. The OrderByColumnSpec refers to dimension and aggregator output names. Virtual columns would potentially contain the names of input fields to dimensions and aggregators, but wouldn't contain the output names. What was the check needed for?
Oops, this was not meant to be here, I think; it was some experiment I was doing much earlier on. Removed.
// if the clustered by requires virtual columns, preserve them here so that we can rebuild during compaction
CompactionTransformSpec transformSpec;
// this is true if we are in here
what is true if we are in here?
Oops, a stale comment I didn't remove.
// Key must be 100% sortable or 100% nonsortable. If empty, call it sortable.
boolean sortable = true;
The changes in this file have become formatting-only; how about reverting it to match master?
}

@Override
public VirtualColumns getDomainVirtualColumns()
Javadoc please. I don't think it will be immediately obvious what this means.
List<String> dimensions = shard.getDomainDimensions();
for (String dimension : dimensions) {
  if (filterFields == null || filterFields.contains(dimension)) {
    final VirtualColumn shardVirtualColumn = shard.getDomainVirtualColumns().getVirtualColumn(dimension);
Consider adding test cases that verify pruning still works if the query-time virtual column doesn't have the same name as the one in the shard spec. Maybe it's there but I didn't see one.
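For illustration, the name-mismatch case this comment describes can be reduced to a small matching rule, sketched here with made-up names and expressions as plain strings (the real pruner works on `VirtualColumn` objects): resolve both the query column and the shard column to their expressions and compare those, so a query-time `v10` and a shard-spec `v0` defined by the same expression still line up.

```java
import java.util.Map;

// Toy illustration of name-independent virtual column matching: a query
// column and a shard column match if they resolve to the same expression,
// regardless of the virtual column's name. Illustrative only, not Druid code.
public class VirtualColumnMatchDemo
{
  public static boolean sameExpression(
      String queryColumn,
      Map<String, String> queryVirtualColumns,  // name -> expression
      String shardColumn,
      Map<String, String> shardVirtualColumns   // name -> expression
  )
  {
    // fall back to the column name itself for physical columns
    String queryExpr = queryVirtualColumns.getOrDefault(queryColumn, queryColumn);
    String shardExpr = shardVirtualColumns.getOrDefault(shardColumn, shardColumn);
    return queryExpr.equals(shardExpr);
  }

  public static void main(String[] args)
  {
    Map<String, String> query = Map.of("v10", "concat(dim1, 'foo')");
    Map<String, String> shard = Map.of("v0", "concat(dim1, 'foo')");
    // different names, same expression: the shard is still prunable
    System.out.println(sameExpression("v10", query, "v0", shard)); // prints true
  }
}
```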
server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java: Fixed
capistrant
left a comment
A few minor test comments; app code looks good to me. Since Gian is deeper in the review, I think I will defer approval to him once he reviews your responses to his comments.
// same expression, different name
queryVirtualColumns = VirtualColumns.create(
    new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
);
range_a = new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, null, null);
prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns);
prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), queryVirtualColumns);

Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity()));
Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));

// same expression, different name
queryVirtualColumns = VirtualColumns.create(
    new ExpressionVirtualColumn("v10", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
);
range_a = new RangeFilter("v10", ColumnType.STRING, null, "aaa", null, null, null);
prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns);
prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), queryVirtualColumns);

Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity()));
Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
Are these testing different things? They look the same to me, just different VC names, with neither matching the shard VC.
Ah, no, they are not really different. Removed the redundant one, but added some additional stuff, like mixing in different names from the segments too, and calling prune twice to ensure the cache is hit.
  )
)
.withTuningConfig(
    new UserCompactionTaskQueryTuningConfig(
A recent PR added a builder for this.
Description
changes:
* `ShardSpec` interface has a new method, `getDomainVirtualColumns`, to provide the virtual column information for pruning
* `DimensionRangeShardSpec` stores `VirtualColumns` in segment metadata so they can be compared to query expressions and be used for pruning
* `FilterSegmentPruner` is virtual-column aware for segment pruning using the new methods
* `ClusterBy` now contains a map of key column to `VirtualColumn` alongside key columns, to support key columns being virtual columns
* `ControllerImpl` persists clustering virtual columns in compaction state in the transform spec
* `MSQCompactionRunner` handles virtual columns in order-by/cluster-by for compaction
* cleaned up `VirtualColumns.create` across the codebase, and relaxed the constraints on one of the methods to use `Collection` instead of `List` to provide more flexibility