Skip to content

Commit 4066448

Browse files
committed
DataFusion 52 migration
1 parent df99264 commit 4066448

6 files changed

Lines changed: 33 additions & 29 deletions

File tree

native/core/src/execution/operators/scan.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl ScanExec {
9494

9595
// Build schema directly from data types since get_next now always unpacks dictionaries
9696
let schema = schema_from_data_types(&data_types);
97-
dbg!(&schema);
97+
// dbg!(&schema);
9898

9999
let cache = PlanProperties::new(
100100
EquivalenceProperties::new(Arc::clone(&schema)),
@@ -210,7 +210,7 @@ impl ScanExec {
210210

211211
let array = make_array(array_data);
212212

213-
dbg!(&array, &selection_indices_arrays);
213+
// dbg!(&array, &selection_indices_arrays);
214214

215215
// Apply selection if selection vectors exist (applies to all columns)
216216
let array = if let Some(ref selection_arrays) = selection_indices_arrays {
@@ -490,7 +490,7 @@ impl ScanStream<'_> {
490490
) -> DataFusionResult<RecordBatch, DataFusionError> {
491491
let schema_fields = self.schema.fields();
492492
assert_eq!(columns.len(), schema_fields.len());
493-
dbg!(&columns, &self.schema);
493+
// dbg!(&columns, &self.schema);
494494
// Cast dictionary-encoded primitive arrays to regular arrays and cast
495495
// Utf8/LargeUtf8/Binary arrays to dictionary-encoded if the schema is
496496
// defined as dictionary-encoded and the data in this batch is not
@@ -510,7 +510,7 @@ impl ScanStream<'_> {
510510
})
511511
.collect::<Result<Vec<_>, _>>()?;
512512
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
513-
dbg!(&new_columns, &self.schema);
513+
// dbg!(&new_columns, &self.schema);
514514
RecordBatch::try_new_with_options(Arc::clone(&self.schema), new_columns, &options)
515515
.map_err(|e| arrow_datafusion_err!(e))
516516
}
@@ -521,7 +521,7 @@ impl Stream for ScanStream<'_> {
521521

522522
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
523523
let mut timer = self.baseline_metrics.elapsed_compute().timer();
524-
dbg!(&self.scan);
524+
// dbg!(&self.scan);
525525
let mut scan_batch = self.scan.batch.try_lock().unwrap();
526526

527527
let input_batch = &*scan_batch;

native/core/src/execution/planner.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ impl PhysicalPlanner {
946946
))
947947
}
948948
OpStruct::NativeScan(scan) => {
949-
dbg!(&scan);
949+
// dbg!(&scan);
950950
let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
951951
let required_schema: SchemaRef =
952952
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
@@ -1094,7 +1094,7 @@ impl PhysicalPlanner {
10941094
))
10951095
}
10961096
OpStruct::Scan(scan) => {
1097-
dbg!(&scan);
1097+
// dbg!(&scan);
10981098
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
10991099

11001100
// If it is not test execution context for unit test, we should have at least one
@@ -1121,7 +1121,7 @@ impl PhysicalPlanner {
11211121
scan.arrow_ffi_safe,
11221122
)?;
11231123

1124-
dbg!(&scan);
1124+
// dbg!(&scan);
11251125

11261126
Ok((
11271127
vec![scan.clone()],
@@ -4449,7 +4449,8 @@ mod tests {
44494449
let int8_array = Int8Array::from(vec![Some(1i8), Some(2i8), Some(3i8)]);
44504450

44514451
// Set input batch for the scan
4452-
let input_batch = InputBatch::Batch(vec![Arc::new(date_array), Arc::new(int8_array)], row_count);
4452+
let input_batch =
4453+
InputBatch::Batch(vec![Arc::new(date_array), Arc::new(int8_array)], row_count);
44534454
scans[0].set_input_batch(input_batch);
44544455

44554456
let session_ctx = SessionContext::new();
@@ -4464,7 +4465,8 @@ mod tests {
44644465
// Create test data again for the second batch
44654466
let date_array = Date32Array::from(vec![Some(19000), Some(19001), Some(19002)]);
44664467
let int8_array = Int8Array::from(vec![Some(1i8), Some(2i8), Some(3i8)]);
4467-
let input_batch1 = InputBatch::Batch(vec![Arc::new(date_array), Arc::new(int8_array)], row_count);
4468+
let input_batch1 =
4469+
InputBatch::Batch(vec![Arc::new(date_array), Arc::new(int8_array)], row_count);
44684470
let input_batch2 = InputBatch::EOF;
44694471

44704472
let batches = vec![input_batch1, input_batch2];

native/core/src/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
703703
key_unwrapper_obj: JObject,
704704
metrics_node: JObject,
705705
) -> jlong {
706-
dbg!("Java_org_apache_comet_parquet_Native_initRecordBatchReader");
706+
// dbg!("Java_org_apache_comet_parquet_Native_initRecordBatchReader");
707707
try_unwrap_or_throw(&e, |mut env| unsafe {
708708
JVMClasses::init(&mut env);
709709
let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -777,7 +777,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
777777
encryption_enabled,
778778
)?;
779779

780-
dbg!(&scan);
780+
// dbg!(&scan);
781781

782782
let partition_index: usize = 0;
783783
let batch_stream = Some(scan.execute(partition_index, session_ctx.task_ctx())?);
@@ -791,7 +791,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
791791
};
792792
let res = Box::new(ctx);
793793

794-
dbg!("end Java_org_apache_comet_parquet_Native_initRecordBatchReader");
794+
// dbg!("end Java_org_apache_comet_parquet_Native_initRecordBatchReader");
795795

796796
Ok(Box::into_raw(res) as i64)
797797
})

native/core/src/parquet/parquet_exec.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ use datafusion_comet_spark_expr::EvalMode;
3636
use datafusion_datasource::TableSchema;
3737
use std::collections::HashMap;
3838
use std::sync::Arc;
39-
use arrow::util::pretty::print_batches;
40-
use datafusion::physical_plan::ExecutionPlan;
41-
use futures::StreamExt;
4239

4340
/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
4441
/// `native_datafusion` scan or the `native_iceberg_compat` scan.
@@ -81,10 +78,14 @@ pub(crate) fn init_datasource_exec(
8178
encryption_enabled,
8279
);
8380

84-
dbg!(&required_schema, &data_schema);
81+
// dbg!(&required_schema, &data_schema);
8582

8683
// Determine the schema to use for ParquetSource
87-
let base_schema = required_schema.clone();
84+
// Use data_schema only if both data_schema and data_filters are set
85+
let base_schema = match (&data_schema, &data_filters) {
86+
(Some(schema), Some(_)) => Arc::clone(schema),
87+
_ => Arc::clone(&required_schema),
88+
};
8889
let partition_fields: Vec<_> = partition_schema
8990
.iter()
9091
.flat_map(|s| s.fields().iter())
@@ -98,7 +99,7 @@ pub(crate) fn init_datasource_exec(
9899
let mut parquet_source =
99100
ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
100101

101-
dbg!(&parquet_source);
102+
// dbg!(&parquet_source);
102103

103104
// Create a conjunctive form of the vector because ParquetExecBuilder takes
104105
// a single expression
@@ -135,12 +136,13 @@ pub(crate) fn init_datasource_exec(
135136
.map(|files| FileGroup::new(files.clone()))
136137
.collect();
137138

138-
let mut file_scan_config_builder =
139-
FileScanConfigBuilder::new(object_store_url, file_source).with_file_groups(file_groups);//.with_expr_adapter(Some(expr_adapter_factory));
139+
let mut file_scan_config_builder = FileScanConfigBuilder::new(object_store_url, file_source)
140+
.with_file_groups(file_groups)
141+
.with_expr_adapter(Some(expr_adapter_factory));
140142

141143
if let Some(projection_vector) = projection_vector {
142-
file_scan_config_builder = file_scan_config_builder
143-
.with_projection_indices(Some(projection_vector))?;
144+
file_scan_config_builder =
145+
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
144146
}
145147

146148
let file_scan_config = file_scan_config_builder.build();

native/core/src/parquet/schema_adapter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,15 @@ impl SparkPhysicalExprAdapter {
182182
// Get the physical datatype (actual file schema)
183183
let physical_field = self.physical_file_schema.fields().get(col_idx);
184184

185-
dbg!(&logical_field, &physical_field);
185+
// dbg!(&logical_field, &physical_field);
186186

187187
if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
188188
{
189189
let logical_type = logical_field.data_type();
190190
let physical_type = physical_field.data_type();
191191

192192
// If datatypes differ, insert a CastColumnExpr
193-
if logical_type != physical_type || 1==1 {
193+
if logical_type != physical_type {
194194
let input_field = Arc::new(Field::new(
195195
physical_field.name(),
196196
physical_type.clone(),
@@ -203,12 +203,12 @@ impl SparkPhysicalExprAdapter {
203203
));
204204

205205
let cast_expr: Arc<dyn PhysicalExpr> = Arc::new(CastColumnExpr::new(
206-
e.clone(),
206+
Arc::clone(&e),
207207
input_field,
208208
target_field,
209209
None,
210210
));
211-
dbg!(&cast_expr);
211+
// dbg!(&cast_expr);
212212
return Ok(Transformed::yes(cast_expr));
213213
}
214214
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
347347
}
348348

349349
test("date_sub with int scalars") {
350-
Seq(false).foreach { dictionaryEnabled =>
351-
Seq("INT").foreach { intType =>
350+
Seq(true, false).foreach { dictionaryEnabled =>
351+
Seq("TINYINT", "SHORT", "INT").foreach { intType =>
352352
withTempDir { dir =>
353353
val path = new Path(dir.toURI.toString, "test.parquet")
354354
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)

0 commit comments

Comments (0)