Skip to content

Commit 2a9e2bc

Browse files
Committed: DataFusion 52 migration
1 parent: cc3e133 · commit: 2a9e2bc

5 files changed

Lines changed: 214 additions & 107 deletions

File tree

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ use iceberg::io::FileIO;
4141

4242
use crate::execution::operators::ExecutionError;
4343
use crate::parquet::parquet_support::SparkParquetOptions;
44-
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
4544
use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
4645
use crate::parquet::schema_adapter::SparkSchemaMapperFactory;
4746
use datafusion_comet_spark_expr::EvalMode;
47+
use datafusion_datasource::file_stream::FileStreamMetrics;
48+
use crate::parquet::schema_adapter::adapt_batch_with_expressions;
4849

4950
/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
5051
///

native/core/src/execution/planner.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,42 @@ impl PhysicalPlanner {
975975
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
976976
.collect();
977977

978+
let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
979+
.default_values
980+
.is_empty()
981+
{
982+
// We have default values. Extract the two lists (same length) of values and
983+
// indexes in the schema, and then create a HashMap to use in the SchemaMapper.
984+
let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
985+
.default_values
986+
.iter()
987+
.map(|expr| {
988+
let literal = self.create_expr(expr, Arc::clone(&required_schema))?;
989+
let df_literal = literal
990+
.as_any()
991+
.downcast_ref::<DataFusionLiteral>()
992+
.ok_or_else(|| {
993+
GeneralError("Expected literal of default value.".to_string())
994+
})?;
995+
Ok(df_literal.value().clone())
996+
})
997+
.collect();
998+
let default_values = default_values?;
999+
let default_values_indexes: Vec<usize> = scan
1000+
.default_values_indexes
1001+
.iter()
1002+
.map(|offset| *offset as usize)
1003+
.collect();
1004+
Some(
1005+
default_values_indexes
1006+
.into_iter()
1007+
.zip(default_values)
1008+
.collect(),
1009+
)
1010+
} else {
1011+
None
1012+
};
1013+
9781014
// Get one file from this partition (we know it's not empty due to early return above)
9791015
let one_file = partition_files
9801016
.partitioned_file
@@ -1007,6 +1043,7 @@ impl PhysicalPlanner {
10071043
file_groups,
10081044
Some(projection_vector),
10091045
Some(data_filters?),
1046+
default_values,
10101047
scan.session_timezone.as_str(),
10111048
scan.case_sensitive,
10121049
self.session_ctx(),

native/core/src/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
769769
file_groups,
770770
None,
771771
data_filters,
772+
None,
772773
session_timezone.as_str(),
773774
case_sensitive != JNI_FALSE,
774775
session_ctx,

native/core/src/parquet/parquet_exec.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
1819
use crate::execution::operators::ExecutionError;
1920
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
2021
use crate::parquet::parquet_support::SparkParquetOptions;
@@ -32,6 +33,9 @@ use datafusion::prelude::SessionContext;
3233
use datafusion_comet_spark_expr::EvalMode;
3334
use datafusion_datasource::TableSchema;
3435
use std::sync::Arc;
36+
use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
37+
use datafusion::scalar::ScalarValue;
38+
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
3539

3640
/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
3741
/// `native_datafusion` scan or the `native_iceberg_compat` scan.
@@ -61,12 +65,13 @@ pub(crate) fn init_datasource_exec(
6165
file_groups: Vec<Vec<PartitionedFile>>,
6266
projection_vector: Option<Vec<usize>>,
6367
data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
default_values: Option<HashMap<usize, ScalarValue>>,
6469
session_timezone: &str,
6570
case_sensitive: bool,
6671
session_ctx: &Arc<SessionContext>,
6772
encryption_enabled: bool,
6873
) -> Result<Arc<DataSourceExec>, ExecutionError> {
69-
let (table_parquet_options, _) = get_options(
74+
let (table_parquet_options, spark_parquet_options) = get_options(
7075
session_timezone,
7176
case_sensitive,
7277
&object_store_url,
@@ -118,7 +123,11 @@ pub(crate) fn init_datasource_exec(
118123
);
119124
}
120125

121-
let file_source = Arc::new(parquet_source) as Arc<dyn FileSource>;
126+
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
127+
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
128+
);
129+
130+
let file_source: Arc<dyn FileSource> = Arc::new(parquet_source);
122131

123132
let file_groups = file_groups
124133
.iter()
@@ -133,7 +142,7 @@ pub(crate) fn init_datasource_exec(
133142
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
134143
}
135144

136-
let file_scan_config = file_scan_config_builder.build();
145+
let file_scan_config = file_scan_config_builder.with_expr_adapter(Some(expr_adapter_factory)).build();
137146

138147
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
139148
}

0 commit comments

Comments (0)