From 110b89a6fe794de5de9c997f78016f5a7d0e474b Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Mon, 27 Jan 2025 13:01:09 +0100 Subject: [PATCH] change hive to dataframe --- .../src/executors/multi_file_scan.rs | 70 ++++++-------- .../src/executors/scan/ipc.rs | 7 -- .../src/executors/scan/parquet.rs | 16 ---- crates/polars-mem-engine/src/planner/lp.rs | 5 +- .../src/executors/sources/parquet.rs | 14 +-- .../src/plans/conversion/dsl_to_ir.rs | 4 +- crates/polars-plan/src/plans/file_scan.rs | 14 --- crates/polars-plan/src/plans/hive.rs | 91 +++++-------------- crates/polars-plan/src/plans/ir/mod.rs | 2 +- .../plans/optimizer/predicate_pushdown/mod.rs | 56 +----------- .../optimizer/projection_pushdown/mod.rs | 30 +++--- .../src/nodes/io_sources/parquet/init.rs | 7 -- .../src/nodes/io_sources/parquet/mod.rs | 3 - .../io_sources/parquet/row_group_decode.rs | 17 +--- crates/polars-stream/src/physical_plan/fmt.rs | 8 -- .../src/physical_plan/lower_ir.rs | 1 - crates/polars-stream/src/physical_plan/mod.rs | 2 - .../src/physical_plan/to_graph.rs | 1 - 18 files changed, 82 insertions(+), 266 deletions(-) diff --git a/crates/polars-mem-engine/src/executors/multi_file_scan.rs b/crates/polars-mem-engine/src/executors/multi_file_scan.rs index 208829e35614..6a92c6581b95 100644 --- a/crates/polars-mem-engine/src/executors/multi_file_scan.rs +++ b/crates/polars-mem-engine/src/executors/multi_file_scan.rs @@ -139,7 +139,6 @@ fn source_to_exec( source, file_info, None, - None, options, cloud_options.clone(), file_options.clone(), @@ -184,7 +183,6 @@ fn source_to_exec( options, file_options, predicate: None, - hive_parts: None, cloud_options, metadata: metadata.cloned(), }) @@ -215,7 +213,7 @@ fn source_to_exec( pub struct MultiScanExec { sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option, file_options: FileScanOptions, scan_type: FileScan, @@ -225,7 +223,7 @@ impl MultiScanExec { pub fn new( sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option, file_options: FileScanOptions, scan_type: FileScan, @@ -276,25 +274,16 @@ impl MultiScanExec { let predicate = self.predicate.take(); // Create a index set of the hive columns. - let mut hive_column_set = PlIndexSet::default(); + let mut hive_schema = SchemaRef::default(); if let Some(hive_parts) = &self.hive_parts { - assert_eq!(self.sources.len(), hive_parts.len()); - - if let Some(fst_hive_part) = hive_parts.first() { - hive_column_set.extend( - fst_hive_part - .get_statistics() - .column_stats() - .iter() - .map(|c| c.field_name().clone()), - ); - } + assert_eq!(self.sources.len(), hive_parts.0.height()); + hive_schema = hive_parts.0.schema().clone(); } // Look through the predicate and assess whether hive columns are being used in it. 
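The central change in this patch: `HivePartitions` stops carrying per-path `BatchStats` and becomes a thin wrapper around a single `DataFrame` with one row per scanned path. A minimal sketch (not part of the diff) of the shape the executor relies on here, with imports approximated:

use polars_core::prelude::*;

// Newtype introduced by this patch in `plans/hive.rs`: one row per scanned
// path, one column per hive partition key.
pub struct HivePartitions(pub DataFrame);

// Sketch of the lookup done in `MultiScanExec`: the hive schema is simply
// the schema of the wrapped DataFrame, and its height must match the
// number of scan sources.
fn hive_schema(parts: &HivePartitions, n_sources: usize) -> SchemaRef {
    assert_eq!(n_sources, parts.0.height());
    parts.0.schema().clone()
}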
let mut has_live_hive_columns = false; if let Some(predicate) = &predicate { - for hive_column in &hive_column_set { + for hive_column in hive_schema.iter_names() { has_live_hive_columns |= predicate.live_columns.contains(hive_column); } } @@ -336,7 +325,7 @@ impl MultiScanExec { file_with_columns = Some( with_columns .iter() - .filter(|&c| !hive_column_set.contains(c)) + .filter(|&c| !hive_schema.contains(c)) .cloned() .collect(), ); @@ -360,7 +349,7 @@ impl MultiScanExec { }; for (i, source) in self.sources.iter().enumerate() { - let hive_part = self.hive_parts.as_ref().and_then(|h| h.get(i)); + let hive_part = self.hive_parts.as_ref().and_then(|h| h.0.get_row(i).ok()); if slice.is_some_and(|s| s.1 == 0) { break; } @@ -403,21 +392,20 @@ impl MultiScanExec { // to function even when there is a combination of hive and non-hive columns being // used. if has_live_hive_columns { - let hive_part = hive_part.unwrap(); - file_predicate = Some(file_predicate.unwrap().with_constant_columns( - hive_column_set.iter().enumerate().map(|(idx, column)| { - let series = hive_part.get_statistics().column_stats()[idx] - .to_min() - .unwrap(); - ( - column.clone(), - Scalar::new( - series.dtype().clone(), - series.get(0).unwrap().into_static(), - ), - ) - }), - )); + let hive_part = hive_part.as_ref().unwrap(); + file_predicate = Some( + file_predicate.unwrap().with_constant_columns( + hive_schema + .iter() + .zip(hive_part.0.iter()) + .map(|((column, dtype), value)| { + ( + column.clone(), + Scalar::new(dtype.clone(), value.clone().into_static()), + ) + }), + ), + ); } let skip_batch_predicate = file_predicate @@ -530,14 +518,14 @@ impl MultiScanExec { } // Materialize the hive columns and add them back in. if let Some(hive_part) = hive_part { - for hive_col in hive_part.get_statistics().column_stats() { + for ((column, dtype), value) in hive_schema.iter().zip(hive_part.0) { df.with_column( - ScalarColumn::from_single_value_series( - hive_col - .to_min() - .unwrap() - .clone() - .with_name(hive_col.field_name().clone()), + ScalarColumn::new( + column.clone(), + Scalar::new( + dtype.clone(), + value.into_static(), + ), df.height(), ) .into_column(), diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index 3468144ea22c..85ed575f9465 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -1,4 +1,3 @@ -use hive::HivePartitions; use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; use polars_error::feature_gated; @@ -19,7 +18,6 @@ pub struct IpcExec { #[allow(dead_code)] pub(crate) options: IpcScanOptions, pub(crate) file_options: FileScanOptions, - pub(crate) hive_parts: Option>>, pub(crate) cloud_options: Option, pub(crate) metadata: Option>, } @@ -94,11 +92,6 @@ impl IpcExec { .with_n_rows(n_rows) .with_row_index(self.file_options.row_index.clone()) .with_projection(projection.clone()) - .with_hive_partition_columns( - self.hive_parts - .as_ref() - .map(|x| x[index].materialize_partition_columns()), - ) .with_include_file_path( self.file_options .include_file_paths diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 89502cf34017..ad9afad7378e 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -1,4 +1,3 @@ -use hive::HivePartitions; use polars_core::config; #[cfg(feature = "cloud")] use 
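Per file, `MultiScanExec` now takes row `i` from the partitions DataFrame: the values are bound as constant columns on the predicate (so a predicate that references hive columns keeps working) and re-attached to the decoded frame as scalar columns. A compact sketch of the re-attachment step, mirroring the `ScalarColumn`/`Scalar` calls in the `multi_file_scan.rs` hunk (function name and imports are illustrative):

use polars_core::prelude::*;

// Broadcast one value per hive key onto `df`; `values` is the row taken
// from the partitions DataFrame with `get_row(i)`, already made 'static.
fn attach_hive_columns(
    df: &mut DataFrame,
    hive_schema: &Schema,
    values: Vec<AnyValue<'static>>,
) -> PolarsResult<()> {
    for ((name, dtype), value) in hive_schema.iter().zip(values) {
        let col = ScalarColumn::new(
            name.clone(),
            Scalar::new(dtype.clone(), value),
            df.height(),
        );
        df.with_column(col.into_column())?;
    }
    Ok(())
}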
polars_core::config::{get_file_prefetch_size, verbose}; @@ -17,8 +16,6 @@ pub struct ParquetExec { sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, - predicate: Option, skip_batch_predicate: Option>, @@ -35,7 +32,6 @@ impl ParquetExec { pub(crate) fn new( sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, predicate: Option, options: ParquetOptions, cloud_options: Option, @@ -46,8 +42,6 @@ impl ParquetExec { sources, file_info, - hive_parts, - predicate, skip_batch_predicate: None, @@ -177,10 +171,6 @@ impl ParquetExec { // files in parallel even if we add row index columns or slices. let iter = (i..end).into_par_iter().map(|i| { let source = self.sources.at(i); - let hive_partitions = self - .hive_parts - .as_ref() - .map(|x| x[i].materialize_partition_columns()); let memslice = source.to_memslice()?; @@ -197,7 +187,6 @@ impl ParquetExec { .set_low_memory(self.options.low_memory) .use_statistics(self.options.use_statistics) .set_rechunk(false) - .with_hive_partition_columns(hive_partitions) .with_include_file_path( self.file_options .include_file_paths @@ -384,7 +373,6 @@ impl ParquetExec { for batch_start in (first_file_idx..paths.len()).step_by(batch_size) { let end = std::cmp::min(batch_start.saturating_add(batch_size), paths.len()); let paths = &paths[batch_start..end]; - let hive_parts = self.hive_parts.as_ref().map(|x| &x[batch_start..end]); if current_offset >= slice_end && !result.is_empty() { return Ok(result); @@ -452,9 +440,6 @@ impl ParquetExec { let projected_arrow_schema = projected_arrow_schema.clone(); let predicate = predicate.clone(); let (cumulative_read, slice) = row_statistics[i]; - let hive_partitions = hive_parts - .as_ref() - .map(|x| x[i].materialize_partition_columns()); async move { let row_index = base_row_index_ref.as_ref().map(|rc| RowIndex { @@ -474,7 +459,6 @@ impl ParquetExec { .use_statistics(use_statistics) .with_predicate(predicate) .set_rechunk(false) - .with_hive_partition_columns(hive_partitions) .with_include_file_path( include_file_paths .map(|x| (x.clone(), Arc::from(paths[i].to_str().unwrap()))), diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 05480427a341..632cf739b159 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -207,8 +207,7 @@ fn create_physical_plan_impl( let mut state = ExpressionConversionState::new(true, state.expr_depth); let do_new_multifile = (sources.len() > 1 || hive_parts.is_some()) - && !matches!(scan_type, FileScan::Anonymous { .. }) - && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1"); + && !matches!(scan_type, FileScan::Anonymous { .. 
}); let mut create_skip_batch_predicate = false; create_skip_batch_predicate |= do_new_multifile; @@ -269,7 +268,6 @@ fn create_physical_plan_impl( predicate, options, file_options, - hive_parts, cloud_options, metadata, })), @@ -281,7 +279,6 @@ fn create_physical_plan_impl( } => Ok(Box::new(executors::ParquetExec::new( sources, file_info, - hive_parts, predicate, options, cloud_options, diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 0be39c4a876e..66688ff86164 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -42,7 +42,7 @@ pub struct ParquetSource { #[allow(dead_code)] cloud_options: Option, first_metadata: Option, - hive_parts: Option>>, + hive_parts: Option>, verbose: bool, run_async: bool, prefetch_size: usize, @@ -84,10 +84,12 @@ impl ParquetSource { let options = self.options.clone(); let file_options = self.file_options.clone(); - let hive_partitions = self - .hive_parts - .as_ref() - .map(|x| x[index].materialize_partition_columns()); + let hive_partitions = self.hive_parts.as_deref().map(|x| { + x.0.slice(index as i64, 1) + .materialized_column_iter() + .cloned() + .collect() + }); let chunk_size = determine_chunk_size( self.projected_arrow_schema @@ -250,7 +252,7 @@ impl ParquetSource { first_metadata: Option, file_options: FileScanOptions, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, verbose: bool, predicate: Option, ) -> PolarsResult { diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index a2e8003dc902..7d2bba1f5d32 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -281,7 +281,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult }; if let Some(ref hive_parts) = hive_parts { - let hive_schema = hive_parts[0].schema(); + let hive_schema = hive_parts.schema(); file_info.update_schema_with_hive_schema(hive_schema.clone()); } else if let Some(hive_schema) = file_options.hive_options.schema.clone() { // We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file @@ -323,7 +323,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult file_options.with_columns = if file_info.reader_schema.is_some() { maybe_init_projection_excluding_hive( file_info.reader_schema.as_ref().unwrap(), - hive_parts.as_ref().map(|x| &x[0]), + hive_parts.as_deref(), ) } else { None diff --git a/crates/polars-plan/src/plans/file_scan.rs b/crates/polars-plan/src/plans/file_scan.rs index 9f1ec4dd906d..a4f958defd24 100644 --- a/crates/polars-plan/src/plans/file_scan.rs +++ b/crates/polars-plan/src/plans/file_scan.rs @@ -147,20 +147,6 @@ impl Hash for FileScan { } impl FileScan { - pub(crate) fn remove_metadata(&mut self) { - match self { - #[cfg(feature = "parquet")] - Self::Parquet { metadata, .. } => { - *metadata = None; - }, - #[cfg(feature = "ipc")] - Self::Ipc { metadata, .. 
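In the streaming pipe source, the per-file partition columns are now obtained by slicing one row out of the shared partitions DataFrame instead of materializing column statistics. A sketch of that extraction, assuming the `materialized_column_iter` helper used in the `polars-pipe` parquet hunk above:

use polars_core::prelude::*;

// Return the hive values for file `index` as length-1 Series
// (mirrors the closure in `ParquetSource`).
fn hive_columns_for_file(parts: &DataFrame, index: usize) -> Vec<Series> {
    parts
        .slice(index as i64, 1) // one row: this file's partition values
        .materialized_column_iter()
        .cloned()
        .collect()
}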
} => { - *metadata = None; - }, - _ => {}, - } - } - pub(crate) fn sort_projection(&self, _file_options: &FileScanOptions) -> bool { match self { #[cfg(feature = "csv")] diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index 597bde5eb62f..c71c6ac3323e 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -1,60 +1,41 @@ use std::path::{Path, PathBuf}; use polars_core::prelude::*; -use polars_io::predicates::{BatchStats, ColumnStats}; use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone)] -pub struct HivePartitions { - /// Single value Series that can be used to run the predicate against. - /// They are to be broadcasted if the predicates don't filter them out. - stats: BatchStats, -} +pub struct HivePartitions(pub DataFrame); impl HivePartitions { - pub fn get_projection_schema_and_indices( - &self, - names: &PlHashSet, - ) -> (SchemaRef, Vec) { - let mut out_schema = Schema::with_capacity(self.stats.schema().len()); - let mut out_indices = Vec::with_capacity(self.stats.column_stats().len()); - - for (i, cs) in self.stats.column_stats().iter().enumerate() { - let name = cs.field_name(); - if names.contains(name.as_str()) { - out_indices.push(i); - out_schema - .insert_at_index(out_schema.len(), name.clone(), cs.dtype().clone()) - .unwrap(); - } - } - - (out_schema.into(), out_indices) + pub fn apply_projection(&mut self, column_indices: &[usize]) -> PolarsResult<()> { + let new_schema = self.0.schema().try_project_indices(column_indices)?; + let columns = new_schema + .iter_names() + .map(|n| self.0.column(n).cloned()) + .collect::>>()?; + + self.0 = DataFrame::new(columns)?; + Ok(()) } - pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) { - self.stats.with_schema(new_schema); - self.stats.take_indices(column_indices); - } - - pub fn get_statistics(&self) -> &BatchStats { - &self.stats - } + // pub fn get_statistics(&self) -> &BatchStats { + // &self.stats + // } pub(crate) fn schema(&self) -> &SchemaRef { - self.get_statistics().schema() + self.0.schema() } - pub fn materialize_partition_columns(&self) -> Vec { - self.get_statistics() - .column_stats() - .iter() - .map(|cs| cs.get_min_state().unwrap().clone()) - .collect() - } + // pub fn materialize_partition_columns(&self) -> Vec { + // self.get_statistics() + // .column_stats() + // .iter() + // .map(|cs| cs.get_min_state().unwrap().clone()) + // .collect() + // } } /// Note: Returned hive partitions are ordered by their position in the `reader_schema` @@ -67,7 +48,7 @@ pub fn hive_partitions_from_paths( schema: Option, reader_schema: &Schema, try_parse_dates: bool, -) -> PolarsResult>>> { +) -> PolarsResult>> { let Some(path) = paths.first() else { return Ok(None); }; @@ -199,36 +180,14 @@ pub fn hive_partitions_from_paths( } } - let mut hive_partitions = Vec::with_capacity(paths.len()); let mut buffers = buffers .into_iter() - .map(|x| x.into_series()) + .map(|x| x.into_series().map(|s| s.into_column())) .collect::>>()?; - buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX)); - #[allow(clippy::needless_range_loop)] - for i in 0..paths.len() { - let column_stats = buffers - .iter() - .map(|x| { - ColumnStats::from_column_literal(unsafe { x.take_slice_unchecked(&[i as IdxSize]) }) - }) - .collect::>(); - - if 
column_stats.is_empty() { - polars_bail!( - ComputeError: "expected Hive partitioned path, got {}\n\n\ - This error occurs if some paths are Hive partitioned and some paths are not.", - paths[i].to_str().unwrap(), - ) - } - - let stats = BatchStats::new(hive_schema.clone(), column_stats, None); - hive_partitions.push(HivePartitions { stats }); - } - - Ok(Some(Arc::from(hive_partitions))) + let hive_partitions = DataFrame::new(buffers)?; + Ok(Some(Arc::new(HivePartitions(hive_partitions)))) } /// Determine the path separator for identifying Hive partitions. diff --git a/crates/polars-plan/src/plans/ir/mod.rs b/crates/polars-plan/src/plans/ir/mod.rs index 199d2389d913..7cbba2a6a40f 100644 --- a/crates/polars-plan/src/plans/ir/mod.rs +++ b/crates/polars-plan/src/plans/ir/mod.rs @@ -57,7 +57,7 @@ pub enum IR { Scan { sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, + hive_parts: Option>, predicate: Option, /// schema of the projected file output_schema: Option, diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs index 40824c24ee06..08f5c3527afa 100644 --- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs @@ -360,11 +360,11 @@ impl PredicatePushDown<'_> { Ok(lp) }, Scan { - mut sources, + sources, file_info, - hive_parts: mut scan_hive_parts, + hive_parts, ref predicate, - mut scan_type, + scan_type, file_options: options, output_schema, } => { @@ -398,54 +398,6 @@ impl PredicatePushDown<'_> { }; let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena); - if let (Some(hive_parts), Some(predicate)) = (&scan_hive_parts, &predicate) { - if let Some(io_expr) = - self.expr_eval.unwrap()(predicate, expr_arena, &file_info.schema) - { - if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { - let paths = sources.as_paths().ok_or_else(|| { - polars_err!(nyi = "Hive partitioning of in-memory buffers") - })?; - let mut new_paths = Vec::with_capacity(paths.len()); - let mut new_hive_parts = Vec::with_capacity(paths.len()); - - for i in 0..paths.len() { - let path = &paths[i]; - let hive_parts = &hive_parts[i]; - - if stats_evaluator.should_read(hive_parts.get_statistics())? { - new_paths.push(path.clone()); - new_hive_parts.push(hive_parts.clone()); - } - } - - if paths.len() != new_paths.len() { - if self.verbose { - eprintln!( - "hive partitioning: skipped {} files, first file : {}", - paths.len() - new_paths.len(), - paths[0].display() - ) - } - scan_type.remove_metadata(); - } - if new_paths.is_empty() { - let schema = output_schema.as_ref().unwrap_or(&file_info.schema); - let df = DataFrame::empty_with_schema(schema); - - return Ok(DataFrameScan { - df: Arc::new(df), - schema: schema.clone(), - output_schema: None, - }); - } else { - sources = ScanSources::Paths(new_paths.into()); - scan_hive_parts = Some(Arc::from(new_hive_parts)); - } - } - } - } - let mut do_optimization = match &scan_type { #[cfg(feature = "csv")] FileScan::Csv { .. 
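`hive_partitions_from_paths` now produces a single `HivePartitions` for all paths: one column per partition key, one row per path, rather than a `Vec` of per-path `BatchStats`. A hedged illustration of the resulting shape for two hypothetical paths (the `df!` macro and the literal values are only for illustration; the real code fills the columns from the inference buffers):

use polars_core::df;
use polars_core::prelude::*;

fn example_partitions() -> PolarsResult<DataFrame> {
    // For paths such as
    //   data/year=2024/month=1/0.parquet
    //   data/year=2024/month=2/0.parquet
    // the partitions DataFrame holds one row per path:
    df!(
        "year"  => [2024i64, 2024],
        "month" => [1i64, 2]
    )
}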
} => options.slice.is_none(), @@ -457,8 +409,6 @@ impl PredicatePushDown<'_> { }; do_optimization &= predicate.is_some(); - let hive_parts = scan_hive_parts; - let lp = if do_optimization { Scan { sources, diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index 53e1463ee4ca..5e3410b05a28 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -17,6 +17,7 @@ use recursive::recursive; #[cfg(feature = "semi_anti_join")] use semi_anti_join::process_semi_anti_join; +use self::optimizer::hive::HivePartitions; use crate::prelude::optimizer::projection_pushdown::generic::process_generic; use crate::prelude::optimizer::projection_pushdown::group_by::process_group_by; use crate::prelude::optimizer::projection_pushdown::hconcat::process_hconcat; @@ -498,24 +499,17 @@ impl ProjectionPushDown { )?; hive_parts = if let Some(hive_parts) = hive_parts { - let (new_schema, projected_indices) = hive_parts[0] - .get_projection_schema_and_indices( - &with_columns.iter().cloned().collect::>(), - ); - - Some(Arc::new( - hive_parts + let hive_schema = hive_parts.schema(); + Some(Arc::new(HivePartitions( + with_columns .iter() - .cloned() - .map(|mut hp| { - hp.apply_projection( - new_schema.clone(), - projected_indices.as_ref(), - ); - hp + .filter_map(|n| { + hive_schema + .index_of(n) + .map(|idx| hive_parts.0.get_columns()[idx].clone()) }) - .collect::>(), - )) + .collect::(), + ))) } else { None }; @@ -528,7 +522,7 @@ impl ProjectionPushDown { && std::env::var("POLARS_NEW_MULTIFILE").as_deref() != Ok("1") { // Skip reading hive columns from the file. - let partition_schema = hive_parts.first().unwrap().schema(); + let partition_schema = hive_parts.schema(); file_options.with_columns = file_options.with_columns.map(|x| { x.iter() .filter(|x| !partition_schema.contains(x)) @@ -608,7 +602,7 @@ impl ProjectionPushDown { } else { file_options.with_columns = maybe_init_projection_excluding_hive( file_info.reader_schema.as_ref().unwrap(), - hive_parts.as_ref().map(|x| &x[0]), + hive_parts.as_deref(), ); None }; diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs index 5759d72a485f..d5d180cffe54 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs @@ -193,11 +193,6 @@ impl ParquetSourceNode { /// * `self.physical_predicate` pub(super) fn init_row_group_decoder(&self) -> RowGroupDecoder { let scan_sources = self.scan_sources.clone(); - let hive_partitions = self.hive_parts.clone(); - let hive_partitions_width = hive_partitions - .as_deref() - .map(|x| x[0].get_statistics().column_stats().len()) - .unwrap_or(0); let include_file_paths = self.file_options.include_file_paths.clone(); let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap(); let row_index = self.row_index.clone(); @@ -258,8 +253,6 @@ impl ParquetSourceNode { RowGroupDecoder { scan_sources, - hive_partitions, - hive_partitions_width, include_file_paths, reader_schema: self.schema.clone().unwrap(), projected_arrow_schema, diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs index 5250f89bca70..68b3e3b32be5 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs +++ 
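Projection pushdown now prunes the partition values by selecting columns straight from the wrapped DataFrame, keeping only the hive keys that survive `with_columns`. A sketch of that selection using the same index lookup as the hunk above (helper name and imports are approximate):

use polars_core::prelude::*;

// Keep only the partition columns that remain in the projection
// (mirrors the `filter_map` in projection pushdown).
fn project_hive(parts: &DataFrame, with_columns: &[PlSmallStr]) -> PolarsResult<DataFrame> {
    let schema = parts.schema();
    let cols: Vec<Column> = with_columns
        .iter()
        .filter_map(|name| {
            schema
                .index_of(name)
                .map(|idx| parts.get_columns()[idx].clone())
        })
        .collect();
    DataFrame::new(cols)
}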
b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs @@ -9,7 +9,6 @@ use polars_io::cloud::CloudOptions; use polars_io::predicates::ScanIOPredicate; use polars_io::prelude::{FileMetadata, ParquetOptions}; use polars_io::utils::byte_source::DynByteSourceBuilder; -use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::{FileInfo, ScanSources}; use polars_plan::prelude::FileScanOptions; use polars_utils::index::AtomicIdxSize; @@ -37,7 +36,6 @@ type AsyncTaskData = ( pub struct ParquetSourceNode { scan_sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, predicate: Option, options: ParquetOptions, cloud_options: Option, @@ -104,7 +102,6 @@ impl ParquetSourceNode { Self { scan_sources, file_info, - hive_parts: None, predicate, options, cloud_options, diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs index 9bea2726c5fa..61b67e06a823 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs @@ -13,7 +13,6 @@ use polars_io::predicates::ScanIOPredicate; use polars_io::prelude::_internal::calc_prefilter_cost; pub use polars_io::prelude::_internal::PrefilterMaskSetting; use polars_io::prelude::try_set_sorted_flag; -use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::ScanSources; use polars_utils::index::AtomicIdxSize; use polars_utils::pl_str::PlSmallStr; @@ -26,8 +25,6 @@ use crate::nodes::TaskPriority; /// Turns row group data into DataFrames. pub(super) struct RowGroupDecoder { pub(super) scan_sources: ScanSources, - pub(super) hive_partitions: Option>>, - pub(super) hive_partitions_width: usize, pub(super) include_file_paths: Option, pub(super) reader_schema: Arc, pub(super) projected_arrow_schema: Arc, @@ -61,7 +58,6 @@ impl RowGroupDecoder { let out_width = self.row_index.is_some() as usize + self.projected_arrow_schema.len() - + self.hive_partitions_width + self.include_file_paths.is_some() as usize; let mut out_columns = Vec::with_capacity(out_width); @@ -139,17 +135,7 @@ impl RowGroupDecoder { async fn shared_file_state_init_func(&self, row_group_data: &RowGroupData) -> SharedFileState { let path_index = row_group_data.path_index; - let hive_series = if let Some(hp) = self.hive_partitions.as_deref() { - let v = hp[path_index].materialize_partition_columns(); - v.into_iter() - .map(|s| { - s.into_column() - .new_from_index(0, row_group_data.file_max_row_group_height) - }) - .collect() - } else { - vec![] - }; + let hive_series = vec![]; // @scalar-opt let file_path_series = self.include_file_paths.clone().map(|file_path_col| { @@ -479,7 +465,6 @@ impl RowGroupDecoder { let mut live_columns = Vec::with_capacity( self.row_index.is_some() as usize + self.predicate_arrow_field_indices.len() - + self.hive_partitions_width + self.include_file_paths.is_some() as usize, ); diff --git a/crates/polars-stream/src/physical_plan/fmt.rs b/crates/polars-stream/src/physical_plan/fmt.rs index 0c829e0ec029..add4aa61833a 100644 --- a/crates/polars-stream/src/physical_plan/fmt.rs +++ b/crates/polars-stream/src/physical_plan/fmt.rs @@ -139,7 +139,6 @@ fn visualize_plan_rec( PhysNodeKind::FileScan { scan_sources, file_info, - hive_parts, output_schema: _, scan_type, predicate, @@ -193,13 +192,6 @@ fn visualize_plan_rec( write!(f, "\nfilter: {}", predicate.display(expr_arena)).unwrap(); } - if let Some(v) = hive_parts - .as_deref() - .map(|x| 
x[0].get_statistics().column_stats().len()) - { - write!(f, "\nhive: {} columns", v).unwrap(); - } - (out, &[][..]) }, PhysNodeKind::GroupBy { input, key, aggs } => ( diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 9d8d9d8b35c8..6b4fb8b6e3ad 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -435,7 +435,6 @@ pub fn lower_ir( let node_kind = PhysNodeKind::FileScan { scan_sources, file_info, - hive_parts, output_schema: scan_output_schema, scan_type, predicate, diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index 87acf2c3a726..dcf1daf80741 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -7,7 +7,6 @@ use polars_core::schema::{Schema, SchemaRef}; use polars_error::PolarsResult; use polars_ops::frame::JoinArgs; use polars_plan::dsl::JoinTypeOptionsIR; -use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::{AExpr, DataFrameUdf, FileInfo, FileScan, ScanSources, IR}; use polars_plan::prelude::expr_ir::ExprIR; @@ -164,7 +163,6 @@ pub enum PhysNodeKind { FileScan { scan_sources: ScanSources, file_info: FileInfo, - hive_parts: Option>>, predicate: Option, output_schema: Option, scan_type: FileScan, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 0e869431ba5c..4a2bdd42dc1f 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -344,7 +344,6 @@ fn to_graph_rec<'a>( let FileScan { scan_sources, file_info, - hive_parts: _, output_schema, scan_type, predicate,