Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ parquet = { version = "57.0.0", default-features = false, features = ["experimen
datafusion = { version = "51.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "51.0.0" }
datafusion-spark = { version = "51.0.0" }
datafusion-physical-expr-adapter = { version = "51.0.0" }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-proto = { path = "proto" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
Expand Down
1 change: 1 addition & 0 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ paste = "1.0.14"
datafusion = { workspace = true, features = ["parquet_encryption", "sql"] }
datafusion-datasource = { workspace = true }
datafusion-spark = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }
once_cell = "1.18.0"
regex = { workspace = true }
crc32fast = "1.3.2"
Expand Down
30 changes: 11 additions & 19 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ use iceberg::io::FileIO;

use crate::execution::operators::ExecutionError;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::schema_adapter::SchemaAdapterFactory;
use crate::parquet::schema_adapter::adapt_batch_with_expressions;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_datasource::file_stream::FileStreamMetrics;

Expand Down Expand Up @@ -294,27 +293,20 @@ impl IcebergFileStream {
let target_schema = Arc::clone(&schema);

// Schema adaptation handles differences in Arrow field names and metadata
// between the file schema and expected output schema
// between the file schema and expected output schema using expression-based
// adaptation (PhysicalExprAdapter approach)
let mapped_stream = stream
.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)))
.and_then(move |batch| {
let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
let adapter_factory = SparkSchemaAdapterFactory::new(spark_options, None);
let file_schema = batch.schema();
let adapter = adapter_factory
.create(Arc::clone(&target_schema), Arc::clone(&file_schema));

let result = match adapter.map_schema(file_schema.as_ref()) {
Ok((schema_mapper, _projection)) => {
schema_mapper.map_batch(batch).map_err(|e| {
DataFusionError::Execution(format!("Batch mapping failed: {}", e))
})
}
Err(e) => Err(DataFusionError::Execution(format!(
"Schema mapping failed: {}",
e
))),
};
let result = adapt_batch_with_expressions(
batch,
&target_schema,
&spark_options,
)
.map_err(|e| {
DataFusionError::Execution(format!("Batch adaptation failed: {}", e))
});
futures::future::ready(result)
});

Expand Down
18 changes: 14 additions & 4 deletions native/core/src/parquet/parquet_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::execution::operators::ExecutionError;
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
use arrow::datatypes::{Field, SchemaRef};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
Expand All @@ -32,6 +32,7 @@ use datafusion::physical_expr::PhysicalExpr;
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;
use datafusion_comet_spark_expr::EvalMode;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use itertools::Itertools;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -104,9 +105,14 @@ pub(crate) fn init_datasource_exec(
);
}

let file_source = parquet_source.with_schema_adapter_factory(Arc::new(
SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
))?;
// Create the expression adapter factory for Spark-compatible schema adaptation
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> =
Arc::new(SparkPhysicalExprAdapterFactory::new(
spark_parquet_options,
default_values,
));

let file_source: Arc<dyn FileSource> = Arc::new(parquet_source);

let file_groups = file_groups
.iter()
Expand All @@ -124,6 +130,7 @@ pub(crate) fn init_datasource_exec(
)
.with_projection_indices(Some(projection_vector))
.with_table_partition_cols(partition_fields)
.with_expr_adapter(Some(expr_adapter_factory))
.build()
}
_ => get_file_config_builder(
Expand All @@ -133,8 +140,11 @@ pub(crate) fn init_datasource_exec(
object_store_url,
file_source,
)
.with_expr_adapter(Some(expr_adapter_factory))
.build(),
};
// Note: both match arms move expr_adapter_factory, but only one arm ever
// executes, so the value is moved at most once and no clone is needed

Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
}
Expand Down
Loading
Loading