Skip to content

Commit cc3e133

Browse files
committed
DataFusion 52 migration
1 parent ea5901a commit cc3e133

5 files changed

Lines changed: 59 additions & 254 deletions

File tree

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use crate::execution::operators::ExecutionError;
4343
use crate::parquet::parquet_support::SparkParquetOptions;
4444
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
4545
use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
46+
use crate::parquet::schema_adapter::SparkSchemaMapperFactory;
4647
use datafusion_comet_spark_expr::EvalMode;
4748

4849
/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.

native/core/src/execution/planner.rs

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -975,42 +975,6 @@ impl PhysicalPlanner {
975975
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
976976
.collect();
977977

978-
let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
979-
.default_values
980-
.is_empty()
981-
{
982-
// We have default values. Extract the two lists (same length) of values and
983-
// indexes in the schema, and then create a HashMap to use in the SchemaMapper.
984-
let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
985-
.default_values
986-
.iter()
987-
.map(|expr| {
988-
let literal = self.create_expr(expr, Arc::clone(&required_schema))?;
989-
let df_literal = literal
990-
.as_any()
991-
.downcast_ref::<DataFusionLiteral>()
992-
.ok_or_else(|| {
993-
GeneralError("Expected literal of default value.".to_string())
994-
})?;
995-
Ok(df_literal.value().clone())
996-
})
997-
.collect();
998-
let default_values = default_values?;
999-
let default_values_indexes: Vec<usize> = scan
1000-
.default_values_indexes
1001-
.iter()
1002-
.map(|offset| *offset as usize)
1003-
.collect();
1004-
Some(
1005-
default_values_indexes
1006-
.into_iter()
1007-
.zip(default_values)
1008-
.collect(),
1009-
)
1010-
} else {
1011-
None
1012-
};
1013-
1014978
// Get one file from this partition (we know it's not empty due to early return above)
1015979
let one_file = partition_files
1016980
.partitioned_file
@@ -1034,23 +998,15 @@ impl PhysicalPlanner {
1034998
let files =
1035999
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
10361000
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
1037-
let partition_fields: Vec<Field> = partition_schema
1038-
.fields()
1039-
.iter()
1040-
.map(|field| {
1041-
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
1042-
})
1043-
.collect_vec();
1001+
10441002
let scan = init_datasource_exec(
10451003
required_schema,
10461004
Some(data_schema),
10471005
Some(partition_schema),
1048-
Some(partition_fields),
10491006
object_store_url,
10501007
file_groups,
10511008
Some(projection_vector),
10521009
Some(data_filters?),
1053-
default_values,
10541010
scan.session_timezone.as_str(),
10551011
scan.case_sensitive,
10561012
self.session_ctx(),
@@ -3437,8 +3393,6 @@ mod tests {
34373393

34383394
use crate::execution::operators::ExecutionError;
34393395
use crate::execution::planner::literal_to_array_ref;
3440-
use crate::parquet::parquet_support::SparkParquetOptions;
3441-
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
34423396
use datafusion_comet_proto::spark_expression::expr::ExprStruct;
34433397
use datafusion_comet_proto::spark_expression::ListLiteral;
34443398
use datafusion_comet_proto::{
@@ -3448,7 +3402,6 @@ mod tests {
34483402
spark_operator,
34493403
spark_operator::{operator::OpStruct, Operator},
34503404
};
3451-
use datafusion_comet_spark_expr::EvalMode;
34523405

34533406
#[test]
34543407
fn test_unpack_dictionary_primitive() {

native/core/src/parquet/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,12 +765,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
765765
required_schema,
766766
Some(data_schema),
767767
None,
768-
None,
769768
object_store_url,
770769
file_groups,
771770
None,
772771
data_filters,
773-
None,
774772
session_timezone.as_str(),
775773
case_sensitive != JNI_FALSE,
776774
session_ctx,

native/core/src/parquet/parquet_exec.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use crate::execution::operators::ExecutionError;
1919
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
2020
use crate::parquet::parquet_support::SparkParquetOptions;
21-
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
2221
use arrow::datatypes::{Field, SchemaRef};
2322
use datafusion::config::TableParquetOptions;
2423
use datafusion::datasource::listing::PartitionedFile;
@@ -30,11 +29,8 @@ use datafusion::execution::object_store::ObjectStoreUrl;
3029
use datafusion::physical_expr::expressions::BinaryExpr;
3130
use datafusion::physical_expr::PhysicalExpr;
3231
use datafusion::prelude::SessionContext;
33-
use datafusion::scalar::ScalarValue;
3432
use datafusion_comet_spark_expr::EvalMode;
3533
use datafusion_datasource::TableSchema;
36-
use itertools::Itertools;
37-
use std::collections::HashMap;
3834
use std::sync::Arc;
3935

4036
/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
@@ -61,18 +57,16 @@ pub(crate) fn init_datasource_exec(
6157
required_schema: SchemaRef,
6258
data_schema: Option<SchemaRef>,
6359
partition_schema: Option<SchemaRef>,
64-
partition_fields: Option<Vec<Field>>,
6560
object_store_url: ObjectStoreUrl,
6661
file_groups: Vec<Vec<PartitionedFile>>,
6762
projection_vector: Option<Vec<usize>>,
6863
data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
69-
default_values: Option<HashMap<usize, ScalarValue>>,
7064
session_timezone: &str,
7165
case_sensitive: bool,
7266
session_ctx: &Arc<SessionContext>,
7367
encryption_enabled: bool,
7468
) -> Result<Arc<DataSourceExec>, ExecutionError> {
75-
let (table_parquet_options, spark_parquet_options) = get_options(
69+
let (table_parquet_options, _) = get_options(
7670
session_timezone,
7771
case_sensitive,
7872
&object_store_url,

0 commit comments

Comments (0)