Skip to content

Commit 04272f0

Browse files
committed
fix
1 parent a0d19fc commit 04272f0

File tree

9 files changed

+40
-9
lines changed

9 files changed

+40
-9
lines changed

datafusion/catalog/src/default_table_source.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ impl TableSource for DefaultTableSource {
7373
&self,
7474
filter: &[&Expr],
7575
) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
76-
#[allow(deprecated)]
7776
self.table_provider.supports_filters_pushdown(filter)
7877
}
7978

datafusion/catalog/src/memory/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ impl MemTable {
138138
) -> Result<Self> {
139139
let schema = t.schema();
140140
let constraints = t.constraints();
141+
#[expect(deprecated)]
141142
let exec = t.scan(state, None, &[], None).await?;
142143
let partition_count = exec.output_partitioning().partition_count();
143144

datafusion/catalog/src/table.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,22 @@ pub trait TableProvider: Debug + Sync + Send {
185185
limit,
186186
} = args;
187187
let filters = filters.unwrap_or_default();
188-
#[allow(deprecated)]
188+
let unsupported_filters = self
189+
.supports_filters_pushdown(&filters.iter().collect_vec())?
190+
.into_iter()
191+
.zip(&filters)
192+
.filter_map(|(support, expr)| match support {
193+
TableProviderFilterPushDown::Inexact | TableProviderFilterPushDown::Unsupported => {
194+
Some(expr.clone())
195+
}
196+
TableProviderFilterPushDown::Exact => None,
197+
})
198+
.collect_vec();
199+
#[expect(deprecated)]
189200
let plan = self
190201
.scan(state, projection.as_ref(), &filters, limit)
191202
.await?;
192-
Ok(ScanResult::new(plan, vec![]))
203+
Ok(ScanResult::new(plan, unsupported_filters))
193204
}
194205

195206
/// Specify if DataFusion should provide filter expressions to the
@@ -272,7 +283,6 @@ pub trait TableProvider: Debug + Sync + Send {
272283
/// }
273284
/// }
274285
/// ```
275-
#[deprecated(since = "50.0.0", note = "Use `scan_with_args` instead")]
276286
fn supports_filters_pushdown(
277287
&self,
278288
filters: &[&Expr],

datafusion/core/src/datasource/listing/table.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,9 +1178,9 @@ impl TableProvider for ListingTable {
11781178
state: &dyn Session,
11791179
args: ScanArgs,
11801180
) -> Result<ScanResult> {
1181-
let projection = options.projection();
1182-
let filters = options.filters().map(|f| f.to_vec()).unwrap_or_default();
1183-
let limit = options.limit();
1181+
let projection = args.projection();
1182+
let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
1183+
let limit = args.limit();
11841184

11851185
// extract types of partition columns
11861186
let table_partition_cols = self
@@ -1219,7 +1219,7 @@ impl TableProvider for ListingTable {
12191219
}
12201220

12211221
let known_file_ordering = self.try_create_output_ordering()?;
1222-
let desired_file_ordering = match options.preferred_ordering() {
1222+
let desired_file_ordering = match args.preferred_ordering() {
12231223
Some(ordering) if !ordering.is_empty() => {
12241224
// Prefer the ordering requested by the query to any inherint file ordering
12251225
create_ordering(&self.table_schema, &[ordering.to_vec()])?
@@ -1696,6 +1696,7 @@ mod tests {
16961696

16971697
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
16981698
let projection = None;
1699+
#[expect(deprecated)]
16991700
let exec = table
17001701
.scan(&ctx.state(), projection, &[], None)
17011702
.await
@@ -1833,6 +1834,7 @@ mod tests {
18331834
// this will filter out the only file in the store
18341835
let filter = Expr::not_eq(col("p1"), lit("v1"));
18351836

1837+
#[expect(deprecated)]
18361838
let scan = table
18371839
.scan(&ctx.state(), None, &[filter], None)
18381840
.await
@@ -2733,6 +2735,7 @@ mod tests {
27332735
.with_schema(schema_default);
27342736
let table_default = ListingTable::try_new(config_default)?;
27352737

2738+
#[expect(deprecated)]
27362739
let exec_default = table_default.scan(&state, None, &[], None).await?;
27372740
assert_eq!(
27382741
exec_default.partition_statistics(None)?.num_rows,
@@ -2754,6 +2757,7 @@ mod tests {
27542757
.with_schema(schema_disabled);
27552758
let table_disabled = ListingTable::try_new(config_disabled)?;
27562759

2760+
#[expect(deprecated)]
27572761
let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
27582762
assert_eq!(
27592763
exec_disabled.partition_statistics(None)?.num_rows,
@@ -2773,6 +2777,7 @@ mod tests {
27732777
.with_schema(schema_enabled);
27742778
let table_enabled = ListingTable::try_new(config_enabled)?;
27752779

2780+
#[expect(deprecated)]
27762781
let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
27772782
assert_eq!(
27782783
exec_enabled.partition_statistics(None)?.num_rows,
@@ -2879,6 +2884,7 @@ mod tests {
28792884
assert!(table.schema_adapter_factory().is_none());
28802885

28812886
// The scan should work correctly with the default schema adapter
2887+
#[expect(deprecated)]
28822888
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
28832889
assert!(
28842890
scan_result.is_ok(),
@@ -2915,6 +2921,7 @@ mod tests {
29152921
)?;
29162922

29172923
// The error should bubble up from the scan operation when schema mapping fails
2924+
#[expect(deprecated)]
29182925
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
29192926

29202927
assert!(scan_result.is_err());

datafusion/core/src/datasource/memory_test.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ mod tests {
5959
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
6060

6161
// scan with projection
62+
#[expect(deprecated)]
6263
let exec = provider
6364
.scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None)
6465
.await?;
@@ -94,6 +95,7 @@ mod tests {
9495

9596
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
9697

98+
#[expect(deprecated)]
9799
let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
98100
let mut it = exec.execute(0, task_ctx)?;
99101
let batch1 = it.next().await.unwrap()?;
@@ -126,6 +128,7 @@ mod tests {
126128

127129
let projection: Vec<usize> = vec![0, 4];
128130

131+
#[expect(deprecated)]
129132
match provider
130133
.scan(&session_ctx.state(), Some(&projection), &[], None)
131134
.await
@@ -254,6 +257,7 @@ mod tests {
254257
let provider =
255258
MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
256259

260+
#[expect(deprecated)]
257261
let exec = provider.scan(&session_ctx.state(), None, &[], None).await?;
258262
let mut it = exec.execute(0, task_ctx)?;
259263
let batch1 = it.next().await.unwrap()?;

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ use datafusion_expr::{
8686
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8787
use datafusion_physical_expr::expressions::{Column, Literal};
8888
use datafusion_physical_expr::{
89-
conjunction, create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
89+
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
9090
};
9191
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9292
use datafusion_physical_plan::empty::EmptyExec;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ async fn check_stats_precision_with_filter_pushdown() {
6060
options.execution.parquet.pushdown_filters = true;
6161

6262
// Scan without filter, stats are exact
63+
#[expect(deprecated)]
6364
let exec = table.scan(&state, None, &[], None).await.unwrap();
6465
assert_eq!(
6566
exec.partition_statistics(None).unwrap().num_rows,
@@ -71,6 +72,7 @@ async fn check_stats_precision_with_filter_pushdown() {
7172
// (it is not a partition filter). Therefore; it will be pushed down to the
7273
// source operator after the appropriate optimizer pass.
7374
let filter_expr = Expr::gt(col("id"), lit(1));
75+
#[expect(deprecated)]
7476
let exec_with_filter = table
7577
.scan(&state, None, std::slice::from_ref(&filter_expr), None)
7678
.await
@@ -119,6 +121,7 @@ async fn load_table_stats_with_session_level_cache() {
119121

120122
//Session 1 first time list files
121123
assert_eq!(get_static_cache_size(&state1), 0);
124+
#[expect(deprecated)]
122125
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
123126

124127
assert_eq!(
@@ -135,6 +138,7 @@ async fn load_table_stats_with_session_level_cache() {
135138
//Session 2 first time list files
136139
//check session 1 cache result not show in session 2
137140
assert_eq!(get_static_cache_size(&state2), 0);
141+
#[expect(deprecated)]
138142
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
139143
assert_eq!(
140144
exec2.partition_statistics(None).unwrap().num_rows,
@@ -150,6 +154,7 @@ async fn load_table_stats_with_session_level_cache() {
150154
//Session 1 second time list files
151155
//check session 1 cache result not show in session 2
152156
assert_eq!(get_static_cache_size(&state1), 1);
157+
#[expect(deprecated)]
153158
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
154159
assert_eq!(
155160
exec3.partition_statistics(None).unwrap().num_rows,
@@ -195,6 +200,7 @@ async fn list_files_with_session_level_cache() {
195200

196201
//Session 1 first time list files
197202
assert_eq!(get_list_file_cache_size(&state1), 0);
203+
#[expect(deprecated)]
198204
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
199205
let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
200206
let data_source = data_source_exec.data_source();
@@ -211,6 +217,7 @@ async fn list_files_with_session_level_cache() {
211217
//Session 2 first time list files
212218
//check session 1 cache result not show in session 2
213219
assert_eq!(get_list_file_cache_size(&state2), 0);
220+
#[expect(deprecated)]
214221
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
215222
let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
216223
let data_source = data_source_exec.data_source();
@@ -227,6 +234,7 @@ async fn list_files_with_session_level_cache() {
227234
//Session 1 second time list files
228235
//check session 1 cache result not show in session 2
229236
assert_eq!(get_list_file_cache_size(&state1), 1);
237+
#[expect(deprecated)]
230238
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
231239
let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
232240
let data_source = data_source_exec.data_source();

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ mod test {
102102
.downcast_ref::<ListingTable>()
103103
.unwrap()
104104
.clone();
105+
#[expect(deprecated)]
105106
listing_table
106107
.scan(&ctx.state(), None, &[], None)
107108
.await

datafusion/ffi/src/table_provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ unsafe extern "C" fn scan_fn_wrapper(
260260

261261
let projections: Vec<_> = projections.into_iter().collect();
262262

263+
#[expect(deprecated)]
263264
let plan = rresult_return!(
264265
internal_provider
265266
.scan(&ctx.state(), Some(&projections), &filters, limit.into())

0 commit comments

Comments
 (0)