diff --git a/Cargo.lock b/Cargo.lock index 3aaa6c612aba..6cf260c56233 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2405,6 +2405,20 @@ dependencies = [ "colorchoice", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "comfy-table" version = "6.2.0" @@ -2690,6 +2704,12 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32c" version = "0.6.8" @@ -5225,6 +5245,7 @@ version = "0.1.0" dependencies = [ "async-trait", "clap", + "databend-common-base", "databend-common-exception", "env_logger", "futures-util", @@ -10787,6 +10808,7 @@ dependencies = [ "prometheus-client", "prost 0.13.1", "quick-xml 0.36.1", + "redis", "reqsign", "reqwest 0.12.5", "serde", @@ -12588,6 +12610,39 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "redis" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "crc16", + "futures", + "futures-util", + "itoa", + "log", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rand 0.8.5", + "rustls 0.23.12", + "rustls-native-certs 0.7.1", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "ryu", + "sha1_smol", + "socket2 0.5.7", + "tokio", + "tokio-retry", + "tokio-rustls 0.26.0", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" diff --git a/Cargo.toml b/Cargo.toml index 45fda64798a2..cf0986912747 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ 
-269,6 +269,7 @@ opendal = { version = "0.49.0", features = [ "services-moka", "services-webhdfs", "services-huggingface", + "services-redis", ] } openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.10.0-alpha.6", features = [ "serde", diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 7e1d75228939..721374fb1eb8 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -386,12 +386,7 @@ build_exceptions! { // dictionary DictionaryAlreadyExists(3113), UnknownDictionary(3114), - UnknownDictionaryId(3115), - UnsupportedDictionaryOption(3116), - UnsupportedDictionarySource(3117), - MissingDictionaryOption(3118), - WrongDictionaryFieldExpr(3119), - + DictionarySourceError(3115), // Procedure UnknownProcedure(3130), ProcedureAlreadyExists(3131), diff --git a/src/common/storage/src/lib.rs b/src/common/storage/src/lib.rs index 025743a39dec..60c505d2cc6a 100644 --- a/src/common/storage/src/lib.rs +++ b/src/common/storage/src/lib.rs @@ -36,6 +36,7 @@ pub use config::ShareTableConfig; pub use config::StorageConfig; mod operator; +pub use operator::build_operator; pub use operator::init_operator; pub use operator::DataOperator; diff --git a/src/meta/app/src/schema/dictionary.rs b/src/meta/app/src/schema/dictionary.rs index 3beb7b275ad3..99679b2a8c99 100644 --- a/src/meta/app/src/schema/dictionary.rs +++ b/src/meta/app/src/schema/dictionary.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use chrono::DateTime; use chrono::Utc; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; use databend_common_expression::TableSchema; use super::dictionary_name_ident::DictionaryNameIdent; @@ -77,6 +79,47 @@ impl Default for DictionaryMeta { } } +impl DictionaryMeta { + pub fn build_sql_connection_url(&self) -> Result { + let username = self + .options + .get("username") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `username`"))?; + let 
password = self + .options + .get("password") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `password`"))?; + let host = self + .options + .get("host") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `host`"))?; + let port = self + .options + .get("port") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `port`"))?; + let db = self + .options + .get("db") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `db`"))?; + Ok(format!( + "mysql://{}:{}@{}:{}/{}", + username, password, host, port, db + )) + } + + pub fn build_redis_connection_url(&self) -> Result { + let host = self + .options + .get("host") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `host`"))?; + let port = self + .options + .get("port") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `port`"))?; + Ok(format!("tcp://{}:{}", host, port)) + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct CreateDictionaryReq { pub dictionary_ident: DictionaryNameIdent, diff --git a/src/query/functions/src/lib.rs b/src/query/functions/src/lib.rs index 2e6ce7194385..734bb4b74248 100644 --- a/src/query/functions/src/lib.rs +++ b/src/query/functions/src/lib.rs @@ -44,7 +44,7 @@ pub fn is_builtin_function(name: &str) -> bool { #[ctor] pub static BUILTIN_FUNCTIONS: FunctionRegistry = builtin_functions(); -pub const ASYNC_FUNCTIONS: [&str; 1] = ["nextval"]; +pub const ASYNC_FUNCTIONS: [&str; 2] = ["nextval", "dict_get"]; pub const GENERAL_WINDOW_FUNCTIONS: [&str; 13] = [ "row_number", diff --git a/src/query/service/src/pipelines/builders/builder_async_function.rs b/src/query/service/src/pipelines/builders/builder_async_function.rs index 0f9de49dd382..1807ed099fae 100644 --- a/src/query/service/src/pipelines/builders/builder_async_function.rs +++ b/src/query/service/src/pipelines/builders/builder_async_function.rs @@ -23,8 +23,13 @@ impl PipelineBuilder { pub(crate) fn build_async_function(&mut self, async_function: &AsyncFunction) -> Result<()> { 
self.build_pipeline(&async_function.input)?; + let operators = TransformAsyncFunction::init_operators(&async_function.async_func_descs)?; self.main_pipeline.add_async_transformer(|| { - TransformAsyncFunction::new(self.ctx.clone(), async_function.async_func_descs.clone()) + TransformAsyncFunction::new( + self.ctx.clone(), + async_function.async_func_descs.clone(), + operators.clone(), + ) }); Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 2b39227e30e8..a34ec20725e0 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -25,6 +25,7 @@ mod transform_block_compact_no_split; mod transform_cache_scan; mod transform_cast_schema; mod transform_create_sets; +mod transform_dictionary; mod transform_expression_scan; mod transform_filter; mod transform_limit; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs index 9e9d13ee7bec..86a5ab076d11 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::BTreeMap; use std::sync::Arc; use databend_common_exception::Result; @@ -25,6 +26,7 @@ use databend_common_meta_app::schema::GetSequenceNextValueReq; use databend_common_meta_app::schema::SequenceIdent; use databend_common_pipeline_transforms::processors::AsyncTransform; use databend_common_storages_fuse::TableContext; +use opendal::Operator; use crate::sessions::QueryContext; use crate::sql::executor::physical_plans::AsyncFunctionDesc; @@ -32,14 +34,21 @@ use crate::sql::plans::AsyncFunctionArgument; pub struct TransformAsyncFunction { ctx: Arc, + // key is the index of async_func_desc + pub(crate) operators: BTreeMap>, async_func_descs: Vec, } impl TransformAsyncFunction { - pub fn new(ctx: Arc, async_func_descs: Vec) -> Self { + pub fn new( + ctx: Arc, + async_func_descs: Vec, + operators: BTreeMap>, + ) -> Self { Self { ctx, async_func_descs, + operators, } } @@ -80,7 +89,7 @@ impl AsyncTransform for TransformAsyncFunction { #[async_backtrace::framed] async fn transform(&mut self, mut data_block: DataBlock) -> Result { - for async_func_desc in &self.async_func_descs { + for (i, async_func_desc) in self.async_func_descs.iter().enumerate() { match &async_func_desc.func_arg { AsyncFunctionArgument::SequenceFunction(sequence_name) => { self.transform_sequence( @@ -90,9 +99,18 @@ impl AsyncTransform for TransformAsyncFunction { ) .await?; } + AsyncFunctionArgument::DictGetFunction(dict_arg) => { + self.transform_dict_get( + i, + &mut data_block, + dict_arg, + &async_func_desc.arg_indices, + &async_func_desc.data_type, + ) + .await?; + } } } - Ok(data_block) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs b/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs new file mode 100644 index 000000000000..5a4f41318a44 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/transform_dictionary.rs @@ -0,0 +1,143 @@ +// Copyright 2021 Datafuse Labs +// +// 
Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::BlockEntry; +use databend_common_expression::ColumnBuilder; +use databend_common_expression::DataBlock; +use databend_common_expression::Scalar; +use databend_common_expression::ScalarRef; +use databend_common_expression::Value; +use databend_common_storage::build_operator; +use opendal::services::Redis; +use opendal::Operator; + +use crate::pipelines::processors::transforms::TransformAsyncFunction; +use crate::sql::executor::physical_plans::AsyncFunctionDesc; +use crate::sql::plans::AsyncFunctionArgument; +use crate::sql::plans::DictGetFunctionArgument; +use crate::sql::plans::DictionarySource; +use crate::sql::IndexType; + +impl TransformAsyncFunction { + pub fn init_operators( + async_func_descs: &[AsyncFunctionDesc], + ) -> Result>> { + let mut operators = BTreeMap::new(); + for (i, async_func_desc) in async_func_descs.iter().enumerate() { + if let AsyncFunctionArgument::DictGetFunction(dict_arg) = &async_func_desc.func_arg { + match &dict_arg.dict_source { + DictionarySource::Redis(redis_source) => { + let mut builder = Redis::default().endpoint(&redis_source.connection_url); + if let Some(ref username) = redis_source.username { + builder = builder.username(username); + } + if let Some(ref 
password) = redis_source.password { + builder = builder.password(password); + } + if let Some(db_index) = redis_source.db_index { + builder = builder.db(db_index); + } + let op = build_operator(builder)?; + operators.insert(i, Arc::new(op)); + } + DictionarySource::Mysql(_) => { + return Err(ErrorCode::Unimplemented("Mysql source is unsupported")); + } + } + } + } + Ok(operators) + } + + // transform add dict get column. + pub(crate) async fn transform_dict_get( + &self, + i: usize, + data_block: &mut DataBlock, + dict_arg: &DictGetFunctionArgument, + arg_indices: &[IndexType], + data_type: &DataType, + ) -> Result<()> { + let op = self.operators.get(&i).unwrap().clone(); + + // only support one key field. + let arg_index = arg_indices[0]; + let entry = data_block.get_by_offset(arg_index); + let value = match &entry.value { + Value::Scalar(scalar) => { + if let Scalar::String(key) = scalar { + let buffer = op.read(key).await; + match buffer { + Ok(res) => { + let value = + unsafe { String::from_utf8_unchecked(res.current().to_vec()) }; + Value::Scalar(Scalar::String(value)) + } + Err(e) => { + if e.kind() == opendal::ErrorKind::NotFound { + Value::Scalar(dict_arg.default_value.clone()) + } else { + return Err(ErrorCode::DictionarySourceError(format!( + "dictionary source error: {e}" + ))); + } + } + } + } else { + Value::Scalar(dict_arg.default_value.clone()) + } + } + Value::Column(column) => { + let mut builder = ColumnBuilder::with_capacity(data_type, column.len()); + for scalar in column.iter() { + if let ScalarRef::String(key) = scalar { + let buffer = op.read(key).await; + match buffer { + Ok(res) => { + let value = + unsafe { String::from_utf8_unchecked(res.current().to_vec()) }; + builder.push(ScalarRef::String(value.as_str())); + } + Err(e) => { + if e.kind() == opendal::ErrorKind::NotFound { + builder.push(dict_arg.default_value.as_ref()); + } else { + return Err(ErrorCode::DictionarySourceError(format!( + "dictionary source error: {e}" + ))); + } + } + 
}; + } else { + builder.push(dict_arg.default_value.as_ref()); + } + } + Value::Column(builder.build()) + } + }; + let entry = BlockEntry { + data_type: data_type.clone(), + value, + }; + data_block.add_column(entry); + + Ok(()) + } +} diff --git a/src/query/sql/src/planner/binder/ddl/dictionary.rs b/src/query/sql/src/planner/binder/ddl/dictionary.rs index c676c2afb25a..8bbe682c614a 100644 --- a/src/query/sql/src/planner/binder/ddl/dictionary.rs +++ b/src/query/sql/src/planner/binder/ddl/dictionary.rs @@ -13,6 +13,8 @@ // limitations under the License. use std::collections::BTreeMap; +use std::collections::HashSet; +use std::sync::LazyLock; use databend_common_ast::ast::CreateDictionaryStmt; use databend_common_ast::ast::DropDictionaryStmt; @@ -22,7 +24,10 @@ use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::TableDataType; +use databend_common_expression::TableSchema; use databend_common_meta_app::schema::DictionaryMeta; +use itertools::Itertools; use crate::plans::CreateDictionaryPlan; use crate::plans::DropDictionaryPlan; @@ -30,6 +35,199 @@ use crate::plans::Plan; use crate::plans::ShowCreateDictionaryPlan; use crate::Binder; +pub const DICT_OPT_KEY_SQL_HOST: &str = "host"; +pub const DICT_OPT_KEY_SQL_PORT: &str = "port"; +pub const DICT_OPT_KEY_SQL_USERNAME: &str = "username"; +pub const DICT_OPT_KEY_SQL_PASSWORD: &str = "password"; +pub const DICT_OPT_KEY_SQL_DB: &str = "db"; +pub const DICT_OPT_KEY_SQL_TABLE: &str = "table"; + +pub const DICT_OPT_KEY_REDIS_HOST: &str = "host"; +pub const DICT_OPT_KEY_REDIS_PORT: &str = "port"; +pub const DICT_OPT_KEY_REDIS_USERNAME: &str = "username"; +pub const DICT_OPT_KEY_REDIS_PASSWORD: &str = "password"; +pub const DICT_OPT_KEY_REDIS_DB_INDEX: &str = "db_index"; + +static DICT_REQUIRED_SQL_OPTION_KEYS: LazyLock> = LazyLock::new(|| { + let mut r = 
HashSet::new(); + r.insert(DICT_OPT_KEY_SQL_HOST); + r.insert(DICT_OPT_KEY_SQL_PORT); + r.insert(DICT_OPT_KEY_SQL_USERNAME); + r.insert(DICT_OPT_KEY_SQL_PASSWORD); + r.insert(DICT_OPT_KEY_SQL_DB); + r.insert(DICT_OPT_KEY_SQL_TABLE); + r +}); + +static DICT_REQUIRED_REDIS_OPTION_KEYS: LazyLock> = LazyLock::new(|| { + let mut r = HashSet::new(); + r.insert(DICT_OPT_KEY_REDIS_HOST); + r.insert(DICT_OPT_KEY_REDIS_PORT); + r +}); + +static DICT_OPTIONAL_REDIS_OPTION_KEYS: LazyLock> = LazyLock::new(|| { + let mut r = HashSet::new(); + r.insert(DICT_OPT_KEY_REDIS_USERNAME); + r.insert(DICT_OPT_KEY_REDIS_PASSWORD); + r.insert(DICT_OPT_KEY_REDIS_DB_INDEX); + r +}); + +fn is_dict_required_sql_opt_key>(opt_key: S) -> bool { + DICT_REQUIRED_SQL_OPTION_KEYS.contains(opt_key.as_ref()) +} + +fn is_dict_required_redis_opt_key>(opt_key: S) -> bool { + DICT_REQUIRED_REDIS_OPTION_KEYS.contains(opt_key.as_ref()) +} + +fn is_dict_optional_redis_opt_key>(opt_key: S) -> bool { + DICT_OPTIONAL_REDIS_OPTION_KEYS.contains(opt_key.as_ref()) +} + +fn insert_dictionary_sql_option_with_validation( + options: &mut BTreeMap, + key: String, + value: String, +) -> Result<()> { + if is_dict_required_sql_opt_key(&key) { + if key == DICT_OPT_KEY_SQL_PORT && value.parse::().is_err() { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} must be a positive integer", + ))); + } + if options.insert(key.clone(), value).is_some() { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} duplicated", + ))); + } + } else { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} is not a valid option, required options are [`host`, `port`, `username`, `password`, `db`, `table`]", + ))); + } + Ok(()) +} + +fn insert_dictionary_redis_option_with_validation( + options: &mut BTreeMap, + key: String, + value: String, +) -> Result<()> { + if is_dict_required_redis_opt_key(&key) || is_dict_optional_redis_opt_key(&key) { + if key == DICT_OPT_KEY_REDIS_PORT { + if 
value.parse::().is_err() { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} must be a positive integer", + ))); + } + } else if key == DICT_OPT_KEY_REDIS_DB_INDEX && !value.parse::().is_ok_and(|v| v <= 15) + { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} must be between 0 to 15", + ))); + } + if options.insert(key.clone(), value).is_some() { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} duplicated", + ))); + } + } else { + return Err(ErrorCode::BadArguments(format!( + "dictionary option {key} is not a valid option, required options are [`host`, `port`], optional options are [`username`, `password`, `db_index`]", + ))); + } + Ok(()) +} + +fn validate_dictionary_options( + source: &str, + source_options: &BTreeMap, +) -> Result> { + let mut options: BTreeMap = BTreeMap::new(); + match source { + "mysql" => { + for (key, value) in source_options { + insert_dictionary_sql_option_with_validation( + &mut options, + key.to_lowercase(), + value.to_string(), + )?; + } + let option_keys = options.keys().map(|k| k.as_str()).collect(); + let diff_keys = DICT_REQUIRED_SQL_OPTION_KEYS + .difference(&option_keys) + .collect::>() + .into_iter() + .join(", "); + if !diff_keys.is_empty() { + return Err(ErrorCode::BadArguments(format!( + "dictionary miss options {diff_keys}, required options are [`host`, `port`, `username`, `password`, `db`, `table`]", + ))); + } + } + "redis" => { + for (key, value) in source_options { + insert_dictionary_redis_option_with_validation( + &mut options, + key.to_lowercase(), + value.to_string(), + )?; + } + let option_keys = options.keys().map(|k| k.as_str()).collect(); + let diff_keys = DICT_REQUIRED_REDIS_OPTION_KEYS + .difference(&option_keys) + .collect::>() + .into_iter() + .join(", "); + if !diff_keys.is_empty() { + return Err(ErrorCode::BadArguments(format!( + "dictionary miss options {diff_keys}, required options are [`host`, `port`], optional options are 
[`username`, `password`, `db_index`]", + ))); + } + } + _ => unreachable!(), + } + + Ok(options) +} + +fn validate_mysql_fields(schema: &TableSchema) -> Result<()> { + for field in schema.fields() { + if !matches!( + field.data_type().remove_nullable(), + TableDataType::Boolean + | TableDataType::String + | TableDataType::Number(_) + | TableDataType::Date + | TableDataType::Timestamp + ) { + return Err(ErrorCode::BadArguments( + "The type of Mysql field must be in [`boolean`, `string`, `number`, `timestamp`, `date`]", + )); + } + } + Ok(()) +} + +fn validate_redis_fields(schema: &TableSchema) -> Result<()> { + let fields_names: Vec = schema.fields().iter().map(|f| f.name.clone()).collect(); + if fields_names.len() != 2 { + return Err(ErrorCode::BadArguments( + "The number of Redis fields must be two", + )); + } + for field in schema.fields() { + if field.data_type().remove_nullable() != TableDataType::String { + return Err(ErrorCode::BadArguments( + "The type of Redis field must be `string`", + )); + } + } + Ok(()) +} + impl Binder { #[async_backtrace::framed] pub(in crate::planner::binder) async fn bind_create_dictionary( @@ -59,59 +257,50 @@ impl Binder { database_id = db.get_db_info().database_id.db_id; } - let source = self.normalize_object_identifier(source_name); + let source = self.normalize_object_identifier(source_name).to_lowercase(); - if source.to_lowercase() != *"mysql" { - return Err(ErrorCode::UnsupportedDictionarySource(format!( - "The specified source '{}' is not currently supported.", - source.to_lowercase(), - ))); - } - // TODO: Authentication to connect to MySQL database will be implemented later - - let options: BTreeMap = source_options - .iter() - .map(|(k, v)| (k.to_lowercase(), v.to_string().to_lowercase())) - .collect(); - let required_options = ["host", "port", "username", "password", "db"]; - for option in required_options { - if !options.contains_key(option) { - return Err(ErrorCode::MissingDictionaryOption( - "The configuration is 
missing one or more required options. ".to_owned() - + "Please ensure you have provided values for 'host', 'port', 'username', 'password', and 'db'.", - )); - } - } - if required_options.len() != options.len() { - return Err(ErrorCode::UnsupportedDictionaryOption(format!( - "The provided options are not recognized." + if source != "mysql" && source != "redis" { + return Err(ErrorCode::BadArguments(format!( + "The specified source '{}' is not currently supported", + source, ))); } - let mut field_comments = BTreeMap::new(); - let mut primary_column_ids = Vec::new(); + // Check for options + let options = validate_dictionary_options(&source, source_options)?; + // Check for data source fields. let (schema, _) = self.analyze_create_table_schema_by_columns(columns).await?; - for table_field in schema.fields() { - if table_field.default_expr.is_some() || table_field.computed_expr.is_some() { - return Err(ErrorCode::WrongDictionaryFieldExpr( - "The table field configuration is invalid. ".to_owned() - + "Default expressions and computed expressions for the table fields should not be set.", - )); - } + match source.as_str() { + "redis" => validate_redis_fields(&schema)?, + "mysql" => validate_mysql_fields(&schema)?, + _ => unreachable!(), } + + // Collect field_comments. + let mut field_comments = BTreeMap::new(); for column in columns { if column.comment.is_some() { let column_id = schema.column_id_of(column.name.name.as_str())?; field_comments.insert(column_id, column.comment.clone().unwrap_or_default()); } } - for primary_key in primary_keys { - let pk_id = schema.column_id_of(primary_key.name.as_str())?; - primary_column_ids.push(pk_id); + + // Collect and check primary column. 
+ let mut primary_column_ids = Vec::new(); + if primary_keys.len() != 1 { + return Err(ErrorCode::BadArguments("Only support one primary key")); } + let primary_key = match primary_keys.first() { + Some(pk) => pk.clone(), + None => return Err(ErrorCode::BadArguments("Miss primary key")), + }; + let pk_id = schema.column_id_of(primary_key.name.as_str())?; + primary_column_ids.push(pk_id); + // Comment. let comment = comment.clone().unwrap_or("".to_string()); + let meta = DictionaryMeta { source, options, @@ -121,7 +310,6 @@ impl Binder { comment, ..Default::default() }; - Ok(Plan::CreateDictionary(Box::new(CreateDictionaryPlan { create_option: create_option.clone(), tenant, diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index a3bf8b6421be..646c39dfb4b9 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -778,6 +778,43 @@ pub enum AsyncFunctionArgument { // Used by `nextval` function to call meta's `get_sequence_next_value` api // to get incremental values. SequenceFunction(String), + // The dictionary argument is connection URL of remote source, like Redis, MySQL ... + // Used by `dict_get` function to connect source and read data. 
+ DictGetFunction(DictGetFunctionArgument), +} + +#[derive(Clone, Debug, Educe, serde::Serialize, serde::Deserialize)] +#[educe(PartialEq, Eq, Hash)] +pub struct RedisSource { + // Redis source connection URL, like `tcp://127.0.0.1:6379` + pub connection_url: String, + pub username: Option, + pub password: Option, + pub db_index: Option, +} + +#[derive(Clone, Debug, Educe, serde::Serialize, serde::Deserialize)] +#[educe(PartialEq, Eq, Hash)] +pub struct SqlSource { + // SQL source connection URL, like `mysql://user:password@localhost:3306/db` + pub connection_url: String, + pub table: String, + pub key_field: String, + pub value_field: String, +} + +#[derive(Clone, Debug, Educe, serde::Serialize, serde::Deserialize)] +#[educe(PartialEq, Eq, Hash)] +pub enum DictionarySource { + Mysql(SqlSource), + Redis(RedisSource), +} + +#[derive(Clone, Debug, Educe, serde::Serialize, serde::Deserialize)] +#[educe(PartialEq, Eq, Hash)] +pub struct DictGetFunctionArgument { + pub dict_source: DictionarySource, + pub default_value: Scalar, } // Asynchronous functions are functions that need to call remote interfaces. 
@@ -805,6 +842,9 @@ impl AsyncFunctionCall { let reply = catalog.get_sequence_next_value(req).await?; Ok(Scalar::Number(NumberScalar::UInt64(reply.start))) } + AsyncFunctionArgument::DictGetFunction(_dict_get_function_argument) => { + Err(ErrorCode::Internal("Cannot generate dict_get function")) + } } } } diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index e12b967979b6..0f5bedf86789 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -93,6 +93,8 @@ use databend_common_meta_app::principal::LambdaUDF; use databend_common_meta_app::principal::UDFDefinition; use databend_common_meta_app::principal::UDFScript; use databend_common_meta_app::principal::UDFServer; +use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent; +use databend_common_meta_app::schema::DictionaryIdentity; use databend_common_meta_app::schema::GetSequenceReq; use databend_common_meta_app::schema::SequenceIdent; use databend_common_storage::init_stage_operator; @@ -115,6 +117,7 @@ use crate::binder::CteInfo; use crate::binder::ExprContext; use crate::binder::InternalColumnBinding; use crate::binder::NameResolutionResult; +use crate::field_default_value; use crate::optimizer::RelExpr; use crate::optimizer::SExpr; use crate::parse_lambda_expr; @@ -130,13 +133,17 @@ use crate::plans::BoundColumnRef; use crate::plans::CastExpr; use crate::plans::ComparisonOp; use crate::plans::ConstantExpr; +use crate::plans::DictGetFunctionArgument; +use crate::plans::DictionarySource; use crate::plans::FunctionCall; use crate::plans::LagLeadFunction; use crate::plans::LambdaFunc; use crate::plans::NthValueFunction; use crate::plans::NtileFunction; +use crate::plans::RedisSource; use crate::plans::ScalarExpr; use crate::plans::ScalarItem; +use crate::plans::SqlSource; use crate::plans::SubqueryExpr; use crate::plans::SubqueryType; use crate::plans::UDFCall; @@ 
-3770,9 +3777,9 @@ impl<'a> TypeChecker<'a> { let original_context = self.bind_context.expr_context.clone(); self.bind_context .set_expr_context(ExprContext::InAsyncFunction); - let result = match func_name { "nextval" => self.resolve_nextval_async_function(span, func_name, arguments)?, + "dict_get" => self.resolve_dict_get_async_function(span, func_name, arguments)?, _ => { return Err(ErrorCode::SemanticError(format!( "cannot find async function {}", @@ -3848,6 +3855,165 @@ impl<'a> TypeChecker<'a> { Ok(Box::new((async_func.into(), return_type))) } + fn resolve_dict_get_async_function( + &mut self, + span: Span, + func_name: &str, + args: &[&Expr], + ) -> Result> { + if args.len() != 3 { + return Err(ErrorCode::SemanticError(format!( + "dict_get function need three arguments but got {}", + args.len() + )) + .set_span(span)); + } + let tenant = self.ctx.get_tenant(); + let catalog = self.ctx.get_default_catalog()?; + + let dict_name_arg = args[0]; + let field_arg = args[1]; + let key_arg = args[2]; + + // Get dict_name and dict_meta. + let (db_name, dict_name) = if let Expr::ColumnRef { column, .. 
} = dict_name_arg { + if column.database.is_some() { + return Err(ErrorCode::SemanticError( + "dict_get function argument identifier should contain one or two parts" + .to_string(), + ) + .set_span(dict_name_arg.span())); + } + let db_name = match &column.table { + Some(ident) => normalize_identifier(ident, self.name_resolution_ctx).name, + None => self.ctx.get_current_database(), + }; + let dict_name = match &column.column { + ColumnID::Name(ident) => normalize_identifier(ident, self.name_resolution_ctx).name, + ColumnID::Position(pos) => { + return Err(ErrorCode::SemanticError(format!( + "dict_get function argument don't support identifier {}", + pos + )) + .set_span(dict_name_arg.span())); + } + }; + (db_name, dict_name) + } else { + return Err(ErrorCode::SemanticError( + "async function can only used as column".to_string(), + ) + .set_span(dict_name_arg.span())); + }; + let db = databend_common_base::runtime::block_on( + catalog.get_database(&tenant, db_name.as_str()), + )?; + let db_id = db.get_db_info().database_id.db_id; + let req = DictionaryNameIdent::new( + tenant.clone(), + DictionaryIdentity::new(db_id, dict_name.clone()), + ); + let reply = databend_common_base::runtime::block_on(catalog.get_dictionary(req))?; + let dictionary = if let Some(r) = reply { + r.dictionary_meta + } else { + return Err(ErrorCode::UnknownDictionary(format!( + "Unknown dictionary {}", + dict_name, + ))); + }; + + // Get attr_name, attr_type and return_type. 
+ let box (field_scalar, _field_data_type) = self.resolve(field_arg)?; + let Ok(field_expr) = ConstantExpr::try_from(field_scalar.clone()) else { + return Err(ErrorCode::SemanticError(format!( + "invalid arguments for dict_get function, attr_name must be a constant string, but got {}", + field_arg + )) + .set_span(field_scalar.span())); + }; + let Some(attr_name) = field_expr.value.as_string() else { + return Err(ErrorCode::SemanticError(format!( + "invalid arguments for dict_get function, attr_name must be a constant string, but got {}", + field_arg + )) + .set_span(field_scalar.span())); + }; + let attr_field = dictionary.schema.field_with_name(attr_name)?; + let attr_type: DataType = (&attr_field.data_type).into(); + let default_value = field_default_value(self.ctx.clone(), attr_field)?; + + // Get primary_key_value and check type. + let primary_column_id = dictionary.primary_column_ids[0]; + let primary_field = dictionary.schema.field_of_column_id(primary_column_id)?; + let primary_type: DataType = (&primary_field.data_type).into(); + + let mut args = Vec::with_capacity(1); + let box (key_scalar, key_type) = self.resolve(key_arg)?; + + if primary_type != key_type { + args.push(wrap_cast(&key_scalar, &primary_type)); + } else { + args.push(key_scalar); + } + let dict_source = match dictionary.source.as_str() { + "mysql" => { + let connection_url = dictionary.build_sql_connection_url()?; + let table = dictionary + .options + .get("table") + .ok_or_else(|| ErrorCode::BadArguments("Miss option `table`"))?; + DictionarySource::Mysql(SqlSource { + connection_url, + table: table.to_string(), + key_field: primary_field.name.clone(), + value_field: attr_field.name.clone(), + }) + } + "redis" => { + let connection_url = dictionary.build_redis_connection_url()?; + let username = dictionary.options.get("username").cloned(); + let password = dictionary.options.get("password").cloned(); + let db_index = dictionary + .options + .get("db_index") + .map(|i| 
i.parse::<i64>().unwrap()); + DictionarySource::Redis(RedisSource { + connection_url, + username, + password, + db_index, + }) + } + _ => { + return Err(ErrorCode::Unimplemented(format!( + "Unsupported source {}", + dictionary.source + ))); + } + }; + + let dict_get_func_arg = DictGetFunctionArgument { + dict_source, + default_value, + }; + let display_name = format!( + "{}({}.{}, {}, {})", + func_name, db_name, dict_name, field_arg, key_arg, + ); + Ok(Box::new(( + ScalarExpr::AsyncFunctionCall(AsyncFunctionCall { + span, + func_name: func_name.to_string(), + display_name, + return_type: Box::new(attr_type.clone()), + arguments: args, + func_arg: AsyncFunctionArgument::DictGetFunction(dict_get_func_arg), + }), + attr_type, + ))) + } + fn resolve_cast_to_variant( + &mut self, + span: Span, diff --git a/tests/sqllogictests/Cargo.toml b/tests/sqllogictests/Cargo.toml index 69c5e7f51672..23ba001cfae4 100644 --- a/tests/sqllogictests/Cargo.toml +++ b/tests/sqllogictests/Cargo.toml @@ -16,6 +16,7 @@ name = "databend-sqllogictests" [dependencies] async-trait = { workspace = true } clap = { workspace = true } +databend-common-base = { workspace = true } databend-common-exception = { workspace = true } env_logger = "0.10.0" futures-util = { workspace = true } diff --git a/tests/sqllogictests/src/lib.rs b/tests/sqllogictests/src/lib.rs index 858f2532c116..97e7972c89f2 100644 --- a/tests/sqllogictests/src/lib.rs +++ b/tests/sqllogictests/src/lib.rs @@ -13,4 +13,5 @@ pub mod arg; pub mod client; pub mod error; +pub mod mock_source; pub mod util; diff --git a/tests/sqllogictests/src/main.rs b/tests/sqllogictests/src/main.rs index 3d89f0e99a6a..3bf96392c43a 100644 --- a/tests/sqllogictests/src/main.rs +++ b/tests/sqllogictests/src/main.rs @@ -18,6 +18,7 @@ use std::path::Path; use std::time::Instant; use clap::Parser; +use databend_sqllogictests::mock_source::run_redis_source; use futures_util::stream; use futures_util::StreamExt; use sqllogictest::default_column_validator; @@ -74,6
+75,12 @@ impl sqllogictest::AsyncDB for Databend { #[tokio::main] pub async fn main() -> Result<()> { env_logger::init(); + + // Run a mock Redis server for dictionary tests. + databend_common_base::runtime::spawn(async move { + run_redis_source().await; + }); + let args = SqlLogicTestArgs::parse(); let handlers = match &args.handlers { Some(hs) => hs.iter().map(|s| s.as_str()).collect(), diff --git a/tests/sqllogictests/src/mock_source/mod.rs b/tests/sqllogictests/src/mock_source/mod.rs new file mode 100644 index 000000000000..05040f92fa14 --- /dev/null +++ b/tests/sqllogictests/src/mock_source/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod redis_source; +pub use redis_source::run_redis_source; diff --git a/tests/sqllogictests/src/mock_source/redis_source.rs b/tests/sqllogictests/src/mock_source/redis_source.rs new file mode 100644 index 000000000000..905de7cbf603 --- /dev/null +++ b/tests/sqllogictests/src/mock_source/redis_source.rs @@ -0,0 +1,124 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; + +use tokio::net::TcpListener; +use tokio::net::TcpStream; + +pub async fn run_redis_source() { + // Bind the listener to the address + let listener = TcpListener::bind("0.0.0.0:6379").await.unwrap(); + + loop { + let (socket, _) = listener.accept().await.unwrap(); + + // A new task is spawned for each inbound socket. The socket is + // moved to the new task and processed there. + databend_common_base::runtime::spawn(async move { + process(socket).await; + }); + } +} + +async fn process(stream: TcpStream) { + let mut buf = Vec::with_capacity(4096); + loop { + buf.clear(); + // Wait for the socket to be readable + stream.readable().await.unwrap(); + + let mut ret_values = VecDeque::new(); + match stream.try_read_buf(&mut buf) { + Ok(0) => break, + Ok(_) => { + let request = String::from_utf8(buf.clone()).unwrap(); + let cmds = parse_resp(request); + for cmd in cmds { + if let Command::Get(key) = cmd { + // Return a value if the first character of the key is ASCII alphanumeric, + // otherwise treat it as the key does not exist. 
+ let ret_value = if key.starts_with(|c: char| c.is_ascii_alphanumeric()) { + let v = format!("{}_value", key); + format!("${}\r\n{}\r\n", v.len(), v) + } else { + "$-1\r\n".to_string() + }; + ret_values.push_back(ret_value); + } else { + let ret_value = "+OK\r\n".to_string(); + ret_values.push_back(ret_value); + } + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(_) => { + let ret_value = "+OK\r\n".to_string(); + ret_values.push_back(ret_value); + } + } + + while let Some(ret_value) = ret_values.pop_front() { + // Wait for the socket to be writable + stream.writable().await.unwrap(); + + match stream.try_write(ret_value.as_bytes()) { + Ok(_) => {} + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(_) => { + break; + } + } + } + } +} + +// Redis command, only support get, other commands are ignored. +enum Command { + Get(String), + Invalid, + Other, +} + +// parse RESP(REdis Serialization Protocol) +// for example: "*2\r\n$3\r\nGET\r\n$2\r\nabc\r\n" +fn parse_resp(request: String) -> Vec<Command> { + // split by \r\n + let mut lines = request.split("\r\n").collect::<Vec<&str>>(); + let mut cmds = Vec::new(); + while !lines.is_empty() { + if lines[0].is_empty() { + break; + } + let len: usize = lines[0][1..].parse().unwrap(); + let n = 2 * len + 1; + if lines.len() < n { + cmds.push(Command::Invalid); + return cmds; + } + // only parse GET command and ignore other commands + if lines[2] == "GET" { + let cmd = Command::Get(lines[4].to_string()); + cmds.push(cmd); + } else { + cmds.push(Command::Other); + } + lines.drain(0..n); + } + cmds +} diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0037_ddl_dictionary.test b/tests/sqllogictests/suites/base/05_ddl/05_0037_ddl_dictionary.test index d41c3dd58bbe..09e8bc224c4c 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0037_ddl_dictionary.test +++ b/tests/sqllogictests/suites/base/05_ddl/05_0037_ddl_dictionary.test @@ -10,48 +10,68 @@ DROP DICTIONARY IF
EXISTS d3 statement ok DROP DICTIONARY IF EXISTS d4 -statement error 3117 -CREATE DICTIONARY d(c1 int, c2 Varchar) PRIMARY KEY c1 SOURCE(postgresql(host='localhost' port='3306' username='root' password='1234' db='db1')) +statement error 1006 +CREATE DICTIONARY d(c1 int, c2 Varchar) PRIMARY KEY c1 SOURCE(postgresql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement ok -CREATE DICTIONARY d(c1 int, c2 Varchar) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE DICTIONARY d(c1 VARCHAR NOT NULL, c2 VARCHAR NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement ok -CREATE DICTIONARY IF NOT EXISTS d(c1 int, c2 Varchar) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE DICTIONARY IF NOT EXISTS d(c1 int NOT NULL, c2 Varchar NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement error 3113 -CREATE DICTIONARY d(c1 int, c2 Varchar) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE DICTIONARY d(c1 int NOT NULL, c2 Varchar NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) + +statement error 1006 +CREATE DICTIONARY d(c1 int NOT NULL, c2 Varchar NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) statement ok -CREATE DICTIONARY d2(a int, b int) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE DICTIONARY d2(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement error 3113 -CREATE DICTIONARY d2(a int, b 
int) PRIMARY KEY b SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE DICTIONARY d2(a int NOT NULL, b int NOT NULL) PRIMARY KEY b SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) -statement error 3118 -create dictionary d3(`a` int, b int) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234')) +statement error 1006 +create dictionary d3(`a` int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' table='test_table')) statement ok -create dictionary d3(`a` int, b int) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +create dictionary d3(`a` int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement ok -create or replace dictionary d3(a int, b Varchar) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) comment 'comment' +create or replace dictionary d3(a int NOT NULL, b Varchar NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) comment 'comment' + +statement error 1006 +create dictionary d4(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' name='dummy' table='test_table')) -statement error 3116 -create dictionary d4(a int, b int) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' name='dummy')) +statement ok +create or replace dictionary d4(a Varchar NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement ok -create or replace dictionary d4(a Varchar, b int) PRIMARY KEY a 
SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE or replace DICTIONARY d5(key string not null, value string not null) PRIMARY KEY key SOURCE(redis(host='127.0.0.1' port='6379')) + +statement error 1006 +create or replace dictionary d5(key string not null, value string not null) PRIMARY KEY key SOURCE(redis(host='127.0.0.1' port='6379' db_index='19')) + +statement error 1006 +create or replace dictionary d5(key string not null, value string not null) PRIMARY KEY key SOURCE(redis(host='127.0.0.1')) + +statement error 1006 +create or replace dictionary d5(key int not null, value int not null) PRIMARY KEY key SOURCE(redis(host='127.0.0.1' port='6379')) query TT show create dictionary d ---- -d CREATE DICTIONARY d ( c1 INT NULL, c2 VARCHAR NULL ) PRIMARY KEY c1 SOURCE(mysql(db='db1' host='localhost' password='[HIDDEN]' port='3306' username='root')) +d CREATE DICTIONARY d ( c1 VARCHAR NOT NULL, c2 VARCHAR NOT NULL ) PRIMARY KEY c1 SOURCE(mysql(db='db1' host='localhost' password='[HIDDEN]' port='3306' table='test_table' username='root')) query TT show create dictionary d3 ---- -d3 CREATE DICTIONARY d3 ( a INT NULL, b VARCHAR NULL ) PRIMARY KEY a SOURCE(mysql(db='db1' host='localhost' password='[HIDDEN]' port='3306' username='root')) COMMENT 'comment' +d3 CREATE DICTIONARY d3 ( a INT NOT NULL, b VARCHAR NOT NULL ) PRIMARY KEY a SOURCE(mysql(db='db1' host='localhost' password='[HIDDEN]' port='3306' table='test_table' username='root')) COMMENT 'comment' + +query TT +show create dictionary d5 +---- +d5 CREATE DICTIONARY d5 ( key VARCHAR NOT NULL, value VARCHAR NOT NULL ) PRIMARY KEY key SOURCE(redis(host='127.0.0.1' port='6379')) statement error 3114 show create dictionary test @@ -68,6 +88,9 @@ DROP DICTIONARY IF EXISTS d3 statement ok DROP DICTIONARY IF EXISTS d4 +statement ok +DROP DICTIONARY IF EXISTS d5 + statement error 3114 drop dictionary test @@ -78,7 +101,7 @@ statement ok CREATE DATABASE db1 statement ok -CREATE 
DICTIONARY db1.test1(a int, b int) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1')) +CREATE DICTIONARY db1.test1(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) statement ok DROP DATABASE db1 diff --git a/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test b/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test new file mode 100644 index 000000000000..3e45de55b4ee --- /dev/null +++ b/tests/sqllogictests/suites/query/functions/02_0077_function_dict_get.test @@ -0,0 +1,31 @@ +statement ok +create or replace table t(a string) + +statement ok +insert into t values('a'),('b'),('%c') + +statement ok +CREATE OR REPLACE DICTIONARY d(key string not null, value string not null) PRIMARY KEY key SOURCE(redis(host='127.0.0.1' port='6379')) + +query T +select a, dict_get(d, 'value', a) from t +---- +a a_value +b b_value +%c (empty) + +query T +SELECT dict_get(d, 'value', 'b') +---- +b_value + +statement error 1006 +select dict_get(d, 'value11', 'a') + +statement error 3114 +select dict_get(test, 'value', 'b') + +query T +SELECT dict_get(d, 'value', 1) +---- +1_value \ No newline at end of file