Skip to content

Commit 98e8096

Browse files
committed
[SPARK] Implement ReflectionCache for Iceberg serialization optimization (apache#3456)
This PR re-implements the reflection caching optimizations from PR apache#3298 that were lost during subsequent refactoring in apache#3349 and apache#3443. Changes: - Add ReflectionCache case class to cache all Iceberg classes and methods - Update serializePartitions() to create cache once and pass to helper methods - Update extractDeleteFilesList() to use cached reflection methods - Update serializePartitionData() to use cached reflection methods - Add field ID mapping cache to avoid redundant buildFieldIdMapping() calls - Add benchmark (CometIcebergSerializationBenchmark) to measure improvements Performance improvement: ~28% faster serialization (7,235ms -> 5,211ms for 10,000 partitions/tasks) by eliminating repeated Class.forName() and getMethod() calls that would otherwise execute tens of thousands of times.
1 parent 3e9f850 commit 98e8096

4 files changed

Lines changed: 809 additions & 63 deletions

File tree

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Iceberg Serialization Optimization Analysis
21+
22+
**GitHub Issue:** [#3456](https://github.com/apache/datafusion-comet/issues/3456)
23+
**Date:** 2026-02-20
24+
**Branch:** `feature/iceberg-serialization-optimizations-3456`
25+
26+
## Executive Summary
27+
28+
PR #3298 introduced **~50% faster** Iceberg serialization through reflection caching and deduplication optimizations. However, subsequent PRs #3349 and #3443 significantly refactored the code, and **most of these optimizations were lost**. This analysis identifies which optimizations can be re-applied to the current codebase.
29+
30+
---
31+
32+
## PR #3298 Original Optimizations
33+
34+
### 1. ReflectionCache Case Class
35+
**Status: ❌ Removed**
36+
37+
PR #3298 introduced a comprehensive `ReflectionCache` that cached all Iceberg classes and methods once:
38+
39+
```scala
40+
case class ReflectionCache(
41+
// Iceberg classes (loaded once)
42+
contentScanTaskClass: Class[_],
43+
fileScanTaskClass: Class[_],
44+
contentFileClass: Class[_],
45+
deleteFileClass: Class[_],
46+
schemaParserClass: Class[_],
47+
// ... many more
48+
49+
// Cached methods (looked up once)
50+
fileMethod: java.lang.reflect.Method,
51+
startMethod: java.lang.reflect.Method,
52+
deletesMethod: java.lang.reflect.Method,
53+
// ... 20+ cached methods
54+
)
55+
```
56+
57+
**Impact:** Avoided 30,000+ `Class.forName()` and `getMethod()` calls per query.
58+
59+
### 2. Partition Spec Deduplication by Object Identity
60+
**Status: ❌ Reverted to JSON string comparison**
61+
62+
PR #3298 used object identity for deduplication:
63+
```scala
64+
// PR #3298: Object identity (fast)
65+
partitionSpecToPoolIndex = mutable.HashMap[AnyRef, Int]()
66+
67+
// Current code: JSON string (slow)
68+
partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
69+
```
70+
71+
**Impact:** Avoided redundant `toJson()` calls for duplicate specs.
72+
73+
### 3. Partition Type Deduplication by Spec Identity
74+
**Status: ❌ Reverted to JSON string comparison**
75+
76+
Same spec → same partition type. PR #3298 cached this relationship:
77+
```scala
78+
// PR #3298: Spec identity → type index
79+
partitionTypeToPoolIndex = mutable.HashMap[AnyRef, Int]()
80+
81+
// Current code: JSON string comparison
82+
partitionTypeToPoolIndex = mutable.HashMap[String, Int]()
83+
```
84+
85+
### 4. Field ID Mapping Cache
86+
**Status: ❌ Removed**
87+
88+
PR #3298 cached `buildFieldIdMapping()` results by schema identity:
89+
```scala
90+
// PR #3298
91+
val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]()
92+
val nameToFieldId = fieldIdMappingCache.getOrElseUpdate(
93+
schema,
94+
IcebergReflection.buildFieldIdMapping(schema))
95+
96+
// Current code: Called every task iteration (line 878)
97+
val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema)
98+
```
99+
100+
**Impact:** `buildFieldIdMapping()` does reflection per-column. With 30K tasks × 10 columns = 300K redundant reflection calls.
101+
102+
### 5. Delete Files Method Caching
103+
**Status: ⚠️ Partially present**
104+
105+
Current code caches some methods but not all delete-related ones:
106+
- `deletesMethod` is NOT cached (called via `IcebergReflection.getDeleteFilesFromTask()`)
107+
- `contentMethod`, `specIdMethod`, `equalityIdsMethod` are looked up per-delete-file
108+
109+
---
110+
111+
## Current Code State
112+
113+
### What's Preserved ✅
114+
115+
In `serializePartitions()` (lines 773-789), some method caching exists:
116+
```scala
117+
// Load Iceberg classes once
118+
val contentScanTaskClass = Class.forName(...)
119+
val fileScanTaskClass = Class.forName(...)
120+
121+
// Cache method lookups
122+
val fileMethod = contentScanTaskClass.getMethod("file")
123+
val startMethod = contentScanTaskClass.getMethod("start")
124+
val lengthMethod = contentScanTaskClass.getMethod("length")
125+
val residualMethod = contentScanTaskClass.getMethod("residual")
126+
val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
127+
val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
128+
```
129+
130+
### What's Missing ❌
131+
132+
| Optimization | Location | Impact |
133+
|-------------|----------|--------|
134+
| `deletesMethod` caching | `extractDeleteFilesList()` | Per-task overhead |
135+
| `specMethod` caching | `serializePartitionData()` line 318 | Per-task overhead |
136+
| `partitionTypeMethod` caching | `serializePartitionData()` line 353 | Per-task overhead |
137+
| `fieldsMethod` caching | `serializePartitionData()` line 357 | Per-task overhead |
138+
| `fieldIdMethod`, `nameMethod`, `isOptionalMethod` | `serializePartitionData()` lines 390-397 | Per-field per-task |
139+
| `contentMethod`, `specIdMethod`, `equalityIdsMethod` | `extractDeleteFilesList()` | Per-delete-file |
140+
| Field ID mapping cache | `serializePartitions()` line 878 | Per-task schema reflection |
141+
| Object identity deduplication | Pool index maps | Per-task JSON serialization |
142+
143+
---
144+
145+
## Recommended Optimizations
146+
147+
### Priority 1: Restore ReflectionCache (High Impact)
148+
149+
Create a comprehensive cache in `IcebergReflection.scala`:
150+
151+
```scala
152+
case class ReflectionCache(
153+
// Classes
154+
contentScanTaskClass: Class[_],
155+
fileScanTaskClass: Class[_],
156+
contentFileClass: Class[_],
157+
deleteFileClass: Class[_],
158+
schemaParserClass: Class[_],
159+
schemaClass: Class[_],
160+
partitionSpecParserClass: Class[_],
161+
partitionSpecClass: Class[_],
162+
structTypeClass: Class[_],
163+
nestedFieldClass: Class[_],
164+
structLikeClass: Class[_],
165+
166+
// ContentScanTask methods
167+
fileMethod: Method,
168+
startMethod: Method,
169+
lengthMethod: Method,
170+
partitionMethod: Method,
171+
residualMethod: Method,
172+
173+
// FileScanTask methods
174+
taskSchemaMethod: Method,
175+
deletesMethod: Method,
176+
specMethod: Method,
177+
178+
// ContentFile methods
179+
fileLocationMethod: Method,
180+
181+
// DeleteFile methods
182+
deleteContentMethod: Method,
183+
deleteSpecIdMethod: Method,
184+
deleteEqualityIdsMethod: Method,
185+
186+
// Schema methods
187+
schemaToJsonMethod: Method,
188+
189+
// PartitionSpec methods
190+
partitionSpecToJsonMethod: Method,
191+
partitionTypeMethod: Method,
192+
193+
// StructType/NestedField methods
194+
structTypeFieldsMethod: Method,
195+
nestedFieldTypeMethod: Method,
196+
nestedFieldIdMethod: Method,
197+
nestedFieldNameMethod: Method,
198+
nestedFieldIsOptionalMethod: Method,
199+
200+
// StructLike methods
201+
structLikeGetMethod: Method
202+
)
203+
204+
def createReflectionCache(): ReflectionCache = {
205+
// Load all classes and methods once
206+
// ...
207+
}
208+
```
209+
210+
### Priority 2: Restore Field ID Mapping Cache (Medium Impact)
211+
212+
In `serializePartitions()`:
213+
214+
```scala
215+
// Add cache before the loop
216+
val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]()
217+
218+
// Inside the loop, replace:
219+
val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema)
220+
221+
// With:
222+
val nameToFieldId = fieldIdMappingCache.getOrElseUpdate(
223+
schema,
224+
IcebergReflection.buildFieldIdMapping(schema))
225+
```
226+
227+
### Priority 3: Restore Object Identity Deduplication (Medium Impact)
228+
229+
Change pool index map types:
230+
231+
```scala
232+
// From:
233+
val partitionTypeToPoolIndex = mutable.HashMap[String, Int]()
234+
val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
235+
236+
// To:
237+
val partitionTypeToPoolIndex = mutable.HashMap[AnyRef, Int]() // Spec identity
238+
val partitionSpecToPoolIndex = mutable.HashMap[AnyRef, Int]() // Object identity
239+
```
240+
241+
Then update `serializePartitionData()` to use spec object as key instead of JSON string.
242+
243+
### Priority 4: Pass Cache to Helper Methods (Low Impact)
244+
245+
Update method signatures to accept the cache:
246+
247+
```scala
248+
// From:
249+
private def extractDeleteFilesList(
250+
task: Any,
251+
contentFileClass: Class[_],
252+
fileScanTaskClass: Class[_]): Seq[...]
253+
254+
// To:
255+
private def extractDeleteFilesList(
256+
task: Any,
257+
cache: ReflectionCache): Seq[...]
258+
```
259+
260+
---
261+
262+
## Estimated Performance Impact
263+
264+
Based on PR #3298 benchmark (30,000 tasks):
265+
266+
| Metric | Before Optimization | After Optimization | Improvement |
267+
|--------|--------------------|--------------------|-------------|
268+
| Serialization time | 34,425 ms | 16,618 ms | **52% faster** |
269+
| Reflection calls | ~1M+ | ~100 | **99.99% reduction** |
270+
| JSON serializations | ~60K | ~100 | **99.8% reduction** |
271+
272+
---
273+
274+
## Implementation Plan
275+
276+
1. **Phase 1:** Add `ReflectionCache` to `IcebergReflection.scala`
277+
2. **Phase 2:** Update `serializePartitions()` to create cache once and pass to helpers
278+
3. **Phase 3:** Update `extractDeleteFilesList()` to use cache
279+
4. **Phase 4:** Update `serializePartitionData()` to use cache
280+
5. **Phase 5:** Add field ID mapping cache
281+
6. **Phase 6:** Restore object identity deduplication for pools
282+
7. **Phase 7:** Add benchmark test to validate improvements
283+
284+
---
285+
286+
## Files to Modify
287+
288+
1. `spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala`
289+
- Add `ReflectionCache` case class
290+
- Add `createReflectionCache()` method
291+
292+
2. `spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala`
293+
- Update `serializePartitions()` to use cache
294+
- Update `extractDeleteFilesList()` signature and implementation
295+
- Update `serializePartitionData()` signature and implementation
296+
- Add field ID mapping cache
297+
- Change pool index map types to use object identity
298+
299+
---
300+
301+
## References
302+
303+
- PR #3298: Original optimizations (merged Jan 2026)
304+
- PR #3349: Per-partition plan building refactor (merged Feb 2026)
305+
- PR #3443: Validation cleanup (merged Feb 2026)
306+
- Issue #3456: This analysis task

0 commit comments

Comments
 (0)