Skip to content

Commit 779c251

Browse files
chore: enable array_union (#4043)
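## Which issue does this PR close?

Closes #3644.

## Rationale for this change

DataFusion 53 (DF 53) fixed the `array_union` behavior tracked in #3644 (Comet sorted the input arrays, while Spark preserves the order of the first array and appends unique elements from the second), so `ArrayUnion` no longer needs to be marked as incompatible.

## What changes are included in this PR?

## How are these changes tested?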
1 parent b358012 commit 779c251

5 files changed

Lines changed: 190 additions & 54 deletions

File tree

docs/source/user-guide/latest/compatibility.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on
6666

6767
### Array Expressions
6868

69-
- **ArrayUnion**: Sorts input arrays before performing the union, while Spark preserves the order of the first array
70-
and appends unique elements from the second.
71-
[#3644](https://github.com/apache/datafusion-comet/issues/3644)
7269
- **SortArray**: Nested arrays with `Struct` or `Null` child values are not supported natively and will fall back to Spark.
7370

7471
### Date/Time Expressions

docs/source/user-guide/latest/expressions.md

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -237,27 +237,27 @@ Comet supports using the following aggregate functions within window contexts wi
237237

238238
## Array Expressions
239239

240-
| Expression | Spark-Compatible? | Compatibility Notes |
241-
| -------------- | ----------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
242-
| ArrayAppend | Yes | |
243-
| ArrayCompact | No | |
244-
| ArrayContains | Yes | |
245-
| ArrayDistinct | Yes | |
246-
| ArrayExcept | No | |
247-
| ArrayFilter | Yes | Only supports case where function is `IsNotNull` |
248-
| ArrayInsert | No | |
249-
| ArrayIntersect | No | |
250-
| ArrayJoin | No | |
251-
| ArrayMax | Yes | |
252-
| ArrayMin | Yes | |
253-
| ArrayRemove | Yes | |
254-
| ArrayRepeat | No | |
255-
| ArrayUnion | No | Behaves differently than spark. Comet sorts the input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. |
256-
| ArraysOverlap | Yes | |
257-
| CreateArray | Yes | |
258-
| ElementAt | Yes | Input must be an array. Map inputs are not supported. |
259-
| Flatten | Yes | |
260-
| GetArrayItem | Yes | |
240+
| Expression | Spark-Compatible? | Compatibility Notes |
241+
| -------------- | ----------------- | ----------------------------------------------------- |
242+
| ArrayAppend | Yes | |
243+
| ArrayCompact | No | |
244+
| ArrayContains | Yes | |
245+
| ArrayDistinct | Yes | |
246+
| ArrayExcept | No | |
247+
| ArrayFilter | Yes | Only supports case where function is `IsNotNull` |
248+
| ArrayInsert | No | |
249+
| ArrayIntersect | No | |
250+
| ArrayJoin | No | |
251+
| ArrayMax | Yes | |
252+
| ArrayMin | Yes | |
253+
| ArrayRemove | Yes | |
254+
| ArrayRepeat | No | |
255+
| ArrayUnion | Yes | |
256+
| ArraysOverlap | Yes | |
257+
| CreateArray | Yes | |
258+
| ElementAt | Yes | Input must be an array. Map inputs are not supported. |
259+
| Flatten | Yes | |
260+
| GetArrayItem | Yes | |
261261

262262
## Map Expressions
263263

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,6 @@ object CometArrayRepeat extends CometExpressionSerde[ArrayRepeat] {
286286
}
287287

288288
object CometArrayCompact extends CometExpressionSerde[Expression] {
289-
290-
override def getSupportLevel(expr: Expression): SupportLevel = Compatible()
291-
292289
override def convert(
293290
expr: Expression,
294291
inputs: Seq[Attribute],
@@ -426,13 +423,6 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] {
426423
}
427424

428425
object CometArrayUnion extends CometExpressionSerde[ArrayUnion] {
429-
430-
override def getSupportLevel(expr: ArrayUnion): SupportLevel =
431-
Incompatible(
432-
Some(
433-
"Correctness issue" +
434-
" (https://github.com/apache/datafusion-comet/issues/3644)"))
435-
436426
override def convert(
437427
expr: ArrayUnion,
438428
inputs: Seq[Attribute],

spark/src/test/resources/sql-tests/expressions/array/array_union.sql

Lines changed: 155 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,17 @@
1515
-- specific language governing permissions and limitations
1616
-- under the License.
1717

18-
-- Config: spark.comet.expression.ArrayUnion.allowIncompatible=true
19-
2018
statement
2119
CREATE TABLE test_array_union(a array<int>, b array<int>) USING parquet
2220

2321
statement
2422
INSERT INTO test_array_union VALUES (array(1, 2, 3), array(3, 4, 5)), (array(1, 2), array()), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2))
2523

26-
query ignore(https://github.com/apache/datafusion-comet/issues/3644)
24+
query
2725
SELECT array_union(a, b) FROM test_array_union
2826

2927
-- column + literal
30-
query ignore(https://github.com/apache/datafusion-comet/issues/3644)
28+
query
3129
SELECT array_union(a, array(3, 4, 5)) FROM test_array_union
3230

3331
-- literal + column
@@ -37,3 +35,156 @@ SELECT array_union(array(1, 2, 3), b) FROM test_array_union
3735
-- literal + literal
3836
query
3937
SELECT array_union(array(1, 2, 3), array(3, 4, 5)), array_union(array(1, 2), array()), array_union(array(), array(1)), array_union(cast(NULL as array<int>), array(1))
38+
39+
-- NULL element deduplication (NULLs treated as values, kept once in result)
40+
statement
41+
CREATE TABLE test_union_nulls(a array<int>, b array<int>) USING parquet
42+
43+
statement
44+
INSERT INTO test_union_nulls VALUES (array(1, NULL, 3), array(4, NULL, 5)), (array(NULL), array(NULL)), (array(NULL, NULL), array(NULL, NULL)), (array(1, NULL), array(2, NULL)), (array(NULL, 2), array(1, NULL)), (array(1, NULL, 3), array(1, 5))
45+
46+
query
47+
SELECT a, b, array_union(a, b) FROM test_union_nulls
48+
49+
-- empty array combinations
50+
query
51+
SELECT array_union(array(), array()) FROM test_union_nulls
52+
53+
query
54+
SELECT array_union(array(), array(1, 2)) FROM test_union_nulls
55+
56+
query
57+
SELECT array_union(array(1, 2), array()) FROM test_union_nulls
58+
59+
query
60+
SELECT array_union(array(), array(NULL)) FROM test_union_nulls
61+
62+
-- both-NULL arrays
63+
query
64+
SELECT array_union(cast(NULL as array<int>), cast(NULL as array<int>)) FROM test_union_nulls
65+
66+
-- self-union (deduplication)
67+
query
68+
SELECT a, array_union(a, a) FROM test_union_nulls
69+
70+
-- duplicate elements within and across arrays
71+
statement
72+
CREATE TABLE test_union_dups(a array<int>, b array<int>) USING parquet
73+
74+
statement
75+
INSERT INTO test_union_dups VALUES (array(1, 1, 1), array(2, 2, 2)), (array(1, 1, 1), array(1, 2, 2)), (array(1, 2, 1, 2), array(3, 4, 3, 4)), (array(1, 2, 1, 2), array(2, 3, 2, 3)), (array(1, 2, 3), array(1, 2, 3))
76+
77+
query
78+
SELECT a, b, array_union(a, b) FROM test_union_dups
79+
80+
-- single element arrays
81+
query
82+
SELECT array_union(array(1), array(1)) FROM test_union_dups
83+
84+
query
85+
SELECT array_union(array(1), array(2)) FROM test_union_dups
86+
87+
-- string arrays
88+
statement
89+
CREATE TABLE test_union_str(a array<string>, b array<string>) USING parquet
90+
91+
statement
92+
INSERT INTO test_union_str VALUES (array('a', 'b', 'c'), array('c', 'd')), (array('a', 'b'), array('c', 'd')), (array('a', NULL), array('b', NULL)), (array('a', NULL), array('a', 'b')), (NULL, array('a')), (array(''), array('')), (array('', NULL), array('x'))
93+
94+
query
95+
SELECT a, b, array_union(a, b) FROM test_union_str
96+
97+
-- empty string handling
98+
query
99+
SELECT array_union(array('', 'a'), array('', 'b')) FROM test_union_str
100+
101+
-- double arrays with special values
102+
statement
103+
CREATE TABLE test_union_dbl(a array<double>, b array<double>) USING parquet
104+
105+
statement
106+
INSERT INTO test_union_dbl VALUES (array(1.0, 2.0), array(2.0, 3.0)), (array(1.0, double('NaN')), array(double('NaN'), 2.0)), (array(double('Infinity'), 1.0), array(double('Infinity'))), (array(double('-Infinity')), array(double('Infinity'))), (array(0.0), array(-0.0)), (array(1.0, NULL), array(2.0, NULL))
107+
108+
query
109+
SELECT a, b, array_union(a, b) FROM test_union_dbl
110+
111+
-- boolean arrays
112+
query
113+
SELECT array_union(array(true, false), array(false)) FROM test_union_dbl
114+
115+
query
116+
SELECT array_union(array(true), array(false)) FROM test_union_dbl
117+
118+
query
119+
SELECT array_union(array(true, NULL), array(false, NULL)) FROM test_union_dbl
120+
121+
-- bigint arrays
122+
statement
123+
CREATE TABLE test_union_long(a array<bigint>, b array<bigint>) USING parquet
124+
125+
statement
126+
INSERT INTO test_union_long VALUES (array(9223372036854775807, 1), array(9223372036854775807)), (array(-9223372036854775808), array(-9223372036854775808)), (array(0), array(1))
127+
128+
query
129+
SELECT a, b, array_union(a, b) FROM test_union_long
130+
131+
-- decimal arrays
132+
statement
133+
CREATE TABLE test_union_dec(a array<decimal(10,2)>, b array<decimal(10,2)>) USING parquet
134+
135+
statement
136+
INSERT INTO test_union_dec VALUES (array(1.00, 2.50), array(2.50, 3.00)), (array(1.00, 2.00), array(3.00, 4.00)), (array(1.10, NULL), array(2.20, NULL))
137+
138+
query
139+
SELECT a, b, array_union(a, b) FROM test_union_dec
140+
141+
-- date arrays
142+
statement
143+
CREATE TABLE test_union_date(a array<date>, b array<date>) USING parquet
144+
145+
statement
146+
INSERT INTO test_union_date VALUES (array(date '2024-01-01', date '2024-06-15'), array(date '2024-06-15', date '2024-12-31')), (array(date '2024-01-01'), array(date '2024-12-31')), (array(date '2024-01-01', NULL), array(date '2024-12-31'))
147+
148+
query
149+
SELECT a, b, array_union(a, b) FROM test_union_date
150+
151+
-- timestamp arrays
152+
statement
153+
CREATE TABLE test_union_ts(a array<timestamp>, b array<timestamp>) USING parquet
154+
155+
statement
156+
INSERT INTO test_union_ts VALUES (array(timestamp '2024-01-01 00:00:00', timestamp '2024-06-15 12:00:00'), array(timestamp '2024-06-15 12:00:00')), (array(timestamp '2024-01-01 00:00:00'), array(timestamp '2024-12-31 23:59:59'))
157+
158+
query
159+
SELECT a, b, array_union(a, b) FROM test_union_ts
160+
161+
-- nested arrays
162+
statement
163+
CREATE TABLE test_union_nested(a array<array<int>>, b array<array<int>>) USING parquet
164+
165+
statement
166+
INSERT INTO test_union_nested VALUES (array(array(1, 2), array(3, 4)), array(array(3, 4), array(5, 6))), (array(array(1, 2)), array(array(3, 4))), (array(array(1, 2), cast(NULL as array<int>)), array(array(3, 4), cast(NULL as array<int>))), (array(array(1, NULL)), array(array(1, NULL)))
167+
168+
query
169+
SELECT a, b, array_union(a, b) FROM test_union_nested
170+
171+
-- struct element arrays
172+
statement
173+
CREATE TABLE test_union_struct(a array<struct<x:int, y:int>>, b array<struct<x:int, y:int>>) USING parquet
174+
175+
statement
176+
INSERT INTO test_union_struct VALUES (array(named_struct('x', 1, 'y', 2)), array(named_struct('x', 1, 'y', 2))), (array(named_struct('x', 1, 'y', 2)), array(named_struct('x', 3, 'y', 4))), (array(named_struct('x', 1, 'y', cast(NULL as int))), array(named_struct('x', 1, 'y', cast(NULL as int)))), (array(cast(NULL as struct<x:int, y:int>)), array(cast(NULL as struct<x:int, y:int>)))
177+
178+
query
179+
SELECT a, b, array_union(a, b) FROM test_union_struct
180+
181+
-- mixed column and literal with NULL elements
182+
query
183+
SELECT array_union(a, array(99, NULL)) FROM test_array_union
184+
185+
query
186+
SELECT array_union(array(NULL, 99), b) FROM test_array_union
187+
188+
-- conditional (CASE WHEN) arrays
189+
query
190+
SELECT array_union(CASE WHEN a IS NOT NULL THEN a ELSE array(0) END, b) FROM test_array_union

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.util.Random
2323

2424
import org.apache.hadoop.fs.Path
2525
import org.apache.spark.sql.CometTestBase
26-
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArrayUnion}
26+
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat}
2727
import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove}
2828
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2929
import org.apache.spark.sql.functions._
@@ -426,21 +426,19 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
426426
}
427427

428428
test("array_union") {
429-
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayUnion]) -> "true") {
430-
Seq(true, false).foreach { dictionaryEnabled =>
431-
withTempDir { dir =>
432-
withTempView("t1") {
433-
val path = new Path(dir.toURI.toString, "test.parquet")
434-
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
435-
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
436-
checkSparkAnswerAndOperator(
437-
spark.sql("SELECT array_union(array(_2, _3, _4), array(_3, _4)) FROM t1"))
438-
checkSparkAnswerAndOperator(sql("SELECT array_union(array(_18), array(_19)) from t1"))
439-
checkSparkAnswerAndOperator(spark.sql(
440-
"SELECT array_union(array(CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), _2, _3)) FROM t1"))
441-
checkSparkAnswerAndOperator(spark.sql(
442-
"SELECT array_union(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3)) FROM t1"))
443-
}
429+
Seq(true, false).foreach { dictionaryEnabled =>
430+
withTempDir { dir =>
431+
withTempView("t1") {
432+
val path = new Path(dir.toURI.toString, "test.parquet")
433+
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
434+
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
435+
checkSparkAnswerAndOperator(
436+
spark.sql("SELECT array_union(array(_2, _3, _4), array(_3, _4)) FROM t1"))
437+
checkSparkAnswerAndOperator(sql("SELECT array_union(array(_18), array(_19)) from t1"))
438+
checkSparkAnswerAndOperator(spark.sql(
439+
"SELECT array_union(array(CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), _2, _3)) FROM t1"))
440+
checkSparkAnswerAndOperator(spark.sql(
441+
"SELECT array_union(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3)) FROM t1"))
444442
}
445443
}
446444
}

0 commit comments

Comments
 (0)