Skip to content

Commit ca5a1ae

Browse files
Subham SinghalSubham Singhal
authored andcommitted
Fix build failure
1 parent 5c2c0fb commit ca5a1ae

5 files changed

Lines changed: 22 additions & 11 deletions

File tree

datafusion/core/tests/physical_optimizer/window_topn.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ fn plan_str(plan: &dyn ExecutionPlan) -> String {
5252
}
5353

5454
fn optimize(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
55-
let config = ConfigOptions::new();
55+
let mut config = ConfigOptions::new();
56+
config.optimizer.enable_window_topn = true;
5657
WindowTopN::new().optimize(plan, &config)
5758
}
5859

datafusion/physical-optimizer/src/window_topn.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,16 @@ use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr};
8181
/// - `K >= rn` (flipped) → fetch = K
8282
/// - `K > rn` (flipped) → fetch = K - 1
8383
///
84-
/// # When the Rule Does NOT Fire
84+
/// # When the Rule Fires
8585
///
86-
/// - Window function is not `ROW_NUMBER` (e.g., `RANK`, `DENSE_RANK`)
87-
/// - No `PARTITION BY` clause (global top-K is already handled by
88-
/// `SortExec` with `fetch`)
89-
/// - Filter predicate is on a data column, not the window output column
90-
/// - `FilterExec` has an embedded projection
91-
/// - Child of `BoundedWindowAggExec` is not a `SortExec`
92-
/// - Config flag `enable_window_topn` is `false`
86+
/// All of the following must be true:
87+
/// - Config flag `enable_window_topn` is `true`
88+
/// - The plan matches `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec`
89+
/// - The window function is `ROW_NUMBER` (not `RANK`, `DENSE_RANK`, etc.)
90+
/// - `ROW_NUMBER` has a `PARTITION BY` clause (global top-K is already
91+
/// handled by `SortExec` with `fetch`)
92+
/// - The filter predicate compares the window output column to an integer
93+
/// literal using `<=`, `<`, `>=`, or `>`
9394
///
9495
/// [`PartitionedTopKExec`]: datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec
9596
#[derive(Default, Clone, Debug)]

datafusion/physical-plan/src/sorts/partitioned_topk.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,12 @@ impl ExecutionPlan for PartitionedTopKExec {
295295
}
296296

297297
fn required_input_distribution(&self) -> Vec<Distribution> {
298-
vec![Distribution::UnspecifiedDistribution]
298+
let partition_exprs: Vec<Arc<dyn PhysicalExpr>> = self.expr
299+
[..self.partition_prefix_len]
300+
.iter()
301+
.map(|e| Arc::clone(&e.expr))
302+
.collect();
303+
vec![Distribution::HashPartitioned(partition_exprs)]
299304
}
300305

301306
fn maintains_input_order(&self) -> Vec<bool> {

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE
239239
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
240240
physical_plan after EnforceSorting SAME TEXT AS ABOVE
241241
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
242+
physical_plan after WindowTopN SAME TEXT AS ABOVE
242243
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
243244
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
244245
physical_plan after LimitAggregation SAME TEXT AS ABOVE
@@ -319,6 +320,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE
319320
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
320321
physical_plan after EnforceSorting SAME TEXT AS ABOVE
321322
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
323+
physical_plan after WindowTopN SAME TEXT AS ABOVE
322324
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
323325
physical_plan after OutputRequirements
324326
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]]
@@ -365,6 +367,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE
365367
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
366368
physical_plan after EnforceSorting SAME TEXT AS ABOVE
367369
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
370+
physical_plan after WindowTopN SAME TEXT AS ABOVE
368371
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
369372
physical_plan after OutputRequirements
370373
01)GlobalLimitExec: skip=0, fetch=10
@@ -611,6 +614,7 @@ physical_plan after EnforceDistribution SAME TEXT AS ABOVE
611614
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
612615
physical_plan after EnforceSorting SAME TEXT AS ABOVE
613616
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
617+
physical_plan after WindowTopN SAME TEXT AS ABOVE
614618
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
615619
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
616620
physical_plan after LimitAggregation SAME TEXT AS ABOVE

docs/source/user-guide/configs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ The following configuration settings are available:
140140
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
141141
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
142142
| datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible |
143-
| datafusion.optimizer.enable_window_topn | true | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. |
143+
| datafusion.optimizer.enable_window_topn | false | When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. |
144144
| datafusion.optimizer.enable_topk_repartition | true | When set to true, the optimizer will push TopK (Sort with fetch) below hash repartition when the partition key is a prefix of the sort key, reducing data volume before the shuffle. |
145145
| datafusion.optimizer.enable_topk_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. |
146146
| datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. |

0 commit comments

Comments
 (0)