diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs
index 1d5514bfcb5b..18d58649b692 100644
--- a/crates/polars-io/src/hive.rs
+++ b/crates/polars-io/src/hive.rs
@@ -18,8 +18,9 @@ pub(crate) fn materialize_hive_partitions(
     df: &mut DataFrame,
     reader_schema: &polars_schema::Schema,
     hive_partition_columns: Option<&[Series]>,
-    num_rows: usize,
 ) {
+    let num_rows = df.height();
+
     if let Some(hive_columns) = hive_partition_columns {
         // Insert these hive columns in the order they are stored in the file.
         if hive_columns.is_empty() {
diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs
index 919e0f766f69..26f7d970eed1 100644
--- a/crates/polars-io/src/ipc/ipc_file.rs
+++ b/crates/polars-io/src/ipc/ipc_file.rs
@@ -252,24 +252,22 @@ impl SerReader for IpcReader {
 
         // In case only hive columns are projected, the df would be empty, but we need the row count
        // of the file in order to project the correct number of rows for the hive columns.
-        let (mut df, row_count) = (|| {
+        let mut df = (|| {
             if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
                 let row_count = if let Some(v) = self.n_rows {
                     v
                 } else {
                     get_row_count(&mut self.reader)? as usize
                 };
-                let mut df = DataFrame::empty();
-                unsafe { df.set_height(row_count) };
+                let df = DataFrame::empty_with_height(row_count);
 
-                return PolarsResult::Ok((df, row_count));
+                return PolarsResult::Ok(df);
             }
 
             if self.memory_map.is_some() && self.reader.to_file().is_some() {
                 match self.finish_memmapped(None) {
                     Ok(df) => {
-                        let n = df.height();
-                        return Ok((df, n));
+                        return Ok(df);
                     },
                     Err(err) => check_mmap_err(err)?,
                 }
@@ -293,17 +291,11 @@
             let ipc_reader =
                 read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
             let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
-            let n = df.height();
-            Ok((df, n))
+            Ok(df)
         })()?;
 
         if let Some(hive_cols) = hive_partition_columns {
-            materialize_hive_partitions(
-                &mut df,
-                reader_schema,
-                Some(hive_cols.as_slice()),
-                row_count,
-            );
+            materialize_hive_partitions(&mut df, reader_schema, Some(hive_cols.as_slice()));
         };
 
         if let Some((col, value)) = include_file_path {
@@ -314,7 +306,7 @@
                     DataType::String,
                     AnyValue::StringOwned(value.as_ref().into()),
                 ),
-                row_count,
+                df.height(),
             ))
         };
     }
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 14fd0c279139..81a57e9ba959 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -440,12 +440,7 @@ fn rg_to_dfs_prefiltered(
             } else {
                 df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };
 
-                materialize_hive_partitions(
-                    &mut df,
-                    schema.as_ref(),
-                    hive_partition_columns,
-                    md.num_rows(),
-                );
+                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
 
                 let s = predicate.predicate.evaluate_io(&df)?;
                 let mask = s.bool().expect("filter predicates was not of type boolean");
@@ -489,12 +484,7 @@
 
             // We don't need to do any further work if there are no dead columns
             if dead_idx_to_col_idx.is_empty() {
-                materialize_hive_partitions(
-                    &mut df,
-                    schema.as_ref(),
-                    hive_partition_columns,
-                    md.num_rows(),
-                );
+                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
                 return Ok(Some(df));
             }
 
@@ -606,12 +596,7 @@
             // and the length is given by the parquet file which should always be the same.
             let mut df = unsafe { DataFrame::new_no_checks(height, merged) };
 
-            materialize_hive_partitions(
-                &mut df,
-                schema.as_ref(),
-                hive_partition_columns,
-                md.num_rows(),
-            );
+            materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
 
             PolarsResult::Ok(Some(df))
         })
@@ -713,7 +698,7 @@ fn rg_to_dfs_optionally_par_over_columns(
             );
         }
 
-        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
+        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
         apply_predicate(
             &mut df,
             predicate.as_ref().map(|p| p.predicate.as_ref()),
@@ -850,12 +835,7 @@ fn rg_to_dfs_par_over_rg(
             );
         }
 
-        materialize_hive_partitions(
-            &mut df,
-            schema.as_ref(),
-            hive_partition_columns,
-            slice.1,
-        );
+        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);
         apply_predicate(
             &mut df,
             predicate.as_ref().map(|p| p.predicate.as_ref()),
diff --git a/crates/polars-io/src/parquet/read/utils.rs b/crates/polars-io/src/parquet/read/utils.rs
index c6d8476e08f8..e81b5bf15765 100644
--- a/crates/polars-io/src/parquet/read/utils.rs
+++ b/crates/polars-io/src/parquet/read/utils.rs
@@ -26,7 +26,7 @@ pub fn materialize_empty_df(
             .unwrap();
     }
 
-    materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns, 0);
+    materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns);
 
     df
 }
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 2bb0c6e7d37c..f6ee0a958f1b 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -4,7 +4,7 @@
 import urllib.parse
 import warnings
 from collections import OrderedDict
-from datetime import datetime
+from datetime import date, datetime
 from functools import partial
 from pathlib import Path
 from typing import Any, Callable
@@ -888,3 +888,50 @@ def test_hive_auto_enables_when_unspecified_and_hive_schema_passed(
             pl.Series("a", [1], dtype=pl.UInt8),
         ),
     )
+
+
+@pytest.mark.write_disk
+def test_hive_parquet_prefiltered_20894_21327(tmp_path: Path) -> None:
+    file_path = tmp_path / "date=2025-01-01/00000000.parquet"
+    file_path.parent.mkdir(exist_ok=True, parents=True)
+
+    data = pl.DataFrame(
+        {
+            "date": [date(2025, 1, 1), date(2025, 1, 1)],
+            "value": ["1", "2"],
+        }
+    )
+
+    data.write_parquet(file_path)
+
+    import base64
+    import subprocess
+
+    # Base64-encode the path so it can be embedded safely in the generated script.
+    scan_path_b64 = base64.b64encode(str(file_path).encode()).decode()
+
+    # This is the easiest way to control the threadpool size in the test suite.
+    out = subprocess.check_output(
+        [
+            sys.executable,
+            "-c",
+            f"""\
+import os
+os.environ["POLARS_MAX_THREADS"] = "1"
+
+import polars as pl
+import base64
+
+assert pl.thread_pool_size() == 1
+
+tmp_path = base64.b64decode("{scan_path_b64}").decode()
+
+# We need the str() to trigger a panic on invalid state
+str(pl.scan_parquet(tmp_path, hive_partitioning=True).filter(pl.col("value") == "1").collect())
+
+print("OK", end="")
+""",
+        ]
+    )
+
+    assert out == b"OK"
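
Not part of the patch: a minimal sketch of the "only hive columns are projected" case that the ipc_file.rs comment above describes, where the projected DataFrame carries no file columns and the reader has to take the row count from the file itself when materializing the hive column. The directory layout and column names below are made up for illustration; only public polars APIs (scan_parquet, hive_partitioning, select) are used.

```python
import tempfile
from pathlib import Path

import polars as pl

with tempfile.TemporaryDirectory() as tmp:
    # Hive-style layout: the partition value exists only in the directory name.
    path = Path(tmp) / "date=2025-01-01" / "00000000.parquet"
    path.parent.mkdir(parents=True)
    pl.DataFrame({"value": ["1", "2"]}).write_parquet(path)

    # Selecting only the hive column leaves no file columns to read, so the
    # number of rows for "date" must come from the file's own row count.
    out = pl.scan_parquet(path, hive_partitioning=True).select("date").collect()
    print(out)  # expected: two rows with date 2025-01-01
```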