diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs
index 8abed90238d4..ef2a3eaca0c8 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -31,7 +31,9 @@ use datafusion::{
     test_util::aggr_test_schema,
 };

-use datafusion::datasource::physical_plan::FileScanConfigBuilder;
+use datafusion::datasource::{
+    physical_plan::FileScanConfigBuilder, table_schema::TableSchema,
+};
 use futures::StreamExt;
 use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

@@ -67,7 +69,7 @@ async fn csv_opener() -> Result<()> {

     let config = CsvSource::new(true, b',', b'"')
         .with_comment(Some(b'#'))
-        .with_schema(schema)
+        .with_schema(TableSchema::from_file_schema(schema))
         .with_batch_size(8192)
         .with_projection(&scan_config);

diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 94d651ddadd5..37b9663111a5 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -45,6 +45,7 @@ pub use datafusion_catalog::view;
 pub use datafusion_datasource::schema_adapter;
 pub use datafusion_datasource::sink;
 pub use datafusion_datasource::source;
+pub use datafusion_datasource::table_schema;
 pub use datafusion_execution::object_store;
 pub use datafusion_physical_expr::create_ordering;

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 6df5cd7ac68f..18b855cec55e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -54,7 +54,7 @@ mod tests {
     use datafusion_datasource::source::DataSourceExec;

     use datafusion_datasource::file::FileSource;
-    use datafusion_datasource::{FileRange, PartitionedFile};
+    use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
         DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
@@ -186,7 +186,7 @@ mod tests {
             source = source.with_bloom_filter_on_read(false);
         }

-        source.with_schema(Arc::clone(&table_schema))
+        source.with_schema(TableSchema::new(Arc::clone(&table_schema), vec![]))
     }

     fn build_parquet_exec(
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index eb4c61c02524..203d9e97d2a8 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -40,6 +40,7 @@ use crate::prelude::{Expr, SessionConfig, SessionContext};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::TableSchema;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -186,7 +187,7 @@ impl TestParquetFile {
                 ParquetSource::new(parquet_options)
                     .with_predicate(Arc::clone(&physical_filter_expr)),
             )
-            .with_schema(Arc::clone(&self.schema));
+            .with_schema(TableSchema::from_file_schema(Arc::clone(&self.schema)));
         let config = scan_config_builder.with_source(source).build();
         let parquet_exec = DataSourceExec::from_data_source(config);
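An aside on the two constructors used above: as the `table_schema.rs` hunk later in this patch shows, `TableSchema::from_file_schema(schema)` is simply `TableSchema::new(schema, vec![])`, so either spelling works when a scan has no partition columns. A standalone sketch of that equivalence (illustrative, not part of the patch; it uses only the `TableSchema` accessors that appear elsewhere in this diff):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::TableSchema;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Utf8,
        false,
    )]));

    // With no partition columns the two construction paths agree.
    let a = TableSchema::from_file_schema(Arc::clone(&file_schema));
    let b = TableSchema::new(file_schema, vec![]);
    assert_eq!(a.table_schema(), b.table_schema());
    assert!(a.table_partition_cols().is_empty());
}
```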
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 54e8e7bf04da..7d8a9c7c2125 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -24,6 +24,7 @@ use datafusion_datasource::{
     file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
     file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
     schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
+    TableSchema,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
@@ -156,9 +157,13 @@ impl FileSource for TestSource {
         })
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
+        assert!(
+            schema.table_partition_cols().is_empty(),
+            "TestSource does not support partition columns"
+        );
         Arc::new(TestSource {
-            schema: Some(schema),
+            schema: Some(schema.file_schema().clone()),
             ..self.clone()
         })
     }
diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index f43f11880182..f254b7e3ff30 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -20,9 +20,9 @@ use std::sync::Arc;

 use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::TableSchema;

 use arrow::buffer::Buffer;
-use arrow::datatypes::SchemaRef;
 use arrow_ipc::reader::FileDecoder;
 use datafusion_common::error::Result;
 use datafusion_common::{exec_datafusion_err, Statistics};
@@ -73,7 +73,7 @@ impl FileSource for ArrowSource {
         Arc::new(Self { ..self.clone() })
     }

-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index 0916222337b8..1ff73d2c3cc3 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -29,6 +29,7 @@ use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::file_stream::FileOpener;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_datasource::TableSchema;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

@@ -84,11 +85,13 @@ impl FileSource for AvroSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
-        conf.schema = Some(schema);
+        // TableSchema may have partition columns, but AvroSource does not use partition columns or values atm
+        conf.schema = Some(Arc::clone(schema.file_schema()));
         Arc::new(conf)
     }
+
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.projected_statistics = Some(statistics);
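Note the two strategies above for sources that ignore partition columns: `TestSource` asserts that none were supplied, while `ArrowSource` and `AvroSource` silently keep only the file schema. For a custom source that cannot handle partition values, the assertion is the safer default. A sketch of that pattern (`MySource` is hypothetical, and the method is written as an inherent method rather than a full `impl FileSource` to keep the sketch short):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_datasource::TableSchema;

/// Hypothetical source that has no notion of partition columns.
struct MySource {
    schema: Option<SchemaRef>,
}

impl MySource {
    // Mirrors the `TestSource::with_schema` change above: fail loudly
    // instead of silently discarding partition columns.
    fn with_schema(&self, schema: TableSchema) -> Arc<MySource> {
        assert!(
            schema.table_partition_cols().is_empty(),
            "MySource does not support partition columns"
        );
        Arc::new(MySource {
            schema: Some(Arc::clone(schema.file_schema())),
        })
    }
}
```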
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 0445329d0653..0b18571e58bd 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -29,7 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
     as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
-    RangeCalculation,
+    RangeCalculation, TableSchema,
 };

 use arrow::csv;
@@ -258,9 +258,9 @@ impl FileSource for CsvSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
-        conf.file_schema = Some(schema);
+        conf.file_schema = Some(Arc::clone(schema.file_schema()));
         Arc::new(conf)
     }

diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 0b1eee1dac58..52ed0def03f1 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -32,6 +32,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
+    TableSchema,
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

@@ -122,7 +123,7 @@ impl FileSource for JsonSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
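`ParquetSource`, changed below, is the one built-in source that keeps the entire `TableSchema`: pruning predicates are built against the file schema, while pushdown decisions consult the full table schema so that predicates referencing partition columns can be accepted. A standalone sketch of the two accessors involved (illustrative field contents; it mirrors the `Option<TableSchema>` shape used below):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_datasource::TableSchema;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "val",
        DataType::Utf8,
        false,
    )]));
    let partition_cols = vec![Arc::new(Field::new("part", DataType::Utf8, false))];

    // ParquetSource stores an Option<TableSchema>; mirror that here.
    let stored: Option<TableSchema> =
        Some(TableSchema::new(file_schema, partition_cols));

    // Pruning predicates are built against the file schema (no `part`)...
    let pruning_schema: Option<&SchemaRef> =
        stored.as_ref().map(|ts| ts.file_schema());
    assert_eq!(pruning_schema.unwrap().fields().len(), 1);

    // ...while pushdown decisions consult the full table schema, which is
    // what allows a predicate such as `val != part` to reach the scan.
    let pushdown_schema: Option<SchemaRef> =
        stored.as_ref().map(|ts| ts.table_schema()).cloned();
    assert_eq!(pushdown_schema.unwrap().fields().len(), 2);
}
```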
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 186d922fc373..d646e44762a6 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -35,11 +35,12 @@ use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };

-use arrow::datatypes::{SchemaRef, TimeUnit};
+use arrow::datatypes::TimeUnit;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{DataFusionError, Statistics};
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::TableSchema;
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -275,7 +276,7 @@ pub struct ParquetSource {
     /// The schema of the file.
     /// In particular, this is the schema of the table without partition columns,
     /// *not* the physical schema of the file.
-    pub(crate) file_schema: Option<SchemaRef>,
+    pub(crate) table_schema: Option<TableSchema>,
     /// Optional predicate for row filtering during parquet scan
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
     /// Optional user defined parquet file reader factory
@@ -601,9 +602,9 @@ impl FileSource for ParquetSource {
         Arc::new(conf)
     }

-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self {
-            file_schema: Some(schema),
+            table_schema: Some(schema),
             ..self.clone()
         })
     }
@@ -661,9 +662,10 @@ impl FileSource for ParquetSource {
         // the actual predicates are built in reference to the physical schema of
         // each file, which we do not have at this point and hence cannot use.
         // Instead we use the logical schema of the file (the table schema without partition columns).
-        if let (Some(file_schema), Some(predicate)) =
-            (&self.file_schema, &self.predicate)
-        {
+        if let (Some(file_schema), Some(predicate)) = (
+            &self.table_schema.as_ref().map(|ts| ts.file_schema()),
+            &self.predicate,
+        ) {
             let predicate_creation_errors = Count::new();
             if let (Some(pruning_predicate), _) = build_pruning_predicates(
                 Some(predicate),
@@ -700,7 +702,12 @@ impl FileSource for ParquetSource {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
-        let Some(file_schema) = self.file_schema.clone() else {
+        let Some(table_schema) = self
+            .table_schema
+            .as_ref()
+            .map(|ts| ts.table_schema())
+            .cloned()
+        else {
             return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                 vec![PushedDown::No; filters.len()],
             ));
@@ -720,7 +727,7 @@ impl FileSource for ParquetSource {
         let filters: Vec<PushedDownPredicate> = filters
             .into_iter()
             .map(|filter| {
-                if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
+                if can_expr_be_pushed_down_with_schemas(&filter, &table_schema) {
                     PushedDownPredicate::supported(filter)
                 } else {
                     PushedDownPredicate::unsupported(filter)
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 7a2cf403fd8d..d6ade3b8b210 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -26,7 +26,7 @@ use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
 use crate::schema_adapter::SchemaAdapterFactory;
-use arrow::datatypes::SchemaRef;
+use crate::TableSchema;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{not_impl_err, Result, Statistics};
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
@@ -64,7 +64,7 @@ pub trait FileSource: Send + Sync {
     /// Initialize new type with batch size configuration
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
     /// Initialize new instance with a new schema
-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource>;
     /// Initialize new instance with projection information
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
     /// Initialize new instance with projected statistics
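For callers of the trait, the migration is mechanical: wrap the `SchemaRef` that used to be passed directly. A hedged call-site sketch (the helper name is illustrative; `TableSchema::new` would be used instead when partition columns are known):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::TableSchema;

// Before this patch: `source.with_schema(schema)`.
// After: wrap the plain file schema in a TableSchema. If the scan has
// partition columns, build it with `TableSchema::new(schema, partition_cols)`.
fn set_schema(source: Arc<dyn FileSource>, schema: SchemaRef) -> Arc<dyn FileSource> {
    source.with_schema(TableSchema::from_file_schema(schema))
}
```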
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index c52397d9a7cc..00ff95ba0dd0 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -89,6 +89,7 @@ use log::{debug, warn};
 /// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 /// # use datafusion_datasource::file_stream::FileOpener;
 /// # use datafusion_datasource::source::DataSourceExec;
+/// # use datafusion_datasource::table_schema::TableSchema;
 /// # use datafusion_execution::object_store::ObjectStoreUrl;
 /// # use datafusion_physical_plan::ExecutionPlan;
 /// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -109,7 +110,7 @@ use log::{debug, warn};
 /// #    fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
 /// #    fn as_any(&self) -> &dyn Any { self }
 /// #    fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
-/// #    fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
+/// #    fn with_schema(&self, _: TableSchema) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
 /// #    fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
 /// #    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
 /// #    fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
@@ -470,7 +471,7 @@ impl FileScanConfigBuilder {

         let file_source = file_source
             .with_statistics(statistics.clone())
-            .with_schema(Arc::clone(table_schema.file_schema()));
+            .with_schema(table_schema.clone());
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
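The builder change above is the load-bearing one: `FileScanConfigBuilder::build()` now hands the source the whole `TableSchema` (file schema plus declared partition columns) rather than only the file schema. A sketch of what a source now receives, assuming a single Hive-style partition column named `part`:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::TableSchema;

fn main() {
    // Previously the builder passed only this to `FileSource::with_schema`.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "val",
        DataType::Utf8,
        false,
    )]));

    // Now it passes the file schema together with the scan's partition columns.
    let table_schema = TableSchema::new(
        Arc::clone(&file_schema),
        vec![Arc::new(Field::new("part", DataType::Utf8, false))],
    );

    assert_eq!(table_schema.file_schema(), &file_schema);
    assert_eq!(table_schema.table_partition_cols().len(), 1);
    assert_eq!(table_schema.table_schema().fields().len(), 2);
}
```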
diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs
index 863c123e3b1d..8002df4a99df 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -85,6 +85,11 @@ impl TableSchema {
     /// The table schema is automatically computed by appending the partition columns
     /// to the file schema.
     ///
+    /// You should prefer calling this method over
+    /// chaining [`TableSchema::from_file_schema`] and [`TableSchema::with_table_partition_cols`]
+    /// if you have both the file schema and partition columns available at construction time
+    /// since it avoids re-computing the table schema.
+    ///
     /// # Arguments
     ///
     /// * `file_schema` - Schema of the data files (without partition columns)
@@ -121,18 +126,21 @@ impl TableSchema {
         }
     }

-    /// Create a new TableSchema from a file schema with no partition columns.
+    /// Create a new TableSchema with no partition columns.
+    ///
+    /// You should prefer calling [`TableSchema::new`] if you have partition columns at
+    /// construction time since it avoids re-computing the table schema.
     pub fn from_file_schema(file_schema: SchemaRef) -> Self {
         Self::new(file_schema, vec![])
     }

-    /// Set the table partition columns and rebuild the table schema.
-    pub fn with_table_partition_cols(
-        mut self,
-        table_partition_cols: Vec<FieldRef>,
-    ) -> TableSchema {
-        self.table_partition_cols = table_partition_cols;
-        // Rebuild the table schema with the new partition columns
+    /// Add partition columns to an existing TableSchema, returning a new instance.
+    ///
+    /// You should prefer calling [`TableSchema::new`] instead of chaining [`TableSchema::from_file_schema`]
+    /// into [`TableSchema::with_table_partition_cols`] if you have partition columns at construction time
+    /// since it avoids re-computing the table schema.
+    pub fn with_table_partition_cols(mut self, partition_cols: Vec<FieldRef>) -> Self {
+        self.table_partition_cols = partition_cols;
         let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
         builder.extend(self.table_partition_cols.iter().cloned());
         self.table_schema = Arc::new(builder.finish());
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index f0aff1fa62b7..feb704af9913 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -22,7 +22,8 @@ use crate::{

 use std::sync::Arc;

-use arrow::datatypes::{Schema, SchemaRef};
+use crate::TableSchema;
+use arrow::datatypes::Schema;
 use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -66,7 +67,7 @@ impl FileSource for MockSource {
         Arc::new(Self { ..self.clone() })
     }

-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
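The doc comments added to `table_schema.rs` above recommend `TableSchema::new` over the two-step construction purely to avoid recomputing the combined schema; the results are otherwise identical. A standalone sketch verifying that equivalence:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::TableSchema;

fn main() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "user_id",
        DataType::Int64,
        false,
    )]));
    let part = Arc::new(Field::new("date", DataType::Utf8, false));

    // Preferred: build once, with partition columns up front.
    let direct = TableSchema::new(Arc::clone(&file_schema), vec![Arc::clone(&part)]);

    // Equivalent, but computes the combined table schema twice.
    let chained = TableSchema::from_file_schema(file_schema)
        .with_table_partition_cols(vec![part]);

    assert_eq!(direct.table_schema(), chained.table_schema());
}
```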
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 6dc2c264aeb8..e4676ae5332d 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -474,10 +474,7 @@ EXPLAIN select * from t_pushdown where part != val
 logical_plan
 01)Filter: t_pushdown.val != t_pushdown.part
 02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != t_pushdown.part]
-physical_plan
-01)CoalesceBatchesExec: target_batch_size=8192
-02)--FilterExec: val@0 != part@1
-03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet
+physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1

 # If we reference only a partition column it gets evaluated during the listing phase
 query TT
@@ -505,11 +502,7 @@ EXPLAIN select * from t_pushdown where val != 'd' AND val != 'c' AND part = 'a' AND val != part
 logical_plan
 01)Filter: t_pushdown.val != Utf8View("d") AND t_pushdown.val != Utf8View("c") AND t_pushdown.val != t_pushdown.part
 02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8View("a")], partial_filters=[t_pushdown.val != Utf8View("d"), t_pushdown.val != Utf8View("c"), t_pushdown.val != t_pushdown.part]
-physical_plan
-01)CoalesceBatchesExec: target_batch_size=8192
-02)--FilterExec: val@0 != part@1
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c AND val@0 != part@1, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)]

 # The order of filters should not matter
 query TT
@@ -518,10 +511,7 @@ EXPLAIN select val, part from t_pushdown where part = 'a' AND part = val;
 logical_plan
 01)Filter: t_pushdown.val = t_pushdown.part
 02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8View("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
-physical_plan
-01)CoalesceBatchesExec: target_batch_size=8192
-02)--FilterExec: val@0 = part@1
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 = part@1

 query TT
 select val, part from t_pushdown where part = 'a' AND part = val;
@@ -534,10 +524,7 @@ EXPLAIN select val, part from t_pushdown where part = val AND part = 'a';
 logical_plan
 01)Filter: t_pushdown.val = t_pushdown.part
 02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8View("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
-physical_plan
-01)CoalesceBatchesExec: target_batch_size=8192
-02)--FilterExec: val@0 = part@1
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 = part@1

 query TT
 select val, part from t_pushdown where part = val AND part = 'a';
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index c568b8b28e1f..f34b8b2a5cf0 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -182,6 +182,84 @@ let indices = projection_exprs.column_indices();
 _execution plan_ of the query. With this release, `DESCRIBE query` now outputs the computed
 _schema_ of the query, consistent with the behavior of `DESCRIBE table_name`.

+### Introduction of `TableSchema` and changes to `FileSource::with_schema()` method
+
+A new `TableSchema` struct has been introduced in the `datafusion-datasource` crate to better manage table schemas with partition columns.
+This struct helps distinguish between:
+
+- **File schema**: The schema of the actual data files on disk
+- **Partition columns**: Columns derived from the directory structure (e.g., Hive-style partitioning)
+- **Table schema**: The complete schema, combining the file schema and the partition columns
+
+As part of this change, the `FileSource::with_schema()` method signature has changed from accepting a `SchemaRef` to accepting a `TableSchema`.
+
+**Who is affected:**
+
+- Users who maintain custom `FileSource` implementations need to update their code
+- Users who only use the built-in file sources (Parquet, CSV, JSON, Avro, Arrow) are not affected
+
+**Migration guide for custom `FileSource` implementations:**
+
+```diff
+ use datafusion_datasource::file::FileSource;
+-use arrow::datatypes::SchemaRef;
++use datafusion_datasource::TableSchema;
+
+ impl FileSource for MyCustomSource {
+-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
++    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
+         Arc::new(Self {
+-            schema: Some(schema),
++            // Use schema.file_schema() to get the file schema without partition columns
++            schema: Some(Arc::clone(schema.file_schema())),
+             ..self.clone()
+         })
+     }
+ }
+```
+
+For implementations that need access to partition columns:
+
+```rust,ignore
+fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
+    Arc::new(Self {
+        file_schema: Arc::clone(schema.file_schema()),
+        partition_cols: schema.table_partition_cols().clone(),
+        table_schema: Arc::clone(schema.table_schema()),
+        ..self.clone()
+    })
+}
+```
+
+**Note**: Most `FileSource` implementations only need to store the file schema (without partition columns), as shown in the first example. The second pattern of storing all three schema components is typically only needed for advanced use cases where different operations require different schema representations (e.g., `ParquetSource` uses the file schema to build pruning predicates but needs the table schema for its filter pushdown logic).
+
+**Using `TableSchema` directly:**
+
+If you are constructing a `FileScanConfig` or otherwise working with table schemas and partition columns, you can now use `TableSchema`:
+
+```rust
+use datafusion_datasource::TableSchema;
+use arrow::datatypes::{Schema, Field, DataType};
+use std::sync::Arc;
+
+// Create a TableSchema with partition columns
+let file_schema = Arc::new(Schema::new(vec![
+    Field::new("user_id", DataType::Int64, false),
+    Field::new("amount", DataType::Float64, false),
+]));
+
+let partition_cols = vec![
+    Arc::new(Field::new("date", DataType::Utf8, false)),
+    Arc::new(Field::new("region", DataType::Utf8, false)),
+];
+
+let table_schema = TableSchema::new(file_schema, partition_cols);
+
+// Access the different schema representations
+let file_schema_ref = table_schema.file_schema(); // Schema without partition columns
+let full_schema = table_schema.table_schema(); // Complete schema with partition columns
+let partition_cols_ref = table_schema.table_partition_cols(); // Just the partition columns
+```
+
 ## DataFusion `50.0.0`

 ### ListingTable automatically detects Hive Partitioned tables