Skip to content

Commit 89b0f1a

Browse files
andygrove and claude
authored and committed
fix: handle complex nested type casts in schema adapter for DF52 (apache#3475)
DataFusion 52's default PhysicalExprAdapter can fail when casting complex nested types (List<Struct>, Map) between physical and logical schemas. This adds a fallback path in SparkPhysicalExprAdapter that wraps type-mismatched columns with CometCastColumnExpr using spark_parquet_convert for the actual conversion. Changes to CometCastColumnExpr: - Add optional SparkParquetOptions for complex nested type conversions - Use == instead of equals_datatype to detect field name differences in nested types (Struct, List, Map) - Add relabel_array for types that differ only in field names (e.g., List element "item" vs "element", Map "key_value" vs "entries") - Fallback to spark_parquet_convert for structural nested type changes Changes to SparkPhysicalExprAdapter: - Try default adapter first, fall back to wrap_all_type_mismatches when it fails on complex nested types - Route Struct/List/Map casts to CometCastColumnExpr instead of Spark Cast, which doesn't handle nested type rewriting Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ac49b47 commit 89b0f1a

2 files changed

Lines changed: 186 additions & 21 deletions

File tree

native/core/src/parquet/cast_column.rs

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717
use arrow::{
18-
array::{ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray},
18+
array::{make_array, ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray},
1919
compute::CastOptions,
2020
datatypes::{DataType, FieldRef, Schema, TimeUnit},
2121
record_batch::RecordBatch,
2222
};
2323

24+
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
2425
use datafusion::common::format::DEFAULT_CAST_OPTIONS;
2526
use datafusion::common::Result as DataFusionResult;
2627
use datafusion::common::ScalarValue;
@@ -33,6 +34,59 @@ use std::{
3334
sync::Arc,
3435
};
3536

37+
/// Returns true if two DataTypes are structurally equivalent (same data layout)
38+
/// but may differ in field names within nested types.
39+
fn types_differ_only_in_field_names(physical: &DataType, logical: &DataType) -> bool {
40+
match (physical, logical) {
41+
(DataType::List(pf), DataType::List(lf)) => {
42+
pf.is_nullable() == lf.is_nullable()
43+
&& (pf.data_type() == lf.data_type()
44+
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
45+
}
46+
(DataType::LargeList(pf), DataType::LargeList(lf)) => {
47+
pf.is_nullable() == lf.is_nullable()
48+
&& (pf.data_type() == lf.data_type()
49+
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
50+
}
51+
(DataType::Map(pf, p_sorted), DataType::Map(lf, l_sorted)) => {
52+
p_sorted == l_sorted
53+
&& pf.is_nullable() == lf.is_nullable()
54+
&& (pf.data_type() == lf.data_type()
55+
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
56+
}
57+
(DataType::Struct(pfields), DataType::Struct(lfields)) => {
58+
// For Struct types, field names are semantically meaningful (they
59+
// identify different columns), so we require name equality here.
60+
// This distinguishes from List/Map wrapper field names ("item" vs
61+
// "element") which are purely cosmetic.
62+
pfields.len() == lfields.len()
63+
&& pfields.iter().zip(lfields.iter()).all(|(pf, lf)| {
64+
pf.name() == lf.name()
65+
&& pf.is_nullable() == lf.is_nullable()
66+
&& (pf.data_type() == lf.data_type()
67+
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
68+
})
69+
}
70+
_ => false,
71+
}
72+
}
73+
74+
/// Recursively relabel an array so its DataType matches `target_type`.
75+
/// This only changes metadata (field names, nullability flags in nested fields);
76+
/// it does NOT change the underlying buffer data.
77+
fn relabel_array(array: ArrayRef, target_type: &DataType) -> ArrayRef {
78+
if array.data_type() == target_type {
79+
return array;
80+
}
81+
let data = array.to_data();
82+
let new_data = data
83+
.into_builder()
84+
.data_type(target_type.clone())
85+
.build()
86+
.expect("relabel_array: data layout must be compatible");
87+
make_array(new_data)
88+
}
89+
3690
/// Casts a Timestamp(Microsecond) array to Timestamp(Millisecond) by dividing values by 1000.
3791
/// Preserves the timezone from the target type.
3892
fn cast_timestamp_micros_to_millis_array(
@@ -79,6 +133,9 @@ pub struct CometCastColumnExpr {
79133
target_field: FieldRef,
80134
/// Options forwarded to [`cast_column`].
81135
cast_options: CastOptions<'static>,
136+
/// Spark parquet options for complex nested type conversions.
137+
/// When present, enables `spark_parquet_convert` as a fallback.
138+
parquet_options: Option<SparkParquetOptions>,
82139
}
83140

84141
// Manually derive `PartialEq`/`Hash` as `Arc<dyn PhysicalExpr>` does not
@@ -89,6 +146,7 @@ impl PartialEq for CometCastColumnExpr {
89146
&& self.input_physical_field.eq(&other.input_physical_field)
90147
&& self.target_field.eq(&other.target_field)
91148
&& self.cast_options.eq(&other.cast_options)
149+
&& self.parquet_options.eq(&other.parquet_options)
92150
}
93151
}
94152

@@ -98,6 +156,7 @@ impl Hash for CometCastColumnExpr {
98156
self.input_physical_field.hash(state);
99157
self.target_field.hash(state);
100158
self.cast_options.hash(state);
159+
self.parquet_options.hash(state);
101160
}
102161
}
103162

@@ -114,8 +173,15 @@ impl CometCastColumnExpr {
114173
input_physical_field: physical_field,
115174
target_field,
116175
cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
176+
parquet_options: None,
117177
}
118178
}
179+
180+
/// Set Spark parquet options to enable complex nested type conversions.
181+
pub fn with_parquet_options(mut self, options: SparkParquetOptions) -> Self {
182+
self.parquet_options = Some(options);
183+
self
184+
}
119185
}
120186

121187
impl Display for CometCastColumnExpr {
@@ -145,18 +211,17 @@ impl PhysicalExpr for CometCastColumnExpr {
145211
fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
146212
let value = self.expr.evaluate(batch)?;
147213

148-
if value
149-
.data_type()
150-
.equals_datatype(self.target_field.data_type())
151-
{
214+
// Use == (PartialEq) instead of equals_datatype because equals_datatype
215+
// ignores field names in nested types (Struct, List, Map). We need to detect
216+
// when field names differ (e.g., Struct("a","b") vs Struct("c","d")) so that
217+
// we can apply spark_parquet_convert for field-name-based selection.
218+
if value.data_type() == *self.target_field.data_type() {
152219
return Ok(value);
153220
}
154221

155222
let input_physical_field = self.input_physical_field.data_type();
156223
let target_field = self.target_field.data_type();
157224

158-
// dbg!(&input_physical_field, &target_field, &value);
159-
160225
// Handle specific type conversions with custom casts
161226
match (input_physical_field, target_field) {
162227
// Timestamp(Microsecond) -> Timestamp(Millisecond)
@@ -174,7 +239,30 @@ impl PhysicalExpr for CometCastColumnExpr {
174239
}
175240
_ => Ok(value),
176241
},
177-
_ => Ok(value),
242+
// Nested types that differ only in field names (e.g., List element named
243+
// "item" vs "element", or Map entries named "key_value" vs "entries").
244+
// Re-label the array so the DataType metadata matches the logical schema.
245+
(physical, logical)
246+
if physical != logical && types_differ_only_in_field_names(physical, logical) =>
247+
{
248+
match value {
249+
ColumnarValue::Array(array) => {
250+
let relabeled = relabel_array(array, logical);
251+
Ok(ColumnarValue::Array(relabeled))
252+
}
253+
other => Ok(other),
254+
}
255+
}
256+
// Fallback: use spark_parquet_convert for complex nested type conversions
257+
// (e.g., List<Struct{a,b,c}> → List<Struct{a,c}>, Map field selection, etc.)
258+
_ => {
259+
if let Some(parquet_options) = &self.parquet_options {
260+
let converted = spark_parquet_convert(value, target_field, parquet_options)?;
261+
Ok(converted)
262+
} else {
263+
Ok(value)
264+
}
265+
}
178266
}
179267
}
180268

@@ -192,12 +280,16 @@ impl PhysicalExpr for CometCastColumnExpr {
192280
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
193281
assert_eq!(children.len(), 1);
194282
let child = children.pop().expect("CastColumnExpr child");
195-
Ok(Arc::new(Self::new(
283+
let mut new_expr = Self::new(
196284
child,
197285
Arc::clone(&self.input_physical_field),
198286
Arc::clone(&self.target_field),
199287
Some(self.cast_options.clone()),
200-
)))
288+
);
289+
if let Some(opts) = &self.parquet_options {
290+
new_expr = new_expr.with_parquet_options(opts.clone());
291+
}
292+
Ok(Arc::new(new_expr))
201293
}
202294

203295
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

native/core/src/parquet/schema_adapter.rs

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
use crate::parquet::cast_column::CometCastColumnExpr;
2727
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
2828
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
29-
use arrow::datatypes::{Schema, SchemaRef};
29+
use arrow::datatypes::{DataType, Schema, SchemaRef};
3030
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
3131
use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
3232
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
@@ -116,18 +116,69 @@ struct SparkPhysicalExprAdapter {
116116

117117
impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
118118
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
119-
// dbg!(&expr);
120-
121-
let expr = self.default_adapter.rewrite(expr)?;
122-
123-
//self.cast_datafusion_unsupported_expr(expr)
124-
125-
expr.transform(|e| self.replace_with_spark_cast(e)).data()
119+
// First let the default adapter handle column remapping, missing columns,
120+
// and simple scalar type casts. Then replace DataFusion's CastColumnExpr
121+
// with Spark-compatible equivalents.
122+
//
123+
// The default adapter may fail for complex nested type casts (List, Map).
124+
// In that case, fall back to wrapping everything ourselves.
125+
let expr = match self.default_adapter.rewrite(Arc::clone(&expr)) {
126+
Ok(rewritten) => {
127+
// Replace DataFusion's CastColumnExpr with either:
128+
// - CometCastColumnExpr (for Struct/List/Map, uses spark_parquet_convert)
129+
// - Spark Cast (for simple scalar types)
130+
rewritten
131+
.transform(|e| self.replace_with_spark_cast(e))
132+
.data()?
133+
}
134+
Err(_) => {
135+
// Default adapter failed (likely complex nested type cast).
136+
// Handle all type mismatches ourselves using spark_parquet_convert.
137+
self.wrap_all_type_mismatches(expr)?
138+
}
139+
};
140+
Ok(expr)
126141
}
127142
}
128143

129144
#[allow(dead_code)]
130145
impl SparkPhysicalExprAdapter {
146+
/// Wrap ALL Column expressions that have type mismatches with CometCastColumnExpr.
147+
/// This is the fallback path when the default adapter fails (e.g., for complex
148+
/// nested type casts like List<Struct> or Map). Uses `spark_parquet_convert`
149+
/// under the hood for the actual type conversion.
150+
fn wrap_all_type_mismatches(
151+
&self,
152+
expr: Arc<dyn PhysicalExpr>,
153+
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
154+
expr.transform(|e| {
155+
if let Some(column) = e.as_any().downcast_ref::<Column>() {
156+
let col_idx = column.index();
157+
158+
let logical_field = self.logical_file_schema.fields().get(col_idx);
159+
let physical_field = self.physical_file_schema.fields().get(col_idx);
160+
161+
if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
162+
{
163+
if logical_field.data_type() != physical_field.data_type() {
164+
let cast_expr: Arc<dyn PhysicalExpr> = Arc::new(
165+
CometCastColumnExpr::new(
166+
Arc::clone(&e),
167+
Arc::clone(physical_field),
168+
Arc::clone(logical_field),
169+
None,
170+
)
171+
.with_parquet_options(self.parquet_options.clone()),
172+
);
173+
return Ok(Transformed::yes(cast_expr));
174+
}
175+
}
176+
}
177+
Ok(Transformed::no(e))
178+
})
179+
.data()
180+
}
181+
131182
/// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
132183
fn replace_with_spark_cast(
133184
&self,
@@ -140,9 +191,31 @@ impl SparkPhysicalExprAdapter {
140191
.downcast_ref::<datafusion::physical_expr::expressions::CastColumnExpr>()
141192
{
142193
let child = Arc::clone(cast.expr());
143-
let target_type = cast.target_field().data_type().clone();
194+
let physical_type = cast.input_field().data_type();
195+
let target_type = cast.target_field().data_type();
196+
197+
// For complex nested types (Struct, List, Map), use CometCastColumnExpr
198+
// with spark_parquet_convert which handles field-name-based selection,
199+
// reordering, and nested type casting correctly.
200+
if matches!(
201+
(physical_type, target_type),
202+
(DataType::Struct(_), DataType::Struct(_))
203+
| (DataType::List(_), DataType::List(_))
204+
| (DataType::Map(_, _), DataType::Map(_, _))
205+
) {
206+
let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
207+
CometCastColumnExpr::new(
208+
child,
209+
Arc::clone(cast.input_field()),
210+
Arc::clone(cast.target_field()),
211+
None,
212+
)
213+
.with_parquet_options(self.parquet_options.clone()),
214+
);
215+
return Ok(Transformed::yes(comet_cast));
216+
}
144217

145-
// Create Spark-compatible cast options
218+
// For simple scalar type casts, use Spark-compatible Cast expression
146219
let mut cast_options = SparkCastOptions::new(
147220
self.parquet_options.eval_mode,
148221
&self.parquet_options.timezone,
@@ -151,7 +224,7 @@ impl SparkPhysicalExprAdapter {
151224
cast_options.allow_cast_unsigned_ints = self.parquet_options.allow_cast_unsigned_ints;
152225
cast_options.is_adapting_schema = true;
153226

154-
let spark_cast = Arc::new(Cast::new(child, target_type, cast_options));
227+
let spark_cast = Arc::new(Cast::new(child, target_type.clone(), cast_options));
155228

156229
return Ok(Transformed::yes(spark_cast as Arc<dyn PhysicalExpr>));
157230
}

0 commit comments

Comments
 (0)