Skip to content

Commit

Permalink
apacheGH-35511: [C++] Util: add memory_pool in SwapEndianArrayData (a…
Browse files Browse the repository at this point in the history
…pache#36431)

### Rationale for this change

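Add a `MemoryPool*` argument to `SwapEndianArrayData` so that the buffers holding the byte-swapped data are allocated from a caller-supplied memory pool instead of always coming from the default memory pool.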

### What changes are included in this PR?

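Add a `pool` argument (defaulting to `default_memory_pool()`) to `SwapEndianArrayData`, thread it through `ArrayDataEndianSwapper` to every `AllocateBuffer` call, and pass `context.options.memory_pool` at the IPC reader call sites.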

### Are these changes tested?

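No dedicated tests were added; the change is presumably exercised indirectly by the existing IPC tests, which go through the endian-swapping code path.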

### Are there any user-facing changes?

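Possibly: `arrow::internal::SwapEndianArrayData` gains an optional `pool` parameter. Existing call sites compile unchanged because the parameter defaults to `default_memory_pool()`, and callers may now pass their own memory pool.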

* Closes: apache#35511

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
mapleFU committed Jul 4, 2023
1 parent 42447e6 commit 6eebf79
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
19 changes: 12 additions & 7 deletions cpp/src/arrow/array/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class ArrayDataWrapper {

class ArrayDataEndianSwapper {
public:
explicit ArrayDataEndianSwapper(const std::shared_ptr<ArrayData>& data) : data_(data) {
explicit ArrayDataEndianSwapper(const std::shared_ptr<ArrayData>& data,
MemoryPool* pool)
: data_(data), pool_(pool) {
out_ = data->Copy();
}

Expand All @@ -100,7 +102,7 @@ class ArrayDataEndianSwapper {
Status SwapChildren(const FieldVector& child_fields) {
for (size_t i = 0; i < child_fields.size(); i++) {
ARROW_ASSIGN_OR_RAISE(out_->child_data[i],
internal::SwapEndianArrayData(data_->child_data[i]));
internal::SwapEndianArrayData(data_->child_data[i], pool_));
}
return Status::OK();
}
Expand All @@ -113,7 +115,7 @@ class ArrayDataEndianSwapper {
return in_buffer;
}
auto in_data = reinterpret_cast<const T*>(in_buffer->data());
ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size()));
ARROW_ASSIGN_OR_RAISE(auto out_buffer, AllocateBuffer(in_buffer->size(), pool_));
auto out_data = reinterpret_cast<T*>(out_buffer->mutable_data());
// NOTE: data_->length not trusted (see warning above)
int64_t length = in_buffer->size() / sizeof(T);
Expand Down Expand Up @@ -149,7 +151,8 @@ class ArrayDataEndianSwapper {

Status Visit(const Decimal128Type& type) {
auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
ARROW_ASSIGN_OR_RAISE(auto new_buffer,
AllocateBuffer(data_->buffers[1]->size(), pool_));
auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
// NOTE: data_->length not trusted (see warning above)
const int64_t length = data_->buffers[1]->size() / Decimal128Type::kByteWidth;
Expand Down Expand Up @@ -209,7 +212,8 @@ class ArrayDataEndianSwapper {
Status Visit(const MonthDayNanoIntervalType& type) {
using MonthDayNanos = MonthDayNanoIntervalType::MonthDayNanos;
auto data = reinterpret_cast<const MonthDayNanos*>(data_->buffers[1]->data());
ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
ARROW_ASSIGN_OR_RAISE(auto new_buffer,
AllocateBuffer(data_->buffers[1]->size(), pool_));
auto new_data = reinterpret_cast<MonthDayNanos*>(new_buffer->mutable_data());
// NOTE: data_->length not trusted (see warning above)
const int64_t length = data_->buffers[1]->size() / sizeof(MonthDayNanos);
Expand Down Expand Up @@ -288,6 +292,7 @@ class ArrayDataEndianSwapper {
}

const std::shared_ptr<ArrayData>& data_;
MemoryPool* pool_;
std::shared_ptr<ArrayData> out_;
};

Expand All @@ -296,11 +301,11 @@ class ArrayDataEndianSwapper {
namespace internal {

Result<std::shared_ptr<ArrayData>> SwapEndianArrayData(
const std::shared_ptr<ArrayData>& data) {
const std::shared_ptr<ArrayData>& data, MemoryPool* pool) {
if (data->offset != 0) {
return Status::Invalid("Unsupported data format: data.offset != 0");
}
ArrayDataEndianSwapper swapper(data);
ArrayDataEndianSwapper swapper(data, pool);
RETURN_NOT_OK(swapper.SwapType(*data->type));
return std::move(swapper.out_);
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/array/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ namespace internal {
/// are not swapped by this function and should be handled separately.
///
/// \param[in] data the array contents
/// \param[in] pool the memory pool to allocate memory from
/// \return the resulting ArrayData whose elements were swapped
ARROW_EXPORT
Result<std::shared_ptr<ArrayData>> SwapEndianArrayData(
const std::shared_ptr<ArrayData>& data);
const std::shared_ptr<ArrayData>& data, MemoryPool* pool = default_memory_pool());

/// Given a number of ArrayVectors, treat each ArrayVector as the
/// chunks of a chunked array. Then rechunk each ArrayVector such that
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,8 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
if (context.swap_endian) {
for (int i = 0; i < static_cast<int>(filtered_columns.size()); ++i) {
ARROW_ASSIGN_OR_RAISE(filtered_columns[i],
arrow::internal::SwapEndianArrayData(filtered_columns[i]));
arrow::internal::SwapEndianArrayData(
filtered_columns[i], context.options.memory_pool));
}
}
return RecordBatch::Make(std::move(filtered_schema), metadata->length(),
Expand Down Expand Up @@ -823,7 +824,8 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,

// swap endian in dict_data if necessary (swap_endian == true)
if (context.swap_endian) {
ARROW_ASSIGN_OR_RAISE(dict_data, ::arrow::internal::SwapEndianArrayData(dict_data));
ARROW_ASSIGN_OR_RAISE(dict_data, ::arrow::internal::SwapEndianArrayData(
dict_data, context.options.memory_pool));
}

if (dictionary_batch->isDelta()) {
Expand Down Expand Up @@ -1667,8 +1669,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
// swap endian in a set of ArrayData if necessary (swap_endian == true)
if (context.swap_endian) {
for (int i = 0; i < static_cast<int>(filtered_columns.size()); ++i) {
ARROW_ASSIGN_OR_RAISE(filtered_columns[i], arrow::internal::SwapEndianArrayData(
filtered_columns[i]));
ARROW_ASSIGN_OR_RAISE(filtered_columns[i],
arrow::internal::SwapEndianArrayData(
filtered_columns[i], context.options.memory_pool));
}
}
return RecordBatch::Make(std::move(filtered_schema), length,
Expand Down

0 comments on commit 6eebf79

Please sign in to comment.