feat(query): add json array lambda functions (#16573)
* feat(query): add json array lambda functions

* feat(query): add json array lambda functions

* feat(query): address comments

* feat(query): address comments

* feat(query): address comments

* feat(query): address comments
sundy-li authored Oct 9, 2024
1 parent add514c commit bcf3a6a
Showing 6 changed files with 168 additions and 1 deletion.
95 changes: 95 additions & 0 deletions src/query/expression/src/evaluator.rs
@@ -35,10 +35,13 @@ use crate::types::array::ArrayColumn;
use crate::types::boolean::BooleanDomain;
use crate::types::nullable::NullableColumn;
use crate::types::nullable::NullableDomain;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BooleanType;
use crate::types::DataType;
use crate::types::NullableType;
use crate::types::NumberScalar;
use crate::types::VariantType;
use crate::values::Column;
use crate::values::ColumnBuilder;
use crate::values::Scalar;
@@ -502,6 +505,98 @@ impl<'a> Evaluator<'a> {
}
other => unreachable!("source: {}", other),
},
(DataType::Variant, DataType::Array(inner_dest_ty)) => {
let empty_vec = vec![];
let temp_array: jsonb::Value;
match value {
Value::Scalar(Scalar::Variant(x)) => {
let array = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
temp_array = jsonb::from_slice(&x).map_err(|e| {
ErrorCode::BadArguments(format!(
"Expect to be valid json, got err: {e:?}"
))
})?;
temp_array.as_array().unwrap_or(&empty_vec)
} else {
&empty_vec
};

let column = VariantType::create_column_from_variants(array.as_slice());

let validity = validity.map(|validity| {
Bitmap::new_constant(
validity.unset_bits() != validity.len(),
column.len(),
)
});

let new_array = self
.run_cast(
span,
&DataType::Variant,
inner_dest_ty,
Value::Column(Column::Variant(column)),
validity,
options,
)?
.into_column()
.unwrap();
Ok(Value::Scalar(Scalar::Array(new_array)))
}
Value::Column(Column::Variant(col)) => {
let mut array_builder =
ArrayType::<VariantType>::create_builder(col.len(), &[]);

let mut temp_array: jsonb::Value;
for (idx, x) in col.iter().enumerate() {
let array = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true)
{
temp_array = jsonb::from_slice(x).map_err(|e| {
ErrorCode::BadArguments(format!(
"Expect to be valid json, got err: {e:?}"
))
})?;
temp_array.as_array().unwrap_or(&empty_vec)
} else {
&empty_vec
};

for v in array.iter() {
v.write_to_vec(&mut array_builder.builder.data);
array_builder.builder.commit_row();
}
array_builder.commit_row();
}
let col = array_builder.build();
let validity = validity.map(|validity| {
let mut inner_validity = MutableBitmap::with_capacity(col.len());
for (index, offsets) in col.offsets.windows(2).enumerate() {
inner_validity.extend_constant(
(offsets[1] - offsets[0]) as usize,
validity.get_bit(index),
);
}
inner_validity.into()
});
let new_col = self
.run_cast(
span,
&DataType::Variant,
inner_dest_ty,
Value::Column(Column::Variant(col.values)),
validity,
options,
)?
.into_column()
.unwrap();
Ok(Value::Column(Column::Array(Box::new(ArrayColumn {
values: new_col,
offsets: col.offsets,
}))))
}
other => unreachable!("source: {}", other),
}
}
(DataType::EmptyMap, DataType::Map(inner_dest_ty)) => match value {
Value::Scalar(Scalar::EmptyMap) => {
let new_column = ColumnBuilder::with_capacity(inner_dest_ty, 0).build();
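With the new Variant → Array(inner) cast arm above, a JSON array stored in a Variant can be unpacked into a native Array(Variant). A minimal SQL sketch, mirroring the cast test added later in this commit (expected output copied from that test):

select '[1,2,"3"]'::Variant a, a::Array(Variant) b, b::Variant = a;
-- returns: [1,2,"3"]   ['1','2','"3"']   1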
12 changes: 12 additions & 0 deletions src/query/expression/src/types/variant.rs
@@ -19,6 +19,7 @@ use std::ops::Range;
use databend_common_io::deserialize_bitmap;
use geozero::wkb::Ewkb;
use geozero::ToJson;
use jsonb::Value;

use super::binary::BinaryColumn;
use super::binary::BinaryColumnBuilder;
@@ -196,6 +197,17 @@ impl ArgType for VariantType {
}
}

impl VariantType {
pub fn create_column_from_variants(variants: &[Value]) -> BinaryColumn {
let mut builder = BinaryColumnBuilder::with_capacity(variants.len(), 0);
for v in variants {
v.write_to_vec(&mut builder.data);
builder.commit_row();
}
builder.build()
}
}

pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: TzLUT, buf: &mut Vec<u8>) {
let inner_tz = tz.tz;
let value = match scalar {
7 changes: 6 additions & 1 deletion src/query/functions/src/lib.rs
@@ -71,12 +71,17 @@ pub const GENERAL_WINDOW_FUNCTIONS: [&str; 13] = [
"cume_dist",
];

pub const GENERAL_LAMBDA_FUNCTIONS: [&str; 5] = [
pub const GENERAL_LAMBDA_FUNCTIONS: [&str; 10] = [
"array_transform",
"array_apply",
"array_map",
"array_filter",
"array_reduce",
"json_array_transform",
"json_array_apply",
"json_array_map",
"json_array_filter",
"json_array_reduce",
];

pub const GENERAL_SEARCH_FUNCTIONS: [&str; 3] = ["match", "query", "score"];
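The five new json_array_* names registered above are the user-facing surface of this commit. A minimal usage sketch; the queries and expected results are copied from the sqllogictest cases added further down:

select json_array_transform(['data', 'a', 'b']::Variant, data -> CONCAT(data::String, 'bend'));
-- returns: ["databend","abend","bbend"]

select json_array_reduce([1,2,3,4]::Variant, (x, y) -> 3 + x + y);
-- returns: 19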
30 changes: 30 additions & 0 deletions src/query/sql/src/planner/semantic/type_check.rs
@@ -1805,6 +1805,7 @@ impl<'a> TypeChecker<'a> {
DataType::Null => DataType::Null,
DataType::Binary => DataType::Binary,
DataType::String => DataType::String,
DataType::Variant => DataType::Variant,
_ => {
return Err(ErrorCode::BadDataValueType(format!(
"array_reduce does not support type '{:?}'",
@@ -1827,6 +1828,35 @@
args: &[&Expr],
lambda: &Lambda,
) -> Result<Box<(ScalarExpr, DataType)>> {
if func_name.starts_with("json_") && !args.is_empty() {
let func_name = &func_name[5..];
let mut new_args: Vec<Expr> = args.iter().map(|v| (*v).to_owned()).collect();
new_args[0] = Expr::Cast {
span: new_args[0].span(),
expr: Box::new(new_args[0].clone()),
target_type: TypeName::Array(Box::new(TypeName::Variant)),
pg_style: false,
};

let args: Vec<&Expr> = new_args.iter().collect();
let result = self.resolve_lambda_function(span, func_name, &args, lambda)?;

let target_type = if result.1.is_nullable() {
DataType::Variant.wrap_nullable()
} else {
DataType::Variant
};

let result_expr = ScalarExpr::CastExpr(CastExpr {
span: new_args[0].span(),
is_try: false,
argument: Box::new(result.0.clone()),
target_type: Box::new(target_type.clone()),
});

return Ok(Box::new((result_expr, target_type)));
}

if matches!(
self.bind_context.expr_context,
ExprContext::InLambdaFunction
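In effect, the binder handles a json_-prefixed lambda call by casting the first argument to Array(Variant), resolving the underlying array_* lambda function, and casting the result back to Variant (nullable-wrapped when the inner result is nullable). A rough SQL-level equivalence, as a sketch of the rewrite rather than literal planner output:

-- json_array_filter(col, a -> a::Int = 2)
-- is resolved roughly as
-- CAST(array_filter(CAST(col AS Array(Variant)), a -> a::Int = 2) AS Variant)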
@@ -477,5 +477,10 @@ select cast(position as String), cast(trim as String), cast(substring as String)
----
3 3 3

query TTT
select '[1,2,"3"]'::Variant a, a::Array(Variant) b, b::Variant = a;
----
[1,2,"3"] ['1','2','"3"'] 1

statement ok
drop table t
@@ -363,6 +363,26 @@ select array_transform(col1, a -> a + col2), array_filter(col1, a -> a = col2),
NULL NULL NULL
[12,13] [] 20

## json array functions

query T
select json_array_transform(['data', 'a', 'b']::Variant, data -> CONCAT(data::String, 'bend') );
----
["databend","abend","bbend"]

query TT
select json_array_transform(try_cast(col1 as Variant), a -> a::Int + col2), json_array_filter(try_cast(col1 as Variant), a -> a::Int = col2) from t3;
----
[3,4,5] [2]
[null,null] []
NULL NULL
[12,13] []

query TT
select json_array_reduce([1,2,3,4]::Variant, (x, y) -> 3 + x + y), json_array_transform(parse_json('"aa"'), data -> CONCAT(data::String, 'bend'));
----
19 []

statement ok
USE default

