Skip to content

Commit ea5901a

Browse files
committed
DataFusion 52 migration
1 parent 30c57ce commit ea5901a

5 files changed

Lines changed: 25 additions & 32 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4042,14 +4042,13 @@ mod tests {
40424042

40434043
let source = Arc::new(
40444044
ParquetSource::new(Arc::new(read_schema.clone()))
4045-
.with_table_parquet_options(TableParquetOptions::new())
4045+
.with_table_parquet_options(TableParquetOptions::new()),
40464046
) as Arc<dyn FileSource>;
40474047

40484048
let object_store_url = ObjectStoreUrl::local_filesystem();
4049-
let file_scan_config =
4050-
FileScanConfigBuilder::new(object_store_url, source)
4051-
.with_file_groups(file_groups)
4052-
.build();
4049+
let file_scan_config = FileScanConfigBuilder::new(object_store_url, source)
4050+
.with_file_groups(file_groups)
4051+
.build();
40534052

40544053
// Run native read
40554054
let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));

native/core/src/parquet/parquet_exec.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ pub(crate) fn init_datasource_exec(
8585
let partition_fields: Vec<_> = partition_schema
8686
.fields()
8787
.iter()
88-
.map(|f| Arc::new(Field::new(f.name(), f.data_type().clone(), f.is_nullable())) as _)
88+
.map(|f| {
89+
Arc::new(Field::new(f.name(), f.data_type().clone(), f.is_nullable())) as _
90+
})
8991
.collect();
9092
TableSchema::new(Arc::clone(data_schema), partition_fields)
9193
} else {
@@ -95,8 +97,8 @@ pub(crate) fn init_datasource_exec(
9597
TableSchema::from_file_schema(Arc::clone(&required_schema))
9698
};
9799

98-
let mut parquet_source = ParquetSource::new(table_schema)
99-
.with_table_parquet_options(table_parquet_options);
100+
let mut parquet_source =
101+
ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
100102

101103
// Create a conjunctive form of the vector because ParquetExecBuilder takes
102104
// a single expression
@@ -129,13 +131,14 @@ pub(crate) fn init_datasource_exec(
129131
.map(|files| FileGroup::new(files.clone()))
130132
.collect();
131133

132-
let mut file_scan_config_builder = FileScanConfigBuilder::new(object_store_url, file_source)
133-
.with_file_groups(file_groups);
134-
134+
let mut file_scan_config_builder =
135+
FileScanConfigBuilder::new(object_store_url, file_source).with_file_groups(file_groups);
136+
135137
if let Some(projection_vector) = projection_vector {
136-
file_scan_config_builder = file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
138+
file_scan_config_builder =
139+
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
137140
}
138-
141+
139142
let file_scan_config = file_scan_config_builder.build();
140143

141144
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))

native/core/src/parquet/schema_adapter.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,13 @@ mod test {
346346

347347
let parquet_source = Arc::new(
348348
ParquetSource::new(Arc::clone(&required_schema))
349-
.with_table_parquet_options(TableParquetOptions::new())
349+
.with_table_parquet_options(TableParquetOptions::new()),
350350
) as Arc<dyn FileSource>;
351351

352352
let files = FileGroup::new(vec![PartitionedFile::from_path(filename.to_string())?]);
353-
let file_scan_config =
354-
FileScanConfigBuilder::new(object_store_url, parquet_source)
355-
.with_file_groups(vec![files])
356-
.build();
353+
let file_scan_config = FileScanConfigBuilder::new(object_store_url, parquet_source)
354+
.with_file_groups(vec![files])
355+
.build();
357356

358357
let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
359358

native/spark-expr/src/agg_funcs/covariance.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use arrow::{
2323
compute::cast,
2424
datatypes::{DataType, Field},
2525
};
26-
use datafusion::common::{
27-
downcast_value, unwrap_or_internal_err, Result, ScalarValue,
28-
};
26+
use datafusion::common::{downcast_value, unwrap_or_internal_err, Result, ScalarValue};
2927
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
3028
use datafusion::logical_expr::type_coercion::aggregates::NUMERICS;
3129
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};

native/spark-expr/src/math_funcs/round.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use arrow::array::{Array, ArrowNativeTypeOp};
2121
use arrow::array::{Int16Array, Int32Array, Int64Array, Int8Array};
2222
use arrow::datatypes::{DataType, Field};
2323
use arrow::error::ArrowError;
24-
use datafusion::common::{exec_err, internal_err, DataFusionError, ScalarValue};
2524
use datafusion::common::config::ConfigOptions;
26-
use datafusion::physical_plan::ColumnarValue;
25+
use datafusion::common::{exec_err, internal_err, DataFusionError, ScalarValue};
2726
use datafusion::functions::math::round::RoundFunc;
28-
use datafusion::logical_expr::{ScalarUDFImpl, ScalarFunctionArgs};
27+
use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl};
28+
use datafusion::physical_plan::ColumnarValue;
2929
use std::{cmp::min, sync::Arc};
3030

3131
macro_rules! integer_round {
@@ -133,10 +133,7 @@ pub fn spark_round(
133133
let round_udf = RoundFunc::new();
134134
let return_field = Arc::new(Field::new("round", array.data_type().clone(), true));
135135
let args_for_round = ScalarFunctionArgs {
136-
args: vec![
137-
ColumnarValue::Array(Arc::clone(array)),
138-
args[1].clone(),
139-
],
136+
args: vec![ColumnarValue::Array(Arc::clone(array)), args[1].clone()],
140137
number_rows: array.len(),
141138
return_field,
142139
arg_fields: vec![],
@@ -169,10 +166,7 @@ pub fn spark_round(
169166
let data_type = a.data_type();
170167
let return_field = Arc::new(Field::new("round", data_type, true));
171168
let args_for_round = ScalarFunctionArgs {
172-
args: vec![
173-
ColumnarValue::Scalar(a.clone()),
174-
args[1].clone(),
175-
],
169+
args: vec![ColumnarValue::Scalar(a.clone()), args[1].clone()],
176170
number_rows: 1,
177171
return_field,
178172
arg_fields: vec![],

0 commit comments

Comments
 (0)