Skip to content

Commit 526f0cb

Browse files
authored
perf: Reduce Box and Arc allocation churn during tree rewriting (#21749)
## Which issue does this PR close? - Closes #21751. ## Rationale for this change Profiling the planner suggests that a surprising amount of time was being spent doing tree rewriting in the logical optimizer. One culprit is `TreeNodeContainer::map_elements()` for `Box<C>` and `Arc<C>`, which do the following: * Fetch the inner `C` value from the `Box`/`Arc` * Pass the innter value to the closure * Wrap the return value of the closure in a newly allocated `Box` / `Arc`, respectively This allocates a fresh `Box` or `Arc` for every node visited while walking an expression or logical plan, even if the tree rewrite we're doing didn't modify the expression/plan node. Instead, we can reuse the current `Box<C>` or `Arc<C>`: use `std::mem::take()` to swap the inner value with `C::default()`, pass the inner value to the closure, and put the result back in the original container. Swapping the inner value with `C::default()` means the container always has a valid value, which is important if the closure panics. For `Arc<C>`, we need to use `Arc::make_mut()`, which only clones if the `Arc` is not unique. This reduces the bytes allocated to plan TPC-H Q13 by ~22% (988 kB -> 765 kB), and reduces allocated blocks by 8.5% (210k -> 192k). ## What changes are included in this PR? * Optimize `Box<C>::map_elements()` and `Arc<C>::map_elements()` as described above * Change `map_children()` for `Expr::Alias` to use `map_elements()`, rather than invoking `f(*expr)` directly; this ensures that it can take advantage of this optimization * Make `LogicalPlan::default()` use a shared `DFSchema`, rather than allocating a fresh `DFSchema` for every call. Because `default()` is not in the hot path for tree rewriting, it is important that it is cheap * Add unit tests for new `map_elements()` behavior * Add note to migration guide for breaking API change ## Are these changes tested? Yes, plus new unit tests added. ## Are there any user-facing changes? Yes: `TreeNodeContainer` impls for `Box<C>` and `Arc<C>` now require `C: Default`. This is a breaking API change for third-party code that implements `TreeNodeContainer` for a custom type. The fix is usually straightforward.
1 parent 9b5e43e commit 526f0cb

6 files changed

Lines changed: 117 additions & 19 deletions

File tree

datafusion/common/src/dfschema.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
use std::collections::{BTreeSet, HashMap, HashSet};
2222
use std::fmt::{Display, Formatter};
2323
use std::hash::Hash;
24-
use std::sync::Arc;
24+
use std::sync::{Arc, LazyLock};
2525

2626
use crate::error::{_plan_err, _schema_err, DataFusionError, Result};
2727
use crate::{
@@ -129,6 +129,13 @@ impl DFSchema {
129129
}
130130
}
131131

132+
/// Returns a reference to a shared empty [`DFSchema`].
133+
pub fn empty_ref() -> &'static DFSchemaRef {
134+
static EMPTY: LazyLock<DFSchemaRef> =
135+
LazyLock::new(|| Arc::new(DFSchema::empty()));
136+
&EMPTY
137+
}
138+
132139
/// Return a reference to the inner Arrow [`Schema`]
133140
///
134141
/// Note this does not have the qualifier information

datafusion/common/src/tree_node.rs

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,9 @@ pub trait TreeNodeContainer<'a, T: 'a>: Sized {
796796
) -> Result<Transformed<Self>>;
797797
}
798798

799-
impl<'a, T: 'a, C: TreeNodeContainer<'a, T>> TreeNodeContainer<'a, T> for Box<C> {
799+
impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Default> TreeNodeContainer<'a, T>
800+
for Box<C>
801+
{
800802
fn apply_elements<F: FnMut(&'a T) -> Result<TreeNodeRecursion>>(
801803
&'a self,
802804
f: F,
@@ -805,14 +807,24 @@ impl<'a, T: 'a, C: TreeNodeContainer<'a, T>> TreeNodeContainer<'a, T> for Box<C>
805807
}
806808

807809
fn map_elements<F: FnMut(T) -> Result<Transformed<T>>>(
808-
self,
810+
mut self,
809811
f: F,
810812
) -> Result<Transformed<Self>> {
811-
(*self).map_elements(f)?.map_data(|c| Ok(Self::new(c)))
813+
// Rewrite in place so the existing heap allocation can be reused.
814+
// `mem::take` hands the inner `C` to `f` while leaving
815+
// `C::default()` in the slot, so an unwinding drop finds a valid
816+
// `C` even if `f` panics or the `?` short-circuits.
817+
let inner = std::mem::take(&mut *self);
818+
Ok(inner.map_elements(f)?.update_data(|c| {
819+
*self = c;
820+
self
821+
}))
812822
}
813823
}
814824

815-
impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Clone> TreeNodeContainer<'a, T> for Arc<C> {
825+
impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Clone + Default> TreeNodeContainer<'a, T>
826+
for Arc<C>
827+
{
816828
fn apply_elements<F: FnMut(&'a T) -> Result<TreeNodeRecursion>>(
817829
&'a self,
818830
f: F,
@@ -821,12 +833,18 @@ impl<'a, T: 'a, C: TreeNodeContainer<'a, T> + Clone> TreeNodeContainer<'a, T> fo
821833
}
822834

823835
fn map_elements<F: FnMut(T) -> Result<Transformed<T>>>(
824-
self,
836+
mut self,
825837
f: F,
826838
) -> Result<Transformed<Self>> {
827-
Arc::unwrap_or_clone(self)
828-
.map_elements(f)?
829-
.map_data(|c| Ok(Arc::new(c)))
839+
// Rewrite in place using the same `mem::take` strategy as
840+
// `Box<C>::map_elements`. `Arc::make_mut` gives us exclusive
841+
// access (cloning `C` first if we were sharing), after which
842+
// `get_mut` is infallible.
843+
let inner = std::mem::take(Arc::make_mut(&mut self));
844+
Ok(inner.map_elements(f)?.update_data(|c| {
845+
*Arc::get_mut(&mut self).unwrap() = c;
846+
self
847+
}))
830848
}
831849
}
832850

@@ -1335,14 +1353,15 @@ impl<T: ConcreteTreeNode> TreeNode for T {
13351353
pub(crate) mod tests {
13361354
use std::collections::HashMap;
13371355
use std::fmt::Display;
1356+
use std::sync::Arc;
13381357

13391358
use crate::Result;
13401359
use crate::tree_node::{
13411360
Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion, TreeNodeRewriter,
13421361
TreeNodeVisitor,
13431362
};
13441363

1345-
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
1364+
#[derive(Debug, Default, Eq, Hash, PartialEq, Clone)]
13461365
pub struct TestTreeNode<T> {
13471366
pub(crate) children: Vec<TestTreeNode<T>>,
13481367
pub(crate) data: T,
@@ -2431,4 +2450,46 @@ pub(crate) mod tests {
24312450

24322451
item.visit(&mut visitor).unwrap();
24332452
}
2453+
2454+
#[test]
2455+
fn box_map_elements_reuses_allocation() {
2456+
let boxed = Box::new(TestTreeNode::new_leaf(42i32));
2457+
let before: *const TestTreeNode<i32> = &*boxed;
2458+
let out = boxed.map_elements(|n| Ok(Transformed::no(n))).unwrap();
2459+
let after: *const TestTreeNode<i32> = &*out.data;
2460+
assert_eq!(after, before);
2461+
}
2462+
2463+
#[test]
2464+
fn arc_map_elements_reuses_allocation_when_unique() {
2465+
let arc = Arc::new(TestTreeNode::new_leaf(42i32));
2466+
let before = Arc::as_ptr(&arc);
2467+
let out = arc.map_elements(|n| Ok(Transformed::no(n))).unwrap();
2468+
assert_eq!(Arc::as_ptr(&out.data), before);
2469+
}
2470+
2471+
#[test]
2472+
fn arc_map_elements_clones_when_shared() {
2473+
// When the input `Arc` is shared, `make_mut` clones into a fresh
2474+
// allocation, so the reuse optimization does not apply.
2475+
let arc = Arc::new(TestTreeNode::new_leaf(42i32));
2476+
let _keepalive = Arc::clone(&arc);
2477+
let before = Arc::as_ptr(&arc);
2478+
let out = arc.map_elements(|n| Ok(Transformed::no(n))).unwrap();
2479+
assert_ne!(Arc::as_ptr(&out.data), before);
2480+
}
2481+
2482+
#[test]
2483+
fn box_map_elements_panic() {
2484+
use std::panic::{AssertUnwindSafe, catch_unwind};
2485+
let boxed = Box::new(TestTreeNode::new_leaf(42i32));
2486+
let result = catch_unwind(AssertUnwindSafe(|| {
2487+
boxed
2488+
.map_elements(|_: TestTreeNode<i32>| -> Result<_> {
2489+
panic!("simulated panic during rewrite")
2490+
})
2491+
.ok()
2492+
}));
2493+
assert!(result.is_err());
2494+
}
24342495
}

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,12 @@ pub enum LogicalPlan {
294294

295295
impl Default for LogicalPlan {
296296
fn default() -> Self {
297+
// `Default` is used as a transient placeholder on hot paths (e.g.
298+
// `Box`/`Arc` `map_elements`), so use a shared empty schema to avoid
299+
// allocating.
297300
LogicalPlan::EmptyRelation(EmptyRelation {
298301
produce_one_row: false,
299-
schema: Arc::new(DFSchema::empty()),
302+
schema: Arc::clone(DFSchema::empty_ref()),
300303
})
301304
}
302305
}

datafusion/expr/src/logical_plan/statement.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use datafusion_common::metadata::format_type_and_metadata;
2020
use datafusion_common::{DFSchema, DFSchemaRef};
2121
use itertools::Itertools as _;
2222
use std::fmt::{self, Display};
23-
use std::sync::{Arc, LazyLock};
23+
use std::sync::Arc;
2424

2525
use crate::{Expr, LogicalPlan, expr_vec_fmt};
2626

@@ -55,10 +55,7 @@ impl Statement {
5555
/// Get a reference to the logical plan's schema
5656
pub fn schema(&self) -> &DFSchemaRef {
5757
// Statements have an unchanging empty schema.
58-
static STATEMENT_EMPTY_SCHEMA: LazyLock<DFSchemaRef> =
59-
LazyLock::new(|| Arc::new(DFSchema::empty()));
60-
61-
&STATEMENT_EMPTY_SCHEMA
58+
DFSchema::empty_ref()
6259
}
6360

6461
/// Return a descriptive string describing the type of this

datafusion/expr/src/tree_node.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl TreeNode for Expr {
116116
/// indicating whether the expression was transformed or left unchanged.
117117
fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
118118
self,
119-
mut f: F,
119+
f: F,
120120
) -> Result<Transformed<Self>> {
121121
Ok(match self {
122122
// TODO: remove the next line after `Expr::Wildcard` is removed
@@ -150,8 +150,13 @@ impl TreeNode for Expr {
150150
relation,
151151
name,
152152
metadata,
153-
}) => f(*expr)?.update_data(|e| {
154-
e.alias_qualified_with_metadata(relation, name, metadata)
153+
}) => expr.map_elements(f)?.update_data(|expr| {
154+
Expr::Alias(Alias {
155+
expr,
156+
relation,
157+
name,
158+
metadata,
159+
})
155160
}),
156161
Expr::InSubquery(InSubquery {
157162
expr,

docs/source/library-user-guide/upgrading/54.0.0.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,3 +347,28 @@ SELECT CAST(approx_percentile_cont(quantity, 0.5) AS BIGINT) FROM orders;
347347
```
348348

349349
[#21074]: https://github.com/apache/datafusion/pull/21074
350+
351+
### `Box<C>` and `Arc<C>` `TreeNodeContainer` impls now require `C: Default`
352+
353+
The generic `TreeNodeContainer` implementations for `Box<C>` and `Arc<C>` now
354+
require `C: Default`. This change was necessary as part of optimizing tree
355+
rewriting to reduce heap allocations.
356+
357+
**Who is affected:**
358+
359+
- Users that implement `TreeNodeContainer` on a custom type and wrap it in
360+
`Box` or `Arc` when walking trees.
361+
362+
**Migration guide:**
363+
364+
Add a `Default` implementation to your type. The default value is used as a
365+
temporary placeholder during query optimization, so when possible, pick a cheap,
366+
allocation-free variant:
367+
368+
```rust,ignore
369+
impl Default for MyTreeNode {
370+
fn default() -> Self {
371+
MyTreeNode::Leaf // or whichever variant is cheapest to construct
372+
}
373+
}
374+
```

0 commit comments

Comments
 (0)