Skip to content

Commit af84b73

Browse files
committed
Fix.
1 parent 42813d9 commit af84b73

4 files changed

Lines changed: 16 additions & 17 deletions

File tree

native/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/src/parquet/schema_adapter.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -370,21 +370,29 @@ impl SparkPhysicalExprAdapter {
370370
.data()
371371
}
372372

373-
/// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
373+
/// Replace CastExpr (DataFusion's cast) with Spark's Cast expression.
374374
fn replace_with_spark_cast(
375375
&self,
376376
expr: Arc<dyn PhysicalExpr>,
377377
) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
378-
// Check for CastColumnExpr and replace with spark_expr::Cast
379-
// CastColumnExpr is in datafusion_physical_expr::expressions
378+
// Check for CastExpr and replace with spark_expr::Cast
380379
if let Some(cast) = expr
381380
.as_any()
382-
.downcast_ref::<datafusion::physical_expr::expressions::CastColumnExpr>()
381+
.downcast_ref::<datafusion::physical_expr::expressions::CastExpr>()
383382
{
384383
let child = Arc::clone(cast.expr());
385-
let physical_type = cast.input_field().data_type();
386384
let target_type = cast.target_field().data_type();
387385

386+
// Derive input field from the child Column expression and the physical schema
387+
let input_field = if let Some(col) = child.as_any().downcast_ref::<Column>() {
388+
Arc::new(self.physical_file_schema.field(col.index()).clone())
389+
} else {
390+
// Fallback: synthesize a field from the target field name and child data type
391+
let child_type = cast.expr().data_type(&self.physical_file_schema)?;
392+
Arc::new(Field::new(cast.target_field().name(), child_type, true))
393+
};
394+
let physical_type = input_field.data_type();
395+
388396
// For complex nested types (Struct, List, Map), Timestamp timezone
389397
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
390398
// with spark_parquet_convert which handles field-name-based selection,
@@ -413,7 +421,7 @@ impl SparkPhysicalExprAdapter {
413421
let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
414422
CometCastColumnExpr::new(
415423
child,
416-
Arc::clone(cast.input_field()),
424+
input_field,
417425
Arc::clone(cast.target_field()),
418426
None,
419427
)

native/spark-expr/src/datetime_funcs/date_from_unix_date.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result, Sca
2121
use datafusion::logical_expr::{
2222
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
2323
};
24-
use std::any::Any;
2524
use std::sync::Arc;
2625

2726
/// Spark-compatible date_from_unix_date function.
@@ -48,10 +47,6 @@ impl Default for SparkDateFromUnixDate {
4847
}
4948

5049
impl ScalarUDFImpl for SparkDateFromUnixDate {
51-
fn as_any(&self) -> &dyn Any {
52-
self
53-
}
54-
5550
fn name(&self) -> &str {
5651
"date_from_unix_date"
5752
}

native/spark-expr/src/datetime_funcs/hours.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion::logical_expr::{
3232
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
3333
};
3434
use num::integer::div_floor;
35-
use std::{any::Any, fmt::Debug, sync::Arc};
35+
use std::{fmt::Debug, sync::Arc};
3636

3737
const MICROS_PER_HOUR: i64 = 3_600_000_000;
3838

@@ -56,10 +56,6 @@ impl Default for SparkHoursTransform {
5656
}
5757

5858
impl ScalarUDFImpl for SparkHoursTransform {
59-
fn as_any(&self) -> &dyn Any {
60-
self
61-
}
62-
6359
fn name(&self) -> &str {
6460
"hours_transform"
6561
}

0 commit comments

Comments (0)