Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ all-features = true
[features]
ffi = ["arrow-schema/ffi", "arrow-data/ffi"]
force_validate = []
pool = ["arrow-buffer/pool", "arrow-data/pool"]

[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
Expand Down
38 changes: 38 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,34 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// This value will always be greater than returned by `get_buffer_memory_size()` and
/// includes the overhead of the data structures that contain the pointers to the various buffers.
fn get_array_memory_size(&self) -> usize;

/// Claim memory used by this array in the provided memory pool.
///
/// This recursively claims memory for:
/// - All data buffers in this array
/// - All child arrays (for nested types like List, Struct, etc.)
/// - The null bitmap buffer if present
///
/// This method guarantees that the memory pool counts each underlying buffer only
/// once. For example, if this array was derived from an operation such as `slice`,
/// calling `claim` on it does not change the memory pool's usage when the underlying
/// buffers have already been claimed.
///
/// # Example
/// ```
/// # use arrow_array::{Int32Array, Array};
/// # use arrow_buffer::TrackingMemoryPool;
///
/// let array = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let pool = TrackingMemoryPool::default();
///
/// // Claim the array's memory in the pool
/// array.claim(&pool);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add an example (either here or elsewhere) of how one would use claim?

For example, if we now did

let array2 = array1.slice(0, 1);

Is the idea that now array2.array_memory_size() would be zero?

/// ```
#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
    // Obtain this array's data via `to_data()` and let its own `claim`
    // register the memory with `pool`.
    let data = self.to_data();
    data.claim(pool);
}
}

/// A reference-counted reference to a generic `Array`
Expand Down Expand Up @@ -419,6 +447,11 @@ impl Array for ArrayRef {
fn get_array_memory_size(&self) -> usize {
// Forward to the `Array` implementation behind this reference.
self.as_ref().get_array_memory_size()
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
// Forward the claim to the `Array` implementation behind this reference.
self.as_ref().claim(pool)
}
}

impl<T: Array> Array for &T {
Expand Down Expand Up @@ -489,6 +522,11 @@ impl<T: Array> Array for &T {
fn get_array_memory_size(&self) -> usize {
// `self` is a reference to a `T`; delegate explicitly to `T`'s implementation.
T::get_array_memory_size(self)
}

#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
// `self` is a reference to a `T`; delegate explicitly to `T`'s implementation.
T::claim(self, pool)
}
}

/// A generic trait for accessing the values of an [`Array`]
Expand Down
7 changes: 7 additions & 0 deletions arrow-buffer/src/buffer/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ impl NullBuffer {
pub fn buffer(&self) -> &Buffer {
// `self.buffer` wraps the `Buffer`; `inner()` exposes it by reference.
self.buffer.inner()
}

/// Registers the memory backing this null buffer with `pool`.
///
/// The claim is forwarded to the underlying [`Buffer`].
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn crate::MemoryPool) {
    // Unwrap NullBuffer -> BooleanBuffer -> Buffer, then claim that buffer
    let bits: &Buffer = self.buffer.inner();
    bits.claim(pool);
}
}

impl<'a> IntoIterator for &'a NullBuffer {
Expand Down
2 changes: 2 additions & 0 deletions arrow-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ bench = false
force_validate = []
# Enable ffi support
ffi = ["arrow-schema/ffi"]
# Enable memory pool support
pool = ["arrow-buffer/pool"]

[package.metadata.docs.rs]
all-features = true
Expand Down
24 changes: 24 additions & 0 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,30 @@ impl ArrayData {
pub fn into_builder(self) -> ArrayDataBuilder {
// Consumes `self`; the conversion comes from the `Into<ArrayDataBuilder>` impl.
self.into()
}

/// Registers the memory referenced by this `ArrayData` with `pool`.
///
/// Claims are issued in the following order:
/// 1. every buffer in `self.buffers`
/// 2. the null buffer, when one is present
/// 3. every entry of `self.child_data`, recursively
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
    // Data buffers first
    self.buffers.iter().for_each(|buf| {
        buf.claim(pool);
    });

    // Then the optional null buffer
    if let Some(null_buffer) = self.nulls.as_ref() {
        null_buffer.claim(pool);
    }

    // Finally descend into each child
    self.child_data.iter().for_each(|child| {
        child.claim(pool);
    });
}
}

/// Return the expected [`DataTypeLayout`] Arrays of this data
Expand Down
7 changes: 7 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ force_validate = ["arrow-array/force_validate", "arrow-data/force_validate"]
ffi = ["arrow-schema/ffi", "arrow-data/ffi", "arrow-array/ffi"]
chrono-tz = ["arrow-array/chrono-tz"]
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
# Enable memory tracking support
pool = ["arrow-array/pool"]

[dev-dependencies]
chrono = { workspace = true }
Expand Down Expand Up @@ -114,6 +116,11 @@ name = "zero_copy_ipc"
required-features = ["prettyprint"]
path = "examples/zero_copy_ipc.rs"

[[example]]
name = "memory_tracking"
required-features = ["pool"]
path = "examples/memory_tracking.rs"

[[bench]]
name = "aggregate_kernels"
harness = false
Expand Down
65 changes: 65 additions & 0 deletions arrow/examples/memory_tracking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Example demonstrating the Array memory tracking functionality
use arrow_array::{Array, Int32Array, ListArray};
use arrow_buffer::{MemoryPool, TrackingMemoryPool};
use arrow_schema::{DataType, Field};
use std::sync::Arc;

fn main() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we write this as a (doc?)test instead (or as well)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please -- I think it would be much easier to find as a doc test -- perhaps you could just move it to Array::claim

let pool = TrackingMemoryPool::default();

println!("Arrow Array Memory Tracking Example");
println!("===================================");

// Basic array memory tracking: claim the array's memory in the pool
let array = Int32Array::from(vec![1, 2, 3, 4, 5]);
array.claim(&pool);
println!("Int32Array (5 elements): {} bytes", pool.used());

// Nested array (recursive tracking): claiming the list also claims its child array
let offsets = arrow_buffer::OffsetBuffer::new(vec![0, 2, 4].into());
let field = Arc::new(Field::new("item", DataType::Int32, false));
let list_array = ListArray::new(field, offsets, Arc::new(array), None);

let before_list = pool.used();
list_array.claim(&pool);
let after_list = pool.used();
println!("ListArray (nested): +{} bytes", after_list - before_list);

// No double-counting for derived arrays
let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
large_array.claim(&pool);
let original_usage = pool.used();
println!("Original array (1000 elements): {original_usage} bytes");

// Create and claim slices - should not increase memory usage
let slice1 = large_array.slice(0, 100);
let slice2 = large_array.slice(500, 200);

slice1.claim(&pool);
slice2.claim(&pool);
let final_usage = pool.used();

// Verify (rather than only print) that the slices were not counted a second time:
// they share the buffers of `large_array`, which have already been claimed.
assert_eq!(
final_usage, original_usage,
"claiming slices of an already-claimed array must not change pool usage"
);

println!("After claiming 2 slices: {final_usage} bytes");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these should actually test the bytes used (not just print them out)

println!(
"Increase: {} bytes (slices share the same buffer!)",
final_usage - original_usage
);
}
Loading