From a15956f697dddce4a08198ff3d36ac3e326d069e Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Mon, 9 Sep 2024 08:58:25 +0900 Subject: [PATCH] GH-43983: [C++][Parquet] Add support for arrow::ArrayStatistics: zero-copy types (#43984) ### Rationale for this change Statistics are useful for fast processing. Target types: * `Int32` * `Int64` * `Float` * `Double` * `Timestamp[milli]` * `Timestamp[micro]` * `Timestamp[nano]` ### What changes are included in this PR? Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`. ### Are these changes tested? Yes. ### Are there any user-facing changes? Yes. * GitHub Issue: #43983 Authored-by: Sutou Kouhei Signed-off-by: Sutou Kouhei --- .../parquet/arrow/arrow_statistics_test.cc | 66 ++++++++++--- cpp/src/parquet/arrow/reader_internal.cc | 96 +++++++++++++------ 2 files changed, 117 insertions(+), 45 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc b/cpp/src/parquet/arrow/arrow_statistics_test.cc index 2638358f1ce7c..5011bf89112c6 100644 --- a/cpp/src/parquet/arrow/arrow_statistics_test.cc +++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc @@ -18,6 +18,8 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/array/builder_time.h" #include "arrow/table.h" #include "arrow/testing/gtest_util.h" @@ -183,9 +185,8 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) { namespace { ::arrow::Result> StatisticsReadArray( - std::shared_ptr<::arrow::DataType> data_type, const std::string& json) { + std::shared_ptr<::arrow::DataType> data_type, std::shared_ptr<::arrow::Array> array) { auto schema = ::arrow::schema({::arrow::field("column", data_type)}); - auto array = ::arrow::ArrayFromJSON(data_type, json); auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array}); ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create()); const auto arrow_writer_properties = @@ -211,21 +212,27 @@ ::arrow::Result> 
StatisticsReadArray( template void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) { using ArrowArrayType = typename ::arrow::TypeTraits::ArrayType; + using ArrowArrayBuilder = typename ::arrow::TypeTraits::BuilderType; using ArrowCType = typename ArrowType::c_type; - constexpr auto min = std::numeric_limits::min(); + constexpr auto min = std::numeric_limits::lowest(); constexpr auto max = std::numeric_limits::max(); - std::string json; - json += "["; - json += std::to_string(max); - json += ", null, "; - json += std::to_string(min); - json += ", "; - json += std::to_string(max); - json += "]"; - ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json)); - auto typed_array = std::static_pointer_cast(array); - auto statistics = typed_array->statistics(); + std::unique_ptr builder; + if constexpr (::arrow::TypeTraits::is_parameter_free) { + builder = std::make_unique(::arrow::default_memory_pool()); + } else { + builder = + std::make_unique(arrow_type, ::arrow::default_memory_pool()); + } + ASSERT_OK(builder->Append(max)); + ASSERT_OK(builder->AppendNull()); + ASSERT_OK(builder->Append(min)); + ASSERT_OK(builder->Append(max)); + ASSERT_OK_AND_ASSIGN(auto built_array, builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto read_array, + StatisticsReadArray(arrow_type, std::move(built_array))); + auto typed_read_array = std::static_pointer_cast(read_array); + auto statistics = typed_read_array->statistics(); ASSERT_NE(nullptr, statistics); ASSERT_EQ(true, statistics->null_count.has_value()); ASSERT_EQ(1, statistics->null_count.value()); @@ -257,14 +264,30 @@ TEST(TestStatisticsRead, UInt16) { TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16()); } +TEST(TestStatisticsRead, Int32) { + TestStatisticsReadArray<::arrow::Int32Type, int64_t>(::arrow::int32()); +} + TEST(TestStatisticsRead, UInt32) { TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32()); } +TEST(TestStatisticsRead, Int64) { + 
TestStatisticsReadArray<::arrow::Int64Type, int64_t>(::arrow::int64()); +} + TEST(TestStatisticsRead, UInt64) { TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64()); } +TEST(TestStatisticsRead, Float) { + TestStatisticsReadArray<::arrow::FloatType, double>(::arrow::float32()); +} + +TEST(TestStatisticsRead, Double) { + TestStatisticsReadArray<::arrow::DoubleType, double>(::arrow::float64()); +} + TEST(TestStatisticsRead, Date32) { TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32()); } @@ -279,6 +302,21 @@ TEST(TestStatisticsRead, Time64) { ::arrow::time64(::arrow::TimeUnit::MICRO)); } +TEST(TestStatisticsRead, TimestampMilli) { + TestStatisticsReadArray<::arrow::TimestampType, int64_t>( + ::arrow::timestamp(::arrow::TimeUnit::MILLI)); +} + +TEST(TestStatisticsRead, TimestampMicro) { + TestStatisticsReadArray<::arrow::TimestampType, int64_t>( + ::arrow::timestamp(::arrow::TimeUnit::MICRO)); +} + +TEST(TestStatisticsRead, TimestampNano) { + TestStatisticsReadArray<::arrow::TimestampType, int64_t>( + ::arrow::timestamp(::arrow::TimeUnit::NANO)); +} + TEST(TestStatisticsRead, Duration) { TestStatisticsReadArray<::arrow::DurationType, int64_t>( ::arrow::duration(::arrow::TimeUnit::NANO)); diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index e6c2d95e1fbf7..aa84a7a92bbe1 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -319,30 +319,20 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) { } template -Status TransferInt(RecordReader* reader, - std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, - const ReaderContext* ctx, const std::shared_ptr& field, - Datum* out) { +void AttachStatistics(::arrow::ArrayData* data, + std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, + const ReaderContext* ctx) { using ArrowCType = typename ArrowType::c_type; - using ParquetCType = typename ParquetType::c_type; - 
int64_t length = reader->values_written(); - ARROW_ASSIGN_OR_RAISE(auto data, - ::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool)); - auto values = reinterpret_cast(reader->values()); - auto out_ptr = reinterpret_cast(data->mutable_data()); - std::copy(values, values + length, out_ptr); - int64_t null_count = 0; - std::vector> buffers = {nullptr, std::move(data)}; - if (field->nullable()) { - null_count = reader->null_count(); - buffers[0] = reader->ReleaseIsValid(); + auto statistics = metadata->statistics().get(); + if (data->null_count == ::arrow::kUnknownNullCount && !statistics) { + return; } - auto array_data = - ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count); + auto array_statistics = std::make_shared<::arrow::ArrayStatistics>(); - array_statistics->null_count = null_count; - auto statistics = metadata->statistics().get(); + if (data->null_count != ::arrow::kUnknownNullCount) { + array_statistics->null_count = data->null_count; + } if (statistics) { if (statistics->HasDistinctCount()) { array_statistics->distinct_count = statistics->distinct_count(); @@ -352,17 +342,21 @@ Status TransferInt(RecordReader* reader, static_cast<::parquet::TypedStatistics*>(statistics); const ArrowCType min = typed_statistics->min(); const ArrowCType max = typed_statistics->max(); - if (std::is_signed::value) { + if (std::is_floating_point::value) { + array_statistics->min = static_cast(min); + array_statistics->max = static_cast(max); + } else if (std::is_signed::value) { array_statistics->min = static_cast(min); array_statistics->max = static_cast(max); } else { array_statistics->min = static_cast(min); array_statistics->max = static_cast(max); } - // We can assume that integer based min/max are always exact if - // they exist. Apache Parquet's "Statistics" has - // "is_min_value_exact" and "is_max_value_exact" but we can - // ignore them for integer based min/max. 
+ // We can assume that integer and floating point number based + // min/max are always exact if they exist. Apache Parquet's + // "Statistics" has "is_min_value_exact" and + // "is_max_value_exact" but we can ignore them for integer and + // floating point number based min/max. // // See also the discussion at dev@parquet.apache.org: // https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4 @@ -370,13 +364,41 @@ Status TransferInt(RecordReader* reader, array_statistics->is_max_exact = true; } } - array_data->statistics = std::move(array_statistics); + + data->statistics = std::move(array_statistics); +} + +template +Status TransferInt(RecordReader* reader, + std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, + const ReaderContext* ctx, const std::shared_ptr& field, + Datum* out) { + using ArrowCType = typename ArrowType::c_type; + using ParquetCType = typename ParquetType::c_type; + int64_t length = reader->values_written(); + ARROW_ASSIGN_OR_RAISE(auto data, + ::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool)); + + auto values = reinterpret_cast(reader->values()); + auto out_ptr = reinterpret_cast(data->mutable_data()); + std::copy(values, values + length, out_ptr); + int64_t null_count = 0; + std::vector> buffers = {nullptr, std::move(data)}; + if (field->nullable()) { + null_count = reader->null_count(); + buffers[0] = reader->ReleaseIsValid(); + } + auto array_data = + ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count); + AttachStatistics(array_data.get(), std::move(metadata), ctx); *out = std::make_shared>(std::move(array_data)); return Status::OK(); } -std::shared_ptr TransferZeroCopy(RecordReader* reader, - const std::shared_ptr& field) { +template +std::shared_ptr TransferZeroCopy( + RecordReader* reader, std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, + const ReaderContext* ctx, const std::shared_ptr& field) { std::shared_ptr<::arrow::ArrayData> data; if (field->nullable()) { 
std::vector> buffers = {reader->ReleaseIsValid(), @@ -388,7 +410,8 @@ std::shared_ptr TransferZeroCopy(RecordReader* reader, data = std::make_shared<::arrow::ArrayData>(field->type(), reader->values_written(), std::move(buffers), /*null_count=*/0); } - return ::arrow::MakeArray(data); + AttachStatistics(data.get(), std::move(metadata), ctx); + return ::arrow::MakeArray(std::move(data)); } Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) { @@ -794,10 +817,20 @@ Status TransferColumnData(RecordReader* reader, break; } case ::arrow::Type::INT32: + result = TransferZeroCopy<::arrow::Int32Type, Int32Type>( + reader, std::move(metadata), ctx, value_field); + break; case ::arrow::Type::INT64: + result = TransferZeroCopy<::arrow::Int64Type, Int64Type>( + reader, std::move(metadata), ctx, value_field); + break; case ::arrow::Type::FLOAT: + result = TransferZeroCopy<::arrow::FloatType, FloatType>( + reader, std::move(metadata), ctx, value_field); + break; case ::arrow::Type::DOUBLE: - result = TransferZeroCopy(reader, value_field); + result = TransferZeroCopy<::arrow::DoubleType, DoubleType>( + reader, std::move(metadata), ctx, value_field); break; case ::arrow::Type::BOOL: RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, &result)); @@ -895,7 +928,8 @@ Status TransferColumnData(RecordReader* reader, case ::arrow::TimeUnit::MILLI: case ::arrow::TimeUnit::MICRO: case ::arrow::TimeUnit::NANO: - result = TransferZeroCopy(reader, value_field); + result = TransferZeroCopy<::arrow::Int64Type, Int64Type>( + reader, std::move(metadata), ctx, value_field); break; default: return Status::NotImplemented("TimeUnit not supported");