Skip to content

Commit e65cf3b

Browse files
committed
DataFusion 52 migration
1 parent b68ad82 commit e65cf3b

2 files changed

Lines changed: 19 additions & 18 deletions

File tree

native/core/src/parquet/cast_column.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use arrow::{
2323

2424
use datafusion::common::format::DEFAULT_CAST_OPTIONS;
2525
use datafusion::common::Result as DataFusionResult;
26-
use datafusion::common::{cast_column, ScalarValue};
26+
use datafusion::common::ScalarValue;
2727
use datafusion::logical_expr::ColumnarValue;
2828
use datafusion::physical_expr::PhysicalExpr;
2929
use std::{
@@ -70,8 +70,7 @@ fn cast_timestamp_micros_to_millis_scalar(
7070
}
7171

7272
#[derive(Debug, Clone, Eq)]
73-
pub struct
74-
CometCastColumnExpr {
73+
pub struct CometCastColumnExpr {
7574
/// The physical expression producing the value to cast.
7675
expr: Arc<dyn PhysicalExpr>,
7776
/// The physical field of the input column.
@@ -146,8 +145,11 @@ impl PhysicalExpr for CometCastColumnExpr {
146145
fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
147146
let value = self.expr.evaluate(batch)?;
148147

149-
if value.data_type().equals_datatype(self.target_field.data_type()) {
150-
return Ok(value)
148+
if value
149+
.data_type()
150+
.equals_datatype(self.target_field.data_type())
151+
{
152+
return Ok(value);
151153
}
152154

153155
let input_physical_field = self.input_physical_field.data_type();
@@ -214,10 +216,10 @@ mod tests {
214216
fn test_cast_timestamp_micros_to_millis_array() {
215217
// Create a TimestampMicrosecond array with some values
216218
let micros_array: TimestampMicrosecondArray = vec![
217-
Some(1_000_000), // 1 second in micros
218-
Some(2_500_000), // 2.5 seconds in micros
219-
None, // null value
220-
Some(0), // zero
219+
Some(1_000_000), // 1 second in micros
220+
Some(2_500_000), // 2.5 seconds in micros
221+
None, // null value
222+
Some(0), // zero
221223
Some(-1_000_000), // negative value (before epoch)
222224
]
223225
.into();
@@ -263,10 +265,7 @@ mod tests {
263265
fn test_cast_timestamp_micros_to_millis_scalar() {
264266
// Test with a value
265267
let result = cast_timestamp_micros_to_millis_scalar(Some(1_500_000), None);
266-
assert_eq!(
267-
result,
268-
ScalarValue::TimestampMillisecond(Some(1500), None)
269-
);
268+
assert_eq!(result, ScalarValue::TimestampMillisecond(Some(1500), None));
270269

271270
// Test with null
272271
let null_result = cast_timestamp_micros_to_millis_scalar(None, None);
@@ -305,7 +304,8 @@ mod tests {
305304
let cast_expr = CometCastColumnExpr::new(col_expr, input_field, target_field, None);
306305

307306
// Create a record batch with TimestampMicrosecond data
308-
let micros_array: TimestampMicrosecondArray = vec![Some(1_000_000), Some(2_000_000), None].into();
307+
let micros_array: TimestampMicrosecondArray =
308+
vec![Some(1_000_000), Some(2_000_000), None].into();
309309
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(micros_array)]).unwrap();
310310

311311
// Evaluate

native/core/src/parquet/schema_adapter.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
use crate::parquet::cast_column::CometCastColumnExpr;
2727
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
2828
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
29-
use arrow::datatypes::{Field, Schema, SchemaRef};
29+
use arrow::datatypes::{Schema, SchemaRef};
3030
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
3131
use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
3232
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
@@ -100,6 +100,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
100100
/// 2. Replace standard DataFusion cast expressions with Spark-compatible casts
101101
/// 3. Handle case-insensitive column matching
102102
#[derive(Debug)]
103+
#[allow(dead_code)]
103104
struct SparkPhysicalExprAdapter {
104105
/// The logical schema expected by the query
105106
logical_file_schema: SchemaRef,
@@ -125,6 +126,7 @@ impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
125126
}
126127
}
127128

129+
#[allow(dead_code)]
128130
impl SparkPhysicalExprAdapter {
129131
/// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
130132
fn replace_with_spark_cast(
@@ -179,7 +181,6 @@ impl SparkPhysicalExprAdapter {
179181
// Get the physical datatype (actual file schema)
180182
let physical_field = self.physical_file_schema.fields().get(col_idx);
181183

182-
183184
// dbg!(&logical_field, &physical_field);
184185

185186
if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
@@ -191,8 +192,8 @@ impl SparkPhysicalExprAdapter {
191192
if logical_type != physical_type {
192193
let cast_expr: Arc<dyn PhysicalExpr> = Arc::new(CometCastColumnExpr::new(
193194
Arc::clone(&e),
194-
physical_field.clone(),
195-
logical_field.clone(),
195+
Arc::clone(physical_field),
196+
Arc::clone(logical_field),
196197
None,
197198
));
198199
// dbg!(&cast_expr);

0 commit comments

Comments (0)