
Internal error: PhysicalExpr Column references bound error, Failure in spilling for AggregateMode::Single #15530

Closed
rluvaton opened this issue Apr 1, 2025 · 1 comment · Fixed by #15531
Labels
bug Something isn't working

Comments


rluvaton commented Apr 1, 2025

Describe the bug

When `AggregateExec` runs in `AggregateMode::Single` and spills, and the group-by expressions are not the leading columns of the input plan's schema, the aggregation fails with a schema mismatch.

To Reproduce

#[cfg(test)]
mod tests {
    use std::fmt::{Display, Formatter};
    use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::common::Result;
    use datafusion::execution::memory_pool::FairSpillPool;
    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
    use datafusion::execution::TaskContext;
    use datafusion::functions_aggregate::sum::sum_udaf;
    use datafusion::physical_expr::aggregate::AggregateExprBuilder;
    use datafusion::physical_expr::expressions::{lit, Column};
    use datafusion::physical_plan::aggregates::{PhysicalGroupBy, AggregateExec, AggregateMode};
    use datafusion::physical_plan::common::collect;
    use datafusion::physical_plan::ExecutionPlan;
    use rand::{random, thread_rng, Rng};
    use std::sync::Arc;
    use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
    use parking_lot::RwLock;

    #[tokio::test]
    async fn test_debug() -> Result<()> {
        let scan_schema = Arc::new(Schema::new(vec![
            Field::new("col_0", DataType::Int64, true),
            Field::new("col_1", DataType::Utf8, true),
            Field::new("col_2", DataType::Utf8, true),
            Field::new("col_3", DataType::Utf8, true),
            Field::new("col_4", DataType::Utf8, true),
            Field::new("col_5", DataType::Int32, true),
            Field::new("col_6", DataType::Utf8, true),
            Field::new("col_7", DataType::Utf8, true),
            Field::new("col_8", DataType::Utf8, true),
        ]));

        let group_by = PhysicalGroupBy::new_single(vec![
            (Arc::new(Column::new("col_1", 1)), "col_1".to_string()),
            (Arc::new(Column::new("col_7", 7)), "col_7".to_string()),
            (Arc::new(Column::new("col_0", 0)), "col_0".to_string()),
            (Arc::new(Column::new("col_8", 8)), "col_8".to_string()),
        ]);

        fn generate_int64_array() -> ArrayRef {
            Arc::new(Int64Array::from_iter_values(
                (0..8192).map(|_| random::<i64>()),
            ))
        }
        fn generate_int32_array() -> ArrayRef {
            Arc::new(Int32Array::from_iter_values(
                (0..8192).map(|_| random::<i32>()),
            ))
        }

        fn generate_string_array() -> ArrayRef {
            Arc::new(StringArray::from(
                (0..8192)
                    .map(|_| -> String {
                        thread_rng()
                            .sample_iter::<char, _>(rand::distributions::Standard)
                            .take(10)
                            .collect()
                    })
                    .collect::<Vec<_>>(),
            ))
        }

        fn generate_record_batch(schema: &SchemaRef) -> Result<RecordBatch> {
            RecordBatch::try_new(
                Arc::clone(schema),
                vec![
                    generate_int64_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_int32_array(),
                    generate_string_array(),
                    generate_string_array(),
                    generate_string_array(),
                ],
            )
            .map_err(|err| err.into())
        }

        let aggregate_expressions = vec![Arc::new(
            AggregateExprBuilder::new(sum_udaf(), vec![lit(1i64)])
                .schema(Arc::clone(&scan_schema))
                .alias("SUM(1i64)")
                .build()?,
        )];

        #[derive(Debug)]
        struct Generator {
            index: usize,
            count: usize,
            schema: SchemaRef,
        }

        impl Display for Generator {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                write!(f, "Generator")
            }
        }

        impl LazyBatchGenerator for Generator {
            fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
                if self.index > self.count {
                    return Ok(None);
                }

                let batch = generate_record_batch(&self.schema)?;
                self.index += 1;

                Ok(Some(batch))
            }
        }

        let generator = Generator {
            index: 0,
            count: 10,
            schema: Arc::clone(&scan_schema),
        };

        let plan: Arc<dyn ExecutionPlan> = Arc::new(LazyMemoryExec::try_new(
            Arc::clone(&scan_schema),
            vec![Arc::new(RwLock::new(generator))],
        )?);

        let single_aggregate = Arc::new(AggregateExec::try_new(
            AggregateMode::Single,
            group_by,
            aggregate_expressions.clone(),
            vec![None; aggregate_expressions.len()],
            plan,
            Arc::clone(&scan_schema),
        )?);

        let memory_pool = Arc::new(FairSpillPool::new(10006216));
        let task_ctx = Arc::new(
            TaskContext::default().with_runtime(Arc::new(
                RuntimeEnvBuilder::new()
                    .with_memory_pool(memory_pool)
                    .build()?,
            )),
        );

        let res = collect(single_aggregate.execute(0, Arc::clone(&task_ctx))?).await;

        match res {
            Ok(_) => println!("Success"),
            Err(e) => {
                println!("Error: {}", e);

                return Err(e);
            },
        }

        Ok(())
    }
}

The following error occurs:

Error: Internal error: PhysicalExpr Column references column 'col_7' at index 7 (zero-based) but input schema only has 5 columns: ["col_1", "col_7", "col_0", "col_8", "SUM(1i64)[sum]"]

backtrace:    0: std::backtrace_rs::backtrace::libunwind::trace
             ...
   1: std::backtrace_rs::backtrace::trace_unsynchronized
             at ...
   2: std::backtrace::Backtrace::create
             at ...
   3: datafusion_common::error::DataFusionError::get_back_trace
             at <crates>/datafusion-common-46.0.1/src/error.rs:473:30
   4: datafusion_physical_expr::expressions::column::Column::bounds_check
             at <crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:147:13
   5: <datafusion_physical_expr::expressions::column::Column as datafusion_physical_expr_common::physical_expr::PhysicalExpr>::evaluate
             at <crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:126:9
   6: datafusion_physical_plan::aggregates::evaluate_group_by::{{closure}}
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1305:25
   7: core::iter::adapters::map::map_try_fold::{{closure}}
             at ...
             
  8-21: std stuff
  
  22: core::iter::traits::iterator::Iterator::collect
             at ...
  23: datafusion_physical_plan::aggregates::evaluate_group_by
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1301:32
  24: datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:821:13
  25: <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next
             at <crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:697:29
             
  26-28: futures stuff
  
  29: datafusion_physical_plan::common::collect::{{closure}}
             at <crates>/datafusion-physical-plan-46.0.1/src/common.rs:45:36
  30: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
             at ./src/repro_bug_agg.rs:145:99
             
  31-51: std and tokio stuff
  
  52: datafusion_pg::repro_bug_agg::tests::test_debug
             at ./src/repro_bug_agg.rs:147:9
  53: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
             at ./src/repro_bug_agg.rs:23:30
.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

Expected behavior

The aggregation should complete without an error, even when it spills.

Additional context

The spill schema is:

spill_state.spill_schema: Schema {
    fields: [
        Field {
            name: "col_1",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "col_7",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "col_0",
            data_type: Int64,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "col_8",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "SUM(1i64)[sum]",
            data_type: Int64,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
    ],
    metadata: {},
}

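The issue is that the spill schema is the output schema of the intermediate results, while the group-by expressions are left unchanged and still refer to the input schema. Because a `Column` points to an index rather than a name, that index no longer exists (or points at a different field) once the spilled batches are evaluated against the spill schema.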
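
To make the index mismatch concrete, here is a minimal std-only sketch. It does not use DataFusion: `Column`, `evaluate`, and `rebind_to_spill_schema` are hypothetical stand-ins written for illustration, and the rebinding shown is only one plausible remedy, assuming the group keys occupy the leading positions of the spill schema as in the dump above.

```rust
// Std-only model of the problem. All names here are hypothetical
// stand-ins for illustration and are NOT DataFusion APIs.

/// A column reference bound by position; the name is kept for diagnostics.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

impl Column {
    fn new(name: &str, index: usize) -> Self {
        Self { name: name.to_string(), index }
    }
}

/// Resolve a column against a schema (modelled as a list of field names)
/// using only its index, never its name.
fn evaluate<'a>(col: &Column, schema: &[&'a str]) -> Result<&'a str, String> {
    schema.get(col.index).copied().ok_or_else(|| {
        format!(
            "column '{}' at index {} but schema only has {} columns",
            col.name,
            col.index,
            schema.len()
        )
    })
}

/// Rebind each group-by column to its position in the group-by list.
/// Assumption: the spill schema lists the group keys first, in that order.
fn rebind_to_spill_schema(group_by: &[Column]) -> Vec<Column> {
    group_by
        .iter()
        .enumerate()
        .map(|(i, c)| Column::new(&c.name, i))
        .collect()
}

fn main() {
    let spill_schema = ["col_1", "col_7", "col_0", "col_8", "SUM(1i64)[sum]"];
    let group_by = vec![
        Column::new("col_1", 1),
        Column::new("col_7", 7),
        Column::new("col_0", 0),
        Column::new("col_8", 8),
    ];

    // Input-schema indices used against the spill schema: some lookups
    // land on the wrong field, others are out of bounds.
    for col in &group_by {
        println!("{} -> {:?}", col.name, evaluate(col, &spill_schema));
    }

    // After rebinding, every group-by column resolves to its own field.
    for col in &rebind_to_spill_schema(&group_by) {
        println!("{} -> {:?}", col.name, evaluate(col, &spill_schema));
    }
}
```

Note that in this model the lookup for `col_1` does not fail but resolves to the wrong field, which is why the reported error names `col_7`, the first group-by column whose index is out of bounds for the five-column spill schema.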

@rluvaton rluvaton added the bug Something isn't working label Apr 1, 2025

rluvaton commented Apr 1, 2025

I will create a PR soon.

rluvaton added a commit to rluvaton/datafusion that referenced this issue Apr 1, 2025
@alamb alamb closed this as completed in 0b061be Apr 3, 2025
nirnayroy pushed a commit to nirnayroy/datafusion that referenced this issue May 2, 2025
* fix: update group by columns for merge phase after spill

fixes apache#15530

* Update datafusion/physical-plan/src/aggregates/row_hash.rs

Co-authored-by: Oleks V <comphead@users.noreply.github.com>

* move test to aggregate_fuzz

---------

Co-authored-by: Oleks V <comphead@users.noreply.github.com>