@@ -25,7 +25,7 @@ use datafusion::common::{internal_err, ScalarValue};
2525use datafusion:: logical_expr:: ColumnarValue ;
2626use datafusion:: physical_expr:: PhysicalExpr ;
2727use jni:: {
28- objects:: JByteArray ,
28+ objects:: { JByteArray , JString } ,
2929 sys:: { jboolean, jbyte, jint, jlong, jshort} ,
3030} ;
3131use std:: {
@@ -80,68 +80,66 @@ impl PhysicalExpr for Subquery {
8080 }
8181
8282 fn evaluate ( & self , _: & RecordBatch ) -> datafusion:: common:: Result < ColumnarValue > {
83- let mut env = JVMClasses :: get_env ( ) ?;
84-
85- unsafe {
86- let is_null = jni_static_call ! ( & mut env,
83+ JVMClasses :: with_env ( |env| unsafe {
84+ let is_null = jni_static_call ! ( env,
8785 comet_exec. is_null( self . exec_context_id, self . id) -> jboolean
8886 ) ?;
8987
90- if is_null > 0 {
88+ if is_null {
9189 return Ok ( ColumnarValue :: Scalar ( ScalarValue :: try_from (
9290 & self . data_type ,
9391 ) ?) ) ;
9492 }
9593
9694 match & self . data_type {
9795 DataType :: Boolean => {
98- let r = jni_static_call ! ( & mut env,
96+ let r = jni_static_call ! ( env,
9997 comet_exec. get_bool( self . exec_context_id, self . id) -> jboolean
10098 ) ?;
101- Ok ( ColumnarValue :: Scalar ( ScalarValue :: Boolean ( Some ( r > 0 ) ) ) )
99+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: Boolean ( Some ( r) ) ) )
102100 }
103101 DataType :: Int8 => {
104- let r = jni_static_call ! ( & mut env,
102+ let r = jni_static_call ! ( env,
105103 comet_exec. get_byte( self . exec_context_id, self . id) -> jbyte
106104 ) ?;
107105 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Int8 ( Some ( r) ) ) )
108106 }
109107 DataType :: Int16 => {
110- let r = jni_static_call ! ( & mut env,
108+ let r = jni_static_call ! ( env,
111109 comet_exec. get_short( self . exec_context_id, self . id) -> jshort
112110 ) ?;
113111 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Int16 ( Some ( r) ) ) )
114112 }
115113 DataType :: Int32 => {
116- let r = jni_static_call ! ( & mut env,
114+ let r = jni_static_call ! ( env,
117115 comet_exec. get_int( self . exec_context_id, self . id) -> jint
118116 ) ?;
119117 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Int32 ( Some ( r) ) ) )
120118 }
121119 DataType :: Int64 => {
122- let r = jni_static_call ! ( & mut env,
120+ let r = jni_static_call ! ( env,
123121 comet_exec. get_long( self . exec_context_id, self . id) -> jlong
124122 ) ?;
125123 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Int64 ( Some ( r) ) ) )
126124 }
127125 DataType :: Float32 => {
128- let r = jni_static_call ! ( & mut env,
126+ let r = jni_static_call ! ( env,
129127 comet_exec. get_float( self . exec_context_id, self . id) -> f32
130128 ) ?;
131129 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Float32 ( Some ( r) ) ) )
132130 }
133131 DataType :: Float64 => {
134- let r = jni_static_call ! ( & mut env,
132+ let r = jni_static_call ! ( env,
135133 comet_exec. get_double( self . exec_context_id, self . id) -> f64
136134 ) ?;
137135
138136 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Float64 ( Some ( r) ) ) )
139137 }
140138 DataType :: Decimal128 ( p, s) => {
141- let bytes = jni_static_call ! ( & mut env,
139+ let bytes = jni_static_call ! ( env,
142140 comet_exec. get_decimal( self . exec_context_id, self . id) -> BinaryWrapper
143141 ) ?;
144- let bytes: & JByteArray = bytes. get ( ) . into ( ) ;
142+ let bytes = JByteArray :: from_raw ( env , bytes. get ( ) . as_raw ( ) ) ;
145143 let slice = env. convert_byte_array ( bytes) . unwrap ( ) ;
146144
147145 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Decimal128 (
@@ -151,14 +149,14 @@ impl PhysicalExpr for Subquery {
151149 ) ) )
152150 }
153151 DataType :: Date32 => {
154- let r = jni_static_call ! ( & mut env,
152+ let r = jni_static_call ! ( env,
155153 comet_exec. get_int( self . exec_context_id, self . id) -> jint
156154 ) ?;
157155
158156 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Date32 ( Some ( r) ) ) )
159157 }
160158 DataType :: Timestamp ( TimeUnit :: Microsecond , timezone) => {
161- let r = jni_static_call ! ( & mut env,
159+ let r = jni_static_call ! ( env,
162160 comet_exec. get_long( self . exec_context_id, self . id) -> jlong
163161 ) ?;
164162
@@ -168,25 +166,27 @@ impl PhysicalExpr for Subquery {
168166 ) ) )
169167 }
170168 DataType :: Utf8 => {
171- let string = jni_static_call ! ( & mut env,
169+ let string = jni_static_call ! ( env,
172170 comet_exec. get_string( self . exec_context_id, self . id) -> StringWrapper
173171 ) ?;
174172
175- let string = env. get_string ( string. get ( ) ) . unwrap ( ) . into ( ) ;
173+ let string = JString :: from_raw ( env, string. get ( ) . as_raw ( ) )
174+ . try_to_string ( env)
175+ . unwrap ( ) ;
176176 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Utf8 ( Some ( string) ) ) )
177177 }
178178 DataType :: Binary => {
179- let bytes = jni_static_call ! ( & mut env,
179+ let bytes = jni_static_call ! ( env,
180180 comet_exec. get_binary( self . exec_context_id, self . id) -> BinaryWrapper
181181 ) ?;
182- let bytes: & JByteArray = bytes. get ( ) . into ( ) ;
182+ let bytes = JByteArray :: from_raw ( env , bytes. get ( ) . as_raw ( ) ) ;
183183 let slice = env. convert_byte_array ( bytes) . unwrap ( ) ;
184184
185185 Ok ( ColumnarValue :: Scalar ( ScalarValue :: Binary ( Some ( slice) ) ) )
186186 }
187187 _ => internal_err ! ( "Unsupported scalar subquery data type {:?}" , self . data_type) ,
188188 }
189- }
189+ } )
190190 }
191191
192192 fn children ( & self ) -> Vec < & Arc < dyn PhysicalExpr > > {
0 commit comments