diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index a19303c3dc03a..2638358f1ce7c 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -17,12 +17,14 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/array.h"
 #include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 
 #include "parquet/api/reader.h"
 #include "parquet/api/writer.h"
 
+#include "parquet/arrow/reader.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/file_writer.h"
@@ -179,4 +181,107 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
   ASSERT_FALSE(stats->HasMinMax());
 }
 
+namespace {
+::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
+    std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
+  auto schema = ::arrow::schema({::arrow::field("column", data_type)});
+  auto array = ::arrow::ArrayFromJSON(data_type, json);
+  auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
+  ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
+  const auto arrow_writer_properties =
+      parquet::ArrowWriterProperties::Builder().store_schema()->build();
+  ARROW_ASSIGN_OR_RAISE(
+      auto writer,
+      FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
+                       default_writer_properties(), arrow_writer_properties));
+  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+  ARROW_RETURN_NOT_OK(writer->Close());
+  ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
+
+  auto reader =
+      ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ARROW_RETURN_NOT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
+  std::shared_ptr<::arrow::ChunkedArray> chunked_array;
+  ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
+  return chunked_array->chunk(0);
+}
+
+template <typename ArrowType, typename MinMaxType>
+void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
+  using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+  using ArrowCType = typename ArrowType::c_type;
+  constexpr auto min = std::numeric_limits<ArrowCType>::min();
+  constexpr auto max = std::numeric_limits<ArrowCType>::max();
+
+  std::string json;
+  json += "[";
+  json += std::to_string(max);
+  json += ", null, ";
+  json += std::to_string(min);
+  json += ", ";
+  json += std::to_string(max);
+  json += "]";
+  ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
+  auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
+  auto statistics = typed_array->statistics();
+  ASSERT_NE(nullptr, statistics);
+  ASSERT_EQ(true, statistics->null_count.has_value());
+  ASSERT_EQ(1, statistics->null_count.value());
+  ASSERT_EQ(false, statistics->distinct_count.has_value());
+  ASSERT_EQ(true, statistics->min.has_value());
+  ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->min));
+  ASSERT_EQ(min, std::get<MinMaxType>(*statistics->min));
+  ASSERT_EQ(true, statistics->is_min_exact);
+  ASSERT_EQ(true, statistics->max.has_value());
+  ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->max));
+  ASSERT_EQ(max, std::get<MinMaxType>(*statistics->max));
+  ASSERT_EQ(true, statistics->is_max_exact);
+}
+}  // namespace
+
+TEST(TestStatisticsRead, Int8) {
+  TestStatisticsReadArray<::arrow::Int8Type, int64_t>(::arrow::int8());
+}
+
+TEST(TestStatisticsRead, UInt8) {
+  TestStatisticsReadArray<::arrow::UInt8Type, uint64_t>(::arrow::uint8());
+}
+
+TEST(TestStatisticsRead, Int16) {
+  TestStatisticsReadArray<::arrow::Int16Type, int64_t>(::arrow::int16());
+}
+
+TEST(TestStatisticsRead, UInt16) {
+  TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
+}
+
+TEST(TestStatisticsRead, UInt32) {
+  TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
+}
+
+TEST(TestStatisticsRead, UInt64) {
+  TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
+}
+
+TEST(TestStatisticsRead, Date32) {
+  TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
+}
+
+TEST(TestStatisticsRead, Time32) {
+  TestStatisticsReadArray<::arrow::Time32Type, int64_t>(
+      ::arrow::time32(::arrow::TimeUnit::MILLI));
+}
+
+TEST(TestStatisticsRead, Time64) {
+  TestStatisticsReadArray<::arrow::Time64Type, int64_t>(
+      ::arrow::time64(::arrow::TimeUnit::MICRO));
+}
+
+TEST(TestStatisticsRead, Duration) {
+  TestStatisticsReadArray<::arrow::DurationType, int64_t>(
+      ::arrow::duration(::arrow::TimeUnit::NANO));
+}
+
 }  // namespace parquet::arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 285e2a597389d..4f57c3f4f56f7 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -485,8 +485,9 @@ class LeafReader : public ColumnReaderImpl {
         NextRowGroup();
       }
     }
-    RETURN_NOT_OK(
-        TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
+    RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
+                                     input_->column_chunk_metadata(), field_, descr_,
+                                     ctx_.get(), &out_));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index e5aef5a45b5f3..e6c2d95e1fbf7 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -319,26 +319,59 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
 }
 
 template <typename ArrowType, typename ParquetType>
-Status TransferInt(RecordReader* reader, MemoryPool* pool,
-                   const std::shared_ptr<Field>& field, Datum* out) {
+Status TransferInt(RecordReader* reader,
+                   std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                   const ReaderContext* ctx, const std::shared_ptr<Field>& field,
+                   Datum* out) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
   int64_t length = reader->values_written();
   ARROW_ASSIGN_OR_RAISE(auto data,
-                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
+                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));
 
   auto values = reinterpret_cast<const ParquetCType*>(reader->values());
   auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
   std::copy(values, values + length, out_ptr);
+  int64_t null_count = 0;
+  std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
   if (field->nullable()) {
-    *out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
-                                                  reader->ReleaseIsValid(),
-                                                  reader->null_count());
-  } else {
-    *out =
-        std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
-                                               /*null_bitmap=*/nullptr, /*null_count=*/0);
+    null_count = reader->null_count();
+    buffers[0] = reader->ReleaseIsValid();
+  }
+  auto array_data =
+      ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
+  auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
+  array_statistics->null_count = null_count;
+  auto statistics = metadata->statistics().get();
+  if (statistics) {
+    if (statistics->HasDistinctCount()) {
+      array_statistics->distinct_count = statistics->distinct_count();
+    }
+    if (statistics->HasMinMax()) {
+      auto typed_statistics =
+          static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
+      const ArrowCType min = typed_statistics->min();
+      const ArrowCType max = typed_statistics->max();
+      if (std::is_signed<ArrowCType>::value) {
+        array_statistics->min = static_cast<int64_t>(min);
+        array_statistics->max = static_cast<int64_t>(max);
+      } else {
+        array_statistics->min = static_cast<uint64_t>(min);
+        array_statistics->max = static_cast<uint64_t>(max);
+      }
+      // We can assume that integer based min/max are always exact if
+      // they exist. Apache Parquet's "Statistics" has
+      // "is_min_value_exact" and "is_max_value_exact" but we can
+      // ignore them for integer based min/max.
+      //
+      // See also the discussion at dev@parquet.apache.org:
+      // https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
+      array_statistics->is_min_exact = true;
+      array_statistics->is_max_exact = true;
+    }
   }
+  array_data->statistics = std::move(array_statistics);
+  *out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
   return Status::OK();
 }
@@ -728,21 +761,26 @@ Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool,
 
 }  // namespace
 
-#define TRANSFER_INT32(ENUM, ArrowType)                                               \
-  case ::arrow::Type::ENUM: {                                                         \
-    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, &result); \
-    RETURN_NOT_OK(s);                                                                 \
+#define TRANSFER_INT32(ENUM, ArrowType)                                            \
+  case ::arrow::Type::ENUM: {                                                      \
+    Status s = TransferInt<ArrowType, Int32Type>(reader, std::move(metadata), ctx, \
+                                                 value_field, &result);            \
+    RETURN_NOT_OK(s);                                                              \
   } break;
 
-#define TRANSFER_INT64(ENUM, ArrowType)                                               \
-  case ::arrow::Type::ENUM: {                                                         \
-    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, &result); \
-    RETURN_NOT_OK(s);                                                                 \
+#define TRANSFER_INT64(ENUM, ArrowType)                                            \
+  case ::arrow::Type::ENUM: {                                                      \
+    Status s = TransferInt<ArrowType, Int64Type>(reader, std::move(metadata), ctx, \
+                                                 value_field, &result);            \
+    RETURN_NOT_OK(s);                                                              \
   } break;
 
-Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field,
-                          const ColumnDescriptor* descr, MemoryPool* pool,
+Status TransferColumnData(RecordReader* reader,
+                          std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                          const std::shared_ptr<Field>& value_field,
+                          const ColumnDescriptor* descr, const ReaderContext* ctx,
                           std::shared_ptr<ChunkedArray>* out) {
+  auto pool = ctx->pool;
   Datum result;
   std::shared_ptr<ChunkedArray> chunked_result;
   switch (value_field->type()->id()) {
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cf9dbb86577b5..fab56c888045d 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -66,7 +66,8 @@ class FileColumnIterator {
       : column_index_(column_index),
         reader_(reader),
         schema_(reader->metadata()->schema()),
-        row_groups_(row_groups.begin(), row_groups.end()) {}
+        row_groups_(row_groups.begin(), row_groups.end()),
+        row_group_index_(-1) {}
 
   virtual ~FileColumnIterator() {}
 
@@ -75,7 +76,8 @@ class FileColumnIterator {
       return nullptr;
     }
 
-    auto row_group_reader = reader_->RowGroup(row_groups_.front());
+    row_group_index_ = row_groups_.front();
+    auto row_group_reader = reader_->RowGroup(row_group_index_);
     row_groups_.pop_front();
     return row_group_reader->GetColumnPageReader(column_index_);
   }
@@ -86,23 +88,29 @@ class FileColumnIterator {
 
   std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
 
+  std::unique_ptr<RowGroupMetaData> row_group_metadata() const {
+    return metadata()->RowGroup(row_group_index_);
+  }
+
+  std::unique_ptr<ColumnChunkMetaData> column_chunk_metadata() const {
+    return row_group_metadata()->ColumnChunk(column_index_);
+  }
+
   int column_index() const { return column_index_; }
 
+  int row_group_index() const { return row_group_index_; }
+
  protected:
   int column_index_;
   ParquetFileReader* reader_;
   const SchemaDescriptor* schema_;
   std::deque<int> row_groups_;
+  int row_group_index_;
 };
 
 using FileColumnIteratorFactory =
     std::function<FileColumnIterator* (int, ParquetFileReader*)>;
 
-Status TransferColumnData(::parquet::internal::RecordReader* reader,
-                          const std::shared_ptr<::arrow::Field>& value_field,
-                          const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
-                          std::shared_ptr<::arrow::ChunkedArray>* out);
-
 struct ReaderContext {
   ParquetFileReader* reader;
   ::arrow::MemoryPool* pool;
@@ -118,5 +126,11 @@ struct ReaderContext {
   }
 };
 
+Status TransferColumnData(::parquet::internal::RecordReader* reader,
+                          std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                          const std::shared_ptr<::arrow::Field>& value_field,
+                          const ColumnDescriptor* descr, const ReaderContext* ctx,
+                          std::shared_ptr<::arrow::ChunkedArray>* out);
+
 }  // namespace arrow
 }  // namespace parquet
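
For context beyond the patch itself, here is a minimal sketch of how a caller might consume the statistics this change attaches. The file name `data.parquet` and the assumption that column 0 is int32 are placeholders; the `static_pointer_cast` mirrors the pattern used in `TestStatisticsReadArray` above.

```cpp
#include <iostream>
#include <memory>
#include <variant>

#include "arrow/array.h"
#include "arrow/chunked_array.h"
#include "arrow/io/file.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "parquet/api/reader.h"
#include "parquet/arrow/reader.h"

::arrow::Status PrintFirstColumnStatistics() {
  // Open a Parquet file; the path is a placeholder.
  ARROW_ASSIGN_OR_RAISE(auto input,
                        ::arrow::io::ReadableFile::Open("data.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> file_reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::FileReader::Make(
      ::arrow::default_memory_pool(), parquet::ParquetFileReader::Open(input),
      &file_reader));

  std::shared_ptr<::arrow::ChunkedArray> column;
  ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &column));

  // The statistics ride on the materialized array (via ArrayData::statistics),
  // populated by TransferInt() from the column chunk metadata.
  auto chunk = std::static_pointer_cast<::arrow::Int32Array>(column->chunk(0));
  auto statistics = chunk->statistics();
  if (statistics == nullptr) return ::arrow::Status::OK();
  if (statistics->null_count.has_value()) {
    std::cout << "null_count: " << *statistics->null_count << std::endl;
  }
  // Signed integer min/max are stored as int64_t (uint64_t for unsigned
  // types), and are always exact when present.
  if (statistics->min.has_value()) {
    std::cout << "min: " << std::get<int64_t>(*statistics->min)
              << " exact: " << statistics->is_min_exact << std::endl;
  }
  if (statistics->max.has_value()) {
    std::cout << "max: " << std::get<int64_t>(*statistics->max)
              << " exact: " << statistics->is_max_exact << std::endl;
  }
  return ::arrow::Status::OK();
}
```

This is the reading half of the round trip exercised by `StatisticsReadArray` above; the statistics members stay unset when the column chunk metadata carries no usable Parquet statistics.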