Skip to content

Commit 0ce5ffe

Browse files
committed
Refactor DML analysis and update planner tests
Consolidate the duplicated joined-update and target-alias plan walks in physical_planner.rs into a shared analyze_dml_input(...) helper, and update filter and assignment extraction to use the metadata it returns. In sql_integration.rs, move the t1/t2 setup into a local UpdatePlanningContextProvider used by the new joined-update planner tests, which removes the t1 and t2 table entries from the shared mock catalog in common/mod.rs.
1 parent d76d918 commit 0ce5ffe

3 files changed

Lines changed: 186 additions & 85 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 48 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -777,10 +777,11 @@ impl DefaultPhysicalPlanner {
777777
input,
778778
..
779779
}) => {
780+
let analysis = analyze_dml_input(input, table_name)?;
780781
// TODO: remove this guard once UPDATE ... FROM routing via
781782
// TableProvider::update_from(...) and joined assignment handling land.
782783
// See https://github.com/apache/datafusion/issues/19950.
783-
if update_uses_joined_input(input)? {
784+
if analysis.has_joined_input {
784785
return not_impl_err!("UPDATE ... FROM is not supported");
785786
}
786787

@@ -2105,27 +2106,15 @@ fn extract_dml_filters(
21052106
input: &Arc<LogicalPlan>,
21062107
target: &TableReference,
21072108
) -> Result<Vec<Expr>> {
2109+
let analysis = analyze_dml_input(input, target)?;
21082110
let mut filters = Vec::new();
2109-
let mut allowed_refs = vec![target.clone()];
2110-
2111-
// First pass: collect any alias references to the target table
2112-
input.apply(|node| {
2113-
if let LogicalPlan::SubqueryAlias(alias) = node
2114-
// Check if this alias points to the target table
2115-
&& let LogicalPlan::TableScan(scan) = alias.input.as_ref()
2116-
&& scan.table_name.resolved_eq(target)
2117-
{
2118-
allowed_refs.push(TableReference::bare(alias.alias.to_string()));
2119-
}
2120-
Ok(TreeNodeRecursion::Continue)
2121-
})?;
21222111

21232112
input.apply(|node| {
21242113
match node {
21252114
LogicalPlan::Filter(filter) => {
21262115
// Split AND predicates into individual expressions
21272116
for predicate in split_conjunction(&filter.predicate) {
2128-
if predicate_is_on_target_multi(predicate, &allowed_refs)? {
2117+
if predicate_is_on_target_multi(predicate, &analysis.target_refs)? {
21292118
filters.push(predicate.clone());
21302119
}
21312120
}
@@ -2201,16 +2190,45 @@ fn extract_dml_filters(
22012190
})
22022191
}
22032192

2204-
fn update_uses_joined_input(input: &Arc<LogicalPlan>) -> Result<bool> {
2205-
let mut has_join = false;
2206-
input.apply(|node| {
2207-
if matches!(node, LogicalPlan::Join(_)) {
2208-
has_join = true;
2209-
return Ok(TreeNodeRecursion::Stop);
2193+
#[derive(Debug)]
2194+
struct DmlInputAnalysis {
2195+
has_joined_input: bool,
2196+
target_refs: Vec<TableReference>,
2197+
}
2198+
2199+
fn analyze_dml_input(
2200+
input: &Arc<LogicalPlan>,
2201+
target: &TableReference,
2202+
) -> Result<DmlInputAnalysis> {
2203+
let mut analysis = DmlInputAnalysis {
2204+
has_joined_input: false,
2205+
target_refs: vec![target.clone()],
2206+
};
2207+
analyze_target_branch(input, &mut analysis)?;
2208+
Ok(analysis)
2209+
}
2210+
2211+
fn analyze_target_branch(
2212+
input: &Arc<LogicalPlan>,
2213+
analysis: &mut DmlInputAnalysis,
2214+
) -> Result<()> {
2215+
match input.as_ref() {
2216+
LogicalPlan::Projection(projection) => {
2217+
analyze_target_branch(&projection.input, analysis)
22102218
}
2211-
Ok(TreeNodeRecursion::Continue)
2212-
})?;
2213-
Ok(has_join)
2219+
LogicalPlan::Filter(filter) => analyze_target_branch(&filter.input, analysis),
2220+
LogicalPlan::SubqueryAlias(alias) => {
2221+
analysis
2222+
.target_refs
2223+
.push(TableReference::bare(alias.alias.to_string()));
2224+
analyze_target_branch(&alias.input, analysis)
2225+
}
2226+
LogicalPlan::Join(join) => {
2227+
analysis.has_joined_input = true;
2228+
analyze_target_branch(&join.left, analysis)
2229+
}
2230+
_ => Ok(()),
2231+
}
22142232
}
22152233

22162234
/// Determine whether a predicate references only columns from the target table
@@ -2259,7 +2277,8 @@ fn strip_column_qualifiers(expr: Expr) -> Result<Expr> {
22592277
/// Extract column assignments from an UPDATE input plan.
22602278
/// For UPDATE statements, the SQL planner encodes assignments as a projection
22612279
/// over the source table. This function extracts column name and expression pairs
2262-
/// from the projection. Column qualifiers are stripped from the expressions.
2280+
/// from the projection. Column qualifiers are stripped only for single-table
2281+
/// updates so provider-facing expressions remain resolvable.
22632282
///
22642283
fn extract_update_assignments(
22652284
input: &Arc<LogicalPlan>,
@@ -2271,16 +2290,16 @@ fn extract_update_assignments(
22712290
// TableScan
22722291
//
22732292
// Each projected expression has an alias matching the column name
2293+
let analysis = analyze_dml_input(input, target)?;
22742294
let mut assignments = Vec::new();
2275-
let strip_qualifiers = !update_uses_joined_input(input)?;
2276-
let target_refs = collect_target_refs(input, target)?;
2295+
let strip_qualifiers = !analysis.has_joined_input;
22772296

22782297
// Find the top-level projection
22792298
if let LogicalPlan::Projection(projection) = input.as_ref() {
22802299
append_update_assignments(
22812300
&mut assignments,
22822301
projection,
2283-
&target_refs,
2302+
&analysis.target_refs,
22842303
strip_qualifiers,
22852304
)?;
22862305
} else {
@@ -2290,7 +2309,7 @@ fn extract_update_assignments(
22902309
append_update_assignments(
22912310
&mut assignments,
22922311
projection,
2293-
&target_refs,
2312+
&analysis.target_refs,
22942313
strip_qualifiers,
22952314
)?;
22962315
return Ok(TreeNodeRecursion::Stop);
@@ -2327,37 +2346,6 @@ fn append_update_assignments(
23272346
Ok(())
23282347
}
23292348

2330-
fn collect_target_refs(
2331-
input: &Arc<LogicalPlan>,
2332-
target: &TableReference,
2333-
) -> Result<Vec<TableReference>> {
2334-
let mut target_refs = vec![target.clone()];
2335-
collect_target_refs_from_target_branch(input, &mut target_refs)?;
2336-
Ok(target_refs)
2337-
}
2338-
2339-
fn collect_target_refs_from_target_branch(
2340-
input: &Arc<LogicalPlan>,
2341-
target_refs: &mut Vec<TableReference>,
2342-
) -> Result<()> {
2343-
match input.as_ref() {
2344-
LogicalPlan::Projection(projection) => {
2345-
collect_target_refs_from_target_branch(&projection.input, target_refs)
2346-
}
2347-
LogicalPlan::Filter(filter) => {
2348-
collect_target_refs_from_target_branch(&filter.input, target_refs)
2349-
}
2350-
LogicalPlan::Join(join) => {
2351-
collect_target_refs_from_target_branch(&join.left, target_refs)
2352-
}
2353-
LogicalPlan::SubqueryAlias(alias) => {
2354-
target_refs.push(TableReference::bare(alias.alias.to_string()));
2355-
collect_target_refs_from_target_branch(&alias.input, target_refs)
2356-
}
2357-
_ => Ok(()),
2358-
}
2359-
}
2360-
23612349
fn normalize_update_assignment_expr(expr: Expr, strip_qualifiers: bool) -> Result<Expr> {
23622350
if strip_qualifiers {
23632351
strip_column_qualifiers(expr)

datafusion/sql/tests/common/mod.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,6 @@ impl ContextProvider for MockContextProvider {
124124
Field::new("id", DataType::Int32, false),
125125
Field::new("price", DataType::Decimal128(10, 2), false),
126126
])),
127-
"t1" => Ok(Schema::new(vec![
128-
Field::new("a", DataType::Int32, false),
129-
Field::new("b", DataType::Utf8, false),
130-
Field::new("c", DataType::Float64, false),
131-
Field::new("d", DataType::Int32, false),
132-
])),
133-
"t2" => Ok(Schema::new(vec![
134-
Field::new("a", DataType::Int32, false),
135-
Field::new("b", DataType::Utf8, false),
136-
Field::new("c", DataType::Float64, false),
137-
Field::new("d", DataType::Int32, false),
138-
])),
139127
"person" => Ok(Schema::new(vec![
140128
Field::new("id", DataType::UInt32, false),
141129
Field::new("first_name", DataType::Utf8, false),

0 commit comments

Comments
 (0)