Skip to content

Commit a64c6ab

Browse files
authored
fix: Iceberg reflection for current() on TableOperations hierarchy (#3895)
1 parent a80a3a8 commit a64c6ab

8 files changed

Lines changed: 108 additions & 9 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ jobs:
298298
org.apache.spark.sql.comet.ParquetEncryptionITCase
299299
org.apache.comet.exec.CometNativeReaderSuite
300300
org.apache.comet.CometIcebergNativeSuite
301+
org.apache.comet.iceberg.IcebergReflectionSuite
301302
- name: "csv"
302303
value: |
303304
org.apache.comet.csv.CometCsvNativeReadSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ jobs:
175175
org.apache.spark.sql.comet.ParquetEncryptionITCase
176176
org.apache.comet.exec.CometNativeReaderSuite
177177
org.apache.comet.CometIcebergNativeSuite
178+
org.apache.comet.iceberg.IcebergReflectionSuite
178179
- name: "csv"
179180
value: |
180181
org.apache.comet.csv.CometCsvNativeReadSuite

dev/diffs/iceberg/1.10.0.diff

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,12 @@ index 16fa726032..64e367cf47 100644
13951395
.getOrCreate();
13961396

13971397
catalog =
1398+
@@ -202,3 +222,4 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
1399+
protected boolean countDeletes() {
1400+
- return true;
1401+
+ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
1402+
+ return false;
1403+
}
13981404
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
13991405
index baf7fa8f88..665946ad82 100644
14001406
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java

dev/diffs/iceberg/1.8.1.diff

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,6 +1497,12 @@ index dda49b4946..529992de6b 100644
14971497
.getOrCreate();
14981498

14991499
catalog =
1500+
@@ -201,3 +221,4 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
1501+
protected boolean countDeletes() {
1502+
- return true;
1503+
+ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
1504+
+ return false;
1505+
}
15001506
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
15011507
index e5831b76e4..5c45a111d9 100644
15021508
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java

dev/diffs/iceberg/1.9.1.diff

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,6 +1471,12 @@ index dda49b4946..529992de6b 100644
14711471
.getOrCreate();
14721472

14731473
catalog =
1474+
@@ -201,3 +221,4 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
1475+
protected boolean countDeletes() {
1476+
- return true;
1477+
+ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
1478+
+ return false;
1479+
}
14741480
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
14751481
index e5831b76e4..5c45a111d9 100644
14761482
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,18 @@ object IcebergReflection extends Logging {
228228
val opsMethod = table.getClass.getDeclaredMethod("operations")
229229
opsMethod.setAccessible(true)
230230
val ops = opsMethod.invoke(table)
231-
val currentMethod = ops.getClass.getDeclaredMethod("current")
232-
currentMethod.setAccessible(true)
233-
val metadata = currentMethod.invoke(ops)
234-
val formatVersionMethod = metadata.getClass.getMethod("formatVersion")
235-
Some(formatVersionMethod.invoke(metadata).asInstanceOf[Int])
231+
findMethodInHierarchy(ops.getClass, "current")
232+
.flatMap { currentMethod =>
233+
val metadata = currentMethod.invoke(ops)
234+
val formatVersionMethod = metadata.getClass.getMethod("formatVersion")
235+
Some(formatVersionMethod.invoke(metadata).asInstanceOf[Int])
236+
}
237+
.orElse {
238+
logError(
239+
"Iceberg reflection failure: Failed to get format version: " +
240+
"current() method not found in operations class hierarchy")
241+
None
242+
}
236243
} catch {
237244
case e: Exception =>
238245
logError(s"Iceberg reflection failure: Failed to get format version: ${e.getMessage}")
@@ -327,9 +334,12 @@ object IcebergReflection extends Logging {
327334
operationsMethod.setAccessible(true)
328335
val operations = operationsMethod.invoke(table)
329336

330-
val currentMethod = operations.getClass.getDeclaredMethod("current")
331-
currentMethod.setAccessible(true)
332-
Some(currentMethod.invoke(operations))
337+
findMethodInHierarchy(operations.getClass, "current").map(_.invoke(operations)).orElse {
338+
logError(
339+
"Iceberg reflection failure: Failed to get table metadata: " +
340+
"current() method not found in operations class hierarchy")
341+
None
342+
}
333343
} catch {
334344
case e: Exception =>
335345
logError(s"Iceberg reflection failure: Failed to get table metadata: ${e.getMessage}")

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,11 @@ case class CometScanRule(session: SparkSession)
399399
// Check if table uses a FileIO implementation compatible with iceberg-rust
400400

401401
val fileIOCompatible = IcebergReflection.getFileIO(metadata.table) match {
402+
case Some(fileIO)
403+
if fileIO.getClass.getName == "org.apache.iceberg.inmemory.InMemoryFileIO" =>
404+
fallbackReasons += "InMemoryFileIO is not supported by Comet's native reader"
405+
false
402406
case Some(_) =>
403-
// InMemoryFileIO is now supported with table location fallback for REST catalogs
404407
true
405408
case None =>
406409
fallbackReasons += "Could not check FileIO compatibility"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.iceberg
21+
22+
import java.util.Collections
23+
24+
import org.scalatest.funsuite.AnyFunSuite
25+
26+
import org.apache.iceberg.BaseMetastoreTableOperations
27+
import org.apache.iceberg.BaseTable
28+
import org.apache.iceberg.Schema
29+
import org.apache.iceberg.TableMetadata
30+
import org.apache.iceberg.io.FileIO
31+
import org.apache.iceberg.types.Types
32+
33+
class IcebergReflectionSuite extends AnyFunSuite {
34+
35+
/** Mimics HiveTableOperations/GlueTableOperations which inherit current(). */
36+
class StubTableOperations extends BaseMetastoreTableOperations {
37+
override protected def tableName(): String = "test"
38+
override def refresh(): TableMetadata = null
39+
override def io(): FileIO = null
40+
}
41+
42+
test("getTableMetadata succeeds when operations class inherits current()") {
43+
val ops = new StubTableOperations()
44+
val schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))
45+
val expectedMetadata = TableMetadata.newTableMetadata(
46+
schema,
47+
org.apache.iceberg.PartitionSpec.unpartitioned(),
48+
"file:///tmp/test-table",
49+
Collections.emptyMap[String, String]())
50+
val metadataField = classOf[BaseMetastoreTableOperations]
51+
.getDeclaredField("currentMetadata")
52+
metadataField.setAccessible(true)
53+
metadataField.set(ops, expectedMetadata)
54+
// current() checks shouldRefresh (default true) and calls refresh() instead of
55+
// returning currentMetadata. Set to false so current() returns our stubbed metadata.
56+
val refreshField = classOf[BaseMetastoreTableOperations]
57+
.getDeclaredField("shouldRefresh")
58+
refreshField.setAccessible(true)
59+
refreshField.set(ops, false)
60+
61+
val table = new BaseTable(ops, "test-table")
62+
val metadata = IcebergReflection.getTableMetadata(table)
63+
assert(metadata.isDefined)
64+
assert(metadata.get.isInstanceOf[TableMetadata])
65+
}
66+
}

0 commit comments

Comments
 (0)