Skip to content

Commit 4e13899

Browse files
committed
Add LRU cache to IcebergPlanDataInjector.
1 parent 212ebef commit 4e13899

1 file changed

Lines changed: 24 additions & 10 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.spark.sql.comet
2121

22+
import java.util
2223
import java.util.Locale
2324

2425
import scala.collection.mutable
@@ -140,13 +141,22 @@ private[comet] object PlanDataInjector {
140141
*/
141142
private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
142143
import java.nio.ByteBuffer
143-
import java.util.concurrent.ConcurrentHashMap
144-
145-
// Cache parsed IcebergScanCommon by content to avoid repeated deserialization
146-
// ByteBuffer wrapper provides content-based equality and hashCode
147-
// TODO: This is a static singleton on the executor, should we cap the size (proper LRU cache?)
148-
private val commonCache =
149-
new ConcurrentHashMap[ByteBuffer, OperatorOuterClass.IcebergScanCommon]()
144+
import java.util.{LinkedHashMap, Map => JMap}
145+
146+
private final val maxCacheEntries = 16
147+
148+
// Cache parsed IcebergScanCommon to avoid reparsing for Iceberg tables with large numbers of
149+
// partitions (thousands or more) that may repeatedly parse the same commonBytes.
150+
// IcebergPlanDataInjector is a singleton, so we use an LRU cache to eventually evict old
151+
// IcebergScanCommon objects. 16 seems like a reasonable starting point since these objects
152+
// are not large. Thread-safe LinkedHashMap with accessOrder=true provides LRU ordering.
153+
private val commonCache = java.util.Collections.synchronizedMap(
154+
new LinkedHashMap[ByteBuffer, OperatorOuterClass.IcebergScanCommon](4, 0.75f, true) {
155+
override def removeEldestEntry(
156+
eldest: JMap.Entry[ByteBuffer, OperatorOuterClass.IcebergScanCommon]): Boolean = {
157+
size() > maxCacheEntries
158+
}
159+
})
150160

151161
override def canInject(op: Operator): Boolean =
152162
op.hasIcebergScan &&
@@ -164,9 +174,13 @@ private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
164174

165175
// Cache the parsed common data to avoid deserializing on every partition
166176
val cacheKey = ByteBuffer.wrap(commonBytes)
167-
val common = commonCache.computeIfAbsent(
168-
cacheKey,
169-
_ => OperatorOuterClass.IcebergScanCommon.parseFrom(commonBytes))
177+
val common = commonCache.synchronized {
178+
Option(commonCache.get(cacheKey)).getOrElse {
179+
val parsed = OperatorOuterClass.IcebergScanCommon.parseFrom(commonBytes)
180+
commonCache.put(cacheKey, parsed)
181+
parsed
182+
}
183+
}
170184

171185
val tasksOnly = OperatorOuterClass.IcebergScan.parseFrom(partitionBytes)
172186

0 commit comments

Comments
 (0)