Skip to content

Commit 43c843b

Browse files
andygrove and claude authored
fix: [df52] schema pruning crash on complex nested types (#3500)
* fix: [df52] schema pruning crash on complex nested types

  When `data_schema` is provided but `projection_vector` is None (the
  NativeBatchReader / native_iceberg_compat path), the base schema was
  incorrectly set to the pruned `required_schema`. This caused DataFusion to
  think the table had only the pruned columns, leading to column index
  misalignment in PhysicalExprAdapter. For example, reading "friends" at
  logical index 0 would map to physical index 0 ("id") instead of the correct
  index 4.

  Fix: when `data_schema` is provided without a `projection_vector`, compute
  the projection by mapping required field names to their indices in the full
  data schema. Also harden `wrap_all_type_mismatches` to use name-based lookup
  for physical fields instead of positional index.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: handle field ID mapping in projection computation

  When computing a name-based projection from required_schema to data_schema,
  fall back to using required_schema directly when not all fields can be
  matched by name. This handles Parquet field ID mapping where column names
  differ between the read schema and file schema.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 802b794 commit 43c843b

2 files changed

Lines changed: 49 additions & 18 deletions

File tree

native/core/src/parquet/parquet_exec.rs

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,40 @@ pub(crate) fn init_datasource_exec(
8080
encryption_enabled,
8181
);
8282

83-
// dbg!(&required_schema, &data_schema);
84-
85-
// Determine the schema to use for ParquetSource
86-
// // Use data_schema only if both data_schema and data_filters are set
87-
let base_schema = match (&data_schema, &projection_vector) {
88-
(Some(schema), Some(_)) => Arc::clone(schema),
89-
_ => Arc::clone(&required_schema),
83+
// Determine the schema and projection to use for ParquetSource.
84+
// When data_schema is provided, use it as the base schema so DataFusion knows the full
85+
// file schema. Compute a projection vector to select only the required columns.
86+
let (base_schema, projection) = match (&data_schema, &projection_vector) {
87+
(Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())),
88+
(Some(schema), None) => {
89+
// Compute projection: map required_schema field names to data_schema indices.
90+
// This is needed for schema pruning when the data_schema has more columns than
91+
// the required_schema.
92+
let projection: Vec<usize> = required_schema
93+
.fields()
94+
.iter()
95+
.filter_map(|req_field| {
96+
schema.fields().iter().position(|data_field| {
97+
if case_sensitive {
98+
data_field.name() == req_field.name()
99+
} else {
100+
data_field.name().to_lowercase() == req_field.name().to_lowercase()
101+
}
102+
})
103+
})
104+
.collect();
105+
// Only use data_schema + projection when all required fields were found by name.
106+
// When some fields can't be matched (e.g., Parquet field ID mapping where names
107+
// differ between required and data schemas), fall back to using required_schema
108+
// directly with no projection.
109+
if projection.len() == required_schema.fields().len() {
110+
(Arc::clone(schema), Some(projection))
111+
} else {
112+
(Arc::clone(&required_schema), None)
113+
}
114+
}
115+
_ => (Arc::clone(&required_schema), None),
90116
};
91-
//let base_schema = required_schema;
92-
// dbg!(&base_schema);
93-
// dbg!(&data_schema);
94-
// dbg!(&data_filters);
95117
let partition_fields: Vec<_> = partition_schema
96118
.iter()
97119
.flat_map(|s| s.fields().iter())
@@ -100,13 +122,9 @@ pub(crate) fn init_datasource_exec(
100122
let table_schema =
101123
TableSchema::from_file_schema(base_schema).with_table_partition_cols(partition_fields);
102124

103-
// dbg!(&table_schema);
104-
105125
let mut parquet_source =
106126
ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
107127

108-
// dbg!(&parquet_source);
109-
110128
// Create a conjunctive form of the vector because ParquetExecBuilder takes
111129
// a single expression
112130
if let Some(data_filters) = data_filters {
@@ -146,9 +164,9 @@ pub(crate) fn init_datasource_exec(
146164
.with_file_groups(file_groups)
147165
.with_expr_adapter(Some(expr_adapter_factory));
148166

149-
if let Some(projection_vector) = projection_vector {
167+
if let Some(projection) = projection {
150168
file_scan_config_builder =
151-
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
169+
file_scan_config_builder.with_projection_indices(Some(projection))?;
152170
}
153171

154172
let file_scan_config = file_scan_config_builder.build();

native/core/src/parquet/schema_adapter.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,22 @@ impl SparkPhysicalExprAdapter {
262262
expr.transform(|e| {
263263
if let Some(column) = e.as_any().downcast_ref::<Column>() {
264264
let col_idx = column.index();
265+
let col_name = column.name();
265266

266267
let logical_field = self.logical_file_schema.fields().get(col_idx);
267-
let physical_field = self.physical_file_schema.fields().get(col_idx);
268+
// Look up physical field by name instead of index for correctness
269+
// when logical and physical schemas have different column orderings
270+
let physical_field = if self.parquet_options.case_sensitive {
271+
self.physical_file_schema
272+
.fields()
273+
.iter()
274+
.find(|f| f.name() == col_name)
275+
} else {
276+
self.physical_file_schema
277+
.fields()
278+
.iter()
279+
.find(|f| f.name().to_lowercase() == col_name.to_lowercase())
280+
};
268281

269282
if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
270283
{

0 commit comments

Comments (0)