Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,8 +1042,9 @@ pub fn project(
Expr::Wildcard => {
projected_expr.extend(expand_wildcard(input_schema, &plan)?)
}
Expr::QualifiedWildcard { ref qualifier } => projected_expr
.extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?),
Expr::QualifiedWildcard { ref qualifier } => {
projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?)
}
_ => projected_expr
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
Expand Down
24 changes: 19 additions & 5 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,23 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
let using_columns = plan.using_columns()?;
let columns_to_skip = using_columns
.into_iter()
// For each USING JOIN condition, only expand to one column in projection
// For each USING JOIN condition, only expand to one of each join column in projection
.flat_map(|cols| {
let mut cols = cols.into_iter().collect::<Vec<_>>();
// sort join columns to make sure we consistently keep the same
// qualified column
cols.sort();
cols.into_iter().skip(1)
let mut out_column_names: HashSet<String> = HashSet::new();
cols.into_iter()
.filter_map(|c| {
if out_column_names.contains(&c.name) {
Some(c)
} else {
out_column_names.insert(c.name);
None
}
})
.collect::<Vec<_>>()
Comment on lines +159 to +169
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main fix is here, since instead of only skipping the first column (which is based on assumption of using join with only one column), actually keep track of which columns to skip, allowing only one set of the join columns to be output

})
.collect::<HashSet<_>>();

Expand Down Expand Up @@ -186,7 +196,6 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
pub fn expand_qualified_wildcard(
qualifier: &str,
schema: &DFSchema,
plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
let qualified_fields: Vec<DFField> = schema
.fields_with_qualified(qualifier)
Expand All @@ -198,9 +207,14 @@ pub fn expand_qualified_wildcard(
"Invalid qualifier {qualifier}"
)));
}
let qualifier_schema =
let qualified_schema =
DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
expand_wildcard(&qualifier_schema, plan)
// if qualified, allow all columns in output (i.e. ignore using column check)
Ok(qualified_schema
.fields()
.iter()
.map(|f| Expr::Column(f.qualified_column()))
.collect::<Vec<Expr>>())
Comment on lines +210 to +217
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an extra fix, as i observed in postgresql if you have the following query:

select a.*, b.*, c.*
from categories a
	join categories b using (category_id)
	join categories c using (category_id)
;

then a.category_id, b.category_id and c.category_id are all included in the output instead of being omitted due to being part of the using join, because they have been specifically qualified in their wildcard, hence shouldn't try to deduplicate those as in the non-qualified wildcard case

}

/// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)")
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let qualifier = format!("{object_name}");
// do not expand from outer schema
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
expand_qualified_wildcard(&qualifier, plan.schema().as_ref())
}
}
}
Expand Down
70 changes: 70 additions & 0 deletions datafusion/sql/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,76 @@ fn join_with_ambiguous_column() {
quick_test(sql, expected);
}

#[test]
fn using_join_multiple_keys() {
let sql = "SELECT * FROM person a join person b using (id, age)";
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\
\n Inner Join: Using a.id = b.id, a.age = b.age\
\n SubqueryAlias: a\
\n TableScan: person\
\n SubqueryAlias: b\
\n TableScan: person";
quick_test(sql, expected);
}

#[test]
fn using_join_multiple_keys_subquery() {
let sql =
"SELECT age FROM (SELECT * FROM person a join person b using (id, age, state))";
let expected = "Projection: a.age\
\n Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
b.first_name, b.last_name, b.salary, b.birth_date, b.😀\
\n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
\n SubqueryAlias: a\
\n TableScan: person\
\n SubqueryAlias: b\
\n TableScan: person";
quick_test(sql, expected);
}

#[test]
fn using_join_multiple_keys_qualified_wildcard_select() {
let sql = "SELECT a.* FROM person a join person b using (id, age)";
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀\
\n Inner Join: Using a.id = b.id, a.age = b.age\
\n SubqueryAlias: a\
\n TableScan: person\
\n SubqueryAlias: b\
\n TableScan: person";
quick_test(sql, expected);
}

#[test]
fn using_join_multiple_keys_select_all_columns() {
let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)";
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
b.id, b.first_name, b.last_name, b.age, b.state, b.salary, b.birth_date, b.😀\
\n Inner Join: Using a.id = b.id, a.age = b.age\
\n SubqueryAlias: a\
\n TableScan: person\
\n SubqueryAlias: b\
\n TableScan: person";
quick_test(sql, expected);
}

#[test]
fn using_join_multiple_keys_multiple_joins() {
let sql = "SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state)";
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \
c.first_name, c.last_name, c.salary, c.birth_date, c.😀\
\n Inner Join: Using a.id = c.id, a.age = c.age, a.state = c.state\
\n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
\n SubqueryAlias: a\
\n TableScan: person\
\n SubqueryAlias: b\
\n TableScan: person\
\n SubqueryAlias: c\
\n TableScan: person";
quick_test(sql, expected);
}

#[test]
fn select_with_having() {
let sql = "SELECT id, age
Expand Down