apacheGH-43944: [C++][Parquet] Add support for arrow::ArrayStatistics: non zero-copy int based types (apache#43945)

### Rationale for this change

Statistics are useful for fast processing. This change covers the integer-based types that are not read zero-copy: they go through an explicit value-transfer step on read, which is where statistics from the column chunk metadata can be attached.

Target types:

* `UInt8`
* `Int8`
* `UInt16`
* `Int16`
* `UInt32`
* `UInt64`
* `Date32`
* `Time32`
* `Time64`
* `Duration`

### What changes are included in this PR?

Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`.
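
In essence, the integer transfer path now consults the column chunk's Parquet statistics and attaches the result to the array's `ArrayData`. Below is a condensed sketch of that mapping for a signed 32-bit physical type; the helper name and header paths are illustrative rather than part of this PR, and the real logic lives in `TransferInt()` in `reader_internal.cc`, which also handles the unsigned and 64-bit cases.

```cpp
#include <memory>

#include "arrow/array/statistics.h"  // assumed location of arrow::ArrayStatistics
#include "parquet/metadata.h"
#include "parquet/statistics.h"

// Hypothetical helper: condense a column chunk's Parquet statistics into
// arrow::ArrayStatistics for a signed 32-bit integer column.
std::shared_ptr<::arrow::ArrayStatistics> MakeArrayStatistics(
    const ::parquet::ColumnChunkMetaData& metadata) {
  auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
  auto statistics = metadata.statistics();
  if (!statistics) {
    return array_statistics;
  }
  if (statistics->HasDistinctCount()) {
    array_statistics->distinct_count = statistics->distinct_count();
  }
  if (statistics->HasMinMax()) {
    auto* typed = static_cast<::parquet::TypedStatistics<::parquet::Int32Type>*>(
        statistics.get());
    // Signed values widen to int64_t; unsigned values would widen to uint64_t.
    array_statistics->min = static_cast<int64_t>(typed->min());
    array_statistics->max = static_cast<int64_t>(typed->max());
    // Integer min/max, when present, are always exact.
    array_statistics->is_min_exact = true;
    array_statistics->is_max_exact = true;
  }
  return array_statistics;
}
```

The widening is why the tests below instantiate `TestStatisticsReadArray` with `int64_t` for signed and date/time types and `uint64_t` for the `UInt*` types.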

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes. Arrays read through the Parquet Arrow reader now carry `arrow::ArrayStatistics` populated from the column chunk metadata.
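
As a minimal usage sketch (assuming an already-opened `parquet::arrow::FileReader`; the function name and the choice of column 0 are illustrative):

```cpp
#include <iostream>
#include <memory>
#include <variant>

#include "arrow/array.h"
#include "arrow/chunked_array.h"
#include "parquet/arrow/reader.h"
#include "parquet/exception.h"

// Read column 0 and print the min/max that this change now populates
// from the Parquet column chunk statistics.
void PrintColumnMinMax(parquet::arrow::FileReader* file_reader) {
  std::shared_ptr<::arrow::ChunkedArray> chunked_array;
  PARQUET_THROW_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
  const auto& statistics = chunked_array->chunk(0)->data()->statistics;
  if (statistics && statistics->min.has_value() && statistics->max.has_value()) {
    // Signed integer columns expose min/max as int64_t; unsigned
    // columns expose them as uint64_t.
    std::cout << "min: " << std::get<int64_t>(*statistics->min) << "\n"
              << "max: " << std::get<int64_t>(*statistics->max) << "\n";
  }
}
```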

* GitHub Issue: apache#43944

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
kou authored Sep 5, 2024
1 parent 9761241 commit 262d6f6
Showing 4 changed files with 187 additions and 29 deletions.
105 changes: 105 additions & 0 deletions cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -17,12 +17,14 @@

#include "gtest/gtest.h"

#include "arrow/array.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"
@@ -179,4 +181,107 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
ASSERT_FALSE(stats->HasMinMax());
}

namespace {
::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
auto schema = ::arrow::schema({::arrow::field("column", data_type)});
auto array = ::arrow::ArrayFromJSON(data_type, json);
auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
const auto arrow_writer_properties =
parquet::ArrowWriterProperties::Builder().store_schema()->build();
ARROW_ASSIGN_OR_RAISE(
auto writer,
FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
default_writer_properties(), arrow_writer_properties));
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
ARROW_RETURN_NOT_OK(writer->Close());
ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());

auto reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
std::unique_ptr<FileReader> file_reader;
ARROW_RETURN_NOT_OK(
FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &file_reader));
std::shared_ptr<::arrow::ChunkedArray> chunked_array;
ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
return chunked_array->chunk(0);
}

template <typename ArrowType, typename MinMaxType>
void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ArrowCType = typename ArrowType::c_type;
constexpr auto min = std::numeric_limits<ArrowCType>::min();
constexpr auto max = std::numeric_limits<ArrowCType>::max();

std::string json;
json += "[";
json += std::to_string(max);
json += ", null, ";
json += std::to_string(min);
json += ", ";
json += std::to_string(max);
json += "]";
ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
auto statistics = typed_array->statistics();
ASSERT_NE(nullptr, statistics);
ASSERT_EQ(true, statistics->null_count.has_value());
ASSERT_EQ(1, statistics->null_count.value());
ASSERT_EQ(false, statistics->distinct_count.has_value());
ASSERT_EQ(true, statistics->min.has_value());
ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->min));
ASSERT_EQ(min, std::get<MinMaxType>(*statistics->min));
ASSERT_EQ(true, statistics->is_min_exact);
ASSERT_EQ(true, statistics->max.has_value());
ASSERT_EQ(true, std::holds_alternative<MinMaxType>(*statistics->max));
ASSERT_EQ(max, std::get<MinMaxType>(*statistics->max));
ASSERT_EQ(true, statistics->is_max_exact);
}
} // namespace

TEST(TestStatisticsRead, Int8) {
TestStatisticsReadArray<::arrow::Int8Type, int64_t>(::arrow::int8());
}

TEST(TestStatisticsRead, UInt8) {
TestStatisticsReadArray<::arrow::UInt8Type, uint64_t>(::arrow::uint8());
}

TEST(TestStatisticsRead, Int16) {
TestStatisticsReadArray<::arrow::Int16Type, int64_t>(::arrow::int16());
}

TEST(TestStatisticsRead, UInt16) {
TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
}

TEST(TestStatisticsRead, UInt32) {
TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
}

TEST(TestStatisticsRead, UInt64) {
TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
}

TEST(TestStatisticsRead, Date32) {
TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
}

TEST(TestStatisticsRead, Time32) {
TestStatisticsReadArray<::arrow::Time32Type, int64_t>(
::arrow::time32(::arrow::TimeUnit::MILLI));
}

TEST(TestStatisticsRead, Time64) {
TestStatisticsReadArray<::arrow::Time64Type, int64_t>(
::arrow::time64(::arrow::TimeUnit::MICRO));
}

TEST(TestStatisticsRead, Duration) {
TestStatisticsReadArray<::arrow::DurationType, int64_t>(
::arrow::duration(::arrow::TimeUnit::NANO));
}

} // namespace parquet::arrow
5 changes: 3 additions & 2 deletions cpp/src/parquet/arrow/reader.cc
@@ -485,8 +485,9 @@ class LeafReader : public ColumnReaderImpl {
NextRowGroup();
}
}
- RETURN_NOT_OK(
- TransferColumnData(record_reader_.get(), field_, descr_, ctx_->pool, &out_));
RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
input_->column_chunk_metadata(), field_, descr_,
ctx_.get(), &out_));
return Status::OK();
END_PARQUET_CATCH_EXCEPTIONS
}
78 changes: 58 additions & 20 deletions cpp/src/parquet/arrow/reader_internal.cc
@@ -319,26 +319,59 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
}

template <typename ArrowType, typename ParquetType>
- Status TransferInt(RecordReader* reader, MemoryPool* pool,
- const std::shared_ptr<Field>& field, Datum* out) {
Status TransferInt(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field,
Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
- ::arrow::AllocateBuffer(length * sizeof(ArrowCType), pool));
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));

auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
int64_t null_count = 0;
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
- *out = std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
- reader->ReleaseIsValid(),
- reader->null_count());
- } else {
- *out =
- std::make_shared<ArrayType<ArrowType>>(field->type(), length, std::move(data),
- /*null_bitmap=*/nullptr, /*null_count=*/0);
null_count = reader->null_count();
buffers[0] = reader->ReleaseIsValid();
}
auto array_data =
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
array_statistics->null_count = null_count;
auto statistics = metadata->statistics().get();
if (statistics) {
if (statistics->HasDistinctCount()) {
array_statistics->distinct_count = statistics->distinct_count();
}
if (statistics->HasMinMax()) {
auto typed_statistics =
static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
const ArrowCType min = typed_statistics->min();
const ArrowCType max = typed_statistics->max();
if (std::is_signed<ArrowCType>::value) {
array_statistics->min = static_cast<int64_t>(min);
array_statistics->max = static_cast<int64_t>(max);
} else {
array_statistics->min = static_cast<uint64_t>(min);
array_statistics->max = static_cast<uint64_t>(max);
}
// We can assume that integer based min/max are always exact if
// they exist. Apache Parquet's "Statistics" has
// "is_min_value_exact" and "is_max_value_exact" but we can
// ignore them for integer based min/max.
//
// See also the discussion at dev@parquet.apache.org:
// https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
array_statistics->is_min_exact = true;
array_statistics->is_max_exact = true;
}
}
array_data->statistics = std::move(array_statistics);
*out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
return Status::OK();
}

@@ -728,21 +761,26 @@ Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool,

} // namespace

- #define TRANSFER_INT32(ENUM, ArrowType) \
- case ::arrow::Type::ENUM: { \
- Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_field, &result); \
- RETURN_NOT_OK(s); \
#define TRANSFER_INT32(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int32Type>(reader, std::move(metadata), ctx, \
value_field, &result); \
RETURN_NOT_OK(s); \
} break;

- #define TRANSFER_INT64(ENUM, ArrowType) \
- case ::arrow::Type::ENUM: { \
- Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_field, &result); \
- RETURN_NOT_OK(s); \
#define TRANSFER_INT64(ENUM, ArrowType) \
case ::arrow::Type::ENUM: { \
Status s = TransferInt<ArrowType, Int64Type>(reader, std::move(metadata), ctx, \
value_field, &result); \
RETURN_NOT_OK(s); \
} break;

- Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& value_field,
- const ColumnDescriptor* descr, MemoryPool* pool,
Status TransferColumnData(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const std::shared_ptr<Field>& value_field,
const ColumnDescriptor* descr, const ReaderContext* ctx,
std::shared_ptr<ChunkedArray>* out) {
auto pool = ctx->pool;
Datum result;
std::shared_ptr<ChunkedArray> chunked_result;
switch (value_field->type()->id()) {
28 changes: 21 additions & 7 deletions cpp/src/parquet/arrow/reader_internal.h
@@ -66,7 +66,8 @@ class FileColumnIterator {
: column_index_(column_index),
reader_(reader),
schema_(reader->metadata()->schema()),
- row_groups_(row_groups.begin(), row_groups.end()) {}
row_groups_(row_groups.begin(), row_groups.end()),
row_group_index_(-1) {}

virtual ~FileColumnIterator() {}

@@ -75,7 +76,8 @@
return nullptr;
}

- auto row_group_reader = reader_->RowGroup(row_groups_.front());
row_group_index_ = row_groups_.front();
auto row_group_reader = reader_->RowGroup(row_group_index_);
row_groups_.pop_front();
return row_group_reader->GetColumnPageReader(column_index_);
}
@@ -86,23 +88,29 @@

std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }

std::unique_ptr<RowGroupMetaData> row_group_metadata() const {
return metadata()->RowGroup(row_group_index_);
}

std::unique_ptr<ColumnChunkMetaData> column_chunk_metadata() const {
return row_group_metadata()->ColumnChunk(column_index_);
}

int column_index() const { return column_index_; }

int row_group_index() const { return row_group_index_; }

protected:
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
std::deque<int> row_groups_;
int row_group_index_;
};

using FileColumnIteratorFactory =
std::function<FileColumnIterator*(int, ParquetFileReader*)>;

- Status TransferColumnData(::parquet::internal::RecordReader* reader,
- const std::shared_ptr<::arrow::Field>& value_field,
- const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
- std::shared_ptr<::arrow::ChunkedArray>* out);

struct ReaderContext {
ParquetFileReader* reader;
::arrow::MemoryPool* pool;
@@ -118,5 +126,11 @@ struct ReaderContext {
}
};

Status TransferColumnData(::parquet::internal::RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const std::shared_ptr<::arrow::Field>& value_field,
const ColumnDescriptor* descr, const ReaderContext* ctx,
std::shared_ptr<::arrow::ChunkedArray>* out);

} // namespace arrow
} // namespace parquet
