From 0882020549c561e1bd8ef99df6ffe0a649dbdcd9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 3 May 2026 13:21:38 -0600
Subject: [PATCH 1/2] test: relax bytesRead ratio assertion for Spark 4.1+

Closes #4194.

Spark 4.1 changed `ParquetFileFormat` to pre-open the `SeekableInputStream`
and read the file footer outside the `FileScanRDD.compute()` thread. Spark's
`inputMetrics.bytesRead` is updated from a Hadoop FileSystem thread-local
byte counter (`SparkHadoopUtil.getFSBytesReadOnThreadCallback`) that only
captures reads on the compute() thread, so reads served by the pre-opened
stream's internal buffer go uncounted. Spark's reported `bytesRead` is now
5-15x smaller than the actual file IO for the small files used in
`CometTaskMetricsSuite` (the discrepancy shrinks for larger files where
row-group reads cross buffer boundaries).

Comet (via DataFusion's `bytes_scanned`) still reports actual file IO. The
existing 0.7-1.3 ratio assertion treats the truthful Comet value as broken;
on Spark 4.1+ it is the only meaningful number.

Replace the ratio assertion with a Spark-version-aware helper:

- Spark <= 4.0: keep the tight 0.7-1.3 band (semantics unchanged).
- Spark 4.1+: assert only `cometBytes >= sparkBytes` and `> 0`, since the
  Spark side under-reports.

`recordsRead` is unaffected and continues to be checked exactly. The
`assume(!isSpark41Plus, ...)` guards on the three affected tests are
removed.

Adds a doc section in `docs/source/user-guide/latest/metrics.md` describing
the difference, when it applies, and that it is purely observability (does
not affect Spark's planner, optimizer, or AQE).
---
 docs/source/user-guide/latest/metrics.md      | 32 +++++++++++++++
 .../sql/comet/CometTaskMetricsSuite.scala     | 40 ++++++++++++-------
 2 files changed, 57 insertions(+), 15 deletions(-)
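
Not part of the change itself, but context for reviewers: a toy model of the
thread-local blind spot described above. Every name in the sketch is invented
for illustration; Spark's real counter is installed via
`SparkHadoopUtil.getFSBytesReadOnThreadCallback`.

    // Scala sketch: a thread-local byte counter only sees reads performed
    // on the thread that owns it, mirroring why Spark 4.1's pre-opened
    // stream reads go uncounted.
    object ThreadLocalCounterDemo {
      private val bytesRead = new ThreadLocal[Long] {
        override def initialValue(): Long = 0L
      }

      private def recordRead(n: Long): Unit = bytesRead.set(bytesRead.get + n)

      def main(args: Array[String]): Unit = {
        // "Pre-open" work happens on a helper thread, like the Spark 4.1
        // footer read: its bytes land in that thread's counter.
        val opener = new Thread(() => recordRead(4096L))
        opener.start()
        opener.join()

        // The "compute" thread then does a small read of its own.
        recordRead(128L)

        // Prints 128, not 4224: the helper thread's IO is invisible here.
        println(s"bytesRead seen by compute thread: ${bytesRead.get}")
      }
    }
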
diff --git a/docs/source/user-guide/latest/metrics.md b/docs/source/user-guide/latest/metrics.md
index 509d0ae8c0..9cebe3effd 100644
--- a/docs/source/user-guide/latest/metrics.md
+++ b/docs/source/user-guide/latest/metrics.md
@@ -64,3 +64,35 @@ Here is a guide to some of the native metrics.
 | `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. |
 | `mempool_time` | Time interacting with memory pool. |
 | `write_time` | Time spent writing bytes to disk. |
+
+## Task-Level Input Metrics on Spark 4.1+
+
+Comet's native scans set `inputMetrics.bytesRead` to the actual file IO performed by the
+DataFusion parquet reader (`bytes_scanned`). This is the truthful number you would see at the
+filesystem layer.
+
+Spark 4.1 changed its own parquet reader to pre-open the `SeekableInputStream` and read the file
+footer outside the `FileScanRDD.compute()` thread. Spark's `inputMetrics.bytesRead` is updated
+from a Hadoop FileSystem thread-local byte counter that only captures reads on the
+`compute()` thread, so reads serviced by the pre-opened stream's internal buffer go uncounted.
+The magnitude of the under-count scales with how much of the file is pre-buffered relative to
+the file size:
+
+| File size                      | Spark 4.1 bytesRead vs actual                           | Comet / Spark ratio |
+| ------------------------------ | ------------------------------------------------------- | ------------------- |
+| Tiny (KB; whole file buffered) | Near 0                                                  | 10-15×              |
+| Small (MB)                     | Significant under-count                                 | 2-5×                |
+| Medium / Large (10+ MB)        | Subsequent row-group reads cross buffer; mostly counted | Closer to 1×        |
+
+Unit tests that write a few hundred rows to parquet hit the worst case and see the largest
+discrepancy. Production workloads with realistic file sizes typically see the smallest
+discrepancy.
+
+This is purely an observability difference: `inputMetrics.bytesRead` is reported to listeners
+and the Spark UI but is not consumed by the planner, the optimizer, or AQE, so the discrepancy
+does not affect query plans, partitioning, or correctness. Records read (`recordsRead`) is
+unaffected and remains exactly equal between Comet and Spark.
+
+If you compare Comet's `bytesRead` against vanilla Spark's on Spark 4.1+ (via the Spark UI or
+the REST API), expect Comet's number to be substantially larger for small files, and closer to
+Spark's for large files. Comet's value reflects what the storage layer actually delivered.

diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala
index 06bbb50272..05bdfceb1d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala
@@ -192,7 +192,6 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("native_datafusion scan reports task-level input metrics matching Spark") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
     val totalRows = 10000
     withTempPath { dir =>
       spark
@@ -230,15 +229,11 @@
       // (e.g. footer reads, page headers, buffering granularity).
       assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes")
       assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes")
-      val ratio = cometBytes.toDouble / sparkBytes.toDouble
-      assert(
-        ratio >= 0.7 && ratio <= 1.3,
-        s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+      assertCometBytesReadInRange(cometBytes, sparkBytes)
     }
   }
 
   test("input metrics aggregate across multiple native scans in a join") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
     withTempPath { dir1 =>
       withTempPath { dir2 =>
         // Create two separate parquet tables
@@ -283,16 +278,34 @@
         assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")
 
         // Both sides should contribute to the total bytes
-        val ratio = cometBytes.toDouble / sparkBytes.toDouble
-        assert(
-          ratio >= 0.7 && ratio <= 1.3,
-          s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+        assertCometBytesReadInRange(cometBytes, sparkBytes)
       }
     }
   }
 
+  /**
+   * Compare Comet's `bytesRead` against Spark's baseline. On Spark <= 4.0 the two readers report
+   * the same Hadoop-FS thread-local byte count, so we keep a tight 0.7-1.3 band. Spark 4.1
+   * pre-opens the parquet `SeekableInputStream` outside the FileScanRDD `compute()` thread, so
+   * its `getFSBytesReadOnThreadCallback` no longer captures most of the parquet IO and
+   * `inputMetrics.bytesRead` is now a small fraction of the actual file IO. Comet (via
+   * DataFusion's `bytes_scanned`) still reports actual bytes, so the only safe cross-version
+   * invariant on 4.1+ is that Comet >= Spark and both are positive.
+   */
+  private def assertCometBytesReadInRange(cometBytes: Long, sparkBytes: Long): Unit = {
+    if (isSpark41Plus) {
+      assert(
+        cometBytes >= sparkBytes,
+        s"Comet bytesRead should be >= Spark bytesRead on 4.1+: comet=$cometBytes, spark=$sparkBytes")
+    } else {
+      val ratio = cometBytes.toDouble / sparkBytes.toDouble
+      assert(
+        ratio >= 0.7 && ratio <= 1.3,
+        s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+    }
+  }
+
   test("input metrics aggregate across multiple native scans in a union") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
     withTempPath { dir1 =>
       withTempPath { dir2 =>
         spark
@@ -335,10 +348,7 @@
 
         assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords")
         assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")
-        val ratio = cometBytes.toDouble / sparkBytes.toDouble
-        assert(
-          ratio >= 0.7 && ratio <= 1.3,
-          s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+        assertCometBytesReadInRange(cometBytes, sparkBytes)
       }
     }
   }
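
For reference, a minimal, self-contained sketch of how task-level input
metrics can be collected for a query. The suite's actual harness is not shown
in this diff, so the wiring below is an assumed pattern, not
CometTaskMetricsSuite code:

    // Scala: sum inputMetrics across tasks with a SparkListener.
    import java.util.concurrent.atomic.AtomicLong

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
    import org.apache.spark.sql.SparkSession

    object CollectInputMetricsDemo {
      def main(args: Array[String]): Unit = {
        val spark =
          SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
        val bytesRead = new AtomicLong(0L)
        val recordsRead = new AtomicLong(0L)
        spark.sparkContext.addSparkListener(new SparkListener {
          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
            // taskMetrics can be null for failed tasks; skip those.
            Option(taskEnd.taskMetrics).foreach { tm =>
              bytesRead.addAndGet(tm.inputMetrics.bytesRead)
              recordsRead.addAndGet(tm.inputMetrics.recordsRead)
            }
          }
        })

        val path = "/tmp/input-metrics-demo"
        spark.range(10000).write.mode("overwrite").parquet(path)
        spark.read.parquet(path).count()

        // Listener events are delivered asynchronously; give the bus a
        // moment to drain before reading the totals.
        Thread.sleep(2000)
        println(s"bytesRead=${bytesRead.get} recordsRead=${recordsRead.get}")
        spark.stop()
      }
    }

With this kind of measurement, vanilla Spark and Comet land inside the
0.7-1.3 band on Spark <= 4.0; on 4.1+ only the ordering invariant holds,
which is what the helper encodes.
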
From 8a117a23ebf968500b4c5bf20a9f6336925b9c9c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sun, 3 May 2026 13:22:26 -0600
Subject: [PATCH 2/2] docs: drop bytesRead ratio table

---
 docs/source/user-guide/latest/metrics.md | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/docs/source/user-guide/latest/metrics.md b/docs/source/user-guide/latest/metrics.md
index 9cebe3effd..0579b30818 100644
--- a/docs/source/user-guide/latest/metrics.md
+++ b/docs/source/user-guide/latest/metrics.md
@@ -75,18 +75,9 @@ Spark 4.1 changed its own parquet reader to pre-open the `SeekableInputStream` a
 footer outside the `FileScanRDD.compute()` thread. Spark's `inputMetrics.bytesRead` is updated
 from a Hadoop FileSystem thread-local byte counter that only captures reads on the
 `compute()` thread, so reads serviced by the pre-opened stream's internal buffer go uncounted.
-The magnitude of the under-count scales with how much of the file is pre-buffered relative to
-the file size:
-
-| File size                      | Spark 4.1 bytesRead vs actual                           | Comet / Spark ratio |
-| ------------------------------ | ------------------------------------------------------- | ------------------- |
-| Tiny (KB; whole file buffered) | Near 0                                                  | 10-15×              |
-| Small (MB)                     | Significant under-count                                 | 2-5×                |
-| Medium / Large (10+ MB)        | Subsequent row-group reads cross buffer; mostly counted | Closer to 1×        |
-
-Unit tests that write a few hundred rows to parquet hit the worst case and see the largest
-discrepancy. Production workloads with realistic file sizes typically see the smallest
-discrepancy.
+The under-count is largest when the file fits in the pre-fetched buffer (tiny files, unit test
+sizes) and shrinks as files grow large enough that subsequent row-group reads cross the buffer
+and trigger fresh FS reads on the `compute()` thread.
 
 This is purely an observability difference: `inputMetrics.bytesRead` is reported to listeners
 and the Spark UI but is not consumed by the planner, the optimizer, or AQE, so the discrepancy
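
For anyone who wants to check the comparison the doc section describes, a
small sketch of pulling per-stage input bytes from Spark's monitoring REST
API; the host, port, and application id below are placeholders:

    // Scala: print each stage's inputBytes as reported by the REST API.
    import scala.io.Source

    object StageInputBytes {
      def main(args: Array[String]): Unit = {
        val appId = args(0) // shown in the Spark UI or /api/v1/applications
        val url = s"http://localhost:4040/api/v1/applications/$appId/stages"
        val json = Source.fromURL(url).mkString
        // A real tool would use a JSON parser; a regex is enough to eyeball
        // the "inputBytes" field on each stage object.
        val inputBytes = "\"inputBytes\"\\s*:\\s*(\\d+)".r
        inputBytes.findAllMatchIn(json).foreach { m =>
          println(s"stage inputBytes=${m.group(1)}")
        }
      }
    }

Running the same job once with Comet enabled and once with vanilla Spark on
4.1+ should show Comet's numbers larger for small files, per the section
above.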