From 729015c9780b07c7037d3d63abadfa68c75f1e5c Mon Sep 17 00:00:00 2001
From: "A.Gromov"
Date: Tue, 14 May 2024 20:33:37 +0300
Subject: [PATCH 1/4] [ADBDEV-5568] [Java] PXF: Add pushdown filter support
 for new datatypes

- Add pushdown support for timestamp & timestamptz for parquet
- Add pushdown support for bigdecimal for parquet
---
 .../pxf/plugins/hdfs/ParquetFileAccessor.java |  4 +-
 .../pxf/plugins/hdfs/ParquetResolver.java     | 36 +------
 .../ParquetFixedLenByteArrayUtilities.java    | 42 ++++++++
 .../hdfs/parquet/ParquetOperatorPruner.java   |  5 +-
 .../parquet/ParquetRecordFilterBuilder.java   | 46 ++++++++-
 .../parquet/ParquetFilterPushDownTest.java    | 36 ++++++-
 .../parquet/ParquetOperatorPrunerTest.java    | 50 ++++++++--
 .../ParquetRecordFilterBuilderTest.java       | 96 +++++++++++--------
 .../pxf/plugins/jdbc/JdbcResolver.java        |  4 +-
 .../service/controller/ReadServiceImpl.java   |  2 +-
 10 files changed, 225 insertions(+), 96 deletions(-)
 create mode 100644 server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFixedLenByteArrayUtilities.java

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
index 2bc37422c2..85f27c9ecc 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
@@ -64,6 +64,7 @@
 import org.greenplum.pxf.plugins.hdfs.parquet.ParquetOperatorPruner;
 import org.greenplum.pxf.plugins.hdfs.parquet.ParquetRecordFilterBuilder;
 import org.greenplum.pxf.plugins.hdfs.parquet.ParquetUtilities;
+import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
 import org.greenplum.pxf.plugins.hdfs.utilities.HdfsUtilities;
 
 import java.io.IOException;
@@ -316,8 +317,9 @@ private FilterCompat.Filter getRecordFilter(String filterString, Map<String, Type> originalFieldsMap) {
         List<ColumnDescriptor> tupleDescription = context.getTupleDescription();
+        DecimalOverflowOption decimalOverflowOption = DecimalOverflowOption.valueOf(configuration.get(ParquetResolver.PXF_PARQUET_WRITE_DECIMAL_OVERFLOW_PROPERTY_NAME, DecimalOverflowOption.ROUND.name()).toUpperCase());
         ParquetRecordFilterBuilder filterBuilder = new ParquetRecordFilterBuilder(
-                tupleDescription, originalFieldsMap);
+                tupleDescription, originalFieldsMap, decimalOverflowOption);
         TreeVisitor pruner = new ParquetOperatorPruner(
                 tupleDescription, originalFieldsMap, SUPPORTED_OPERATORS);
         TreeVisitor bpCharTransformer = new BPCharOperatorTransformer(tupleDescription);

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetResolver.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetResolver.java
index 5f2748d0e7..f4959612b3 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetResolver.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetResolver.java
@@ -38,10 +38,7 @@
 import org.greenplum.pxf.api.model.Resolver;
 import org.greenplum.pxf.api.utilities.ColumnDescriptor;
 import org.greenplum.pxf.api.utilities.Utilities;
-import org.greenplum.pxf.plugins.hdfs.parquet.ParquetConfig;
-import org.greenplum.pxf.plugins.hdfs.parquet.ParquetTimestampUtilities;
-import org.greenplum.pxf.plugins.hdfs.parquet.ParquetTypeConverterFactory;
-import org.greenplum.pxf.plugins.hdfs.parquet.ParquetUtilities;
+import org.greenplum.pxf.plugins.hdfs.parquet.*;
 import org.greenplum.pxf.plugins.hdfs.parquet.converters.ParquetTypeConverter;
 import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
 import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;
@@ -288,35 +285,8 @@ private void fillGroupWithPrimitive(Group group, int columnIndex, Object fieldValue,
     }
 
     private byte[] getFixedLenByteArray(String value, Type type, String columnName) {
-        // From org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
-        DecimalLogicalTypeAnnotation typeAnnotation = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
-        int precision = Math.min(HiveDecimal.MAX_PRECISION, typeAnnotation.getPrecision());
-        int scale = Math.min(HiveDecimal.MAX_SCALE, typeAnnotation.getScale());
-
-        HiveDecimal hiveDecimal = decimalUtilities.parseDecimalStringWithHiveDecimal(value, precision, scale, columnName);
-        if (hiveDecimal == null) {
-            return null;
-        }
-
-        byte[] decimalBytes = hiveDecimal.bigIntegerBytesScaled(scale);
-
-        // Estimated number of bytes needed.
-        int precToBytes = ParquetFileAccessor.PRECISION_TO_BYTE_COUNT[precision - 1];
-        if (precToBytes == decimalBytes.length) {
-            // No padding needed.
-            return decimalBytes;
-        }
-
-        byte[] tgt = new byte[precToBytes];
-        if (hiveDecimal.signum() == -1) {
-            // For negative number, initializing bits to 1
-            for (int i = 0; i < precToBytes; i++) {
-                tgt[i] |= 0xFF;
-            }
-        }
-        System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
-        return tgt;
-        // end -- org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
+        return ParquetFixedLenByteArrayUtilities.convertFromBigDecimal(decimalUtilities, value, columnName,
+                (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation());
     }
 
     // Set schema from context if null

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFixedLenByteArrayUtilities.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFixedLenByteArrayUtilities.java
new file mode 100644
index 0000000000..dd3921ef81
--- /dev/null
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFixedLenByteArrayUtilities.java
@@ -0,0 +1,42 @@
+package org.greenplum.pxf.plugins.hdfs.parquet;
+
+import lombok.experimental.UtilityClass;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import org.greenplum.pxf.plugins.hdfs.ParquetFileAccessor;
+import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;
+
+@UtilityClass
+public class ParquetFixedLenByteArrayUtilities {
+    public static byte[] convertFromBigDecimal(DecimalUtilities decimalUtilities, String value, String columnName,
+                                               DecimalLogicalTypeAnnotation typeAnnotation) {
+        // From org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
+        int precision = Math.min(HiveDecimal.MAX_PRECISION, typeAnnotation.getPrecision());
+        int scale = Math.min(HiveDecimal.MAX_SCALE, typeAnnotation.getScale());
+
+        HiveDecimal hiveDecimal = decimalUtilities.parseDecimalStringWithHiveDecimal(value, precision, scale, columnName);
+        if (hiveDecimal == null) {
+            return null;
+        }
+
+        byte[] decimalBytes = hiveDecimal.bigIntegerBytesScaled(scale);
+
+        // Estimated number of bytes needed.
+        int precToBytes = ParquetFileAccessor.PRECISION_TO_BYTE_COUNT[precision - 1];
+        if (precToBytes == decimalBytes.length) {
+            // No padding needed.
+            return decimalBytes;
+        }
+
+        byte[] tgt = new byte[precToBytes];
+        if (hiveDecimal.signum() == -1) {
+            // For negative number, initializing bits to 1
+            for (int i = 0; i < precToBytes; i++) {
+                tgt[i] |= (byte) 0xFF;
+            }
+        }
+        System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+        return tgt;
+        // end -- org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
+    }
+}
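The extracted utility keeps Hive's decimalToBinary contract: the decimal's unscaled value is written big-endian into exactly PRECISION_TO_BYTE_COUNT[precision - 1] bytes, sign-extending negatives with 0xFF. A minimal sketch of the same layout using plain java.math types instead of HiveDecimal (class and method names here are illustrative, not part of the patch, and the value is assumed to already fit the declared scale):

    import java.math.BigDecimal;
    import java.util.Arrays;

    public class DecimalToFixedLenSketch {
        // Write the unscaled value big-endian into the fixed width derived
        // from the column precision, padding negatives with 0xFF bytes.
        static byte[] toFixedLen(BigDecimal value, int scale, int typeLength) {
            byte[] unscaled = value.setScale(scale).unscaledValue().toByteArray();
            byte[] tgt = new byte[typeLength];
            if (value.signum() == -1) {
                Arrays.fill(tgt, (byte) 0xFF); // sign extension for negative numbers
            }
            System.arraycopy(unscaled, 0, tgt, typeLength - unscaled.length, unscaled.length);
            return tgt;
        }

        public static void main(String[] args) {
            // e.g. a NUMERIC(5,1) column maps to FIXED_LEN_BYTE_ARRAY(3)
            System.out.println(Arrays.toString(toFixedLen(new BigDecimal("0"), 1, 3)));    // [0, 0, 0]
            System.out.println(Arrays.toString(toFixedLen(new BigDecimal("-1.5"), 1, 3))); // [-1, -1, -15]
        }
    }

The 3-byte zero array is exactly the constant that shows up in the filter assertions further down ("Binary{3 reused bytes, [0, 0, 0]}").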
diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPruner.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPruner.java
index 6db0332b28..62f9324edd 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPruner.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPruner.java
@@ -22,7 +22,7 @@ public class ParquetOperatorPruner extends SupportedOperatorPruner {
     // INT96 and FIXED_LEN_BYTE_ARRAY cannot be pushed down
     // for more details look at
     // org.apache.parquet.filter2.dictionarylevel.DictionaryFilter#expandDictionary
-    // where INT96 and FIXED_LEN_BYTE_ARRAY are not dictionary values
+    // where INT96 is not a dictionary value
     private static final EnumSet<PrimitiveType.PrimitiveTypeName> SUPPORTED_PRIMITIVE_TYPES =
             EnumSet.of(
                     PrimitiveType.PrimitiveTypeName.INT32,
@@ -30,7 +30,8 @@ public class ParquetOperatorPruner extends SupportedOperatorPruner {
                     PrimitiveType.PrimitiveTypeName.BOOLEAN,
                     PrimitiveType.PrimitiveTypeName.BINARY,
                     PrimitiveType.PrimitiveTypeName.FLOAT,
-                    PrimitiveType.PrimitiveTypeName.DOUBLE);
+                    PrimitiveType.PrimitiveTypeName.DOUBLE,
+                    PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
 
     private final Map<String, Type> fields;
     private final List<ColumnDescriptor> columnDescriptors;

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
index 1cdb92a9b7..bf419437e6 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
@@ -1,5 +1,6 @@
 package org.greenplum.pxf.plugins.hdfs.parquet;
 
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -14,6 +15,10 @@
 import org.greenplum.pxf.api.filter.OperatorNode;
 import org.greenplum.pxf.api.filter.TreeVisitor;
 import org.greenplum.pxf.api.utilities.ColumnDescriptor;
+import org.greenplum.pxf.plugins.hdfs.ParquetFileAccessor;
+import org.greenplum.pxf.plugins.hdfs.ParquetResolver;
+import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
+import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +40,8 @@
 import static org.apache.parquet.filter2.predicate.FilterApi.not;
 import static org.apache.parquet.filter2.predicate.FilterApi.or;
 import static org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 
 /**
  * This is the implementation of {@link TreeVisitor} for Parquet.
@@ -51,6 +58,7 @@ public class ParquetRecordFilterBuilder implements TreeVisitor {
     private final Map<String, Type> fields;
     private final List<ColumnDescriptor> columnDescriptors;
     private final Deque<FilterPredicate> filterQueue;
+    private final DecimalUtilities decimalUtilities;
 
     /**
      * Constructor
@@ -58,10 +66,12 @@ public class ParquetRecordFilterBuilder implements TreeVisitor {
      * @param columnDescriptors the list of column descriptors
      * @param originalFields    a map of field names to types
      */
-    public ParquetRecordFilterBuilder(List<ColumnDescriptor> columnDescriptors, Map<String, Type> originalFields) {
+    public ParquetRecordFilterBuilder(List<ColumnDescriptor> columnDescriptors, Map<String, Type> originalFields,
+                                      DecimalOverflowOption decimalOverflowOption) {
         this.columnDescriptors = columnDescriptors;
         this.filterQueue = new LinkedList<>();
         this.fields = originalFields;
+        this.decimalUtilities = new DecimalUtilities(decimalOverflowOption, true);
     }
 
     @Override
@@ -159,9 +169,9 @@ private void processSimpleColumnOperator(OperatorNode operatorNode) {
         String filterColumnName = columnDescriptor.columnName();
         Type type = fields.get(filterColumnName);
 
-        // INT96 and FIXED_LEN_BYTE_ARRAY cannot be pushed down
+        // INT96 cannot be pushed down
         // for more details look at org.apache.parquet.filter2.dictionarylevel.DictionaryFilter#expandDictionary
-        // where INT96 and FIXED_LEN_BYTE_ARRAY are not dictionary values
+        // where INT96 is not a dictionary value
         FilterPredicate simpleFilter;
         switch (type.asPrimitiveType().getPrimitiveTypeName()) {
             case INT32:
@@ -171,12 +181,13 @@ private void processSimpleColumnOperator(OperatorNode operatorNode) {
 
             case INT64:
                 simpleFilter = ParquetRecordFilterBuilder.getOperatorWithLtGtSupport(operator)
-                        .apply(longColumn(type.getName()), valueOperand == null ? null : Long.parseLong(valueOperand.toString()));
+                        .apply(longColumn(type.getName()), getLongForINT64(type.getLogicalTypeAnnotation(), valueOperand));
                 break;
 
+            case FIXED_LEN_BYTE_ARRAY:
             case BINARY:
                 simpleFilter = ParquetRecordFilterBuilder.getOperatorWithLtGtSupport(operator)
-                        .apply(binaryColumn(type.getName()), valueOperand == null ? null : Binary.fromString(valueOperand.toString()));
+                        .apply(binaryColumn(type.getName()), getBinaryFromString(type.getLogicalTypeAnnotation(), valueOperand, filterColumnName));
                 break;
 
             case BOOLEAN:
@@ -271,4 +282,29 @@ private static Integer getIntegerForINT32(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand) {
         }
         return Integer.parseInt(valueOperand.toString());
     }
+
+    private static Long getLongForINT64(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand) {
+        if (valueOperand == null) return null;
+        String value = valueOperand.toString();
+        if (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) {
+            TimestampLogicalTypeAnnotation typets = (TimestampLogicalTypeAnnotation) logicalTypeAnnotation;
+            return ParquetTimestampUtilities.getLongFromTimestamp(value, typets.isAdjustedToUTC(),
+                    ParquetResolver.TIMESTAMP_PATTERN.matcher(value).find());
+        }
+        return Long.parseLong(value);
+    }
+
+    private Binary getBinaryFromString(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand,
+                                       String columnName) {
+        if (valueOperand == null) return null;
+        String value = valueOperand.toString();
+        if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
+            DecimalLogicalTypeAnnotation decimalType = (DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
+            byte[] tgt = ParquetFixedLenByteArrayUtilities.convertFromBigDecimal(decimalUtilities, value, columnName, decimalType);
+            return Binary.fromReusedByteArray(tgt);
+        }
+        return Binary.fromString(value);
+    }
+
+
 }

diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFilterPushDownTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFilterPushDownTest.java
index 29ef043b98..08d7ba0ac2 100644
--- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFilterPushDownTest.java
+++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetFilterPushDownTest.java
@@ -663,11 +663,41 @@ public void testUnsupportedINT96Filter() throws Exception {
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayFilter() throws Exception {
+    public void testSupportedFixedLenByteArrayFilter() throws Exception {
         // dec2 = 0
         context.setFilterString("a14c23s1d0o5");
-        // all rows are expected
-        assertRowsReturned(ALL);
+        int[] expectedRows = {1};
+        assertRowsReturned(expectedRows);
+
+        // dec2 > 0
+        context.setFilterString("a14c23s1d0o2");
+        expectedRows = new int[] {2, 4, 6, 8, 10, 12, 14, 17, 20, 21, 22, 23, 24, 25};
+        assertRowsReturned(expectedRows);
+
+        // dec2 < 0
+        context.setFilterString("a14c23s1d0o1");
+        expectedRows = new int[] {3, 5, 7, 9, 11, 13, 16, 18, 19};
+        assertRowsReturned(expectedRows);
+
+        // dec2 >= 0
+        context.setFilterString("a14c23s1d0o4");
+        expectedRows = new int[] {1, 2, 4, 6, 8, 10, 12, 14, 17, 20, 21, 22, 23, 24, 25};
+        assertRowsReturned(expectedRows);
+
+        // dec2 <= 0
+        context.setFilterString("a14c23s1d0o3");
+        expectedRows = new int[] {1, 3, 5, 7, 9, 11, 13, 16, 18, 19};
+        assertRowsReturned(expectedRows);
+
+        // dec2 == null
+        context.setFilterString("a14o8");
+        expectedRows = new int[] {15};
+        assertRowsReturned(expectedRows);
+
+        // dec2 != null
+        context.setFilterString("a14o9");
+        expectedRows = new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25};
+        assertRowsReturned(expectedRows);
     }
 
     @Test
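For readers decoding the filter strings used throughout these tests: PXF serializes predicates in a compact prefix form. In "a14c23s1d0o5" (dec2 = 0), a14 appears to be the column index, c23 the constant's type OID (23 = int4 in Postgres, 25 = text), s1d0 a one-character datum "0", and o5 the operator code for "="; l0/l1 combine two predicates with AND/OR. A minimal sketch of turning such a string back into an expression tree with the parser the tests themselves use (class name is illustrative):

    import org.greenplum.pxf.api.filter.FilterParser;
    import org.greenplum.pxf.api.filter.Node;

    public class FilterStringSketch {
        public static void main(String[] args) throws Exception {
            // a14 -> column 14 (dec2), c23 -> int4 constant, s1d0 -> datum "0",
            // o5 -> "="; the builders in this patch then visit the resulting tree.
            Node root = new FilterParser().parse("a14c23s1d0o5");
            System.out.println(root);
        }
    }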
diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPrunerTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPrunerTest.java
index 07ccbd803d..d0b38d8f38 100644
--- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPrunerTest.java
+++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetOperatorPrunerTest.java
@@ -115,25 +115,59 @@ public void testUnsupportedINT96Filter() throws Exception {
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayFilter() throws Exception {
+    public void testSupportedFixedLenByteArrayFilter() throws Exception {
         // dec2 = 0
         Node result = helper("a14c23s1d0o5");
-        assertNull(result);
-
-        // name = 'row2' and dec2 = 0 -> name = 'row2'
-        result = helper("a1c25s4drow2o5a14c23s1d0o5l0");
         assertNotNull(result);
         assertTrue(result instanceof OperatorNode);
         OperatorNode operatorNode = (OperatorNode) result;
         assertEquals(Operator.EQUALS, operatorNode.getOperator());
         assertTrue(operatorNode.getLeft() instanceof ColumnIndexOperandNode);
-        assertEquals(1, ((ColumnIndexOperandNode) operatorNode.getLeft()).index());
+        assertEquals(14, ((ColumnIndexOperandNode) operatorNode.getLeft()).index());
         assertTrue(operatorNode.getRight() instanceof OperandNode);
-        assertEquals("row2", operatorNode.getRight().toString());
+        assertEquals("0", operatorNode.getRight().toString());
+
+        // name = 'row2' and dec2 = 0 -> name = 'row2'
+        result = helper("a1c25s4drow2o5a14c23s1d0o5l0");
+        assertNotNull(result);
+        assertTrue(result instanceof OperatorNode);
+        operatorNode = (OperatorNode) result;
+        assertEquals(Operator.AND, operatorNode.getOperator());
+        assertTrue(operatorNode.getLeft() instanceof OperatorNode);
+        OperatorNode nameOperatorNode = (OperatorNode) operatorNode.getLeft();
+        assertEquals(Operator.EQUALS, nameOperatorNode.getOperator());
+        assertTrue(nameOperatorNode.getLeft() instanceof ColumnIndexOperandNode);
+        assertEquals(1, ((ColumnIndexOperandNode) nameOperatorNode.getLeft()).index());
+        assertTrue(nameOperatorNode.getRight() instanceof OperandNode);
+        assertEquals("row2", nameOperatorNode.getRight().toString());
+        assertTrue(operatorNode.getRight() instanceof OperatorNode);
+        OperatorNode decimalOperatorNode = (OperatorNode) operatorNode.getRight();
+        assertEquals(Operator.EQUALS, decimalOperatorNode.getOperator());
+        assertTrue(decimalOperatorNode.getLeft() instanceof ColumnIndexOperandNode);
+        assertEquals(14, ((ColumnIndexOperandNode) decimalOperatorNode.getLeft()).index());
+        assertTrue(decimalOperatorNode.getRight() instanceof OperandNode);
+        assertEquals("0", decimalOperatorNode.getRight().toString());
 
         // name = 'row2' or dec2 = 0 -> null
         result = helper("a1c25s4drow2o5a14c23s1d0o5l1");
-        assertNull(result);
+        assertNotNull(result);
+        assertTrue(result instanceof OperatorNode);
+        operatorNode = (OperatorNode) result;
+        assertEquals(Operator.OR, operatorNode.getOperator());
+        assertTrue(operatorNode.getLeft() instanceof OperatorNode);
+        nameOperatorNode = (OperatorNode) operatorNode.getLeft();
+        assertEquals(Operator.EQUALS, nameOperatorNode.getOperator());
+        assertTrue(nameOperatorNode.getLeft() instanceof ColumnIndexOperandNode);
+        assertEquals(1, ((ColumnIndexOperandNode) nameOperatorNode.getLeft()).index());
+        assertTrue(nameOperatorNode.getRight() instanceof OperandNode);
+        assertEquals("row2", nameOperatorNode.getRight().toString());
+        assertTrue(operatorNode.getRight() instanceof OperatorNode);
+        decimalOperatorNode = (OperatorNode) operatorNode.getRight();
+        assertEquals(Operator.EQUALS, decimalOperatorNode.getOperator());
+        assertTrue(decimalOperatorNode.getLeft() instanceof ColumnIndexOperandNode);
+        assertEquals(14, ((ColumnIndexOperandNode) decimalOperatorNode.getLeft()).index());
+        assertTrue(decimalOperatorNode.getRight() instanceof OperandNode);
+        assertEquals("0", decimalOperatorNode.getRight().toString());
     }
 
     @Test

diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java
index 375d26c772..02bfa10309 100644
--- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java
+++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java
@@ -1,11 +1,13 @@
 package org.greenplum.pxf.plugins.hdfs.parquet;
 
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.greenplum.pxf.api.filter.FilterParser;
 import org.greenplum.pxf.api.filter.Node;
+import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.*;
 
 public class ParquetRecordFilterBuilderTest extends ParquetBaseTest {
 
@@ -82,72 +84,82 @@ public void testUnsupportedINT96IsNotNullFilter() {
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayEqualsFilter() {
+    public void testSupportedFixedLenByteArrayEqualsFilter() throws Exception {
         // dec2 = 0
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14c23s1d0o5"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14c23s1d0o5");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("eq(dec2, Binary{3 reused bytes, [0, 0, 0]})", filterPredicate.toString());
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayLessThanFilter() {
-        // dec2 = 0
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14c23s1d0o1"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayLessThanFilter() throws Exception {
+        // dec2 < 0
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14c23s1d0o1");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("lt(dec2, Binary{3 reused bytes, [0, 0, 0]})", filterPredicate.toString());
    }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayGreaterThanFilter() {
-        // dec2 = 0
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14c23s1d0o2"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayGreaterThanFilter() throws Exception {
+        // dec2 > 0
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14c23s1d0o2");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("gt(dec2, Binary{3 reused bytes, [0, 0, 0]})", filterPredicate.toString());
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayLessThanOrEqualsFilter() {
-        // dec2 = 0
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14c23s1d0o3"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayLessThanOrEqualsFilter() throws Exception {
+        // dec2 <= 0
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14c23s1d0o3");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("lteq(dec2, Binary{3 reused bytes, [0, 0, 0]})", filterPredicate.toString());
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayGreaterThanOrEqualsFilter() {
-        // dec2 = 0
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14c23s1d0o4"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayGreaterThanOrEqualsFilter() throws Exception {
+        // dec2 >= 0
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14c23s1d0o4");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("gteq(dec2, Binary{3 reused bytes, [0, 0, 0]})", filterPredicate.toString());
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayNotEqualsFilter() {
-        // dec2 = 0
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14c23s1d0o6"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayNotEqualsFilter() throws Exception {
+        // dec2 != 0
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14c23s1d0o6");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("noteq(dec2, Binary{3 reused bytes, [0, 0, 0]})", filterPredicate.toString());
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayIsNullFilter() {
-        // tm = '2013-07-23 21:00:00'
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14o8"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayIsNullFilter() throws Exception {
+        // dec2 == null
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14o8");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("eq(dec2, null)", filterPredicate.toString());
     }
 
     @Test
-    public void testUnsupportedFixedLenByteArrayIsNotNullFilter() {
-        // tm = '2013-07-23 21:00:00'
-        Exception e = assertThrows(UnsupportedOperationException.class,
-                () -> filterBuilderFromFilterString("a14o9"));
-        assertEquals("Column dec2 of type FIXED_LEN_BYTE_ARRAY is not supported", e.getMessage());
+    public void testSupportedFixedLenByteArrayIsNotNullFilter() throws Exception {
+        // dec2 != null
+        ParquetRecordFilterBuilder filterBuilder = filterBuilderFromFilterString("a14o9");
+        FilterPredicate filterPredicate = getFilterPredicate(filterBuilder);
+        assertEquals("noteq(dec2, null)", filterPredicate.toString());
+    }
+
+    private static FilterPredicate getFilterPredicate(ParquetRecordFilterBuilder filterBuilder) {
+        assertNotNull(filterBuilder);
+        FilterCompat.Filter recordFilter = filterBuilder.getRecordFilter();
+        assertNotNull(recordFilter);
+        assertInstanceOf(FilterCompat.FilterPredicateCompat.class, recordFilter);
+        FilterPredicate filterPredicate = ((FilterCompat.FilterPredicateCompat) recordFilter).getFilterPredicate();
+        return filterPredicate;
     }
 
     private ParquetRecordFilterBuilder filterBuilderFromFilterString(String filterString) throws Exception {
-        ParquetRecordFilterBuilder filterBuilder = new ParquetRecordFilterBuilder(columnDescriptors, originalFieldsMap);
+        ParquetRecordFilterBuilder filterBuilder = new ParquetRecordFilterBuilder(columnDescriptors, originalFieldsMap,
+                DecimalOverflowOption.IGNORE);
 
         // Parse the filter string into a expression tree Node
         Node root = new FilterParser().parse(filterString);
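The new assertions pin down the exact parquet-mr predicate strings the builder emits. The same predicate can be constructed directly against FilterApi; a standalone sketch (class name is illustrative, the 3-byte zero constant mirrors the tests above):

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class FixedLenPredicateSketch {
        public static void main(String[] args) {
            // dec2 = 0 against a FIXED_LEN_BYTE_ARRAY(3) decimal column: the
            // constant is the padded two's-complement array produced by
            // ParquetFixedLenByteArrayUtilities.convertFromBigDecimal
            FilterPredicate predicate = eq(binaryColumn("dec2"),
                    Binary.fromReusedByteArray(new byte[]{0, 0, 0}));
            System.out.println(predicate); // eq(dec2, Binary{3 reused bytes, [0, 0, 0]})
        }
    }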
diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcResolver.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcResolver.java
index 4bd5c8ce96..feb636cfba 100644
--- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcResolver.java
+++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcResolver.java
@@ -136,7 +136,9 @@ public class JdbcResolver extends JdbcBasePlugin implements Resolver {
             DataType.SMALLINT,
             DataType.NUMERIC,
             DataType.TIMESTAMP,
-            DataType.DATE
+            DataType.DATE,
+            DataType.JSON,
+            DataType.JSONB
     );
 
     /**

diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java
index 893952d144..833a14ef87 100644
--- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java
+++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java
@@ -85,7 +85,7 @@ private Predicate getIdentifierFilter(String profile, String
     /**
      * Calls Fragmenter service to get a list of fragments for the resource, then reads records for each fragment
-     * and writes them to the output stream. Maintains the satistics about the progress of the query and reports
+     * and writes them to the output stream. Maintains the statistics about the progress of the query and reports
      * it to the caller even if the operation failed or aborted.
      *
      * @param context request context
From 8feb8f83c6a337a3d31744ae1847c8ffa9c3acca Mon Sep 17 00:00:00 2001
From: Roman Zolotov
Date: Thu, 16 May 2024 11:49:23 +0300
Subject: [PATCH 2/4] ADBDEV-5568: Change time zone converter for filter

---
 .../pxf/plugins/hdfs/ParquetFileAccessor.java        |  5 ++++-
 .../hdfs/parquet/ParquetRecordFilterBuilder.java     | 13 ++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
index 85f27c9ecc..4dc84977ff 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
@@ -89,6 +89,8 @@
 import static org.apache.parquet.hadoop.ParquetOutputFormat.WRITER_VERSION;
 import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
 import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
+import static org.greenplum.pxf.plugins.hdfs.ParquetResolver.DEFAULT_USE_LOCAL_PXF_TIMEZONE_READ;
+import static org.greenplum.pxf.plugins.hdfs.ParquetResolver.USE_LOCAL_PXF_TIMEZONE_READ_NAME;
 
 /**
  * Parquet file accessor.
@@ -318,8 +320,9 @@ private FilterCompat.Filter getRecordFilter(String filterString, Map<String, Type> originalFieldsMap) {
         List<ColumnDescriptor> tupleDescription = context.getTupleDescription();
         DecimalOverflowOption decimalOverflowOption = DecimalOverflowOption.valueOf(configuration.get(ParquetResolver.PXF_PARQUET_WRITE_DECIMAL_OVERFLOW_PROPERTY_NAME, DecimalOverflowOption.ROUND.name()).toUpperCase());
+        boolean useLocalPxfTimezoneRead = context.getOption(USE_LOCAL_PXF_TIMEZONE_READ_NAME, DEFAULT_USE_LOCAL_PXF_TIMEZONE_READ);
         ParquetRecordFilterBuilder filterBuilder = new ParquetRecordFilterBuilder(
-                tupleDescription, originalFieldsMap, decimalOverflowOption);
+                tupleDescription, originalFieldsMap, decimalOverflowOption, useLocalPxfTimezoneRead);
         TreeVisitor pruner = new ParquetOperatorPruner(
                 tupleDescription, originalFieldsMap, SUPPORTED_OPERATORS);
         TreeVisitor bpCharTransformer = new BPCharOperatorTransformer(tupleDescription);

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
index bf419437e6..88ef2ecb7b 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
@@ -1,6 +1,5 @@
 package org.greenplum.pxf.plugins.hdfs.parquet;
 
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -15,7 +14,6 @@
 import org.greenplum.pxf.api.filter.OperatorNode;
 import org.greenplum.pxf.api.filter.TreeVisitor;
 import org.greenplum.pxf.api.utilities.ColumnDescriptor;
-import org.greenplum.pxf.plugins.hdfs.ParquetFileAccessor;
 import org.greenplum.pxf.plugins.hdfs.ParquetResolver;
 import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
 import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;
@@ -59,6 +57,7 @@ public class ParquetRecordFilterBuilder implements TreeVisitor {
     private final List<ColumnDescriptor> columnDescriptors;
     private final Deque<FilterPredicate> filterQueue;
     private final DecimalUtilities decimalUtilities;
+    private final boolean useLocalPxfTimezoneRead;
 
     /**
      * Constructor
@@ -67,11 +66,12 @@ public class ParquetRecordFilterBuilder implements TreeVisitor {
      * @param originalFields    a map of field names to types
      */
     public ParquetRecordFilterBuilder(List<ColumnDescriptor> columnDescriptors, Map<String, Type> originalFields,
-                                      DecimalOverflowOption decimalOverflowOption) {
+                                      DecimalOverflowOption decimalOverflowOption, boolean useLocalPxfTimezoneRead) {
         this.columnDescriptors = columnDescriptors;
         this.filterQueue = new LinkedList<>();
         this.fields = originalFields;
         this.decimalUtilities = new DecimalUtilities(decimalOverflowOption, true);
+        this.useLocalPxfTimezoneRead = useLocalPxfTimezoneRead;
     }
 
     @Override
@@ -129,7 +129,7 @@ private void processLogicalOperator(Operator operator) {
             left = filterQueue.poll();
 
             if (left == null) {
-                throw new IllegalStateException("Unable to process logical operator " + operator.toString());
+                throw new IllegalStateException("Unable to process logical operator " + operator);
             }
         }
 
@@ -283,12 +283,11 @@ private static Integer getIntegerForINT32(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand) {
         return Integer.parseInt(valueOperand.toString());
     }
 
-    private static Long getLongForINT64(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand) {
+    private Long getLongForINT64(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand) {
         if (valueOperand == null) return null;
         String value = valueOperand.toString();
         if (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) {
-            TimestampLogicalTypeAnnotation typets = (TimestampLogicalTypeAnnotation) logicalTypeAnnotation;
-            return ParquetTimestampUtilities.getLongFromTimestamp(value, typets.isAdjustedToUTC(),
+            return ParquetTimestampUtilities.getLongFromTimestamp(value, useLocalPxfTimezoneRead,
                     ParquetResolver.TIMESTAMP_PATTERN.matcher(value).find());
         }
         return Long.parseLong(value);
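Patch 3 below renames the flag to match what it now means: the second argument of getLongFromTimestamp is no longer the parquet isAdjustedToUTC annotation but the PXF-side choice of conversion zone. Its effect on a timestamp-without-time-zone filter value such as '2013-07-23 21:00:00' (the value used by the older test comments) can be sketched standalone; class and method names here are illustrative:

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZoneOffset;
    import java.time.ZonedDateTime;
    import java.time.temporal.ChronoUnit;

    public class TimestampFilterSketch {
        // With useLocalPxfTimezone = true the PXF server zone is applied before
        // converting to epoch microseconds; otherwise the value is taken as UTC.
        static long toEpochMicros(LocalDateTime ts, boolean useLocalPxfTimezone) {
            ZoneId zoneId = useLocalPxfTimezone ? ZoneId.systemDefault() : ZoneOffset.UTC;
            ZonedDateTime zdt = ZonedDateTime.of(ts, zoneId);
            return ChronoUnit.MICROS.between(Instant.EPOCH, zdt.toInstant());
        }

        public static void main(String[] args) {
            LocalDateTime ts = LocalDateTime.parse("2013-07-23T21:00:00");
            System.out.println(toEpochMicros(ts, false)); // 1374613200000000
        }
    }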
From 9fff653d784f97f39347b966d6ad480609c52137 Mon Sep 17 00:00:00 2001
From: "A.Gromov"
Date: Thu, 16 May 2024 11:58:46 +0300
Subject: [PATCH 3/4] [ADBDEV-5568] [Java] PXF: Add pushdown filter support
 for new datatypes

- Add pushdown support for timestamp & timestamptz for parquet
- Add pushdown support for bigdecimal for parquet
---
 .../plugins/hdfs/parquet/ParquetRecordFilterBuilder.java | 4 ++--
 .../plugins/hdfs/parquet/ParquetTimestampUtilities.java  | 8 ++++----
 .../hdfs/parquet/ParquetRecordFilterBuilderTest.java     | 2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
index 88ef2ecb7b..47487cff6f 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
@@ -66,12 +66,12 @@ public class ParquetRecordFilterBuilder implements TreeVisitor {
      * @param originalFields    a map of field names to types
      */
     public ParquetRecordFilterBuilder(List<ColumnDescriptor> columnDescriptors, Map<String, Type> originalFields,
-                                      DecimalOverflowOption decimalOverflowOption, boolean useLocalPxfTimezoneRead) {
+                                      DecimalOverflowOption decimalOverflowOption, boolean useLocalPxfTimezone) {
         this.columnDescriptors = columnDescriptors;
         this.filterQueue = new LinkedList<>();
         this.fields = originalFields;
         this.decimalUtilities = new DecimalUtilities(decimalOverflowOption, true);
-        this.useLocalPxfTimezoneRead = useLocalPxfTimezoneRead;
+        this.useLocalPxfTimezoneRead = useLocalPxfTimezone;
     }
 
     @Override

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetTimestampUtilities.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetTimestampUtilities.java
index 728ec26d0f..014147f66f 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetTimestampUtilities.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetTimestampUtilities.java
@@ -27,7 +27,7 @@ public static int getDaysFromEpochFromDateString(String dateString) {
         return (int) date.toEpochDay();
     }
 
-    public static long getLongFromTimestamp(String timestampString, boolean isAdjustedToUTC, boolean isTimestampWithTimeZone) {
+    public static long getLongFromTimestamp(String timestampString, boolean useLocalPxfTimezone, boolean isTimestampWithTimeZone) {
         if (isTimestampWithTimeZone) {
             // We receive a timestamp string with time zone offset from GPDB
             OffsetDateTime date = OffsetDateTime.parse(timestampString, GreenplumDateTime.DATETIME_WITH_TIMEZONE_FORMATTER);
@@ -35,9 +35,9 @@ public static long getLongFromTimestamp(String timestampString, boolean isAdjustedToUTC,
             return getEpochWithMicroSeconds(zdt, date.getNano());
         } else {
             // We receive a timestamp string from GPDB in the server timezone
-            // If isAdjustedToUTC = true we convert it to the UTC using local pxf server timezone and save it in the parquet as UTC
-            // If isAdjustedToUTC = false we don't convert timestamp to the instant and save it as is
-            ZoneId zoneId = isAdjustedToUTC ? ZoneId.systemDefault() : ZoneOffset.UTC;
+            // If useLocalPxfTimezone = true we convert it to UTC using the local pxf server timezone and save it in the parquet as UTC
+            // If useLocalPxfTimezone = false we don't convert the timestamp to an instant and save it as is
+            ZoneId zoneId = useLocalPxfTimezone ? ZoneId.systemDefault() : ZoneOffset.UTC;
             LocalDateTime date = LocalDateTime.parse(timestampString, GreenplumDateTime.DATETIME_FORMATTER);
             ZonedDateTime zdt = ZonedDateTime.of(date, zoneId);
             return getEpochWithMicroSeconds(zdt, date.getNano());

diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java
index 02bfa10309..7b426b7b94 100644
--- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java
+++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilderTest.java
@@ -159,7 +159,7 @@ private static FilterPredicate getFilterPredicate(ParquetRecordFilterBuilder filterBuilder) {
     private ParquetRecordFilterBuilder filterBuilderFromFilterString(String filterString) throws Exception {
         ParquetRecordFilterBuilder filterBuilder = new ParquetRecordFilterBuilder(columnDescriptors, originalFieldsMap,
-                DecimalOverflowOption.IGNORE);
+                DecimalOverflowOption.IGNORE, true);
 
         // Parse the filter string into a expression tree Node
         Node root = new FilterParser().parse(filterString);

From 75505112b9aa1881f5092e10332c2ae40e4a7e1f Mon Sep 17 00:00:00 2001
From: "A.Gromov"
Date: Thu, 16 May 2024 12:12:48 +0300
Subject: [PATCH 4/4] [ADBDEV-5568] [Java] PXF: Add pushdown filter support
 for new datatypes

- Add pushdown support for timestamp & timestamptz for parquet
- Add pushdown support for bigdecimal for parquet
---
 .../pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
index 47487cff6f..c0986c6807 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/parquet/ParquetRecordFilterBuilder.java
@@ -304,6 +304,4 @@ private Binary getBinaryFromString(LogicalTypeAnnotation logicalTypeAnnotation,
         }
         return Binary.fromString(value);
     }
-
-
 }
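End to end, the builder's output is a FilterCompat.Filter, which parquet-mr accepts when opening a reader and applies at row-group, dictionary, and record level; PXF does the equivalent inside ParquetFileAccessor. A hedged usage sketch outside PXF (the file path and Group-based read support are placeholders, and FilterCompat.NOOP stands in for filterBuilder.getRecordFilter()):

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class RecordFilterUsageSketch {
        public static void main(String[] args) throws Exception {
            FilterCompat.Filter filter = FilterCompat.NOOP; // stand-in for the built filter
            try (ParquetReader<Group> reader = ParquetReader
                    .builder(new GroupReadSupport(), new Path("/tmp/example.parquet")) // hypothetical path
                    .withFilter(filter)
                    .build()) {
                for (Group g = reader.read(); g != null; g = reader.read()) {
                    System.out.println(g); // only rows passing the pushed-down filter
                }
            }
        }
    }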