Skip to content

Commit 2bd01af

Browse files
authored
feat: Support Spark expression: arrays_zip (#3643)
* Define ArraysZip expr proto
* Create ArraysZip SerDe
* Register SerDe to arrayExpressions
* Add SQL test
* Register expression to planner
* Rust wrapper around DF's arrays_zip
* Null checks
* Update supported Spark expressions doc
1 parent bb79752 commit 2bd01af

9 files changed

Lines changed: 661 additions & 5 deletions

File tree

docs/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
- [x] array_repeat
9999
- [x] array_union
100100
- [x] arrays_overlap
101-
- [ ] arrays_zip
101+
- [x] arrays_zip
102102
- [x] element_at
103103
- [ ] flatten
104104
- [x] get

native/core/src/execution/planner.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ use datafusion::{
6868
};
6969
use datafusion_comet_spark_expr::{
7070
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
71-
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SumInteger, ToCsv,
71+
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc,
72+
SumInteger, ToCsv,
7273
};
7374
use datafusion_spark::function::aggregate::collect::SparkCollectSet;
7475
use iceberg::expr::Bind;
@@ -94,7 +95,6 @@ use datafusion::physical_expr::window::WindowExpr;
9495
use datafusion::physical_expr::LexOrdering;
9596

9697
use crate::parquet::parquet_exec::init_datasource_exec;
97-
9898
use arrow::array::{
9999
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array,
100100
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
@@ -683,6 +683,24 @@ impl PhysicalPlanner {
683683
csv_write_options,
684684
)))
685685
}
686+
ExprStruct::ArraysZip(expr) => {
687+
if expr.values.is_empty() {
688+
return Err(GeneralError(
689+
"arrays_zip requires at least one argument".to_string(),
690+
));
691+
}
692+
693+
let children = expr
694+
.values
695+
.iter()
696+
.map(|child| self.create_expr(child, Arc::clone(&input_schema)))
697+
.collect::<Result<Vec<_>, _>>()?;
698+
699+
Ok(Arc::new(SparkArraysZipFunc::new(
700+
children,
701+
expr.names.clone(),
702+
)))
703+
}
686704
expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
687705
}
688706
}

native/core/src/execution/planner/expression_registry.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ pub enum ExpressionType {
103103
Randn,
104104
SparkPartitionId,
105105
MonotonicallyIncreasingId,
106+
ArraysZip,
106107

107108
// Time functions
108109
Hour,
@@ -381,6 +382,7 @@ impl ExpressionRegistry {
381382
Some(ExprStruct::MonotonicallyIncreasingId(_)) => {
382383
Ok(ExpressionType::MonotonicallyIncreasingId)
383384
}
385+
Some(ExprStruct::ArraysZip(_)) => Ok(ExpressionType::ArraysZip),
384386

385387
Some(ExprStruct::Hour(_)) => Ok(ExpressionType::Hour),
386388
Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),

native/proto/src/proto/expr.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ message Expr {
8989
FromJson from_json = 66;
9090
ToCsv to_csv = 67;
9191
HoursTransform hours_transform = 68;
92+
ArraysZip arrays_zip = 69;
9293
}
9394

9495
// Optional QueryContext for error reporting (contains SQL text and position)
@@ -495,3 +496,10 @@ message ArrayJoin {
495496
message Rand {
496497
int64 seed = 1;
497498
}
499+
500+
// Spark's ArraysZip takes children: Seq[Expression] and names: Seq[Expression]
501+
// https://github.com/apache/spark/blob/branch-4.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L296
502+
message ArraysZip {
503+
repeated Expr values = 1;
504+
repeated string names = 2;
505+
}

0 commit comments

Comments (0)