Skip to content

Commit

Permalink
refactor: clean code in LevelStream
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 17, 2024
1 parent 55455af commit fc4e324
Showing 1 changed file with 46 additions and 54 deletions.
100 changes: 46 additions & 54 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use futures_core::Stream;
use pin_project_lite::pin_project;
use parquet::errors::ParquetError;

use crate::{
executor::Executor,
Expand All @@ -25,73 +25,65 @@ where
R: Record,
E: Executor,
{
Ready(Pin<Box<SsTableScan<R, E>>>),
Ready(SsTableScan<R, E>),
OpenFile(Pin<Box<dyn Future<Output = io::Result<E::File>> + 'level>>),
LoadStream(
Pin<
Box<
dyn Future<Output = Result<SsTableScan<R, E>, parquet::errors::ParquetError>>
+ 'level,
>,
>,
),
LoadStream(Pin<Box<dyn Future<Output = Result<SsTableScan<R, E>, ParquetError>> + 'level>>),
}

pin_project! {
pub(crate) struct LevelStream<'level, R, E>
where
R: Record,
E: Executor,
{
lower: Bound<&'level R::Key>,
upper: Bound<&'level R::Key>,
ts: Timestamp,
option: Arc<DbOption>,
gens: VecDeque<FileId>,
statue: FutureStatus<'level, R, E>,
}
pub(crate) struct LevelStream<'level, R, E>
where
R: Record,
E: Executor,
{
lower: Bound<&'level R::Key>,
upper: Bound<&'level R::Key>,
ts: Timestamp,
option: Arc<DbOption>,
gens: VecDeque<FileId>,
statue: FutureStatus<'level, R, E>,
}

impl<'level, R, E> Stream for LevelStream<'level, R, E>
where
R: Record,
E: Executor + 'level,
{
type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;
type Item = Result<RecordBatchEntry<R>, ParquetError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut self.statue {
FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) {
Poll::Ready(None) => match self.gens.pop_front() {
None => Poll::Ready(None),
Some(gen) => {
self.statue =
FutureStatus::OpenFile(Box::pin(E::open(self.option.table_path(&gen))));
self.poll_next(cx)
loop {
return match &mut self.statue {
FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) {
Poll::Ready(None) => match self.gens.pop_front() {
None => Poll::Ready(None),
Some(gen) => {
self.statue = FutureStatus::OpenFile(Box::pin(E::open(
self.option.table_path(&gen),
)));
continue;
}
},
poll => poll,
},
FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) {
Poll::Ready(Ok(file)) => {
self.statue = FutureStatus::LoadStream(Box::pin(
SsTable::open(file).scan((self.lower, self.upper), self.ts),
));
continue;
}
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(ParquetError::from(err)))),
Poll::Pending => Poll::Pending,
},
FutureStatus::LoadStream(stream_future) => match Pin::new(stream_future).poll(cx) {
Poll::Ready(Ok(scan)) => {
self.statue = FutureStatus::Ready(scan);
continue;
}
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Poll::Pending => Poll::Pending,
},
poll => poll,
},
FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) {
Poll::Ready(Ok(file)) => {
self.statue = FutureStatus::LoadStream(Box::pin(
SsTable::open(file).scan((self.lower, self.upper), self.ts),
));
self.poll_next(cx)
}
Poll::Ready(Err(err)) => {
Poll::Ready(Some(Err(parquet::errors::ParquetError::from(err))))
}
Poll::Pending => Poll::Pending,
},
FutureStatus::LoadStream(stream_future) => match Pin::new(stream_future).poll(cx) {
Poll::Ready(Ok(scan)) => {
self.statue = FutureStatus::Ready(Box::pin(scan));
self.poll_next(cx)
}
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Poll::Pending => Poll::Pending,
},
};
}
}
}
Expand Down

0 comments on commit fc4e324

Please sign in to comment.