change hive to dataframe
coastalwhite committed Jan 27, 2025
1 parent f67b6e4 commit 110b89a
Showing 18 changed files with 82 additions and 266 deletions.
70 changes: 29 additions & 41 deletions crates/polars-mem-engine/src/executors/multi_file_scan.rs
@@ -139,7 +139,6 @@ fn source_to_exec(
source,
file_info,
None,
None,
options,
cloud_options.clone(),
file_options.clone(),
@@ -184,7 +183,6 @@ fn source_to_exec(
options,
file_options,
predicate: None,
hive_parts: None,
cloud_options,
metadata: metadata.cloned(),
})
@@ -215,7 +213,7 @@ fn source_to_exec(
pub struct MultiScanExec {
sources: ScanSources,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
hive_parts: Option<Arc<HivePartitions>>,
predicate: Option<ScanPredicate>,
file_options: FileScanOptions,
scan_type: FileScan,
@@ -225,7 +223,7 @@ impl MultiScanExec {
pub fn new(
sources: ScanSources,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
hive_parts: Option<Arc<HivePartitions>>,
predicate: Option<ScanPredicate>,
file_options: FileScanOptions,
scan_type: FileScan,
@@ -276,25 +274,16 @@ impl MultiScanExec {
let predicate = self.predicate.take();

// Create a index set of the hive columns.
let mut hive_column_set = PlIndexSet::default();
let mut hive_schema = SchemaRef::default();
if let Some(hive_parts) = &self.hive_parts {
assert_eq!(self.sources.len(), hive_parts.len());

if let Some(fst_hive_part) = hive_parts.first() {
hive_column_set.extend(
fst_hive_part
.get_statistics()
.column_stats()
.iter()
.map(|c| c.field_name().clone()),
);
}
assert_eq!(self.sources.len(), hive_parts.0.height());
hive_schema = hive_parts.0.schema().clone();
}

// Look through the predicate and assess whether hive columns are being used in it.
let mut has_live_hive_columns = false;
if let Some(predicate) = &predicate {
for hive_column in &hive_column_set {
for hive_column in hive_schema.iter_names() {
has_live_hive_columns |= predicate.live_columns.contains(hive_column);
}
}
@@ -336,7 +325,7 @@ impl MultiScanExec {
file_with_columns = Some(
with_columns
.iter()
.filter(|&c| !hive_column_set.contains(c))
.filter(|&c| !hive_schema.contains(c))
.cloned()
.collect(),
);
@@ -360,7 +349,7 @@ impl MultiScanExec {
};

for (i, source) in self.sources.iter().enumerate() {
let hive_part = self.hive_parts.as_ref().and_then(|h| h.get(i));
let hive_part = self.hive_parts.as_ref().and_then(|h| h.0.get_row(i).ok());
if slice.is_some_and(|s| s.1 == 0) {
break;
}
@@ -403,21 +392,20 @@ impl MultiScanExec {
// to function even when there is a combination of hive and non-hive columns being
// used.
if has_live_hive_columns {
let hive_part = hive_part.unwrap();
file_predicate = Some(file_predicate.unwrap().with_constant_columns(
hive_column_set.iter().enumerate().map(|(idx, column)| {
let series = hive_part.get_statistics().column_stats()[idx]
.to_min()
.unwrap();
(
column.clone(),
Scalar::new(
series.dtype().clone(),
series.get(0).unwrap().into_static(),
),
)
}),
));
let hive_part = hive_part.as_ref().unwrap();
file_predicate = Some(
file_predicate.unwrap().with_constant_columns(
hive_schema
.iter()
.zip(hive_part.0.iter())
.map(|((column, dtype), value)| {
(
column.clone(),
Scalar::new(dtype.clone(), value.clone().into_static()),
)
}),
),
);
}

let skip_batch_predicate = file_predicate
@@ -530,14 +518,14 @@ impl MultiScanExec {
}
// Materialize the hive columns and add them back in.
if let Some(hive_part) = hive_part {
for hive_col in hive_part.get_statistics().column_stats() {
for ((column, dtype), value) in hive_schema.iter().zip(hive_part.0) {
df.with_column(
ScalarColumn::from_single_value_series(
hive_col
.to_min()
.unwrap()
.clone()
.with_name(hive_col.field_name().clone()),
ScalarColumn::new(
column.clone(),
Scalar::new(
dtype.clone(),
value.into_static(),
),
df.height(),
)
.into_column(),
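The hunks above carry the core of this commit: `MultiScanExec` no longer holds a `Vec<HivePartitions>` of per-file column statistics, but a single `HivePartitions` newtype around a `DataFrame` with one column per hive key and one row per scanned source, so row `i` supplies the constant values for source `i`. The following is a minimal, self-contained sketch of that layout; `HivePartsFrame` and `Value` are hypothetical stand-ins, not the polars-internal definitions.

```rust
// Illustrative sketch only: `HivePartsFrame` and `Value` are hypothetical stand-ins
// for the polars-internal types touched here (a HivePartitions newtype wrapping a
// DataFrame, AnyValue); they are not the real definitions.

use std::collections::BTreeMap;

#[derive(Clone, Debug)]
enum Value {
    Int(i64),
    Str(String),
}

/// New shape: one table with one hive column per partition key and
/// one row per scanned file; row `i` holds the values for source `i`.
struct HivePartsFrame {
    columns: BTreeMap<String, Vec<Value>>, // column name -> one value per file
    height: usize,                         // number of source files
}

impl HivePartsFrame {
    /// Fetch the hive values for file `i`, analogous to `hive_parts.0.get_row(i)`.
    fn row(&self, i: usize) -> Option<Vec<(String, Value)>> {
        if i >= self.height {
            return None;
        }
        Some(
            self.columns
                .iter()
                .map(|(name, values)| (name.clone(), values[i].clone()))
                .collect(),
        )
    }
}

fn main() {
    // Two files laid out as `.../region=eu/year=2024/...` and `.../region=us/year=2025/...`.
    let hive = HivePartsFrame {
        columns: BTreeMap::from([
            (
                "region".to_string(),
                vec![Value::Str("eu".into()), Value::Str("us".into())],
            ),
            (
                "year".to_string(),
                vec![Value::Int(2024), Value::Int(2025)],
            ),
        ]),
        height: 2,
    };

    // Per-file constants fed to the predicate, as in `with_constant_columns` above.
    for i in 0..hive.height {
        println!("file {i}: {:?}", hive.row(i).unwrap());
    }
}
```

With this shape, the predicate specialization (`with_constant_columns`) and the post-read materialization both come from the same row lookup instead of being re-derived from per-file column statistics.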
7 changes: 0 additions & 7 deletions crates/polars-mem-engine/src/executors/scan/ipc.rs
@@ -1,4 +1,3 @@
use hive::HivePartitions;
use polars_core::config;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_error::feature_gated;
@@ -19,7 +18,6 @@ pub struct IpcExec {
#[allow(dead_code)]
pub(crate) options: IpcScanOptions,
pub(crate) file_options: FileScanOptions,
pub(crate) hive_parts: Option<Arc<Vec<HivePartitions>>>,
pub(crate) cloud_options: Option<CloudOptions>,
pub(crate) metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
}
@@ -94,11 +92,6 @@ impl IpcExec {
.with_n_rows(n_rows)
.with_row_index(self.file_options.row_index.clone())
.with_projection(projection.clone())
.with_hive_partition_columns(
self.hive_parts
.as_ref()
.map(|x| x[index].materialize_partition_columns()),
)
.with_include_file_path(
self.file_options
.include_file_paths
16 changes: 0 additions & 16 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -1,4 +1,3 @@
use hive::HivePartitions;
use polars_core::config;
#[cfg(feature = "cloud")]
use polars_core::config::{get_file_prefetch_size, verbose};
@@ -17,8 +16,6 @@ pub struct ParquetExec {
sources: ScanSources,
file_info: FileInfo,

hive_parts: Option<Arc<Vec<HivePartitions>>>,

predicate: Option<ScanPredicate>,
skip_batch_predicate: Option<Arc<dyn SkipBatchPredicate>>,

@@ -35,7 +32,6 @@ impl ParquetExec {
pub(crate) fn new(
sources: ScanSources,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
predicate: Option<ScanPredicate>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
@@ -46,8 +42,6 @@ impl ParquetExec {
sources,
file_info,

hive_parts,

predicate,
skip_batch_predicate: None,

@@ -177,10 +171,6 @@ impl ParquetExec {
// files in parallel even if we add row index columns or slices.
let iter = (i..end).into_par_iter().map(|i| {
let source = self.sources.at(i);
let hive_partitions = self
.hive_parts
.as_ref()
.map(|x| x[i].materialize_partition_columns());

let memslice = source.to_memslice()?;

@@ -197,7 +187,6 @@ impl ParquetExec {
.set_low_memory(self.options.low_memory)
.use_statistics(self.options.use_statistics)
.set_rechunk(false)
.with_hive_partition_columns(hive_partitions)
.with_include_file_path(
self.file_options
.include_file_paths
@@ -384,7 +373,6 @@ impl ParquetExec {
for batch_start in (first_file_idx..paths.len()).step_by(batch_size) {
let end = std::cmp::min(batch_start.saturating_add(batch_size), paths.len());
let paths = &paths[batch_start..end];
let hive_parts = self.hive_parts.as_ref().map(|x| &x[batch_start..end]);

if current_offset >= slice_end && !result.is_empty() {
return Ok(result);
@@ -452,9 +440,6 @@ impl ParquetExec {
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();
let (cumulative_read, slice) = row_statistics[i];
let hive_partitions = hive_parts
.as_ref()
.map(|x| x[i].materialize_partition_columns());

async move {
let row_index = base_row_index_ref.as_ref().map(|rc| RowIndex {
@@ -474,7 +459,6 @@ impl ParquetExec {
.use_statistics(use_statistics)
.with_predicate(predicate)
.set_rechunk(false)
.with_hive_partition_columns(hive_partitions)
.with_include_file_path(
include_file_paths
.map(|x| (x.clone(), Arc::from(paths[i].to_str().unwrap()))),
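With the new multi-file path appending hive columns after each file is read (see the `ScalarColumn::new(.., df.height())` hunk in multi_file_scan.rs), the per-reader `with_hive_partition_columns` plumbing is dropped from both `IpcExec` and `ParquetExec`. The sketch below illustrates that broadcast step under simplified, hypothetical types (`Frame`, `Column`); it is not the polars API.

```rust
// Hypothetical sketch, not the polars API: after a file is read, its hive value is
// broadcast to the frame's height, mirroring the `ScalarColumn::new(.., df.height())`
// hunk in multi_file_scan.rs above.

#[derive(Clone, Debug)]
struct Column {
    name: String,
    values: Vec<i64>,
}

#[derive(Debug)]
struct Frame {
    columns: Vec<Column>,
}

impl Frame {
    fn height(&self) -> usize {
        self.columns.first().map_or(0, |c| c.values.len())
    }

    /// Append a hive column by repeating a single per-file value for every row.
    fn with_constant_column(&mut self, name: &str, value: i64) {
        let height = self.height();
        self.columns.push(Column {
            name: name.to_string(),
            values: vec![value; height],
        });
    }
}

fn main() {
    // Pretend this frame was just read from one parquet file (3 rows).
    let mut df = Frame {
        columns: vec![Column {
            name: "x".into(),
            values: vec![1, 2, 3],
        }],
    };
    // The hive value for this file (e.g. `year=2024`) is broadcast to all 3 rows.
    df.with_constant_column("year", 2024);
    assert_eq!(df.columns[1].values, vec![2024, 2024, 2024]);
    println!("{df:?}");
}
```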
5 changes: 1 addition & 4 deletions crates/polars-mem-engine/src/planner/lp.rs
@@ -207,8 +207,7 @@ fn create_physical_plan_impl(

let mut state = ExpressionConversionState::new(true, state.expr_depth);
let do_new_multifile = (sources.len() > 1 || hive_parts.is_some())
&& !matches!(scan_type, FileScan::Anonymous { .. })
&& std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1");
&& !matches!(scan_type, FileScan::Anonymous { .. });

let mut create_skip_batch_predicate = false;
create_skip_batch_predicate |= do_new_multifile;
@@ -269,7 +268,6 @@ fn create_physical_plan_impl(
predicate,
options,
file_options,
hive_parts,
cloud_options,
metadata,
})),
@@ -281,7 +279,6 @@
} => Ok(Box::new(executors::ParquetExec::new(
sources,
file_info,
hive_parts,
predicate,
options,
cloud_options,
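This planner hunk also drops the `POLARS_NEW_MULTIFILE` opt-in: the new multi-file executor is now chosen for any scan with more than one source or with hive partitions, as long as it is not an anonymous scan. A small sketch of that gate follows; `FileScanKind` is a hypothetical stand-in for `FileScan`.

```rust
// Simplified sketch of the planner gate after this change; `FileScanKind` is a
// hypothetical stand-in for `FileScan`, and the env-var opt-in is gone.

enum FileScanKind {
    Parquet,
    Ipc,
    Csv,
    Anonymous,
}

fn use_new_multifile(n_sources: usize, has_hive: bool, scan: &FileScanKind) -> bool {
    (n_sources > 1 || has_hive) && !matches!(scan, FileScanKind::Anonymous)
}

fn main() {
    assert!(use_new_multifile(3, false, &FileScanKind::Parquet));
    assert!(use_new_multifile(1, true, &FileScanKind::Ipc));
    assert!(!use_new_multifile(1, false, &FileScanKind::Csv));
    assert!(!use_new_multifile(5, true, &FileScanKind::Anonymous));
}
```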
14 changes: 8 additions & 6 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -42,7 +42,7 @@ pub struct ParquetSource {
#[allow(dead_code)]
cloud_options: Option<CloudOptions>,
first_metadata: Option<FileMetadataRef>,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
hive_parts: Option<Arc<HivePartitions>>,
verbose: bool,
run_async: bool,
prefetch_size: usize,
@@ -84,10 +84,12 @@ impl ParquetSource {
let options = self.options.clone();
let file_options = self.file_options.clone();

let hive_partitions = self
.hive_parts
.as_ref()
.map(|x| x[index].materialize_partition_columns());
let hive_partitions = self.hive_parts.as_deref().map(|x| {
x.0.slice(index as i64, 1)
.materialized_column_iter()
.cloned()
.collect()
});

let chunk_size = determine_chunk_size(
self.projected_arrow_schema
@@ -250,7 +252,7 @@ impl ParquetSource {
first_metadata: Option<FileMetadataRef>,
file_options: FileScanOptions,
file_info: FileInfo,
hive_parts: Option<Arc<Vec<HivePartitions>>>,
hive_parts: Option<Arc<HivePartitions>>,
verbose: bool,
predicate: Option<ScanIOPredicate>,
) -> PolarsResult<Self> {
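In the streaming source, the per-file hive columns are now produced by slicing one row out of the hive `DataFrame` (`x.0.slice(index as i64, 1)`) and cloning its materialized columns, rather than indexing a `Vec<HivePartitions>`. Below is a simplified mirror of that pattern using a hypothetical `Col` type, not the polars column type.

```rust
// Hypothetical mirror of the streaming-source pattern above: take a one-row slice
// of the hive table at `index` and hand its columns to the reader, instead of
// indexing into a Vec<HivePartitions>.

#[derive(Clone, Debug, PartialEq)]
struct Col {
    name: String,
    values: Vec<i64>,
}

/// Analogous to `hive_parts.0.slice(index as i64, 1).materialized_column_iter().cloned()`.
fn slice_one_row(columns: &[Col], index: usize) -> Vec<Col> {
    columns
        .iter()
        .map(|c| Col {
            name: c.name.clone(),
            values: vec![c.values[index]],
        })
        .collect()
}

fn main() {
    let hive = vec![Col {
        name: "year".into(),
        values: vec![2023, 2024, 2025],
    }];
    let per_file = slice_one_row(&hive, 1);
    assert_eq!(
        per_file,
        vec![Col {
            name: "year".into(),
            values: vec![2024],
        }]
    );
    println!("{per_file:?}");
}
```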
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -281,7 +281,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
};

if let Some(ref hive_parts) = hive_parts {
let hive_schema = hive_parts[0].schema();
let hive_schema = hive_parts.schema();
file_info.update_schema_with_hive_schema(hive_schema.clone());
} else if let Some(hive_schema) = file_options.hive_options.schema.clone() {
// We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file
@@ -323,7 +323,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
file_options.with_columns = if file_info.reader_schema.is_some() {
maybe_init_projection_excluding_hive(
file_info.reader_schema.as_ref().unwrap(),
hive_parts.as_ref().map(|x| &x[0]),
hive_parts.as_deref(),
)
} else {
None
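Since the hive schema now lives on the single `HivePartitions` frame, the schema merge and the reader projection consult it directly (`hive_parts.schema()`, `hive_parts.as_deref()`), and hive columns are filtered out of the file projection once, as in the `!hive_schema.contains(c)` hunk above. The sketch below is illustrative only; the helper name and signature are hypothetical, not the actual `maybe_init_projection_excluding_hive`.

```rust
// Illustrative only; the helper name and signature are hypothetical. Hive columns
// are dropped from the file projection because they are re-attached from the hive
// frame after each file is read.

use std::collections::BTreeSet;

fn projection_excluding_hive(
    reader_columns: &[&str],
    hive_columns: &BTreeSet<&str>,
) -> Vec<String> {
    reader_columns
        .iter()
        .filter(|c| !hive_columns.contains(*c))
        .map(|c| c.to_string())
        .collect()
}

fn main() {
    let hive: BTreeSet<&str> = BTreeSet::from(["year", "region"]);
    let projection = projection_excluding_hive(&["x", "y", "year", "region"], &hive);
    assert_eq!(projection, vec!["x".to_string(), "y".to_string()]);
}
```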
14 changes: 0 additions & 14 deletions crates/polars-plan/src/plans/file_scan.rs
@@ -147,20 +147,6 @@ impl Hash for FileScan {
}

impl FileScan {
pub(crate) fn remove_metadata(&mut self) {
match self {
#[cfg(feature = "parquet")]
Self::Parquet { metadata, .. } => {
*metadata = None;
},
#[cfg(feature = "ipc")]
Self::Ipc { metadata, .. } => {
*metadata = None;
},
_ => {},
}
}

pub(crate) fn sort_projection(&self, _file_options: &FileScanOptions) -> bool {
match self {
#[cfg(feature = "csv")]