Skip to content

Commit 3b5008a

Browse files
authored
fix: impl handle_child_pushdown_result for SortExec (#21527)
## Which issue does this PR close? - Closes #21526. ## Rationale for this change `FilterPushdown` does not push filters through `SortExec` because `SortExec` lacks an implementation of `handle_child_pushdown_result`. When a `FilterExec` sits above a plain `SortExec` (no fetch), the filter can safely be moved below the sort without changing semantics, since sorting preserves all rows. ## What changes are included in this PR? Implemented `handle_child_pushdown_result` for `SortExec` in `datafusion/physical-plan/src/sorts/sort.rs`: - For plain `SortExec` (no fetch) in the `Pre` phase: any filters not absorbed by the child are collected into a new `FilterExec` inserted between the `SortExec` and its child. - For `SortExec` with fetch (TopK) or non-`Pre` phases: filters are not absorbed, preserving correct TopK semantics where filtering after limiting would change results. ## Are these changes tested? Added comprehensive tests in `datafusion/core/tests/physical_optimizer/filter_pushdown.rs` covering: - Filter pushdown through sort into a scan that supports pushdown - Filter pushdown through sort when scan does NOT support pushdown (FilterExec inserted between sort and scan) - Multiple conjunctive filters pushed through sort - Filter NOT pushed through sort with fetch (TopK) - Filter with projection pushed through sort - Filter pushdown preserving the `preserve_partitioning` flag - Filter with fetch limit propagated to SortExec (TopK conversion) and one test case `test_filter_pushdown_through_sort_with_projection` for use LogicalPlanBuilder to reproduce. not able to use sql to reproduce due to #15886 ## Are there any user-facing changes?
1 parent 4c7bb08 commit 3b5008a

4 files changed

Lines changed: 463 additions & 20 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3046,3 +3046,331 @@ async fn test_discover_dynamic_filters_via_expressions_api() {
30463046
"After optimization, should discover exactly 2 dynamic filters (1 in HashJoinExec, 1 in DataSourceExec), found {count_after}"
30473047
);
30483048
}
3049+
3050+
// ==== Filter pushdown through SortExec tests ====
3051+
3052+
/// FilterExec above a plain SortExec (no fetch) should be pushed below it.
3053+
/// The scan supports pushdown, so the filter lands in the DataSourceExec.
3054+
#[test]
3055+
fn test_filter_pushdown_through_sort_into_scan() {
3056+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
3057+
let sort = Arc::new(SortExec::new(
3058+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3059+
col("a", &schema()).unwrap(),
3060+
)])
3061+
.unwrap(),
3062+
scan,
3063+
));
3064+
let predicate = col_lit_predicate("a", "foo", &schema());
3065+
let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap());
3066+
3067+
insta::assert_snapshot!(
3068+
OptimizationTest::new(plan, FilterPushdown::new(), true),
3069+
@r"
3070+
OptimizationTest:
3071+
input:
3072+
- FilterExec: a@0 = foo
3073+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3074+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3075+
output:
3076+
Ok:
3077+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3078+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
3079+
"
3080+
);
3081+
}
3082+
3083+
/// FilterExec above a plain SortExec (no fetch) when the scan does NOT
3084+
/// support pushdown. The filter should still move below the sort, landing
3085+
/// as a new FilterExec between SortExec and DataSourceExec.
3086+
#[test]
3087+
fn test_filter_pushdown_through_sort_no_scan_support() {
3088+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3089+
let sort = Arc::new(SortExec::new(
3090+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3091+
col("a", &schema()).unwrap(),
3092+
)])
3093+
.unwrap(),
3094+
scan,
3095+
));
3096+
let predicate = col_lit_predicate("a", "foo", &schema());
3097+
let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap());
3098+
3099+
insta::assert_snapshot!(
3100+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3101+
@r"
3102+
OptimizationTest:
3103+
input:
3104+
- FilterExec: a@0 = foo
3105+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3106+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3107+
output:
3108+
Ok:
3109+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3110+
- FilterExec: a@0 = foo
3111+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3112+
"
3113+
);
3114+
}
3115+
3116+
/// Multiple conjunctive filters above a plain SortExec should all be
3117+
/// pushed below the sort as a single FilterExec.
3118+
#[test]
3119+
fn test_multiple_filters_pushdown_through_sort() {
3120+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3121+
let sort = Arc::new(SortExec::new(
3122+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3123+
col("a", &schema()).unwrap(),
3124+
)])
3125+
.unwrap(),
3126+
scan,
3127+
));
3128+
// WHERE a = 'foo' AND b = 'bar'
3129+
let predicate = Arc::new(BinaryExpr::new(
3130+
col_lit_predicate("a", "foo", &schema()),
3131+
Operator::And,
3132+
col_lit_predicate("b", "bar", &schema()),
3133+
)) as Arc<dyn PhysicalExpr>;
3134+
let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap());
3135+
3136+
insta::assert_snapshot!(
3137+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3138+
@r"
3139+
OptimizationTest:
3140+
input:
3141+
- FilterExec: a@0 = foo AND b@1 = bar
3142+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3143+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3144+
output:
3145+
Ok:
3146+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3147+
- FilterExec: a@0 = foo AND b@1 = bar
3148+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3149+
"
3150+
);
3151+
}
3152+
3153+
/// FilterExec above a SortExec with fetch (TopK) must NOT be pushed below,
3154+
/// because limiting happens after filtering — changing the order would alter
3155+
/// the result set.
3156+
#[test]
3157+
fn test_filter_not_pushed_through_sort_with_fetch() {
3158+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3159+
let sort = Arc::new(
3160+
SortExec::new(
3161+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3162+
col("a", &schema()).unwrap(),
3163+
)])
3164+
.unwrap(),
3165+
scan,
3166+
)
3167+
.with_fetch(Some(10)),
3168+
);
3169+
let predicate = col_lit_predicate("a", "foo", &schema());
3170+
let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap());
3171+
3172+
insta::assert_snapshot!(
3173+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3174+
@r"
3175+
OptimizationTest:
3176+
input:
3177+
- FilterExec: a@0 = foo
3178+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
3179+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3180+
output:
3181+
Ok:
3182+
- FilterExec: a@0 = foo
3183+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
3184+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3185+
"
3186+
);
3187+
}
3188+
3189+
/// FilterExec above a SortExec with fetch (TopK) must NOT be pushed below,
3190+
/// because limiting happens after filtering — changing the order would alter
3191+
/// the result set.
3192+
#[test]
3193+
fn test_filter_pushed_through_sort_with_fetch() {
3194+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
3195+
let sort = Arc::new(
3196+
SortExec::new(
3197+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3198+
col("a", &schema()).unwrap(),
3199+
)])
3200+
.unwrap(),
3201+
scan,
3202+
)
3203+
.with_fetch(Some(10)),
3204+
);
3205+
let predicate = col_lit_predicate("a", "foo", &schema());
3206+
let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap());
3207+
3208+
insta::assert_snapshot!(
3209+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3210+
@r"
3211+
OptimizationTest:
3212+
input:
3213+
- FilterExec: a@0 = foo
3214+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
3215+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3216+
output:
3217+
Ok:
3218+
- FilterExec: a@0 = foo
3219+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
3220+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3221+
"
3222+
);
3223+
}
3224+
3225+
/// FilterExec with a projection above SortExec. The filter should be pushed
3226+
/// below the sort, and the projection should be preserved as a
3227+
/// ProjectionExec on top.
3228+
#[test]
3229+
fn test_filter_with_projection_pushdown_through_sort() {
3230+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3231+
let sort = Arc::new(SortExec::new(
3232+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3233+
col("a", &schema()).unwrap(),
3234+
)])
3235+
.unwrap(),
3236+
scan,
3237+
));
3238+
// FilterExec: b = 'bar', projection=[a] (only output column a)
3239+
let predicate = col_lit_predicate("b", "bar", &schema());
3240+
let plan = Arc::new(
3241+
FilterExecBuilder::new(predicate, sort)
3242+
.apply_projection(Some(vec![0]))
3243+
.unwrap()
3244+
.build()
3245+
.unwrap(),
3246+
);
3247+
3248+
insta::assert_snapshot!(
3249+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3250+
@r"
3251+
OptimizationTest:
3252+
input:
3253+
- FilterExec: b@1 = bar, projection=[a@0]
3254+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3255+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3256+
output:
3257+
Ok:
3258+
- ProjectionExec: expr=[a@0 as a]
3259+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3260+
- FilterExec: b@1 = bar
3261+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3262+
"
3263+
);
3264+
}
3265+
3266+
/// SortExec with preserve_partitioning=true should keep that setting after
3267+
/// filters are pushed below it.
3268+
#[test]
3269+
fn test_filter_pushdown_through_sort_preserves_partitioning() {
3270+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3271+
let sort = Arc::new(
3272+
SortExec::new(
3273+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3274+
col("a", &schema()).unwrap(),
3275+
)])
3276+
.unwrap(),
3277+
scan,
3278+
)
3279+
.with_preserve_partitioning(true),
3280+
);
3281+
let predicate = col_lit_predicate("a", "foo", &schema());
3282+
let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap());
3283+
3284+
insta::assert_snapshot!(
3285+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3286+
@r"
3287+
OptimizationTest:
3288+
input:
3289+
- FilterExec: a@0 = foo
3290+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
3291+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3292+
output:
3293+
Ok:
3294+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
3295+
- FilterExec: a@0 = foo
3296+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3297+
"
3298+
);
3299+
}
3300+
3301+
/// FilterExec **with a fetch limit** above a plain SortExec. When the filter
3302+
/// is pushed below the sort the fetch should be propagated to the SortExec
3303+
/// (turning it into a TopK).
3304+
#[test]
3305+
fn test_filter_with_fetch_pushdown_through_sort() {
3306+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3307+
let sort = Arc::new(SortExec::new(
3308+
LexOrdering::new(vec![PhysicalSortExpr::new_default(
3309+
col("a", &schema()).unwrap(),
3310+
)])
3311+
.unwrap(),
3312+
scan,
3313+
));
3314+
let predicate = col_lit_predicate("a", "foo", &schema());
3315+
let plan = Arc::new(
3316+
FilterExecBuilder::new(predicate, sort)
3317+
.with_fetch(Some(10))
3318+
.build()
3319+
.unwrap(),
3320+
);
3321+
3322+
insta::assert_snapshot!(
3323+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3324+
@r"
3325+
OptimizationTest:
3326+
input:
3327+
- FilterExec: a@0 = foo, fetch=10
3328+
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
3329+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3330+
output:
3331+
Ok:
3332+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
3333+
- FilterExec: a@0 = foo
3334+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3335+
"
3336+
);
3337+
}
3338+
3339+
#[test]
3340+
fn test_filter_pushdown_through_sort_with_projection() {
3341+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
3342+
let sort = Arc::new(SortExec::new(
3343+
LexOrdering::new(vec![PhysicalSortExpr::new(
3344+
col("a", &schema()).unwrap(),
3345+
SortOptions::new(true, false), // descending, nulls_last
3346+
)])
3347+
.unwrap(),
3348+
scan,
3349+
));
3350+
// FilterExec: b = 'bar', projection=[a] (only output column a)
3351+
let predicate = col_lit_predicate("b", "bar", &schema());
3352+
let plan = Arc::new(
3353+
FilterExecBuilder::new(predicate, sort)
3354+
.apply_projection(Some(vec![0]))
3355+
.unwrap()
3356+
.build()
3357+
.unwrap(),
3358+
);
3359+
3360+
insta::assert_snapshot!(
3361+
OptimizationTest::new(plan, FilterPushdown::new(), false),
3362+
@r"
3363+
OptimizationTest:
3364+
input:
3365+
- FilterExec: b@1 = bar, projection=[a@0]
3366+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
3367+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3368+
output:
3369+
Ok:
3370+
- ProjectionExec: expr=[a@0 as a]
3371+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
3372+
- FilterExec: b@1 = bar
3373+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
3374+
"
3375+
);
3376+
}

0 commit comments

Comments
 (0)