Skip to content

Commit 9b2f1b1

Browse files
viirya and claude authored
feat: support LEAD and LAG window functions with IGNORE NULLS (#3876)
* feat: support LAG window function with IGNORE NULLS
  - Add ignore_nulls field to WindowExpr proto message
  - Serialize Lag window function with its ignoreNulls flag in CometWindowExec
  - Extend find_df_window_function to also look up WindowUDFs (not just AggregateUDFs)
  - Pass ignore_nulls to DataFusion's create_window_expr
  - Enable previously-ignored LAG tests
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix: make LAG window tests deterministic by adding secondary sort key
  ORDER BY b alone has ties, causing Spark and DataFusion to produce different but both-valid row orderings. Add c as a secondary sort key so tie-breaking is deterministic and results are comparable.
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix: enable WindowExec allowIncompatible for LAG tests
  CometWindowExec is marked Incompatible by default. Add allowIncompatible=true config so LAG tests actually run via Comet and checkSparkAnswerAndOperator can verify native execution.
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix: skip partition/order validation for offset window functions
  LAG/LEAD (FrameLessOffsetWindowFunction) support arbitrary partition and order specs. The existing validatePartitionAndSortSpecsForWindowFunc check (which requires partition columns == order columns) is only needed for aggregate window functions, not offset functions.
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* feat: add LEAD support and IGNORE NULLS test for LAG window function
  - Handle Lead case in windowExprToProto alongside Lag
  - Add comment explaining hasOnlyOffsetFunctions guard
  - Add test for LAG IGNORE NULLS
  - Enable LEAD tests with allowIncompatible config and deterministic ORDER BY
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* test: add LEAD with IGNORE NULLS test case
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* test: add SQL tests for LAG/LEAD window functions with IGNORE NULLS
  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b6a07e0 commit 9b2f1b1

File tree

5 files changed

+444
-77
lines changed

5 files changed

+444
-77
lines changed

native/core/src/execution/planner.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2452,7 +2452,7 @@ impl PhysicalPlanner {
24522452
sort_phy_exprs,
24532453
window_frame.into(),
24542454
input_schema,
2455-
false, // TODO: Ignore nulls
2455+
spark_expr.ignore_nulls,
24562456
false, // TODO: Spark does not support DISTINCT ... OVER
24572457
None,
24582458
)
@@ -2506,6 +2506,12 @@ impl PhysicalPlanner {
25062506
.udaf(name)
25072507
.map(WindowFunctionDefinition::AggregateUDF)
25082508
.ok()
2509+
.or_else(|| {
2510+
registry
2511+
.udwf(name)
2512+
.map(WindowFunctionDefinition::WindowUDF)
2513+
.ok()
2514+
})
25092515
}
25102516

25112517
/// Create a DataFusion physical partitioning from Spark physical partitioning

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ message WindowExpr {
369369
spark.spark_expression.Expr built_in_window_function = 1;
370370
spark.spark_expression.AggExpr agg_func = 2;
371371
WindowSpecDefinition spec = 3;
372+
bool ignore_nulls = 4;
372373
}
373374

374375
enum WindowFrameType {

spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.spark.sql.comet
2121

2222
import scala.jdk.CollectionConverters._
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CurrentRow, Expression, NamedExpression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
24+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CurrentRow, Expression, FrameLessOffsetWindowFunction, Lag, Lead, NamedExpression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
2525
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max, Min, Sum}
2626
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
2727
import org.apache.spark.sql.execution.SparkPlan
@@ -36,7 +36,7 @@ import org.apache.comet.{CometConf, ConfigEntry}
3636
import org.apache.comet.CometSparkSessionExtensions.withInfo
3737
import org.apache.comet.serde.{AggSerde, CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel}
3838
import org.apache.comet.serde.OperatorOuterClass.Operator
39-
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto}
39+
import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, scalarFunctionExprToProto}
4040

4141
object CometWindowExec extends CometOperatorSerde[WindowExec] {
4242

@@ -72,7 +72,12 @@ object CometWindowExec extends CometOperatorSerde[WindowExec] {
7272
return None
7373
}
7474

75-
if (op.partitionSpec.nonEmpty && op.orderSpec.nonEmpty &&
75+
// Offset window functions (LAG, LEAD) support arbitrary partition and order specs, so skip
76+
// the validatePartitionAndSortSpecsForWindowFunc check which requires partition columns to
77+
// equal order columns. That stricter check is only needed for aggregate window functions.
78+
val hasOnlyOffsetFunctions = winExprs.nonEmpty &&
79+
winExprs.forall(e => e.windowFunction.isInstanceOf[FrameLessOffsetWindowFunction])
80+
if (!hasOnlyOffsetFunctions && op.partitionSpec.nonEmpty && op.orderSpec.nonEmpty &&
7681
!validatePartitionAndSortSpecsForWindowFunc(op.partitionSpec, op.orderSpec, op)) {
7782
return None
7883
}
@@ -141,12 +146,27 @@ object CometWindowExec extends CometOperatorSerde[WindowExec] {
141146
}
142147
}.toArray
143148

144-
val (aggExpr, builtinFunc) = if (aggregateExpressions.nonEmpty) {
149+
val (aggExpr, builtinFunc, ignoreNulls) = if (aggregateExpressions.nonEmpty) {
145150
val modes = aggregateExpressions.map(_.mode).distinct
146151
assert(modes.size == 1 && modes.head == Complete)
147-
(aggExprToProto(aggregateExpressions.head, output, true, conf), None)
152+
(aggExprToProto(aggregateExpressions.head, output, true, conf), None, false)
148153
} else {
149-
(None, exprToProto(windowExpr.windowFunction, output))
154+
windowExpr.windowFunction match {
155+
case lag: Lag =>
156+
val inputExpr = exprToProto(lag.input, output)
157+
val offsetExpr = exprToProto(lag.inputOffset, output)
158+
val defaultExpr = exprToProto(lag.default, output)
159+
val func = scalarFunctionExprToProto("lag", inputExpr, offsetExpr, defaultExpr)
160+
(None, func, lag.ignoreNulls)
161+
case lead: Lead =>
162+
val inputExpr = exprToProto(lead.input, output)
163+
val offsetExpr = exprToProto(lead.offset, output)
164+
val defaultExpr = exprToProto(lead.default, output)
165+
val func = scalarFunctionExprToProto("lead", inputExpr, offsetExpr, defaultExpr)
166+
(None, func, lead.ignoreNulls)
167+
case _ =>
168+
(None, exprToProto(windowExpr.windowFunction, output), false)
169+
}
150170
}
151171

152172
if (aggExpr.isEmpty && builtinFunc.isEmpty) {
@@ -254,6 +274,7 @@ object CometWindowExec extends CometOperatorSerde[WindowExec] {
254274
.newBuilder()
255275
.setBuiltInWindowFunction(builtinFunc.get)
256276
.setSpec(spec)
277+
.setIgnoreNulls(ignoreNulls)
257278
.build())
258279
} else if (aggExpr.isDefined) {
259280
Some(
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one
2+
-- or more contributor license agreements. See the NOTICE file
3+
-- distributed with this work for additional information
4+
-- regarding copyright ownership. The ASF licenses this file
5+
-- to you under the Apache License, Version 2.0 (the
6+
-- "License"); you may not use this file except in compliance
7+
-- with the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing,
12+
-- software distributed under the License is distributed on an
13+
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
-- KIND, either express or implied. See the License for the
15+
-- specific language governing permissions and limitations
16+
-- under the License.
17+
18+
-- Config: spark.comet.operator.WindowExec.allowIncompatible=true
19+
20+
-- ============================================================
21+
-- Setup: shared tables
22+
-- ============================================================
23+
24+
statement
25+
CREATE TABLE test_lag_lead(id int, val int, grp string) USING parquet
26+
27+
statement
28+
INSERT INTO test_lag_lead VALUES
29+
(1, 10, 'a'),
30+
(2, 20, 'a'),
31+
(3, 30, 'a'),
32+
(4, 40, 'b'),
33+
(5, 50, 'b')
34+
35+
statement
36+
CREATE TABLE test_nulls(id int, val int, grp string) USING parquet
37+
38+
statement
39+
INSERT INTO test_nulls VALUES
40+
(1, NULL, 'a'),
41+
(2, 10, 'a'),
42+
(3, NULL, 'a'),
43+
(4, 20, 'a'),
44+
(5, NULL, 'b'),
45+
(6, 30, 'b'),
46+
(7, NULL, 'b')
47+
48+
statement
49+
CREATE TABLE test_all_nulls(id int, val int, grp string) USING parquet
50+
51+
statement
52+
INSERT INTO test_all_nulls VALUES
53+
(1, NULL, 'a'),
54+
(2, NULL, 'a'),
55+
(3, NULL, 'b'),
56+
(4, 1, 'b')
57+
58+
statement
59+
CREATE TABLE test_single_row(id int, val int) USING parquet
60+
61+
statement
62+
INSERT INTO test_single_row VALUES (1, 42)
63+
64+
statement
65+
CREATE TABLE test_types(
66+
id int,
67+
i_val int,
68+
l_val bigint,
69+
d_val double,
70+
s_val string,
71+
grp string
72+
) USING parquet
73+
74+
statement
75+
INSERT INTO test_types VALUES
76+
(1, NULL, NULL, NULL, NULL, 'a'),
77+
(2, 1, 100, 1.5, 'foo', 'a'),
78+
(3, 2, 200, 2.5, 'bar', 'a'),
79+
(4, NULL, NULL, NULL, NULL, 'b'),
80+
(5, 3, 300, 3.5, 'baz', 'b')
81+
82+
-- ############################################################
83+
-- LAG
84+
-- ############################################################
85+
86+
-- ============================================================
87+
-- lag: basic (default offset = 1)
88+
-- ============================================================
89+
90+
query
91+
SELECT id, val,
92+
LAG(val) OVER (ORDER BY id) as lag_val
93+
FROM test_lag_lead
94+
95+
query
96+
SELECT grp, id, val,
97+
LAG(val) OVER (PARTITION BY grp ORDER BY id) as lag_val
98+
FROM test_lag_lead
99+
100+
-- ============================================================
101+
-- lag: with explicit offset
102+
-- ============================================================
103+
104+
query
105+
SELECT id, val,
106+
LAG(val, 2) OVER (ORDER BY id) as lag_val_2
107+
FROM test_lag_lead
108+
109+
-- ============================================================
110+
-- lag: with offset and default value
111+
-- ============================================================
112+
113+
query
114+
SELECT id, val,
115+
LAG(val, 2, -1) OVER (ORDER BY id) as lag_val_2
116+
FROM test_lag_lead
117+
118+
-- ============================================================
119+
-- lag IGNORE NULLS: basic
120+
-- ============================================================
121+
122+
query
123+
SELECT id, val,
124+
LAG(val) IGNORE NULLS OVER (ORDER BY id) as lag_val
125+
FROM test_nulls
126+
127+
query
128+
SELECT grp, id, val,
129+
LAG(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id) as lag_val
130+
FROM test_nulls
131+
132+
-- ============================================================
133+
-- lag IGNORE NULLS: all values null in a group
134+
-- ============================================================
135+
136+
query
137+
SELECT grp, id, val,
138+
LAG(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id) as lag_val
139+
FROM test_all_nulls
140+
141+
-- ============================================================
142+
-- lag IGNORE NULLS: single row
143+
-- ============================================================
144+
145+
query
146+
SELECT id, val,
147+
LAG(val) IGNORE NULLS OVER (ORDER BY id) as lag_val
148+
FROM test_single_row
149+
150+
-- ============================================================
151+
-- lag IGNORE NULLS: multiple data types
152+
-- ============================================================
153+
154+
query
155+
SELECT grp, id,
156+
LAG(i_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id),
157+
LAG(l_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id),
158+
LAG(d_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id),
159+
LAG(s_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id)
160+
FROM test_types
161+
162+
-- ============================================================
163+
-- lag IGNORE NULLS: with offset > 1
164+
-- ============================================================
165+
166+
query
167+
SELECT id, val,
168+
LAG(val, 2) IGNORE NULLS OVER (ORDER BY id) as lag_val_2
169+
FROM test_nulls
170+
171+
-- ============================================================
172+
-- lag: contrast IGNORE NULLS vs RESPECT NULLS
173+
-- ============================================================
174+
175+
query
176+
SELECT id, val,
177+
LAG(val) OVER (ORDER BY id) as lag_respect,
178+
LAG(val) IGNORE NULLS OVER (ORDER BY id) as lag_ignore
179+
FROM test_nulls
180+
181+
-- ############################################################
182+
-- LEAD
183+
-- ############################################################
184+
185+
-- ============================================================
186+
-- lead: basic (default offset = 1)
187+
-- ============================================================
188+
189+
query
190+
SELECT id, val,
191+
LEAD(val) OVER (ORDER BY id) as lead_val
192+
FROM test_lag_lead
193+
194+
query
195+
SELECT grp, id, val,
196+
LEAD(val) OVER (PARTITION BY grp ORDER BY id) as lead_val
197+
FROM test_lag_lead
198+
199+
-- ============================================================
200+
-- lead: with explicit offset
201+
-- ============================================================
202+
203+
query
204+
SELECT id, val,
205+
LEAD(val, 2) OVER (ORDER BY id) as lead_val_2
206+
FROM test_lag_lead
207+
208+
-- ============================================================
209+
-- lead: with offset and default value
210+
-- ============================================================
211+
212+
query
213+
SELECT id, val,
214+
LEAD(val, 2, -1) OVER (ORDER BY id) as lead_val_2
215+
FROM test_lag_lead
216+
217+
-- ============================================================
218+
-- lead IGNORE NULLS: basic
219+
-- ============================================================
220+
221+
query
222+
SELECT id, val,
223+
LEAD(val) IGNORE NULLS OVER (ORDER BY id) as lead_val
224+
FROM test_nulls
225+
226+
query
227+
SELECT grp, id, val,
228+
LEAD(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id) as lead_val
229+
FROM test_nulls
230+
231+
-- ============================================================
232+
-- lead IGNORE NULLS: all values null in a group
233+
-- ============================================================
234+
235+
query
236+
SELECT grp, id, val,
237+
LEAD(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id) as lead_val
238+
FROM test_all_nulls
239+
240+
-- ============================================================
241+
-- lead IGNORE NULLS: single row
242+
-- ============================================================
243+
244+
query
245+
SELECT id, val,
246+
LEAD(val) IGNORE NULLS OVER (ORDER BY id) as lead_val
247+
FROM test_single_row
248+
249+
-- ============================================================
250+
-- lead IGNORE NULLS: multiple data types
251+
-- ============================================================
252+
253+
query
254+
SELECT grp, id,
255+
LEAD(i_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id),
256+
LEAD(l_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id),
257+
LEAD(d_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id),
258+
LEAD(s_val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id)
259+
FROM test_types
260+
261+
-- ============================================================
262+
-- lead IGNORE NULLS: with offset > 1
263+
-- ============================================================
264+
265+
query
266+
SELECT id, val,
267+
LEAD(val, 2) IGNORE NULLS OVER (ORDER BY id) as lead_val_2
268+
FROM test_nulls
269+
270+
-- ============================================================
271+
-- lead: contrast IGNORE NULLS vs RESPECT NULLS
272+
-- ============================================================
273+
274+
query
275+
SELECT id, val,
276+
LEAD(val) OVER (ORDER BY id) as lead_respect,
277+
LEAD(val) IGNORE NULLS OVER (ORDER BY id) as lead_ignore
278+
FROM test_nulls
279+
280+
-- ############################################################
281+
-- LAG + LEAD combined
282+
-- ############################################################
283+
284+
query
285+
SELECT id, val,
286+
LAG(val) OVER (ORDER BY id) as lag_val,
287+
LEAD(val) OVER (ORDER BY id) as lead_val
288+
FROM test_lag_lead
289+
290+
query
291+
SELECT grp, id, val,
292+
LAG(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id) as lag_ignore,
293+
LEAD(val) IGNORE NULLS OVER (PARTITION BY grp ORDER BY id) as lead_ignore
294+
FROM test_nulls

0 commit comments

Comments
 (0)