diff --git a/crates/store/re_chunk/src/arrow_util.rs b/crates/store/re_chunk/src/arrow_util.rs index 9ff23565e963..c3d80b207b9b 100644 --- a/crates/store/re_chunk/src/arrow_util.rs +++ b/crates/store/re_chunk/src/arrow_util.rs @@ -7,6 +7,11 @@ use itertools::Itertools; // --- +#[inline] +pub fn into_arrow_ref(array: impl Array + 'static) -> ArrayRef { + std::sync::Arc::new(array) +} + /// Returns true if the given `list_array` is semantically empty. /// /// Semantic emptiness is defined as either one of these: @@ -285,7 +290,7 @@ where ); if indices.len() == array.len() { - let indices = indices.values().as_ref(); + let indices = indices.values(); let starts_at_zero = || indices[0] == O::Native::ZERO; let is_consecutive = || { diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index b7032f6acf14..936c1cb6ef81 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -2,14 +2,14 @@ use std::sync::atomic::{AtomicU64, Ordering}; use ahash::HashMap; use arrow::{ - array::{Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray}, + array::{ + Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray, + StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array, + }, buffer::ScalarBuffer as ArrowScalarBuffer, }; use arrow2::{ - array::{ - Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, - StructArray as Arrow2StructArray, - }, + array::{Array as Arrow2Array, ListArray as Arrow2ListArray}, Either, }; use itertools::{izip, Itertools}; @@ -34,7 +34,10 @@ pub enum ChunkError { Malformed { reason: String }, #[error(transparent)] - Arrow(#[from] arrow2::error::Error), + Arrow(#[from] arrow::error::ArrowError), + + #[error(transparent)] + Arrow2(#[from] arrow2::error::Error), #[error("{kind} index out of bounds: {index} (len={len})")] IndexOutOfBounds { @@ -224,7 +227,7 @@ pub struct Chunk { pub(crate) is_sorted: bool, /// The respective [`RowId`]s for each row of data. - pub(crate) row_ids: Arrow2StructArray, + pub(crate) row_ids: ArrowStructArray, /// The time columns. /// @@ -351,12 +354,6 @@ impl Chunk { components, } = self; - let row_ids_no_extension = arrow2::array::StructArray::new( - row_ids.data_type().to_logical_type().clone(), - row_ids.values().to_vec(), - row_ids.validity().cloned(), - ); - let components_no_extension: IntMap<_, _> = components .values() .flat_map(|per_desc| { @@ -388,16 +385,10 @@ impl Chunk { }) .collect(); - let other_row_ids_no_extension = arrow2::array::StructArray::new( - other.row_ids.data_type().to_logical_type().clone(), - other.row_ids.values().to_vec(), - other.row_ids.validity().cloned(), - ); - *id == other.id && *entity_path == other.entity_path && *is_sorted == other.is_sorted - && row_ids_no_extension == other_row_ids_no_extension + && row_ids == &other.row_ids && *timelines == other.timelines && components_no_extension == other_components_no_extension } @@ -437,11 +428,11 @@ impl Chunk { .collect_vec(); #[allow(clippy::unwrap_used)] - let row_ids = ::to_arrow2(&row_ids) + let row_ids = ::to_arrow(&row_ids) // Unwrap: native RowIds cannot fail to serialize. .unwrap() .as_any() - .downcast_ref::() + .downcast_ref::() // Unwrap: RowId schema is known in advance to be a struct array -- always. .unwrap() .clone(); @@ -493,11 +484,11 @@ impl Chunk { .collect_vec(); #[allow(clippy::unwrap_used)] - let row_ids = ::to_arrow2(&row_ids) + let row_ids = ::to_arrow(&row_ids) // Unwrap: native RowIds cannot fail to serialize. .unwrap() .as_any() - .downcast_ref::() + .downcast_ref::() // Unwrap: RowId schema is known in advance to be a struct array -- always. .unwrap() .clone(); @@ -826,7 +817,7 @@ impl Chunk { id: ChunkId, entity_path: EntityPath, is_sorted: Option, - row_ids: Arrow2StructArray, + row_ids: ArrowStructArray, timelines: IntMap, components: ChunkComponents, ) -> ChunkResult { @@ -866,13 +857,13 @@ impl Chunk { ) -> ChunkResult { re_tracing::profile_function!(); let row_ids = row_ids - .to_arrow2() + .to_arrow() // NOTE: impossible, but better safe than sorry. .map_err(|err| ChunkError::Malformed { reason: format!("RowIds failed to serialize: {err}"), })? .as_any() - .downcast_ref::() + .downcast_ref::() // NOTE: impossible, but better safe than sorry. .ok_or_else(|| ChunkError::Malformed { reason: "RowIds failed to downcast".to_owned(), @@ -923,7 +914,7 @@ impl Chunk { id: ChunkId, entity_path: EntityPath, is_sorted: Option, - row_ids: Arrow2StructArray, + row_ids: ArrowStructArray, components: ChunkComponents, ) -> ChunkResult { Self::new( @@ -943,7 +934,11 @@ impl Chunk { entity_path, heap_size_bytes: Default::default(), is_sorted: true, - row_ids: Arrow2StructArray::new_empty(RowId::arrow2_datatype()), + row_ids: arrow::array::StructBuilder::from_fields( + re_types_core::tuid_arrow_fields(), + 0, + ) + .finish(), timelines: Default::default(), components: Default::default(), } @@ -1203,27 +1198,24 @@ impl Chunk { } #[inline] - pub fn row_ids_array(&self) -> &Arrow2StructArray { + pub fn row_ids_array(&self) -> &ArrowStructArray { &self.row_ids } /// Returns the [`RowId`]s in their raw-est form: a tuple of (times, counters) arrays. #[inline] - pub fn row_ids_raw(&self) -> (&Arrow2PrimitiveArray, &Arrow2PrimitiveArray) { - let [times, counters] = self.row_ids.values() else { + pub fn row_ids_raw(&self) -> (&ArrowUInt64Array, &ArrowUInt64Array) { + let [times, counters] = self.row_ids.columns() else { panic!("RowIds are corrupt -- this should be impossible (sanity checked)"); }; #[allow(clippy::unwrap_used)] - let times = times - .as_any() - .downcast_ref::>() - .unwrap(); // sanity checked + let times = times.as_any().downcast_ref::().unwrap(); // sanity checked #[allow(clippy::unwrap_used)] let counters = counters .as_any() - .downcast_ref::>() + .downcast_ref::() .unwrap(); // sanity checked (times, counters) @@ -1235,7 +1227,7 @@ impl Chunk { #[inline] pub fn row_ids(&self) -> impl Iterator + '_ { let (times, counters) = self.row_ids_raw(); - izip!(times.values().as_ref(), counters.values().as_slice()) + izip!(times.values(), counters.values()) .map(|(&time, &counter)| RowId::from_u128((time as u128) << 64 | (counter as u128))) } @@ -1277,7 +1269,7 @@ impl Chunk { } let (times, counters) = self.row_ids_raw(); - let (times, counters) = (times.values().as_ref(), counters.values().as_slice()); + let (times, counters) = (times.values(), counters.values()); #[allow(clippy::unwrap_used)] // checked above let (index_min, index_max) = if self.is_sorted() { @@ -1558,11 +1550,11 @@ impl Chunk { // Row IDs { - if *row_ids.data_type().to_logical_type() != RowId::arrow2_datatype() { + if *row_ids.data_type() != RowId::arrow_datatype() { return Err(ChunkError::Malformed { reason: format!( "RowId data has the wrong datatype: expected {:?} but got {:?} instead", - RowId::arrow2_datatype(), + RowId::arrow_datatype(), *row_ids.data_type(), ), }); diff --git a/crates/store/re_chunk/src/iter.rs b/crates/store/re_chunk/src/iter.rs index bd38875103c0..f86d28919172 100644 --- a/crates/store/re_chunk/src/iter.rs +++ b/crates/store/re_chunk/src/iter.rs @@ -683,8 +683,8 @@ impl Iterator for ChunkIndicesIter { let row_id = { let (times, incs) = self.chunk.row_ids_raw(); - let times = times.values().as_slice(); - let incs = incs.values().as_slice(); + let times = times.values(); + let incs = incs.values(); let time = *times.get(i)?; let inc = *incs.get(i)?; diff --git a/crates/store/re_chunk/src/merge.rs b/crates/store/re_chunk/src/merge.rs index 9b4433b1e975..5236b8a02813 100644 --- a/crates/store/re_chunk/src/merge.rs +++ b/crates/store/re_chunk/src/merge.rs @@ -1,12 +1,12 @@ +use arrow::array::StructArray as ArrowStructArray; use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; -use arrow2::array::{ - Array as Arrow2Array, ListArray as Arrow2ListArray, StructArray as Arrow2StructArray, -}; +use arrow2::array::{Array as Arrow2Array, ListArray as Arrow2ListArray}; use itertools::{izip, Itertools}; use nohash_hasher::IntMap; use crate::{ - arrow2_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn, + arrow2_util, arrow_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, + TimeColumn, }; // --- @@ -48,12 +48,12 @@ impl Chunk { let row_ids = { re_tracing::profile_scope!("row_ids"); - let row_ids = arrow2_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?; + let row_ids = arrow_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?; #[allow(clippy::unwrap_used)] // concatenating 2 RowId arrays must yield another RowId array row_ids .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() .clone() }; diff --git a/crates/store/re_chunk/src/shuffle.rs b/crates/store/re_chunk/src/shuffle.rs index 86d48b35bdb2..4b15f425def6 100644 --- a/crates/store/re_chunk/src/shuffle.rs +++ b/crates/store/re_chunk/src/shuffle.rs @@ -1,9 +1,11 @@ -use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; +use std::sync::Arc; + +use arrow::{ + array::{Array as _, StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array}, + buffer::ScalarBuffer as ArrowScalarBuffer, +}; use arrow2::{ - array::{ - Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, - StructArray, - }, + array::{Array as Arrow2Array, ListArray as Arrow2ListArray}, offset::Offsets as ArrowOffsets, }; use itertools::Itertools as _; @@ -213,14 +215,11 @@ impl Chunk { sorted_counters[to] = counters[from]; } - let times = Arrow2PrimitiveArray::::from_vec(sorted_times).boxed(); - let counters = Arrow2PrimitiveArray::::from_vec(sorted_counters).boxed(); + let times = Arc::new(ArrowUInt64Array::from(sorted_times)); + let counters = Arc::new(ArrowUInt64Array::from(sorted_counters)); - self.row_ids = StructArray::new( - self.row_ids.data_type().clone(), - vec![times, counters], - None, - ); + self.row_ids = + ArrowStructArray::new(self.row_ids.fields().clone(), vec![times, counters], None); } let Self { diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs index 3b2f842cef92..782f97cacb0b 100644 --- a/crates/store/re_chunk/src/slice.rs +++ b/crates/store/re_chunk/src/slice.rs @@ -1,6 +1,5 @@ use arrow2::array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, ListArray as Arrow2ListArray, - StructArray as Arrow2StructArray, }; use itertools::Itertools; @@ -38,8 +37,8 @@ impl Chunk { let row_id_inc = (row_id_128 & (!0 >> 64)) as u64; let (times, incs) = self.row_ids_raw(); - let times = times.values().as_slice(); - let incs = incs.values().as_slice(); + let times = times.values(); + let incs = incs.values(); let mut index = times.partition_point(|&time| time < row_id_time_ns); while index < incs.len() && incs[index] < row_id_inc { @@ -102,7 +101,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: row_ids.clone().sliced(index, len), + row_ids: row_ids.clone().slice(index, len), timelines: timelines .iter() .map(|(timeline, time_column)| (*timeline, time_column.row_sliced(index, len))) @@ -369,7 +368,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: arrow2_util::filter_array(row_ids, &validity_filter), + row_ids: arrow_util::filter_array(row_ids, &validity_filter.clone().into()), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter))) @@ -450,7 +449,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted: true, - row_ids: Arrow2StructArray::new_empty(row_ids.data_type().clone()), + row_ids: arrow::array::StructBuilder::from_fields(row_ids.fields().clone(), 0).finish(), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.emptied())) @@ -546,7 +545,10 @@ impl Chunk { entity_path: self.entity_path.clone(), heap_size_bytes: Default::default(), is_sorted: self.is_sorted, - row_ids: arrow2_util::take_array(&self.row_ids, &indices), + row_ids: arrow_util::take_array( + &self.row_ids, + &arrow::array::Int32Array::from(indices.clone()), + ), timelines: self .timelines .iter() @@ -619,7 +621,7 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: arrow2_util::filter_array(row_ids, filter), + row_ids: arrow_util::filter_array(row_ids, &filter.clone().into()), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.filtered(filter))) @@ -699,7 +701,10 @@ impl Chunk { entity_path: entity_path.clone(), heap_size_bytes: Default::default(), is_sorted, - row_ids: arrow2_util::take_array(row_ids, indices), + row_ids: arrow_util::take_array( + row_ids, + &arrow::array::Int32Array::from(indices.clone()), + ), timelines: timelines .iter() .map(|(&timeline, time_column)| (timeline, time_column.taken(indices))) diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 69350f685d71..f8dcbcbcd2ad 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -1,6 +1,6 @@ -use arrow::array::ArrayRef as ArrowArrayRef; +use arrow::array::{ArrayRef as ArrowArrayRef, StructArray as ArrowStructArray}; use arrow2::{ - array::{Array as Arrow2Array, ListArray, StructArray as Arrow2StructArray}, + array::{Array as Arrow2Array, ListArray}, chunk::Chunk as Arrow2Chunk, datatypes::{ DataType as Arrow2Datatype, Field as ArrowField, Metadata as Arrow2Metadata, @@ -15,7 +15,10 @@ use re_log_types::{EntityPath, Timeline}; use re_types_core::{Component as _, ComponentDescriptor, Loggable as _}; use tap::Tap as _; -use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, RowId, TimeColumn}; +use crate::{ + arrow_util::into_arrow_ref, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, + RowId, TimeColumn, +}; // --- @@ -398,7 +401,8 @@ impl Chunk { } = self; let mut schema = Arrow2Schema::default(); - let mut columns = Vec::with_capacity(1 /* row_ids */ + timelines.len() + components.len()); + let mut columns: Vec> = + Vec::with_capacity(1 /* row_ids */ + timelines.len() + components.len()); // Chunk-level metadata { @@ -432,7 +436,7 @@ impl Chunk { schema.fields.push( ArrowField::new( RowId::descriptor().to_string(), - row_ids.data_type().clone(), + RowId::arrow_datatype().clone().into(), false, ) .with_metadata( @@ -445,7 +449,7 @@ impl Chunk { }), ), ); - columns.push(row_ids.clone().boxed()); + columns.push(into_arrow_ref(row_ids.clone()).into()); } // Timelines @@ -561,13 +565,13 @@ impl Chunk { }); }; - row_ids + ArrowArrayRef::from(row_ids.clone()) .as_any() - .downcast_ref::() + .downcast_ref::() .ok_or_else(|| ChunkError::Malformed { reason: format!( "RowId data has the wrong datatype: expected {:?} but got {:?} instead", - RowId::arrow2_datatype(), + RowId::arrow_datatype(), *row_ids.data_type(), ), })? diff --git a/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap b/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap index f8edb7be14e0..23ee30ee7353 100644 --- a/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap +++ b/crates/store/re_chunk_store/tests/snapshots/formatting__format_chunk_store.snap @@ -1,13 +1,15 @@ --- source: crates/store/re_chunk_store/tests/formatting.rs +assertion_line: 45 expression: store.to_string() +snapshot_kind: text --- ChunkStore { id: test_id config: ChunkStoreConfig { enable_changelog: true, chunk_max_bytes: 393216, chunk_max_rows: 4096, chunk_max_rows_if_unsorted: 1024 } stats: { num_chunks: 1 - total_size_bytes: 1.0 KiB + total_size_bytes: 1.3 KiB num_rows: 1 num_events: 2 } @@ -15,7 +17,7 @@ ChunkStore { ┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ │ CHUNK METADATA: │ │ * entity_path: "/this/that" │ - │ * heap_size_bytes: "838" │ + │ * heap_size_bytes: "1134" │ │ * id: "661EFDF2E3B19F7C045F15" │ │ * is_sorted: "" │ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index e998efc7aad0..92481b4c3958 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -969,8 +969,8 @@ impl QueryHandle { let cur_index_row_id_at = |at: usize| { let (times, incs) = cur_index_row_ids; - let times = times.values().as_slice(); - let incs = incs.values().as_slice(); + let times = times.values(); + let incs = incs.values(); let time = *times.get(at)?; let inc = *incs.get(at)?; diff --git a/crates/store/re_types_core/src/lib.rs b/crates/store/re_types_core/src/lib.rs index c927f0197538..d8e4e38d104d 100644 --- a/crates/store/re_types_core/src/lib.rs +++ b/crates/store/re_types_core/src/lib.rs @@ -58,6 +58,7 @@ pub use self::{ DeserializationError, DeserializationResult, ResultExt, SerializationError, SerializationResult, _Backtrace, }, + tuid::tuid_arrow_fields, view::{View, ViewClassIdentifier}, }; diff --git a/crates/store/re_types_core/src/tuid.rs b/crates/store/re_types_core/src/tuid.rs index 622805a8e353..1bcbd9d6e22a 100644 --- a/crates/store/re_types_core/src/tuid.rs +++ b/crates/store/re_types_core/src/tuid.rs @@ -11,13 +11,18 @@ use crate::{DeserializationError, Loggable}; // --- +// TODO(emilk): This is a bit ugly… but good enough for now? +pub fn tuid_arrow_fields() -> Fields { + Fields::from(vec![ + Field::new("time_ns", DataType::UInt64, false), + Field::new("inc", DataType::UInt64, false), + ]) +} + impl Loggable for Tuid { #[inline] fn arrow_datatype() -> arrow::datatypes::DataType { - DataType::Struct(Fields::from(vec![ - Field::new("time_ns", DataType::UInt64, false), - Field::new("inc", DataType::UInt64, false), - ])) + DataType::Struct(tuid_arrow_fields()) } fn to_arrow_opt<'a>( diff --git a/crates/utils/re_byte_size/src/arrow_sizes.rs b/crates/utils/re_byte_size/src/arrow_sizes.rs index 7f8cba0f76dc..862af2b8b583 100644 --- a/crates/utils/re_byte_size/src/arrow_sizes.rs +++ b/crates/utils/re_byte_size/src/arrow_sizes.rs @@ -1,5 +1,5 @@ use arrow::{ - array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}, + array::{Array, ArrayRef}, buffer::ScalarBuffer, datatypes::ArrowNativeType, }; @@ -13,17 +13,17 @@ impl SizeBytes for dyn Array { } } -impl SizeBytes for ArrayRef { +impl SizeBytes for &T { #[inline] fn heap_size_bytes(&self) -> u64 { self.get_array_memory_size() as u64 } } -impl SizeBytes for PrimitiveArray { +impl SizeBytes for ArrayRef { #[inline] fn heap_size_bytes(&self) -> u64 { - Array::get_array_memory_size(self) as u64 + self.get_array_memory_size() as u64 } }