Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 85 additions & 92 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,57 +579,85 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
)
}

/// Appends or prepends elements to a ListArray.
///
/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag
/// indicating whether to append or prepend the elements. It returns a `Result<ArrayRef>`
/// representing the resulting ListArray after the operation.
///
/// # Arguments
///
/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended.
/// * `element_array` - A reference to the Array containing elements to be appended/prepended.
/// * `field` - A reference to the Field describing the data type of the arrays.
/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements.
///
/// # Examples
///
/// general_append_and_prepend(
/// [1, 2, 3], 4, append => [1, 2, 3, 4]
/// 5, [6, 7, 8], prepend => [5, 6, 7, 8]
/// )
fn general_append_and_prepend(
list_array: &ListArray,
element_array: &ArrayRef,
data_type: &DataType,
is_append: bool,
) -> Result<ArrayRef> {
let mut offsets = vec![0];
let values = list_array.values();
let original_data = values.to_data();
let element_data = element_array.to_data();
let capacity = Capacities::Array(original_data.len() + element_data.len());

let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &element_data],
false,
capacity,
);

let values_index = 0;
let element_index = 1;

for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
if is_append {
mutable.extend(values_index, start, end);
mutable.extend(element_index, row_index, row_index + 1);
} else {
mutable.extend(element_index, row_index, row_index + 1);
mutable.extend(values_index, start, end);
}
offsets.push(offsets[row_index] + (end - start + 1) as i32);
}

let data = mutable.freeze();

Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
}

/// Array_append SQL function
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = as_list_array(&args[0])?;
let element = &args[1];
let list_array = as_list_array(&args[0])?;
let element_array = &args[1];

check_datatypes("array_append", &[arr.values(), element])?;
let res = match arr.value_type() {
check_datatypes("array_append", &[list_array.values(), element_array])?;
let res = match list_array.value_type() {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
DataType::Null => return make_array(&[element_array.to_owned()]),
data_type => {
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(0, 0, original_data.len());
mutable.extend(1, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}

let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
return general_append_and_prepend(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

giphy

list_array,
element_array,
&data_type,
true,
);
}
};

Expand All @@ -638,55 +666,20 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {

/// Array_prepend SQL function
pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
let element = &args[0];
let arr = as_list_array(&args[1])?;
let list_array = as_list_array(&args[1])?;
let element_array = &args[0];

check_datatypes("array_prepend", &[element, arr.values()])?;
let res = match arr.value_type() {
check_datatypes("array_prepend", &[element_array, list_array.values()])?;
let res = match list_array.value_type() {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
DataType::Null => return make_array(&[element_array.to_owned()]),
data_type => {
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(1, row_index, row_index + 1);
mutable.extend(0, 0, original_data.len());
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}

let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
return general_append_and_prepend(
list_array,
element_array,
&data_type,
false,
);
}
};

Expand Down