diff --git a/src/cast_to_variant.rs b/src/cast_to_variant.rs index 8513d7e..7d7e233 100644 --- a/src/cast_to_variant.rs +++ b/src/cast_to_variant.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, AsArray, StructArray}; use arrow_schema::{DataType, Field}; use datafusion::{ - common::exec_err, error::Result, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, @@ -14,7 +13,9 @@ use datafusion::{ use parquet_variant::Variant; use parquet_variant_compute::{VariantArray, VariantArrayBuilder, cast_to_variant}; -use crate::shared::{try_parse_binary_columnar, try_parse_binary_scalar}; +use crate::shared::{ + arg_shape_err, args_count_err, try_parse_binary_columnar, try_parse_binary_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct CastToVariantUdf { @@ -57,15 +58,19 @@ impl CastToVariantUdf { } fn from_metadata_value( + udf_name: &str, metadata_argument: &ColumnarValue, variant_argument: &ColumnarValue, ) -> Result { let out = match (metadata_argument, variant_argument) { (ColumnarValue::Array(metadata_array), ColumnarValue::Array(value_array)) => { if metadata_array.len() != value_array.len() { - return exec_err!( - "expected metadata array to be of same length as variant array" - ); + return Err(arg_shape_err( + udf_name, + 2, + "array with same length as arg #1", + "array with different length", + )); } let metadata_array = try_parse_binary_columnar(metadata_array)?; @@ -180,11 +185,11 @@ impl ScalarUDFImpl for CastToVariantUdf { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { match args.args.as_slice() { [metadata_value, variant_value] => { - Self::from_metadata_value(metadata_value, variant_value) + Self::from_metadata_value(self.name(), metadata_value, variant_value) } [ColumnarValue::Scalar(scalar_value)] => Self::from_scalar_value(scalar_value), [ColumnarValue::Array(array)] => Self::from_array(array), - _ => exec_err!("unrecognized argument"), + _ => Err(args_count_err(self.name(), "1 or 2", args.args.len())), } } } diff --git a/src/is_variant_null.rs b/src/is_variant_null.rs index ac6a6d0..8b24773 100644 --- a/src/is_variant_null.rs +++ b/src/is_variant_null.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray}; use arrow_schema::DataType; -use datafusion::common::{exec_datafusion_err, exec_err}; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -11,7 +10,10 @@ use datafusion::scalar::ScalarValue; use parquet_variant::Variant; use parquet_variant_compute::VariantArray; -use crate::shared::{try_field_as_variant_array, try_parse_variant_scalar}; +use crate::shared::{ + arg_field_meta_missing_err, args_count_err, try_field_as_variant_array, + try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct IsVariantNullUdf { @@ -47,12 +49,12 @@ impl ScalarUDFImpl for IsVariantNullUdf { let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument field type"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(variant_field.as_ref())?; let [variant_arg] = args.args.as_slice() else { - return exec_err!("expected 1 argument"); + return Err(args_count_err(self.name(), "1", args.args.len())); }; let out = match variant_arg { diff --git a/src/json_to_variant.rs b/src/json_to_variant.rs index d50e07e..9d83a26 100644 --- a/src/json_to_variant.rs +++ b/src/json_to_variant.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray, StructArray}; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::{exec_datafusion_err, exec_err}, + common::exec_datafusion_err, error::Result, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, @@ -15,7 +15,7 @@ use datafusion::{ use parquet_variant_compute::{VariantArrayBuilder, VariantType}; use parquet_variant_json::JsonToVariant as JsonToVariantExt; -use crate::shared::{try_field_as_string, try_parse_string_scalar}; +use crate::shared::{arg_type_err, args_count_err, try_field_as_string, try_parse_string_scalar}; /// Returns a Variant from a JSON string #[derive(Debug, Hash, PartialEq, Eq)] @@ -74,14 +74,14 @@ impl ScalarUDFImpl for JsonToVariantUdf { let arg_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(self.name(), "1", args.arg_fields.len()))?; try_field_as_string(arg_field.as_ref())?; let arg = args .args .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(self.name(), "1", args.args.len()))?; let out = match arg { ColumnarValue::Scalar(scalar_value) => { @@ -101,7 +101,14 @@ impl ScalarUDFImpl for JsonToVariantUdf { DataType::Utf8 => ColumnarValue::Array(from_utf8_arr(arr)?), DataType::LargeUtf8 => ColumnarValue::Array(from_large_utf8_arr(arr)?), DataType::Utf8View => ColumnarValue::Array(from_utf8view_arr(arr)?), - _ => return exec_err!("Invalid data type {}", arr.data_type()), + _ => { + return arg_type_err( + self.name(), + 1, + "Utf8, LargeUtf8, or Utf8View", + arr.data_type(), + ); + } }, }; diff --git a/src/shared.rs b/src/shared.rs index 162a2c2..ee52b0c 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -8,7 +8,7 @@ use arrow_schema::Fields; use arrow_schema::extension::ExtensionType; use arrow_schema::{DataType, Field}; use datafusion::common::exec_datafusion_err; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; use datafusion::{common::exec_err, scalar::ScalarValue}; use parquet_variant::Variant; @@ -262,6 +262,60 @@ pub fn ensure(pred: bool, err_msg: &str) -> Result<()> { Ok(()) } +/// Helper for argument count errors. +pub fn args_count_err(udf: &str, expected: &'static str, actual: usize) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf}: expected {expected} argument(s), got {actual}" + )) +} + +/// Helper for argument type errors. +pub fn arg_type_err( + udf: &str, + arg_index: u8, + expected: &str, + actual: &DataType, +) -> Result { + Err(DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected {expected}, got {actual}" + ))) +} + +/// Helper for unexpected NULL argument values. +pub fn arg_null_err(udf: &str, arg_index: u8, expected: &str) -> Result { + Err(arg_null_error(udf, arg_index, expected)) +} + +/// Helper for unexpected NULL argument values as a plain DataFusionError. +pub fn arg_null_error(udf: &str, arg_index: u8, expected: &str) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected {expected}, got NULL" + )) +} + +/// Helper for scalar/array shape mismatches. +pub fn arg_shape_err(udf: &str, arg_index: u8, expected: &str, actual: &str) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected {expected}, got {actual}" + )) +} + +/// Helper for invalid Variant kind in an argument. +pub fn arg_variant_kind_err( + udf: &str, + arg_index: u8, + expected_variant_kind: &str, +) -> DataFusionError { + DataFusionError::Execution(format!( + "{udf} arg #{arg_index}: expected variant {expected_variant_kind}" + )) +} + +/// Helper for missing argument field metadata. +pub fn arg_field_meta_missing_err(udf: &str, arg_index: u8) -> DataFusionError { + DataFusionError::Execution(format!("{udf} arg #{arg_index} field metadata is missing")) +} + // test related methods #[cfg(test)] diff --git a/src/variant_get.rs b/src/variant_get.rs index add0a0c..d300cab 100644 --- a/src/variant_get.rs +++ b/src/variant_get.rs @@ -8,7 +8,7 @@ use arrow::{ }; use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields}; use datafusion::{ - common::{arrow_datafusion_err, exec_datafusion_err, exec_err}, + common::{arrow_datafusion_err, exec_datafusion_err}, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, @@ -22,19 +22,29 @@ use parquet_variant_json::VariantToJson; use crate::impl_variant_get::impl_variant_get_typed; use crate::shared::{ + arg_field_meta_missing_err, arg_null_err, arg_shape_err, arg_type_err, args_count_err, invoke_variant_get_typed, try_field_as_variant_array, try_parse_string_columnar, try_parse_string_scalar, }; -fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result { +fn type_hint_from_scalar( + udf_name: &str, + field_name: &str, + scalar: &ScalarValue, +) -> Result { let type_name = match scalar { ScalarValue::Utf8(Some(value)) | ScalarValue::Utf8View(Some(value)) | ScalarValue::LargeUtf8(Some(value)) => value.as_str(), + ScalarValue::Utf8(None) | ScalarValue::Utf8View(None) | ScalarValue::LargeUtf8(None) => { + return arg_null_err(udf_name, 3, "a non-null UTF8 literal"); + } other => { - return exec_err!( - "type hint must be a non-null UTF8 literal, got {}", - other.data_type() + return arg_type_err( + udf_name, + 3, + "Utf8, LargeUtf8, or Utf8View", + &other.data_type(), ); } }; @@ -48,12 +58,10 @@ fn type_hint_from_scalar(field_name: &str, scalar: &ScalarValue) -> Result Result { +fn type_hint_from_value(udf_name: &str, field_name: &str, arg: &ColumnarValue) -> Result { match arg { - ColumnarValue::Scalar(value) => type_hint_from_scalar(field_name, value), - ColumnarValue::Array(_) => { - exec_err!("type hint argument must be a scalar UTF8 literal") - } + ColumnarValue::Scalar(value) => type_hint_from_scalar(udf_name, field_name, value), + ColumnarValue::Array(_) => Err(arg_shape_err(udf_name, 3, "scalar value", "array value")), } } @@ -96,18 +104,19 @@ fn invoke_variant_get( let (variant_arg, variant_path, type_arg) = match args.args.as_slice() { [variant_arg, variant_path] => (variant_arg, variant_path, None), [variant_arg, variant_path, type_arg] => (variant_arg, variant_path, Some(type_arg)), - _ => return exec_err!("expected 2 or 3 arguments"), + _ => return Err(args_count_err(udf_name, "2 or 3", args.args.len())), }; let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected argument field"))?; + .ok_or_else(|| arg_field_meta_missing_err(udf_name, 1))?; try_field_as_variant_array(variant_field.as_ref())?; + let type_field_name = args.return_field.name(); let type_field = type_arg - .map(|arg| type_hint_from_value(udf_name, arg)) + .map(|arg| type_hint_from_value(udf_name, type_field_name, arg)) .transpose()?; let out = match (variant_arg, variant_path) { @@ -125,7 +134,7 @@ fn invoke_variant_get( } (ColumnarValue::Scalar(scalar_variant), ColumnarValue::Scalar(variant_path)) => { let ScalarValue::Struct(variant_array) = scalar_variant else { - return exec_err!("expected struct array"); + return arg_type_err(udf_name, 1, "Struct", &scalar_variant.data_type()); }; let variant_array = Arc::clone(variant_array) as ArrayRef; @@ -144,7 +153,12 @@ fn invoke_variant_get( } (ColumnarValue::Array(variant_array), ColumnarValue::Array(variant_paths)) => { if variant_array.len() != variant_paths.len() { - return exec_err!("expected variant_array and variant paths to be of same length"); + return Err(arg_shape_err( + udf_name, + 2, + "array with same length as arg #1", + "array with different length", + )); } let variant_paths = try_parse_string_columnar(variant_paths)?; @@ -175,7 +189,7 @@ fn invoke_variant_get( } (ColumnarValue::Scalar(scalar_variant), ColumnarValue::Array(variant_paths)) => { let ScalarValue::Struct(variant_array) = scalar_variant else { - return exec_err!("expected struct array"); + return arg_type_err(udf_name, 1, "Struct", &scalar_variant.data_type()); }; let variant_array = Arc::clone(variant_array) as ArrayRef; @@ -206,7 +220,7 @@ fn return_field_for_variant_get(name: &str, args: ReturnFieldArgs) -> Result Result { fn delete_list_element(variant_list: Variant, index: usize) -> Result<(Vec, Vec)> { let Variant::List(variant_list) = variant_list else { - return exec_err!("expected variant list"); + return Err(arg_variant_kind_err("variant_list_delete", 1, "list")); }; if index >= variant_list.len() { @@ -109,7 +111,7 @@ impl ScalarUDFImpl for VariantListDelete { )?; let [variant_list_to_update, index_to_delete] = argument_values.as_slice() else { - return exec_err!("expected 2 arguments"); + return Err(args_count_err(self.name(), "2", argument_values.len())); }; ensure( @@ -119,7 +121,12 @@ impl ScalarUDFImpl for VariantListDelete { let index = { let ColumnarValue::Scalar(index) = index_to_delete else { - return exec_err!("expected scalar value for index"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar integer value", + "array value", + )); }; try_parse_index_scalar(index)? diff --git a/src/variant_list_insert.rs b/src/variant_list_insert.rs index 8ceb8e1..7fb797c 100644 --- a/src/variant_list_insert.rs +++ b/src/variant_list_insert.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StructArray; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::exec_err, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -13,7 +12,9 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_variant_scalar}; +use crate::shared::{ + arg_shape_err, arg_variant_kind_err, args_count_err, ensure, try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantListInsert { @@ -71,7 +72,7 @@ impl ScalarUDFImpl for VariantListInsert { )?; let [variant_list_to_update, element_to_append] = argument_values.as_slice() else { - return exec_err!("expected 2 arguments"); + return Err(args_count_err(self.name(), "2", argument_values.len())); }; let all_arguments_variant_field = argument_fields @@ -195,9 +196,12 @@ impl ScalarUDFImpl for VariantListInsert { Ok(ColumnarValue::Array(Arc::new(out) as _)) } - (ColumnarValue::Scalar(_), ColumnarValue::Array(_)) => { - exec_err!("unsupported argument") - } + (ColumnarValue::Scalar(_), ColumnarValue::Array(_)) => Err(arg_shape_err( + self.name(), + 2, + "scalar value when arg #1 is scalar", + "array value", + )), } } } @@ -209,7 +213,7 @@ fn create_variant_list_with_new_elements<'m, 'v>( elements_to_insert: impl Iterator>, ) -> Result<(Vec, Vec)> { let Variant::List(variant_list) = variant_list else { - return exec_err!("expected variant list"); + return Err(arg_variant_kind_err("variant_list_insert", 1, "list")); }; // note: I wonder if we can abstract this away diff --git a/src/variant_normalize.rs b/src/variant_normalize.rs index 4a92b20..2415000 100644 --- a/src/variant_normalize.rs +++ b/src/variant_normalize.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, StructArray}; use arrow_schema::{DataType, Field, Fields}; -use datafusion::common::{exec_datafusion_err, exec_err}; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, @@ -11,7 +10,9 @@ use datafusion::logical_expr::{ use datafusion::scalar::ScalarValue; use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType}; -use crate::shared::try_field_as_variant_array; +use crate::shared::{ + arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array, +}; /// Normalizes a Variant value into a canonical binary form. /// @@ -92,18 +93,18 @@ impl ScalarUDFImpl for VariantNormalizeUdf { let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(variant_field.as_ref())?; let [variant_arg] = args.args.as_slice() else { - return exec_err!("expected 1 argument"); + return Err(args_count_err(self.name(), "1", args.args.len())); }; let out = match variant_arg { ColumnarValue::Scalar(scalar_variant) => { let ScalarValue::Struct(struct_array) = scalar_variant else { - return exec_err!("expected variant struct"); + return arg_type_err(self.name(), 1, "Struct", &scalar_variant.data_type()); }; let variant_array = VariantArray::try_new(struct_array.as_ref())?; diff --git a/src/variant_object_delete.rs b/src/variant_object_delete.rs index 9d21bda..9bd0fc7 100644 --- a/src/variant_object_delete.rs +++ b/src/variant_object_delete.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StructArray; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::exec_err, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -13,7 +12,10 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_string_scalar, try_parse_variant_scalar}; +use crate::shared::{ + arg_null_error, arg_shape_err, arg_variant_kind_err, args_count_err, ensure, + try_parse_string_scalar, try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectDelete { @@ -71,7 +73,7 @@ impl ScalarUDFImpl for VariantObjectDelete { )?; let [variant_object_to_update, key_to_delete] = argument_values.as_slice() else { - return exec_err!("expected 2 arguments"); + return Err(args_count_err(self.name(), "2", argument_values.len())); }; ensure( @@ -81,11 +83,16 @@ impl ScalarUDFImpl for VariantObjectDelete { let key = { let ColumnarValue::Scalar(key) = key_to_delete else { - return exec_err!("expected scalar value for key"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar string value", + "array value", + )); }; try_parse_string_scalar(key)? - .ok_or_else(|| DataFusionError::Execution("expected non null string".into()))? + .ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))? }; match variant_object_to_update { @@ -93,7 +100,7 @@ impl ScalarUDFImpl for VariantObjectDelete { let variant_object = try_parse_variant_scalar(scalar_variant_object_to_update)?; let variant_object = variant_object.value(0); let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); @@ -123,7 +130,7 @@ impl ScalarUDFImpl for VariantObjectDelete { v_opt .map(|variant_object| { let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); diff --git a/src/variant_object_insert.rs b/src/variant_object_insert.rs index ad5580e..46d78bf 100644 --- a/src/variant_object_insert.rs +++ b/src/variant_object_insert.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StructArray; use arrow_schema::{DataType, Field, Fields}; use datafusion::{ - common::exec_err, error::{DataFusionError, Result}, logical_expr::{ ColumnarValue, ReturnFieldArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -13,7 +12,10 @@ use datafusion::{ use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::{VariantArray, VariantType}; -use crate::shared::{ensure, try_parse_string_scalar, try_parse_variant_scalar}; +use crate::shared::{ + arg_null_error, arg_shape_err, arg_variant_kind_err, args_count_err, ensure, + try_parse_string_scalar, try_parse_variant_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectInsert { @@ -71,7 +73,7 @@ impl ScalarUDFImpl for VariantObjectInsert { )?; let [variant_object_to_update, key, value] = argument_values.as_slice() else { - return exec_err!("expected 3 arguments"); + return Err(args_count_err(self.name(), "3", argument_values.len())); }; { @@ -87,16 +89,26 @@ impl ScalarUDFImpl for VariantObjectInsert { let key = { let ColumnarValue::Scalar(key) = key else { - return exec_err!("expected scalar value for key"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar string value", + "array value", + )); }; try_parse_string_scalar(key)? - .ok_or_else(|| DataFusionError::Execution("expected non null string".into()))? + .ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))? }; let value_array = { let ColumnarValue::Scalar(value) = value else { - return exec_err!("expected scalar value for value"); + return Err(arg_shape_err( + self.name(), + 3, + "scalar variant value", + "array value", + )); }; try_parse_variant_scalar(value)? @@ -108,7 +120,7 @@ impl ScalarUDFImpl for VariantObjectInsert { let variant_object = try_parse_variant_scalar(scalar_variant_object_to_update)?; let variant_object = variant_object.value(0); let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); @@ -136,7 +148,7 @@ impl ScalarUDFImpl for VariantObjectInsert { v_opt .map(|variant_object| { let Variant::Object(variant_object) = variant_object else { - return exec_err!("expected variant object"); + return Err(arg_variant_kind_err(self.name(), 1, "object")); }; let mut v = VariantBuilder::new(); diff --git a/src/variant_object_keys.rs b/src/variant_object_keys.rs index a7bfaea..cb46133 100644 --- a/src/variant_object_keys.rs +++ b/src/variant_object_keys.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::array::{ArrayRef, ListBuilder, StringBuilder}; use arrow_schema::{DataType, Field}; -use datafusion::common::{exec_datafusion_err, exec_err}; use datafusion::error::Result; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -12,7 +11,10 @@ use parquet_variant::Variant; use parquet_variant::VariantPath; use parquet_variant_compute::{GetOptions, VariantArray, variant_get as compute_variant_get}; -use crate::shared::{try_field_as_variant_array, try_parse_string_scalar}; +use crate::shared::{ + arg_field_meta_missing_err, arg_null_error, arg_shape_err, arg_type_err, args_count_err, + try_field_as_variant_array, try_parse_string_scalar, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantObjectKeys { @@ -70,26 +72,31 @@ impl ScalarUDFImpl for VariantObjectKeys { [variant_arg] => (variant_arg, None), [variant_arg, path_arg] => { let ColumnarValue::Scalar(path_scalar) = path_arg else { - return exec_err!("expected scalar value for path"); + return Err(arg_shape_err( + self.name(), + 2, + "scalar string value", + "array value", + )); }; let path = try_parse_string_scalar(path_scalar)? - .ok_or_else(|| exec_datafusion_err!("expected non-null string for path"))?; + .ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))?; (variant_arg, Some(path)) } - _ => return exec_err!("expected 1 or 2 arguments"), + _ => return Err(args_count_err(self.name(), "1 or 2", args.args.len())), }; let variant_field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("expected 1 argument field type"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(variant_field.as_ref())?; let out = match variant_arg { ColumnarValue::Scalar(scalar_variant) => { let ScalarValue::Struct(struct_arr) = scalar_variant else { - return exec_err!("expected variant scalar value"); + return arg_type_err(self.name(), 1, "Struct", &scalar_variant.data_type()); }; let arr: ArrayRef = Arc::clone(struct_arr) as ArrayRef; diff --git a/src/variant_pretty.rs b/src/variant_pretty.rs index 33e3783..fded548 100644 --- a/src/variant_pretty.rs +++ b/src/variant_pretty.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::array::StringViewArray; use arrow_schema::DataType; use datafusion::{ - common::{exec_datafusion_err, exec_err}, error::Result, logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -12,7 +11,9 @@ use datafusion::{ }; use parquet_variant_compute::VariantArray; -use crate::shared::try_field_as_variant_array; +use crate::shared::{ + arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array, +}; #[derive(Debug, Hash, PartialEq, Eq)] pub struct VariantPretty { @@ -48,25 +49,25 @@ impl ScalarUDFImpl for VariantPretty { let field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(field.as_ref())?; let arg = args .args .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(self.name(), "1", args.args.len()))?; let out = match arg { ColumnarValue::Scalar(scalar) => { let ScalarValue::Struct(variant_array) = scalar else { - return exec_err!("Unsupported data type: {}", scalar.data_type()); + return arg_type_err(self.name(), 1, "Struct", &scalar.data_type()); }; let variant_array = VariantArray::try_new(variant_array.as_ref())?; let v = variant_array.value(0); - ColumnarValue::Scalar(ScalarValue::Utf8View(Some(format!("{:?}", v)))) + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(format!("{v:?}")))) } ColumnarValue::Array(arr) => match arr.data_type() { DataType::Struct(_) => { @@ -74,14 +75,14 @@ impl ScalarUDFImpl for VariantPretty { let out = variant_array .iter() - .map(|v| v.map(|v| format!("{:?}", v))) + .map(|v| v.map(|v| format!("{v:?}"))) .collect::>(); let out: StringViewArray = out.into(); ColumnarValue::Array(Arc::new(out)) } - unsupported => return exec_err!("Invalid data type: {unsupported}"), + unsupported => return arg_type_err(self.name(), 1, "Struct", unsupported), }, }; diff --git a/src/variant_to_json.rs b/src/variant_to_json.rs index 271c0b0..3c8a39f 100644 --- a/src/variant_to_json.rs +++ b/src/variant_to_json.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use arrow::array::StringViewArray; use arrow_schema::DataType; use datafusion::{ - common::{exec_datafusion_err, exec_err}, error::Result, logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -15,7 +14,9 @@ use datafusion::{ use parquet_variant_compute::VariantArray; use parquet_variant_json::VariantToJson; -use crate::shared::try_field_as_variant_array; +use crate::shared::{ + arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array, +}; /// Returns a JSON string from a VariantArray /// @@ -59,19 +60,19 @@ impl ScalarUDFImpl for VariantToJsonUdf { let field = args .arg_fields .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?; try_field_as_variant_array(field.as_ref())?; let arg = args .args .first() - .ok_or_else(|| exec_datafusion_err!("empty argument, expected 1 argument"))?; + .ok_or_else(|| args_count_err(self.name(), "1", args.args.len()))?; let out = match arg { ColumnarValue::Scalar(scalar) => { let ScalarValue::Struct(variant_array) = scalar else { - return exec_err!("Unsupported data type: {}", scalar.data_type()); + return arg_type_err(self.name(), 1, "Struct", &scalar.data_type()); }; let variant_array = VariantArray::try_new(variant_array.as_ref())?; @@ -91,7 +92,7 @@ impl ScalarUDFImpl for VariantToJsonUdf { ColumnarValue::Array(Arc::new(out)) } - unsupported => return exec_err!("Invalid data type: {unsupported}"), + unsupported => return arg_type_err(self.name(), 1, "Struct", unsupported), }, };