diff --git a/docs/source/user-guide/latest/metrics.md b/docs/source/user-guide/latest/metrics.md index 509d0ae8c0..0579b30818 100644 --- a/docs/source/user-guide/latest/metrics.md +++ b/docs/source/user-guide/latest/metrics.md @@ -64,3 +64,26 @@ Here is a guide to some of the native metrics. | `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. | | `mempool_time` | Time interacting with memory pool. | | `write_time` | Time spent writing bytes to disk. | + +## Task-Level Input Metrics on Spark 4.1+ + +Comet's native scans set `inputMetrics.bytesRead` to the actual file IO performed by the +DataFusion parquet reader (`bytes_scanned`). This is the truthful number you would see at the +filesystem layer. + +Spark 4.1 changed its own parquet reader to pre-open the `SeekableInputStream` and read the file +footer outside the `FileScanRDD.compute()` thread. Spark's `inputMetrics.bytesRead` is updated +from a Hadoop FileSystem thread-local byte counter that only captures reads on the +`compute()` thread, so reads serviced by the pre-opened stream's internal buffer go uncounted. +The under-count is largest when the file fits in the pre-fetched buffer (tiny files, unit test +sizes) and shrinks as files grow large enough that subsequent row-group reads cross the buffer +and trigger fresh FS reads on the `compute()` thread. + +This is purely an observability difference: `inputMetrics.bytesRead` is reported to listeners +and the Spark UI but is not consumed by the planner, the optimizer, or AQE, so the discrepancy +does not affect query plans, partitioning, or correctness. Records read (`recordsRead`) is +unaffected and remains exactly equal between Comet and Spark. + +If you compare Comet's `bytesRead` against vanilla Spark's on Spark 4.1+ (via the Spark UI or +the REST API), expect Comet's number to be substantially larger for small files, and closer to +Spark's for large files. Comet's value reflects what the storage layer actually delivered. 
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala index 06bbb50272..05bdfceb1d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala @@ -192,7 +192,6 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("native_datafusion scan reports task-level input metrics matching Spark") { - assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098") val totalRows = 10000 withTempPath { dir => spark @@ -230,15 +229,11 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { // (e.g. footer reads, page headers, buffering granularity). assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes") assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes") - val ratio = cometBytes.toDouble / sparkBytes.toDouble - assert( - ratio >= 0.7 && ratio <= 1.3, - s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio") + assertCometBytesReadInRange(cometBytes, sparkBytes) } } test("input metrics aggregate across multiple native scans in a join") { - assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098") withTempPath { dir1 => withTempPath { dir2 => // Create two separate parquet tables @@ -283,16 +278,34 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords") // Both sides should contribute to the total bytes - val ratio = cometBytes.toDouble / sparkBytes.toDouble - assert( - ratio >= 0.7 && ratio <= 1.3, - s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio") + assertCometBytesReadInRange(cometBytes, sparkBytes) } } } + /** + * Compare Comet's `bytesRead` 
against Spark's baseline. On Spark <= 4.0 the two readers report + * comparable (not identical) byte counts, so we keep a tight 0.7-1.3 band. Spark 4.1 + * pre-opens the parquet `SeekableInputStream` outside the FileScanRDD `compute()` thread, so + * its `getFSBytesReadOnThreadCallback` no longer captures most of the parquet IO and + * `inputMetrics.bytesRead` is now a small fraction of the actual file IO. Comet (via + * DataFusion's `bytes_scanned`) still reports actual bytes, so the only safe cross-version + * invariant on 4.1+ is that Comet >= Spark and both are positive. + */ + private def assertCometBytesReadInRange(cometBytes: Long, sparkBytes: Long): Unit = { + if (isSpark41Plus) { + assert( + cometBytes >= sparkBytes, + s"Comet bytesRead should be >= Spark bytesRead on 4.1+: comet=$cometBytes, spark=$sparkBytes") + } else { + val ratio = cometBytes.toDouble / sparkBytes.toDouble + assert( + ratio >= 0.7 && ratio <= 1.3, + s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio") + } + } + test("input metrics aggregate across multiple native scans in a union") { - assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098") withTempPath { dir1 => withTempPath { dir2 => spark @@ -335,10 +348,7 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper { assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords") assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords") - val ratio = cometBytes.toDouble / sparkBytes.toDouble - assert( - ratio >= 0.7 && ratio <= 1.3, - s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio") + assertCometBytesReadInRange(cometBytes, sparkBytes) } } }