Skip to content

Commit 02a52a3

Browse files
committed
Cache parsed commonData to avoid re-deserializing it on every partition.
1 parent aa048a7 commit 02a52a3

1 file changed

Lines changed: 15 additions & 1 deletion

File tree

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ private[comet] object PlanDataInjector {
139139
* Injector for Iceberg scan operators.
140140
*/
141141
private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
142+
import java.nio.ByteBuffer
143+
import java.util.concurrent.ConcurrentHashMap
144+
145+
// Cache parsed IcebergScanCommon by content to avoid repeated deserialization
146+
// ByteBuffer wrapper provides content-based equality and hashCode
147+
// TODO: This is a static singleton on the executor; should we cap its size (e.g., with a proper LRU cache)?
148+
private val commonCache =
149+
new ConcurrentHashMap[ByteBuffer, OperatorOuterClass.IcebergScanCommon]()
142150

143151
override def canInject(op: Operator): Boolean =
144152
op.hasIcebergScan &&
@@ -153,7 +161,13 @@ private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
153161
commonBytes: Array[Byte],
154162
partitionBytes: Array[Byte]): Operator = {
155163
val scan = op.getIcebergScan
156-
val common = OperatorOuterClass.IcebergScanCommon.parseFrom(commonBytes)
164+
165+
// Cache the parsed common data to avoid deserializing on every partition
166+
val cacheKey = ByteBuffer.wrap(commonBytes)
167+
val common = commonCache.computeIfAbsent(
168+
cacheKey,
169+
_ => OperatorOuterClass.IcebergScanCommon.parseFrom(commonBytes))
170+
157171
val tasksOnly = OperatorOuterClass.IcebergScan.parseFrom(partitionBytes)
158172

159173
val scanBuilder = scan.toBuilder

0 commit comments

Comments
 (0)