Skip to content

Commit 51f8c42

Browse files
committed
Merge branch 'main' into iceberg-split-serialization-dpp
2 parents b54c87a + aa5afd6 commit 51f8c42

15 files changed

Lines changed: 1054 additions & 925 deletions

File tree

.github/workflows/iceberg_spark_test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ jobs:
7777
# Use CI profile for faster builds (no LTO) and to share cache with pr_build_linux.yml.
7878
run: |
7979
cd native && cargo build --profile ci
80+
env:
81+
RUSTFLAGS: "-Ctarget-cpu=x86-64-v3"
8082

8183
- name: Save Cargo cache
8284
uses: actions/cache/save@v5

.github/workflows/pr_build_linux.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ jobs:
9494
# CI profile: same overflow behavior as release, but faster compilation
9595
# (no LTO, parallel codegen)
9696
cargo build --profile ci
97+
env:
98+
RUSTFLAGS: "-Ctarget-cpu=x86-64-v3"
9799

98100
- name: Upload native library
99101
uses: actions/upload-artifact@v6

.github/workflows/pr_build_macos.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ jobs:
9494
# CI profile: same overflow behavior as release, but faster compilation
9595
# (no LTO, parallel codegen)
9696
cargo build --profile ci
97+
env:
98+
RUSTFLAGS: "-Ctarget-cpu=apple-m1"
9799

98100
- name: Upload native library
99101
uses: actions/upload-artifact@v6

.github/workflows/spark_sql_test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ jobs:
8383
run: |
8484
cd native
8585
cargo build --profile ci
86+
env:
87+
RUSTFLAGS: "-Ctarget-cpu=x86-64-v3"
8688

8789
- name: Upload native library
8890
uses: actions/upload-artifact@v6

dev/regenerate-golden-files.sh

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,6 @@ build_native() {
7474
cd native && cargo build && cd ..
7575
}
7676

77-
# Install Comet for a specific Spark version
78-
install_for_spark_version() {
79-
local spark_version=$1
80-
echo ""
81-
echo "=============================================="
82-
echo "[INFO] Installing Comet for Spark $spark_version"
83-
echo "=============================================="
84-
./mvnw install -DskipTests -Pspark-$spark_version
85-
}
86-
8777
# Regenerate golden files for a specific Spark version
8878
regenerate_golden_files() {
8979
local spark_version=$1
@@ -94,12 +84,12 @@ regenerate_golden_files() {
9484
echo "=============================================="
9585

9686
echo "[INFO] Running CometTPCDSV1_4_PlanStabilitySuite..."
97-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark \
87+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw \
9888
-Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" \
9989
-Pspark-$spark_version -nsu test
10090

10191
echo "[INFO] Running CometTPCDSV2_7_PlanStabilitySuite..."
102-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark \
92+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw \
10393
-Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" \
10494
-Pspark-$spark_version -nsu test
10595
}
@@ -158,9 +148,8 @@ main() {
158148
versions=("3.4" "3.5" "4.0")
159149
fi
160150

161-
# Install and regenerate for each version
151+
# Regenerate for each version
162152
for version in "${versions[@]}"; do
163-
install_for_spark_version "$version"
164153
regenerate_golden_files "$version"
165154
done
166155

docs/source/contributor-guide/development.md

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -191,52 +191,43 @@ Spark version, and runs the plan stability tests with `SPARK_GENERATE_GOLDEN_FIL
191191

192192
Alternatively, you can run the tests manually using the following commands.
193193

194-
First, Comet needs to be installed for each Spark version to be tested:
195-
196-
```sh
197-
./mvnw install -DskipTests -Pspark-3.4
198-
./mvnw install -DskipTests -Pspark-3.5
199-
# note that Spark 4.0 requires JDK 17 or later
200-
./mvnw install -DskipTests -Pspark-4.0
201-
```
202-
203194
Note that the output files get written to `$SPARK_HOME`.
204195

205196
The tests can be run with:
206197

207198
```sh
208199
export SPARK_HOME=`pwd`
209-
./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.4 -nsu test
210-
./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.5 -nsu test
211-
./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-4.0 -nsu test
200+
./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.4 -nsu test
201+
./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.5 -nsu test
202+
./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-4.0 -nsu test
212203
```
213204

214205
and
215206

216207
```sh
217208
export SPARK_HOME=`pwd`
218-
./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.4 -nsu test
219-
./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.5 -nsu test
220-
./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-4.0 -nsu test
209+
./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.4 -nsu test
210+
./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.5 -nsu test
211+
./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-4.0 -nsu test
221212
```
222213

223214
If your pull request changes the query plans generated by Comet, you should regenerate the golden files.
224215
To regenerate the golden files, you can run the following commands.
225216

226217
```sh
227218
export SPARK_HOME=`pwd`
228-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.4 -nsu test
229-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.5 -nsu test
230-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-4.0 -nsu test
219+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.4 -nsu test
220+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-3.5 -nsu test
221+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV1_4_PlanStabilitySuite" -Pspark-4.0 -nsu test
231222
```
232223

233224
and
234225

235226
```sh
236227
export SPARK_HOME=`pwd`
237-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.4 -nsu test
238-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.5 -nsu test
239-
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -pl spark -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-4.0 -nsu test
228+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.4 -nsu test
229+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-3.5 -nsu test
230+
SPARK_GENERATE_GOLDEN_FILES=1 ./mvnw -Dsuites="org.apache.spark.sql.comet.CometTPCDSV2_7_PlanStabilitySuite" -Pspark-4.0 -nsu test
240231
```
241232

242233
## Benchmark

docs/source/contributor-guide/sql-file-tests.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,21 @@ way to add expression and operator test coverage without writing Scala test code
2525

2626
## Running the tests
2727

28+
Run all SQL file tests:
29+
30+
```shell
31+
./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite" -Dtest=none
32+
```
33+
34+
Run a single test file by adding the file name (without `.sql` extension) after the suite name:
35+
2836
```shell
29-
mvn test -pl spark -Dsuites="org.apache.comet.CometSqlFileTestSuite" -Dtest=none
37+
./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite create_named_struct" -Dtest=none
3038
```
3139

40+
This uses ScalaTest's substring matching, so the argument must match part of the test name.
41+
Test names follow the pattern `sql-file: expressions/<category>/<file>.sql [<config>]`.
42+
3243
## Test file location
3344

3445
SQL test files live under:
@@ -208,7 +219,7 @@ SELECT space(n) FROM test_space WHERE n < 0
208219
6. Run the tests to verify:
209220

210221
```shell
211-
mvn test -pl spark -Dsuites="org.apache.comet.CometSqlFileTestSuite" -Dtest=none
222+
./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite" -Dtest=none
212223
```
213224

214225
### Tips for writing thorough tests

native/core/src/execution/shuffle/comet_partitioning.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,26 @@ impl CometPartitioning {
4646
}
4747
}
4848
}
49+
50+
pub(super) fn pmod(hash: u32, n: usize) -> usize {
51+
let hash = hash as i32;
52+
let n = n as i32;
53+
let r = hash % n;
54+
let result = if r < 0 { (r + n) % n } else { r };
55+
result as usize
56+
}
57+
58+
#[cfg(test)]
mod tests {
    use super::*;

    /// Hash inputs and expected partition ids were captured from Spark
    /// with 200 partitions; `pmod` must reproduce them exactly.
    #[test]
    fn test_pmod() {
        let hashes: [u32; 5] = [0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb];
        let expected: [usize; 5] = [69, 5, 193, 171, 115];
        for (&hash, &part) in hashes.iter().zip(expected.iter()) {
            assert_eq!(pmod(hash, 200), part);
        }
    }
}

native/core/src/execution/shuffle/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
pub(crate) mod codec;
1919
mod comet_partitioning;
2020
mod metrics;
21+
mod partitioners;
2122
mod shuffle_writer;
2223
pub mod spark_unsafe;
2324
mod writers;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod multi_partition;
19+
mod partitioned_batch_iterator;
20+
mod single_partition;
21+
22+
use arrow::record_batch::RecordBatch;
23+
use datafusion::common::Result;
24+
25+
pub(super) use multi_partition::MultiPartitionShuffleRepartitioner;
26+
pub(super) use partitioned_batch_iterator::PartitionedBatchIterator;
27+
pub(super) use single_partition::SinglePartitionShufflePartitioner;
28+
29+
/// Common contract for shuffle partitioner implementations — see
/// `MultiPartitionShuffleRepartitioner` and `SinglePartitionShufflePartitioner`
/// re-exported above. Callers feed Arrow `RecordBatch`es in via
/// `insert_batch` and finish by calling `shuffle_write`.
#[async_trait::async_trait]
pub(super) trait ShufflePartitioner: Send + Sync {
    /// Insert a batch into the partitioner
    async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;
    /// Write shuffle data and shuffle index file to disk
    fn shuffle_write(&mut self) -> Result<()>;
}

0 commit comments

Comments
 (0)