docs/source/user-guide/latest/metrics.md (23 additions, 0 deletions)
@@ -64,3 +64,26 @@ Here is a guide to some of the native metrics.
| `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. |
| `mempool_time` | Time interacting with memory pool. |
| `write_time` | Time spent writing bytes to disk. |

## Task-Level Input Metrics on Spark 4.1+

Comet's native scans set `inputMetrics.bytesRead` to the actual file IO performed by the
DataFusion parquet reader (its `bytes_scanned` metric). This matches the byte count you
would observe at the filesystem layer.
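
To see the value Comet reports, you can sum task-level input metrics with a plain
`SparkListener`. This is a minimal sketch against Spark's public listener API; the class
name and the placeholder path are ours, not part of Comet:

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sums inputMetrics.bytesRead across finished tasks. Register it on the
// SparkContext before running the query you want to measure.
class BytesReadListener extends SparkListener {
  val totalBytesRead = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for tasks that fail before reporting metrics
    Option(taskEnd.taskMetrics).foreach { m =>
      totalBytesRead.addAndGet(m.inputMetrics.bytesRead)
    }
  }
}

// Usage:
//   val listener = new BytesReadListener
//   spark.sparkContext.addSparkListener(listener)
//   spark.read.parquet("/tmp/table").count()   // path is a placeholder
//   println(listener.totalBytesRead.get())
```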

Spark 4.1 changed its own parquet reader to pre-open the `SeekableInputStream` and read the file
footer outside the `FileScanRDD.compute()` thread. Spark's `inputMetrics.bytesRead` is updated
from a Hadoop FileSystem thread-local byte counter that only captures reads on the
`compute()` thread, so reads serviced by the pre-opened stream's internal buffer go uncounted.
The under-count is largest when the file fits in the pre-fetched buffer (tiny files, unit test
sizes) and shrinks as files grow large enough that subsequent row-group reads cross the buffer
and trigger fresh FS reads on the `compute()` thread.
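
The counting gap is easy to reproduce in miniature. The sketch below is illustrative only
(the counter and `countedRead` helper are invented; this is not Spark's implementation):
a thread-local counter credits bytes to whichever thread performs the read, so IO done on
a prefetch thread never appears in the task thread's total.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ThreadLocalCounterDemo {
  // Per-thread byte counter, analogous in spirit to the Hadoop FileSystem
  // statistics behind Spark's getFSBytesReadOnThreadCallback.
  private val bytesRead = new ThreadLocal[Long] {
    override def initialValue(): Long = 0L
  }

  private def countedRead(n: Long): Unit = bytesRead.set(bytesRead.get + n)

  def main(args: Array[String]): Unit = {
    val prefetcher = Executors.newSingleThreadExecutor()
    // The "footer" read happens on the prefetch thread: its bytes land in
    // that thread's counter, invisible to the task thread below.
    prefetcher.execute(() => countedRead(8 * 1024))
    prefetcher.shutdown()
    prefetcher.awaitTermination(10, TimeUnit.SECONDS)

    countedRead(1024) // the only read performed on this ("compute") thread
    println(s"bytes counted on task thread: ${bytesRead.get}") // 1024, not 9216
  }
}
```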

This is purely an observability difference: `inputMetrics.bytesRead` is reported to listeners
and the Spark UI but is not consumed by the planner, the optimizer, or AQE, so the discrepancy
does not affect query plans, partitioning, or correctness. Records read (`recordsRead`) is
unaffected and remains exactly equal between Comet and Spark.

If you compare Comet's `bytesRead` against vanilla Spark's on Spark 4.1+ (via the Spark UI or
the REST API), expect Comet's number to be substantially larger for small files, and closer to
Spark's for large files. Comet's value reflects what the storage layer actually delivered.
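
For a scripted comparison, the same aggregate is available from Spark's monitoring REST
API. A sketch, assuming a driver UI on `localhost:4040` and a known application id (both
placeholders):

```scala
import scala.io.Source

// Fetches the stages endpoint of the monitoring REST API. Each stage object
// in the returned JSON carries an `inputBytes` field, which aggregates
// inputMetrics.bytesRead over the stage's tasks.
def fetchStagesJson(appId: String): String = {
  val url = s"http://localhost:4040/api/v1/applications/$appId/stages"
  val src = Source.fromURL(url)
  try src.mkString
  finally src.close()
}
```

Running the same query once with Comet and once with vanilla Spark and comparing
`inputBytes` for the scan stages should show the pattern described above: a large gap for
small files that narrows as files grow.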
@@ -192,7 +192,6 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

test("native_datafusion scan reports task-level input metrics matching Spark") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
val totalRows = 10000
withTempPath { dir =>
spark
@@ -230,15 +229,11 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// (e.g. footer reads, page headers, buffering granularity).
assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes")
assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes")
-      val ratio = cometBytes.toDouble / sparkBytes.toDouble
-      assert(
-        ratio >= 0.7 && ratio <= 1.3,
-        s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+      assertCometBytesReadInRange(cometBytes, sparkBytes)
}
}

test("input metrics aggregate across multiple native scans in a join") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
withTempPath { dir1 =>
withTempPath { dir2 =>
// Create two separate parquet tables
@@ -283,16 +278,34 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")

// Both sides should contribute to the total bytes
-        val ratio = cometBytes.toDouble / sparkBytes.toDouble
-        assert(
-          ratio >= 0.7 && ratio <= 1.3,
-          s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+        assertCometBytesReadInRange(cometBytes, sparkBytes)
}
}
}

+  /**
+   * Compare Comet's `bytesRead` against Spark's baseline. On Spark <= 4.0 the two readers
+   * report the same Hadoop-FS thread-local byte count, so we keep a tight 0.7-1.3 band.
+   * Spark 4.1 pre-opens the parquet `SeekableInputStream` outside the FileScanRDD
+   * `compute()` thread, so its `getFSBytesReadOnThreadCallback` no longer captures most of
+   * the parquet IO and `inputMetrics.bytesRead` is now a small fraction of the actual file
+   * IO. Comet (via DataFusion's `bytes_scanned`) still reports actual bytes, so the only
+   * safe cross-version invariant on 4.1+ is that Comet >= Spark and both are positive.
+   */
+  private def assertCometBytesReadInRange(cometBytes: Long, sparkBytes: Long): Unit = {
+    if (isSpark41Plus) {
+      assert(
+        cometBytes >= sparkBytes,
+        s"Comet bytesRead should be >= Spark bytesRead on 4.1+: comet=$cometBytes, spark=$sparkBytes")
+    } else {
+      val ratio = cometBytes.toDouble / sparkBytes.toDouble
+      assert(
+        ratio >= 0.7 && ratio <= 1.3,
+        s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+    }
+  }
+
test("input metrics aggregate across multiple native scans in a union") {
-    assume(!isSpark41Plus, "https://github.com/apache/datafusion-comet/issues/4098")
withTempPath { dir1 =>
withTempPath { dir2 =>
spark
@@ -335,10 +348,7 @@ class CometTaskMetricsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
assert(sparkRecords > 0, s"Spark recordsRead should be > 0, got $sparkRecords")
assert(cometRecords > 0, s"Comet recordsRead should be > 0, got $cometRecords")

-        val ratio = cometBytes.toDouble / sparkBytes.toDouble
-        assert(
-          ratio >= 0.7 && ratio <= 1.3,
-          s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes, ratio=$ratio")
+        assertCometBytesReadInRange(cometBytes, sparkBytes)
}
}
}