diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2ddf812f9c39..75f5018915bd 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +mod mask; + use crate::arrow::ProjectionMask; +use crate::arrow::arrow_reader::selection::mask::BooleanRowSelection; use crate::errors::ParquetError; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; @@ -135,12 +138,285 @@ impl RowSelector { /// * Consecutive [`RowSelector`]s alternate skipping or selecting rows /// /// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum RowSelection { + Selectors(RowSelectorRowSelection), + Mask(BooleanRowSelection), +} +impl Default for RowSelection { + fn default() -> Self { + Self::Selectors(RowSelectorRowSelection::default()) + } +} + +impl RowSelection { + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] + /// + /// # Panic + /// + /// Panics if any of the [`BooleanArray`] contain nulls + pub fn from_filters(filters: &[BooleanArray]) -> Self { + // TODO decide how to do this based on density or something?? + Self::Selectors(RowSelectorRowSelection::from_filters(filters)) + } + + /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep + pub fn from_consecutive_ranges>>( + ranges: I, + total_rows: usize, + ) -> Self { + // todo should this be decided based on density or something?? + Self::Selectors(RowSelectorRowSelection::from_consecutive_ranges( + ranges, total_rows, + )) + } + + /// Given an offset index, return the byte ranges for all data pages selected by `self` + /// + /// This is useful for determining what byte ranges to fetch from underlying storage + /// + /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce + /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, + /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) + pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec> { + match self { + Self::Selectors(selection) => selection.scan_ranges(page_locations), + Self::Mask(_mask) => { + todo!() + } + } + } + + /// Returns true if selectors should be forced, preventing mask materialisation + pub(crate) fn should_force_selectors( + &self, + projection: &ProjectionMask, + offset_index: Option<&[OffsetIndexMetaData]>, + ) -> bool { + match self { + Self::Selectors(selectors) => { + selectors.should_force_selectors(projection, offset_index) + } + Self::Mask(_) => { + todo!() + } + } + } + + /// Splits off the first `row_count` from this [`RowSelection`] + pub fn split_off(&mut self, row_count: usize) -> Self { + match self { + Self::Selectors(selection) => Self::Selectors(selection.split_off(row_count)), + Self::Mask(_mask) => { + todo!() + } + } + } + + /// returns a [`RowSelection`] representing rows that are selected in both + /// input [`RowSelection`]s. + /// + /// This is equivalent to the logical `AND` / conjunction of the two + /// selections. + /// + /// # Example + /// If `N` means the row is not selected, and `Y` means it is + /// selected: + /// + /// ```text + /// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY + /// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN + /// + /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN + /// ``` + /// + /// # Panics + /// + /// Panics if `other` does not have a length equal to the number of rows selected + /// by this RowSelection + /// + pub fn and_then(&self, other: &Self) -> Self { + match (self, other) { + (Self::Selectors(left), Self::Selectors(right)) => { + Self::Selectors(left.and_then(right)) + } + (Self::Mask(left), Self::Mask(right)) => Self::Mask(left.and_then(right)), + // need to convert one to the other + _ => { + todo!() + } + } + } + + /// Compute the intersection of two [`RowSelection`] + /// For example: + /// self: NNYYYYNNYYNYN + /// other: NYNNNNNNY + /// + /// returned: NNNNNNNNYYNYN + pub fn intersection(&self, other: &Self) -> Self { + match (self, other) { + (Self::Selectors(left), Self::Selectors(right)) => { + Self::Selectors(intersect_row_selections(&left.selectors, &right.selectors)) + } + (Self::Mask(left), Self::Mask(right)) => Self::Mask(left.intersection(right)), + // need to convert one to the other + _ => { + todo!() + } + } + } + + /// Compute the union of two [`RowSelection`] + /// For example: + /// self: NNYYYYNNYYNYN + /// other: NYNNNNNNN + /// + /// returned: NYYYYYNNYYNYN + pub fn union(&self, other: &Self) -> Self { + match (self, other) { + (Self::Selectors(left), Self::Selectors(right)) => { + Self::Selectors(union_row_selections(&left.selectors, &right.selectors)) + } + (Self::Mask(left), Self::Mask(right)) => Self::Mask(left.union(right)), + // need to convert one to the other + _ => { + todo!() + } + } + } + + /// Returns `true` if this [`RowSelection`] selects any rows + pub fn selects_any(&self) -> bool { + match self { + Self::Selectors(selection) => selection.selects_any(), + Self::Mask(mask) => mask.selects_any(), + } + } + + /// Trims this [`RowSelection`] removing any trailing skips + pub(crate) fn trim(self) -> Self { + match self { + Self::Selectors(selection) => Self::Selectors(selection.trim()), + Self::Mask(_mask) => { + todo!() + } + } + } + + /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows + pub(crate) fn offset(self, offset: usize) -> Self { + match self { + Self::Selectors(selection) => Self::Selectors(selection.offset(offset)), + Self::Mask(_mask) => { + todo!() + } + } + } + + /// Limit this [`RowSelection`] to only select `limit` rows + pub(crate) fn limit(self, limit: usize) -> Self { + match self { + Self::Selectors(selection) => Self::Selectors(selection.limit(limit)), + Self::Mask(_mask) => { + todo!() + } + } + } + + /// Returns an iterator over the [`RowSelector`]s for this + /// [`RowSelection`]. + pub fn iter(&self) -> impl Iterator { + match self { + Self::Selectors(selection) => selection.iter(), + Self::Mask(_mask) => { + todo!() + } + } + } + + /// Returns the number of selected rows + pub fn row_count(&self) -> usize { + match self { + Self::Selectors(selection) => selection.row_count(), + Self::Mask(mask) => mask.row_count(), + } + } + + /// Returns the number of de-selected rows + pub fn skipped_row_count(&self) -> usize { + match self { + Self::Selectors(selection) => selection.skipped_row_count(), + Self::Mask(mask) => { + todo!() + } + } + } + + /// Expands the selection to align with batch boundaries. + /// This is needed when using cached array readers to ensure that + /// the cached data covers full batches. + pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self { + match self { + Self::Selectors(selection) => { + Self::Selectors(selection.expand_to_batch_boundaries(batch_size, total_rows)) + } + Self::Mask(mask) => { + todo!() + } + } + } +} + +impl From> for RowSelection { + fn from(selectors: Vec) -> Self { + Self::Selectors(selectors.into_iter().collect()) + } +} + +impl FromIterator for RowSelection { + fn from_iter>(iter: T) -> Self { + Self::Selectors(iter.into_iter().collect()) + } +} +impl From for Vec { + fn from(r: RowSelection) -> Self { + match r { + RowSelection::Selectors(selection) => selection.into(), + RowSelection::Mask(_) => { + todo!() + } + } + } +} + +impl From for VecDeque { + fn from(r: RowSelection) -> Self { + match r { + RowSelection::Selectors(selection) => selection.into(), + RowSelection::Mask(_) => { + todo!() + } + } + } +} + +// TODO move to its own module +/// Selection based on Vec +/// +/// This represents the result of a filter evaluation as a series of ranges. +/// It is more efficient for large contiguous selections or skips. +/// +/// It is similar to the "Range Index" described in +/// [Predicate Caching: Query-Driven Secondary Indexing for Cloud Data Warehouses] +/// +/// [Predicate Caching: Query-Driven Secondary Indexing for Cloud Data Warehouses]: https://dl.acm.org/doi/10.1145/3626246.3653395 #[derive(Debug, Clone, Default, Eq, PartialEq)] -pub struct RowSelection { +struct RowSelectorRowSelection { selectors: Vec, } -impl RowSelection { +impl RowSelectorRowSelection { /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] /// /// # Panic @@ -567,13 +843,13 @@ impl RowSelection { } } -impl From> for RowSelection { +impl From> for RowSelectorRowSelection { fn from(selectors: Vec) -> Self { selectors.into_iter().collect() } } -impl FromIterator for RowSelection { +impl FromIterator for RowSelectorRowSelection { fn from_iter>(iter: T) -> Self { let iter = iter.into_iter(); @@ -603,14 +879,14 @@ impl FromIterator for RowSelection { } } -impl From for Vec { - fn from(r: RowSelection) -> Self { +impl From for Vec { + fn from(r: RowSelectorRowSelection) -> Self { r.selectors } } -impl From for VecDeque { - fn from(r: RowSelection) -> Self { +impl From for VecDeque { + fn from(r: RowSelectorRowSelection) -> Self { r.selectors.into() } } @@ -621,7 +897,10 @@ impl From for VecDeque { /// other: NYNNNNNNY /// /// returned: NNNNNNNNYYNYN -fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection { +fn intersect_row_selections( + left: &[RowSelector], + right: &[RowSelector], +) -> RowSelectorRowSelection { let mut l_iter = left.iter().copied().peekable(); let mut r_iter = right.iter().copied().peekable(); @@ -683,7 +962,7 @@ fn intersect_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowS /// returned: NYYYYYNNYYNYN /// /// This can be removed from here once RowSelection::union is in parquet::arrow -fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelection { +fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelectorRowSelection { let mut l_iter = left.iter().copied().peekable(); let mut r_iter = right.iter().copied().peekable(); @@ -946,14 +1225,14 @@ mod tests { BooleanArray::from(Vec::::new()), ]; - let selection = RowSelection::from_filters(&filters[..1]); + let selection = RowSelectorRowSelection::from_filters(&filters[..1]); assert!(selection.selects_any()); assert_eq!( selection.selectors, vec![RowSelector::skip(3), RowSelector::select(4)] ); - let selection = RowSelection::from_filters(&filters[..2]); + let selection = RowSelectorRowSelection::from_filters(&filters[..2]); assert!(selection.selects_any()); assert_eq!( selection.selectors, @@ -965,7 +1244,7 @@ mod tests { ] ); - let selection = RowSelection::from_filters(&filters); + let selection = RowSelectorRowSelection::from_filters(&filters); assert!(selection.selects_any()); assert_eq!( selection.selectors, @@ -978,14 +1257,14 @@ mod tests { ] ); - let selection = RowSelection::from_filters(&filters[2..3]); + let selection = RowSelectorRowSelection::from_filters(&filters[2..3]); assert!(!selection.selects_any()); assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); } #[test] fn test_split_off() { - let mut selection = RowSelection::from(vec![ + let mut selection = RowSelectorRowSelection::from(vec![ RowSelector::skip(34), RowSelector::select(12), RowSelector::skip(3), @@ -1034,7 +1313,7 @@ mod tests { #[test] fn test_offset() { - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::select(5), RowSelector::skip(23), RowSelector::select(7), @@ -1092,21 +1371,21 @@ mod tests { #[test] fn test_and() { - let mut a = RowSelection::from(vec![ + let mut a = RowSelectorRowSelection::from(vec![ RowSelector::skip(12), RowSelector::select(23), RowSelector::skip(3), RowSelector::select(5), ]); - let b = RowSelection::from(vec![ + let b = RowSelectorRowSelection::from(vec![ RowSelector::select(5), RowSelector::skip(4), RowSelector::select(15), RowSelector::skip(4), ]); - let mut expected = RowSelection::from(vec![ + let mut expected = RowSelectorRowSelection::from(vec![ RowSelector::skip(12), RowSelector::select(5), RowSelector::skip(4), @@ -1122,9 +1401,9 @@ mod tests { expected.split_off(7); assert_eq!(a.and_then(&b), expected); - let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]); + let a = RowSelectorRowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]); - let b = RowSelection::from(vec![ + let b = RowSelectorRowSelection::from(vec![ RowSelector::select(2), RowSelector::skip(1), RowSelector::select(1), @@ -1170,70 +1449,70 @@ mod tests { RowSelector::skip(0), ]; - let expected = RowSelection::from(vec![ + let expected = RowSelectorRowSelection::from(vec![ RowSelector::skip(6), RowSelector::select(10), RowSelector::skip(4), ]); - assert_eq!(RowSelection::from_iter(a), expected); - assert_eq!(RowSelection::from_iter(b), expected); - assert_eq!(RowSelection::from_iter(c), expected); + assert_eq!(RowSelectorRowSelection::from_iter(a), expected); + assert_eq!(RowSelectorRowSelection::from_iter(b), expected); + assert_eq!(RowSelectorRowSelection::from_iter(c), expected); } #[test] fn test_combine_2elements() { let a = vec![RowSelector::select(10), RowSelector::select(5)]; let a_expect = vec![RowSelector::select(15)]; - assert_eq!(RowSelection::from_iter(a).selectors, a_expect); + assert_eq!(RowSelectorRowSelection::from_iter(a).selectors, a_expect); let b = vec![RowSelector::select(10), RowSelector::skip(5)]; let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)]; - assert_eq!(RowSelection::from_iter(b).selectors, b_expect); + assert_eq!(RowSelectorRowSelection::from_iter(b).selectors, b_expect); let c = vec![RowSelector::skip(10), RowSelector::select(5)]; let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)]; - assert_eq!(RowSelection::from_iter(c).selectors, c_expect); + assert_eq!(RowSelectorRowSelection::from_iter(c).selectors, c_expect); let d = vec![RowSelector::skip(10), RowSelector::skip(5)]; let d_expect = vec![RowSelector::skip(15)]; - assert_eq!(RowSelection::from_iter(d).selectors, d_expect); + assert_eq!(RowSelectorRowSelection::from_iter(d).selectors, d_expect); } #[test] fn test_from_one_and_empty() { let a = vec![RowSelector::select(10)]; - let selection1 = RowSelection::from(a.clone()); + let selection1 = RowSelectorRowSelection::from(a.clone()); assert_eq!(selection1.selectors, a); let b = vec![]; - let selection1 = RowSelection::from(b.clone()); + let selection1 = RowSelectorRowSelection::from(b.clone()); assert_eq!(selection1.selectors, b) } #[test] #[should_panic(expected = "selection exceeds the number of selected rows")] fn test_and_longer() { - let a = RowSelection::from(vec![ + let a = RowSelectorRowSelection::from(vec![ RowSelector::select(3), RowSelector::skip(33), RowSelector::select(3), RowSelector::skip(33), ]); - let b = RowSelection::from(vec![RowSelector::select(36)]); + let b = RowSelectorRowSelection::from(vec![RowSelector::select(36)]); a.and_then(&b); } #[test] #[should_panic(expected = "selection contains less than the number of selected rows")] fn test_and_shorter() { - let a = RowSelection::from(vec![ + let a = RowSelectorRowSelection::from(vec![ RowSelector::select(3), RowSelector::skip(33), RowSelector::select(3), RowSelector::skip(33), ]); - let b = RowSelection::from(vec![RowSelector::select(3)]); + let b = RowSelectorRowSelection::from(vec![RowSelector::select(3)]); a.and_then(&b); } @@ -1311,11 +1590,11 @@ mod tests { for _ in 0..100 { let a_len = rand.random_range(10..100); let a_bools: Vec<_> = (0..a_len).map(|_| rand.random_bool(0.2)).collect(); - let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]); + let a = RowSelectorRowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]); let b_len: usize = a_bools.iter().map(|x| *x as usize).sum(); let b_bools: Vec<_> = (0..b_len).map(|_| rand.random_bool(0.8)).collect(); - let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]); + let b = RowSelectorRowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]); let mut expected_bools = vec![false; a_len]; @@ -1326,7 +1605,8 @@ mod tests { } } - let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]); + let expected = + RowSelectorRowSelection::from_filters(&[BooleanArray::from(expected_bools)]); let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum(); assert_eq!(a_len, total_rows); @@ -1345,7 +1625,7 @@ mod tests { RowSelector::select(4), ]; - let round_tripped = RowSelection::from(selectors.clone()) + let round_tripped = RowSelectorRowSelection::from(selectors.clone()) .iter() .cloned() .collect::>(); @@ -1355,11 +1635,15 @@ mod tests { #[test] fn test_limit() { // Limit to existing limit should no-op - let selection = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]); + let selection = + RowSelectorRowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]); let limited = selection.limit(10); - assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited); + assert_eq!( + RowSelectorRowSelection::from(vec![RowSelector::select(10)]), + limited + ); - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::select(10), RowSelector::skip(10), RowSelector::select(10), @@ -1444,7 +1728,7 @@ mod tests { }, ]; - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ // Skip first page RowSelector::skip(10), // Multiple selects in same page @@ -1467,7 +1751,7 @@ mod tests { // assert_eq!(mask, vec![false, true, true, false, true, true, false]); assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]); - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ // Skip first page RowSelector::skip(10), // Multiple selects in same page @@ -1491,7 +1775,7 @@ mod tests { // assert_eq!(mask, vec![false, true, true, false, true, true, true]); assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]); - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ // Skip first page RowSelector::skip(10), // Multiple selects in same page @@ -1517,7 +1801,7 @@ mod tests { // assert_eq!(mask, vec![false, true, true, false, true, true, true]); assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]); - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ // Skip first page RowSelector::skip(10), // Multiple selects in same page @@ -1540,7 +1824,7 @@ mod tests { #[test] fn test_from_ranges() { let ranges = [1..3, 4..6, 6..6, 8..8, 9..10]; - let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10); + let selection = RowSelectorRowSelection::from_consecutive_ranges(ranges.into_iter(), 10); assert_eq!( selection.selectors, vec![ @@ -1555,14 +1839,14 @@ mod tests { let out_of_order_ranges = [1..3, 8..10, 4..7]; let result = std::panic::catch_unwind(|| { - RowSelection::from_consecutive_ranges(out_of_order_ranges.into_iter(), 10) + RowSelectorRowSelection::from_consecutive_ranges(out_of_order_ranges.into_iter(), 10) }); assert!(result.is_err()); } #[test] fn test_empty_selector() { - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::skip(0), RowSelector::select(2), RowSelector::skip(0), @@ -1570,7 +1854,7 @@ mod tests { ]); assert_eq!(selection.selectors, vec![RowSelector::select(4)]); - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::select(0), RowSelector::skip(2), RowSelector::select(0), @@ -1581,18 +1865,18 @@ mod tests { #[test] fn test_intersection() { - let selection = RowSelection::from(vec![RowSelector::select(1048576)]); + let selection = RowSelectorRowSelection::from(vec![RowSelector::select(1048576)]); let result = selection.intersection(&selection); assert_eq!(result, selection); - let a = RowSelection::from(vec![ + let a = RowSelectorRowSelection::from(vec![ RowSelector::skip(10), RowSelector::select(10), RowSelector::skip(10), RowSelector::select(20), ]); - let b = RowSelection::from(vec![ + let b = RowSelectorRowSelection::from(vec![ RowSelector::skip(20), RowSelector::select(20), RowSelector::skip(10), @@ -1611,12 +1895,12 @@ mod tests { #[test] fn test_union() { - let selection = RowSelection::from(vec![RowSelector::select(1048576)]); + let selection = RowSelectorRowSelection::from(vec![RowSelector::select(1048576)]); let result = selection.union(&selection); assert_eq!(result, selection); // NYNYY - let a = RowSelection::from(vec![ + let a = RowSelectorRowSelection::from(vec![ RowSelector::skip(10), RowSelector::select(10), RowSelector::skip(10), @@ -1624,7 +1908,7 @@ mod tests { ]); // NNYYNYN - let b = RowSelection::from(vec![ + let b = RowSelectorRowSelection::from(vec![ RowSelector::skip(20), RowSelector::select(20), RowSelector::skip(10), @@ -1647,7 +1931,7 @@ mod tests { #[test] fn test_row_count() { - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::skip(34), RowSelector::select(12), RowSelector::skip(3), @@ -1657,17 +1941,19 @@ mod tests { assert_eq!(selection.row_count(), 12 + 35); assert_eq!(selection.skipped_row_count(), 34 + 3); - let selection = RowSelection::from(vec![RowSelector::select(12), RowSelector::select(35)]); + let selection = + RowSelectorRowSelection::from(vec![RowSelector::select(12), RowSelector::select(35)]); assert_eq!(selection.row_count(), 12 + 35); assert_eq!(selection.skipped_row_count(), 0); - let selection = RowSelection::from(vec![RowSelector::skip(34), RowSelector::skip(3)]); + let selection = + RowSelectorRowSelection::from(vec![RowSelector::skip(34), RowSelector::skip(3)]); assert_eq!(selection.row_count(), 0); assert_eq!(selection.skipped_row_count(), 34 + 3); - let selection = RowSelection::from(vec![]); + let selection = RowSelectorRowSelection::from(vec![]); assert_eq!(selection.row_count(), 0); assert_eq!(selection.skipped_row_count(), 0); @@ -1675,7 +1961,7 @@ mod tests { #[test] fn test_trim() { - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::skip(34), RowSelector::select(12), RowSelector::skip(3), @@ -1691,7 +1977,7 @@ mod tests { assert_eq!(selection.trim().selectors, expected); - let selection = RowSelection::from(vec![ + let selection = RowSelectorRowSelection::from(vec![ RowSelector::skip(34), RowSelector::select(12), RowSelector::skip(3), diff --git a/parquet/src/arrow/arrow_reader/selection/mask.rs b/parquet/src/arrow/arrow_reader/selection/mask.rs new file mode 100644 index 000000000000..aa217d05ca2a --- /dev/null +++ b/parquet/src/arrow/arrow_reader/selection/mask.rs @@ -0,0 +1,566 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::arrow_reader::{RowSelection, RowSelector}; +use arrow_array::{Array, BooleanArray}; +use arrow_buffer::bit_iterator::BitIndexIterator; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, MutableBuffer}; +use std::ops::Range; + +/// Selection based on a [`BooleanArray`] +/// +/// This represents the result of a filter evaluation as a bitmap. It is +/// more efficient for sparse filter results with many small skips or selections. +/// +/// It is similar to the "Bitmap Index" described in +/// [Predicate Caching: Query-Driven Secondary Indexing for Cloud Data Warehouses] +/// +/// [Predicate Caching: Query-Driven Secondary Indexing for Cloud Data Warehouses]: https://dl.acm.org/doi/10.1145/3626246.3653395 +/// +/// Originally from Xiangpeng Hao in +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct BooleanRowSelection { + selector: BooleanBuffer, +} + +impl BooleanRowSelection { + /// Create a new [`BooleanRowSelection] from a list of [`BooleanArray`]. + pub fn from_filters(filters: &[BooleanArray]) -> Self { + let arrays: Vec<&dyn Array> = filters.iter().map(|x| x as &dyn Array).collect(); + let result = arrow_select::concat::concat(&arrays).unwrap().into_data(); + let (boolean_array, _null) = BooleanArray::from(result).into_parts(); + BooleanRowSelection { + selector: boolean_array, + } + } + + /// Create a new [`BooleanRowSelection`] with all rows unselected + pub fn new_unselected(row_count: usize) -> Self { + let buffer = BooleanBuffer::new_unset(row_count); + + BooleanRowSelection { selector: buffer } + } + + /// Create a new [`BooleanRowSelection`] with all rows selected + pub fn new_selected(row_count: usize) -> Self { + let buffer = BooleanBuffer::new_set(row_count); + + BooleanRowSelection { selector: buffer } + } + + /// Returns a new [`BooleanRowSelection`] that selects the inverse of this [`BooleanRowSelection`]. + pub fn as_inverted(&self) -> Self { + let buffer = !&self.selector; + BooleanRowSelection { selector: buffer } + } + + /// Returns the number of rows selected by this [`BooleanRowSelection`]. + pub fn row_count(&self) -> usize { + self.selector.count_set_bits() + } + + /// Create a new [`BooleanRowSelection`] from a list of consecutive ranges. + pub fn from_consecutive_ranges( + ranges: impl Iterator>, + total_rows: usize, + ) -> Self { + let mut buffer = BooleanBufferBuilder::new(total_rows); + let mut last_end = 0; + + for range in ranges { + let len = range.end - range.start; + if len == 0 { + continue; + } + + if range.start > last_end { + buffer.append_n(range.start - last_end, false); + } + buffer.append_n(len, true); + last_end = range.end; + } + + if last_end != total_rows { + buffer.append_n(total_rows - last_end, false); + } + + BooleanRowSelection { + selector: buffer.finish(), + } + } + + /// Compute the union of two [`BooleanRowSelection`] + /// For example: + /// self: NNYYYYNNYYNYN + /// other: NYNNNNNNN + /// + /// returned: NYYYYYNNYYNYN + #[must_use] + pub fn union(&self, other: &Self) -> Self { + // use arrow::compute::kernels::boolean::or; + + let union_selectors = &self.selector | &other.selector; + + BooleanRowSelection { + selector: union_selectors, + } + } + + /// Compute the intersection of two [`BooleanRowSelection`] + /// For example: + /// self: NNYYYYNNYYNYN + /// other: NYNNNNNNY + /// + /// returned: NNNNNNNNYYNYN + #[must_use] + pub fn intersection(&self, other: &Self) -> Self { + let intersection_selectors = &self.selector & &other.selector; + + BooleanRowSelection { + selector: intersection_selectors, + } + } + + /// Combines this [`BooleanRowSelection`] with another using logical AND on the selected bits. + /// + /// Unlike intersection, the `other` [`BooleanRowSelection`] must have exactly as many set bits as `self`. + /// This method will keep only the bits in `self` that are also set in `other` + /// at the positions corresponding to `self`'s set bits. + pub fn and_then(&self, other: &Self) -> Self { + // Ensure that 'other' has exactly as many set bits as 'self' + debug_assert_eq!( + self.row_count(), + other.selector.len(), + "The 'other' selection must have exactly as many set bits as 'self'." + ); + + if self.selector.len() == other.selector.len() { + // fast path if the two selections are the same length + // common if this is the first predicate + debug_assert_eq!(self.row_count(), self.selector.len()); + return self.intersection(other); + } + + let mut buffer = MutableBuffer::from_len_zeroed(self.selector.inner().len()); + buffer.copy_from_slice(self.selector.inner().as_slice()); + let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.selector.len()); + + // Create iterators for 'self' and 'other' bits + let mut other_bits = other.selector.iter(); + + for bit_idx in self.true_iter() { + let predicate = other_bits + .next() + .expect("Mismatch in set bits between self and other"); + if !predicate { + builder.set_bit(bit_idx, false); + } + } + + BooleanRowSelection { + selector: builder.finish(), + } + } + + /// Returns an iterator over the indices of the set bits in this [`BooleanRowSelection`] + pub fn true_iter(&self) -> BitIndexIterator<'_> { + self.selector.set_indices() + } + + /// Returns `true` if this [`BooleanRowSelection`] selects any rows + pub fn selects_any(&self) -> bool { + self.true_iter().next().is_some() + } + + /// Returns a new [`BooleanRowSelection`] that selects the rows in this [`BooleanRowSelection`] from `offset` to `offset + len` + pub fn slice(&self, offset: usize, len: usize) -> BooleanArray { + BooleanArray::new(self.selector.slice(offset, len), None) + } +} + +impl From> for BooleanRowSelection { + fn from(selection: Vec) -> Self { + let selection = RowSelection::from(selection); + RowSelection::into(selection) + } +} + +impl From for BooleanRowSelection { + fn from(selection: RowSelection) -> Self { + let total_rows = selection.row_count(); + let mut builder = BooleanBufferBuilder::new(total_rows); + + for selector in selection.iter() { + if selector.skip { + builder.append_n(selector.row_count, false); + } else { + builder.append_n(selector.row_count, true); + } + } + + BooleanRowSelection { + selector: builder.finish(), + } + } +} + +impl From<&BooleanRowSelection> for RowSelection { + fn from(selection: &BooleanRowSelection) -> Self { + let array = BooleanArray::new(selection.selector.clone(), None); + RowSelection::from_filters(&[array]) + } +} + +/// Combines this [`BooleanBuffer`] with another using logical AND on the selected bits. +/// +/// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly as many **set bits** as `self`, +/// i.e., self.count_set_bits() == other.len(). +/// +/// This method will keep only the bits in `self` that are also set in `other` +/// at the positions corresponding to `self`'s set bits. +/// For example: +/// left: NNYYYNNYYNYN +/// right: YNY NY N +/// result: NNYNYNNNYNNN +/// +/// Optimized version of `boolean_buffer_and_then` using BMI2 PDEP instructions. +/// This function performs the same operation but uses bit manipulation instructions +/// for better performance on supported x86_64 CPUs. +pub fn boolean_buffer_and_then(left: &BooleanBuffer, right: &BooleanBuffer) -> BooleanBuffer { + debug_assert_eq!( + left.count_set_bits(), + right.len(), + "the right selection must have the same number of set bits as the left selection" + ); + + if left.len() == right.len() { + debug_assert_eq!(left.count_set_bits(), left.len()); + return right.clone(); + } + + // Fast path for BMI2 support + #[cfg(target_arch = "x86_64")] + { + if is_x86_feature_detected!("bmi2") { + unsafe { boolean_buffer_and_then_bmi2(left, right) } + } else { + // Fallback to the original implementation + boolean_buffer_and_then_fallback(left, right) + } + } + #[cfg(not(target_arch = "x86_64"))] + { + // Fallback to the original implementation + boolean_buffer_and_then_fallback(left, right) + } +} + +fn boolean_buffer_and_then_fallback(left: &BooleanBuffer, right: &BooleanBuffer) -> BooleanBuffer { + todo!() +} + +#[cfg(target_arch = "x86_64")] +#[target_feature(enable = "bmi2")] +unsafe fn boolean_buffer_and_then_bmi2( + left: &BooleanBuffer, + right: &BooleanBuffer, +) -> BooleanBuffer { + use core::arch::x86_64::_pdep_u64; + + debug_assert_eq!(left.count_set_bits(), right.len()); + + let bit_len = left.len(); + let byte_len = (bit_len + 7) / 8; + let left_ptr = left.values().as_ptr(); + let right_ptr = right.values().as_ptr(); + + let mut out = MutableBuffer::from_len_zeroed(byte_len); + let out_ptr = out.as_mut_ptr(); + + let full_words = byte_len / 8; + let mut right_bit_idx = 0; // how many bits we have processed from right + + for word_idx in 0..full_words { + let left_word = + unsafe { core::ptr::read_unaligned(left_ptr.add(word_idx * 8) as *const u64) }; + + if left_word == 0 { + continue; + } + + let need = left_word.count_ones(); + + // Absolute byte & bit offset of the first needed bit inside `right`. + let rb_byte = right_bit_idx / 8; + let rb_bit = (right_bit_idx & 7) as u32; + + // We load two u64 words and shift/mask them to avoid branches and loops. + let mut r_bits = + unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte) as *const u64) } >> rb_bit; + if rb_bit != 0 { + let next = + unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte + 8) as *const u64) }; + r_bits |= next << (64 - rb_bit); + } + + // Mask off the high garbage if we asked for < 64 bits. + r_bits &= 1u64.unbounded_shl(need).wrapping_sub(1); + + // The PDEP instruction: https://www.felixcloutier.com/x86/pdep + // It takes left_word as the mask, and deposit the packed bits into the sparse positions of `left_word`. + let result = _pdep_u64(r_bits, left_word); + + unsafe { + core::ptr::write_unaligned(out_ptr.add(word_idx * 8) as *mut u64, result); + } + + right_bit_idx += need as usize; + } + + // Handle remaining bits that are less than 64 bits + let tail_bits = bit_len & 63; + if tail_bits != 0 { + let mut mask = 0u64; + for bit in 0..tail_bits { + let byte = unsafe { *left_ptr.add(full_words * 8 + (bit / 8)) }; + mask |= (((byte >> (bit & 7)) & 1) as u64) << bit; + } + + if mask != 0 { + let need = mask.count_ones(); + + let rb_byte = right_bit_idx / 8; + let rb_bit = (right_bit_idx & 7) as u32; + + let mut r_bits = + unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte) as *const u64) } + >> rb_bit; + if rb_bit != 0 { + let next = + unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte + 8) as *const u64) }; + r_bits |= next << (64 - rb_bit); + } + + r_bits &= 1u64.unbounded_shl(need).wrapping_sub(1); + + let result = _pdep_u64(r_bits, mask); + + let tail_bytes = (tail_bits + 7) / 8; + unsafe { + core::ptr::copy_nonoverlapping( + &result.to_le_bytes()[0], + out_ptr.add(full_words * 8), + tail_bytes, + ); + } + } + } + + BooleanBuffer::new(out.into(), 0, bit_len) +} + +#[cfg(test)] +mod tests { + use rand::Rng; + + use super::*; + + fn generate_random_row_selection(total_rows: usize, selection_ratio: f64) -> BooleanArray { + let mut rng = rand::rng(); + let bools: Vec = (0..total_rows) + .map(|_| rng.random_bool(selection_ratio)) + .collect(); + BooleanArray::from(bools) + } + + #[test] + fn test_boolean_row_selection_round_trip() { + let total_rows = 1_000; + for &selection_ratio in &[0.0, 0.1, 0.5, 0.9, 1.0] { + let selection = generate_random_row_selection(total_rows, selection_ratio); + let boolean_selection = BooleanRowSelection::from_filters(&[selection]); + let row_selection = RowSelection::from(&boolean_selection); + let boolean_selection_again = row_selection.into(); + assert_eq!(boolean_selection, boolean_selection_again); + } + } + + #[test] + fn test_boolean_union_intersection() { + let total_rows = 1_000; + + let base_boolean_selection = + BooleanRowSelection::from_filters(&[generate_random_row_selection(total_rows, 0.1)]); + let base_row_selection = RowSelection::from(&base_boolean_selection); + for &selection_ratio in &[0.0, 0.1, 0.5, 0.9, 1.0] { + let boolean_selection = + BooleanRowSelection::from_filters(&[generate_random_row_selection( + total_rows, + selection_ratio, + )]); + let row_selection = RowSelection::from(&boolean_selection); + + let boolean_union = boolean_selection.union(&base_boolean_selection); + let row_union = row_selection.union(&base_row_selection); + assert_eq!(boolean_union, BooleanRowSelection::from(row_union)); + + let boolean_intersection = boolean_selection.intersection(&base_boolean_selection); + let row_intersection = row_selection.intersection(&base_row_selection); + assert_eq!( + boolean_intersection, + BooleanRowSelection::from(row_intersection) + ); + } + } + + #[test] + fn test_boolean_selection_and_then() { + // Initial mask: 001011010101 + let self_filters = vec![BooleanArray::from(vec![ + false, false, true, false, true, true, false, true, false, true, false, true, + ])]; + let self_selection = BooleanRowSelection::from_filters(&self_filters); + + // Predicate mask (only for selected bits): 001101 + let other_filters = vec![BooleanArray::from(vec![ + false, false, true, true, false, true, + ])]; + let other_selection = BooleanRowSelection::from_filters(&other_filters); + + let result = self_selection.and_then(&other_selection); + + // Expected result: 000001010001 + let expected_filters = vec![BooleanArray::from(vec![ + false, false, false, false, false, true, false, true, false, false, false, true, + ])]; + let expected_selection = BooleanRowSelection::from_filters(&expected_filters); + + assert_eq!(result, expected_selection); + } + + #[test] + #[should_panic( + expected = "The 'other' selection must have exactly as many set bits as 'self'." + )] + fn test_and_then_mismatched_set_bits() { + let self_filters = vec![BooleanArray::from(vec![true, true, false])]; + let self_selection = BooleanRowSelection::from_filters(&self_filters); + + // 'other' has only one set bit, but 'self' has two + let other_filters = vec![BooleanArray::from(vec![true, false, false])]; + let other_selection = BooleanRowSelection::from_filters(&other_filters); + + // This should panic + let _ = self_selection.and_then(&other_selection); + } + + #[test] + #[cfg(target_arch = "x86_64")] + fn test_boolean_buffer_and_then_bmi2_large() { + use super::boolean_buffer_and_then_bmi2; + + // Test with larger buffer (more than 64 bits) + let size = 128; + let mut left_builder = BooleanBufferBuilder::new(size); + let mut right_bits = Vec::new(); + + // Create a pattern where every 3rd bit is set in left + for i in 0..size { + let is_set = i % 3 == 0; + left_builder.append(is_set); + if is_set { + // For right buffer, alternate between true/false + right_bits.push(right_bits.len() % 2 == 0); + } + } + let left = left_builder.finish(); + + let mut right_builder = BooleanBufferBuilder::new(right_bits.len()); + for bit in right_bits { + right_builder.append(bit); + } + let right = right_builder.finish(); + + let result_bmi2 = unsafe { boolean_buffer_and_then_bmi2(&left, &right) }; + let result_orig = boolean_buffer_and_then_fallback(&left, &right); + + assert_eq!(result_bmi2.len(), result_orig.len()); + assert_eq!(result_bmi2.len(), size); + + // Verify they produce the same result + for i in 0..size { + assert_eq!( + result_bmi2.value(i), + result_orig.value(i), + "Mismatch at position {}", + i + ); + } + } + + #[test] + #[cfg(target_arch = "x86_64")] + fn test_boolean_buffer_and_then_bmi2_edge_cases() { + use super::boolean_buffer_and_then_bmi2; + + // Test case: all bits set in left, alternating pattern in right + let mut left_builder = BooleanBufferBuilder::new(16); + for _ in 0..16 { + left_builder.append(true); + } + let left = left_builder.finish(); + + let mut right_builder = BooleanBufferBuilder::new(16); + for i in 0..16 { + right_builder.append(i % 2 == 0); + } + let right = right_builder.finish(); + + let result_bmi2 = unsafe { boolean_buffer_and_then_bmi2(&left, &right) }; + let result_orig = boolean_buffer_and_then_fallback(&left, &right); + + assert_eq!(result_bmi2.len(), result_orig.len()); + for i in 0..16 { + assert_eq!( + result_bmi2.value(i), + result_orig.value(i), + "Mismatch at position {}", + i + ); + // Should be true for even indices, false for odd + assert_eq!(result_bmi2.value(i), i % 2 == 0); + } + + // Test case: no bits set in left + let mut left_empty_builder = BooleanBufferBuilder::new(8); + for _ in 0..8 { + left_empty_builder.append(false); + } + let left_empty = left_empty_builder.finish(); + let right_empty = BooleanBufferBuilder::new(0).finish(); + + let result_bmi2_empty = unsafe { boolean_buffer_and_then_bmi2(&left_empty, &right_empty) }; + let result_orig_empty = boolean_buffer_and_then_fallback(&left_empty, &right_empty); + + assert_eq!(result_bmi2_empty.len(), result_orig_empty.len()); + assert_eq!(result_bmi2_empty.len(), 8); + for i in 0..8 { + assert_eq!(result_bmi2_empty.value(i), false); + assert_eq!(result_orig_empty.value(i), false); + } + } +} diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 61a244589c6d..a3e71106187c 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -712,6 +712,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 200); + assert_eq!(std::mem::size_of::(), 224); } }