Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 229 additions & 11 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ async fn test_static_filter_pushdown_through_hash_join() {
"
);

// Test left join - filters should NOT be pushed down
// Test left join: filter on preserved (build) side is pushed down,
// filter on non-preserved (probe) side is NOT pushed down.
let join = Arc::new(
HashJoinExec::try_new(
TestScanBuilder::new(Arc::clone(&build_side_schema))
Expand All @@ -425,25 +426,30 @@ async fn test_static_filter_pushdown_through_hash_join() {
);

let join_schema = join.schema();
let filter = col_lit_predicate("a", "aa", &join_schema);
let plan =
Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc<dyn ExecutionPlan>;
// Filter on build side column (preserved): should be pushed down
let left_filter = col_lit_predicate("a", "aa", &join_schema);
// Filter on probe side column (not preserved): should NOT be pushed down
let right_filter = col_lit_predicate("e", "ba", &join_schema);
let filter =
Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
let plan = Arc::new(FilterExec::try_new(right_filter, filter).unwrap())
as Arc<dyn ExecutionPlan>;

// Test that filters are NOT pushed down for left join
insta::assert_snapshot!(
OptimizationTest::new(plan, FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: a@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
- FilterExec: e@4 = ba
- FilterExec: a@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: a@0 = aa
- FilterExec: e@4 = ba
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
"
);
Expand Down Expand Up @@ -1722,6 +1728,218 @@ async fn test_hashjoin_parent_filter_pushdown() {
);
}

#[test]
fn test_hashjoin_parent_filter_pushdown_same_column_names() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

let build_side_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("build_val", DataType::Utf8, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.build();

let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("probe_val", DataType::Utf8, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.build();

let on = vec![(
col("id", &build_side_schema).unwrap(),
col("id", &probe_side_schema).unwrap(),
)];
let join = Arc::new(
HashJoinExec::try_new(
build_scan,
probe_scan,
on,
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);

let join_schema = join.schema();

let build_id_filter = col_lit_predicate("id", "aa", &join_schema);
let probe_val_filter = col_lit_predicate("probe_val", "x", &join_schema);

let filter =
Arc::new(FilterExec::try_new(build_id_filter, Arc::clone(&join) as _).unwrap());
let plan = Arc::new(FilterExec::try_new(probe_val_filter, filter).unwrap())
as Arc<dyn ExecutionPlan>;

insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: probe_val@3 = x
- FilterExec: id@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true
output:
Ok:
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, build_val], file_type=test, pushdown_supported=true, predicate=id@0 = aa
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, probe_val], file_type=test, pushdown_supported=true, predicate=probe_val@1 = x
"
);
}

#[test]
fn test_hashjoin_parent_filter_pushdown_mark_join() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

let left_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("val", DataType::Utf8, false),
]));
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
.with_support(true)
.build();

let right_schema =
Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
.with_support(true)
.build();

let on = vec![(
col("id", &left_schema).unwrap(),
col("id", &right_schema).unwrap(),
)];
let join = Arc::new(
HashJoinExec::try_new(
left_scan,
right_scan,
on,
None,
&JoinType::LeftMark,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);

let join_schema = join.schema();

let left_filter = col_lit_predicate("val", "x", &join_schema);
let mark_filter = col_lit_predicate("mark", true, &join_schema);

let filter =
Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
let plan = Arc::new(FilterExec::try_new(mark_filter, filter).unwrap())
as Arc<dyn ExecutionPlan>;

insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: mark@2 = true
- FilterExec: val@1 = x
- HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: mark@2 = true
- HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(id@0, id@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id, val], file_type=test, pushdown_supported=true, predicate=val@1 = x
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
"
);
}

/// Test that filters on join key columns are pushed to both sides of semi/anti joins.
/// For LeftSemi/LeftAnti, the output only contains left columns, but filters on
/// join key columns can also be pushed to the right (non-preserved) side because
/// the equijoin condition guarantees the key values match.
#[test]
fn test_hashjoin_parent_filter_pushdown_semi_anti_join() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

let left_schema = Arc::new(Schema::new(vec![
Field::new("k", DataType::Utf8, false),
Field::new("v", DataType::Utf8, false),
]));
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
.with_support(true)
.build();

let right_schema = Arc::new(Schema::new(vec![
Field::new("k", DataType::Utf8, false),
Field::new("w", DataType::Utf8, false),
]));
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
.with_support(true)
.build();

let on = vec![(
col("k", &left_schema).unwrap(),
col("k", &right_schema).unwrap(),
)];

let join = Arc::new(
HashJoinExec::try_new(
left_scan,
right_scan,
on,
None,
&JoinType::LeftSemi,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
false,
)
.unwrap(),
);

let join_schema = join.schema();
// Filter on join key column: k = 'x' — should be pushed to BOTH sides
let key_filter = col_lit_predicate("k", "x", &join_schema);
// Filter on non-key column: v = 'y' — should only be pushed to the left side
let val_filter = col_lit_predicate("v", "y", &join_schema);

let filter =
Arc::new(FilterExec::try_new(key_filter, Arc::clone(&join) as _).unwrap());
let plan = Arc::new(FilterExec::try_new(val_filter, filter).unwrap())
as Arc<dyn ExecutionPlan>;

insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: v@1 = y
- FilterExec: k@0 = x
- HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true
output:
Ok:
- HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(k@0, k@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, v], file_type=test, pushdown_supported=true, predicate=k@0 = x AND v@1 = y
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[k, w], file_type=test, pushdown_supported=true, predicate=k@0 = x
"
);
}

/// Integration test for dynamic filter pushdown with TopK.
/// We use an integration test because there are complex interactions in the optimizer rules
/// that the unit tests applying a single optimizer rule do not cover.
Expand Down
Loading