Skip to content

Commit e9dafd0

Browse files
Feat: to_csv (#3004)
1 parent 9b05dfe commit e9dafd0

14 files changed

Lines changed: 813 additions & 4 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ jobs:
262262
org.apache.comet.CometStringExpressionSuite
263263
org.apache.comet.CometBitwiseExpressionSuite
264264
org.apache.comet.CometMapExpressionSuite
265+
org.apache.comet.CometCsvExpressionSuite
265266
org.apache.comet.CometJsonExpressionSuite
266267
org.apache.comet.expressions.conditional.CometIfSuite
267268
org.apache.comet.expressions.conditional.CometCoalesceSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ jobs:
206206
org.apache.comet.CometBitwiseExpressionSuite
207207
org.apache.comet.CometMapExpressionSuite
208208
org.apache.comet.CometJsonExpressionSuite
209+
org.apache.comet.CometCsvExpressionSuite
209210
org.apache.comet.expressions.conditional.CometIfSuite
210211
org.apache.comet.expressions.conditional.CometCoalesceSuite
211212
org.apache.comet.expressions.conditional.CometCaseWhenSuite

native/core/src/execution/planner.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ use datafusion::{
7171
};
7272
use datafusion_comet_spark_expr::{
7373
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
74-
BloomFilterAgg, BloomFilterMightContain, EvalMode, SumInteger,
74+
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SumInteger, ToCsv,
7575
};
7676
use iceberg::expr::Bind;
7777

@@ -585,6 +585,25 @@ impl PhysicalPlanner {
585585
ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new(
586586
MonotonicallyIncreasingId::from_partition_id(self.partition),
587587
)),
588+
ExprStruct::ToCsv(expr) => {
589+
let csv_struct_expr =
590+
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
591+
let options = expr.options.clone().unwrap();
592+
let csv_write_options = CsvWriteOptions::new(
593+
options.delimiter,
594+
options.quote,
595+
options.escape,
596+
options.null_value,
597+
options.quote_all,
598+
options.ignore_leading_white_space,
599+
options.ignore_trailing_white_space,
600+
);
601+
Ok(Arc::new(ToCsv::new(
602+
csv_struct_expr,
603+
&options.timezone,
604+
csv_write_options,
605+
)))
606+
}
588607
expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
589608
}
590609
}

native/proto/src/proto/expr.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ message Expr {
8787
EmptyExpr monotonically_increasing_id = 64;
8888
UnixTimestamp unix_timestamp = 65;
8989
FromJson from_json = 66;
90+
ToCsv to_csv = 67;
9091
}
9192
}
9293

@@ -276,6 +277,22 @@ message FromJson {
276277
string timezone = 3;
277278
}
278279

280+
message ToCsv {
281+
Expr child = 1;
282+
CsvWriteOptions options = 2;
283+
}
284+
285+
message CsvWriteOptions {
286+
string delimiter = 1;
287+
string quote = 2;
288+
string escape = 3;
289+
string null_value = 4;
290+
bool quote_all = 5;
291+
bool ignore_leading_white_space = 6;
292+
bool ignore_trailing_white_space = 7;
293+
string timezone = 8;
294+
}
295+
279296
enum BinaryOutputStyle {
280297
UTF8 = 0;
281298
BASIC = 1;

native/spark-expr/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ harness = false
8888
name = "normalize_nan"
8989
harness = false
9090

91+
[[bench]]
92+
name = "to_csv"
93+
harness = false
94+
9195
[[test]]
9296
name = "test_udf_registration"
9397
path = "tests/spark_expr_reg.rs"
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
BooleanBuilder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, StringBuilder,
20+
StructArray, StructBuilder,
21+
};
22+
use arrow::datatypes::{DataType, Field};
23+
use criterion::{criterion_group, criterion_main, Criterion};
24+
use datafusion_comet_spark_expr::{to_csv_inner, CsvWriteOptions, EvalMode, SparkCastOptions};
25+
use std::hint::black_box;
26+
27+
fn create_struct_array(array_size: usize) -> StructArray {
28+
let fields = vec![
29+
Field::new("f1", DataType::Boolean, true),
30+
Field::new("f2", DataType::Int8, true),
31+
Field::new("f3", DataType::Int16, true),
32+
Field::new("f4", DataType::Int32, true),
33+
Field::new("f5", DataType::Int64, true),
34+
Field::new("f6", DataType::Utf8, true),
35+
];
36+
let mut struct_builder = StructBuilder::from_fields(fields, array_size);
37+
for i in 0..array_size {
38+
struct_builder
39+
.field_builder::<BooleanBuilder>(0)
40+
.unwrap()
41+
.append_option(if i % 10 == 0 { None } else { Some(i % 2 == 0) });
42+
43+
struct_builder
44+
.field_builder::<Int8Builder>(1)
45+
.unwrap()
46+
.append_option(if i % 10 == 0 {
47+
None
48+
} else {
49+
Some((i % 128) as i8)
50+
});
51+
52+
struct_builder
53+
.field_builder::<Int16Builder>(2)
54+
.unwrap()
55+
.append_option(if i % 10 == 0 { None } else { Some(i as i16) });
56+
57+
struct_builder
58+
.field_builder::<Int32Builder>(3)
59+
.unwrap()
60+
.append_option(if i % 10 == 0 { None } else { Some(i as i32) });
61+
62+
struct_builder
63+
.field_builder::<Int64Builder>(4)
64+
.unwrap()
65+
.append_option(if i % 10 == 0 { None } else { Some(i as i64) });
66+
67+
struct_builder
68+
.field_builder::<StringBuilder>(5)
69+
.unwrap()
70+
.append_option(if i % 10 == 0 {
71+
None
72+
} else {
73+
Some(format!("string_{}", i))
74+
});
75+
76+
struct_builder.append(true);
77+
}
78+
struct_builder.finish()
79+
}
80+
81+
fn criterion_benchmark(c: &mut Criterion) {
82+
let array_size = 8192;
83+
let timezone = "UTC";
84+
let struct_array = create_struct_array(array_size);
85+
let default_delimiter = ",";
86+
let default_null_value = "";
87+
let default_quote = "\"";
88+
let default_escape = "\\";
89+
let mut cast_options = SparkCastOptions::new(EvalMode::Legacy, timezone, false);
90+
cast_options.null_string = default_null_value.to_string();
91+
let csv_write_options = CsvWriteOptions::new(
92+
default_delimiter.to_string(),
93+
default_quote.to_string(),
94+
default_escape.to_string(),
95+
default_null_value.to_string(),
96+
false,
97+
true,
98+
true,
99+
);
100+
c.bench_function("to_csv", |b| {
101+
b.iter(|| {
102+
black_box(to_csv_inner(&struct_array, &cast_options, &csv_write_options).unwrap())
103+
})
104+
});
105+
}
106+
107+
criterion_group!(benches, criterion_benchmark);
108+
criterion_main!(benches);
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fmt::{Display, Formatter};
19+
20+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
21+
pub struct CsvWriteOptions {
22+
pub delimiter: String,
23+
pub quote: String,
24+
pub escape: String,
25+
pub null_value: String,
26+
pub quote_all: bool,
27+
pub ignore_leading_white_space: bool,
28+
pub ignore_trailing_white_space: bool,
29+
}
30+
31+
impl Display for CsvWriteOptions {
32+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33+
write!(
34+
f,
35+
"csv_write_options(delimiter={}, quote={}, escape={}, null_value={}, quote_all={}, ignore_leading_white_space={}, ignore_trailing_white_space={})",
36+
self.delimiter, self.quote, self.escape, self.null_value, self.quote_all, self.ignore_leading_white_space, self.ignore_trailing_white_space
37+
)
38+
}
39+
}
40+
41+
impl CsvWriteOptions {
42+
pub fn new(
43+
delimiter: String,
44+
quote: String,
45+
escape: String,
46+
null_value: String,
47+
quote_all: bool,
48+
ignore_leading_white_space: bool,
49+
ignore_trailing_white_space: bool,
50+
) -> Self {
51+
Self {
52+
delimiter,
53+
quote,
54+
escape,
55+
null_value,
56+
quote_all,
57+
ignore_leading_white_space,
58+
ignore_trailing_white_space,
59+
}
60+
}
61+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod csv_write_options;
19+
mod to_csv;
20+
21+
pub use csv_write_options::CsvWriteOptions;
22+
pub use to_csv::{to_csv_inner, ToCsv};

0 commit comments

Comments
 (0)