Skip to content

Commit

Permalink
Merge pull request #19141 from jackdelv/HPCC-32558
Browse files Browse the repository at this point in the history
HPCC-32558 Parquet plugin not utilizing Decimal encoding

Reviewed-By: Dan S. Camper <dan.camper@lexisnexisrisk.com>
Reviewed-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Sep 26, 2024
2 parents fa97fd1 + 0d13c87 commit 9ce53c6
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 12 deletions.
76 changes: 70 additions & 6 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ arrow::Status ParquetWriter::fieldToNode(const RtlFieldInfo *field, std::vector<
}
break;
case type_decimal:
arrowFields.push_back(std::make_shared<arrow::Field>(name.str(), arrow::large_utf8())); // TODO: add decimal encoding
arrowFields.push_back(std::make_shared<arrow::Field>(name.str(), arrow::decimal(field->type->getDecimalDigits(), field->type->getDecimalPrecision())));
break;
case type_data:
if (field->type->length > 0)
Expand Down Expand Up @@ -1231,7 +1231,16 @@ std::string_view ParquetRowBuilder::getCurrView(const RtlFieldInfo *field)
case LargeStringType:
return arrayVisitor->largeStringArr->GetView(currArrayIndex());
case DecimalType:
return arrayVisitor->size == 128 ? arrayVisitor->decArr->GetView(currArrayIndex()) : arrayVisitor->largeDecArr->GetView(currArrayIndex());
if (arrayVisitor->size == 128)
{
serialized.append(arrayVisitor->decArr->FormatValue(currArrayIndex()).c_str());
return serialized.str();
}
else
{
serialized.append(arrayVisitor->largeDecArr->FormatValue(currArrayIndex()).c_str());
return serialized.str();
}
case FixedSizeBinaryType:
return arrayVisitor->fixedSizeBinaryArr->GetView(currArrayIndex());
default:
Expand Down Expand Up @@ -1896,11 +1905,63 @@ void ParquetRecordBinder::processReal(double value, const RtlFieldInfo *field)
}

/**
* @brief Processes the field for its respective type, and adds the key-value pair to the current row.
* @brief Adds DECIMAL and UDECIMAL fields to the builder based on the size of the decimal
*
* @param decText Data to be written to the Parquet file.
* @param bytes Length, in bytes, of the decimal text held in decText.
* @param digits Number of digits in the decimal.
* @param precision Number of digits after the decimal point.
* @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::addDecimalFieldToBuilder(rtlDataAttr *decText, size32_t bytes, int32_t digits, int32_t precision, const RtlFieldInfo *field)
{
    // Appends the decimal held as text in decText (bytes characters long) to the Arrow
    // builder registered for field. The builder's own type decides whether the value is
    // encoded as a Decimal128 or a Decimal256; any other builder type is reported as an error.
    //
    // NOTE: arrow::DecimalNNN::FromString treats its last two arguments as OUTPUTS. It
    // overwrites digits/precision with the precision and scale parsed from the text, so the
    // incoming values of those parameters are not used by the parse. The parsed scale is the
    // number of fractional digits present in the text, which may be smaller than the scale
    // fixed in the column's schema. The builder stores only the unscaled integer, so the
    // value must be rescaled to the column's scale first; otherwise e.g. "1.5" appended to
    // a decimal(16, 8) column would be read back as 0.00000015.
    arrow::ArrayBuilder *fieldBuilder = parquetWriter->getFieldBuilder(field);
    const std::shared_ptr<arrow::DataType> builderType = fieldBuilder->type();
    const std::string_view text(decText->getstr(), bytes);

    if (builderType->id() == arrow::Type::DECIMAL128)
    {
        const int32_t columnScale = static_cast<const arrow::DecimalType &>(*builderType).scale();
        arrow::Decimal128Builder *decimal128Builder = static_cast<arrow::Decimal128Builder *>(fieldBuilder);
        arrow::Decimal128 parsed;
        reportIfFailure(arrow::Decimal128::FromString(text, &parsed, &digits, &precision));
        // precision now holds the scale parsed from the text (see NOTE above).
        auto scaled = parsed.Rescale(precision, columnScale);
        reportIfFailure(scaled.status());
        reportIfFailure(decimal128Builder->Append(*scaled));
    }
    else if (builderType->id() == arrow::Type::DECIMAL256)
    {
        const int32_t columnScale = static_cast<const arrow::DecimalType &>(*builderType).scale();
        arrow::Decimal256Builder *decimal256Builder = static_cast<arrow::Decimal256Builder *>(fieldBuilder);
        arrow::Decimal256 parsed;
        reportIfFailure(arrow::Decimal256::FromString(text, &parsed, &digits, &precision));
        // precision now holds the scale parsed from the text (see NOTE above).
        auto scaled = parsed.Rescale(precision, columnScale);
        reportIfFailure(scaled.status());
        reportIfFailure(decimal256Builder->Append(*scaled));
    }
    else
        failx("Incorrect type for Decimal field %s: %s", field->name, builderType->ToString().c_str());
}

/**
* @brief Convert from Binary Coded Decimal to string and add to Decimal field builder.
*
* @param value Data to be written to the Parquet file.
* @param digits Number of bytes holding the digits.
* @param precision Number of digits after the decimal point.
* @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field)
{
Decimal val;
size32_t bytes;
rtlDataAttr decText;
val.setUDecimal(digits, precision, value);
val.getStringX(bytes, decText.refstr());

// convert digits from number of bytes to number of digits
val.getPrecision(digits, precision);
assert(digits <= 64 && precision <= 32);
addDecimalFieldToBuilder(&decText, bytes, digits, precision, field);
}

/**
* @brief Convert from Binary Coded Decimal to string and add to Decimal field builder.
*
* @param value Data to be written to the Parquet file.
* @param digits Number of digits in decimal.
* @param precision Number of digits of precision.
* @param digits Number of bytes holding the digits.
* @param precision Number of digits after the decimal point.
* @param field RtlFieldInfo holds metadata about the field.
*/
void ParquetRecordBinder::processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field)
Expand All @@ -1911,7 +1972,10 @@ void ParquetRecordBinder::processDecimal(const void *value, unsigned digits, uns
val.setDecimal(digits, precision, value);
val.getStringX(bytes, decText.refstr());

parquetWriter->addFieldToBuilder(field, bytes, decText.getstr());
// convert digits from number of bytes to number of digits
val.getPrecision(digits, precision);
assert(digits <= 64 && precision <= 32);
addDecimalFieldToBuilder(&decText, bytes, digits, precision, field);
}

/**
Expand Down
9 changes: 3 additions & 6 deletions plugins/parquet/parquetembed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ class PARQUETEMBED_PLUGIN_API ParquetRowBuilder : public CInterfaceOf<IFieldSour

private:
__int64 currentRow; // The index in the arrow Array to read the current value.
StringBuffer serialized; // Output string from serialization.
StringBuffer serialized; // Output string from serialization of numeric types.
TableColumns *resultRows = nullptr; // A pointer to the result rows map where the left side are the field names for the columns and the right is an array of values.
std::vector<ParquetColumnTracker> pathStack; // ParquetColumnTracker keeps track of nested data when reading sets.
std::shared_ptr<ParquetArrayVisitor> arrayVisitor; // Visitor class for getting the correct type when reading a Parquet column.
Expand All @@ -561,10 +561,7 @@ class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>
virtual void processUInt(unsigned __int64 value, const RtlFieldInfo *field);
virtual void processReal(double value, const RtlFieldInfo *field);
virtual void processDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field);
virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field)
{
UNSUPPORTED("UNSIGNED decimals");
}
virtual void processUDecimal(const void *value, unsigned digits, unsigned precision, const RtlFieldInfo *field);
virtual void processUnicode(unsigned chars, const UChar *value, const RtlFieldInfo *field);
virtual void processQString(unsigned len, const char *value, const RtlFieldInfo *field);
virtual void processUtf8(unsigned chars, const char *value, const RtlFieldInfo *field);
Expand Down Expand Up @@ -597,7 +594,7 @@ class ParquetRecordBinder : public CInterfaceOf<IFieldProcessor>

protected:
inline unsigned checkNextParam(const RtlFieldInfo *field);

void addDecimalFieldToBuilder(rtlDataAttr *decText, size32_t bytes, int32_t digits, int32_t precision, const RtlFieldInfo *field);
const RtlTypeInfo *typeInfo = nullptr;
const IContextLogger &logctx;
int firstParam;
Expand Down
3 changes: 3 additions & 0 deletions testing/regress/ecl/key/parquetDecimals.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Dataset name='Result 1'>
<Row><d1>0</d1><d2>0</d2><d3>0</d3><d4>0</d4><d5>0</d5><d6>0</d6></Row>
</Dataset>
59 changes: 59 additions & 0 deletions testing/regress/ecl/parquetDecimals.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*##############################################################################
HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
############################################################################## */
//class=parquet
//nothor,noroxie

IMPORT STD;
IMPORT PARQUET;

// Record layout covering a range of decimal sizes, signed and unsigned, up to
// 64 digits with 32 digits after the decimal point.
layout := RECORD
DECIMAL d1;
DECIMAL1_0 d2;
DECIMAL16_8 d3;
DECIMAL32_16 d4;
UDECIMAL64_32 d5;
DECIMAL64_32 d6;
END;

// Single-row inline dataset; each literal is cast to the decimal type of the
// field it populates.
decimalData := DATASET([{(DECIMAL) '0.12345678901234567890123456789012',
(DECIMAL1_0) '1',
(DECIMAL16_8) '12345678.12345678',
(DECIMAL32_16) '1234567890123456.1234567890123456',
(UDECIMAL64_32) '12345678901234567890123456789012.12345678901234567890123456789012',
(DECIMAL64_32) '-12345678901234567890123456789012.12345678901234567890123456789012'
}], layout);

// Target file lives under the default drop zone; the write is asked to overwrite
// any existing file so the test can be re-run.
overwriteOption := TRUE;
dropzoneDirectory := Std.File.GetDefaultDropZone();
parquetFilePath := dropzoneDirectory + '/regress/decimal.parquet';

// Round trip: write the dataset to Parquet ...
ParquetIO.Write(decimalData, parquetFilePath, overwriteOption);

// ... and read it back using the same layout.
parquetDecimal := ParquetIO.Read(layout, parquetFilePath);

// Field-by-field difference between the original row and the row read back.
// A lossless round trip yields zero in every field.
layout joinTransform (decimalData d, parquetDecimal p) := TRANSFORM
SELF.d1 := d.d1 - p.d1;
SELF.d2 := d.d2 - p.d2;
SELF.d3 := d.d3 - p.d3;
SELF.d4 := d.d4 - p.d4;
SELF.d5 := d.d5 - p.d5;
SELF.d6 := d.d6 - p.d6;
END;

// Unconditional (true) ALL join pairing the original row with the row read back.
result := JOIN(decimalData, parquetDecimal, true, joinTransform(LEFT, RIGHT), ALL);

// The expected output is compared against the test's key file.
OUTPUT(result);

0 comments on commit 9ce53c6

Please sign in to comment.