Commit

GH-43983: [C++][Parquet] Add support for arrow::ArrayStatistics: zero-copy types
kou committed Sep 6, 2024
1 parent 5ad0b3e commit 14f5ffa
Showing 2 changed files with 92 additions and 28 deletions.
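For context, this change attaches column-chunk statistics from the Parquet metadata to the ::arrow::ArrayData of zero-copy columns (int32, int64, float, double, and the milli/micro/nano timestamp units), so they surface on the arrays returned by the Arrow Parquet reader. A minimal sketch of inspecting them after reading a file is shown below; the file name and column index are placeholders, and the exact ArrayStatistics field types (optional-like holders) are an assumption about the Arrow version in use.

// Sketch: inspect the ArrayStatistics attached by the reader. Assumes a file
// "data.parquet" whose column 0 is one of the zero-copy types, e.g. int64.
#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>

#include <iostream>
#include <memory>

arrow::Status InspectColumnStatistics() {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open("data.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
  // Each chunk corresponds to a row group; AttachStatistics() in the diff below
  // fills ArrayData::statistics from the column chunk metadata.
  for (const auto& chunk : table->column(0)->chunks()) {
    const auto& statistics = chunk->data()->statistics;
    if (!statistics) continue;
    // null_count/min/max are optional-like; exact member types may differ by version.
    std::cout << "null_count known: " << statistics->null_count.has_value()
              << " min exact: " << statistics->is_min_exact << std::endl;
  }
  return arrow::Status::OK();
}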
33 changes: 32 additions & 1 deletion cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -212,7 +212,7 @@ template <typename ArrowType, typename MinMaxType>
void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ArrowCType = typename ArrowType::c_type;
constexpr auto min = std::numeric_limits<ArrowCType>::min();
constexpr auto min = std::numeric_limits<ArrowCType>::lowest();
constexpr auto max = std::numeric_limits<ArrowCType>::max();

std::string json;
@@ -257,14 +257,30 @@ TEST(TestStatisticsRead, UInt16) {
TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
}

TEST(TestStatisticsRead, Int32) {
TestStatisticsReadArray<::arrow::Int32Type, int64_t>(::arrow::int32());
}

TEST(TestStatisticsRead, UInt32) {
TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
}

TEST(TestStatisticsRead, Int64) {
TestStatisticsReadArray<::arrow::Int64Type, int64_t>(::arrow::int64());
}

TEST(TestStatisticsRead, UInt64) {
TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
}

TEST(TestStatisticsRead, Float) {
TestStatisticsReadArray<::arrow::FloatType, double>(::arrow::float32());
}

TEST(TestStatisticsRead, Double) {
TestStatisticsReadArray<::arrow::DoubleType, double>(::arrow::float64());
}

TEST(TestStatisticsRead, Date32) {
TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
}
@@ -279,6 +295,21 @@ TEST(TestStatisticsRead, Time64) {
::arrow::time64(::arrow::TimeUnit::MICRO));
}

TEST(TestStatisticsRead, TimestampMilli) {
TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
::arrow::timestamp(::arrow::TimeUnit::MILLI));
}

TEST(TestStatisticsRead, TimestampMicro) {
TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
::arrow::timestamp(::arrow::TimeUnit::MICRO));
}

TEST(TestStatisticsRead, TimestampNano) {
TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
::arrow::timestamp(::arrow::TimeUnit::NANO));
}

TEST(TestStatisticsRead, Duration) {
TestStatisticsReadArray<::arrow::DurationType, int64_t>(
::arrow::duration(::arrow::TimeUnit::NANO));
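The tests above all go through the shared TestStatisticsReadArray helper, whose body is collapsed in this view. For orientation only, a hypothetical round trip with the same shape (not the actual helper) could look like the following: write a small nullable int64 column to an in-memory Parquet file, read it back, and check the statistics attached to the first chunk. ArrayFromJSON and the ASSERT_OK* macros come from Arrow's testing headers; the expected values follow from the JSON literal.

// Hypothetical sketch of the round-trip pattern (not the real helper).
#include <gtest/gtest.h>

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/testing/gtest_util.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

TEST(StatisticsRoundTripSketch, Int64) {
  auto schema = ::arrow::schema({::arrow::field("int64", ::arrow::int64())});
  auto array = ::arrow::ArrayFromJSON(::arrow::int64(), "[1, null, -1, 5]");
  auto table = ::arrow::Table::Make(schema, {array});

  // Write the table to an in-memory Parquet file.
  ASSERT_OK_AND_ASSIGN(auto sink, ::arrow::io::BufferOutputStream::Create());
  ASSERT_OK(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), sink,
                                         /*chunk_size=*/1024));
  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

  // Read it back and inspect the statistics attached to the first chunk.
  std::unique_ptr<::parquet::arrow::FileReader> reader;
  ASSERT_OK(::parquet::arrow::OpenFile(
      std::make_shared<::arrow::io::BufferReader>(buffer),
      ::arrow::default_memory_pool(), &reader));
  std::shared_ptr<::arrow::Table> read_table;
  ASSERT_OK(reader->ReadTable(&read_table));
  auto statistics = read_table->column(0)->chunk(0)->data()->statistics;
  ASSERT_NE(statistics, nullptr);
  EXPECT_EQ(statistics->null_count, 1);  // one null in the JSON above
  EXPECT_TRUE(statistics->is_min_exact);
  EXPECT_TRUE(statistics->is_max_exact);
}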
87 changes: 60 additions & 27 deletions cpp/src/parquet/arrow/reader_internal.cc
@@ -319,30 +319,20 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
}

template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field,
Datum* out) {
void AttachStatistics(::arrow::ArrayData* data,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));

auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
int64_t null_count = 0;
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
null_count = reader->null_count();
buffers[0] = reader->ReleaseIsValid();
auto statistics = metadata->statistics().get();
if (data->null_count == ::arrow::kUnknownNullCount && !statistics) {
return;
}
auto array_data =
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);

auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
array_statistics->null_count = null_count;
auto statistics = metadata->statistics().get();
if (data->null_count != ::arrow::kUnknownNullCount) {
array_statistics->null_count = data->null_count;
}
if (statistics) {
if (statistics->HasDistinctCount()) {
array_statistics->distinct_count = statistics->distinct_count();
@@ -352,7 +342,10 @@ Status TransferInt(RecordReader* reader,
static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
const ArrowCType min = typed_statistics->min();
const ArrowCType max = typed_statistics->max();
if (std::is_signed<ArrowCType>::value) {
if (std::is_floating_point<ArrowCType>::value) {
array_statistics->min = static_cast<double>(min);
array_statistics->max = static_cast<double>(max);
} else if (std::is_signed<ArrowCType>::value) {
array_statistics->min = static_cast<int64_t>(min);
array_statistics->max = static_cast<int64_t>(max);
} else {
@@ -370,13 +363,41 @@ Status TransferInt(RecordReader* reader,
array_statistics->is_max_exact = true;
}
}
array_data->statistics = std::move(array_statistics);

data->statistics = std::move(array_statistics);
}

template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field,
Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));

auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
int64_t null_count = 0;
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
null_count = reader->null_count();
buffers[0] = reader->ReleaseIsValid();
}
auto array_data =
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
AttachStatistics<ArrowType, ParquetType>(array_data.get(), std::move(metadata), ctx);
*out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
return Status::OK();
}

std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
const std::shared_ptr<Field>& field) {
template <typename ArrowType, typename ParquetType>
std::shared_ptr<Array> TransferZeroCopy(
RecordReader* reader, std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field) {
std::shared_ptr<::arrow::ArrayData> data;
if (field->nullable()) {
std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
@@ -388,7 +409,8 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
data = std::make_shared<::arrow::ArrayData>(field->type(), reader->values_written(),
std::move(buffers), /*null_count=*/0);
}
return ::arrow::MakeArray(data);
AttachStatistics<ArrowType, ParquetType>(data.get(), std::move(metadata), ctx);
return ::arrow::MakeArray(std::move(data));
}

Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) {
@@ -794,10 +816,20 @@ Status TransferColumnData(RecordReader* reader,
break;
}
case ::arrow::Type::INT32:
result = TransferZeroCopy<::arrow::Int32Type, Int32Type>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::INT64:
result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::FLOAT:
result = TransferZeroCopy<::arrow::FloatType, FloatType>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::DOUBLE:
result = TransferZeroCopy(reader, value_field);
result = TransferZeroCopy<::arrow::DoubleType, DoubleType>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::BOOL:
RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, &result));
Expand Down Expand Up @@ -895,7 +927,8 @@ Status TransferColumnData(RecordReader* reader,
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
result = TransferZeroCopy(reader, value_field);
result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
reader, std::move(metadata), ctx, value_field);
break;
default:
return Status::NotImplemented("TimeUnit not supported");
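As the branches in AttachStatistics above show, min/max are published as double for floating-point columns, int64_t for signed integers and timestamps, and uint64_t for unsigned integers. A small consumer-side sketch follows, assuming ArrayStatistics::min/max are optional values over a std::variant-style holder with those alternatives and that the header lives at arrow/array/statistics.h; exact member types and paths may differ by Arrow version.

// Sketch: recover a typed minimum from ArrayStatistics, mirroring the casts in
// AttachStatistics(). Assumes a std::variant-like holder with int64_t, uint64_t
// and double alternatives.
#include <arrow/array/statistics.h>

#include <cstdint>
#include <iostream>
#include <variant>

void PrintMin(const ::arrow::ArrayStatistics& statistics) {
  if (!statistics.min.has_value()) {
    std::cout << "no min recorded" << std::endl;
    return;
  }
  const auto& min = *statistics.min;
  if (std::holds_alternative<double>(min)) {          // float/double columns
    std::cout << "min = " << std::get<double>(min) << std::endl;
  } else if (std::holds_alternative<int64_t>(min)) {  // signed ints, timestamps
    std::cout << "min = " << std::get<int64_t>(min) << std::endl;
  } else if (std::holds_alternative<uint64_t>(min)) { // unsigned ints
    std::cout << "min = " << std::get<uint64_t>(min) << std::endl;
  }
}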
