Merge pull request #96 from arenadata/feature/ADBDEV-5568
[ADBDEV-5568] [Java] PXF: Add pushdown filter support for new datatypes
iamlapa authored May 29, 2024
2 parents 8d8ad49 + 7550511 commit 103d300
Showing 11 changed files with 230 additions and 101 deletions.
@@ -64,6 +64,7 @@
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetOperatorPruner;
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetRecordFilterBuilder;
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetUtilities;
import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
import org.greenplum.pxf.plugins.hdfs.utilities.HdfsUtilities;

import java.io.IOException;
@@ -88,6 +89,8 @@
import static org.apache.parquet.hadoop.ParquetOutputFormat.WRITER_VERSION;
import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import static org.greenplum.pxf.plugins.hdfs.ParquetResolver.DEFAULT_USE_LOCAL_PXF_TIMEZONE_READ;
import static org.greenplum.pxf.plugins.hdfs.ParquetResolver.USE_LOCAL_PXF_TIMEZONE_READ_NAME;

/**
* Parquet file accessor.
@@ -316,8 +319,10 @@ private FilterCompat.Filter getRecordFilter(String filterString, Map<String, Typ
}

List<ColumnDescriptor> tupleDescription = context.getTupleDescription();
DecimalOverflowOption decimalOverflowOption = DecimalOverflowOption.valueOf(configuration.get(ParquetResolver.PXF_PARQUET_WRITE_DECIMAL_OVERFLOW_PROPERTY_NAME, DecimalOverflowOption.ROUND.name()).toUpperCase());
boolean useLocalPxfTimezoneRead = context.getOption(USE_LOCAL_PXF_TIMEZONE_READ_NAME, DEFAULT_USE_LOCAL_PXF_TIMEZONE_READ);
ParquetRecordFilterBuilder filterBuilder = new ParquetRecordFilterBuilder(
tupleDescription, originalFieldsMap);
tupleDescription, originalFieldsMap, decimalOverflowOption, useLocalPxfTimezoneRead);
TreeVisitor pruner = new ParquetOperatorPruner(
tupleDescription, originalFieldsMap, SUPPORTED_OPERATORS);
TreeVisitor bpCharTransformer = new BPCharOperatorTransformer(tupleDescription);
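
For context, the two new inputs to the filter builder are resolved like any other PXF option: an enum name is read from the configuration with a default and upper-cased, and a boolean comes from the request context with a default. A minimal standalone sketch of that pattern follows; it uses a stand-in enum and hypothetical property keys, not the real PXF constants, and is not part of this commit:

import java.util.HashMap;
import java.util.Map;

public class OptionParsingSketch {
    // Stand-in for DecimalOverflowOption; the real enum lives in the hdfs plugin utilities.
    enum OverflowPolicy { ERROR, ROUND, IGNORE }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("parquet.decimal.overflow", "round"); // hypothetical key and value

        // Fall back to ROUND when the property is absent; accept any letter case.
        OverflowPolicy policy = OverflowPolicy.valueOf(
                configuration.getOrDefault("parquet.decimal.overflow", OverflowPolicy.ROUND.name())
                        .toUpperCase());

        // Boolean option with a default, mirroring context.getOption(name, defaultValue).
        boolean useLocalPxfTimezoneRead = Boolean.parseBoolean(
                configuration.getOrDefault("use.local.pxf.timezone.read", "true")); // hypothetical key

        System.out.println(policy + ", useLocalPxfTimezoneRead=" + useLocalPxfTimezoneRead);
    }
}
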
@@ -38,10 +38,7 @@
import org.greenplum.pxf.api.model.Resolver;
import org.greenplum.pxf.api.utilities.ColumnDescriptor;
import org.greenplum.pxf.api.utilities.Utilities;
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetConfig;
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetTimestampUtilities;
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetTypeConverterFactory;
import org.greenplum.pxf.plugins.hdfs.parquet.ParquetUtilities;
import org.greenplum.pxf.plugins.hdfs.parquet.*;
import org.greenplum.pxf.plugins.hdfs.parquet.converters.ParquetTypeConverter;
import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;
@@ -288,35 +285,8 @@ private void fillGroupWithPrimitive(Group group, int columnIndex, Object fieldVa
}

private byte[] getFixedLenByteArray(String value, Type type, String columnName) {
// From org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
DecimalLogicalTypeAnnotation typeAnnotation = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
int precision = Math.min(HiveDecimal.MAX_PRECISION, typeAnnotation.getPrecision());
int scale = Math.min(HiveDecimal.MAX_SCALE, typeAnnotation.getScale());

HiveDecimal hiveDecimal = decimalUtilities.parseDecimalStringWithHiveDecimal(value, precision, scale, columnName);
if (hiveDecimal == null) {
return null;
}

byte[] decimalBytes = hiveDecimal.bigIntegerBytesScaled(scale);

// Estimated number of bytes needed.
int precToBytes = ParquetFileAccessor.PRECISION_TO_BYTE_COUNT[precision - 1];
if (precToBytes == decimalBytes.length) {
// No padding needed.
return decimalBytes;
}

byte[] tgt = new byte[precToBytes];
if (hiveDecimal.signum() == -1) {
// For negative number, initializing bits to 1
for (int i = 0; i < precToBytes; i++) {
tgt[i] |= 0xFF;
}
}
System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
return tgt;
// end -- org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
return ParquetFixedLenByteArrayUtilities.convertFromBigDecimal(decimalUtilities, value, columnName,
(DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation());
}

// Set schema from context if null
@@ -0,0 +1,42 @@
package org.greenplum.pxf.plugins.hdfs.parquet;

import lombok.experimental.UtilityClass;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.greenplum.pxf.plugins.hdfs.ParquetFileAccessor;
import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;

@UtilityClass
public class ParquetFixedLenByteArrayUtilities {
public static byte[] convertFromBigDecimal(DecimalUtilities decimalUtilities, String value, String columnName,
DecimalLogicalTypeAnnotation typeAnnotation) {
// From org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
int precision = Math.min(HiveDecimal.MAX_PRECISION, typeAnnotation.getPrecision());
int scale = Math.min(HiveDecimal.MAX_SCALE, typeAnnotation.getScale());

HiveDecimal hiveDecimal = decimalUtilities.parseDecimalStringWithHiveDecimal(value, precision, scale, columnName);
if (hiveDecimal == null) {
return null;
}

byte[] decimalBytes = hiveDecimal.bigIntegerBytesScaled(scale);

// Estimated number of bytes needed.
int precToBytes = ParquetFileAccessor.PRECISION_TO_BYTE_COUNT[precision - 1];
if (precToBytes == decimalBytes.length) {
// No padding needed.
return decimalBytes;
}

byte[] tgt = new byte[precToBytes];
if (hiveDecimal.signum() == -1) {
// For negative number, initializing bits to 1
for (int i = 0; i < precToBytes; i++) {
tgt[i] |= (byte) 0xFF;
}
}
System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
return tgt;
// end -- org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.DecimalDataWriter#decimalToBinary
}
}
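
The heart of this extracted utility is the Hive-derived rule for FIXED_LEN_BYTE_ARRAY decimals: take the two's-complement bytes of the unscaled value and sign-extend them on the left to the byte width implied by the column's precision. A standalone sketch of the same padding rule, using only java.math and a hand-picked width in place of the PRECISION_TO_BYTE_COUNT lookup (illustrative only, not the committed code):

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;

public class FixedLenDecimalSketch {
    // Sign-extend the unscaled two's-complement bytes of value to byteWidth bytes.
    static byte[] toFixedLenBytes(BigDecimal value, int scale, int byteWidth) {
        byte[] unscaled = value.setScale(scale, RoundingMode.HALF_UP)
                .unscaledValue().toByteArray();         // minimal two's-complement form
        if (unscaled.length == byteWidth) {
            return unscaled;                            // already the exact width, no padding
        }
        byte[] padded = new byte[byteWidth];
        if (value.signum() == -1) {
            Arrays.fill(padded, (byte) 0xFF);           // negative values extend with 1-bits
        }
        System.arraycopy(unscaled, 0, padded, byteWidth - unscaled.length, unscaled.length);
        return padded;
    }

    public static void main(String[] args) {
        // A DECIMAL(5,2) column is stored as FIXED_LEN_BYTE_ARRAY(3); -1.5 becomes FF FF 6A.
        for (byte b : toFixedLenBytes(new BigDecimal("-1.5"), 2, 3)) {
            System.out.printf("%02X ", b);
        }
    }
}
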
@@ -22,15 +22,16 @@ public class ParquetOperatorPruner extends SupportedOperatorPruner {
// INT96 and FIXED_LEN_BYTE_ARRAY cannot be pushed down
// for more details look at
// org.apache.parquet.filter2.dictionarylevel.DictionaryFilter#expandDictionary
// where INT96 and FIXED_LEN_BYTE_ARRAY are not dictionary values
// where INT96 are not dictionary values
private static final EnumSet<PrimitiveType.PrimitiveTypeName> SUPPORTED_PRIMITIVE_TYPES =
EnumSet.of(
PrimitiveType.PrimitiveTypeName.INT32,
PrimitiveType.PrimitiveTypeName.INT64,
PrimitiveType.PrimitiveTypeName.BOOLEAN,
PrimitiveType.PrimitiveTypeName.BINARY,
PrimitiveType.PrimitiveTypeName.FLOAT,
PrimitiveType.PrimitiveTypeName.DOUBLE);
PrimitiveType.PrimitiveTypeName.DOUBLE,
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);

private final Map<String, Type> fields;
private final List<ColumnDescriptor> columnDescriptors;
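
Adding FIXED_LEN_BYTE_ARRAY to this set is what lets predicates on fixed-length decimal columns survive pruning, while INT96 stays excluded. A tiny sketch of the membership check the pruner relies on, against the same parquet-column enum (a standalone illustration, not part of the change):

import java.util.EnumSet;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

public class PrunerSupportSketch {
    // Same membership idea as the set above: INT96 is out, FIXED_LEN_BYTE_ARRAY is now in.
    private static final EnumSet<PrimitiveTypeName> SUPPORTED = EnumSet.of(
            PrimitiveTypeName.INT32, PrimitiveTypeName.INT64, PrimitiveTypeName.BOOLEAN,
            PrimitiveTypeName.BINARY, PrimitiveTypeName.FLOAT, PrimitiveTypeName.DOUBLE,
            PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);

    public static void main(String[] args) {
        System.out.println(SUPPORTED.contains(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)); // true
        System.out.println(SUPPORTED.contains(PrimitiveTypeName.INT96));                // false
    }
}
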
@@ -14,6 +14,9 @@
import org.greenplum.pxf.api.filter.OperatorNode;
import org.greenplum.pxf.api.filter.TreeVisitor;
import org.greenplum.pxf.api.utilities.ColumnDescriptor;
import org.greenplum.pxf.plugins.hdfs.ParquetResolver;
import org.greenplum.pxf.plugins.hdfs.utilities.DecimalOverflowOption;
import org.greenplum.pxf.plugins.hdfs.utilities.DecimalUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -35,6 +38,8 @@
import static org.apache.parquet.filter2.predicate.FilterApi.not;
import static org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;

/**
* This is the implementation of {@link TreeVisitor} for Parquet.
@@ -51,17 +56,22 @@ public class ParquetRecordFilterBuilder implements TreeVisitor {
private final Map<String, Type> fields;
private final List<ColumnDescriptor> columnDescriptors;
private final Deque<FilterPredicate> filterQueue;
private final DecimalUtilities decimalUtilities;
private final boolean useLocalPxfTimezoneRead;

/**
* Constructor
*
* @param columnDescriptors the list of column descriptors
* @param originalFields a map of field names to types
*/
public ParquetRecordFilterBuilder(List<ColumnDescriptor> columnDescriptors, Map<String, Type> originalFields) {
public ParquetRecordFilterBuilder(List<ColumnDescriptor> columnDescriptors, Map<String, Type> originalFields,
DecimalOverflowOption decimalOverflowOption, boolean useLocalPxfTimezone) {
this.columnDescriptors = columnDescriptors;
this.filterQueue = new LinkedList<>();
this.fields = originalFields;
this.decimalUtilities = new DecimalUtilities(decimalOverflowOption, true);
this.useLocalPxfTimezoneRead = useLocalPxfTimezone;
}

@Override
@@ -119,7 +129,7 @@ private void processLogicalOperator(Operator operator) {
left = filterQueue.poll();

if (left == null) {
throw new IllegalStateException("Unable to process logical operator " + operator.toString());
throw new IllegalStateException("Unable to process logical operator " + operator);
}
}

@@ -159,9 +169,9 @@ private void processSimpleColumnOperator(OperatorNode operatorNode) {
String filterColumnName = columnDescriptor.columnName();
Type type = fields.get(filterColumnName);

// INT96 and FIXED_LEN_BYTE_ARRAY cannot be pushed down
// INT96 cannot be pushed down
// for more details look at org.apache.parquet.filter2.dictionarylevel.DictionaryFilter#expandDictionary
// where INT96 and FIXED_LEN_BYTE_ARRAY are not dictionary values
// where INT96 are not dictionary values
FilterPredicate simpleFilter;
switch (type.asPrimitiveType().getPrimitiveTypeName()) {
case INT32:
@@ -171,12 +181,13 @@

case INT64:
simpleFilter = ParquetRecordFilterBuilder.<Long, Operators.LongColumn>getOperatorWithLtGtSupport(operator)
.apply(longColumn(type.getName()), valueOperand == null ? null : Long.parseLong(valueOperand.toString()));
.apply(longColumn(type.getName()), getLongForINT64(type.getLogicalTypeAnnotation(), valueOperand));
break;

case FIXED_LEN_BYTE_ARRAY:
case BINARY:
simpleFilter = ParquetRecordFilterBuilder.<Binary, Operators.BinaryColumn>getOperatorWithLtGtSupport(operator)
.apply(binaryColumn(type.getName()), valueOperand == null ? null : Binary.fromString(valueOperand.toString()));
.apply(binaryColumn(type.getName()), getBinaryFromString(type.getLogicalTypeAnnotation(), valueOperand, filterColumnName));
break;

case BOOLEAN:
@@ -271,4 +282,26 @@ private static Integer getIntegerForINT32(LogicalTypeAnnot
}
return Integer.parseInt(valueOperand.toString());
}

private Long getLongForINT64(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand) {
if (valueOperand == null) return null;
String value = valueOperand.toString();
if (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) {
return ParquetTimestampUtilities.getLongFromTimestamp(value, useLocalPxfTimezoneRead,
ParquetResolver.TIMESTAMP_PATTERN.matcher(value).find());
}
return Long.parseLong(value);
}

private Binary getBinaryFromString(LogicalTypeAnnotation logicalTypeAnnotation, OperandNode valueOperand,
String columnName) {
if (valueOperand == null) return null;
String value = valueOperand.toString();
if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
DecimalLogicalTypeAnnotation decimalType = (DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
byte[] tgt = ParquetFixedLenByteArrayUtilities.convertFromBigDecimal(decimalUtilities, value, columnName, decimalType);
return Binary.fromReusedByteArray(tgt);
}
return Binary.fromString(value);
}
}
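
For a sense of what the builder ultimately hands to Parquet for a fixed-length decimal column, here is a standalone sketch built directly against parquet-column's FilterApi. The column name and 3-byte width are made up for the example, and the padded literal is written out by hand rather than produced by ParquetFixedLenByteArrayUtilities; the accessor's getRecordFilter above returns the same FilterCompat.Filter shape to the reader:

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.io.api.Binary;

public class DecimalPredicateSketch {
    public static void main(String[] args) {
        // "dec2" is a made-up column name; for a DECIMAL(5,2) column stored as
        // FIXED_LEN_BYTE_ARRAY(3), the literal 0 is the 3-byte two's-complement 00 00 00.
        byte[] zeroPadded = new byte[] {0x00, 0x00, 0x00};

        // dec2 > 0, expressed against the binary column the same way the builder does it.
        FilterPredicate gtZero = FilterApi.gt(
                FilterApi.binaryColumn("dec2"),
                Binary.fromConstantByteArray(zeroPadded));

        // Combine with a null check; in FilterApi, notEq(column, null) keeps non-null values.
        FilterPredicate notNullAndPositive = FilterApi.and(
                FilterApi.notEq(FilterApi.binaryColumn("dec2"), null),
                gtZero);

        // Wrap for the reader, which is the form getRecordFilter produces.
        FilterCompat.Filter filter = FilterCompat.get(notNullAndPositive);
        System.out.println(notNullAndPositive);
    }
}
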
@@ -27,17 +27,17 @@ public static int getDaysFromEpochFromDateString(String dateString) {
return (int) date.toEpochDay();
}

public static long getLongFromTimestamp(String timestampString, boolean isAdjustedToUTC, boolean isTimestampWithTimeZone) {
public static long getLongFromTimestamp(String timestampString, boolean useLocalPxfTimezone, boolean isTimestampWithTimeZone) {
if (isTimestampWithTimeZone) {
// We receive a timestamp string with time zone offset from GPDB
OffsetDateTime date = OffsetDateTime.parse(timestampString, GreenplumDateTime.DATETIME_WITH_TIMEZONE_FORMATTER);
ZonedDateTime zdt = date.toZonedDateTime();
return getEpochWithMicroSeconds(zdt, date.getNano());
} else {
// We receive a timestamp string from GPDB in the server timezone
// If isAdjustedToUTC = true we convert it to the UTC using local pxf server timezone and save it in the parquet as UTC
// If isAdjustedToUTC = false we don't convert timestamp to the instant and save it as is
ZoneId zoneId = isAdjustedToUTC ? ZoneId.systemDefault() : ZoneOffset.UTC;
// If useLocalPxfTimezone = true we convert it to the UTC using local pxf server timezone and save it in the parquet as UTC
// If useLocalPxfTimezone = false we don't convert timestamp to the instant and save it as is
ZoneId zoneId = useLocalPxfTimezone ? ZoneId.systemDefault() : ZoneOffset.UTC;
LocalDateTime date = LocalDateTime.parse(timestampString, GreenplumDateTime.DATETIME_FORMATTER);
ZonedDateTime zdt = ZonedDateTime.of(date, zoneId);
return getEpochWithMicroSeconds(zdt, date.getNano());
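
The renamed flag makes the behaviour explicit: with useLocalPxfTimezone set, the wall-clock timestamp string from Greenplum is interpreted in the PXF server's zone and converted to UTC microseconds; otherwise it is treated as already being UTC and kept as-is. A standalone java.time sketch of that branch, with an illustrative pattern and value rather than the GreenplumDateTime formatters:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class TimestampMicrosSketch {
    static long toEpochMicros(String timestamp, boolean useLocalPxfTimezone) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
        LocalDateTime local = LocalDateTime.parse(timestamp, fmt);
        // true: interpret in the server's zone and convert to an instant (stored as UTC);
        // false: keep the wall-clock value by treating it as UTC.
        ZoneId zone = useLocalPxfTimezone ? ZoneId.systemDefault() : ZoneOffset.UTC;
        ZonedDateTime zoned = local.atZone(zone);
        return ChronoUnit.MICROS.between(Instant.EPOCH, zoned.toInstant());
    }

    public static void main(String[] args) {
        String ts = "2024-05-29 12:34:56.789012";
        System.out.println(toEpochMicros(ts, true));   // depends on the JVM's default zone
        System.out.println(toEpochMicros(ts, false));  // fixed: micros since epoch at UTC
    }
}
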
@@ -663,11 +663,41 @@ public void testUnsupportedINT96Filter() throws Exception {
}

@Test
public void testUnsupportedFixedLenByteArrayFilter() throws Exception {
public void testSupportedFixedLenByteArrayFilter() throws Exception {
// dec2 = 0
context.setFilterString("a14c23s1d0o5");
// all rows are expected
assertRowsReturned(ALL);
int[] expectedRows = {1};
assertRowsReturned(expectedRows);

// dec2 > 0
context.setFilterString("a14c23s1d0o2");
expectedRows = new int[] {2, 4, 6, 8, 10, 12, 14, 17, 20, 21, 22, 23, 24, 25};
assertRowsReturned(expectedRows);

// dec2 < 0
context.setFilterString("a14c23s1d0o1");
expectedRows = new int[] {3, 5, 7, 9, 11, 13, 16, 18, 19};
assertRowsReturned(expectedRows);

// dec2 >= 0
context.setFilterString("a14c23s1d0o4");
expectedRows = new int[] {1, 2, 4, 6, 8, 10, 12, 14, 17, 20, 21, 22, 23, 24, 25};
assertRowsReturned(expectedRows);

// dec2 <= 0
context.setFilterString("a14c23s1d0o3");
expectedRows = new int[] {1, 3, 5, 7, 9, 11, 13, 16, 18, 19};
assertRowsReturned(expectedRows);

// dec2 == null
context.setFilterString("a14o8");
expectedRows = new int[] {15};
assertRowsReturned(expectedRows);

// dec2 != null
context.setFilterString("a14o9");
expectedRows = new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25};
assertRowsReturned(expectedRows);
}

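
A note on the serialized filters exercised by the test above (a14c23s1d0o5 and friends): assuming PXF's compact scan-filter encoding of a<column index>, an optional c<type oid>s<length>d<value> constant, and o<operator code>, with the codes implied by the test's own comments (1 <, 2 >, 3 <=, 4 >=, 5 =, 8 IS NULL, 9 IS NOT NULL), a throwaway decoder for just these strings could look like:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FilterStringSketch {
    // Operator codes as implied by the comments in the test above (assumed, not authoritative).
    private static final Map<Integer, String> OPS = new HashMap<>();
    static {
        OPS.put(1, "<"); OPS.put(2, ">"); OPS.put(3, "<="); OPS.put(4, ">=");
        OPS.put(5, "="); OPS.put(8, "IS NULL"); OPS.put(9, "IS NOT NULL");
    }

    // Matches either "a<col>c<oid>s<len>d<value>o<op>" or the operand-less "a<col>o<op>".
    private static final Pattern FILTER = Pattern.compile(
            "a(\\d+)(?:c(\\d+)s\\d+d([^o]*))?o(\\d+)");

    static String describe(String filter) {
        Matcher m = FILTER.matcher(filter);
        if (!m.matches()) return "unrecognized: " + filter;
        String column = "column #" + m.group(1);
        String op = OPS.getOrDefault(Integer.parseInt(m.group(4)), "op" + m.group(4));
        return m.group(3) == null ? column + " " + op : column + " " + op + " " + m.group(3);
    }

    public static void main(String[] args) {
        System.out.println(describe("a14c23s1d0o5")); // column #14 = 0
        System.out.println(describe("a14o8"));        // column #14 IS NULL
    }
}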