Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ async fn sort_preserving_merge() {
"| plan_type | plan |",
"+---------------+--------------------------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| | TableScan: t projection=[a, b] preferred_ordering=[t.a ASC NULLS LAST, t.b ASC NULLS LAST] |",
"| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10 |",
"| | DataSourceExec: partitions=2, partition_sizes=[5, 5], fetch=10, output_ordering=a@0 ASC NULLS LAST, b@1 ASC NULLS LAST |",
"| | |",
Expand Down Expand Up @@ -303,7 +303,7 @@ async fn sort_spill_reservation() {
"| plan_type | plan |",
"+---------------+-------------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |",
"| | TableScan: t projection=[a, b] |",
"| | TableScan: t projection=[a, b] preferred_ordering=[t.a ASC NULLS LAST, t.b DESC NULLS FIRST] |",
"| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST, b@1 DESC], preserve_partitioning=[false] |",
"| | DataSourceExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST, b@1 ASC NULLS LAST |",
"| | |",
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,9 @@ async fn topk_query() -> Result<()> {
async fn topk_plan() -> Result<()> {
let ctx = setup_table(make_topk_context()).await?;

#[rustfmt::skip]
let mut expected = ["| logical_plan after topk | TopK: k=3 |",
"| | TableScan: sales projection=[customer_id,revenue] |"].join("\n");
"| | TableScan:salesprojection=[customer_id,revenue]preferred_ordering=[sales.revenueDESCNULLSFIRST] |"].join("\n");

let explain_query = format!("EXPLAIN VERBOSE {QUERY}");
let actual_output = exec_sql(&ctx, &explain_query).await?;
Expand Down
7 changes: 5 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addition of the ScanOrdering import results in reformatting of the rest

StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Unnest, Values, Window,
};
pub use statement::{
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
Expand All @@ -54,3 +55,5 @@ pub use datafusion_common::format::ExplainFormat;
pub use display::display_schema;

pub use extension::{UserDefinedLogicalNode, UserDefinedLogicalNodeCore};

pub use tree_node::LogicalPlanContext;
96 changes: 96 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,7 @@ impl LogicalPlan {
ref projection,
ref filters,
ref fetch,
ref ordering,
..
}) => {
let projected_fields = match projection {
Expand Down Expand Up @@ -1824,6 +1825,20 @@ impl LogicalPlan {
write!(f, ", fetch={n}")?;
}

if let Some(ordering) = ordering {
if let Some(preferred_ordering) = &ordering.preferred_ordering {
write!(
f,
" preferred_ordering=[{}]",
preferred_ordering
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
)?;
}
}

Ok(())
}
LogicalPlan::Projection(Projection { ref expr, .. }) => {
Expand Down Expand Up @@ -2593,6 +2608,68 @@ impl PartialOrd for Window {
}
}

/// Communicates the desired ordering of the output of a scan operation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this text is about the preferred_ordering field, so maybe it would be best moved closer there.

I can imagine potentially adding other fields like required_ordering in the future, which could communicate if the scan was required (if/when we extend the ExecutionPlan API to communicate what type of sort pushdowns are supported 🤔 )

///
/// Preferred orderings can potentially help DataFusion optimize queries, even in cases
/// when the output does not completely follow that order. This is information passed
/// to the scan about what might help.
///
/// For example, a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about also linking to the blog that explains this in more detail: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/

/// predicates and TopK operator will work better if the data is roughly ordered by descending
/// time (more recent data first).
///
/// Implementers of [`TableProvider`] should use this information to optimize the order in which data is output from the scan.
///
/// It is a hint and not a requirement:
/// - If this information is completely ignored, e.g. data is scanned randomly, the query will still be correct because a sort will be applied to the data.
/// - Partially ordered data will also be re-sorted but this may result in optimizations like early stopping, additional data pruning, reduced memory usage during the sort, etc.
/// - If the scan produces exactly the requested ordering, and sets its properties to reflect this, upstream sorts may be optimized away.
///
/// Actually removing unnecessary sorts is done at the physical plan level: logical operators like a join may or may not preserve ordering
/// depending on what physical operator is chosen (e.g. HashJoin vs. SortMergeJoin).
/// If you as a [`TableProvider`] implementer would like to eliminate unnecessary sorts you should make sure the [`ExecutionPlan`]
/// you produce reflects the ordering in its properties.
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Default)]
pub struct ScanOrdering {
/// Optional preferred ordering for the scan that matches the output order of upstream query nodes.
/// It is optional / best effort for the scan to produce this ordering.
/// If the scan produces this exact ordering and sets its properties to reflect this, upstream sorts may be optimized away.
/// Otherwise the sorts may remain in place but partial ordering may be exploited e.g. to do early stopping or reduce complexity of the sort.
/// Thus it is recommended for the scan to also do a best effort to produce partially sorted data if possible.
pub preferred_ordering: Option<Vec<SortExpr>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be preferred_ordering: Vec<SortExpr>,?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It comes down to what does a Vec of zero length mean? That would imply to me there is no preferred ordering, so an Option<Vec> is redundant

I would personally recommend making this non pub and adding an accessor like fn preferred_ordering(&self) -> Option<&Vec<...>> {} and documenting in doc comments what the invariants are

}

impl ScanOrdering {
    /// Builder-style setter that records `preferred_ordering` as the scan's
    /// preferred output order, consuming and returning `self`.
    ///
    /// See [`ScanOrdering`] for how this hint is interpreted by scans.
    pub fn with_preferred_ordering(self, preferred_ordering: Vec<SortExpr>) -> Self {
        Self {
            preferred_ordering: Some(preferred_ordering),
            ..self
        }
    }
}

impl Debug for ScanOrdering {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Render the ordering as a comma-separated list of sort expressions,
        // or the literal string "None" when no preference was set.
        let ordering_display = match &self.preferred_ordering {
            Some(ordering) => {
                let exprs: Vec<String> =
                    ordering.iter().map(|e| e.to_string()).collect();
                exprs.join(", ")
            }
            None => "None".to_string(),
        };
        f.debug_struct("ScanOrdering")
            .field("preferred_ordering", &ordering_display)
            .finish_non_exhaustive()
    }
}

/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
Expand All @@ -2608,6 +2685,8 @@ pub struct TableScan {
pub filters: Vec<Expr>,
/// Optional number of rows to read
pub fetch: Option<usize>,
/// Ordering for the scan
pub ordering: Option<ScanOrdering>,
Comment on lines +2688 to +2689
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just be ScanOrdering instead of Option<ScanOrdering>?

}

impl Debug for TableScan {
Expand All @@ -2619,6 +2698,7 @@ impl Debug for TableScan {
.field("projected_schema", &self.projected_schema)
.field("filters", &self.filters)
.field("fetch", &self.fetch)
.field("ordering", &self.ordering)
.finish_non_exhaustive()
}
}
Expand All @@ -2630,6 +2710,7 @@ impl PartialEq for TableScan {
&& self.projected_schema == other.projected_schema
&& self.filters == other.filters
&& self.fetch == other.fetch
&& self.ordering == other.ordering
}
}

Expand All @@ -2649,18 +2730,22 @@ impl PartialOrd for TableScan {
pub filters: &'a Vec<Expr>,
/// Optional number of rows to read
pub fetch: &'a Option<usize>,
/// Ordering information passed from the query to the scan.
pub ordering: &'a Option<ScanOrdering>,
}
let comparable_self = ComparableTableScan {
table_name: &self.table_name,
projection: &self.projection,
filters: &self.filters,
fetch: &self.fetch,
ordering: &self.ordering,
};
let comparable_other = ComparableTableScan {
table_name: &other.table_name,
projection: &other.projection,
filters: &other.filters,
fetch: &other.fetch,
ordering: &other.ordering,
};
comparable_self
.partial_cmp(&comparable_other)
Expand All @@ -2676,6 +2761,7 @@ impl Hash for TableScan {
self.projected_schema.hash(state);
self.filters.hash(state);
self.fetch.hash(state);
self.ordering.hash(state);
}
}

Expand Down Expand Up @@ -2729,8 +2815,16 @@ impl TableScan {
projected_schema,
filters,
fetch,
ordering: None,
})
}

/// Sets the ordering information for the scan.
/// See [`ScanOrdering`] for details on how this is used.
pub fn with_ordering(mut self, ordering: ScanOrdering) -> Self {
self.ordering = Some(ordering);
self
}
}

// Repartition the plan based on a partitioning scheme.
Expand Down Expand Up @@ -4823,6 +4917,7 @@ mod tests {
projected_schema: Arc::clone(&schema),
filters: vec![],
fetch: None,
ordering: None,
}));
let col = schema.field_names()[0].clone();

Expand Down Expand Up @@ -4853,6 +4948,7 @@ mod tests {
projected_schema: Arc::clone(&unique_schema),
filters: vec![],
fetch: None,
ordering: None,
}));
let col = schema.field_names()[0].clone();

Expand Down
105 changes: 105 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ impl LogicalPlan {
projected_schema,
filters,
fetch,
ordering,
}) => filters.map_elements(f)?.update_data(|filters| {
LogicalPlan::TableScan(TableScan {
table_name,
Expand All @@ -607,6 +608,7 @@ impl LogicalPlan {
projected_schema,
filters,
fetch,
ordering,
})
}),
LogicalPlan::Distinct(Distinct::On(DistinctOn {
Expand Down Expand Up @@ -868,3 +870,106 @@ impl LogicalPlan {
})
}
}

/// A node context object beneficial for writing optimizer rules.
/// This context encapsulates a [`LogicalPlan`] node with a payload.
///
/// Since each wrapped node has its children within both the [`LogicalPlanContext.plan.inputs()`],
/// as well as separately within the [`LogicalPlanContext.children`] (which are child nodes wrapped in the context),
/// it's important to keep these child plans in sync when performing mutations.
///
/// Since there are two ways to access child plans directly — it's recommended
/// to perform mutable operations via [`Self::update_plan_from_children`].
/// After mutating the `LogicalPlanContext.children`, or after creating the `LogicalPlanContext`,
/// call `update_plan_from_children` to sync.
///
/// See also:
/// - [`datafusion_common::tree_node::TreeNode`] trait for tree traversal and mutation utilities.
/// - [`datafusion_common::tree_node::ConcreteTreeNode`] trait for integrating with the tree node utilities.
/// - [`datafusion::physical_plan::tree_node::PlanContext`] for a similar context for physical plans.
///
/// [`datafusion::physical_plan::tree_node::PlanContext`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/tree_node/struct.PlanContext.html
#[derive(Debug, Clone)]
pub struct LogicalPlanContext<T: Sized> {
    /// The logical plan associated with this context.
    ///
    /// Invariant: the plans held by `children` must mirror `plan.inputs()`;
    /// after mutating `children`, call [`Self::update_plan_from_children`]
    /// to re-synchronize the two.
    pub plan: LogicalPlan,
    /// Custom data payload of the node (e.g. state accumulated by an
    /// optimizer rule while traversing the tree).
    pub data: T,
    /// Child contexts of this node, one per input of `plan`, in the same order.
    pub children: Vec<Self>,
}

impl<T> LogicalPlanContext<T> {
    /// Create a context node from a plan, its payload, and pre-wrapped children.
    ///
    /// Callers are responsible for keeping `children` consistent with
    /// `plan.inputs()`; call [`Self::update_plan_from_children`] after
    /// constructing or mutating the children.
    pub fn new(plan: LogicalPlan, data: T, children: Vec<Self>) -> Self {
        Self {
            plan,
            data,
            children,
        }
    }

    /// Rebuild `self.plan` so that its inputs match the plans currently held
    /// by [`Self::children`], if the children have been changed.
    ///
    /// # Errors
    /// Returns an internal error if `plan` has more inputs than there are
    /// entries in `children`.
    pub fn update_plan_from_children(self) -> Result<Self> {
        // Destructure the owned `self` so the plan can be moved into
        // `map_children` directly, avoiding a deep clone of the whole plan
        // subtree (the previous implementation did `self.plan.clone()`).
        let Self {
            plan,
            data,
            children,
        } = self;

        // Clone each child's plan lazily, only as `map_children` visits the
        // corresponding input of `plan`.
        let mut child_plans = children.iter().map(|context| context.plan.clone());

        let plan = plan
            .map_children(|_| {
                // Replace each input with the plan from the matching child context
                child_plans.next().map(Transformed::no).ok_or_else(|| {
                    datafusion_common::DataFusionError::Internal(
                        "Mismatch between plan children and context children".to_string(),
                    )
                })
            })?
            .data;

        Ok(Self {
            plan,
            data,
            children,
        })
    }
}

impl<T: Default> LogicalPlanContext<T> {
    /// Wrap `plan` — and, recursively, each of its inputs — in a context
    /// whose payload is `T::default()`.
    pub fn new_default(plan: LogicalPlan) -> Self {
        let children: Vec<Self> = plan
            .inputs()
            .iter()
            .map(|child| Self::new_default((*child).clone()))
            .collect();
        Self::new(plan, T::default(), children)
    }
}

impl<T: std::fmt::Display> std::fmt::Display for LogicalPlanContext<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render the wrapped plan (indented form) followed by the payload.
        write!(f, "Node plan: {}", self.plan.display_indent())?;
        write!(f, "Node data: {}", self.data)
    }
}

impl<T> datafusion_common::tree_node::ConcreteTreeNode for LogicalPlanContext<T> {
    fn children(&self) -> &[Self] {
        self.children.as_slice()
    }

    fn take_children(mut self) -> (Self, Vec<Self>) {
        // Swap the children out, leaving an empty Vec behind in `self`.
        let taken = std::mem::replace(&mut self.children, Vec::new());
        (self, taken)
    }

    fn with_new_children(mut self, children: Vec<Self>) -> Result<Self> {
        self.children = children;
        // Keep `plan.inputs()` in sync with the newly installed children.
        self.update_plan_from_children()
    }
}
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod optimizer;
pub mod propagate_empty_relation;
pub mod push_down_filter;
pub mod push_down_limit;
pub mod push_down_sort;
pub mod replace_distinct_aggregate;
pub mod scalar_subquery_to_join;
pub mod simplify_expressions;
Expand Down
5 changes: 5 additions & 0 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ fn optimize_projections(
filters,
fetch,
projected_schema: _,
ordering,
} = table_scan;

// Get indices referred to in the original (schema with all fields)
Expand All @@ -275,6 +276,10 @@ fn optimize_projections(
filters,
fetch,
)
.map(|s| match ordering {
Some(ordering) => s.with_ordering(ordering),
None => s,
})
.map(LogicalPlan::TableScan)
.map(Transformed::yes);
}
Expand Down
Loading