|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 | use arrow::{ |
18 | | - array::{make_array, ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray}, |
| 18 | + array::{ |
| 19 | + make_array, Array, ArrayRef, LargeListArray, ListArray, MapArray, StructArray, |
| 20 | + TimestampMicrosecondArray, TimestampMillisecondArray, |
| 21 | + }, |
19 | 22 | compute::CastOptions, |
20 | 23 | datatypes::{DataType, FieldRef, Schema, TimeUnit}, |
21 | 24 | record_batch::RecordBatch, |
@@ -78,13 +81,66 @@ fn relabel_array(array: ArrayRef, target_type: &DataType) -> ArrayRef { |
78 | 81 | if array.data_type() == target_type { |
79 | 82 | return array; |
80 | 83 | } |
81 | | - let data = array.to_data(); |
82 | | - let new_data = data |
83 | | - .into_builder() |
84 | | - .data_type(target_type.clone()) |
85 | | - .build() |
86 | | - .expect("relabel_array: data layout must be compatible"); |
87 | | - make_array(new_data) |
| 84 | + match target_type { |
| 85 | + DataType::List(target_field) => { |
| 86 | + let list = array.as_any().downcast_ref::<ListArray>().unwrap(); |
| 87 | + let values = relabel_array(Arc::clone(list.values()), target_field.data_type()); |
| 88 | + Arc::new(ListArray::new( |
| 89 | + Arc::clone(target_field), |
| 90 | + list.offsets().clone(), |
| 91 | + values, |
| 92 | + list.nulls().cloned(), |
| 93 | + )) |
| 94 | + } |
| 95 | + DataType::LargeList(target_field) => { |
| 96 | + let list = array.as_any().downcast_ref::<LargeListArray>().unwrap(); |
| 97 | + let values = relabel_array(Arc::clone(list.values()), target_field.data_type()); |
| 98 | + Arc::new(LargeListArray::new( |
| 99 | + Arc::clone(target_field), |
| 100 | + list.offsets().clone(), |
| 101 | + values, |
| 102 | + list.nulls().cloned(), |
| 103 | + )) |
| 104 | + } |
| 105 | + DataType::Map(target_entries_field, sorted) => { |
| 106 | + let map = array.as_any().downcast_ref::<MapArray>().unwrap(); |
| 107 | + let entries = relabel_array( |
| 108 | + Arc::new(map.entries().clone()), |
| 109 | + target_entries_field.data_type(), |
| 110 | + ); |
| 111 | + let entries_struct = entries.as_any().downcast_ref::<StructArray>().unwrap(); |
| 112 | + Arc::new(MapArray::new( |
| 113 | + Arc::clone(target_entries_field), |
| 114 | + map.offsets().clone(), |
| 115 | + entries_struct.clone(), |
| 116 | + map.nulls().cloned(), |
| 117 | + *sorted, |
| 118 | + )) |
| 119 | + } |
| 120 | + DataType::Struct(target_fields) => { |
| 121 | + let struct_arr = array.as_any().downcast_ref::<StructArray>().unwrap(); |
| 122 | + let columns: Vec<ArrayRef> = target_fields |
| 123 | + .iter() |
| 124 | + .zip(struct_arr.columns()) |
| 125 | + .map(|(tf, col)| relabel_array(Arc::clone(col), tf.data_type())) |
| 126 | + .collect(); |
| 127 | + Arc::new(StructArray::new( |
| 128 | + target_fields.clone(), |
| 129 | + columns, |
| 130 | + struct_arr.nulls().cloned(), |
| 131 | + )) |
| 132 | + } |
| 133 | + // Primitive types - shallow swap is safe |
| 134 | + _ => { |
| 135 | + let data = array.to_data(); |
| 136 | + let new_data = data |
| 137 | + .into_builder() |
| 138 | + .data_type(target_type.clone()) |
| 139 | + .build() |
| 140 | + .expect("relabel_array: data layout must be compatible"); |
| 141 | + make_array(new_data) |
| 142 | + } |
| 143 | + } |
88 | 144 | } |
89 | 145 |
|
90 | 146 | /// Casts a Timestamp(Microsecond) array to Timestamp(Millisecond) by dividing values by 1000. |
@@ -300,8 +356,8 @@ impl PhysicalExpr for CometCastColumnExpr { |
300 | 356 | #[cfg(test)] |
301 | 357 | mod tests { |
302 | 358 | use super::*; |
303 | | - use arrow::array::Array; |
304 | | - use arrow::datatypes::Field; |
| 359 | + use arrow::array::{Array, Int32Array, StringArray}; |
| 360 | + use arrow::datatypes::{Field, Fields}; |
305 | 361 | use datafusion::physical_expr::expressions::Column; |
306 | 362 |
|
307 | 363 | #[test] |
@@ -455,4 +511,129 @@ mod tests { |
455 | 511 | _ => panic!("Expected Scalar result"), |
456 | 512 | } |
457 | 513 | } |
| 514 | + |
| 515 | + #[test] |
| 516 | + fn test_relabel_list_field_name() { |
| 517 | + // Physical: List(Field("item", Int32)) |
| 518 | + // Logical: List(Field("element", Int32)) |
| 519 | + let physical_field = Arc::new(Field::new("item", DataType::Int32, true)); |
| 520 | + let logical_field = Arc::new(Field::new("element", DataType::Int32, true)); |
| 521 | + |
| 522 | + let values = Int32Array::from(vec![1, 2, 3]); |
| 523 | + let list = ListArray::new( |
| 524 | + physical_field, |
| 525 | + arrow::buffer::OffsetBuffer::new(vec![0, 2, 3].into()), |
| 526 | + Arc::new(values), |
| 527 | + None, |
| 528 | + ); |
| 529 | + let array: ArrayRef = Arc::new(list); |
| 530 | + |
| 531 | + let target_type = DataType::List(logical_field.clone()); |
| 532 | + let result = relabel_array(array, &target_type); |
| 533 | + assert_eq!(result.data_type(), &target_type); |
| 534 | + } |
| 535 | + |
    #[test]
    fn test_relabel_map_entries_field_name() {
        // Physical: Map(Field("key_value", Struct{key, value}))
        // Logical:  Map(Field("entries", Struct{key, value}))
        // Parquet names map entries "key_value" while the logical schema uses
        // "entries"; the inner key/value struct layout is identical, so only
        // the entries field name must be rewritten.
        let key_field = Arc::new(Field::new("key", DataType::Utf8, false));
        let value_field = Arc::new(Field::new("value", DataType::Int32, true));
        let struct_fields = Fields::from(vec![key_field.clone(), value_field.clone()]);

        // Same struct type on both sides; only the entries field name differs.
        let physical_entries_field = Arc::new(Field::new(
            "key_value",
            DataType::Struct(struct_fields.clone()),
            false,
        ));
        let logical_entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(struct_fields.clone()),
            false,
        ));

        // One map row containing two entries: {"a": 1, "b": 2}.
        let keys = StringArray::from(vec!["a", "b"]);
        let values = Int32Array::from(vec![1, 2]);
        let entries = StructArray::new(struct_fields, vec![Arc::new(keys), Arc::new(values)], None);
        let map = MapArray::new(
            physical_entries_field,
            arrow::buffer::OffsetBuffer::new(vec![0, 2].into()),
            entries,
            None,
            false, // keys are not sorted
        );
        let array: ArrayRef = Arc::new(map);

        let target_type = DataType::Map(logical_entries_field, false);
        let result = relabel_array(array, &target_type);
        // The relabeled array must report the logical type exactly.
        assert_eq!(result.data_type(), &target_type);
    }
| 571 | + |
| 572 | + #[test] |
| 573 | + fn test_relabel_struct_metadata() { |
| 574 | + // Physical: Struct { Field("a", Int32, metadata={"PARQUET:field_id": "1"}) } |
| 575 | + // Logical: Struct { Field("a", Int32, metadata={}) } |
| 576 | + let mut metadata = std::collections::HashMap::new(); |
| 577 | + metadata.insert("PARQUET:field_id".to_string(), "1".to_string()); |
| 578 | + let physical_field = |
| 579 | + Arc::new(Field::new("a", DataType::Int32, true).with_metadata(metadata)); |
| 580 | + let logical_field = Arc::new(Field::new("a", DataType::Int32, true)); |
| 581 | + |
| 582 | + let col = Int32Array::from(vec![10, 20]); |
| 583 | + let physical_fields = Fields::from(vec![physical_field]); |
| 584 | + let logical_fields = Fields::from(vec![logical_field]); |
| 585 | + |
| 586 | + let struct_arr = StructArray::new(physical_fields, vec![Arc::new(col)], None); |
| 587 | + let array: ArrayRef = Arc::new(struct_arr); |
| 588 | + |
| 589 | + let target_type = DataType::Struct(logical_fields); |
| 590 | + let result = relabel_array(array, &target_type); |
| 591 | + assert_eq!(result.data_type(), &target_type); |
| 592 | + } |
| 593 | + |
    #[test]
    fn test_relabel_nested_struct_containing_list() {
        // Physical: Struct { Field("col", List(Field("item", Int32))) }
        // Logical:  Struct { Field("col", List(Field("element", Int32))) }
        // Exercises the recursive path: the list element rename is nested one
        // level down inside a struct, so relabel_array must recurse through
        // the struct column into the list's child field.
        let physical_list_field = Arc::new(Field::new("item", DataType::Int32, true));
        let logical_list_field = Arc::new(Field::new("element", DataType::Int32, true));

        let physical_struct_field = Arc::new(Field::new(
            "col",
            DataType::List(physical_list_field.clone()),
            true,
        ));
        let logical_struct_field = Arc::new(Field::new(
            "col",
            DataType::List(logical_list_field.clone()),
            true,
        ));

        // Two list rows: [1, 2] and [3].
        let values = Int32Array::from(vec![1, 2, 3]);
        let list = ListArray::new(
            physical_list_field,
            arrow::buffer::OffsetBuffer::new(vec![0, 2, 3].into()),
            Arc::new(values),
            None,
        );

        let physical_fields = Fields::from(vec![physical_struct_field]);
        let logical_fields = Fields::from(vec![logical_struct_field]);

        let struct_arr = StructArray::new(physical_fields, vec![Arc::new(list) as ArrayRef], None);
        let array: ArrayRef = Arc::new(struct_arr);

        let target_type = DataType::Struct(logical_fields);
        let result = relabel_array(array, &target_type);
        assert_eq!(result.data_type(), &target_type);

        // Verify we can access the nested data without panics
        let result_struct = result.as_any().downcast_ref::<StructArray>().unwrap();
        let result_list = result_struct
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        // Two list rows survived the relabel with data intact.
        assert_eq!(result_list.len(), 2);
    }
458 | 639 | } |
0 commit comments