@@ -21,7 +21,7 @@ use arrow::array::{
2121} ;
2222use arrow:: buffer:: MutableBuffer ;
2323use arrow:: datatypes:: { DataType , Int32Type } ;
24- use datafusion:: common:: { exec_err, internal_datafusion_err, DataFusionError , Result } ;
24+ use datafusion:: common:: { exec_err, internal_datafusion_err, DataFusionError , Result , ScalarValue } ;
2525use datafusion:: logical_expr:: {
2626 ColumnarValue , ScalarFunctionArgs , ScalarUDFImpl , Signature , Volatility ,
2727} ;
@@ -86,30 +86,50 @@ impl ScalarUDFImpl for SparkStringSpace {
8686pub fn spark_string_space ( args : & [ ColumnarValue ; 1 ] ) -> Result < ColumnarValue > {
8787 match args {
8888 [ ColumnarValue :: Array ( array) ] => {
89- let result = string_space ( & array) ?;
90-
89+ let result = string_space_array ( & array) ?;
9190 Ok ( ColumnarValue :: Array ( result) )
9291 }
93- _ => exec_err ! ( "StringSpace(scalar) should be fold in Spark JVM side." ) ,
92+ [ ColumnarValue :: Scalar ( scalar) ] => {
93+ let result = string_space_scalar ( scalar) ?;
94+ Ok ( ColumnarValue :: Scalar ( result) )
95+ }
9496 }
9597}
9698
97- fn string_space ( length : & dyn Array ) -> std:: result:: Result < ArrayRef , DataFusionError > {
99+ fn string_space_array ( length : & dyn Array ) -> std:: result:: Result < ArrayRef , DataFusionError > {
98100 match length. data_type ( ) {
99101 DataType :: Int32 => {
100102 let array = length. as_any ( ) . downcast_ref :: < Int32Array > ( ) . unwrap ( ) ;
101103 Ok ( generic_string_space :: < i32 > ( array) )
102104 }
103105 DataType :: Dictionary ( _, _) => {
104106 let dict = as_dictionary_array :: < Int32Type > ( length) ;
105- let values = string_space ( dict. values ( ) ) ?;
107+ let values = string_space_array ( dict. values ( ) ) ?;
106108 let result = DictionaryArray :: try_new ( dict. keys ( ) . clone ( ) , values) ?;
107109 Ok ( Arc :: new ( result) )
108110 }
109111 other => exec_err ! ( "Unsupported input type for function 'string_space': {other:?}" ) ,
110112 }
111113}
112114
115+ fn string_space_scalar ( scalar : & ScalarValue ) -> Result < ScalarValue > {
116+ match scalar {
117+ ScalarValue :: Int32 ( value) => {
118+ let result = value. map ( |v| {
119+ if v <= 0 {
120+ String :: new ( )
121+ } else {
122+ " " . repeat ( v as usize )
123+ }
124+ } ) ;
125+ Ok ( ScalarValue :: Utf8 ( result) )
126+ }
127+ other => {
128+ exec_err ! ( "Unsupported data type {other:?} for function `space`" )
129+ }
130+ }
131+ }
132+
113133fn generic_string_space < OffsetSize : OffsetSizeTrait > ( length : & Int32Array ) -> ArrayRef {
114134 let array_len = length. len ( ) ;
115135 let mut offsets = MutableBuffer :: new ( ( array_len + 1 ) * std:: mem:: size_of :: < OffsetSize > ( ) ) ;
0 commit comments