diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index f0a03d9841a9d..73e5c5f810eb8 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -40,7 +40,7 @@ jobs: security_audit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 + - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 - name: Install cargo-audit uses: taiki-e/install-action@f535147c22906d77695e11cb199e764aa610a4fc # v2.62.46 with: diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4b3c31e6b3b0c..2ce3f3b19d444 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -271,7 +271,21 @@ jobs: runs-on: ubuntu-latest container: image: amd64/rust + volumes: + - /usr/local:/host/usr/local steps: + - name: Remove unnecessary preinstalled software + run: | + echo "Disk space before cleanup:" + df -h + # remove tool cache: about 8.5GB (github has host /opt/hostedtoolcache mounted as /__t) + rm -rf /__t/* || true + # remove Haskell runtime: about 6.3GB (host /usr/local/.ghcup) + rm -rf /host/usr/local/.ghcup || true + # remove Android library: about 7.8GB (host /usr/local/lib/android) + rm -rf /host/usr/local/lib/android || true + echo "Disk space after cleanup:" + df -h - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: submodules: true diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0ed499da04757..a77fd764eea06 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -653,7 +653,7 @@ config_namespace! { /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, /// and `Binary/BinaryLarge` with `BinaryView`. - pub schema_force_view_types: bool, default = true + pub schema_force_view_types: bool, default = false /// (reading) If true, parquet reader will read columns of /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. 
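Note on the `schema_force_view_types` default change above: with the default flipped to `false`, the Parquet reader keeps plain `Utf8`/`Binary` Arrow types unless a session opts back in. The sketch below is illustrative only and not part of the patch; the table name `t`, the file `data.parquet`, and the `main` wrapper are hypothetical, while the option path mirrors the `config_namespace!` entry shown in the hunk.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // With this change, schema_force_view_types defaults to false, so Parquet
    // Utf8/Binary columns are read with their original Arrow types.
    let mut config = SessionConfig::new();

    // Opt back into Utf8View/BinaryView (the previous default) for workloads
    // that benefit from view types.
    config.options_mut().execution.parquet.schema_force_view_types = true;

    let ctx = SessionContext::new_with_config(config);
    // Hypothetical table/file used purely for illustration.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}
```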
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 52c5393e10319..48fd6f97f12d9 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -582,11 +582,11 @@ mod tests { assert_eq!(string_truncation_stats.null_count, Precision::Exact(2)); assert_eq!( string_truncation_stats.max_value, - Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c"))) + Precision::Inexact(Utf8(Some("b".repeat(63) + "c"))) ); assert_eq!( string_truncation_stats.min_value, - Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64)))) + Precision::Inexact(Utf8(Some("a".repeat(64)))) ); Ok(()) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index c15b7eae08432..920294504149f 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -69,6 +69,9 @@ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::optimizer::PhysicalOptimizer; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::node_id::{ + annotate_node_id_for_execution_plan, NodeIdAnnotator, +}; use datafusion_physical_plan::ExecutionPlan; use datafusion_session::Session; #[cfg(feature = "sql")] @@ -668,9 +671,12 @@ impl SessionState { logical_plan: &LogicalPlan, ) -> datafusion_common::Result> { let logical_plan = self.optimize(logical_plan)?; - self.query_planner + let physical_plan = self + .query_planner .create_physical_plan(&logical_plan, self) - .await + .await?; + let mut id_annotator = NodeIdAnnotator::new(); + annotate_node_id_for_execution_plan(&physical_plan, &mut id_annotator) } /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 097600e45eadd..8caeda901b519 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -182,6 +182,18 @@ impl TestOutput { .map(|(_pruned, matched)| matched) } + /* + /// The number of row_groups fully matched by statistics + fn row_groups_fully_matched_statistics(&self) -> Option { + self.metric_value("row_groups_fully_matched_statistics") + } + + /// The number of row groups pruned by limit pruning + fn limit_pruned_row_groups(&self) -> Option { + self.metric_value("limit_pruned_row_groups") + } + */ + /// The number of row_groups pruned by statistics fn row_groups_pruned_statistics(&self) -> Option { self.pruning_metric("row_groups_pruned_statistics") @@ -232,20 +244,43 @@ impl TestOutput { /// and the appropriate scenario impl ContextWithParquet { async fn new(scenario: Scenario, unit: Unit) -> Self { - Self::with_config(scenario, unit, SessionConfig::new()).await + Self::with_config(scenario, unit, SessionConfig::new(), None, None).await + } + + // Set custom schema and batches for the test + /* + pub async fn with_custom_data( + scenario: Scenario, + unit: Unit, + schema: Arc, + batches: Vec, + ) -> Self { + Self::with_config( + scenario, + unit, + SessionConfig::new(), + Some(schema), + Some(batches), + ) + .await } + */ async fn with_config( scenario: Scenario, unit: Unit, mut config: SessionConfig, + custom_schema: Option>, + custom_batches: Option>, ) -> Self { // Use a single partition for deterministic results no matter how many CPUs the host has config = 
config.with_target_partitions(1); let file = match unit { Unit::RowGroup(row_per_group) => { config = config.with_parquet_bloom_filter_pruning(true); - make_test_file_rg(scenario, row_per_group).await + config.options_mut().execution.parquet.pushdown_filters = true; + make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches) + .await } Unit::Page(row_per_page) => { config = config.with_parquet_page_index_pruning(true); @@ -1071,7 +1106,12 @@ fn create_data_batch(scenario: Scenario) -> Vec { } /// Create a test parquet file with various data types -async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile { +async fn make_test_file_rg( + scenario: Scenario, + row_per_group: usize, + custom_schema: Option>, + custom_batches: Option>, +) -> NamedTempFile { let mut output_file = tempfile::Builder::new() .prefix("parquet_pruning") .suffix(".parquet") @@ -1084,8 +1124,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem .set_statistics_enabled(EnabledStatistics::Page) .build(); - let batches = create_data_batch(scenario); - let schema = batches[0].schema(); + let (batches, schema) = + if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) { + (batches, schema) + } else { + let batches = create_data_batch(scenario); + let schema = batches[0].schema(); + (batches, schema) + }; let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap(); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 27bee10234b57..6a99c1f8d61fd 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -165,7 +165,9 @@ async fn page_index_filter_one_col() { // 5.create filter date_string_col == "01/01/09"`; // Note this test doesn't apply type coercion so the literal must match the actual view type - let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09"))); + // Use new_utf8 here because schema_force_view_types now defaults to false.
+ // qi: when schema_force_view_types setting to true, we should change back to utf8view + let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8("01/01/09"))); let batches = get_filter_results(&state, filter.clone(), false).await; assert_eq!(batches[0].num_rows(), 14); diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 0411298055f26..c5e2a4c917b0d 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -30,10 +30,12 @@ struct RowGroupPruningTest { query: String, expected_errors: Option, expected_row_group_matched_by_statistics: Option, + // expected_row_group_fully_matched_by_statistics: Option, expected_row_group_pruned_by_statistics: Option, expected_files_pruned_by_statistics: Option, expected_row_group_matched_by_bloom_filter: Option, expected_row_group_pruned_by_bloom_filter: Option, + // expected_limit_pruned_row_groups: Option, expected_rows: usize, } impl RowGroupPruningTest { @@ -45,9 +47,11 @@ impl RowGroupPruningTest { expected_errors: None, expected_row_group_matched_by_statistics: None, expected_row_group_pruned_by_statistics: None, + // expected_row_group_fully_matched_by_statistics: None, expected_files_pruned_by_statistics: None, expected_row_group_matched_by_bloom_filter: None, expected_row_group_pruned_by_bloom_filter: None, + // expected_limit_pruned_row_groups: None, expected_rows: 0, } } @@ -76,6 +80,22 @@ impl RowGroupPruningTest { self } + // Set the expected fully matched row groups by statistics + /* + fn with_fully_matched_by_stats( + mut self, + fully_matched_by_stats: Option, + ) -> Self { + self.expected_row_group_fully_matched_by_statistics = fully_matched_by_stats; + self + } + + fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option) -> Self { + self.expected_limit_pruned_row_groups = pruned_by_limit; + self + } + */ + // Set the expected pruned row groups by statistics fn with_pruned_by_stats(mut self, pruned_by_stats: Option) -> Self { self.expected_row_group_pruned_by_statistics = pruned_by_stats; @@ -144,6 +164,7 @@ impl RowGroupPruningTest { self.expected_row_group_pruned_by_bloom_filter, "mismatched row_groups_pruned_bloom_filter", ); + assert_eq!( output.result_rows, self.expected_rows, @@ -153,6 +174,66 @@ impl RowGroupPruningTest { output.description(), ); } + + // Execute the test with the current configuration + /* + async fn test_row_group_prune_with_custom_data( + self, + schema: Arc, + batches: Vec, + max_row_per_group: usize, + ) { + let output = ContextWithParquet::with_custom_data( + self.scenario, + RowGroup(max_row_per_group), + schema, + batches, + ) + .await + .query(&self.query) + .await; + + println!("{}", output.description()); + assert_eq!( + output.predicate_evaluation_errors(), + self.expected_errors, + "mismatched predicate_evaluation error" + ); + assert_eq!( + output.row_groups_matched_statistics(), + self.expected_row_group_matched_by_statistics, + "mismatched row_groups_matched_statistics", + ); + assert_eq!( + output.row_groups_fully_matched_statistics(), + self.expected_row_group_fully_matched_by_statistics, + "mismatched row_groups_fully_matched_statistics", + ); + assert_eq!( + output.row_groups_pruned_statistics(), + self.expected_row_group_pruned_by_statistics, + "mismatched row_groups_pruned_statistics", + ); + assert_eq!( + output.files_ranges_pruned_statistics(), + self.expected_files_pruned_by_statistics, + "mismatched files_ranges_pruned_statistics", + ); + 
assert_eq!( + output.limit_pruned_row_groups(), + self.expected_limit_pruned_row_groups, + "mismatched limit_pruned_row_groups", + ); + assert_eq!( + output.result_rows, + self.expected_rows, + "Expected {} rows, got {}: {}", + output.result_rows, + self.expected_rows, + output.description(), + ); + } + */ } #[tokio::test] @@ -289,11 +370,16 @@ async fn prune_disabled() { let expected_rows = 10; let config = SessionConfig::new().with_parquet_pruning(false); - let output = - ContextWithParquet::with_config(Scenario::Timestamps, RowGroup(5), config) - .await - .query(query) - .await; + let output = ContextWithParquet::with_config( + Scenario::Timestamps, + RowGroup(5), + config, + None, + None, + ) + .await + .query(query) + .await; println!("{}", output.description()); // This should not prune any @@ -1636,3 +1722,243 @@ async fn test_bloom_filter_decimal_dict() { .test_row_group_prune() .await; } + +/* +// Helper function to create a batch with a single Int32 column. +fn make_i32_batch( + name: &str, + values: Vec, +) -> datafusion_common::error::Result { + let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, false)])); + let array: ArrayRef = Arc::new(Int32Array::from(values)); + RecordBatch::try_new(schema, vec![array]).map_err(DataFusionError::from) +} + +// Helper function to create a batch with two Int32 columns +fn make_two_col_i32_batch( + name_a: &str, + name_b: &str, + values_a: Vec, + values_b: Vec, +) -> datafusion_common::error::Result { + let schema = Arc::new(Schema::new(vec![ + Field::new(name_a, DataType::Int32, false), + Field::new(name_b, DataType::Int32, false), + ])); + let array_a: ArrayRef = Arc::new(Int32Array::from(values_a)); + let array_b: ArrayRef = Arc::new(Int32Array::from(values_b)); + RecordBatch::try_new(schema, vec![array_a, array_b]).map_err(DataFusionError::from) +} + +#[tokio::test] +async fn test_limit_pruning_basic() -> datafusion_common::error::Result<()> { + // Scenario: Simple integer column, multiple row groups + // Query: SELECT c1 FROM t WHERE c1 = 0 LIMIT 2 + // We expect 2 rows in total. + + // Row Group 0: c1 = [0, -2] -> Partially matched, 1 row + // Row Group 1: c1 = [1, 2] -> Fully matched, 2 rows + // Row Group 2: c1 = [3, 4] -> Fully matched, 2 rows + // Row Group 3: c1 = [5, 6] -> Fully matched, 2 rows + // Row Group 4: c1 = [-1, -2] -> Not matched + + // If limit = 2, and RG1 is fully matched and has 2 rows, we should + // only scan RG1 and prune other row groups + // RG4 is pruned by statistics. RG2 and RG3 are pruned by limit. + // So 2 row groups are effectively pruned due to limit pruning. 
+ + let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); + let query = "SELECT c1 FROM t WHERE c1 >= 0 LIMIT 2"; + + let batches = vec![ + make_i32_batch("c1", vec![0, -2])?, + make_i32_batch("c1", vec![0, 0])?, + make_i32_batch("c1", vec![0, 0])?, + make_i32_batch("c1", vec![0, 0])?, + make_i32_batch("c1", vec![-1, -2])?, + ]; + + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) // Assuming Scenario::Int can handle this data + .with_query(query) + .with_expected_errors(Some(0)) + .with_expected_rows(2) + .with_pruned_files(Some(0)) + .with_matched_by_stats(Some(4)) + .with_fully_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(1)) + .with_limit_pruned_row_groups(Some(3)) + .test_row_group_prune_with_custom_data(schema, batches, 2) + .await; + + Ok(()) +} + +#[tokio::test] +async fn test_limit_pruning_complex_filter() -> datafusion_common::error::Result<()> { + // Test Case 1: Complex filter with two columns (a = 1 AND b > 1 AND b < 4) + // Row Group 0: a=[1,1,1], b=[0,2,3] -> Partially matched, 2 rows match (b=2,3) + // Row Group 1: a=[1,1,1], b=[2,2,2] -> Fully matched, 3 rows + // Row Group 2: a=[1,1,1], b=[2,3,3] -> Fully matched, 3 rows + // Row Group 3: a=[1,1,1], b=[2,2,3] -> Fully matched, 3 rows + // Row Group 4: a=[2,2,2], b=[2,2,2] -> Not matched (a != 1) + // Row Group 5: a=[1,1,1], b=[5,6,7] -> Not matched (b >= 4) + + // With LIMIT 5, we need RG1 (3 rows) + RG2 (2 rows from 3) = 5 rows + // RG4 and RG5 should be pruned by statistics + // RG3 should be pruned by limit + // RG0 is partially matched, so it depends on the order + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let query = "SELECT a, b FROM t WHERE a = 1 AND b > 1 AND b < 4 LIMIT 5"; + + let batches = vec![ + make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![0, 2, 3])?, + make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![2, 2, 2])?, + make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![2, 3, 3])?, + make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![2, 2, 3])?, + make_two_col_i32_batch("a", "b", vec![2, 2, 2], vec![2, 2, 2])?, + make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![5, 6, 7])?, + ]; + + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(query) + .with_expected_errors(Some(0)) + .with_expected_rows(5) + .with_pruned_files(Some(0)) + .with_matched_by_stats(Some(4)) // RG0,1,2,3 are matched + .with_fully_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(2)) // RG4,5 are pruned + .with_limit_pruned_row_groups(Some(2)) // RG0, RG3 is pruned by limit + .test_row_group_prune_with_custom_data(schema, batches, 3) + .await; + + Ok(()) +} + +#[tokio::test] +async fn test_limit_pruning_multiple_fully_matched( +) -> datafusion_common::error::Result<()> { + // Test Case 2: Limit requires multiple fully matched row groups + // Row Group 0: a=[5,5,5,5] -> Fully matched, 4 rows + // Row Group 1: a=[5,5,5,5] -> Fully matched, 4 rows + // Row Group 2: a=[5,5,5,5] -> Fully matched, 4 rows + // Row Group 3: a=[5,5,5,5] -> Fully matched, 4 rows + // Row Group 4: a=[1,2,3,4] -> Not matched + + // With LIMIT 8, we need RG0 (4 rows) + RG1 (4 rows) 8 rows + // RG2,3 should be pruned by limit + // RG4 should be pruned by statistics + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let query = "SELECT a FROM t WHERE a = 5 LIMIT 8"; + + let batches = vec![ + make_i32_batch("a", vec![5, 5, 5, 5])?, + make_i32_batch("a", 
vec![5, 5, 5, 5])?, + make_i32_batch("a", vec![5, 5, 5, 5])?, + make_i32_batch("a", vec![5, 5, 5, 5])?, + make_i32_batch("a", vec![1, 2, 3, 4])?, + ]; + + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(query) + .with_expected_errors(Some(0)) + .with_expected_rows(8) + .with_pruned_files(Some(0)) + .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched + .with_fully_matched_by_stats(Some(4)) + .with_pruned_by_stats(Some(1)) // RG4 pruned + .with_limit_pruned_row_groups(Some(2)) // RG2,3 pruned by limit + .test_row_group_prune_with_custom_data(schema, batches, 4) + .await; + + Ok(()) +} + +#[tokio::test] +async fn test_limit_pruning_no_fully_matched() -> datafusion_common::error::Result<()> { + // Test Case 3: No fully matched row groups - all are partially matched + // Row Group 0: a=[1,2,3] -> Partially matched, 1 row (a=2) + // Row Group 1: a=[2,3,4] -> Partially matched, 1 row (a=2) + // Row Group 2: a=[2,5,6] -> Partially matched, 1 row (a=2) + // Row Group 3: a=[2,7,8] -> Partially matched, 1 row (a=2) + // Row Group 4: a=[9,10,11] -> Not matched + + // With LIMIT 3, we need to scan RG0,1,2 to get 3 matching rows + // Cannot prune much by limit since all matching RGs are partial + // RG4 should be pruned by statistics + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let query = "SELECT a FROM t WHERE a = 2 LIMIT 3"; + + let batches = vec![ + make_i32_batch("a", vec![1, 2, 3])?, + make_i32_batch("a", vec![2, 3, 4])?, + make_i32_batch("a", vec![2, 5, 6])?, + make_i32_batch("a", vec![2, 7, 8])?, + make_i32_batch("a", vec![9, 10, 11])?, + ]; + + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(query) + .with_expected_errors(Some(0)) + .with_expected_rows(3) + .with_pruned_files(Some(0)) + .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched + .with_fully_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(1)) // RG4 pruned + .with_limit_pruned_row_groups(Some(0)) // RG3 pruned by limit + .test_row_group_prune_with_custom_data(schema, batches, 3) + .await; + + Ok(()) +} + +#[tokio::test] +async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error::Result<()> +{ + // Test Case 4: Limit exceeds all fully matched rows, need partially matched + // Row Group 0: a=[10,11,12,12] -> Partially matched, 1 row (a=10) + // Row Group 1: a=[10,10,10,10] -> Fully matched, 4 rows + // Row Group 2: a=[10,10,10,10] -> Fully matched, 4 rows + // Row Group 3: a=[10,13,14,11] -> Partially matched, 1 row (a=10) + // Row Group 4: a=[20,21,22,22] -> Not matched + + // With LIMIT 10, we need RG1 (4) + RG2 (4) = 8 from fully matched + // Still need 2 more, so we need to scan partially matched RG0 and RG3 + // All matching row groups should be scanned, only RG4 pruned by statistics + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let query = "SELECT a FROM t WHERE a = 10 LIMIT 10"; + + let batches = vec![ + make_i32_batch("a", vec![10, 11, 12, 12])?, + make_i32_batch("a", vec![10, 10, 10, 10])?, + make_i32_batch("a", vec![10, 10, 10, 10])?, + make_i32_batch("a", vec![10, 13, 14, 11])?, + make_i32_batch("a", vec![20, 21, 22, 22])?, + ]; + + RowGroupPruningTest::new() + .with_scenario(Scenario::Int) + .with_query(query) + .with_expected_errors(Some(0)) + .with_expected_rows(10) // Total: 1 + 3 + 4 + 1 = 9 (less than limit) + .with_pruned_files(Some(0)) + .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched + .with_fully_matched_by_stats(Some(2)) + 
.with_pruned_by_stats(Some(1)) // RG4 pruned + .with_limit_pruned_row_groups(Some(0)) // No limit pruning since we need all RGs + .test_row_group_prune_with_custom_data(schema, batches, 4) + .await; + + Ok(()) +} +*/ diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 5b7d9ac8fbe99..d0fe747ebd255 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3706,10 +3706,11 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { ); // Apply the function - let result = replace_order_preserving_variants(dist_context)?; + let result = replace_order_preserving_variants(dist_context, false)?; // Verify the plan was transformed to CoalescePartitionsExec result + .0 .plan .as_any() .downcast_ref::() @@ -3717,7 +3718,7 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { // Verify fetch was preserved assert_eq!( - result.plan.fetch(), + result.0.plan.fetch(), Some(5), "Fetch value was not preserved after transformation" ); diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 306bc9e6b013d..c45d234f3b512 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -48,6 +48,14 @@ pub struct ParquetFileMetrics { pub row_groups_pruned_bloom_filter: PruningMetrics, /// Number of row groups whose statistics were checked, tracked with matched/pruned counts pub row_groups_pruned_statistics: PruningMetrics, + /// Number of row groups whose bloom filters were checked and matched (not pruned) + pub row_groups_matched_bloom_filter: Count, + /// Number of row groups pruned due to limit pruning. 
+ pub limit_pruned_row_groups: Count, + /// Number of row groups whose statistics were checked and fully matched + pub row_groups_fully_matched_statistics: Count, + /// Number of row groups whose statistics were checked and matched (not pruned) + pub row_groups_matched_statistics: Count, /// Total number of bytes scanned pub bytes_scanned: Count, /// Total rows filtered out by predicates pushed into parquet scan @@ -85,11 +93,27 @@ impl ParquetFileMetrics { // ----------------------- // 'summary' level metrics // ----------------------- + let row_groups_matched_bloom_filter = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("row_groups_matched_bloom_filter", partition); + let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) .with_type(MetricType::SUMMARY) .pruning_metrics("row_groups_pruned_bloom_filter", partition); + let limit_pruned_row_groups = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("limit_pruned_row_groups", partition); + + let row_groups_fully_matched_statistics = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("row_groups_fully_matched_statistics", partition); + + let row_groups_matched_statistics = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("row_groups_matched_statistics", partition); + let row_groups_pruned_statistics = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) .with_type(MetricType::SUMMARY) @@ -153,8 +177,12 @@ impl ParquetFileMetrics { Self { files_ranges_pruned_statistics, predicate_evaluation_errors, + row_groups_matched_bloom_filter, row_groups_pruned_bloom_filter, + row_groups_fully_matched_statistics, + row_groups_matched_statistics, row_groups_pruned_statistics, + limit_pruned_row_groups, bytes_scanned, pushdown_rows_pruned, pushdown_rows_matched, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 2815b82f1d455..5701b22ccbcc5 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -99,6 +99,8 @@ pub(super) struct ParquetOpener { pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, + /// Should limit pruning be applied + pub enable_limit_pruning: bool, /// Optional parquet FileDecryptionProperties #[cfg(feature = "parquet_encryption")] pub file_decryption_properties: Option>, @@ -151,6 +153,7 @@ impl FileOpener for ParquetOpener { let enable_bloom_filter = self.enable_bloom_filter; let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; let limit = self.limit; + let enable_limit_pruning = self.enable_limit_pruning; let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); @@ -407,8 +410,14 @@ impl FileOpener for ParquetOpener { .add_matched(n_remaining_row_groups); } - let mut access_plan = row_groups.build(); + // Prune by limit + if enable_limit_pruning { + if let Some(limit) = limit { + row_groups.prune_by_limit(limit, rg_metadata, &file_metrics); + } + } + let mut access_plan = row_groups.build(); // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns // with that range can be skipped as well @@ -889,6 +898,7 @@ mod test { reorder_filters: false, enable_page_index: false, 
enable_bloom_filter: false, + enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -962,6 +972,7 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, + enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1051,6 +1062,7 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, + enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1143,6 +1155,7 @@ mod test { reorder_filters: true, enable_page_index: false, enable_bloom_filter: false, + enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, @@ -1235,6 +1248,7 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, + enable_limit_pruning: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, @@ -1385,6 +1399,7 @@ mod test { reorder_filters: false, enable_page_index: false, enable_bloom_filter: false, + enable_limit_pruning: false, schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory), enable_row_group_stats_pruning: false, coerce_int96: None, diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 2043f75070b5c..f0d483ba35b10 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -24,6 +24,8 @@ use arrow::datatypes::Schema; use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; +use datafusion_physical_expr::expressions::NotExpr; +use datafusion_physical_expr::PhysicalExprSimplifier; use datafusion_pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_column; @@ -46,13 +48,19 @@ use parquet::{ pub struct RowGroupAccessPlanFilter { /// which row groups should be accessed access_plan: ParquetAccessPlan, + /// which row groups are fully contained within the pruning predicate + is_fully_matched: Vec, } impl RowGroupAccessPlanFilter { /// Create a new `RowGroupPlanBuilder` for pruning out the groups to scan /// based on metadata and statistics pub fn new(access_plan: ParquetAccessPlan) -> Self { - Self { access_plan } + let num_row_groups = access_plan.len(); + Self { + access_plan, + is_fully_matched: vec![false; num_row_groups], + } } /// Return true if there are no row groups @@ -70,6 +78,49 @@ impl RowGroupAccessPlanFilter { self.access_plan } + /// Returns the is_fully_matched vector + pub fn is_fully_matched(&self) -> &Vec { + &self.is_fully_matched + } + + /// Prunes the access plan based on the limit and fully contained row groups. 
+ pub fn prune_by_limit( + &mut self, + limit: usize, + rg_metadata: &[RowGroupMetaData], + metrics: &ParquetFileMetrics, + ) { + let mut fully_matched_row_group_indexes: Vec = Vec::new(); + let mut fully_matched_rows_count: usize = 0; + + // Iterate through the currently accessible row groups + for &idx in self.access_plan.row_group_indexes().iter() { + if self.is_fully_matched[idx] { + let row_group_row_count = rg_metadata[idx].num_rows() as usize; + fully_matched_row_group_indexes.push(idx); + fully_matched_rows_count += row_group_row_count; + if fully_matched_rows_count >= limit { + break; + } + } + } + + if fully_matched_rows_count >= limit { + let original_num_accessible_row_groups = + self.access_plan.row_group_indexes().len(); + let new_num_accessible_row_groups = fully_matched_row_group_indexes.len(); + let pruned_count = original_num_accessible_row_groups + .saturating_sub(new_num_accessible_row_groups); + metrics.limit_pruned_row_groups.add(pruned_count); + + let mut new_access_plan = ParquetAccessPlan::new_none(rg_metadata.len()); + for &idx in &fully_matched_row_group_indexes { + new_access_plan.scan(idx); + } + self.access_plan = new_access_plan; + } + } + /// Prune remaining row groups to only those within the specified range. /// /// Updates this set to mark row groups that should not be scanned @@ -135,13 +186,54 @@ impl RowGroupAccessPlanFilter { // try to prune the row groups in a single call match predicate.prune(&pruning_stats) { Ok(values) => { - // values[i] is false means the predicate could not be true for row group i + let mut fully_contained_candidates_original_idx: Vec = Vec::new(); for (idx, &value) in row_group_indexes.iter().zip(values.iter()) { if !value { self.access_plan.skip(*idx); metrics.row_groups_pruned_statistics.add_pruned(1); } else { metrics.row_groups_pruned_statistics.add_matched(1); + fully_contained_candidates_original_idx.push(*idx); + } + } + + // Note: this part of code shouldn't be expensive with a limited number of row groups + // If we do find it's expensive, we can consider optimizing it further. + if !fully_contained_candidates_original_idx.is_empty() { + // Use NotExpr to create the inverted predicate + let inverted_expr = + Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr()))); + // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0) + // before building the pruning predicate + let mut simplifier = PhysicalExprSimplifier::new(arrow_schema); + let inverted_expr = simplifier.simplify(inverted_expr).unwrap(); + if let Ok(inverted_predicate) = PruningPredicate::try_new( + inverted_expr, + Arc::clone(predicate.schema()), + ) { + let inverted_pruning_stats = RowGroupPruningStatistics { + parquet_schema, + row_group_metadatas: fully_contained_candidates_original_idx + .iter() + .map(|&i| &groups[i]) + .collect::>(), + arrow_schema, + }; + + if let Ok(inverted_values) = + inverted_predicate.prune(&inverted_pruning_stats) + { + for (i, &original_row_group_idx) in + fully_contained_candidates_original_idx.iter().enumerate() + { + // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false), + // it implies that *all* rows in this group satisfy the original predicate. 
+ if !inverted_values[i] { + self.is_fully_matched[original_row_group_idx] = true; + metrics.row_groups_fully_matched_statistics.add(1); + } + } + } } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 450ccc5d0620e..339d36b57cc35 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -575,6 +575,7 @@ impl FileSource for ParquetSource { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, + enable_limit_pruning: base_config.limit_pruning, schema_adapter_factory, coerce_int96, #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 5847a8cf5e11f..02d9762a4a396 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -201,6 +201,8 @@ pub struct FileScanConfig { /// Expression adapter used to adapt filters and projections that are pushed down into the scan /// from the logical schema to the physical schema of the file. pub expr_adapter_factory: Option>, + /// If there is a limit pushed down at the logical plan level, we can enable limit_pruning + pub limit_pruning: bool, } /// A builder for [`FileScanConfig`]'s. @@ -276,6 +278,8 @@ pub struct FileScanConfigBuilder { new_lines_in_values: Option, batch_size: Option, expr_adapter_factory: Option>, + /// If there is a limit pushed down at the logical plan level, we can enable limit_pruning + limit_pruning: bool, } impl FileScanConfigBuilder { @@ -304,6 +308,7 @@ impl FileScanConfigBuilder { constraints: None, batch_size: None, expr_adapter_factory: None, + limit_pruning: false, } } @@ -444,6 +449,12 @@ impl FileScanConfigBuilder { self } + /// Enable or disable limit pruning. + pub fn with_limit_pruning(mut self, limit_pruning: bool) -> Self { + self.limit_pruning = limit_pruning; + self + } + /// Build the final [`FileScanConfig`] with all the configured settings. /// /// This method takes ownership of the builder and returns the constructed `FileScanConfig`. @@ -463,6 +474,7 @@ impl FileScanConfigBuilder { new_lines_in_values, batch_size, expr_adapter_factory: expr_adapter, + limit_pruning, } = self; let constraints = constraints.unwrap_or_default(); @@ -495,6 +507,7 @@ impl FileScanConfigBuilder { new_lines_in_values, batch_size, expr_adapter_factory: expr_adapter, + limit_pruning, } } } @@ -517,6 +530,7 @@ impl From for FileScanConfigBuilder { constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, + limit_pruning: config.limit_pruning, } } } diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs index 4c7b37113d58d..3bc1bddd2a8dc 100644 --- a/datafusion/datasource/src/schema_adapter.rs +++ b/datafusion/datasource/src/schema_adapter.rs @@ -243,7 +243,7 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { /// This SchemaAdapter requires both the table schema and the projected table /// schema. 
See [`SchemaMapping`] for more details #[derive(Clone, Debug)] -pub(crate) struct DefaultSchemaAdapter { +pub struct DefaultSchemaAdapter { /// The schema for the table, projected to include only the fields being output (projected) by the /// associated ParquetSource projected_table_schema: SchemaRef, diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index a8adb46b96ffa..90c26f661cb68 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -254,6 +254,20 @@ impl ExecutionPlan for DataSinkExec { fn metrics(&self) -> Option { self.sink.metrics() } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = DataSinkExec::new( + Arc::clone(self.input()), + Arc::clone(&self.sink), + self.sort_order.clone(), + ); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Create a output record batch with a count diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 11a8a3867b809..7169997bd0316 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -72,8 +72,8 @@ use datafusion_physical_plan::filter_pushdown::{ /// ```text /// ┌─────────────────────┐ -----► execute path /// │ │ ┄┄┄┄┄► init path -/// │ DataSourceExec │ -/// │ │ +/// │ DataSourceExec │ +/// │ │ /// └───────▲─────────────┘ /// ┊ │ /// ┊ │ @@ -328,6 +328,15 @@ impl ExecutionPlan for DataSourceExec { } } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = DataSourceExec::new(Arc::clone(&self.data_source)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index d6b55182aa6ba..56e3339342691 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -260,7 +260,7 @@ fn insufficient_capacity_err( additional: usize, available: usize, ) -> DataFusionError { - resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool", + resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool", human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available)) } diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index 2829a9416f033..99c121e3e81e8 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -311,4 +311,15 @@ pub trait Accumulator: Send + Sync + Debug { fn supports_retract_batch(&self) -> bool { false } + + fn as_serializable(&self) -> Option<&dyn SerializableAccumulator> { + None + } +} + +pub trait SerializableAccumulator: Accumulator { + fn serialize(&self) -> Result>; + fn deserialize(bytes: &[u8]) -> Result> + where + Self: Sized; } diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 9c3c5df7007ff..16eae571e3723 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -148,6 +148,25 @@ pub fn 
replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul .data() } +pub fn replace_col_with_expr( + expr: Expr, + replace_map: &HashMap, +) -> Result { + expr.transform(|expr| { + Ok({ + if let Expr::Column(c) = &expr { + match replace_map.get(c) { + Some(new_expr) => Transformed::yes((**new_expr).to_owned()), + None => Transformed::no(expr), + } + } else { + Transformed::no(expr) + } + }) + }) + .data() +} + /// Recursively 'unnormalize' (remove all qualifiers) from an /// expression tree. /// diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 2b7cc9d46ad34..7a6b21731dee4 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -85,6 +85,7 @@ pub use datafusion_doc::{ Documentation, DocumentationBuilder, }; pub use datafusion_expr_common::accumulator::Accumulator; +pub use datafusion_expr_common::accumulator::SerializableAccumulator; pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use datafusion_expr_common::operator::Operator; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 1c0790b3e3acd..2cc836f6cbd06 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -31,7 +31,7 @@ use datafusion_common::{ internal_err, plan_err, qualified_name, Column, DFSchema, Result, }; use datafusion_expr::expr::WindowFunction; -use datafusion_expr::expr_rewriter::replace_col; +use datafusion_expr::expr_rewriter::{replace_col, replace_col_with_expr}; use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, TableScan, Union}; use datafusion_expr::utils::{ conjunction, expr_to_columns, split_conjunction, split_conjunction_owned, @@ -41,9 +41,8 @@ use datafusion_expr::{ }; use crate::optimizer::ApplyOrder; -use crate::simplify_expressions::simplify_predicates; use crate::utils::{has_all_column_refs, is_restrict_null_predicate}; -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{simplify_expressions::simplify_predicates, OptimizerConfig, OptimizerRule}; /// Optimizer rule for pushing (moving) filter expressions down in a plan so /// they are applied as early as possible. @@ -795,7 +794,7 @@ impl OptimizerRule for PushDownFilter { // remove duplicated filters let child_predicates = split_conjunction_owned(child_filter.predicate); - let new_predicates = parents_predicates + let mut new_predicates = parents_predicates .into_iter() .chain(child_predicates) // use IndexSet to remove dupes while preserving predicate order @@ -803,6 +802,8 @@ impl OptimizerRule for PushDownFilter { .into_iter() .collect::>(); + new_predicates = infer_predicates_from_equalities(new_predicates)?; + let Some(new_predicate) = conjunction(new_predicates) else { return plan_err!("at least one expression exists"); }; @@ -1422,6 +1423,73 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } +/// Infers new predicates by substituting equalities. +/// For example, with predicates `t2.b = 3` and `t1.b > t2.b`, +/// we can infer `t1.b > 3`. 
+fn infer_predicates_from_equalities(predicates: Vec) -> Result> { + // Map from column names to their literal values (from equality predicates) + let mut equality_map: HashMap = + HashMap::with_capacity(predicates.len()); + let mut final_predicates = Vec::with_capacity(predicates.len()); + // First pass: collect column=literal equalities + for predicate in predicates.iter() { + if let Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + }) = predicate + { + if let Expr::Column(col) = left.as_ref() { + // Only add to map if right side is a literal + if matches!(right.as_ref(), Expr::Literal(_, _)) { + equality_map.insert(col.clone(), *right.clone()); + final_predicates.push(predicate.clone()); + } + } else if let Expr::Column(col) = right.as_ref() { + // Only add to map if left side is a literal + if matches!(left.as_ref(), Expr::Literal(_, _)) { + equality_map.insert(col.clone(), *right.clone()); + final_predicates.push(predicate.clone()); + } + } + } + } + + // If no equality mappings found, nothing to infer + if equality_map.is_empty() { + return Ok(predicates); + } + + // Second pass: apply substitutions to create new predicates + for predicate in predicates { + // Skip equality predicates we already used for mapping + if final_predicates.contains(&predicate) { + continue; + } + + // Try to replace columns with their literal values + let mut columns_in_expr = HashSet::new(); + expr_to_columns(&predicate, &mut columns_in_expr)?; + + // Create a combined replacement map for all columns in this predicate + let replace_map: HashMap<_, _> = columns_in_expr + .into_iter() + .filter_map(|col| equality_map.get(&col).map(|lit| (col, lit))) + .collect(); + + if replace_map.is_empty() { + final_predicates.push(predicate); + continue; + } + // Apply all substitutions at once to get the fully substituted predicate + let new_pred = replace_col_with_expr(predicate, &replace_map)?; + + final_predicates.push(new_pred); + } + + Ok(final_predicates) +} + #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/physical-expr/src/simplifier/mod.rs b/datafusion/physical-expr/src/simplifier/mod.rs index 80d6ee0a7b914..d0c787867dd06 100644 --- a/datafusion/physical-expr/src/simplifier/mod.rs +++ b/datafusion/physical-expr/src/simplifier/mod.rs @@ -24,8 +24,9 @@ use datafusion_common::{ }; use std::sync::Arc; -use crate::PhysicalExpr; +use crate::{simplifier::not::simplify_not_expr_recursive, PhysicalExpr}; +pub mod not; pub mod unwrap_cast; /// Simplifies physical expressions by applying various optimizations @@ -56,6 +57,11 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> { type Node = Arc; fn f_up(&mut self, node: Self::Node) -> Result> { + // Apply NOT expression simplification first + let not_simplified = simplify_not_expr_recursive(node, self.schema)?; + let node = not_simplified.data; + let transformed = not_simplified.transformed; + // Apply unwrap cast optimization #[cfg(test)] let original_type = node.data_type(self.schema).unwrap(); @@ -66,7 +72,12 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> { original_type, "Simplified expression should have the same data type as the original" ); - Ok(unwrapped) + // Combine transformation results + let final_transformed = transformed || unwrapped.transformed; + Ok(Transformed::new_transformed( + unwrapped.data, + final_transformed, + )) } } diff --git a/datafusion/physical-expr/src/simplifier/not.rs b/datafusion/physical-expr/src/simplifier/not.rs new file mode 100644 index 0000000000000..d3e69bc74904e --- 
/dev/null +++ b/datafusion/physical-expr/src/simplifier/not.rs @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Simplify NOT expressions in physical expressions +//! +//! This module provides optimizations for NOT expressions such as: +//! - Double negation elimination: NOT(NOT(expr)) -> expr +//! - NOT with binary comparisons: NOT(a = b) -> a != b +//! - NOT with IN expressions: NOT(a IN (list)) -> a NOT IN (list) +//! - De Morgan's laws: NOT(A AND B) -> NOT A OR NOT B +//! - Constant folding: NOT(TRUE) -> FALSE, NOT(FALSE) -> TRUE + +use std::sync::Arc; + +use arrow::datatypes::Schema; +use datafusion_common::{tree_node::Transformed, Result, ScalarValue}; +use datafusion_expr::Operator; + +use crate::expressions::{lit, BinaryExpr, Literal, NotExpr}; +use crate::PhysicalExpr; + +/// Attempts to simplify NOT expressions +pub(crate) fn simplify_not_expr( + expr: Arc, + schema: &Schema, +) -> Result>> { + // Check if this is a NOT expression + let not_expr = match expr.as_any().downcast_ref::() { + Some(not_expr) => not_expr, + None => return Ok(Transformed::no(expr)), + }; + + let inner_expr = not_expr.arg(); + + // Handle NOT(NOT(expr)) -> expr (double negation elimination) + if let Some(inner_not) = inner_expr.as_any().downcast_ref::() { + // Recursively simplify the inner expression + let simplified = + simplify_not_expr_recursive(Arc::clone(inner_not.arg()), schema)?; + // We eliminated double negation, so always return transformed=true + return Ok(Transformed::yes(simplified.data)); + } + + // Handle NOT(literal) -> !literal + if let Some(literal) = inner_expr.as_any().downcast_ref::() { + if let ScalarValue::Boolean(Some(val)) = literal.value() { + return Ok(Transformed::yes(lit(ScalarValue::Boolean(Some(!val))))); + } + if let ScalarValue::Boolean(None) = literal.value() { + return Ok(Transformed::yes(lit(ScalarValue::Boolean(None)))); + } + } + + // Handle NOT(binary_expr) where we can flip the operator + if let Some(binary_expr) = inner_expr.as_any().downcast_ref::() { + if let Some(negated_op) = negate_operator(binary_expr.op()) { + // Recursively simplify the left and right expressions first + let left_simplified = + simplify_not_expr_recursive(Arc::clone(binary_expr.left()), schema)?; + let right_simplified = + simplify_not_expr_recursive(Arc::clone(binary_expr.right()), schema)?; + + let new_binary = Arc::new(BinaryExpr::new( + left_simplified.data, + negated_op, + right_simplified.data, + )); + // We flipped the operator, so always return transformed=true + return Ok(Transformed::yes(new_binary)); + } + + // Handle De Morgan's laws for AND/OR + match binary_expr.op() { + Operator::And => { + // NOT(A AND B) -> NOT A OR NOT B + let not_left = Arc::new(NotExpr::new(Arc::clone(binary_expr.left()))); + let 
not_right = Arc::new(NotExpr::new(Arc::clone(binary_expr.right()))); + + // Recursively simplify the NOT expressions + let simplified_left = simplify_not_expr_recursive(not_left, schema)?; + let simplified_right = simplify_not_expr_recursive(not_right, schema)?; + + let new_binary = Arc::new(BinaryExpr::new( + simplified_left.data, + Operator::Or, + simplified_right.data, + )); + return Ok(Transformed::yes(new_binary)); + } + Operator::Or => { + // NOT(A OR B) -> NOT A AND NOT B + let not_left = Arc::new(NotExpr::new(Arc::clone(binary_expr.left()))); + let not_right = Arc::new(NotExpr::new(Arc::clone(binary_expr.right()))); + + // Recursively simplify the NOT expressions + let simplified_left = simplify_not_expr_recursive(not_left, schema)?; + let simplified_right = simplify_not_expr_recursive(not_right, schema)?; + + let new_binary = Arc::new(BinaryExpr::new( + simplified_left.data, + Operator::And, + simplified_right.data, + )); + return Ok(Transformed::yes(new_binary)); + } + _ => {} + } + } + + // If no simplification possible, return the original expression + Ok(Transformed::no(expr)) +} + +/// Helper function that recursively simplifies expressions, including NOT expressions +pub fn simplify_not_expr_recursive( + expr: Arc, + schema: &Schema, +) -> Result>> { + // First, try to simplify any NOT expressions in this expression + let not_simplified = simplify_not_expr(Arc::clone(&expr), schema)?; + + // If the expression was transformed, we might have created new opportunities for simplification + if not_simplified.transformed { + // Recursively simplify the result + let further_simplified = + simplify_not_expr_recursive(Arc::clone(¬_simplified.data), schema)?; + if further_simplified.transformed { + return Ok(Transformed::yes(further_simplified.data)); + } else { + return Ok(not_simplified); + } + } + + // If this expression wasn't a NOT expression, try to simplify its children + // This handles cases where NOT expressions might be nested deeper in the tree + if let Some(binary_expr) = expr.as_any().downcast_ref::() { + let left_simplified = + simplify_not_expr_recursive(Arc::clone(binary_expr.left()), schema)?; + let right_simplified = + simplify_not_expr_recursive(Arc::clone(binary_expr.right()), schema)?; + + if left_simplified.transformed || right_simplified.transformed { + let new_binary = Arc::new(BinaryExpr::new( + left_simplified.data, + *binary_expr.op(), + right_simplified.data, + )); + return Ok(Transformed::yes(new_binary)); + } + } + + Ok(not_simplified) +} + +/// Returns the negated version of a comparison operator, if possible +fn negate_operator(op: &Operator) -> Option { + match op { + Operator::Eq => Some(Operator::NotEq), + Operator::NotEq => Some(Operator::Eq), + Operator::Lt => Some(Operator::GtEq), + Operator::LtEq => Some(Operator::Gt), + Operator::Gt => Some(Operator::LtEq), + Operator::GtEq => Some(Operator::Lt), + Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom), + Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom), + // For other operators, we can't directly negate them + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::expressions::{col, lit, BinaryExpr, NotExpr}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::ScalarValue; + use datafusion_expr::Operator; + + fn test_schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Boolean, false), + Field::new("b", DataType::Int32, false), + ]) + } + + #[test] + fn test_double_negation_elimination() -> Result<()> { + let 
schema = test_schema(); + + // Create NOT(NOT(b > 5)) + let inner_expr: Arc = Arc::new(BinaryExpr::new( + col("b", &schema)?, + Operator::Gt, + lit(ScalarValue::Int32(Some(5))), + )); + let inner_not = Arc::new(NotExpr::new(Arc::clone(&inner_expr))); + let double_not = Arc::new(NotExpr::new(inner_not)); + + let result = simplify_not_expr_recursive(double_not, &schema)?; + + assert!(result.transformed); + // Should be simplified back to the original b > 5 + assert_eq!(result.data.to_string(), inner_expr.to_string()); + Ok(()) + } + + #[test] + fn test_not_literal() -> Result<()> { + let schema = test_schema(); + + // NOT(TRUE) -> FALSE + let not_true = Arc::new(NotExpr::new(lit(ScalarValue::Boolean(Some(true))))); + let result = simplify_not_expr(not_true, &schema)?; + assert!(result.transformed); + + if let Some(literal) = result.data.as_any().downcast_ref::() { + assert_eq!(literal.value(), &ScalarValue::Boolean(Some(false))); + } else { + panic!("Expected literal result"); + } + + // NOT(FALSE) -> TRUE + let not_false = Arc::new(NotExpr::new(lit(ScalarValue::Boolean(Some(false))))); + let result = simplify_not_expr_recursive(not_false, &schema)?; + assert!(result.transformed); + + if let Some(literal) = result.data.as_any().downcast_ref::() { + assert_eq!(literal.value(), &ScalarValue::Boolean(Some(true))); + } else { + panic!("Expected literal result"); + } + + Ok(()) + } + + #[test] + fn test_negate_comparison() -> Result<()> { + let schema = test_schema(); + + // NOT(b = 5) -> b != 5 + let eq_expr = Arc::new(BinaryExpr::new( + col("b", &schema)?, + Operator::Eq, + lit(ScalarValue::Int32(Some(5))), + )); + let not_eq = Arc::new(NotExpr::new(eq_expr)); + + let result = simplify_not_expr_recursive(not_eq, &schema)?; + assert!(result.transformed); + + if let Some(binary) = result.data.as_any().downcast_ref::() { + assert_eq!(binary.op(), &Operator::NotEq); + } else { + panic!("Expected binary expression result"); + } + + Ok(()) + } + + #[test] + fn test_demorgans_law_and() -> Result<()> { + let schema = test_schema(); + + // NOT(a AND b) -> NOT a OR NOT b + let and_expr = Arc::new(BinaryExpr::new( + col("a", &schema)?, + Operator::And, + col("b", &schema)?, + )); + let not_and = Arc::new(NotExpr::new(and_expr)); + + let result = simplify_not_expr_recursive(not_and, &schema)?; + assert!(result.transformed); + + if let Some(binary) = result.data.as_any().downcast_ref::() { + assert_eq!(binary.op(), &Operator::Or); + // Left and right should both be NOT expressions + assert!(binary.left().as_any().downcast_ref::().is_some()); + assert!(binary.right().as_any().downcast_ref::().is_some()); + } else { + panic!("Expected binary expression result"); + } + + Ok(()) + } + + #[test] + fn test_demorgans_law_or() -> Result<()> { + let schema = test_schema(); + + // NOT(a OR b) -> NOT a AND NOT b + let or_expr = Arc::new(BinaryExpr::new( + col("a", &schema)?, + Operator::Or, + col("b", &schema)?, + )); + let not_or = Arc::new(NotExpr::new(or_expr)); + + let result = simplify_not_expr_recursive(not_or, &schema)?; + assert!(result.transformed); + + if let Some(binary) = result.data.as_any().downcast_ref::() { + assert_eq!(binary.op(), &Operator::And); + // Left and right should both be NOT expressions + assert!(binary.left().as_any().downcast_ref::().is_some()); + assert!(binary.right().as_any().downcast_ref::().is_some()); + } else { + panic!("Expected binary expression result"); + } + + Ok(()) + } + + #[test] + fn test_demorgans_with_comparison_simplification() -> Result<()> { + let schema = 
test_schema(); + + // NOT(b = 1 AND b = 2) -> b != 1 OR b != 2 + // This tests the combination of De Morgan's law and operator negation + let eq1 = Arc::new(BinaryExpr::new( + col("b", &schema)?, + Operator::Eq, + lit(ScalarValue::Int32(Some(1))), + )); + let eq2 = Arc::new(BinaryExpr::new( + col("b", &schema)?, + Operator::Eq, + lit(ScalarValue::Int32(Some(2))), + )); + let and_expr = Arc::new(BinaryExpr::new(eq1, Operator::And, eq2)); + let not_and = Arc::new(NotExpr::new(and_expr)); + + let result = simplify_not_expr_recursive(not_and, &schema)?; + assert!(result.transformed, "Expression should be transformed"); + + // Verify the result is an OR expression + if let Some(or_binary) = result.data.as_any().downcast_ref::() { + assert_eq!(or_binary.op(), &Operator::Or, "Top level should be OR"); + + // Verify left side is b != 1 + if let Some(left_binary) = + or_binary.left().as_any().downcast_ref::() + { + assert_eq!(left_binary.op(), &Operator::NotEq, "Left should be NotEq"); + } else { + panic!("Expected left to be a binary expression with !="); + } + + // Verify right side is b != 2 + if let Some(right_binary) = + or_binary.right().as_any().downcast_ref::() + { + assert_eq!(right_binary.op(), &Operator::NotEq, "Right should be NotEq"); + } else { + panic!("Expected right to be a binary expression with !="); + } + } else { + panic!("Expected binary OR expression result"); + } + + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index a6b5bf1871161..c9ca1edd85c79 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -332,6 +332,7 @@ pub trait AggregateWindowExpr: WindowExpr { return value.to_array_of_size(record_batch.num_rows()); } let order_bys = get_orderby_values(self.order_by_columns(record_batch)?); + let most_recent_row_order_bys = most_recent_row .map(|batch| self.order_by_columns(batch)) .transpose()? diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index e9e28fec064ff..9f7664d1c1066 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -944,9 +944,12 @@ fn add_hash_on_top( /// /// # Returns /// -/// Updated node with an execution plan, where the desired single distribution -/// requirement is satisfied. -fn add_merge_on_top(input: DistributionContext) -> DistributionContext { +/// Updated node with an execution plan, where desired single +/// distribution is satisfied by adding [`SortPreservingMergeExec`]. +fn add_merge_on_top( + input: DistributionContext, + fetch: &mut Option, +) -> DistributionContext { // Apply only when the partition count is larger than one. 
if input.plan.output_partitioning().partition_count() > 1 { // When there is an existing ordering, we preserve ordering @@ -956,10 +959,10 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext { // - Usage of order preserving variants is not desirable // (determined by flag `config.optimizer.prefer_existing_sort`) let new_plan = if let Some(req) = input.plan.output_ordering() { - Arc::new(SortPreservingMergeExec::new( - req.clone(), - Arc::clone(&input.plan), - )) as _ + Arc::new( + SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan)) + .with_fetch(fetch.take()), + ) as _ } else { // If there is no input order, we can simply coalesce partitions: Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _ @@ -988,20 +991,37 @@ /// ```text /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] fn remove_dist_changing_operators( mut distribution_context: DistributionContext, -) -> Result<DistributionContext> { +) -> Result<( + DistributionContext, + Option<usize>, + Option<Arc<dyn ExecutionPlan>>, +)> { + let mut fetch = None; + let mut spm: Option<Arc<dyn ExecutionPlan>> = None; while is_repartition(&distribution_context.plan) || is_coalesce_partitions(&distribution_context.plan) || is_sort_preserving_merge(&distribution_context.plan) { + if is_sort_preserving_merge(&distribution_context.plan) { + if let Some(child_fetch) = distribution_context.plan.fetch() { + if fetch.is_none() { + fetch = Some(child_fetch); + spm = Some(distribution_context.plan); + } else { + fetch = Some(fetch.unwrap().min(child_fetch)); + } + } + } // All of above operators have a single child. First child is only child. // Remove any distribution changing operators at the beginning: distribution_context = distribution_context.children.swap_remove(0); // Note that they will be re-inserted later on if necessary or helpful. } - Ok(distribution_context) + Ok((distribution_context, fetch, spm)) } /// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable. @@ -1022,27 +1042,36 @@ fn remove_dist_changing_operators( /// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", /// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] pub fn replace_order_preserving_variants( mut context: DistributionContext, -) -> Result<DistributionContext> { - context.children = context - .children - .into_iter() - .map(|child| { - if child.data { - replace_order_preserving_variants(child) - } else { - Ok(child) - } - }) - .collect::<Result<Vec<_>>>()?; - + ordering_satisfied: bool, +) -> Result<(DistributionContext, Option<usize>)> { + let mut children = vec![]; + let mut fetch = None; + for child in context.children.into_iter() { + if child.data { + let (child, inner_fetch) = + replace_order_preserving_variants(child, ordering_satisfied)?; + children.push(child); + fetch = inner_fetch; + } else { + children.push(child); + } + } + context.children = children; if is_sort_preserving_merge(&context.plan) { + // Keep the fetch value of the SortPreservingMerge operator; it may be needed later.
+ let fetch = context.plan.fetch(); let child_plan = Arc::clone(&context.children[0].plan); - context.plan = Arc::new( - CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()), - ); - return Ok(context); + if !ordering_satisfied { + // It's safe to unwrap because `CoalescePartitionsExec` supports `fetch`. + context.plan = + Arc::new(CoalescePartitionsExec::new(child_plan).with_fetch(fetch)); + return Ok((context, None)); + } + context.plan = Arc::new(CoalescePartitionsExec::new(child_plan)); + return Ok((context, fetch)); } else if let Some(repartition) = context.plan.as_any().downcast_ref::() { @@ -1051,11 +1080,11 @@ pub fn replace_order_preserving_variants( Arc::clone(&context.children[0].plan), repartition.partitioning().clone(), )?); - return Ok(context); + return Ok((context, None)); } } - context.update_plan_from_children() + Ok((context.update_plan_from_children()?, fetch)) } /// A struct to keep track of repartition requirements for each child node. @@ -1197,11 +1226,15 @@ pub fn ensure_distribution( unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort; // Remove unnecessary repartition from the physical plan if any - let DistributionContext { - mut plan, - data, - children, - } = remove_dist_changing_operators(dist_context)?; + let ( + DistributionContext { + mut plan, + data, + children, + }, + mut fetch, + spm, + ) = remove_dist_changing_operators(dist_context)?; if let Some(exec) = plan.as_any().downcast_ref::() { if let Some(updated_window) = get_best_fitting_window( @@ -1234,43 +1267,39 @@ pub fn ensure_distribution( plan.maintains_input_order(), repartition_status_flags.into_iter() ) - .map( - |( - mut child, - required_input_ordering, - maintains, - RepartitionRequirementStatus { - requirement, - roundrobin_beneficial, - roundrobin_beneficial_stats, - hash_necessary, - }, - )| { - let add_roundrobin = enable_round_robin - // Operator benefits from partitioning (e.g. filter): - && roundrobin_beneficial - && roundrobin_beneficial_stats - // Unless partitioning increases the partition count, it is not beneficial: - && child.plan.output_partitioning().partition_count() < target_partitions; - - // When `repartition_file_scans` is set, attempt to increase - // parallelism at the source. - // - // If repartitioning is not possible (a.k.a. None is returned from `ExecutionPlan::repartitioned`) - // then no repartitioning will have occurred. As the default implementation returns None, it is only - // specific physical plan nodes, such as certain datasources, which are repartitioned. - if repartition_file_scans && roundrobin_beneficial_stats { - if let Some(new_child) = - child.plan.repartitioned(target_partitions, config)? - { - child.plan = new_child; + .map( + |( + mut child, + required_input_ordering, + maintains, + RepartitionRequirementStatus { + requirement, + roundrobin_beneficial, + roundrobin_beneficial_stats, + hash_necessary, + }, + )| { + let add_roundrobin = enable_round_robin + // Operator benefits from partitioning (e.g. filter): + && roundrobin_beneficial + && roundrobin_beneficial_stats + // Unless partitioning increases the partition count, it is not beneficial: + && child.plan.output_partitioning().partition_count() < target_partitions; + + // When `repartition_file_scans` is set, attempt to increase + // parallelism at the source. + if repartition_file_scans && roundrobin_beneficial_stats { + if let Some(new_child) = + child.plan.repartitioned(target_partitions, config)? 
+ { + child.plan = new_child; + } } - } // Satisfy the distribution requirement if it is unmet. match &requirement { Distribution::SinglePartition => { - child = add_merge_on_top(child); + child = add_merge_on_top(child, &mut fetch); } Distribution::HashPartitioned(exprs) => { if add_roundrobin { @@ -1307,7 +1336,8 @@ pub fn ensure_distribution( if (!ordering_satisfied || !order_preserving_variants_desirable) && child.data { - child = replace_order_preserving_variants(child)?; + let (replaced_child, fetch) = replace_order_preserving_variants(child, ordering_satisfied)?; + child = replaced_child; // If ordering requirements were satisfied before repartitioning, // make sure ordering requirements are still satisfied after. if ordering_satisfied { @@ -1315,10 +1345,7 @@ pub fn ensure_distribution( child = add_sort_above_with_check( child, sort_req, - plan.as_any() - .downcast_ref::() - .map(|output| output.fetch()) - .unwrap_or(None), + fetch, )?; } } @@ -1330,20 +1357,22 @@ pub fn ensure_distribution( // Operator requires specific distribution. Distribution::SinglePartition | Distribution::HashPartitioned(_) => { // Since there is no ordering requirement, preserving ordering is pointless - child = replace_order_preserving_variants(child)?; + child = replace_order_preserving_variants(child, false)?.0; } Distribution::UnspecifiedDistribution => { // Since ordering is lost, trying to preserve ordering is pointless if !maintains || plan.as_any().is::() { - child = replace_order_preserving_variants(child)?; + child = replace_order_preserving_variants(child,false)?.0; } + } - } + }; } + Ok(child) - }, - ) - .collect::>>()?; + }, + ) + .collect::>>()?; let children_plans = children .iter() @@ -1382,9 +1411,20 @@ pub fn ensure_distribution( plan.with_new_children(children_plans)? }; - Ok(Transformed::yes(DistributionContext::new( - plan, data, children, - ))) + let mut optimized_distribution_ctx = + DistributionContext::new(Arc::clone(&plan), data, children); + + // If `fetch` was not consumed, it means that there was `SortPreservingMergeExec` with fetch before + // It was removed by `remove_dist_changing_operators` + // and we need to add it back. + if fetch.is_some() { + // It's safe to unwrap because `spm` is set only if `fetch` is set. 
+ let plan = spm.unwrap().with_fetch(fetch.take()).unwrap(); + optimized_distribution_ctx = + DistributionContext::new(plan, data, vec![optimized_distribution_ctx]); + } + + Ok(Transformed::yes(optimized_distribution_ctx)) } /// Keeps track of distribution changing operators (like `RepartitionExec`, diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 30d1441f5773e..d0925a8aed2de 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1113,6 +1113,30 @@ impl ExecutionPlan for AggregateExec { Ok(FilterDescription::new().with_child(child_desc)) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = AggregateExec { + mode: self.mode, + group_by: self.group_by.clone(), + aggr_expr: self.aggr_expr.clone(), + filter_expr: self.filter_expr.clone(), + input_order_mode: self.input_order_mode.clone(), + input: Arc::clone(&self.input), + input_schema: Arc::clone(&self.input_schema), + schema: Arc::clone(&self.schema), + cache: self.cache.clone(), + limit: self.limit, + required_input_ordering: self.required_input_ordering.clone(), + metrics: self.metrics.clone(), + }; + + let new_props: PlanProperties = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } fn create_schema( diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c696cf5aa5e60..571c51ffbd5ee 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -217,6 +217,22 @@ impl ExecutionPlan for AnalyzeExec { futures::stream::once(output), ))) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = AnalyzeExec::new( + self.verbose, + self.show_statistics, + self.metric_types.clone(), + Arc::clone(self.input()), + Arc::clone(&self.schema), + ); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Creates the output of AnalyzeExec as a RecordBatch diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index eb3c3b5befbdd..1eeb477a39b21 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -223,7 +223,17 @@ impl ExecutionPlan for CoalesceBatchesExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } - + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + CoalesceBatchesExec::new(Arc::clone(self.input()), self.target_batch_size) + .with_fetch(self.fetch()); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn gather_filters_for_pushdown( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 2597dc6408dee..31e5a7369cab3 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -237,6 +237,16 @@ impl ExecutionPlan for CoalescePartitionsExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = CoalescePartitionsExec::new(Arc::clone(self.input())); + new_plan.fetch = self.fetch; 
+ let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap `projection` with its input, which is known to be a /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 35ca0b65ae294..66eaa7d89f8ff 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -416,6 +416,12 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; plan.fmt_as(self.t, self.f)?; + + // MAX: disable this for now since we don't need it displayed + it fails many DF tests + //if let Some(node_id) = plan.properties().node_id() { + // write!(self.f, ", node_id={}", node_id)?; + //} + match self.show_metrics { ShowMetrics::None => {} ShowMetrics::Aggregated => { diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 40b4ec61dc102..acd00ac3fa255 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -175,6 +175,16 @@ impl ExecutionPlan for EmptyExec { None, )) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = EmptyExec::new(Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index ffa9611d26e85..340e5662cebcb 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -520,6 +520,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Unknown } + /// If supported, returns a copy of this `ExecutionPlan` node with the specified + /// node_id. Returns `None` otherwise. + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + Ok(None) + } /// Attempts to push down the given projection into the input of this `ExecutionPlan`. /// @@ -742,6 +750,11 @@ pub trait ExecutionPlanProperties { /// /// [`FilterExec`]: crate::filter::FilterExec fn equivalence_properties(&self) -> &EquivalenceProperties; + + // Node Id of this ExecutionPlan node. 
See also [`ExecutionPlan::with_node_id`] + fn node_id(&self) -> Option { + None + } } impl ExecutionPlanProperties for Arc { @@ -764,6 +777,10 @@ impl ExecutionPlanProperties for Arc { fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } + + fn node_id(&self) -> Option { + self.properties().node_id() + } } impl ExecutionPlanProperties for &dyn ExecutionPlan { @@ -786,6 +803,10 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } + + fn node_id(&self) -> Option { + self.properties().node_id() + } } /// Represents whether a stream of data **generated** by an operator is bounded (finite) @@ -989,6 +1010,8 @@ pub struct PlanProperties { pub scheduling_type: SchedulingType, /// See [ExecutionPlanProperties::output_ordering] output_ordering: Option, + /// See [ExecutionPlanProperties::node_id] + node_id: Option, } impl PlanProperties { @@ -1009,6 +1032,7 @@ impl PlanProperties { evaluation_type: EvaluationType::Lazy, scheduling_type: SchedulingType::NonCooperative, output_ordering, + node_id: None, } } @@ -1027,6 +1051,12 @@ impl PlanProperties { self } + /// Overwrite node id with its new value. + pub fn with_node_id(mut self, node_id: usize) -> Self { + self.node_id = Some(node_id); + self + } + /// Overwrite boundedness with its new value. pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self { self.boundedness = boundedness; @@ -1073,6 +1103,10 @@ impl PlanProperties { self.output_ordering.as_ref() } + pub fn node_id(&self) -> Option { + self.node_id + } + /// Get schema of the node. pub(crate) fn schema(&self) -> &SchemaRef { self.eq_properties.schema() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 5ba508a8defe1..2ee7d283d8ad8 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -420,6 +420,18 @@ impl ExecutionPlan for FilterExec { CardinalityEffect::LowerEqual } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + FilterExec::try_new(Arc::clone(&self.predicate), Arc::clone(self.input()))? + .with_projection(self.projection.clone())?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + /// Tries to swap `projection` with its input (`filter`). If possible, performs /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`. fn try_swapping_with_projection( diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index fc32bb6fc94c7..877a7b58efa89 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -366,6 +366,16 @@ impl ExecutionPlan for CrossJoinExec { Ok(stats_cartesian_product(left_stats, right_stats)) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + CrossJoinExec::new(Arc::clone(&self.left), Arc::clone(&self.right)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, /// it returns the new swapped version having the [`CrossJoinExec`] as the top plan. /// Otherwise, it returns None. 
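The `with_node_id` implementations added above all follow one pattern: rebuild the operator, then replace its cached `PlanProperties` with a copy carrying the id. A minimal caller-side sketch of the new API follows; it is illustrative only and not part of this diff. The wrapping `example` function is hypothetical, and `EmptyExec` is used simply because it is one of the operators given an implementation above.

use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::Result;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::ExecutionPlan;

fn example() -> Result<()> {
    // EmptyExec gains a `with_node_id` implementation in this diff, so the call
    // returns `Some` copy of the node whose `PlanProperties` carry the id.
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let tagged = plan.with_node_id(42)?.expect("EmptyExec supports node ids");

    // The id is readable back through `PlanProperties::node_id` (or the
    // `ExecutionPlanProperties::node_id` convenience method).
    assert_eq!(tagged.properties().node_id(), Some(42));
    Ok(())
}

Any custom operator that keeps its `PlanProperties` in a cache field, as the built-in operators do, can opt in the same way.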
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c552e6954c8f9..2f0e4f6b60e6b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1065,6 +1065,25 @@ impl ExecutionPlan for HashJoinExec { Ok(stats.project(self.projection.as_ref())) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = HashJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.projection.clone(), + *self.partition_mode(), + self.null_equality, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + /// Tries to push `projection` down through `hash_join`. If possible, performs the /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections /// as its children. Otherwise, returns `None`. diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 592878a3bb1c5..7cc95476b1058 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -537,6 +537,24 @@ impl ExecutionPlan for SortMergeJoinExec { ) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = SortMergeJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.sort_options.clone(), + self.null_equality, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, /// it returns the new swapped version having the [`SortMergeJoinExec`] as the top plan. /// Otherwise, it returns None. 
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index be4646e88bd76..886f9b7040dae 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -473,6 +473,26 @@ impl ExecutionPlan for SymmetricHashJoinExec { Ok(Statistics::new_unknown(&self.schema())) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = SymmetricHashJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.null_equality, + self.left_sort_exprs.clone(), + self.right_sort_exprs.clone(), + self.mode, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 17628fd8ad1d2..c8e8049c3b324 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -76,6 +76,7 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; +pub mod node_id; pub mod placeholder_row; pub mod projection; pub mod recursive_query; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 6a0cae20e5aa6..5bdd697b3555b 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -207,6 +207,17 @@ impl ExecutionPlan for GlobalLimitExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + GlobalLimitExec::new(Arc::clone(self.input()), self.skip, self.fetch); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// LocalLimitExec applies a limit to a single partition diff --git a/datafusion/physical-plan/src/node_id.rs b/datafusion/physical-plan/src/node_id.rs new file mode 100644 index 0000000000000..7b8d0281eb73b --- /dev/null +++ b/datafusion/physical-plan/src/node_id.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::ExecutionPlan; + +use datafusion_common::DataFusionError; + +// Util for traversing ExecutionPlan tree and annotating node_id +pub struct NodeIdAnnotator { + next_id: usize, +} + +impl NodeIdAnnotator { + pub fn new() -> Self { + NodeIdAnnotator { next_id: 0 } + } + + fn annotate_execution_plan_with_node_id( + &mut self, + plan: Arc, + ) -> Result, DataFusionError> { + let plan_with_id = Arc::clone(&plan) + .with_node_id(self.next_id)? 
+ .unwrap_or(plan); + self.next_id += 1; + Ok(plan_with_id) + } +} + +impl Default for NodeIdAnnotator { + fn default() -> Self { + Self::new() + } +} + +pub fn annotate_node_id_for_execution_plan( + plan: &Arc, + annotator: &mut NodeIdAnnotator, +) -> Result, DataFusionError> { + let mut new_children: Vec> = vec![]; + for child in plan.children() { + let new_child: Arc = + annotate_node_id_for_execution_plan(child, annotator)?; + new_children.push(new_child); + } + let new_plan = Arc::clone(plan).with_new_children(new_children)?; + let new_plan_with_id = annotator.annotate_execution_plan_with_node_id(new_plan)?; + Ok(new_plan_with_id) +} diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index e7df79f867d70..922f3e3d2d667 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -187,6 +187,16 @@ impl ExecutionPlan for PlaceholderRowExec { None, )) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = PlaceholderRowExec::new(Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index ead2196860cde..cfdaa4e9d9fd4 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -302,6 +302,16 @@ impl ExecutionPlan for ProjectionExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + ProjectionExec::try_new(self.expr().to_vec(), Arc::clone(self.input()))?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn try_swapping_with_projection( &self, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 163f214444d09..da0c75696ac3d 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -206,6 +206,21 @@ impl ExecutionPlan for RecursiveQueryExec { fn statistics(&self) -> Result { Ok(Statistics::new_unknown(&self.schema())) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = RecursiveQueryExec::try_new( + self.name.clone(), + Arc::clone(&self.static_term), + Arc::clone(&self.recursive_term), + self.is_distinct, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } impl DisplayAs for RecursiveQueryExec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 74cf798895998..f215c6233b3a0 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1013,6 +1013,21 @@ impl ExecutionPlan for RepartitionExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = RepartitionExec { + input: Arc::clone(&self.input), + state: Arc::clone(&self.state), + metrics: self.metrics.clone(), + preserve_order: self.preserve_order, + cache: self.cache.clone(), + }; + let new_props = new_plan.cache.clone().with_node_id(node_id); + 
new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn try_swapping_with_projection( &self, diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 7a623b0c30d32..f044d3bf56cdf 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -323,6 +323,24 @@ impl ExecutionPlan for PartialSortExec { fn partition_statistics(&self, partition: Option) -> Result { self.input.partition_statistics(partition) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = PartialSortExec { + expr: self.expr.clone(), + input: Arc::clone(&self.input), + common_prefix_length: self.common_prefix_length, + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + fetch: self.fetch, + cache: self.cache.clone(), + }; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } struct PartialSortStream { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a95fad19f614b..19239f60cdecc 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1315,6 +1315,22 @@ impl ExecutionPlan for SortExec { CardinalityEffect::LowerEqual } } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let new_plan = SortExec { + input: Arc::clone(self.input()), + expr: self.expr.clone(), + fetch: self.fetch, + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + cache: self.cache.clone().with_node_id(node_id), + common_sort_prefix: self.common_sort_prefix.clone(), + filter: self.filter.clone(), + }; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap the projection with its input [`SortExec`]. If it can be done, /// it returns the new swapped version having the [`SortExec`] as the top plan. diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 3a94f156fa9b3..11f42c8153e08 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -363,6 +363,17 @@ impl ExecutionPlan for SortPreservingMergeExec { true } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(self.input())) + .with_fetch(self.fetch()); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap the projection with its input [`SortPreservingMergeExec`]. /// If this is possible, it returns the new [`SortPreservingMergeExec`] whose /// child is a projection. Otherwise, it returns None. 
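Before the remaining operator-by-operator changes, it may help to see how the new `node_id` module is meant to be driven end to end; the proto round-trip code later in this diff uses the same sequence. The sketch below is illustrative only and not part of the change: the `annotate_example` function and the tiny EmptyExec/CoalescePartitionsExec plan are assumptions made for the example.

use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::Result;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::node_id::{
    annotate_node_id_for_execution_plan, NodeIdAnnotator,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

fn annotate_example() -> Result<()> {
    // A two-node plan: CoalescePartitionsExec on top of EmptyExec.
    let leaf: Arc<dyn ExecutionPlan> =
        Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(leaf));

    // Walk the plan bottom-up, assigning an id to every node that implements
    // `with_node_id`.
    let mut annotator = NodeIdAnnotator::new();
    let annotated = annotate_node_id_for_execution_plan(&plan, &mut annotator)?;

    // Children are visited before their parents, so the leaf gets id 0 and the
    // root gets id 1.
    assert_eq!(annotated.children()[0].node_id(), Some(0));
    assert_eq!(annotated.node_id(), Some(1));
    Ok(())
}

Because ids are assigned in a deterministic post-order walk, structurally identical plans receive identical ids, which is why the round-trip test later in this diff can simply re-annotate the deserialized plan before comparing debug output.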
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index f9a7feb9e726e..5da512f86f106 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -339,6 +339,25 @@ impl ExecutionPlan for StreamingTableExec { metrics: self.metrics.clone(), })) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = StreamingTableExec { + partitions: self.partitions.clone(), + projection: self.projection.clone(), + projected_schema: Arc::clone(&self.projected_schema), + projected_output_ordering: self.projected_output_ordering.clone(), + infinite: self.infinite, + limit: self.limit, + cache: self.cache.clone(), + metrics: self.metrics.clone(), + }; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index c95678dac9cdd..1b939c003d65b 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -334,6 +334,16 @@ impl ExecutionPlan for UnionExec { true } + #[allow(deprecated)] + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = UnionExec::new(self.inputs.clone()); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to push `projection` down through `union`. If possible, performs the /// pushdown and returns a new [`UnionExec`] as the top plan which has projections /// as its children. Otherwise, returns `None`. diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 7212c764130e0..f6ef85dc59795 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -267,6 +267,22 @@ impl ExecutionPlan for UnnestExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = UnnestExec::new( + Arc::clone(self.input()), + self.list_column_indices.clone(), + self.struct_column_indices.clone(), + Arc::clone(&self.schema), + self.options.clone(), + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[derive(Clone, Debug)] diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 1b7cb9bb76e1b..511d8e69fa167 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -301,6 +301,20 @@ impl ExecutionPlan for WindowAggExec { Ok(Statistics::new_unknown(&self.schema())) } } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = WindowAggExec::try_new( + self.window_expr.clone(), + Arc::clone(self.input()), + self.can_repartition, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Compute the window aggregate columns diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 40a22f94b81f6..dfdbc37daca27 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -213,6 +213,17 @@ impl ExecutionPlan for WorkTableExec { 
Ok(Statistics::new_unknown(&self.schema())) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + WorkTableExec::new(self.name.clone(), Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + fn partition_statistics(&self, _partition: Option) -> Result { Ok(Statistics::new_unknown(&self.schema())) } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index b16b12bc05162..27917739dee99 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -104,6 +104,8 @@ //! # use datafusion::prelude::*; //! # use datafusion_common::Result; //! # use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes}; +//! # use datafusion_physical_plan::node_id::annotate_node_id_for_execution_plan; +//! # use datafusion_physical_plan::node_id::NodeIdAnnotator; //! # #[tokio::main] //! # async fn main() -> Result<()>{ //! // Create a plan that scans table 't' @@ -116,6 +118,11 @@ //! //! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan //! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?; +//! +//! // Workaround for `node_id` not being serializable: +//! let mut annotator = NodeIdAnnotator::new(); +//! let physical_round_trip = annotate_node_id_for_execution_plan(&physical_round_trip, &mut annotator)?; +//! //! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip)); //! # Ok(()) //! # } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index c8b2bc02e447b..0fd6a3b834741 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -33,6 +33,9 @@ use arrow::datatypes::{Fields, TimeUnit}; use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::metrics::MetricType; +use datafusion::physical_plan::node_id::{ + annotate_node_id_for_execution_plan, NodeIdAnnotator, +}; use datafusion_expr::dml::InsertOp; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; use datafusion_functions_aggregate::array_agg::array_agg_udaf; @@ -139,13 +142,21 @@ fn roundtrip_test_and_return( ctx: &SessionContext, codec: &dyn PhysicalExtensionCodec, ) -> Result> { + let mut annotator = NodeIdAnnotator::new(); + let exec_plan = annotate_node_id_for_execution_plan(&exec_plan, &mut annotator)?; let proto: protobuf::PhysicalPlanNode = protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), codec) .expect("to proto"); - let result_exec_plan: Arc = proto + let mut result_exec_plan: Arc = proto .try_into_physical_plan(&ctx.task_ctx(), codec) .expect("from proto"); + // Qi: workaround for NodeId not being serialized/deserialized, + // otherwise the assert_eq! 
below will fail + let mut annotator2 = NodeIdAnnotator::new(); + result_exec_plan = + annotate_node_id_for_execution_plan(&result_exec_plan, &mut annotator2)?; + pretty_assertions::assert_eq!( format!("{exec_plan:?}"), format!("{result_exec_plan:?}") diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 380ada10df6e1..626a86cf10679 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -473,7 +473,6 @@ impl PruningPredicate { // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc. let predicate_expr = PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?; - let literal_guarantees = LiteralGuarantee::analyze(&expr); Ok(Self { diff --git a/datafusion/sqllogictest/test_files/describe.slt b/datafusion/sqllogictest/test_files/describe.slt index 88347965c67a5..0fe9f03a797b0 100644 --- a/datafusion/sqllogictest/test_files/describe.slt +++ b/datafusion/sqllogictest/test_files/describe.slt @@ -81,8 +81,8 @@ int_col Int32 YES bigint_col Int64 YES float_col Float32 YES double_col Float64 YES -date_string_col Utf8View YES -string_col Utf8View YES +date_string_col Utf8 YES +string_col Utf8 YES timestamp_col Timestamp(ns) YES year Int32 YES month Int32 YES diff --git a/datafusion/sqllogictest/test_files/encrypted_parquet.slt b/datafusion/sqllogictest/test_files/encrypted_parquet.slt index d580b7d1ad2b8..326d7f42d3c83 100644 --- a/datafusion/sqllogictest/test_files/encrypted_parquet.slt +++ b/datafusion/sqllogictest/test_files/encrypted_parquet.slt @@ -29,11 +29,11 @@ STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/' OPTIONS ( -- Encryption properties 'format.crypto.file_encryption.encrypt_footer' 'true', 'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345" - 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450" - 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451" + 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450" + 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451" -- Decryption properties - 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345" - 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450" + 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345" + 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450" 'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451" ) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index a3b6d40aea2d1..41585a5d2cc6d 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -301,8 +301,8 @@ initial_physical_plan 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: 
file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec: order_by=[], dist_by=Unspecified, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] @@ -329,7 +329,7 @@ physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, 
bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok @@ -346,8 +346,8 @@ initial_physical_plan_with_stats 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec: order_by=[], dist_by=Unspecified 02)--GlobalLimitExec: skip=0, fetch=10 @@ -375,7 +375,7 @@ physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok @@ -429,7 +429,7 @@ query TT explain select a from t1 where exists (select count(*) from t2); ---- logical_plan -01)LeftSemi Join: +01)LeftSemi Join: 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 04)----EmptyRelation: rows=1 diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 22f19a0af32e4..4f75557e45666 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -818,7 +818,7 @@ physical_plan 40)└───────────────────────────┘ query TT -explain select +explain select count(*) over(), row_number() over () from table1 @@ -931,7 +931,7 @@ physical_plan 27)└───────────────────────────┘ query TT -explain select +explain select rank() over(ORDER BY int_col DESC), row_number() over (ORDER BY int_col ASC) from table1 @@ -1470,8 +1470,8 @@ drop table t2; # prepare table statement ok CREATE UNBOUNDED EXTERNAL TABLE data ( - "date" DATE, - "ticker" VARCHAR, + "date" DATE, + "ticker" VARCHAR, "time" TIMESTAMP, ) STORED AS CSV WITH ORDER ("date", "ticker", "time") @@ -1480,8 +1480,8 @@ LOCATION './a.parquet'; # query query TT -explain SELECT * FROM data -WHERE ticker = 'A' +explain SELECT * FROM data +WHERE ticker = 'A' ORDER BY "date", "time"; ---- physical_plan @@ -1643,7 +1643,7 @@ physical_plan # same thing but order by time, date query TT -explain SELECT * FROM data +explain SELECT * FROM data WHERE ticker = 'A' AND CAST(time AS DATE) = date ORDER BY "time", "date"; ---- @@ -1736,8 +1736,8 @@ physical_plan # query query TT -explain SELECT * FROM data -WHERE date = '2006-01-02' +explain SELECT * FROM data +WHERE date = '2006-01-02' ORDER BY 
"ticker", "time"; ---- physical_plan diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 7a34b240bd7c7..dcf336c9be86e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -251,7 +251,7 @@ datafusion.execution.parquet.metadata_size_hint 524288 datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false -datafusion.execution.parquet.schema_force_view_types true +datafusion.execution.parquet.schema_force_view_types false datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page @@ -372,7 +372,7 @@ datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, t datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query -datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.schema_force_view_types false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. 
If NULL, uses default parquet writer setting diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index ae82aee5e1559..ffa7cb7e604d0 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -733,7 +733,7 @@ explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLi ---- logical_plan 01)Limit: skip=0, fetch=10 -02)--Cross Join: +02)--Cross Join: 03)----SubqueryAlias: t1 04)------Limit: skip=0, fetch=10 05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10 @@ -758,7 +758,7 @@ explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLi ---- logical_plan 01)Limit: skip=0, fetch=2 -02)--Cross Join: +02)--Cross Join: 03)----SubqueryAlias: t1 04)------Limit: skip=0, fetch=2 05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2 diff --git a/datafusion/sqllogictest/test_files/listing_table_statistics.slt b/datafusion/sqllogictest/test_files/listing_table_statistics.slt index 37daf551c2c39..0b3b6106bbdc9 100644 --- a/datafusion/sqllogictest/test_files/listing_table_statistics.slt +++ b/datafusion/sqllogictest/test_files/listing_table_statistics.slt @@ -35,7 +35,7 @@ query TT explain format indent select * from t; ---- logical_plan TableScan: t projection=[int_col, str_col] -physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(212), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(212), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8("a")) Max=Exact(Utf8("d")) Null=Exact(0))]] statement ok drop table t; diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index a3234b4e7ee52..9b61c3a9acf78 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -45,7 +45,7 @@ describe data; ---- ints Map("entries": Struct("key": Utf8, "value": Int64), unsorted) NO strings Map("entries": Struct("key": Utf8, "value": Utf8), unsorted) NO -timestamp Utf8View NO +timestamp Utf8 NO query ??T SELECT * FROM data ORDER by ints['bytes'] DESC LIMIT 10; diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index c21f3129d4ee9..d74a13848e689 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -430,15 +430,15 @@ select arrow_typeof(binaryview_col), binaryview_col FROM binary_as_string_default; ---- -BinaryView 616161 BinaryView 616161 BinaryView 616161 -BinaryView 626262 BinaryView 626262 BinaryView 626262 -BinaryView 636363 BinaryView 636363 BinaryView 636363 -BinaryView 646464 BinaryView 646464 BinaryView 646464 -BinaryView 656565 BinaryView 656565 BinaryView 656565 -BinaryView 666666 BinaryView 
666666 BinaryView 666666 -BinaryView 676767 BinaryView 676767 BinaryView 676767 -BinaryView 686868 BinaryView 686868 BinaryView 686868 -BinaryView 696969 BinaryView 696969 BinaryView 696969 +Binary 616161 LargeBinary 616161 BinaryView 616161 +Binary 626262 LargeBinary 626262 BinaryView 626262 +Binary 636363 LargeBinary 636363 BinaryView 636363 +Binary 646464 LargeBinary 646464 BinaryView 646464 +Binary 656565 LargeBinary 656565 BinaryView 656565 +Binary 666666 LargeBinary 666666 BinaryView 666666 +Binary 676767 LargeBinary 676767 BinaryView 676767 +Binary 686868 LargeBinary 686868 BinaryView 686868 +Binary 696969 LargeBinary 696969 BinaryView 696969 # Run an explain plan to show the cast happens in the plan (a CAST is needed for the predicates) query TT @@ -451,13 +451,13 @@ EXPLAIN binaryview_col LIKE '%a%'; ---- logical_plan -01)Filter: CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%") -02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")] +01)Filter: CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%") AND CAST(binary_as_string_default.largebinary_col AS LargeUtf8) LIKE LargeUtf8("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%") +02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%"), CAST(binary_as_string_default.largebinary_col AS LargeUtf8) LIKE LargeUtf8("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 -02)--FilterExec: CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% +02)--FilterExec: CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS LargeUtf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS LargeUtf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% statement ok @@ -478,15 +478,15 @@ select arrow_typeof(binaryview_col), binaryview_col FROM binary_as_string_option; ---- -Utf8View aaa Utf8View aaa Utf8View aaa -Utf8View bbb Utf8View bbb Utf8View bbb -Utf8View ccc Utf8View ccc Utf8View ccc -Utf8View ddd Utf8View ddd Utf8View ddd -Utf8View eee Utf8View eee Utf8View eee -Utf8View 
fff Utf8View fff Utf8View fff -Utf8View ggg Utf8View ggg Utf8View ggg -Utf8View hhh Utf8View hhh Utf8View hhh -Utf8View iii Utf8View iii Utf8View iii +Utf8 aaa LargeUtf8 aaa Utf8View aaa +Utf8 bbb LargeUtf8 bbb Utf8View bbb +Utf8 ccc LargeUtf8 ccc Utf8View ccc +Utf8 ddd LargeUtf8 ddd Utf8View ddd +Utf8 eee LargeUtf8 eee Utf8View eee +Utf8 fff LargeUtf8 fff Utf8View fff +Utf8 ggg LargeUtf8 ggg Utf8View ggg +Utf8 hhh LargeUtf8 hhh Utf8View hhh +Utf8 iii LargeUtf8 iii Utf8View iii # Run an explain plan to show the cast happens in the plan (there should be no casts) query TT @@ -499,8 +499,8 @@ EXPLAIN binaryview_col LIKE '%a%'; ---- logical_plan -01)Filter: binary_as_string_option.binary_col LIKE Utf8View("%a%") AND binary_as_string_option.largebinary_col LIKE Utf8View("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8View("%a%") -02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8View("%a%"), binary_as_string_option.largebinary_col LIKE Utf8View("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")] +01)Filter: binary_as_string_option.binary_col LIKE Utf8("%a%") AND binary_as_string_option.largebinary_col LIKE LargeUtf8("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8View("%a%") +02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8("%a%"), binary_as_string_option.largebinary_col LIKE LargeUtf8("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% @@ -665,8 +665,8 @@ query TT explain select * from foo where starts_with(column1, 'f'); ---- logical_plan -01)Filter: foo.column1 LIKE Utf8View("f%") -02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8View("f%")] +01)Filter: foo.column1 LIKE Utf8("f%") +02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8("f%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 LIKE f% diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part index 4960ad1f4a914..6eabe0d2c31e4 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part @@ -59,7 +59,7 @@ logical_plan 03)----Projection: lineitem.l_extendedprice, lineitem.l_discount 04)------Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= 
Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) 05)--------Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount -06)----------Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON") +06)----------Filter: lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON") AND (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) 07)------------TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"), lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)] 08)--------Filter: (part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) 09)----------TableScan: part projection=[p_partkey, p_brand, p_size, p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)] @@ -73,7 +73,7 @@ physical_plan 07)------------CoalesceBatchesExec: target_batch_size=8192 08)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4 09)----------------CoalesceBatchesExec: target_batch_size=8192 -10)------------------FilterExec: (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND 
l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] +10)------------------FilterExec: l_shipinstruct@4 = DELIVER IN PERSON AND (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG), projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] 11)--------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], file_type=csv, has_header=false 12)------------CoalesceBatchesExec: target_batch_size=8192 13)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4 diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 0c8b8c6edb1fc..89ff794e7cf80 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -21,11 +21,11 @@ statement ok CREATE TABLE t1( - id INT, + id INT, name TEXT ) as VALUES - (1, 'Alex'), - (2, 'Bob'), + (1, 'Alex'), + (2, 'Bob'), (3, 'Alice') ; @@ -34,20 +34,20 @@ CREATE TABLE t2( id TINYINT, name TEXT ) as VALUES - (1, 'Alex'), - (2, 'Bob'), + (1, 'Alex'), + (2, 'Bob'), (3, 'John') ; # union with EXCEPT(JOIN) query T rowsort -( +( SELECT name FROM t1 EXCEPT SELECT name FROM t2 -) +) UNION ALL -( +( SELECT name FROM t2 EXCEPT SELECT name FROM t1 @@ -58,13 +58,13 @@ John # union with type coercion query IT rowsort -( +( SELECT * FROM t1 EXCEPT SELECT * FROM t2 -) +) UNION ALL -( +( SELECT * FROM t2 EXCEPT SELECT * FROM t1 @@ -584,11 +584,11 @@ OPTIONS ('format.has_header' 'true'); query TT explain SELECT c1 FROM( -( +( SELECT c1 FROM t1 -) +) UNION ALL -( +( SELECT c1a FROM t2 )) ORDER BY c1 @@ -764,8 +764,8 @@ DROP TABLE t4; # Test issue: https://github.com/apache/datafusion/issues/11742 query R rowsort -WITH - tt(v1) AS (VALUES (1::INT),(NULL::INT)) +WITH + tt(v1) AS (VALUES (1::INT),(NULL::INT)) SELECT NVL(v1, 0.5) FROM tt UNION ALL SELECT NULL WHERE FALSE; diff --git a/dev/changelog/44.0.0.md b/dev/changelog/44.0.0.md index 233e302e50e69..b3f10f6794b53 100644 --- a/dev/changelog/44.0.0.md +++ b/dev/changelog/44.0.0.md @@ -19,7 +19,7 @@ under the License. # Apache DataFusion 44.0.0 Changelog -This release consists of 332 commits from 94 contributors. See credits at the end of this changelog for more information. +This release consists of 329 commits from 94 contributors. See credits at the end of this changelog for more information. **Breaking changes:** @@ -110,6 +110,7 @@ This release consists of 332 commits from 94 contributors. 
See credits at the en - Support unicode character for `initcap` function [#13752](https://github.com/apache/datafusion/pull/13752) (tlm365) - [minor] make recursive package dependency optional [#13778](https://github.com/apache/datafusion/pull/13778) (buraksenn) - Fix `recursive-protection` feature flag [#13887](https://github.com/apache/datafusion/pull/13887) (alamb) +- Prepare for 44.0.0 release: version and changelog [#13882](https://github.com/apache/datafusion/pull/13882) (alamb) **Other:** @@ -362,13 +363,15 @@ This release consists of 332 commits from 94 contributors. See credits at the en - Minor: change the sort merge join emission as incremental [#13894](https://github.com/apache/datafusion/pull/13894) (berkaysynnada) - Minor: change visibility of hash join utils [#13893](https://github.com/apache/datafusion/pull/13893) (berkaysynnada) - Fix visibility of `swap_hash_join` to be `pub` [#13899](https://github.com/apache/datafusion/pull/13899) (alamb) +- Minor: Avoid emitting empty batches in partial sort [#13895](https://github.com/apache/datafusion/pull/13895) (berkaysynnada) +- BACKPORT: Correct return type for initcap scalar function with utf8view (#13909) [#13934](https://github.com/apache/datafusion/pull/13934) (alamb) ## Credits Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor. ``` - 59 Andrew Lamb + 55 Andrew Lamb 35 Piotr Findeisen 16 Jonathan Chen 14 Jonah Gao @@ -383,13 +386,13 @@ Thank you to everyone who contributed to this release. Here is a breakdown of co 5 Dmitrii Blaginin 5 Qianqian 4 Adrian Garcia Badaracco + 4 Berkay Şahin 4 Marco Neumann 4 Tai Le Manh 4 Tim Saucer 4 zhuliquan 3 Andy Grove 3 Arttu - 3 Berkay Şahin 3 Burak Şen 3 Onur Satici 3 Qi Zhu diff --git a/dev/changelog/48.0.0.md b/dev/changelog/48.0.0.md index 9cf6c03b7acf0..42f128bcb7b51 100644 --- a/dev/changelog/48.0.0.md +++ b/dev/changelog/48.0.0.md @@ -19,7 +19,7 @@ under the License. # Apache DataFusion 48.0.0 Changelog -This release consists of 267 commits from 89 contributors. See credits at the end of this changelog for more information. +This release consists of 269 commits from 89 contributors. See credits at the end of this changelog for more information. **Breaking changes:** @@ -94,6 +94,7 @@ This release consists of 267 commits from 89 contributors. See credits at the en - fix: metadata of join schema [#16221](https://github.com/apache/datafusion/pull/16221) (chenkovsky) - fix: add missing row count limits to TPC-H queries [#16230](https://github.com/apache/datafusion/pull/16230) (0ax1) - fix: NaN semantics in GROUP BY [#16256](https://github.com/apache/datafusion/pull/16256) (chenkovsky) +- fix: [branch-48] Revert "Improve performance of constant aggregate window expression" [#16307](https://github.com/apache/datafusion/pull/16307) (andygrove) **Documentation updates:** @@ -305,6 +306,7 @@ This release consists of 267 commits from 89 contributors. See credits at the en - Handle dicts for distinct count [#15871](https://github.com/apache/datafusion/pull/15871) (blaginin) - Add `--substrait-round-trip` option in sqllogictests [#16183](https://github.com/apache/datafusion/pull/16183) (gabotechs) - Minor: fix upgrade papercut `pub use PruningStatistics` [#16264](https://github.com/apache/datafusion/pull/16264) (alamb) +- chore: update DF48 changelog [#16269](https://github.com/apache/datafusion/pull/16269) (xudong963) ## Credits @@ -313,7 +315,7 @@ Thank you to everyone who contributed to this release. 
Here is a breakdown of co ``` 30 dependabot[bot] 29 Andrew Lamb - 16 xudong.w + 17 xudong.w 14 Adrian Garcia Badaracco 10 Chen Chongchen 8 Gabriel @@ -328,13 +330,13 @@ Thank you to everyone who contributed to this release. Here is a breakdown of co 4 Nuno Faria 4 Yongting You 4 logan-keede + 3 Andy Grove 3 Christian 3 Daniël Heres 3 Liam Bao 3 Phillip LeBlanc 3 Piotr Findeisen 3 ding-young - 2 Andy Grove 2 Atahan Yorgancı 2 Brayan Jules 2 Georgi Krastev diff --git a/dev/changelog/48.0.1.md b/dev/changelog/48.0.1.md new file mode 100644 index 0000000000000..dcd4cc9c15479 --- /dev/null +++ b/dev/changelog/48.0.1.md @@ -0,0 +1,41 @@ + + +# Apache DataFusion 48.0.1 Changelog + +This release consists of 3 commits from 2 contributors. See credits at the end of this changelog for more information. + +See the [upgrade guide](https://datafusion.apache.org/library-user-guide/upgrading.html) for information on how to upgrade from previous versions. + +**Bug Fixes:** + +- [branch-48] Set the default value of datafusion.execution.collect_statistics to true #16447 [#16659](https://github.com/apache/datafusion/pull/16659) (blaginin) +- [branch-48] Fix parquet filter_pushdown: respect parquet filter pushdown config i… [#16656](https://github.com/apache/datafusion/pull/16656) (alamb) +- [branch-48] fix: column indices in FFI partition evaluator (#16480) [#16657](https://github.com/apache/datafusion/pull/16657) (alamb) + +## Credits + +Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor. + +``` + 2 Andrew Lamb + 1 Dmitrii Blaginin +``` + +Thank you also to everyone who contributed in other ways such as filing issues, reviewing PRs, and providing feedback on this release. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6e5e063a12926..fa7ad5c6c393b 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,7 +84,7 @@ The following configuration settings are available: | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. 
| | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
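The table above also documents `datafusion.execution.parquet.binary_as_string`, which the `binary_as_string_option` tests earlier in this diff exercise from SQL. As a rough companion sketch (an assumption about typical usage, not part of this change set), the same option can be set programmatically through the `ConfigOptions` accessor on `SessionConfig`; the parquet path is a placeholder:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Mirror the `binary_as_string` behaviour documented in the table above:
    // binary columns written by legacy writers are decoded as strings.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.binary_as_string = true;
    let ctx = SessionContext::new_with_config(config);

    // "binary_as_string.parquet" is a hypothetical path for illustration only;
    // `binary_col` matches the column name used in the tests above.
    ctx.register_parquet(
        "t",
        "binary_as_string.parquet",
        ParquetReadOptions::default(),
    )
    .await?;

    // With the option enabled, binary columns surface as Utf8/LargeUtf8/Utf8View
    // rather than Binary, so string predicates need no casts (compare the two
    // explain plans for binary_as_string_default vs binary_as_string_option).
    ctx.sql("SELECT arrow_typeof(binary_col), binary_col FROM t LIMIT 3")
        .await?
        .show()
        .await?;
    Ok(())
}
```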