Skip to content

Commit 88f8b68

Browse files
committed
build: Add spark-4.1 profile and shims
1 parent 1ec3563 commit 88f8b68

28 files changed

Lines changed: 973 additions & 9 deletions

.github/workflows/pr_build_linux.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ jobs:
9898
java_version: "17"
9999
maven_opts: "-Pspark-4.0"
100100
scan_impl: "native_comet"
101+
102+
- name: "Spark 4.1, JDK 17"
103+
java_version: "17"
104+
maven_opts: "-Pspark-4.1"
105+
scan_impl: "native_comet"
101106
suite:
102107
- name: "fuzz"
103108
value: |
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.internal.types.StringTypeWithCollation
23+
import org.apache.spark.sql.types.DataType
24+
25+
/**
 * Spark-version-specific type checks for the spark-4.1 shim layer.
 */
trait CometTypeShim {

  /**
   * Returns true when `dt` is a collated string type, i.e. an instance of
   * Spark's `StringTypeWithCollation` (imported from
   * `org.apache.spark.sql.internal.types`); false for every other `DataType`.
   */
  def isStringCollationType(dt: DataType): Boolean = dt match {
    case _: StringTypeWithCollation => true
    case _ => false
  }
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.paths.SparkPath
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.execution.datasources.PartitionedFile
25+
26+
/**
 * Shim for constructing Spark `PartitionedFile` instances (spark-4.1).
 */
object ShimBatchReader {

  /**
   * Builds a `PartitionedFile` that covers the whole of `file`.
   *
   * Both start and length are set to the sentinel -1, which this codebase
   * uses to mean "read the entire file". Block locations are left empty and
   * modification time / file size are zero; presumably callers do not rely
   * on those fields — TODO confirm against call sites.
   */
  def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile = {
    val sparkPath = SparkPath.fromUrlString(file)
    val wholeFile = -1L // sentinel: read the entire file
    PartitionedFile(partitionValues, sparkPath, wholeFile, wholeFile, Array.empty[String], 0, 0)
  }
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
/**
 * Compile-time configuration defaults contributed by the spark-4.1 shim.
 */
trait ShimCometConf {
  // On this Spark version, schema evolution is enabled by default.
  protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT: Boolean = true
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
23+
import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
24+
import org.apache.spark.sql.types.StructType
25+
26+
/**
 * Shim around Spark's Parquet file-format internals (spark-4.1).
 */
object ShimFileFormat {

  /**
   * Name of the temporary column that holds row indexes computed by the
   * file-format reader until they can be placed in the `_metadata` struct.
   * Forwarded from `ParquetFileFormat` so callers need not depend on the
   * Spark-version-specific location of the constant.
   */
  val ROW_INDEX_TEMPORARY_COLUMN_NAME =
    ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME

  /**
   * Locates the row-index column within `sparkSchema` by delegating to
   * Spark's `ParquetRowIndexUtil`; returns whatever index that utility
   * reports (presumably a negative value when absent — TODO confirm).
   */
  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
    ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
  }
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.executor.TaskMetrics
23+
import org.apache.spark.util.AccumulatorV2
24+
25+
/**
 * Shim for reaching into Spark task-metric internals (spark-4.1).
 *
 * NOTE(review): this object lives under the `org.apache.spark` package tree,
 * presumably so it can access package-private members such as
 * `TaskMetrics._externalAccums` — confirm against the Spark 4.1 sources.
 */
object ShimTaskMetrics {

  /**
   * Returns the most recently registered external accumulator of
   * `taskMetrics`, or `None` when its external accumulator list is empty.
   */
  def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] = {
    val externalAccums = taskMetrics._externalAccums
    externalAccums.lastOption
  }
}

pom.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,33 @@ under the License.
651651
</properties>
652652
</profile>
653653

654+
<profile>
655+
<!-- FIXME: this is WIP. Tests may fail https://github.com/apache/datafusion-comet/issues/551 -->
656+
<id>spark-4.1</id>
657+
<properties>
658+
<!-- Use Scala 2.13 by default -->
659+
<scala.version>2.13.17</scala.version>
660+
<scala.binary.version>2.13</scala.binary.version>
661+
<spark.version>4.1.0</spark.version>
662+
<spark.version.short>4.1</spark.version.short>
663+
<parquet.version>1.16.0</parquet.version>
664+
<semanticdb.version>4.13.9</semanticdb.version>
665+
<slf4j.version>2.0.17</slf4j.version>
666+
<shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
667+
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
668+
<!-- Use jdk17 by default -->
669+
<java.version>17</java.version>
670+
<maven.compiler.source>${java.version}</maven.compiler.source>
671+
<maven.compiler.target>${java.version}</maven.compiler.target>
672+
</properties>
673+
<repositories>
674+
<repository>
675+
<id>apache-staging</id>
676+
<url>https://repository.apache.org/content/repositories/orgapachespark-1506/</url>
677+
</repository>
678+
</repositories>
679+
</profile>
680+
654681
<profile>
655682
<id>scala-2.12</id>
656683
</profile>

spark/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,19 @@ under the License.
217217
</dependency>
218218
</dependencies>
219219
</profile>
220+
221+
<profile>
222+
<id>spark-4.1</id>
223+
<dependencies>
224+
<dependency>
225+
<groupId>org.apache.iceberg</groupId>
226+
<!-- TODO: Upgrade after iceberg-spark-runtime-4.1_2.13 release -->
227+
<artifactId>iceberg-spark-runtime-4.0_${scala.binary.version}</artifactId>
228+
<version>1.10.0</version>
229+
<scope>test</scope>
230+
</dependency>
231+
</dependencies>
232+
</profile>
220233
</profiles>
221234

222235
<build>

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
172172
.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
173173
.getPartitionLengths();
174174
mapStatus =
175-
MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
175+
MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
176176
return;
177177
}
178178
final long openStartTime = System.nanoTime();
@@ -261,7 +261,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
261261

262262
// TODO: We probably can move checksum generation here when concatenating partition files
263263
partitionLengths = writePartitionedData(mapOutputWriter);
264-
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
264+
mapStatus =
265+
MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
265266
} catch (Exception e) {
266267
try {
267268
mapOutputWriter.abort(e);

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ void closeAndWriteOutput() throws IOException {
288288
}
289289
}
290290
}
291-
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
291+
mapStatus =
292+
MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
292293
}
293294

294295
@VisibleForTesting

0 commit comments

Comments
 (0)