docs: Update Parquet scan documentation #3433

# Comet Parquet Scan Implementations

> **Note on mutable buffers:** Both `native_comet` and `native_iceberg_compat` use reusable mutable buffers
> when transferring data from JVM to native code via Arrow FFI. The `native_iceberg_compat` implementation uses
> DataFusion's native Parquet reader for data columns, bypassing Comet's mutable buffer infrastructure entirely.
> However, partition columns still use `ConstantColumnReader`, which relies on Comet's mutable buffers that are
> reused across batches. This means native operators that buffer data (such as `SortExec` or `ShuffleWriterExec`)
> must perform deep copies to avoid data corruption. See the [FFI documentation](ffi.md) for details on the
> `arrow_ffi_safe` flag and ownership semantics.

Comet currently has two distinct implementations of the Parquet scan operator: `native_datafusion` and
`native_iceberg_compat`. Both delegate to DataFusion's `DataSourceExec`. The main difference is that
`native_datafusion` runs fully natively, while `native_iceberg_compat` is a hybrid JVM/Rust implementation.
The hybrid approach supports some Spark features that `native_datafusion` cannot, but incurs some performance
overhead due to crossing the JVM/Rust boundary.

The configuration property `spark.comet.scan.impl` is used to select an implementation. The default setting is
`spark.comet.scan.impl=auto`, which currently always uses the `native_iceberg_compat` implementation. Most users
should not need to change this setting. However, it is possible to force Comet to use a particular implementation
for all scan operations by setting this configuration property to one of these two implementations.
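
For example, an implementation can be forced when launching Spark. This is a sketch, not a definitive invocation: the jar path is a placeholder for your Comet build, while `spark.plugins=org.apache.spark.CometPlugin` and `spark.comet.enabled` are the standard Comet enablement settings.

```shell
# Force the fully native DataFusion scan for all Parquet reads.
# The jar path below is a placeholder for your Comet build.
$SPARK_HOME/bin/spark-shell \
  --jars /path/to/comet-spark.jar \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.comet.scan.impl=native_datafusion
```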

The following unsupported features are shared by both scans and cause Comet to fall back to Spark:

- `ShortType` columns, by default. When reading Parquet files written by systems other than Spark that contain
  columns with the logical type `UINT_8` (unsigned 8-bit integers), Comet may produce different results than Spark.
  Spark maps `UINT_8` to `ShortType`, but Comet's Arrow-based readers respect the unsigned type and read the data as
  unsigned rather than signed. Since Comet cannot distinguish `ShortType` columns that came from `UINT_8` from those
  that came from signed `INT16`, by default Comet falls back to Spark when scanning Parquet files containing
  `ShortType` columns. This behavior can be disabled by setting `spark.comet.scan.unsignedSmallIntSafetyCheck=false`.
  Note that `ByteType` columns are always safe because they can only come from signed `INT8`, where truncation
  preserves the signed value.
- Default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
- Spark's Datasource V2 API. When `spark.sql.sources.useV1SourceList` does not include `parquet`, Spark uses the
  V2 API for Parquet scans. The DataFusion-based implementations only support the V1 API.
- Spark metadata columns (e.g., `_metadata.file_path`)
- Dynamic Partition Pruning (DPP)
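
The `UINT_8` ambiguity in the first item comes down to whether a stored byte is reinterpreted as unsigned or signed. A minimal pure-Python sketch (an illustration, not Comet code) of the two readings:

```python
import struct

# One stored byte, 0xFF. Whether it means 255 or -1 depends on whether the
# reader honors the UINT_8 logical type annotation or treats the physical
# value as a signed 8-bit integer.
raw = bytes([0xFF])

as_unsigned = struct.unpack("B", raw)[0]  # honor UINT_8  -> 255
as_signed = struct.unpack("b", raw)[0]    # signed int8   -> -1

print(as_unsigned, as_signed)  # 255 -1

# Signed INT8 round-trips safely: truncating a signed value back to 8 bits
# preserves it, which is why ByteType columns are never ambiguous.
assert struct.unpack("b", struct.pack("b", as_signed))[0] == as_signed
```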

The following shared limitation may produce incorrect results without falling back to Spark:

- No support for datetime rebasing detection or the `spark.comet.exceptionOnDatetimeRebase` configuration. When
  reading Parquet files containing dates or timestamps written before Spark 3.0 (which used a hybrid
  Julian/Gregorian calendar), dates/timestamps will be read as if they were written using the Proleptic Gregorian
  calendar. This may produce incorrect results for dates before October 15, 1582.
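
The size of the potential error can be seen by computing the Julian Day Number of the same nominal date under both calendars. This is a standalone illustration using the standard JDN conversion formulas, not Comet code:

```python
def gregorian_jdn(y, m, d):
    """Julian Day Number of a proleptic Gregorian calendar date."""
    a = (14 - m) // 12
    y2 = y + 4800 - a
    m2 = m + 12 * a - 3
    return d + (153 * m2 + 2) // 5 + 365 * y2 + y2 // 4 - y2 // 100 + y2 // 400 - 32045

def julian_jdn(y, m, d):
    """Julian Day Number of a Julian calendar date."""
    a = (14 - m) // 12
    y2 = y + 4800 - a
    m2 = m + 12 * a - 3
    return d + (153 * m2 + 2) // 5 + 365 * y2 + y2 // 4 - 32083

# A legacy writer using the hybrid calendar stored "1500-01-01" as a Julian
# date; re-reading it as proleptic Gregorian shifts it by 9 days.
print(julian_jdn(1500, 1, 1) - gregorian_jdn(1500, 1, 1))  # 9

# From the switchover onward the two day counts line up: Gregorian
# October 15, 1582 is the day after Julian October 4, 1582.
assert gregorian_jdn(1582, 10, 15) == julian_jdn(1582, 10, 4) + 1
```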

The `native_datafusion` scan has some additional limitations. All of these cause Comet to fall back to Spark:

- No support for row indexes
- `PARQUET_FIELD_ID_READ_ENABLED` is not respected, so Parquet field IDs are not read [#1758]
- There are failures in the Spark SQL test suite [#1545]
- No support for `input_file_name()`, `input_file_block_start()`, or `input_file_block_length()` SQL functions.
  The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
- No support for setting the Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true`

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
without falling back to Spark:

- Some Spark configuration values are hard-coded to their defaults rather than respecting user-specified values.
  This may produce incorrect results when non-default values are set. The affected configurations are
  `spark.sql.parquet.binaryAsString`, `spark.sql.parquet.int96AsTimestamp`, `spark.sql.caseSensitive`,
  `spark.sql.parquet.inferTimestampNTZ.enabled`, and `spark.sql.legacy.parquet.nanosAsLong`. See
  [issue #1816](https://github.com/apache/datafusion-comet/issues/1816) for more details.

## S3 Support

The `native_datafusion` and `native_iceberg_compat` Parquet scan implementations completely offload data loading
to native code. They use the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and

#### Additional S3 Configuration Options

Beyond credential providers, the `native_datafusion` and `native_iceberg_compat` implementations support additional
S3 configuration options:

| Option | Description |
| ------ | ----------- |

#### Examples

The following examples demonstrate how to configure S3 access with the `native_datafusion` and `native_iceberg_compat`
Parquet scan implementations using different authentication methods.

**Example 1: Simple Credentials**
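
A minimal invocation for this case might look like the following sketch. It passes static credentials through the standard Hadoop S3A options (`fs.s3a.access.key` and `fs.s3a.secret.key`); the jar path, key values, and bucket name are placeholders.

```shell
# Sketch: static S3 credentials supplied via Hadoop S3A options, which Comet
# translates for the native object_store reader. All values are placeholders.
$SPARK_HOME/bin/spark-shell \
  --jars /path/to/comet-spark.jar \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.hadoop.fs.s3a.access.key=<access-key> \
  --conf spark.hadoop.fs.s3a.secret.key=<secret-key>
```

Inside the shell, `spark.read.parquet("s3a://<bucket>/<path>")` then reads through the native scan.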

#### Limitations

The S3 support of `native_datafusion` and `native_iceberg_compat` has the following limitations:

1. **Partial Hadoop S3A configuration support**: Not all Hadoop S3A configurations are currently supported. Only the
   configurations listed in the tables above are translated and applied to the underlying `object_store` crate.

2. **Custom credential providers**: Custom implementations of AWS credential providers are not supported. The
   implementation only supports the standard credential providers listed in the table above. We are planning to add
   support for custom credential providers through a JNI-based adapter that will allow calling Java credential
   providers from native code. See [issue #1829](https://github.com/apache/datafusion-comet/issues/1829) for more
   details.

[#1545]: https://github.com/apache/datafusion-comet/issues/1545
[#1758]: https://github.com/apache/datafusion-comet/issues/1758