Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port row_ids to arrow1 #8657

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/store/re_chunk/src/arrow_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use itertools::Itertools;

// ---

/// Wraps a concrete arrow [`Array`] into a dynamically-typed, reference-counted [`ArrayRef`].
///
/// This is a zero-copy operation: the array is moved into an `Arc`, not cloned.
#[inline]
pub fn into_arrow_ref(array: impl Array + 'static) -> ArrayRef {
    let array_ref: ArrayRef = std::sync::Arc::new(array);
    array_ref
}

/// Returns true if the given `list_array` is semantically empty.
///
/// Semantic emptiness is defined as either one of these:
Expand Down Expand Up @@ -285,7 +290,7 @@ where
);

if indices.len() == array.len() {
let indices = indices.values().as_ref();
let indices = indices.values();

let starts_at_zero = || indices[0] == O::Native::ZERO;
let is_consecutive = || {
Expand Down
74 changes: 33 additions & 41 deletions crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::sync::atomic::{AtomicU64, Ordering};

use ahash::HashMap;
use arrow::{
array::{Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray},
array::{
Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray,
StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array,
},
buffer::ScalarBuffer as ArrowScalarBuffer,
};
use arrow2::{
array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
StructArray as Arrow2StructArray,
},
array::{Array as Arrow2Array, ListArray as Arrow2ListArray},
Either,
};
use itertools::{izip, Itertools};
Expand All @@ -34,7 +34,10 @@ pub enum ChunkError {
Malformed { reason: String },

#[error(transparent)]
Arrow(#[from] arrow2::error::Error),
Arrow(#[from] arrow::error::ArrowError),

#[error(transparent)]
Arrow2(#[from] arrow2::error::Error),

#[error("{kind} index out of bounds: {index} (len={len})")]
IndexOutOfBounds {
Expand Down Expand Up @@ -224,7 +227,7 @@ pub struct Chunk {
pub(crate) is_sorted: bool,

/// The respective [`RowId`]s for each row of data.
pub(crate) row_ids: Arrow2StructArray,
pub(crate) row_ids: ArrowStructArray,

/// The time columns.
///
Expand Down Expand Up @@ -351,12 +354,6 @@ impl Chunk {
components,
} = self;

let row_ids_no_extension = arrow2::array::StructArray::new(
row_ids.data_type().to_logical_type().clone(),
row_ids.values().to_vec(),
row_ids.validity().cloned(),
);

let components_no_extension: IntMap<_, _> = components
.values()
.flat_map(|per_desc| {
Expand Down Expand Up @@ -388,16 +385,10 @@ impl Chunk {
})
.collect();

let other_row_ids_no_extension = arrow2::array::StructArray::new(
other.row_ids.data_type().to_logical_type().clone(),
other.row_ids.values().to_vec(),
other.row_ids.validity().cloned(),
);

*id == other.id
&& *entity_path == other.entity_path
&& *is_sorted == other.is_sorted
&& row_ids_no_extension == other_row_ids_no_extension
&& row_ids == &other.row_ids
&& *timelines == other.timelines
&& components_no_extension == other_components_no_extension
}
Expand Down Expand Up @@ -437,11 +428,11 @@ impl Chunk {
.collect_vec();

#[allow(clippy::unwrap_used)]
let row_ids = <RowId as Loggable>::to_arrow2(&row_ids)
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
Expand Down Expand Up @@ -493,11 +484,11 @@ impl Chunk {
.collect_vec();

#[allow(clippy::unwrap_used)]
let row_ids = <RowId as Loggable>::to_arrow2(&row_ids)
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
Expand Down Expand Up @@ -826,7 +817,7 @@ impl Chunk {
id: ChunkId,
entity_path: EntityPath,
is_sorted: Option<bool>,
row_ids: Arrow2StructArray,
row_ids: ArrowStructArray,
timelines: IntMap<Timeline, TimeColumn>,
components: ChunkComponents,
) -> ChunkResult<Self> {
Expand Down Expand Up @@ -866,13 +857,13 @@ impl Chunk {
) -> ChunkResult<Self> {
re_tracing::profile_function!();
let row_ids = row_ids
.to_arrow2()
.to_arrow()
// NOTE: impossible, but better safe than sorry.
.map_err(|err| ChunkError::Malformed {
reason: format!("RowIds failed to serialize: {err}"),
})?
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
// NOTE: impossible, but better safe than sorry.
.ok_or_else(|| ChunkError::Malformed {
reason: "RowIds failed to downcast".to_owned(),
Expand Down Expand Up @@ -923,7 +914,7 @@ impl Chunk {
id: ChunkId,
entity_path: EntityPath,
is_sorted: Option<bool>,
row_ids: Arrow2StructArray,
row_ids: ArrowStructArray,
components: ChunkComponents,
) -> ChunkResult<Self> {
Self::new(
Expand All @@ -943,7 +934,11 @@ impl Chunk {
entity_path,
heap_size_bytes: Default::default(),
is_sorted: true,
row_ids: Arrow2StructArray::new_empty(RowId::arrow2_datatype()),
row_ids: arrow::array::StructBuilder::from_fields(
re_types_core::tuid_arrow_fields(),
0,
)
.finish(),
timelines: Default::default(),
components: Default::default(),
}
Expand Down Expand Up @@ -1203,27 +1198,24 @@ impl Chunk {
}

#[inline]
pub fn row_ids_array(&self) -> &Arrow2StructArray {
pub fn row_ids_array(&self) -> &ArrowStructArray {
&self.row_ids
}

/// Returns the [`RowId`]s in their raw-est form: a tuple of (times, counters) arrays.
#[inline]
pub fn row_ids_raw(&self) -> (&Arrow2PrimitiveArray<u64>, &Arrow2PrimitiveArray<u64>) {
let [times, counters] = self.row_ids.values() else {
pub fn row_ids_raw(&self) -> (&ArrowUInt64Array, &ArrowUInt64Array) {
let [times, counters] = self.row_ids.columns() else {
panic!("RowIds are corrupt -- this should be impossible (sanity checked)");
};

#[allow(clippy::unwrap_used)]
let times = times
.as_any()
.downcast_ref::<Arrow2PrimitiveArray<u64>>()
.unwrap(); // sanity checked
let times = times.as_any().downcast_ref::<ArrowUInt64Array>().unwrap(); // sanity checked

#[allow(clippy::unwrap_used)]
let counters = counters
.as_any()
.downcast_ref::<Arrow2PrimitiveArray<u64>>()
.downcast_ref::<ArrowUInt64Array>()
.unwrap(); // sanity checked

(times, counters)
Expand All @@ -1235,7 +1227,7 @@ impl Chunk {
#[inline]
pub fn row_ids(&self) -> impl Iterator<Item = RowId> + '_ {
let (times, counters) = self.row_ids_raw();
izip!(times.values().as_ref(), counters.values().as_slice())
izip!(times.values(), counters.values())
.map(|(&time, &counter)| RowId::from_u128((time as u128) << 64 | (counter as u128)))
}

Expand Down Expand Up @@ -1277,7 +1269,7 @@ impl Chunk {
}

let (times, counters) = self.row_ids_raw();
let (times, counters) = (times.values().as_ref(), counters.values().as_slice());
let (times, counters) = (times.values(), counters.values());

#[allow(clippy::unwrap_used)] // checked above
let (index_min, index_max) = if self.is_sorted() {
Expand Down Expand Up @@ -1558,11 +1550,11 @@ impl Chunk {

// Row IDs
{
if *row_ids.data_type().to_logical_type() != RowId::arrow2_datatype() {
if *row_ids.data_type() != RowId::arrow_datatype() {
return Err(ChunkError::Malformed {
reason: format!(
"RowId data has the wrong datatype: expected {:?} but got {:?} instead",
RowId::arrow2_datatype(),
RowId::arrow_datatype(),
*row_ids.data_type(),
),
});
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,8 @@ impl Iterator for ChunkIndicesIter {

let row_id = {
let (times, incs) = self.chunk.row_ids_raw();
let times = times.values().as_slice();
let incs = incs.values().as_slice();
let times = times.values();
let incs = incs.values();

let time = *times.get(i)?;
let inc = *incs.get(i)?;
Expand Down
12 changes: 6 additions & 6 deletions crates/store/re_chunk/src/merge.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use arrow::array::StructArray as ArrowStructArray;
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use arrow2::array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, StructArray as Arrow2StructArray,
};
use arrow2::array::{Array as Arrow2Array, ListArray as Arrow2ListArray};
use itertools::{izip, Itertools};
use nohash_hasher::IntMap;

use crate::{
arrow2_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn,
arrow2_util, arrow_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult,
TimeColumn,
};

// ---
Expand Down Expand Up @@ -48,12 +48,12 @@ impl Chunk {
let row_ids = {
re_tracing::profile_scope!("row_ids");

let row_ids = arrow2_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
let row_ids = arrow_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
#[allow(clippy::unwrap_used)]
// concatenating 2 RowId arrays must yield another RowId array
row_ids
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
.unwrap()
.clone()
};
Expand Down
23 changes: 11 additions & 12 deletions crates/store/re_chunk/src/shuffle.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use std::sync::Arc;

use arrow::{
array::{Array as _, StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array},
buffer::ScalarBuffer as ArrowScalarBuffer,
};
use arrow2::{
array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
StructArray,
},
array::{Array as Arrow2Array, ListArray as Arrow2ListArray},
offset::Offsets as ArrowOffsets,
};
use itertools::Itertools as _;
Expand Down Expand Up @@ -213,14 +215,11 @@ impl Chunk {
sorted_counters[to] = counters[from];
}

let times = Arrow2PrimitiveArray::<u64>::from_vec(sorted_times).boxed();
let counters = Arrow2PrimitiveArray::<u64>::from_vec(sorted_counters).boxed();
let times = Arc::new(ArrowUInt64Array::from(sorted_times));
let counters = Arc::new(ArrowUInt64Array::from(sorted_counters));

self.row_ids = StructArray::new(
self.row_ids.data_type().clone(),
vec![times, counters],
None,
);
self.row_ids =
ArrowStructArray::new(self.row_ids.fields().clone(), vec![times, counters], None);
}

let Self {
Expand Down
23 changes: 14 additions & 9 deletions crates/store/re_chunk/src/slice.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use arrow2::array::{
Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, ListArray as Arrow2ListArray,
StructArray as Arrow2StructArray,
};

use itertools::Itertools;
Expand Down Expand Up @@ -38,8 +37,8 @@ impl Chunk {
let row_id_inc = (row_id_128 & (!0 >> 64)) as u64;

let (times, incs) = self.row_ids_raw();
let times = times.values().as_slice();
let incs = incs.values().as_slice();
let times = times.values();
let incs = incs.values();

let mut index = times.partition_point(|&time| time < row_id_time_ns);
while index < incs.len() && incs[index] < row_id_inc {
Expand Down Expand Up @@ -102,7 +101,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: row_ids.clone().sliced(index, len),
row_ids: row_ids.clone().slice(index, len),
timelines: timelines
.iter()
.map(|(timeline, time_column)| (*timeline, time_column.row_sliced(index, len)))
Expand Down Expand Up @@ -369,7 +368,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow2_util::filter_array(row_ids, &validity_filter),
row_ids: arrow_util::filter_array(row_ids, &validity_filter.clone().into()),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter)))
Expand Down Expand Up @@ -450,7 +449,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted: true,
row_ids: Arrow2StructArray::new_empty(row_ids.data_type().clone()),
row_ids: arrow::array::StructBuilder::from_fields(row_ids.fields().clone(), 0).finish(),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.emptied()))
Expand Down Expand Up @@ -546,7 +545,10 @@ impl Chunk {
entity_path: self.entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted: self.is_sorted,
row_ids: arrow2_util::take_array(&self.row_ids, &indices),
row_ids: arrow_util::take_array(
&self.row_ids,
&arrow::array::Int32Array::from(indices.clone()),
),
timelines: self
.timelines
.iter()
Expand Down Expand Up @@ -619,7 +621,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow2_util::filter_array(row_ids, filter),
row_ids: arrow_util::filter_array(row_ids, &filter.clone().into()),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.filtered(filter)))
Expand Down Expand Up @@ -699,7 +701,10 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow2_util::take_array(row_ids, indices),
row_ids: arrow_util::take_array(
row_ids,
&arrow::array::Int32Array::from(indices.clone()),
),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.taken(indices)))
Expand Down
Loading
Loading