Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,35 @@ impl JoinType {
}
}

/// Whether each side of the join is preserved for ON-clause filter pushdown.
///
/// It is only correct to push ON-clause filters below a join for preserved
/// inputs.
///
/// # "Preserved" input definition
///
/// A join side is preserved if the join returns all or a subset of the rows
/// from that side, such that each output row directly maps to an input row.
/// If a side is not preserved, the join can produce extra null rows that
/// don't map to any input row.
///
/// # Return Value
///
/// A tuple of booleans - (left_preserved, right_preserved).
pub fn on_lr_is_preserved(&self) -> (bool, bool) {
    // Arms are grouped by the (left, right) result they share. The match is
    // deliberately kept exhaustive (no `_` arm) so that adding a new
    // `JoinType` variant forces this table to be revisited.
    match self {
        JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi => (true, true),
        JoinType::Left | JoinType::LeftAnti | JoinType::LeftMark => (false, true),
        JoinType::Right | JoinType::RightAnti | JoinType::RightMark => (true, false),
        JoinType::Full => (false, false),
    }
}

/// Does the join type support swapping inputs?
pub fn supports_swap(&self) -> bool {
matches!(
Expand Down
274 changes: 274 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() {
];
assert_batches_eq!(expected, &result);
}

// End-to-end check that a `HashJoinExec` with `JoinType::Left` pushes its
// dynamic join filter (min/max bounds plus an IN-list derived from the build
// side keys) down into the probe-side `DataSourceExec`, and that the query
// results are still correct afterwards. Pushing ON-clause filters to the
// probe (right) side of a Left join is safe because that side is preserved
// (see `JoinType::on_lr_is_preserved`): pruned probe rows could never have
// matched any build row.
#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_left_join() {
    use datafusion_common::JoinType;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Create build side with limited values
    let build_batches = vec![
        record_batch!(
            ("a", Utf8, ["aa", "ab"]),
            ("b", Utf8, ["ba", "bb"]),
            ("c", Float64, [1.0, 2.0])
        )
        .unwrap(),
    ];
    let build_side_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
        .with_support(true)
        .with_batches(build_batches)
        .build();

    // Create probe side with more values (some won't match):
    // rows (ac, bc) and (ad, bd) have no counterpart on the build side.
    let probe_batches = vec![
        record_batch!(
            ("a", Utf8, ["aa", "ab", "ac", "ad"]),
            ("b", Utf8, ["ba", "bb", "bc", "bd"]),
            ("e", Float64, [1.0, 2.0, 3.0, 4.0])
        )
        .unwrap(),
    ];
    let probe_side_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("e", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
        .with_support(true)
        .with_batches(probe_batches)
        .build();

    // Create HashJoinExec with Left join and CollectLeft mode, joining on a
    // composite key (a, b) so the dynamic filter covers both columns.
    let on = vec![
        (
            col("a", &build_side_schema).unwrap(),
            col("a", &probe_side_schema).unwrap(),
        ),
        (
            col("b", &build_side_schema).unwrap(),
            col("b", &probe_side_schema).unwrap(),
        ),
    ];
    let plan = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            Arc::clone(&probe_scan),
            on,
            None,
            &JoinType::Left,
            None,
            PartitionMode::CollectLeft,
            datafusion_common::NullEquality::NullEqualsNothing,
            false,
        )
        .unwrap(),
    ) as Arc<dyn ExecutionPlan>;

    // Expect the dynamic filter predicate to be pushed down into the probe side
    // DataSource. At plan time the filter has no bounds yet, hence
    // `DynamicFilter [ empty ]` — it is only populated during execution.
    insta::assert_snapshot!(
        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
        @r"
    OptimizationTest:
      input:
        - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
      output:
        Ok:
          - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
    ",
    );

    // Actually apply the optimization and execute the plan
    let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
    config.optimizer.enable_dynamic_filter_pushdown = true;
    let plan = FilterPushdown::new_post_optimization()
        .optimize(plan, &config)
        .unwrap();

    // Test that dynamic filter linking survives with_new_children
    let children = plan.children().into_iter().map(Arc::clone).collect();
    let plan = plan.with_new_children(children).unwrap();

    // Execute against an in-memory object store; small batch size keeps the
    // test lightweight.
    let config = SessionConfig::new().with_batch_size(10);
    let session_ctx = SessionContext::new_with_config(config);
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let state = session_ctx.state();
    let task_ctx = state.task_ctx();
    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
        .await
        .unwrap();

    // After execution, verify the dynamic filter was populated with bounds and
    // an IN-list built from the two build-side key pairs.
    insta::assert_snapshot!(
        format!("{}", format_plan_for_test(&plan)),
        @r"
    - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
    "
    );

    // Verify result correctness: left join preserves all build (left) rows.
    // All build rows match probe rows here, so we get 2 matched rows.
    // The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
    // which is safe because those probe rows would never match any build row.
    let result = format!("{}", pretty_format_batches(&batches).unwrap());
    insta::assert_snapshot!(
        result,
        @r"
    +----+----+-----+----+----+-----+
    | a  | b  | c   | a  | b  | e   |
    +----+----+-----+----+----+-----+
    | aa | ba | 1.0 | aa | ba | 1.0 |
    | ab | bb | 2.0 | ab | bb | 2.0 |
    +----+----+-----+----+----+-----+
    "
    );
}

// End-to-end check that a `HashJoinExec` with `JoinType::LeftSemi` pushes its
// dynamic join filter into the probe-side `DataSourceExec` and still produces
// correct results. A LeftSemi join preserves the probe (right) side for
// ON-clause filters (see `JoinType::on_lr_is_preserved`), so pruning probe
// rows that cannot match any build row is safe.
#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
    use datafusion_common::JoinType;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Create build side with limited values
    let build_batches = vec![
        record_batch!(
            ("a", Utf8, ["aa", "ab"]),
            ("b", Utf8, ["ba", "bb"]),
            ("c", Float64, [1.0, 2.0])
        )
        .unwrap(),
    ];
    let build_side_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
        .with_support(true)
        .with_batches(build_batches)
        .build();

    // Create probe side with more values (some won't match):
    // rows (ac, bc) and (ad, bd) have no counterpart on the build side.
    let probe_batches = vec![
        record_batch!(
            ("a", Utf8, ["aa", "ab", "ac", "ad"]),
            ("b", Utf8, ["ba", "bb", "bc", "bd"]),
            ("e", Float64, [1.0, 2.0, 3.0, 4.0])
        )
        .unwrap(),
    ];
    let probe_side_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("e", DataType::Float64, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
        .with_support(true)
        .with_batches(probe_batches)
        .build();

    // Create HashJoinExec with LeftSemi join and CollectLeft mode, joining on
    // a composite key (a, b) so the dynamic filter covers both columns.
    let on = vec![
        (
            col("a", &build_side_schema).unwrap(),
            col("a", &probe_side_schema).unwrap(),
        ),
        (
            col("b", &build_side_schema).unwrap(),
            col("b", &probe_side_schema).unwrap(),
        ),
    ];
    let plan = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            Arc::clone(&probe_scan),
            on,
            None,
            &JoinType::LeftSemi,
            None,
            PartitionMode::CollectLeft,
            datafusion_common::NullEquality::NullEqualsNothing,
            false,
        )
        .unwrap(),
    ) as Arc<dyn ExecutionPlan>;

    // Expect the dynamic filter predicate to be pushed down into the probe side
    // DataSource. At plan time the filter has no bounds yet, hence
    // `DynamicFilter [ empty ]` — it is only populated during execution.
    insta::assert_snapshot!(
        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
        @r"
    OptimizationTest:
      input:
        - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
      output:
        Ok:
          - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
            - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
    ",
    );

    // Actually apply the optimization and execute the plan
    let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
    config.optimizer.enable_dynamic_filter_pushdown = true;
    let plan = FilterPushdown::new_post_optimization()
        .optimize(plan, &config)
        .unwrap();

    // Test that dynamic filter linking survives with_new_children
    let children = plan.children().into_iter().map(Arc::clone).collect();
    let plan = plan.with_new_children(children).unwrap();

    // Execute against an in-memory object store; small batch size keeps the
    // test lightweight.
    let config = SessionConfig::new().with_batch_size(10);
    let session_ctx = SessionContext::new_with_config(config);
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let state = session_ctx.state();
    let task_ctx = state.task_ctx();
    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
        .await
        .unwrap();

    // After execution, verify the dynamic filter was populated with bounds and
    // an IN-list built from the two build-side key pairs.
    insta::assert_snapshot!(
        format!("{}", format_plan_for_test(&plan)),
        @r"
    - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
      - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
    "
    );

    // Verify result correctness: left semi join returns only build (left) rows
    // that have at least one matching probe row. Output schema is build-side columns only.
    let result = format!("{}", pretty_format_batches(&batches).unwrap());
    insta::assert_snapshot!(
        result,
        @r"
    +----+----+-----+
    | a  | b  | c   |
    +----+----+-----+
    | aa | ba | 1.0 |
    | ab | bb | 2.0 |
    +----+----+-----+
    "
    );
}
22 changes: 2 additions & 20 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,27 +176,9 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
}
}

/// For a given JOIN type, determine whether each input of the join is preserved
/// for the join condition (`ON` clause filters).
///
/// It is only correct to push filters below a join for preserved inputs.
///
/// # Return Value
/// A tuple of booleans - (left_preserved, right_preserved).
///
/// See [`lr_is_preserved`] for a definition of "preserved".
/// See [`JoinType::on_lr_is_preserved`] for details.
pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) {
match join_type {
JoinType::Inner => (true, true),
JoinType::Left => (false, true),
JoinType::Right => (true, false),
JoinType::Full => (false, false),
JoinType::LeftSemi | JoinType::RightSemi => (true, true),
JoinType::LeftAnti => (false, true),
JoinType::RightAnti => (true, false),
JoinType::LeftMark => (false, true),
JoinType::RightMark => (true, false),
}
join_type.on_lr_is_preserved()
}

/// Evaluates the columns referenced in the given expression to see if they refer
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,8 @@ impl HashJoinExec {
}

fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
if self.join_type != JoinType::Inner
|| !config.optimizer.enable_join_dynamic_filter_pushdown
{
let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown {
return false;
}

Expand Down
Loading
Loading