Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions arrow-arith/src/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
//! [here](https://doc.rust-lang.org/stable/core/arch/) for more information.

use arrow_array::*;
use arrow_buffer::buffer::{bitwise_bin_op_helper, bitwise_quaternary_op_helper};
use arrow_buffer::{BooleanBuffer, NullBuffer, buffer_bin_and_not};
use arrow_buffer::buffer::bitwise_quaternary_op_helper;
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, buffer_bin_and_not};
use arrow_schema::ArrowError;

/// Logical 'and' boolean values with Kleene logic
Expand Down Expand Up @@ -74,7 +74,7 @@ pub fn and_kleene(left: &BooleanArray, right: &BooleanArray) -> Result<BooleanAr
// The final null bit is set only if:
// 1. left null bit is set, or
// 2. right data bit is false (because null AND false = false).
Some(bitwise_bin_op_helper(
Some(Buffer::from_bitwise_binary_op(
left_null_buffer.buffer(),
left_null_buffer.offset(),
right_values.inner(),
Expand All @@ -85,7 +85,7 @@ pub fn and_kleene(left: &BooleanArray, right: &BooleanArray) -> Result<BooleanAr
}
(None, Some(right_null_buffer)) => {
// Same as above
Some(bitwise_bin_op_helper(
Some(Buffer::from_bitwise_binary_op(
right_null_buffer.buffer(),
right_null_buffer.offset(),
left_values.inner(),
Expand Down Expand Up @@ -169,7 +169,7 @@ pub fn or_kleene(left: &BooleanArray, right: &BooleanArray) -> Result<BooleanArr
// The final null bit is set only if:
// 1. left null bit is set, or
// 2. right data bit is true (because null OR true = true).
Some(bitwise_bin_op_helper(
Some(Buffer::from_bitwise_binary_op(
left_nulls.buffer(),
left_nulls.offset(),
right_values.inner(),
Expand All @@ -180,7 +180,7 @@ pub fn or_kleene(left: &BooleanArray, right: &BooleanArray) -> Result<BooleanArr
}
(None, Some(right_nulls)) => {
// Same as above
Some(bitwise_bin_op_helper(
Some(Buffer::from_bitwise_binary_op(
right_nulls.buffer(),
right_nulls.offset(),
left_values.inner(),
Expand Down
155 changes: 149 additions & 6 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use std::sync::Arc;

use crate::BufferBuilder;
use crate::alloc::{Allocation, Deallocation};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};

use crate::bit_util::ceil;
#[cfg(feature = "pool")]
use crate::pool::MemoryPool;
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};

use super::ops::bitwise_unary_op_helper;
use super::{MutableBuffer, ScalarBuffer};

/// A contiguous memory region that can be shared with other buffers and across
Expand Down Expand Up @@ -115,6 +114,150 @@ impl Buffer {
Self::from(bytes)
}

/// Create a new [`Buffer`] by applying the bitwise operation `op` to two input buffers.
///
/// This function is highly optimized for bitwise operations on large
/// bitmaps by processing input buffers in chunks of 64 bits (8 bytes) at a
/// time, and thus is much faster than applying the operation bit by bit.
///
/// # Notes:
/// * `op` takes two `u64` inputs and produces one `u64` output,
/// operating on 64 bits at a time. **It must only apply bitwise operations
/// on the relevant bits, as the input `u64` may contain irrelevant bits
/// and may be processed differently on different endian architectures.**
/// * The inputs are treated as bitmaps, meaning that offsets and length
/// are specified in number of bits.
/// * The output always has zero offset
///
/// # See Also
/// - [`Buffer::from_bitwise_unary_op`] for unary operations on a single input buffer.
/// - [`apply_bitwise_binary_op`](bit_util::apply_bitwise_binary_op) for in-place binary bitwise operations
///
/// # Example: Create new [`Buffer`] from bitwise `AND` of two [`Buffer`]s
/// ```
/// # use arrow_buffer::Buffer;
/// let left = Buffer::from(&[0b11001100u8, 0b10111010u8]); // 2 bytes = 16 bits
/// let right = Buffer::from(&[0b10101010u8, 0b11011100u8, 0b11110000u8]); // 3 bytes = 24 bits
/// // AND of the first 12 bits
/// let result = Buffer::from_bitwise_binary_op(
/// &left, 0, &right, 0, 12, |a, b| a & b
/// );
/// assert_eq!(result.as_slice(), &[0b10001000u8, 0b00001000u8]);
/// ```
///
/// # Example: Create new [`Buffer`] from bitwise `OR` of two byte slices
/// ```
/// # use arrow_buffer::Buffer;
/// let left = [0b11001100u8, 0b10111010u8];
/// let right = [0b10101010u8, 0b11011100u8];
/// // OR of bits 4..16 from left and bits 0..12 from right
/// let result = Buffer::from_bitwise_binary_op(
/// &left, 4, &right, 0, 12, |a, b| a | b
/// );
/// assert_eq!(result.as_slice(), &[0b10101110u8, 0b00001111u8]);
/// ```
pub fn from_bitwise_binary_op<F>(
left: impl AsRef<[u8]>,
left_offset_in_bits: usize,
right: impl AsRef<[u8]>,
right_offset_in_bits: usize,
len_in_bits: usize,
mut op: F,
) -> Buffer
where
F: FnMut(u64, u64) -> u64,
{
let left_chunks = BitChunks::new(left.as_ref(), left_offset_in_bits, len_in_bits);
let right_chunks = BitChunks::new(right.as_ref(), right_offset_in_bits, len_in_bits);

let chunks = left_chunks
.iter()
.zip(right_chunks.iter())
.map(|(left, right)| op(left, right));
// Soundness: `chunks` zips and maps two `BitChunks` iterators, each of which
// correctly reports its upper bound
let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };

let remainder_bytes = ceil(left_chunks.remainder_len(), 8);
let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits());
// we are counting bits starting from the least significant bit, so to_le_bytes should be correct
let rem = &rem.to_le_bytes()[0..remainder_bytes];
buffer.extend_from_slice(rem);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might do an extra allocation? Other places avoid this by preallocating the final u64 needed for the remainder as well (collect_bool)

Copy link
Contributor Author

@alamb alamb Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good call -- I will make the change

However, this is the same code as the current bitwise_binary_op uses, so I would expect no performance difference 🤔

https://github.com/apache/arrow-rs/pull/8854/files#diff-e7a951ab8abfeef1016ed4427a3aef25be5be470454caa1e1dd93e56968316b5L122

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree; however, allocations during benchmarking seem to make benchmarking very noisy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I tried this

  /// Create a new [`Buffer`] by applying `op` to two bitmaps, 64 bits at a time.
  ///
  /// Experimental variant that folds the trailing partial word into the same
  /// trusted-length iterator as the full chunks, so the output is built by a
  /// single `from_trusted_len_iter` call instead of a call followed by
  /// `extend_from_slice`.
  ///
  /// * `left` / `right` - input bytes, interpreted as bitmaps.
  /// * `left_offset_in_bits` / `right_offset_in_bits` - starting bit of each input.
  /// * `len_in_bits` - number of bits to process from each input.
  /// * `op` - combines one `u64` from each side into one output `u64`.
  ///
  /// Note: when there is a remainder, `op` is invoked on the remainder words
  /// *before* it is invoked on the full chunks; this matters only if `op`
  /// has order-dependent side effects.
  pub fn from_bitwise_binary_op<F>(
        left: impl AsRef<[u8]>,
        left_offset_in_bits: usize,
        right: impl AsRef<[u8]>,
        right_offset_in_bits: usize,
        len_in_bits: usize,
        mut op: F,
    ) -> Buffer
    where
        F: FnMut(u64, u64) -> u64,
    {
        let left_chunks = BitChunks::new(left.as_ref(), left_offset_in_bits, len_in_bits);
        let right_chunks = BitChunks::new(right.as_ref(), right_offset_in_bits, len_in_bits);

        // Number of output bytes needed for the bits that do not fill a whole u64
        let remainder_bytes = ceil(left_chunks.remainder_len(), 8);
        // Fast path: the length divides evenly into u64 chunks, so there is no tail
        let buffer = if remainder_bytes == 0 {
            let chunks = left_chunks
                .iter()
                .zip(right_chunks.iter())
                .map(|(left, right)| op(left, right));
            // Soundness: `chunks` zips and maps two `BitChunks` iterators, each of
            // which correctly reports its upper bound
            unsafe { MutableBuffer::from_trusted_len_iter(chunks) }
        } else {
            // Compute last u64 here so that we can reserve exact capacity
            let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits());

            // Append the remainder word as one extra u64 after the full chunks
            let chunks = left_chunks
                .iter()
                .zip(right_chunks.iter())
                .map(|(left, right)| op(left, right))
                .chain(std::iter::once(rem));
            // Soundness: the `BitChunks` iterators correctly report their upper
            // bound, and chaining a single-element `once` iterator preserves that
            let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };
            // Adjust the length down if last u64 is not fully used:
            // a full 8 bytes were written for the tail, but only
            // `remainder_bytes` of them carry requested bits
            let extra_bytes = 8 - remainder_bytes;
            buffer.truncate(buffer.len() - extra_bytes);
            buffer
        };
        buffer.into()
    }

But it seems to be slower.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tried making a version of MutableBuffer::from_trusted_len_iter that also reserved additional capacity, and it didn't seem to help either (perhaps because the benchmarks happen to avoid reallocation 🤔 )

    /// Like [`from_trusted_len_iter`] but can add additional capacity at the end
    /// in case the caller wants to add more data after the initial iterator.
    ///
    /// `additional_capacity` is added to the byte length derived from the
    /// iterator's upper bound when the buffer is allocated; it does not change
    /// the resulting `len`, which covers only the items written by `iterator`.
    ///
    /// # Panics
    /// Panics if `iterator.size_hint()` has no upper bound, or if the number of
    /// bytes actually written differs from `upper * size_of::<T>()`.
    ///
    /// # Safety
    /// The iterator must report an accurate upper bound. Items are copied through
    /// a raw pointer with no per-item capacity check, and the length assertion
    /// runs only after the loop has finished writing.
    #[inline]
    pub unsafe fn from_trusted_len_iter_with_additional_capacity<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
        additional_capacity: usize,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        // Total bytes the iterator is expected to produce
        let len = upper * item_size;

        // Allocate room for the iterator's items plus the caller-requested slack
        let mut buffer = MutableBuffer::new(len + additional_capacity);

        // Write cursor, starting at the beginning of the allocation
        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            // note how there is no reserve here (compared with `extend_from_iter`)
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Verify after the fact that the iterator produced exactly `upper` items
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        // Only the iterator's bytes count as initialized; the extra capacity stays unused
        buffer.len = len;
        buffer
    }

Copy link
Contributor

@Dandandan Dandandan Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also an extend-from-trusted-len-iter method in MutableBuffer? Another option is to use Vec::extend here as well.


buffer.into()
}

/// Create a new [`Buffer`] by applying the bitwise operation `op` to an input buffer.
///
/// This function is highly optimized for bitwise operations on large
/// bitmaps by processing input buffers in chunks of 64 bits (8 bytes) at a
/// time, and thus is much faster than applying the operation bit by bit.
///
/// # Notes:
/// * `op` takes one `u64` input and produces one `u64` output,
/// operating on 64 bits at a time. **It must only apply bitwise operations
/// on the relevant bits, as the input `u64` may contain irrelevant bits
/// and may be processed differently on different endian architectures.**
/// * The inputs are treated as bitmaps, meaning that offsets and length
/// are specified in number of bits.
/// * The output always has zero offset
///
/// # See Also
/// - [`Buffer::from_bitwise_binary_op`] for binary operations on two input buffers.
/// - [`apply_bitwise_unary_op`](bit_util::apply_bitwise_unary_op) for in-place unary bitwise operations
///
/// # Example: Create new [`Buffer`] from bitwise `NOT` of an input [`Buffer`]
/// ```
/// # use arrow_buffer::Buffer;
/// let input = Buffer::from(&[0b11001100u8, 0b10111010u8]); // 2 bytes = 16 bits
/// // NOT of the first 12 bits
/// let result = Buffer::from_bitwise_unary_op(
/// &input, 0, 12, |a| !a
/// );
/// assert_eq!(result.as_slice(), &[0b00110011u8, 0b11110101u8]);
/// ```
///
/// # Example: Create a new [`Buffer`] copying a bit slice from an input slice
/// ```
/// # use arrow_buffer::Buffer;
/// let input = [0b11001100u8, 0b10111010u8];
/// // Copy bits 4..16 from input
/// let result = Buffer::from_bitwise_unary_op(
/// &input, 4, 12, |a| a
/// );
/// assert_eq!(result.as_slice(), &[0b10101100u8, 0b00001011u8], "[{:08b}, {:08b}]", result.as_slice()[0], result.as_slice()[1]);
/// ```
pub fn from_bitwise_unary_op<F>(
left: impl AsRef<[u8]>,
offset_in_bits: usize,
len_in_bits: usize,
mut op: F,
) -> Buffer
where
F: FnMut(u64) -> u64,
{
// reserve capacity and set length so we can get a typed view of u64 chunks
let mut result =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we overwrite the results, we shouldn't need to initialize/zero out the array.

MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false);

let left_chunks = BitChunks::new(left.as_ref(), offset_in_bits, len_in_bits);

let result_chunks = result.typed_data_mut::<u64>().iter_mut();

result_chunks
.zip(left_chunks.iter())
.for_each(|(res, left)| {
*res = op(left);
});

let remainder_bytes = ceil(left_chunks.remainder_len(), 8);
let rem = op(left_chunks.remainder_bits());
// we are counting bits starting from the least significant bit, so to_le_bytes should be correct
let rem = &rem.to_le_bytes()[0..remainder_bytes];
result.extend_from_slice(rem);

result.into()
}

/// Returns the offset, in bytes, of `Self::ptr` to `Self::data`
///
/// self.ptr and self.data can be different after slicing or advancing the buffer.
Expand Down Expand Up @@ -344,10 +487,10 @@ impl Buffer {
return self.slice_with_length(offset / 8, bit_util::ceil(len, 8));
}

bitwise_unary_op_helper(self, offset, len, |a| a)
Self::from_bitwise_unary_op(self, offset, len, |a| a)
}

/// Returns a `BitChunks` instance which can be used to iterate over this buffers bits
/// Returns a `BitChunks` instance which can be used to iterate over this buffer's bits
/// in larger chunks and starting at arbitrary bit offsets.
/// Note that both `offset` and `length` are measured in bits.
pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks<'_> {
Expand Down
64 changes: 18 additions & 46 deletions arrow-buffer/src/buffer/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,69 +60,41 @@ where

/// Apply a bitwise operation `op` to two inputs and return the result as a Buffer.
/// The inputs are treated as bitmaps, meaning that offsets and length are specified in number of bits.
#[deprecated(since = "57.1.0", note = "use Buffer::from_bitwise_binary_op instead")]
pub fn bitwise_bin_op_helper<F>(
left: &Buffer,
left_offset_in_bits: usize,
right: &Buffer,
right_offset_in_bits: usize,
len_in_bits: usize,
mut op: F,
op: F,
) -> Buffer
where
F: FnMut(u64, u64) -> u64,
{
let left_chunks = left.bit_chunks(left_offset_in_bits, len_in_bits);
let right_chunks = right.bit_chunks(right_offset_in_bits, len_in_bits);

let chunks = left_chunks
.iter()
.zip(right_chunks.iter())
.map(|(left, right)| op(left, right));
// Soundness: `BitChunks` is a `BitChunks` iterator which
// correctly reports its upper bound
let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) };

let remainder_bytes = ceil(left_chunks.remainder_len(), 8);
let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits());
// we are counting its starting from the least significant bit, to to_le_bytes should be correct
let rem = &rem.to_le_bytes()[0..remainder_bytes];
buffer.extend_from_slice(rem);

buffer.into()
Buffer::from_bitwise_binary_op(
left,
left_offset_in_bits,
right,
right_offset_in_bits,
len_in_bits,
op,
)
}

/// Apply a bitwise operation `op` to one input and return the result as a Buffer.
/// The input is treated as a bitmap, meaning that offset and length are specified in number of bits.
#[deprecated(since = "57.1.0", note = "use Buffer::from_bitwise_unary_op instead")]
pub fn bitwise_unary_op_helper<F>(
left: &Buffer,
offset_in_bits: usize,
len_in_bits: usize,
mut op: F,
op: F,
) -> Buffer
where
F: FnMut(u64) -> u64,
{
// reserve capacity and set length so we can get a typed view of u64 chunks
let mut result =
MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false);

let left_chunks = left.bit_chunks(offset_in_bits, len_in_bits);

let result_chunks = result.typed_data_mut::<u64>().iter_mut();

result_chunks
.zip(left_chunks.iter())
.for_each(|(res, left)| {
*res = op(left);
});

let remainder_bytes = ceil(left_chunks.remainder_len(), 8);
let rem = op(left_chunks.remainder_bits());
// we are counting its starting from the least significant bit, to to_le_bytes should be correct
let rem = &rem.to_le_bytes()[0..remainder_bytes];
result.extend_from_slice(rem);

result.into()
Buffer::from_bitwise_unary_op(left, offset_in_bits, len_in_bits, op)
}

/// Apply a bitwise and to two inputs and return the result as a Buffer.
Expand All @@ -134,7 +106,7 @@ pub fn buffer_bin_and(
right_offset_in_bits: usize,
len_in_bits: usize,
) -> Buffer {
bitwise_bin_op_helper(
Buffer::from_bitwise_binary_op(
left,
left_offset_in_bits,
right,
Expand All @@ -153,7 +125,7 @@ pub fn buffer_bin_or(
right_offset_in_bits: usize,
len_in_bits: usize,
) -> Buffer {
bitwise_bin_op_helper(
Buffer::from_bitwise_binary_op(
left,
left_offset_in_bits,
right,
Expand All @@ -172,7 +144,7 @@ pub fn buffer_bin_xor(
right_offset_in_bits: usize,
len_in_bits: usize,
) -> Buffer {
bitwise_bin_op_helper(
Buffer::from_bitwise_binary_op(
left,
left_offset_in_bits,
right,
Expand All @@ -191,7 +163,7 @@ pub fn buffer_bin_and_not(
right_offset_in_bits: usize,
len_in_bits: usize,
) -> Buffer {
bitwise_bin_op_helper(
Buffer::from_bitwise_binary_op(
left,
left_offset_in_bits,
right,
Expand All @@ -204,5 +176,5 @@ pub fn buffer_bin_and_not(
/// Apply a bitwise not to one input and return the result as a Buffer.
/// The input is treated as a bitmap, meaning that offset and length are specified in number of bits.
pub fn buffer_unary_not(left: &Buffer, offset_in_bits: usize, len_in_bits: usize) -> Buffer {
bitwise_unary_op_helper(left, offset_in_bits, len_in_bits, |a| !a)
Buffer::from_bitwise_unary_op(left, offset_in_bits, len_in_bits, |a| !a)
}
7 changes: 3 additions & 4 deletions arrow-select/src/nullif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Implements the `nullif` function for Arrow arrays.

use arrow_array::{Array, ArrayRef, BooleanArray, make_array};
use arrow_buffer::buffer::{bitwise_bin_op_helper, bitwise_unary_op_helper};
use arrow_buffer::{BooleanBuffer, NullBuffer};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer};
use arrow_schema::{ArrowError, DataType};

/// Returns a new array with the same values and the validity bit to false where
Expand Down Expand Up @@ -75,7 +74,7 @@ pub fn nullif(left: &dyn Array, right: &BooleanArray) -> Result<ArrayRef, ArrowE
let (combined, null_count) = match left_data.nulls() {
Some(left) => {
let mut valid_count = 0;
let b = bitwise_bin_op_helper(
let b = Buffer::from_bitwise_binary_op(
left.buffer(),
left.offset(),
right.inner(),
Expand All @@ -91,7 +90,7 @@ pub fn nullif(left: &dyn Array, right: &BooleanArray) -> Result<ArrayRef, ArrowE
}
None => {
let mut null_count = 0;
let buffer = bitwise_unary_op_helper(right.inner(), right.offset(), len, |b| {
let buffer = Buffer::from_bitwise_unary_op(right.inner(), right.offset(), len, |b| {
let t = !b;
null_count += t.count_zeros() as usize;
t
Expand Down
Loading