Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/common_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use datafusion::arrow::datatypes::{DataType, Field, UnionFields, UnionMode};
use datafusion::arrow::error::ArrowError;
use datafusion::common::ScalarValue;

/// Field metadata used to mark a `Utf8` field as containing raw JSON.
///
/// Attach this to any Arrow `Field` whose values are JSON-encoded strings so
/// downstream consumers can recognize them as JSON rather than opaque text.
pub fn json_field_metadata() -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert("is_json".to_string(), "true".to_string());
    metadata
}

pub fn is_json_union(data_type: &DataType) -> bool {
match data_type {
DataType::Union(fields, UnionMode::Sparse) => fields == &union_fields(),
Expand Down Expand Up @@ -161,8 +169,6 @@ fn union_fields() -> UnionFields {
static FIELDS: OnceLock<UnionFields> = OnceLock::new();
FIELDS
.get_or_init(|| {
let json_metadata: HashMap<String, String> =
HashMap::from_iter(vec![("is_json".to_string(), "true".to_string())]);
UnionFields::from_iter([
(TYPE_ID_NULL, Arc::new(Field::new("null", DataType::Null, true))),
(TYPE_ID_BOOL, Arc::new(Field::new("bool", DataType::Boolean, false))),
Expand All @@ -171,11 +177,11 @@ fn union_fields() -> UnionFields {
(TYPE_ID_STR, Arc::new(Field::new("str", DataType::Utf8, false))),
(
TYPE_ID_ARRAY,
Arc::new(Field::new("array", DataType::Utf8, false).with_metadata(json_metadata.clone())),
Arc::new(Field::new("array", DataType::Utf8, false).with_metadata(json_field_metadata())),
),
(
TYPE_ID_OBJECT,
Arc::new(Field::new("object", DataType::Utf8, false).with_metadata(json_metadata.clone())),
Arc::new(Field::new("object", DataType::Utf8, false).with_metadata(json_field_metadata())),
),
])
})
Expand Down
21 changes: 9 additions & 12 deletions src/json_get_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, ListBuilder, StringBuilder};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::{Result as DataFusionResult, ScalarValue};
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
use jiter::Peek;

use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, InvokeResult, JsonPath};
use crate::common_macros::make_udf_function;
use crate::common_union::json_field_metadata;

/// Arrow field describing one element of the list produced by `json_get_array`.
///
/// Each element is a JSON-encoded string, so the field carries the `is_json`
/// metadata marker (see `json_field_metadata`).
fn list_item_field() -> Field {
    let field = Field::new("item", DataType::Utf8, true);
    field.with_metadata(json_field_metadata())
}

make_udf_function!(
JsonGetArray,
Expand Down Expand Up @@ -46,15 +51,7 @@ impl ScalarUDFImpl for JsonGetArray {
}

fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
return_type_check(
arg_types,
self.name(),
DataType::List(Arc::new(datafusion::arrow::datatypes::Field::new(
"item",
DataType::Utf8,
true,
))),
)
return_type_check(arg_types, self.name(), DataType::List(Arc::new(list_item_field())))
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
Expand Down Expand Up @@ -96,7 +93,7 @@ impl InvokeResult for BuildArrayList {

fn builder(capacity: usize) -> Self::Builder {
let values_builder = StringBuilder::new();
ListBuilder::with_capacity(values_builder, capacity)
ListBuilder::with_capacity(values_builder, capacity).with_field(list_item_field())
}

fn append_value(builder: &mut Self::Builder, value: Option<Self::Item>) {
Expand All @@ -108,7 +105,7 @@ impl InvokeResult for BuildArrayList {
}

fn scalar(value: Option<Self::Item>) -> ScalarValue {
let mut builder = ListBuilder::new(StringBuilder::new());
let mut builder = ListBuilder::new(StringBuilder::new()).with_field(list_item_field());

if let Some(array_items) = value {
for item in array_items {
Expand Down
16 changes: 14 additions & 2 deletions src/json_get_json.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::{DataType, Field, FieldRef};
use datafusion::common::Result as DataFusionResult;
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility};
use datafusion::logical_expr::{
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

use crate::common::{get_err, invoke, jiter_json_find, return_type_check, GetError, JsonPath};
use crate::common_macros::make_udf_function;
use crate::common_union::json_field_metadata;

make_udf_function!(
JsonGetJson,
Expand Down Expand Up @@ -47,6 +51,14 @@ impl ScalarUDFImpl for JsonGetJson {
return_type_check(arg_types, self.name(), DataType::Utf8)
}

/// Builds the output field for this UDF, tagging it with the `is_json`
/// metadata marker so consumers know the returned `Utf8` values are raw JSON.
fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
    // Recover the plain return type from the argument types, then attach
    // the JSON marker metadata to the resulting field.
    let mut arg_types = Vec::with_capacity(args.arg_fields.len());
    for field in args.arg_fields.iter() {
        arg_types.push(field.data_type().clone());
    }
    let return_type = self.return_type(&arg_types)?;
    let field = Field::new(self.name(), return_type, true).with_metadata(json_field_metadata());
    Ok(Arc::new(field))
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DataFusionResult<ColumnarValue> {
invoke::<StringArray>(&args.args, jiter_json_get_json)
}
Expand Down
34 changes: 34 additions & 0 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,31 @@ async fn test_json_get_array_with_path() {
assert_eq!(value_repr, "[1, 2, 3]");
}

#[tokio::test]
async fn test_json_get_array_inner_field_is_json_metadata() {
    // Both the schema-declared field and the concretely produced ListArray
    // must carry the `is_json` marker on the list's inner (item) field.
    let batches = run_query(r#"select json_get_array('[{"a": 1}, {"b": 2}]') as v"#)
        .await
        .unwrap();

    let schema = batches[0].schema();
    let declared = schema.field(0);
    let DataType::List(inner_field) = declared.data_type() else {
        panic!("expected List, got {:?}", declared.data_type());
    };
    assert_eq!(inner_field.metadata().get("is_json").map(String::as_str), Some("true"));

    // Check the produced array too: the builder's field metadata can differ
    // from the declared schema, so verify both carry the marker.
    let list_array = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<datafusion::arrow::array::ListArray>()
        .unwrap();
    let DataType::List(produced_inner) = list_array.data_type() else {
        panic!("expected List in produced array");
    };
    assert_eq!(
        produced_inner.metadata().get("is_json").map(String::as_str),
        Some("true")
    );
}

#[tokio::test]
async fn test_json_get_equals() {
let e = run_query(r"select name, json_get(json_data, 'foo')='abc' from test")
Expand Down Expand Up @@ -411,6 +436,15 @@ async fn test_json_get_json_float() {
assert_eq!(display_val(batches).await, (DataType::Utf8, "4.2e-1".to_string()));
}

#[tokio::test]
async fn test_json_get_json_is_json_metadata() {
    // json_get_json returns raw JSON text, so its output field must be
    // tagged with the `is_json` metadata marker.
    let batches = run_query(r#"select json_get_json('{"x": [1, 2]}', 'x') as v"#)
        .await
        .unwrap();
    let schema = batches[0].schema();
    let output_field = schema.field(0);
    assert_eq!(
        output_field.metadata().get("is_json").map(String::as_str),
        Some("true")
    );
}

#[tokio::test]
async fn test_json_length_array() {
let sql = "select json_length('[1, 2, 3]')";
Expand Down
Loading