diff --git a/cpp/src/arrow/array/array_base.h b/cpp/src/arrow/array/array_base.h index 716ae0722069e..c36d4518bdbd9 100644 --- a/cpp/src/arrow/array/array_base.h +++ b/cpp/src/arrow/array/array_base.h @@ -232,6 +232,14 @@ class ARROW_EXPORT Array { /// \return DeviceAllocationType DeviceAllocationType device_type() const { return data_->device_type(); } + /// \brief Return the statistics of this Array + /// + /// This just delegates to calling statistics on the underlying ArrayData + /// object which backs this Array. + /// + /// \return const ArrayStatistics& + const ArrayStatistics& statistics() const { return data_->statistics; } + protected: Array() = default; ARROW_DEFAULT_MOVE_AND_ASSIGN(Array); diff --git a/cpp/src/arrow/array/array_primitive.h b/cpp/src/arrow/array/array_primitive.h index e6df92e3b788c..900b4bbdf526b 100644 --- a/cpp/src/arrow/array/array_primitive.h +++ b/cpp/src/arrow/array/array_primitive.h @@ -68,6 +68,11 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray { IteratorType end() const { return IteratorType(*this, length()); } + /// \brief Return the statistics for boolean. + const BooleanArrayStatistics& statistics() const { + return static_cast(Array::statistics()); + } + protected: using PrimitiveArray::PrimitiveArray; }; @@ -119,6 +124,11 @@ class NumericArray : public PrimitiveArray { IteratorType end() const { return IteratorType(*this, length()); } + /// \brief Return the typed statistics. + const TypedArrayStatistics& statistics() const { + return static_cast&>(Array::statistics()); + } + protected: using PrimitiveArray::PrimitiveArray; }; diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h index e0508fe6980a7..14eaed67e71d7 100644 --- a/cpp/src/arrow/array/data.h +++ b/cpp/src/arrow/array/data.h @@ -24,6 +24,7 @@ #include #include +#include "arrow/array/statistics.h" #include "arrow/buffer.h" #include "arrow/result.h" #include "arrow/type.h" @@ -152,7 +153,8 @@ struct ARROW_EXPORT ArrayData { offset(other.offset), buffers(std::move(other.buffers)), child_data(std::move(other.child_data)), - dictionary(std::move(other.dictionary)) { + dictionary(std::move(other.dictionary)), + statistics(std::move(other.statistics)) { SetNullCount(other.null_count); } @@ -163,7 +165,8 @@ struct ARROW_EXPORT ArrayData { offset(other.offset), buffers(other.buffers), child_data(other.child_data), - dictionary(other.dictionary) { + dictionary(other.dictionary), + statistics(other.statistics) { SetNullCount(other.null_count); } @@ -176,6 +179,7 @@ struct ARROW_EXPORT ArrayData { buffers = std::move(other.buffers); child_data = std::move(other.child_data); dictionary = std::move(other.dictionary); + statistics = std::move(other.statistics); return *this; } @@ -188,6 +192,7 @@ struct ARROW_EXPORT ArrayData { buffers = other.buffers; child_data = other.child_data; dictionary = other.dictionary; + statistics = other.statistics; return *this; } @@ -390,6 +395,9 @@ struct ARROW_EXPORT ArrayData { // The dictionary for this Array, if any. Only used for dictionary type std::shared_ptr dictionary; + + // The statistics for this Array. + ArrayStatistics statistics{}; }; /// \brief A non-owning Buffer reference diff --git a/cpp/src/arrow/array/statistics.h b/cpp/src/arrow/array/statistics.h new file mode 100644 index 0000000000000..c080087a00b5e --- /dev/null +++ b/cpp/src/arrow/array/statistics.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/util/visibility.h" + +namespace arrow { + +/// \brief Statistics for an Array +/// +/// Apache Arrow format doesn't have statistics but data source such +/// as Apache Parquet may have statistics. Statistics associate with +/// data source can be read unified API via this class. +struct ARROW_EXPORT ArrayStatistics { + public: + using ElementBufferType = std::variant; + + ArrayStatistics() = default; + ~ArrayStatistics() = default; + + /// \brief The number of null values, may not be set + std::optional null_count = std::nullopt; + + /// \brief The number of distinct values, may not be set + std::optional distinct_count = std::nullopt; + + /// \brief The current minimum value buffer, may not be set + std::optional min_buffer = std::nullopt; + + /// \brief The current maximum value buffer, may not be set + std::optional max_buffer = std::nullopt; + + /// \brief Check two Statistics for equality + bool Equals(const ArrayStatistics& other) const { + return null_count == other.null_count && distinct_count == other.distinct_count && + min_buffer == other.min_buffer && max_buffer == other.max_buffer; + } +}; + +/// \brief A typed implementation of ArrayStatistics +template +class TypedArrayStatistics : public ArrayStatistics { + public: + using ElementType = typename TypeClass::c_type; + + /// \brief The current minimum value, may not be set + std::optional min() const { + if (min_buffer && std::holds_alternative(*min_buffer)) { + return std::get(*min_buffer); + } else { + return std::nullopt; + } + } + + /// \brief The current maximum value, may not be set + std::optional max() const { + if (max_buffer && std::holds_alternative(*max_buffer)) { + return std::get(*max_buffer); + } else { + return std::nullopt; + } + } +}; + +} // namespace arrow diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index 08777d247edbf..54d3e09f01d26 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -89,6 +89,9 @@ using ChunkedArrayVector = std::vector>; using RecordBatchVector = std::vector>; using RecordBatchIterator = Iterator>; +template +class TypedArrayStatistics; + class DictionaryType; class DictionaryArray; struct DictionaryScalar; @@ -102,6 +105,7 @@ class FixedWidthType; class BooleanType; class BooleanArray; +using BooleanArrayStatistics = TypedArrayStatistics; class BooleanBuilder; struct BooleanScalar; @@ -215,11 +219,12 @@ class NumericBuilder; template class NumericTensor; -#define _NUMERIC_TYPE_DECL(KLASS) \ - class KLASS##Type; \ - using KLASS##Array = NumericArray; \ - using KLASS##Builder = NumericBuilder; \ - struct KLASS##Scalar; \ +#define _NUMERIC_TYPE_DECL(KLASS) \ + class KLASS##Type; \ + using KLASS##Array = NumericArray; \ + using KLASS##ArrayStatistics = TypedArrayStatistics; \ + using KLASS##Builder = NumericBuilder; \ + struct KLASS##Scalar; \ using KLASS##Tensor = NumericTensor; _NUMERIC_TYPE_DECL(Int8) diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc b/cpp/src/parquet/arrow/arrow_statistics_test.cc index ad4496933ef4c..4b4bf5e740609 100644 --- a/cpp/src/parquet/arrow/arrow_statistics_test.cc +++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc @@ -17,12 +17,14 @@ #include "gtest/gtest.h" +#include "arrow/array.h" #include "arrow/table.h" #include "arrow/testing/gtest_util.h" #include "parquet/api/reader.h" #include "parquet/api/writer.h" +#include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/writer.h" #include "parquet/file_writer.h" @@ -156,4 +158,56 @@ INSTANTIATE_TEST_SUITE_P( /*expected_min=*/"z", /*expected_max=*/"z"})); +namespace { +::arrow::Result> StatisticsReadArray( + std::shared_ptr<::arrow::DataType> data_type, const std::string& json) { + auto schema = ::arrow::schema({::arrow::field("column", data_type)}); + auto array = ::arrow::ArrayFromJSON(data_type, json); + auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array}); + ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create()); + ARROW_ASSIGN_OR_RAISE(auto writer, + FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink)); + ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch)); + ARROW_RETURN_NOT_OK(writer->Close()); + ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish()); + + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); + std::unique_ptr file_reader; + ARROW_RETURN_NOT_OK( + FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader)); + std::shared_ptr<::arrow::ChunkedArray> chunked_array; + ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array)); + return chunked_array->chunk(0); +} +} // namespace + +TEST(TestStatisticsRead, Boolean) { + ASSERT_OK_AND_ASSIGN(auto array, + StatisticsReadArray(::arrow::boolean(), R"([true, null, true])")); + auto typed_array = std::static_pointer_cast<::arrow::BooleanArray>(array); + auto statistics = typed_array->statistics(); + ASSERT_EQ(true, statistics.null_count.has_value()); + ASSERT_EQ(1, statistics.null_count.value()); + ASSERT_EQ(false, statistics.distinct_count.has_value()); + ASSERT_EQ(true, statistics.min().has_value()); + ASSERT_EQ(true, statistics.min().value()); + ASSERT_EQ(true, statistics.max().has_value()); + ASSERT_EQ(true, statistics.max().value()); +} + +TEST(TestStatisticsRead, Int8) { + ASSERT_OK_AND_ASSIGN(auto array, + StatisticsReadArray(::arrow::int8(), R"([1, null, -1, 1])")); + auto typed_array = std::static_pointer_cast<::arrow::Int8Array>(array); + auto statistics = typed_array->statistics(); + ASSERT_EQ(true, statistics.null_count.has_value()); + ASSERT_EQ(1, statistics.null_count.value()); + ASSERT_EQ(false, statistics.distinct_count.has_value()); + ASSERT_EQ(true, statistics.min().has_value()); + ASSERT_EQ(-1, statistics.min().value()); + ASSERT_EQ(true, statistics.max().has_value()); + ASSERT_EQ(1, statistics.max().value()); +} + } // namespace parquet::arrow diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 285e2a597389d..42ce6d4fc5b14 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -485,8 +485,9 @@ class LeafReader : public ColumnReaderImpl { NextRowGroup(); } } - RETURN_NOT_OK( - TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_)); + RETURN_NOT_OK(TransferColumnData(record_reader_.get(), + input_->column_chunk_metadata(), field_, descr_, + ctx_->pool, &out_)); return Status::OK(); END_PARQUET_CATCH_EXCEPTIONS } diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index e5aef5a45b5f3..0bb6b6104b2da 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -319,8 +319,9 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) { } template -Status TransferInt(RecordReader* reader, MemoryPool* pool, - const std::shared_ptr& field, Datum* out) { +Status TransferInt(RecordReader* reader, + std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, + MemoryPool* pool, const std::shared_ptr& field, Datum* out) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; int64_t length = reader->values_written(); @@ -330,15 +331,30 @@ Status TransferInt(RecordReader* reader, MemoryPool* pool, auto values = reinterpret_cast(reader->values()); auto out_ptr = reinterpret_cast(data->mutable_data()); std::copy(values, values + length, out_ptr); + int64_t null_count = 0; + std::vector> buffers = {nullptr, std::move(data)}; if (field->nullable()) { - *out = std::make_shared>(field->type(), length, std::move(data), - reader->ReleaseIsValid(), - reader->null_count()); - } else { - *out = - std::make_shared>(field->type(), length, std::move(data), - /*null_bitmap=*/nullptr, /*null_count=*/0); + null_count = reader->null_count(); + buffers[0] = reader->ReleaseIsValid(); + } + auto array_data = + ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count); + array_data->statistics.null_count = null_count; + auto statistics = metadata->statistics().get(); + if (statistics) { + if (statistics->HasDistinctCount()) { + array_data->statistics.distinct_count = statistics->distinct_count(); + } + if (statistics->HasMinMax()) { + auto typed_statistics = + static_cast<::parquet::TypedStatistics*>(statistics); + array_data->statistics.min_buffer = + static_cast(typed_statistics->min()); + array_data->statistics.max_buffer = + static_cast(typed_statistics->max()); + } } + *out = std::make_shared>(std::move(array_data)); return Status::OK(); } @@ -358,7 +374,9 @@ std::shared_ptr TransferZeroCopy(RecordReader* reader, return ::arrow::MakeArray(data); } -Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) { +Status TransferBool(RecordReader* reader, + std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, + bool nullable, MemoryPool* pool, Datum* out) { int64_t length = reader->values_written(); const int64_t buffer_size = bit_util::BytesForBits(length); @@ -375,13 +393,27 @@ Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum } } + int64_t null_count = 0; + std::vector> buffers = {nullptr, std::move(data)}; if (nullable) { - *out = std::make_shared(length, std::move(data), - reader->ReleaseIsValid(), reader->null_count()); - } else { - *out = std::make_shared(length, std::move(data), - /*null_bitmap=*/nullptr, /*null_count=*/0); + null_count = reader->null_count(); + buffers[0] = reader->ReleaseIsValid(); + } + auto array_data = ::arrow::ArrayData::Make(::arrow::boolean(), length, + std::move(buffers), null_count); + array_data->statistics.null_count = null_count; + auto statistics = metadata->statistics().get(); + if (statistics) { + if (statistics->HasDistinctCount()) { + array_data->statistics.distinct_count = statistics->distinct_count(); + } + if (statistics->HasMinMax()) { + auto bool_statistics = static_cast<::parquet::BoolStatistics*>(statistics); + array_data->statistics.min_buffer = bool_statistics->min(); + array_data->statistics.max_buffer = bool_statistics->max(); + } } + *out = std::make_shared(std::move(array_data)); return Status::OK(); } @@ -728,19 +760,23 @@ Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool, } // namespace -#define TRANSFER_INT32(ENUM, ArrowType) \ - case ::arrow::Type::ENUM: { \ - Status s = TransferInt(reader, pool, value_field, &result); \ - RETURN_NOT_OK(s); \ +#define TRANSFER_INT32(ENUM, ArrowType) \ + case ::arrow::Type::ENUM: { \ + Status s = TransferInt(reader, std::move(metadata), pool, \ + value_field, &result); \ + RETURN_NOT_OK(s); \ } break; -#define TRANSFER_INT64(ENUM, ArrowType) \ - case ::arrow::Type::ENUM: { \ - Status s = TransferInt(reader, pool, value_field, &result); \ - RETURN_NOT_OK(s); \ +#define TRANSFER_INT64(ENUM, ArrowType) \ + case ::arrow::Type::ENUM: { \ + Status s = TransferInt(reader, std::move(metadata), pool, \ + value_field, &result); \ + RETURN_NOT_OK(s); \ } break; -Status TransferColumnData(RecordReader* reader, const std::shared_ptr& value_field, +Status TransferColumnData(RecordReader* reader, + std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, + const std::shared_ptr& value_field, const ColumnDescriptor* descr, MemoryPool* pool, std::shared_ptr* out) { Datum result; @@ -762,7 +798,8 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va result = TransferZeroCopy(reader, value_field); break; case ::arrow::Type::BOOL: - RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, &result)); + RETURN_NOT_OK(TransferBool(reader, std::move(metadata), value_field->nullable(), + pool, &result)); break; TRANSFER_INT32(UINT8, ::arrow::UInt8Type); TRANSFER_INT32(INT8, ::arrow::Int8Type); diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index cf9dbb86577b5..3efd4d2cd9e75 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -66,7 +66,8 @@ class FileColumnIterator { : column_index_(column_index), reader_(reader), schema_(reader->metadata()->schema()), - row_groups_(row_groups.begin(), row_groups.end()) {} + row_groups_(row_groups.begin(), row_groups.end()), + row_group_index_(-1) {} virtual ~FileColumnIterator() {} @@ -75,7 +76,8 @@ class FileColumnIterator { return nullptr; } - auto row_group_reader = reader_->RowGroup(row_groups_.front()); + row_group_index_ = row_groups_.front(); + auto row_group_reader = reader_->RowGroup(row_group_index_); row_groups_.pop_front(); return row_group_reader->GetColumnPageReader(column_index_); } @@ -86,19 +88,31 @@ class FileColumnIterator { std::shared_ptr metadata() const { return reader_->metadata(); } + std::unique_ptr row_group_metadata() const { + return metadata()->RowGroup(row_group_index_); + } + + std::unique_ptr column_chunk_metadata() const { + return row_group_metadata()->ColumnChunk(column_index_); + } + int column_index() const { return column_index_; } + int row_group_index() const { return row_group_index_; } + protected: int column_index_; ParquetFileReader* reader_; const SchemaDescriptor* schema_; std::deque row_groups_; + int row_group_index_; }; using FileColumnIteratorFactory = std::function; Status TransferColumnData(::parquet::internal::RecordReader* reader, + std::unique_ptr<::parquet::ColumnChunkMetaData> metadata, const std::shared_ptr<::arrow::Field>& value_field, const ColumnDescriptor* descr, ::arrow::MemoryPool* pool, std::shared_ptr<::arrow::ChunkedArray>* out);