From 8d6697f1bb483e0e08e13eb98fdfdbc87c6edfe7 Mon Sep 17 00:00:00 2001 From: David Cromberge Date: Mon, 4 Nov 2024 14:33:38 +0000 Subject: [PATCH] Configurable sketch accuracy in merge rollup task --- .../pinot/core/common/MinionConstants.java | 1 + .../DistinctCountCPCSketchAggregator.java | 3 +- .../DistinctCountHLLAggregator.java | 3 +- .../DistinctCountThetaSketchAggregator.java | 24 +++++++++++---- .../DistinctCountULLAggregator.java | 3 +- .../IntegerTupleSketchAggregator.java | 21 +++++++++++-- .../aggregator/MaxValueAggregator.java | 3 +- .../aggregator/MinValueAggregator.java | 3 +- .../aggregator/SumValueAggregator.java | 3 +- .../aggregator/ValueAggregator.java | 5 +++- .../framework/SegmentProcessorConfig.java | 23 ++++++++++++-- .../processing/reducer/ReducerFactory.java | 3 +- .../processing/reducer/RollupReducer.java | 20 +++++++++---- .../plugin/minion/tasks/MergeTaskUtils.java | 22 ++++++++++++++ .../mergerollup/MergeRollupTaskExecutor.java | 4 +++ .../mergerollup/MergeRollupTaskUtils.java | 15 ++++++++++ ...RealtimeToOfflineSegmentsTaskExecutor.java | 4 +++ .../minion/tasks/MergeTaskUtilsTest.java | 30 +++++++++++++++---- .../mergerollup/MergeRollupTaskUtilsTest.java | 24 +++++++++++++++ 19 files changed, 185 insertions(+), 29 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 9b1b89b4b72b..27f44aa88b61 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -97,6 +97,7 @@ public static abstract class MergeTask { // Merge config public static final String MERGE_TYPE_KEY = "mergeType"; public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType"; + public static final String AGGREGATION_FUNCTION_PARAMETERS_PREFIX = "aggregationFunctionParameters."; public static final String MODE = "mode"; public static final String PROCESS_FROM_WATERMARK_MODE = "processFromWatermark"; public static final String PROCESS_ALL_MODE = "processAll"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java index 82e9a7416162..73985f564d2e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.datasketches.cpc.CpcSketch; import org.apache.datasketches.cpc.CpcUnion; import org.apache.pinot.core.common.ObjectSerDeUtils; @@ -30,7 +31,7 @@ public DistinctCountCPCSketchAggregator() { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { CpcSketch first = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1); CpcSketch second = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2); CpcSketch result; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java index 4eecbe3696c3..940d356a9530 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java @@ -20,12 +20,13 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import java.util.Map; import org.apache.pinot.core.common.ObjectSerDeUtils; public class DistinctCountHLLAggregator implements ValueAggregator { @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { try { HyperLogLog first = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value1); HyperLogLog second = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value2); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java index b11f7d7b0034..f22e38ed3cc6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java @@ -18,26 +18,38 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.datasketches.theta.Sketch; import org.apache.datasketches.theta.Union; import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.Constants; import org.apache.pinot.spi.utils.CommonConstants; public class DistinctCountThetaSketchAggregator implements ValueAggregator { - private final Union _union; - public DistinctCountThetaSketchAggregator() { - // TODO: Handle configurable nominal entries - _union = Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion(); } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { + String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES); + + int sketchNominalEntries; + + // Check if nominal entries values match + if (nominalEntriesParam != null) { + sketchNominalEntries = Integer.parseInt(nominalEntriesParam); + } else { + // If the functionParameters don't have an explicit nominal entries value set, + // use the default value for nominal entries + sketchNominalEntries = CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES; + } + + Union union = Union.builder().setNominalEntries(sketchNominalEntries).buildUnion(); Sketch first = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1); Sketch second = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2); - Sketch result = _union.union(first, second); + Sketch result = union.union(first, second); return ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(result); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java index 2a51ac052b6f..70469f8cf46b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java @@ -19,12 +19,13 @@ package org.apache.pinot.core.segment.processing.aggregator; import com.dynatrace.hash4j.distinctcount.UltraLogLog; +import java.util.Map; import org.apache.pinot.core.common.ObjectSerDeUtils; public class DistinctCountULLAggregator implements ValueAggregator { @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { UltraLogLog first = ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value1); UltraLogLog second = ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value2); // add to the one with a larger P and return that diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java index 8bdf7f8a86fa..b7df4c05fecd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java @@ -18,11 +18,14 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.datasketches.tuple.Sketch; import org.apache.datasketches.tuple.Union; import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.spi.utils.CommonConstants; public class IntegerTupleSketchAggregator implements ValueAggregator { @@ -33,10 +36,24 @@ public IntegerTupleSketchAggregator(IntegerSummary.Mode mode) { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { + String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES); + + int sketchNominalEntries; + + // Check if nominal entries values match + if (nominalEntriesParam != null) { + sketchNominalEntries = Integer.parseInt(nominalEntriesParam); + } else { + // If the functionParameters don't have an explicit nominal entries value set, + // use the default value for nominal entries + sketchNominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); + } + Sketch first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1); Sketch second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2); - Sketch result = new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(first, second); + Sketch result = + new Union<>(sketchNominalEntries, new IntegerSummarySetOperations(_mode, _mode)).union(first, second); return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java index 6a231b036c35..1c4fa5a498f0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public MaxValueAggregator(FieldSpec.DataType dataType) { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { Object result; switch (_dataType) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java index 9352cc99d088..8914dfa7c891 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public MinValueAggregator(FieldSpec.DataType dataType) { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { Object result; switch (_dataType) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java index 0570cca1b514..8b7d57d88920 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public SumValueAggregator(FieldSpec.DataType dataType) { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map functionParameters) { Object result; switch (_dataType) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java index 016e0fb0915f..70d90dd100c5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; + + /** * Interface for value aggregator */ @@ -27,5 +30,5 @@ public interface ValueAggregator { * Given two values, return the aggregated value * @return aggregated value given two column values */ - Object aggregate(Object value1, Object value2); + Object aggregate(Object value1, Object value2, Map functionParameters); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java index 053f78b6f3f8..56009608ee77 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java @@ -44,12 +44,14 @@ public class SegmentProcessorConfig { private final List _partitionerConfigs; private final MergeType _mergeType; private final Map _aggregationTypes; + private final Map> _aggregationFunctionParameters; private final SegmentConfig _segmentConfig; private final Consumer _progressObserver; private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig, List partitionerConfigs, MergeType mergeType, - Map aggregationTypes, SegmentConfig segmentConfig, + Map aggregationTypes, + Map> aggregationFunctionParameters, SegmentConfig segmentConfig, Consumer progressObserver) { TimestampIndexUtils.applyTimestampIndex(tableConfig, schema); _tableConfig = tableConfig; @@ -58,6 +60,7 @@ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandl _partitionerConfigs = partitionerConfigs; _mergeType = mergeType; _aggregationTypes = aggregationTypes; + _aggregationFunctionParameters = aggregationFunctionParameters; _segmentConfig = segmentConfig; _progressObserver = (progressObserver != null) ? progressObserver : p -> { // Do nothing. @@ -106,6 +109,13 @@ public Map getAggregationTypes() { return _aggregationTypes; } + /** + * The aggregation function parameters for the SegmentProcessorFramework's reduce phase with ROLLUP merge type + */ + public Map> getAggregationFunctionParameters() { + return _aggregationFunctionParameters; + } + /** * The SegmentConfig for the SegmentProcessorFramework's reduce phase */ @@ -134,6 +144,7 @@ public static class Builder { private List _partitionerConfigs; private MergeType _mergeType; private Map _aggregationTypes; + private Map> _aggregationFunctionParameters; private SegmentConfig _segmentConfig; private Consumer _progressObserver; @@ -167,6 +178,11 @@ public Builder setAggregationTypes(Map aggregat return this; } + public Builder setAggregationFunctionParameters(Map> aggregationFunctionParameters) { + _aggregationFunctionParameters = aggregationFunctionParameters; + return this; + } + public Builder setSegmentConfig(SegmentConfig segmentConfig) { _segmentConfig = segmentConfig; return this; @@ -193,11 +209,14 @@ public SegmentProcessorConfig build() { if (_aggregationTypes == null) { _aggregationTypes = Collections.emptyMap(); } + if (_aggregationFunctionParameters == null) { + _aggregationFunctionParameters = Collections.emptyMap(); + } if (_segmentConfig == null) { _segmentConfig = new SegmentConfig.Builder().build(); } return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType, - _aggregationTypes, _segmentConfig, _progressObserver); + _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java index a205500e3429..59fca478fa7e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java @@ -38,7 +38,8 @@ public static Reducer getReducer(String partitionId, GenericRowFileManager fileM case CONCAT: return new ConcatReducer(fileManager); case ROLLUP: - return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(), reducerOutputDir); + return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(), + processorConfig.getAggregationFunctionParameters(), reducerOutputDir); case DEDUP: return new DedupReducer(partitionId, fileManager, reducerOutputDir); default: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java index ae88120f204b..fdd1a671736e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java @@ -20,6 +20,7 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -47,14 +48,17 @@ public class RollupReducer implements Reducer { private final String _partitionId; private final GenericRowFileManager _fileManager; private final Map _aggregationTypes; + private final Map> _aggregationFunctionParameters; private final File _reducerOutputDir; private GenericRowFileManager _rollupFileManager; public RollupReducer(String partitionId, GenericRowFileManager fileManager, - Map aggregationTypes, File reducerOutputDir) { + Map aggregationTypes, + Map> aggregationFunctionParameters, File reducerOutputDir) { _partitionId = partitionId; _fileManager = fileManager; _aggregationTypes = aggregationTypes; + _aggregationFunctionParameters = aggregationFunctionParameters; _reducerOutputDir = reducerOutputDir; } @@ -91,7 +95,8 @@ private GenericRowFileManager doReduce() for (FieldSpec fieldSpec : fieldSpecs) { if (fieldSpec.getFieldType() == FieldType.METRIC) { aggregatorContextList.add(new AggregatorContext(fieldSpec, - _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE))); + _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE), + _aggregationFunctionParameters.getOrDefault(fieldSpec.getName(), Collections.emptyMap()))); } } @@ -159,7 +164,8 @@ private static void aggregateWithNullFields(GenericRow aggregatedRow, GenericRow } else { // Non-null field, aggregate the value aggregatedRow.putValue(column, - aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column))); + aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column), + aggregatorContext._functionParameters)); } } } @@ -169,17 +175,21 @@ private static void aggregateWithoutNullFields(GenericRow aggregatedRow, Generic for (AggregatorContext aggregatorContext : aggregatorContextList) { String column = aggregatorContext._column; aggregatedRow.putValue(column, - aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column))); + aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column), + aggregatorContext._functionParameters)); } } private static class AggregatorContext { final String _column; final ValueAggregator _aggregator; + final Map _functionParameters; - AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType) { + AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType, + Map functionParameters) { _column = fieldSpec.getName(); _aggregator = ValueAggregatorFactory.getValueAggregator(aggregationType, fieldSpec.getDataType()); + _functionParameters = functionParameters; } } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java index 34bf2e5ecf33..43f951629b32 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java @@ -137,6 +137,28 @@ public static Map getAggregationTypes(Map> getAggregationFunctionParameters(Map taskConfig) { + Map> aggregationFunctionParameters = new HashMap<>(); + String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX; + + for (Map.Entry entry : taskConfig.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(prefix)) { + String[] parts = key.substring(prefix.length()).split("\\.", 2); + if (parts.length == 2) { + String metricColumn = parts[0]; + String paramName = parts[1]; + aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new HashMap<>()).put(paramName, value); + } + } + } + return aggregationFunctionParameters; + } + /** * Returns the segment config based on the task config. */ diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java index 049859c1a996..9dc4d024660f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java @@ -83,6 +83,10 @@ protected List convert(PinotTaskConfig pinotTaskConfig, // Aggregation types segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs)); + // Aggregation function parameters + segmentProcessorConfigBuilder.setAggregationFunctionParameters( + MergeTaskUtils.getAggregationFunctionParameters(configs)); + // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java index 463b2e53bbe3..a9afc928ffd4 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.pinot.core.common.MinionConstants.MergeTask; @@ -45,12 +47,25 @@ private MergeRollupTaskUtils() { */ public static Map> getLevelToConfigMap(Map taskConfig) { Map> levelToConfigMap = new TreeMap<>(); + + // Regex to match aggregation function parameter keys + Pattern pattern = Pattern.compile("(\\w+)\\.aggregationFunctionParameters\\.(\\w+)\\.(\\w+)"); + for (Map.Entry entry : taskConfig.entrySet()) { String key = entry.getKey(); for (String configKey : VALID_CONFIG_KEYS) { if (key.endsWith(configKey)) { String level = key.substring(0, key.length() - configKey.length() - 1); levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(configKey, entry.getValue()); + } else { + Matcher matcher = pattern.matcher(key); + if (matcher.matches()) { + String level = matcher.group(1).trim(); // e.g., "1day" or "1hour" + String metric = matcher.group(2).trim(); // e.g., "metricColumnA" or "metricColumnB" + String param = matcher.group(3).trim(); // e.g., "nominalEntries" or "p" + String metricParam = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + metric + "." + param; + levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(metricParam, entry.getValue()); + } } } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java index 502fa1cc7629..bb1fc70afafa 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java @@ -149,6 +149,10 @@ protected List convert(PinotTaskConfig pinotTaskConfig, // Aggregation types segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs)); + // Aggregation function parameters + segmentProcessorConfigBuilder.setAggregationFunctionParameters( + MergeTaskUtils.getAggregationFunctionParameters(configs)); + // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java index 731607784bb0..c60b86899e56 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java @@ -49,8 +49,9 @@ public class MergeTaskUtilsTest { public void testGetTimeHandlerConfig() { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build(); - Schema schema = new Schema.SchemaBuilder() - .addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", "1:SECONDS").build(); + Schema schema = + new Schema.SchemaBuilder().addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", + "1:SECONDS").build(); Map taskConfig = new HashMap<>(); long expectedWindowStartMs = 1625097600000L; long expectedWindowEndMs = 1625184000000L; @@ -171,6 +172,23 @@ public void testGetAggregationTypes() { } } + @Test + public void testGetAggregationFunctionParameters() { + Map taskConfig = new HashMap<>(); + taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param1", "value1"); + taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param2", "value2"); + taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnB.param1", "value3"); + taskConfig.put("otherPrefix.metricColumnC.param1", "value1"); + taskConfig.put("aggregationFunction.metricColumnD.param2", "value2"); + Map> result = MergeTaskUtils.getAggregationFunctionParameters(taskConfig); + assertEquals(result.size(), 2); + assertTrue(result.containsKey("metricColumnA")); + assertTrue(result.containsKey("metricColumnB")); + assertEquals(result.get("metricColumnA").get("param1"), "value1"); + assertEquals(result.get("metricColumnA").get("param2"), "value2"); + assertEquals(result.get("metricColumnB").get("param1"), "value3"); + } + @Test public void testGetSegmentConfig() { Map taskConfig = new HashMap<>(); @@ -206,12 +224,12 @@ public void testAllowMerge() { segmentZKMetadata.setCustomMap(Collections.emptyMap()); assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata)); - segmentZKMetadata - .setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "false")); + segmentZKMetadata.setCustomMap( + Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "false")); assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata)); - segmentZKMetadata - .setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "true")); + segmentZKMetadata.setCustomMap( + Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "true")); assertFalse(MergeTaskUtils.allowMerge(segmentZKMetadata)); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java index 611598c7e0ac..dbb245dd6eab 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java @@ -62,4 +62,28 @@ public void testGetLevelToConfigMap() { assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY), "5000000"); assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS), "5"); } + + @Test + public void testAggregationFunctionParameters() { + Map taskConfig = new HashMap<>(); + taskConfig.put("hourly.aggregationFunctionParameters.metricColumnA.nominalEntries", "16384"); + taskConfig.put("hourly.aggregationFunctionParameters.metricColumnB.nominalEntries", "8192"); + taskConfig.put("daily.aggregationFunctionParameters.metricColumnA.nominalEntries", "8192"); + taskConfig.put("daily.aggregationFunctionParameters.metricColumnB.nominalEntries", "4096"); + + Map> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig); + assertEquals(levelToConfigMap.size(), 2); + + Map hourlyConfig = levelToConfigMap.get("hourly"); + assertNotNull(hourlyConfig); + assertEquals(hourlyConfig.size(), 2); + assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"), "16384"); + assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"), "8192"); + + Map dailyConfig = levelToConfigMap.get("daily"); + assertNotNull(dailyConfig); + assertEquals(dailyConfig.size(), 2); + assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"), "8192"); + assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"), "4096"); + } }