Skip to content

Commit 627cb3d

Browse files
committed
Df52 migration
1 parent f7aad61 commit 627cb3d

1 file changed

Lines changed: 23 additions & 2 deletions

File tree

native/core/src/parquet/parquet_exec.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ use datafusion_comet_spark_expr::EvalMode;
3636
use datafusion_datasource::TableSchema;
3737
use std::collections::HashMap;
3838
use std::sync::Arc;
39+
use arrow::util::pretty::print_batches;
40+
use datafusion::physical_plan::ExecutionPlan;
41+
use futures::StreamExt;
3942

4043
/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
4144
/// `native_datafusion` scan or the `native_iceberg_compat` scan.
@@ -137,7 +140,7 @@ pub(crate) fn init_datasource_exec(
137140
.collect();
138141

139142
let mut file_scan_config_builder =
140-
FileScanConfigBuilder::new(object_store_url, file_source).with_file_groups(file_groups).with_expr_adapter(Some(expr_adapter_factory));
143+
FileScanConfigBuilder::new(object_store_url, file_source).with_file_groups(file_groups);//.with_expr_adapter(Some(expr_adapter_factory));
141144

142145
if let Some(projection_vector) = projection_vector {
143146
file_scan_config_builder = file_scan_config_builder
@@ -146,7 +149,25 @@ pub(crate) fn init_datasource_exec(
146149

147150
let file_scan_config = file_scan_config_builder.build();
148151

149-
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
152+
let data_source_exec = Arc::new(DataSourceExec::new(Arc::new(file_scan_config)));
153+
154+
// Debug: Execute the plan and print output RecordBatches
155+
// let debug_ctx = SessionContext::default();
156+
// let task_ctx = debug_ctx.task_ctx();
157+
// if let Ok(stream) = data_source_exec.execute(0, task_ctx) {
158+
// let rt = tokio::runtime::Runtime::new().unwrap();
159+
// rt.block_on(async {
160+
// let batches: Vec<_> = stream.collect::<Vec<_>>().await;
161+
// let record_batches: Vec<_> = batches.into_iter().filter_map(|r| r.ok()).collect();
162+
// println!("=== DataSourceExec output RecordBatches ===");
163+
// if let Err(e) = print_batches(&record_batches) {
164+
// println!("Error printing batches: {:?}", e);
165+
// }
166+
// println!("=== End of DataSourceExec output ===");
167+
// });
168+
// }
169+
170+
Ok(data_source_exec)
150171
}
151172

152173
fn get_options(

0 commit comments

Comments
 (0)