Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ pedantic = { level = "deny", priority = -1 }
[[bench]]
name = "main"
harness = false

[patch.crates-io]
datafusion = { git = "https://github.com/pydantic/datafusion.git", branch = "pydantic-main" }
18 changes: 18 additions & 0 deletions src/json_as_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ impl ScalarUDFImpl for JsonAsText {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonAsText`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

impl InvokeResult for StringArray {
Expand Down
18 changes: 18 additions & 0 deletions src/json_contains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ impl ScalarUDFImpl for JsonContains {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonContains`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

impl InvokeResult for BooleanArray {
Expand Down
18 changes: 18 additions & 0 deletions src/json_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ impl ScalarUDFImpl for JsonGet {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGet`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

impl InvokeResult for JsonUnion {
Expand Down
18 changes: 18 additions & 0 deletions src/json_get_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ impl ScalarUDFImpl for JsonGetArray {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGetArray`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions src/json_get_bool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ impl ScalarUDFImpl for JsonGetBool {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGetBool`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

fn jiter_json_get_bool(json_data: Option<&str>, path: &[JsonPath]) -> Result<bool, GetError> {
Expand Down
18 changes: 18 additions & 0 deletions src/json_get_float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ impl ScalarUDFImpl for JsonGetFloat {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGetFloat`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

impl InvokeResult for Float64Array {
Expand Down
18 changes: 18 additions & 0 deletions src/json_get_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ impl ScalarUDFImpl for JsonGetInt {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGetInt`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

impl InvokeResult for Int64Array {
Expand Down
18 changes: 18 additions & 0 deletions src/json_get_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ impl ScalarUDFImpl for JsonGetJson {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGetJson`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

fn jiter_json_get_json(opt_json: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
Expand Down
18 changes: 18 additions & 0 deletions src/json_get_str.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ impl ScalarUDFImpl for JsonGetStr {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonGetStr`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

fn jiter_json_get_str(json_data: Option<&str>, path: &[JsonPath]) -> Result<String, GetError> {
Expand Down
18 changes: 18 additions & 0 deletions src/json_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ impl ScalarUDFImpl for JsonLength {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonLength`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

impl InvokeResult for UInt64Array {
Expand Down
18 changes: 18 additions & 0 deletions src/json_object_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ impl ScalarUDFImpl for JsonObjectKeys {
fn aliases(&self) -> &[String] {
&self.aliases
}

/// Placement hint for a [`JsonObjectKeys`] call, derived from the placements of its arguments.
///
/// Yields `MoveTowardsLeafNodes` only when the first argument is a column and it is
/// followed by at least one further argument, every one of which is a literal (the path).
/// Any other argument shape yields `KeepInPlace`.
fn placement(
    &self,
    args: &[datafusion::logical_expr::ExpressionPlacement],
) -> datafusion::logical_expr::ExpressionPlacement {
    // Separate the leading JSON argument from the trailing path arguments; with no
    // arguments at all there is nothing that could be pushed down.
    let Some((json_arg, path_args)) = args.split_first() else {
        return datafusion::logical_expr::ExpressionPlacement::KeepInPlace;
    };

    let reads_column = matches!(json_arg, datafusion::logical_expr::ExpressionPlacement::Column);
    // A path must be present (at least one element) and consist solely of literals.
    let literal_path = !path_args.is_empty()
        && path_args
            .iter()
            .all(|part| matches!(part, datafusion::logical_expr::ExpressionPlacement::Literal));

    if reads_column && literal_path {
        datafusion::logical_expr::ExpressionPlacement::MoveTowardsLeafNodes
    } else {
        datafusion::logical_expr::ExpressionPlacement::KeepInPlace
    }
}
}

/// Struct used to build a `ListArray` from the result of `jiter_json_object_keys`.
Expand Down
2 changes: 1 addition & 1 deletion src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn optimise_json_get_cast(cast: &Cast) -> Option<Transformed<Expr>> {
if scalar_func.func.name() != "json_get" {
return None;
}
let func = match &cast.data_type {
let func = match cast.field.data_type() {
DataType::Boolean => crate::json_get_bool::json_get_bool_udf(),
DataType::Float64 | DataType::Float32 | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
crate::json_get_float::json_get_float_udf()
Expand Down
31 changes: 22 additions & 9 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,27 @@ async fn test_json_get_array_with_path() {

#[tokio::test]
async fn test_json_get_equals() {
let e = run_query(r"select name, json_get(json_data, 'foo')='abc' from test")
// union comparison now works thanks to the union coercion support in datafusion
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// union comparison now works thanks to the union coercions upport in datafusion
// union comparison now works thanks to the union coercion support in datafusion

// (previously failed with "Cannot infer common argument type for comparison operation Union")
// see https://github.com/apache/datafusion/issues/10180
let batches = run_query(r"select name, json_get(json_data, 'foo')='abc' from test")
.await
.unwrap_err();
.unwrap();

// see https://github.com/apache/datafusion/issues/10180
assert!(e
.to_string()
.starts_with("Error during planning: Cannot infer common argument type for comparison operation Union"));
let expected = [
"+------------------+----------------------------------------------------+",
r#"| name | json_get(test.json_data,Utf8("foo")) = Utf8("abc") |"#,
"+------------------+----------------------------------------------------+",
"| object_foo | true |",
"| object_foo_array | |",
"| object_foo_obj | |",
"| object_foo_null | |",
"| object_bar | |",
"| list_foo | |",
"| invalid_json | |",
"+------------------+----------------------------------------------------+",
];
assert_batches_eq!(expected, &batches);
}

#[tokio::test]
Expand Down Expand Up @@ -762,9 +775,9 @@ async fn test_plan_json_get_cte() {
select name, json_get(j, 0) v from t
";
let expected = [
"Projection: t.name, json_get(t.j, Int64(0)) AS v",
"Projection: t.name, __datafusion_extracted_1 AS v",
" SubqueryAlias: t",
" Projection: test.name, json_get(test.json_data, Utf8(\"foo\")) AS j",
" Projection: test.name, json_get(json_get(test.json_data, Utf8(\"foo\")), Int64(0)) AS __datafusion_extracted_1",
" TableScan: test projection=[name, json_data]",
];

Expand Down Expand Up @@ -1255,7 +1268,7 @@ async fn test_plan_double_arrow_double_nested_cast() {

// NB: json_as_text(..)::int is NOT the same as `json_get_int(..)`, hence the cast is not rewritten
let expected = [
"Projection: CAST(json_as_text(test.json_data, Utf8(\"foo\"), Int64(0)) AS json_data ->> 'foo' ->> 0 AS Int32)",
"Projection: CAST(json_as_text(test.json_data, Utf8(\"foo\"), Int64(0)) AS Int32) AS json_data ->> 'foo' ->> 0",
" TableScan: test projection=[json_data]",
];

Expand Down
Loading