Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ use async_trait::async_trait;
use datafusion_common::{plan_err, DFSchema, ScalarValue};
use datafusion_expr::expr::{
self, AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Cast,
GetIndexedField, GroupingSet, InList, Like, ScalarUDF, TryCast, WindowFunction,
GetFieldAccess, GetIndexedField, GroupingSet, InList, Like, ScalarUDF, TryCast,
WindowFunction,
};
use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
Expand Down Expand Up @@ -181,28 +182,22 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::GetIndexedField(GetIndexedField {
key,
extra_key,
expr,
}) => {
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr = create_physical_name(expr, false)?;
let name = match field {
GetFieldAccess::NamedStructField { name } => format!("{expr}[{name}]"),
GetFieldAccess::ListIndex { key } => {
let key = create_physical_name(key, false)?;
format!("{expr}[{key}]")
}
GetFieldAccess::ListRange { start, stop } => {
let start = create_physical_name(start, false)?;
let stop = create_physical_name(stop, false)?;
format!("{expr}[{start}:{stop}]")
}
};

if let (Some(list_key), Some(extra_key)) = (&key.list_key, extra_key) {
let key = create_physical_name(list_key, false)?;
let extra_key = create_physical_name(extra_key, false)?;
Ok(format!("{expr}[{key}:{extra_key}]"))
} else {
let key = if let Some(list_key) = &key.list_key {
create_physical_name(list_key, false)?
} else if let Some(ScalarValue::Utf8(Some(struct_key))) = &key.struct_key
{
struct_key.to_string()
} else {
String::from("")
};
Ok(format!("{expr}[{key}]"))
}
Ok(name)
}
Expr::ScalarFunction(func) => {
create_function_physical_name(&func.fun.to_string(), false, &func.args)
Expand Down
102 changes: 33 additions & 69 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,31 +359,15 @@ impl ScalarUDF {
}
}

/// Key of `GetIndexedFieldKey`.
/// This structure is needed to separate the responsibilities of the key for `DataType::List` and `DataType::Struct`.
/// If we use index with `DataType::List`, then we use the `list_key` argument with `struct_key` equal to `None`.
/// If we use index with `DataType::Struct`, then we use the `struct_key` argument with `list_key` equal to `None`.
/// `list_key` can be any expression, unlike `struct_key` which can only be `ScalarValue::Utf8`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct GetIndexedFieldKey {
/// The key expression for `DataType::List`
pub list_key: Option<Expr>,
/// The key expression for `DataType::Struct`
pub struct_key: Option<ScalarValue>,
}

impl GetIndexedFieldKey {
/// Create a new GetIndexedFieldKey expression
pub fn new(list_key: Option<Expr>, struct_key: Option<ScalarValue>) -> Self {
// value must be either `list_key` or `struct_key`
assert_ne!(list_key.is_some(), struct_key.is_some());
assert_ne!(list_key.is_none(), struct_key.is_none());

Self {
list_key,
struct_key,
}
}
pub enum GetFieldAccess {
/// returns the field `struct[field]`. For example `struct["name"]`
NamedStructField { name: ScalarValue },
/// single list index
// list[i]
ListIndex { key: Box<Expr> },
/// list range `list[i:j]`
ListRange { start: Box<Expr>, stop: Box<Expr> },
}

/// Returns the field of a [`arrow::array::ListArray`] or [`arrow::array::StructArray`] by `key`.
Expand All @@ -393,23 +377,13 @@ pub struct GetIndexedField {
/// The expression to take the field from
pub expr: Box<Expr>,
/// The name of the field to take
pub key: Box<GetIndexedFieldKey>,
/// The right border of the field to take
pub extra_key: Option<Box<Expr>>,
pub field: GetFieldAccess,
}

impl GetIndexedField {
/// Create a new GetIndexedField expression
pub fn new(
expr: Box<Expr>,
key: Box<GetIndexedFieldKey>,
extra_key: Option<Box<Expr>>,
) -> Self {
Self {
expr,
key,
extra_key,
}
pub fn new(expr: Box<Expr>, field: GetFieldAccess) -> Self {
Self { expr, field }
}
}

Expand Down Expand Up @@ -1178,22 +1152,15 @@ impl fmt::Display for Expr {
}
Expr::Wildcard => write!(f, "*"),
Expr::QualifiedWildcard { qualifier } => write!(f, "{qualifier}.*"),
Expr::GetIndexedField(GetIndexedField {
key,
extra_key,
expr,
}) => {
let key = if let Some(list_key) = &key.list_key {
format!("{list_key}")
} else {
format!("{0}", key.struct_key.clone().unwrap())
};
if let Some(extra_key) = extra_key {
write!(f, "({expr})[{key}:{extra_key}]")
} else {
write!(f, "({expr})[{key}]")
Expr::GetIndexedField(GetIndexedField { field, expr }) => match field {
GetFieldAccess::NamedStructField { name } => {
write!(f, "({expr})[{name}]")
}
}
GetFieldAccess::ListIndex { key } => write!(f, "({expr})[{key}]"),
GetFieldAccess::ListRange { start, stop } => {
write!(f, "({expr})[{start}:{stop}]")
}
},
Expr::GroupingSet(grouping_sets) => match grouping_sets {
GroupingSet::Rollup(exprs) => {
// ROLLUP (c0, c1, c2)
Expand Down Expand Up @@ -1382,24 +1349,21 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).name().clone())
}
Expr::GetIndexedField(GetIndexedField {
key,
extra_key,
expr,
}) => {
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr = create_name(expr)?;
let key = if let Some(list_key) = &key.list_key {
create_name(list_key)?
} else if let Some(ScalarValue::Utf8(Some(struct_key))) = &key.struct_key {
struct_key.to_string()
} else {
String::new()
};
if let Some(extra_key) = extra_key {
let extra_key = create_name(extra_key)?;
Ok(format!("{expr}[{key}:{extra_key}]"))
} else {
Ok(format!("{expr}[{key}]"))
match field {
GetFieldAccess::NamedStructField { name } => {
Ok(format!("{expr}[{name}]"))
}
GetFieldAccess::ListIndex { key } => {
let key = create_name(key)?;
Ok(format!("{expr}[{key}]"))
}
GetFieldAccess::ListRange { start, stop } => {
let start = create_name(start)?;
let stop = create_name(stop)?;
Ok(format!("{expr}[{start}:{stop}]"))
}
}
}
Expr::ScalarFunction(func) => {
Expand Down
76 changes: 42 additions & 34 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateUDF, Alias, BinaryExpr, Cast, GetIndexedField, InList,
InSubquery, Placeholder, ScalarFunction, ScalarUDF, Sort, TryCast, WindowFunction,
AggregateFunction, AggregateUDF, Alias, BinaryExpr, Cast, GetFieldAccess,
GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF, Sort,
TryCast, WindowFunction,
};
use crate::field_util::get_indexed_field;
use crate::field_util::{get_indexed_field, GetFieldAccessCharacteristic};
use crate::type_coercion::binary::get_result_type;
use crate::{LogicalPlan, Projection, Subquery};
use arrow::compute::can_cast_types;
Expand Down Expand Up @@ -155,24 +156,27 @@ impl ExprSchemable for Expr {
// grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
}
Expr::GetIndexedField(GetIndexedField {
key,
extra_key,
expr,
}) => {
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr_dt = expr.get_type(schema)?;
let key = if let Some(list_key) = &key.list_key {
(Some(list_key.get_type(schema)?), None)
} else {
(None, key.struct_key.clone())
};
let extra_key_dt = if let Some(extra_key) = extra_key {
Some(extra_key.get_type(schema)?)
} else {
None
let field_ch = match field {
GetFieldAccess::NamedStructField { name } => {
GetFieldAccessCharacteristic::NamedStructField {
name: name.clone(),
}
}
GetFieldAccess::ListIndex { key } => {
GetFieldAccessCharacteristic::ListIndex {
key_dt: key.get_type(schema)?,
}
}
GetFieldAccess::ListRange { start, stop } => {
GetFieldAccessCharacteristic::ListRange {
start_dt: start.get_type(schema)?,
stop_dt: stop.get_type(schema)?,
}
}
};
get_indexed_field(&expr_dt, &key, &extra_key_dt)
.map(|x| x.data_type().clone())
get_indexed_field(&expr_dt, &field_ch).map(|x| x.data_type().clone())
}
}
}
Expand Down Expand Up @@ -280,23 +284,27 @@ impl ExprSchemable for Expr {
"QualifiedWildcard expressions are not valid in a logical query plan"
.to_owned(),
)),
Expr::GetIndexedField(GetIndexedField {
key,
extra_key,
expr,
}) => {
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr_dt = expr.get_type(input_schema)?;
let key = if let Some(list_key) = &key.list_key {
(Some(list_key.get_type(input_schema)?), None)
} else {
(None, key.struct_key.clone())
};
let extra_key_dt = if let Some(extra_key) = extra_key {
Some(extra_key.get_type(input_schema)?)
} else {
None
let field_ch = match field {
GetFieldAccess::NamedStructField { name } => {
GetFieldAccessCharacteristic::NamedStructField {
name: name.clone(),
}
}
GetFieldAccess::ListIndex { key } => {
GetFieldAccessCharacteristic::ListIndex {
key_dt: key.get_type(input_schema)?,
}
}
GetFieldAccess::ListRange { start, stop } => {
GetFieldAccessCharacteristic::ListRange {
start_dt: start.get_type(input_schema)?,
stop_dt: stop.get_type(input_schema)?,
}
}
};
get_indexed_field(&expr_dt, &key, &extra_key_dt).map(|x| x.is_nullable())
get_indexed_field(&expr_dt, &field_ch).map(|x| x.is_nullable())
}
Expr::GroupingSet(_) => {
// grouping sets do not really have the concept of nullable and do not appear
Expand Down
72 changes: 47 additions & 25 deletions datafusion/expr/src/field_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};

pub enum GetFieldAccessCharacteristic {
/// returns the field `struct[field]`. For example `struct["name"]`
NamedStructField { name: ScalarValue },
/// single list index
// list[i]
ListIndex { key_dt: DataType },
/// list range `list[i:j]`
ListRange {
start_dt: DataType,
stop_dt: DataType,
},
}

/// Returns the field access indexed by `key` and/or `extra_key` from a [`DataType::List`] or [`DataType::Struct`]
/// # Error
/// Errors if
Expand All @@ -28,35 +41,44 @@ use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
/// * there is no field key is not of the required index type
pub fn get_indexed_field(
data_type: &DataType,
key: &(Option<DataType>, Option<ScalarValue>),
extra_key: &Option<DataType>,
field_characteristic: &GetFieldAccessCharacteristic,
) -> Result<Field> {
match (data_type, key) {
(DataType::List(lt), (Some(DataType::Int64), None)) => {
match extra_key {
Some(DataType::Int64) => Ok(Field::new("list", data_type.clone(), true)),
None => Ok(Field::new("list", lt.data_type().clone(), true)),
_ => Err(DataFusionError::Plan(
"Only ints are valid as an indexed field in a list".to_string(),
)),
match field_characteristic {
GetFieldAccessCharacteristic::NamedStructField{ name } => {
match (data_type, name) {
(DataType::Struct(fields), ScalarValue::Utf8(Some(s))) => {
if s.is_empty() {
plan_err!(
"Struct based indexed access requires a non empty string"
)
} else {
let field = fields.iter().find(|f| f.name() == s);
field.ok_or(DataFusionError::Plan(format!("Field {s} not found in struct"))).map(|f| f.as_ref().clone())
}
}
(DataType::Struct(_), _) => plan_err!(
"Only utf8 strings are valid as an indexed field in a struct"
),
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List` or `Struct` types, got {other}"),
}
}
GetFieldAccessCharacteristic::ListIndex{ key_dt } => {
match (data_type, key_dt) {
(DataType::List(lt), DataType::Int64) => Ok(Field::new("list", lt.data_type().clone(), true)),
(DataType::List(_), _) => plan_err!(
"Only ints are valid as an indexed field in a list"
),
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List` or `Struct` types, got {other}"),
}
}
(DataType::Struct(fields), (None, Some(ScalarValue::Utf8(Some(s))))) => {
if s.is_empty() {
plan_err!(
"Struct based indexed access requires a non empty string"
)
} else {
let field = fields.iter().find(|f| f.name() == s);
field.ok_or(DataFusionError::Plan(format!("Field {s} not found in struct"))).map(|f| f.as_ref().clone())
GetFieldAccessCharacteristic::ListRange{ start_dt, stop_dt } => {
match (data_type, start_dt, stop_dt) {
(DataType::List(_), DataType::Int64, DataType::Int64) => Ok(Field::new("list", data_type.clone(), true)),
(DataType::List(_), _, _) => plan_err!(
"Only ints are valid as an indexed field in a list"
),
(other, _, _) => plan_err!("The expression to get an indexed field is only valid for `List` or `Struct` types, got {other}"),
}
}
(DataType::Struct(_), _) => plan_err!(
"Only utf8 strings are valid as an indexed field in a struct"
),
(DataType::List(_), _) => plan_err!(
"Only ints are valid as an indexed field in a list"
),
(other, _) => plan_err!("The expression to get an indexed field is only valid for `List` or `Struct` types, got {other}"),
}
}
4 changes: 2 additions & 2 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
pub use columnar_value::ColumnarValue;
pub use expr::{
Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GetIndexedFieldKey,
GroupingSet, Like, TryCast,
Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
Like, TryCast,
};
pub use expr_fn::*;
pub use expr_schema::ExprSchemable;
Expand Down
Loading