Skip to content

Commit ac34b8b

Browse files
committed
Fix blocking issue and clean up projection logic
Remove the EXPLAIN-time UPDATE ... FROM rejection in statement.rs to allow the SQL planner to expose the joined logical plan. Adjust regression test in sql_integration.rs to assert the Explain -> Dml(Update) plan shape. Consolidate duplicated projection-walking logic in physical_planner.rs by using a shared helper function for extract_update_assignments(). This simplifies identity-check and qualifier-normalization rules.
1 parent 85145d8 commit ac34b8b

3 files changed

Lines changed: 49 additions & 64 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2277,42 +2277,22 @@ fn extract_update_assignments(
22772277

22782278
// Find the top-level projection
22792279
if let LogicalPlan::Projection(projection) = input.as_ref() {
2280-
for expr in &projection.expr {
2281-
if let Expr::Alias(alias) = expr {
2282-
// The alias name is the column name being updated
2283-
// The inner expression is the new value
2284-
let column_name = alias.name.clone();
2285-
// Only include if it's not just a column reference to itself
2286-
// (those are columns that aren't being updated)
2287-
if !is_identity_assignment(&alias.expr, &column_name, &target_refs) {
2288-
let assignment_expr = normalize_update_assignment_expr(
2289-
(*alias.expr).clone(),
2290-
strip_qualifiers,
2291-
)?;
2292-
assignments.push((column_name, assignment_expr));
2293-
}
2294-
}
2295-
}
2280+
append_update_assignments(
2281+
&mut assignments,
2282+
projection,
2283+
&target_refs,
2284+
strip_qualifiers,
2285+
)?;
22962286
} else {
22972287
// Try to find projection deeper in the plan
22982288
input.apply(|node| {
22992289
if let LogicalPlan::Projection(projection) = node {
2300-
for expr in &projection.expr {
2301-
if let Expr::Alias(alias) = expr {
2302-
let column_name = alias.name.clone();
2303-
if !is_identity_assignment(
2304-
&alias.expr,
2305-
&column_name,
2306-
&target_refs,
2307-
) {
2308-
let assignment_expr = normalize_update_assignment_expr(
2309-
(*alias.expr).clone(),
2310-
strip_qualifiers,
2311-
)?;
2312-
assignments.push((column_name, assignment_expr));
2313-
}
2314-
}
2315-
}
2290+
append_update_assignments(
2291+
&mut assignments,
2292+
projection,
2293+
&target_refs,
2294+
strip_qualifiers,
2295+
)?;
23162296
return Ok(TreeNodeRecursion::Stop);
23172297
}
23182298
Ok(TreeNodeRecursion::Continue)
@@ -2322,6 +2302,31 @@ fn extract_update_assignments(
23222302
Ok(assignments)
23232303
}
23242304

2305+
fn append_update_assignments(
2306+
assignments: &mut Vec<(String, Expr)>,
2307+
projection: &Projection,
2308+
target_refs: &[TableReference],
2309+
strip_qualifiers: bool,
2310+
) -> Result<()> {
2311+
for expr in &projection.expr {
2312+
if let Expr::Alias(alias) = expr {
2313+
// The alias name is the column name being updated
2314+
// The inner expression is the new value
2315+
let column_name = alias.name.clone();
2316+
// Only include if it's not just a column reference to itself
2317+
// (those are columns that aren't being updated)
2318+
if !is_identity_assignment(&alias.expr, &column_name, target_refs) {
2319+
let assignment_expr = normalize_update_assignment_expr(
2320+
(*alias.expr).clone(),
2321+
strip_qualifiers,
2322+
)?;
2323+
assignments.push((column_name, assignment_expr));
2324+
}
2325+
}
2326+
}
2327+
Ok(())
2328+
}
2329+
23252330
fn collect_target_refs(
23262331
input: &Arc<LogicalPlan>,
23272332
target: &TableReference,

datafusion/sql/src/statement.rs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::utils::normalize_ident;
3232
use arrow::datatypes::{Field, FieldRef, Fields};
3333
use datafusion_common::error::_plan_err;
3434
use datafusion_common::parsers::CompressionTypeVariant;
35-
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
3635
use datafusion_common::{
3736
Column, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
3837
ScalarValue, SchemaError, SchemaReference, TableReference, ToDFSchema, exec_err,
@@ -1913,12 +1912,6 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
19131912
statement: DFStatement,
19141913
) -> Result<LogicalPlan> {
19151914
let plan = self.statement_to_plan(statement)?;
1916-
// TODO: remove this guard once UPDATE ... FROM routing via
1917-
// TableProvider::update_from(...) and joined assignment handling land.
1918-
// See https://github.com/apache/datafusion/issues/19950.
1919-
if update_uses_joined_input(&plan)? {
1920-
return not_impl_err!("UPDATE ... FROM is not supported");
1921-
}
19221915
if matches!(plan, LogicalPlan::Explain(_)) {
19231916
return plan_err!("Nested EXPLAINs are not supported");
19241917
}
@@ -2573,24 +2566,3 @@ ON p.function_name = r.routine_name
25732566
}
25742567
}
25752568
}
2576-
2577-
fn update_uses_joined_input(plan: &LogicalPlan) -> Result<bool> {
2578-
let LogicalPlan::Dml(DmlStatement {
2579-
op: WriteOp::Update,
2580-
input,
2581-
..
2582-
}) = plan
2583-
else {
2584-
return Ok(false);
2585-
};
2586-
2587-
let mut has_join = false;
2588-
input.apply(|node| {
2589-
if matches!(node, LogicalPlan::Join(_)) {
2590-
has_join = true;
2591-
return Ok(TreeNodeRecursion::Stop);
2592-
}
2593-
Ok(TreeNodeRecursion::Continue)
2594-
})?;
2595-
Ok(has_join)
2596-
}

datafusion/sql/tests/sql_integration.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -761,13 +761,21 @@ fn plan_update_from_with_aliases() {
761761
}
762762

763763
#[test]
764-
fn explain_update_from_is_rejected() {
764+
fn plan_explain_update_from() {
765765
let sql = "EXPLAIN UPDATE t1 SET b = t2.b, c = t2.a, d = 1 \
766766
FROM t2 WHERE t1.a = t2.a AND t1.b > 'foo' AND t2.c > 1.0";
767-
let err = logical_plan(sql).expect_err("EXPLAIN UPDATE ... FROM should fail");
767+
let plan = logical_plan(sql).unwrap();
768768
assert_snapshot!(
769-
err.strip_backtrace(),
770-
@r#"This feature is not implemented: UPDATE ... FROM is not supported"#
769+
plan,
770+
@r#"
771+
Explain
772+
Dml: op=[Update] table=[t1]
773+
Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
774+
Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
775+
Cross Join:
776+
TableScan: t1
777+
TableScan: t2
778+
"#
771779
);
772780
}
773781

0 commit comments

Comments
 (0)