Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ impl HashJoinStream {
filter,
JoinSide::Left,
None,
self.join_type,
)?
} else {
(left_indices, right_indices)
Expand Down Expand Up @@ -781,6 +782,7 @@ impl HashJoinStream {
&right_indices,
&self.column_indices,
join_side,
self.join_type,
)?;

let push_status = self.output_buffer.push_batch(batch)?;
Expand Down Expand Up @@ -899,6 +901,7 @@ impl HashJoinStream {
&right_side,
&self.column_indices,
JoinSide::Left,
self.join_type,
)?;
let push_status = self.output_buffer.push_batch(batch)?;

Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ pub(crate) fn build_side_determined_results(
&probe_indices,
column_indices,
build_hash_joiner.build_side,
join_type,
)
.map(|batch| (batch.num_rows() > 0).then_some(batch))
} else {
Expand Down Expand Up @@ -1022,6 +1023,7 @@ pub(crate) fn join_with_probe_batch(
filter,
build_hash_joiner.build_side,
None,
join_type,
)?
} else {
(build_indices, probe_indices)
Expand Down Expand Up @@ -1060,6 +1062,7 @@ pub(crate) fn join_with_probe_batch(
&probe_indices,
column_indices,
build_hash_joiner.build_side,
join_type,
)
.map(|batch| (batch.num_rows() > 0).then_some(batch))
}
Expand Down
15 changes: 14 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map(
(left_indices, right_indices)
}

#[expect(clippy::too_many_arguments)]
pub(crate) fn apply_join_filter_to_indices(
build_input_buffer: &RecordBatch,
probe_batch: &RecordBatch,
Expand All @@ -918,6 +919,7 @@ pub(crate) fn apply_join_filter_to_indices(
filter: &JoinFilter,
build_side: JoinSide,
max_intermediate_size: Option<usize>,
join_type: JoinType,
) -> Result<(UInt64Array, UInt32Array)> {
if build_indices.is_empty() && probe_indices.is_empty() {
return Ok((build_indices, probe_indices));
Expand All @@ -938,6 +940,7 @@ pub(crate) fn apply_join_filter_to_indices(
&probe_indices.slice(i, len),
filter.column_indices(),
build_side,
join_type,
)?;
let filter_result = filter
.expression()
Expand All @@ -959,6 +962,7 @@ pub(crate) fn apply_join_filter_to_indices(
&probe_indices,
filter.column_indices(),
build_side,
join_type,
)?;

filter
Expand Down Expand Up @@ -990,6 +994,7 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result<RecordBat

/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
/// The resulting batch has [Schema] `schema`.
#[expect(clippy::too_many_arguments)]
pub(crate) fn build_batch_from_indices(
schema: &Schema,
build_input_buffer: &RecordBatch,
Expand All @@ -998,9 +1003,17 @@ pub(crate) fn build_batch_from_indices(
probe_indices: &UInt32Array,
column_indices: &[ColumnIndex],
build_side: JoinSide,
join_type: JoinType,
) -> Result<RecordBatch> {
if schema.fields().is_empty() {
return new_empty_schema_batch(schema, build_indices.len());
// For RightAnti and RightSemi joins, after `adjust_indices_by_join_type`
// the build_indices were untouched so only probe_indices hold the actual
// row count.
let row_count = match join_type {
JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should `RightMark` be in the list as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. For `RightMark`, the left and right index arrays end up with the same number of indices. Refer to here

_ => build_indices.len(),
};
return new_empty_schema_batch(schema, row_count);
}

// build the columns of the new [RecordBatch]:
Expand Down
43 changes: 43 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5318,3 +5318,46 @@ DROP TABLE issue_20437_small;

statement count 0
DROP TABLE issue_20437_large;

# Test that count(*) over right semi/anti joins returns the correct row counts
# issue: https://github.com/apache/datafusion/issues/20669

statement ok
CREATE TABLE t1 (k INT, v INT);

statement ok
CREATE TABLE t2 (k INT, v INT);

statement ok
INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i);

statement ok
INSERT INTO t2 VALUES (1, 1);

query I
WITH t AS (
SELECT *
FROM t1
LEFT ANTI JOIN t2 ON t1.k = t2.k
)
SELECT count(*)
FROM t;
----
99

query I
WITH t AS (
SELECT *
FROM t1
LEFT SEMI JOIN t2 ON t1.k = t2.k
)
SELECT count(*)
FROM t;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-checked the tests against DuckDB; everything is fine.

----
1

statement count 0
DROP TABLE t1;

statement count 0
DROP TABLE t2;
Loading