Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: ensure safety condition of unsafe blocks #16

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/arrows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ where
let key = match range {
Bound::Included(key) => {
cmp = &gt_eq;
Some(unsafe { &*(key as *const _) })
Some(&*(key as *const _))
}
Bound::Excluded(key) => {
cmp = &gt;
Some(unsafe { &*(key as *const _) })
Some(&*(key as *const _))
}
Bound::Unbounded => {
cmp = &|this, _| {
Expand Down
5 changes: 4 additions & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ where
version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
level: usize,
streams: Vec<ScanStream<'scan, R, FP>>,
) -> Result<(), CompactionError<R>> {
) -> Result<(), CompactionError<R>>
where
FP: 'scan,
{
let mut stream = MergeStream::<R, FP>::from_vec(streams).await?;

// Kould: is the capacity parameter necessary?
Expand Down
18 changes: 9 additions & 9 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use super::mutable::Mutable;
use crate::{
record::{internal::InternalRecordRef, Key, Record, RecordRef},
stream::record_batch::RecordBatchEntry,
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};

pub trait ArrowArrays: Sized {
Expand Down Expand Up @@ -154,10 +151,13 @@ where
self.range.next().map(|(_, &offset)| {
let record_ref = R::Ref::from_record_batch(self.record_batch, offset as usize);
// TODO: remove cloning record batch
RecordBatchEntry::new(self.record_batch.clone(), unsafe {
transmute::<InternalRecordRef<R::Ref<'_>>, InternalRecordRef<R::Ref<'static>>>(
record_ref,
)
RecordBatchEntry::new(self.record_batch.clone(), {
// Safety: record_ref self-references the record batch
unsafe {
transmute::<InternalRecordRef<R::Ref<'_>>, InternalRecordRef<R::Ref<'static>>>(
record_ref,
)
}
})
})
}
Expand All @@ -179,7 +179,7 @@ pub(crate) mod tests {
use crate::{
record::Record,
tests::{Test, TestRef},
timestamp::Timestamped,
timestamp::timestamped::Timestamped,
};

#[derive(Debug)]
Expand Down
14 changes: 10 additions & 4 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
Expand All @@ -16,26 +17,31 @@ use crate::{

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R, FP>
pub struct SsTableScan<'scan, R, FP>
where
FP: FileProvider,
{
#[pin]
stream: ParquetRecordBatchStream<Compat<FP::File>>,
iter: Option<RecordBatchIterator<R>>,
_marker: PhantomData<&'scan ()>
}
}

impl<R, FP> SsTableScan<R, FP>
impl<R, FP> SsTableScan<'_, R, FP>
where
FP: FileProvider,
{
pub fn new(stream: ParquetRecordBatchStream<Compat<FP::File>>) -> Self {
SsTableScan { stream, iter: None }
SsTableScan {
stream,
iter: None,
_marker: PhantomData,
}
}
}

impl<R, FP> Stream for SsTableScan<R, FP>
impl<'scan, R, FP> Stream for SsTableScan<'scan, R, FP>
where
R: Record,
FP: FileProvider,
Expand Down
3 changes: 3 additions & 0 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ where
let builder = self.into_parquet_builder(limit).await?;

let schema_descriptor = builder.metadata().file_metadata().schema_descr();

// Safety: filter's lifetime relies on range's lifetime, sstable must not live longer than
// it
let filter = unsafe { get_range_filter::<R>(schema_descriptor, range, ts) };

Ok(SsTableScan::new(builder.with_row_filter(filter).build()?))
Expand Down
6 changes: 4 additions & 2 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ where
FP: FileProvider,
{
Init(FileId),
Ready(SsTableScan<R, FP>),
Ready(SsTableScan<'level, R, FP>),
OpenFile(Pin<Box<dyn Future<Output = io::Result<FP::File>> + 'level>>),
LoadStream(Pin<Box<dyn Future<Output = Result<SsTableScan<R, FP>, ParquetError>> + 'level>>),
LoadStream(
Pin<Box<dyn Future<Output = Result<SsTableScan<'level, R, FP>, ParquetError>> + 'level>>,
),
}

pub(crate) struct LevelStream<'level, R, FP>
Expand Down
13 changes: 7 additions & 6 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ where
{
pub(crate) fn key(&self) -> Timestamped<<R::Key as Key>::Ref<'_>> {
match self {
Entry::Mutable(entry) => entry
.key()
.map(|key| unsafe { transmute(key.as_key_ref()) }),
Entry::Mutable(entry) => entry.key().map(|key| {
// Safety: shorter lifetime must be safe
unsafe { transmute(key.as_key_ref()) }
}),
Entry::SsTable(entry) => entry.internal_key(),
Entry::Immutable(entry) => entry.internal_key(),
Entry::Level(entry) => entry.internal_key(),
Expand Down Expand Up @@ -95,7 +96,7 @@ pin_project! {
},
SsTable {
#[pin]
inner: SsTableScan<R, FP>,
inner: SsTableScan<'scan, R, FP>,
},
Level {
#[pin]
Expand Down Expand Up @@ -128,12 +129,12 @@ where
}
}

impl<'scan, R, FP> From<SsTableScan<R, FP>> for ScanStream<'scan, R, FP>
impl<'scan, R, FP> From<SsTableScan<'scan, R, FP>> for ScanStream<'scan, R, FP>
where
R: Record,
FP: FileProvider,
{
fn from(inner: SsTableScan<R, FP>) -> Self {
fn from(inner: SsTableScan<'scan, R, FP>) -> Self {
ScanStream::SsTable { inner }
}
}
Expand Down
32 changes: 7 additions & 25 deletions src/timestamp/timestamped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,7 @@ where
V: PartialEq,
{
fn eq(&self, other: &Self) -> bool {
unsafe {
let this = transmute::<&TimestampedRef<V>, [usize; 2]>(self);
let other = transmute::<&TimestampedRef<V>, [usize; 2]>(other);
let this_value = transmute::<usize, &V>(this[0]);
let other_value = transmute::<usize, &V>(other[0]);
this_value == other_value && this[1] == other[1]
}
self.value() == other.value() && self.ts() == other.ts()
}
}

Expand All @@ -121,15 +115,9 @@ where
V: PartialOrd,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
unsafe {
let this = transmute::<&TimestampedRef<V>, [usize; 2]>(self);
let other = transmute::<&TimestampedRef<V>, [usize; 2]>(other);
let this_value = transmute::<usize, &V>(this[0]);
let other_value = transmute::<usize, &V>(other[0]);
this_value
.partial_cmp(other_value)
.map(|ordering| ordering.then_with(|| other[1].cmp(&this[1])))
}
self.value()
.partial_cmp(other.value())
.map(|ordering| ordering.then_with(|| other.ts().cmp(&self.ts())))
}
}

Expand All @@ -138,15 +126,9 @@ where
K: Ord,
{
fn cmp(&self, other: &Self) -> Ordering {
unsafe {
let this = transmute::<&TimestampedRef<K>, [usize; 2]>(self);
let other = transmute::<&TimestampedRef<K>, [usize; 2]>(other);
let this_value = transmute::<usize, &K>(this[0]);
let other_value = transmute::<usize, &K>(other[0]);
this_value
.cmp(other_value)
.then_with(|| other[1].cmp(&this[1]))
}
self.value()
.cmp(other.value())
.then_with(|| other.ts().cmp(&self.ts()))
}
}

Expand Down
Loading