diff --git a/Cargo.lock b/Cargo.lock
index 4764857f27c67..498524f071c22 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3085,6 +3085,7 @@ dependencies = [
  "common-meta-app",
  "common-metrics",
  "common-pipeline-core",
+ "common-pipeline-sources",
  "common-sql",
  "common-storages-fuse",
  "futures",
@@ -3168,6 +3169,7 @@ dependencies = [
  "minitrace-opentelemetry",
  "opentelemetry",
  "opentelemetry-otlp",
+ "opentelemetry_sdk",
  "serde",
  "serde_json",
  "tonic 0.10.2",
@@ -3193,6 +3195,7 @@ dependencies = [
  "common-meta-kvapi",
  "common-meta-store",
  "common-meta-types",
+ "enumflags2",
  "jwt-simple",
  "log",
  "p256 0.13.0",
@@ -8655,6 +8658,8 @@ dependencies = [
  "regex",
  "serde_json",
  "thiserror",
+ "tokio",
+ "tokio-stream",
 ]
diff --git a/src/binaries/meta/entry.rs b/src/binaries/meta/entry.rs
index e60baeaadb5e0..2f8e7fe1fb660 100644
--- a/src/binaries/meta/entry.rs
+++ b/src/binaries/meta/entry.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::env;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -78,7 +79,12 @@ pub async fn entry(conf: Config) -> anyhow::Result<()> {
         "databend-meta-{}@{}",
         conf.raft_config.id, conf.raft_config.cluster_name
     );
-    let _guards = init_logging(&app_name_shuffle, &conf.log);
+    let mut log_labels = BTreeMap::new();
+    log_labels.insert(
+        "cluster_name".to_string(),
+        conf.raft_config.cluster_name.clone(),
+    );
+    let _guards = init_logging(&app_name_shuffle, &conf.log, log_labels);
 
     info!("Databend Meta version: {}", METASRV_COMMIT_VERSION.as_str());
     info!(
@@ -139,6 +145,7 @@ pub async fn entry(conf: Config) -> anyhow::Result<()> {
     println!("Log:");
     println!("    File: {}", conf.log.file);
     println!("    Stderr: {}", conf.log.stderr);
+    println!("    OTLP: {}", conf.log.otlp);
     println!("    Tracing: {}", conf.log.tracing);
     println!("Id: {}", conf.raft_config.id);
     println!("Raft Cluster Name: {}", conf.raft_config.cluster_name);
diff --git a/src/binaries/metabench/main.rs b/src/binaries/metabench/main.rs
index 93ac91a15473d..0dce97bf7b61b 100644
--- a/src/binaries/metabench/main.rs
+++ b/src/binaries/metabench/main.rs
@@ -14,6 +14,7 @@
 
 #![allow(clippy::uninlined_format_args)]
 
+use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::sync::Arc;
@@ -44,6 +45,7 @@ use common_meta_types::Operation;
 use common_meta_types::TxnRequest;
 use common_tracing::init_logging;
 use common_tracing::FileConfig;
+use common_tracing::OTLPConfig;
 use common_tracing::QueryLogConfig;
 use common_tracing::StderrConfig;
 use common_tracing::TracingConfig;
@@ -96,18 +98,12 @@ async fn main() {
             level: "WARN".to_string(),
             format: "text".to_string(),
         },
-        query: QueryLogConfig {
-            on: false,
-            dir: "./.databend/logs/query-details".to_string(),
-        },
-        tracing: TracingConfig {
-            on: false,
-            capture_log_level: "TRACE".to_string(),
-            otlp_endpoint: "http://127.0.0.1:4317".to_string(),
-        },
+        otlp: OTLPConfig::default(),
+        query: QueryLogConfig::default(),
+        tracing: TracingConfig::default(),
     };
 
-    let _guards = init_logging("databend-metabench", &log_config);
+    let _guards = init_logging("databend-metabench", &log_config, BTreeMap::new());
 
     println!("config: {:?}", config);
     if config.grpc_api_address.is_empty() {
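
Reviewer note: every `init_logging` call site now threads a `BTreeMap<String, String>` of labels. A minimal sketch of the new calling convention — the helper name is hypothetical; the guard type mirrors the `init_logging` signature in `src/common/tracing/src/init.rs` below:

```rust
use std::collections::BTreeMap;

use common_tracing::init_logging;
use common_tracing::Config;

// Hypothetical helper mirroring the call sites above. Keys left out of the
// map (e.g. "service") are filled in by init_logging itself.
#[allow(dyn_drop)]
fn setup_logging(app: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync + 'static>> {
    let mut labels = BTreeMap::new();
    labels.insert("cluster_name".to_string(), "test_cluster".to_string());
    // The returned guards must be held for the process lifetime; dropping
    // them flushes and shuts down the buffered log writers.
    init_logging(app, cfg, labels)
}
```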
diff --git a/src/binaries/metactl/main.rs b/src/binaries/metactl/main.rs
index 4c23cbc1a1a04..65c409a0b438c 100644
--- a/src/binaries/metactl/main.rs
+++ b/src/binaries/metactl/main.rs
@@ -15,12 +15,14 @@
 #![allow(clippy::uninlined_format_args)]
 
 mod grpc;
+use common_tracing::OTLPConfig;
 use common_tracing::QueryLogConfig;
 use common_tracing::TracingConfig;
 use grpc::export_meta;
 
 mod snapshot;
 
+use std::collections::BTreeMap;
 use std::time::Duration;
 
 use clap::Parser;
@@ -122,11 +124,12 @@ async fn main() -> anyhow::Result<()> {
             format: "text".to_string(),
         },
         stderr: StderrConfig::default(),
+        otlp: OTLPConfig::default(),
         query: QueryLogConfig::default(),
         tracing: TracingConfig::default(),
     };
 
-    let _guards = init_logging("metactl", &log_config);
+    let _guards = init_logging("metactl", &log_config, BTreeMap::new());
 
     if config.status {
         return show_status(&config).await;
diff --git a/src/binaries/query/entry.rs b/src/binaries/query/entry.rs
index 231b2d4c088a0..649fcfef2cfa5 100644
--- a/src/binaries/query/entry.rs
+++ b/src/binaries/query/entry.rs
@@ -239,6 +239,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
     println!("Logging:");
     println!("    file: {}", conf.log.file);
     println!("    stderr: {}", conf.log.stderr);
+    println!("    otlp: {}", conf.log.otlp);
     println!("    query: {}", conf.log.query);
     println!("    tracing: {}", conf.log.tracing);
     println!(
diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml
index bab88d902fd22..183abe876fe2a 100644
--- a/src/common/tracing/Cargo.toml
+++ b/src/common/tracing/Cargo.toml
@@ -25,8 +25,9 @@ humantime = "2.1.0"
 log = { workspace = true }
 minitrace = { workspace = true }
 minitrace-opentelemetry = "0.6"
-opentelemetry = { version = "0.20", features = ["trace"] }
-opentelemetry-otlp = { version = "0.13", features = ["trace"] }
+opentelemetry = { version = "0.20", features = ["trace", "logs"] }
+opentelemetry-otlp = { version = "0.13", features = ["trace", "logs"] }
+opentelemetry_sdk = { version = "0.20", features = ["trace", "logs", "rt-tokio"] }
 serde = { workspace = true }
 serde_json = "1"
 tonic = { workspace = true }
"enabled={}, dir={}, otlp_endpoint={}", + self.on, self.dir, self.otlp_endpoint + ) } } impl Default for QueryLogConfig { fn default() -> Self { Self { - on: true, - dir: "./.databend/logs/query-details".to_string(), + on: false, + dir: "".to_string(), + otlp_endpoint: "".to_string(), } } } @@ -158,7 +198,7 @@ impl Default for TracingConfig { Self { on: false, capture_log_level: "INFO".to_string(), - otlp_endpoint: "http://localhost:4317".to_string(), + otlp_endpoint: "http://127.0.0.1:4317".to_string(), } } } diff --git a/src/common/tracing/src/minitrace.rs b/src/common/tracing/src/init.rs similarity index 70% rename from src/common/tracing/src/minitrace.rs rename to src/common/tracing/src/init.rs index 130166ac1f54e..917d6c37afdf1 100644 --- a/src/common/tracing/src/minitrace.rs +++ b/src/common/tracing/src/init.rs @@ -13,9 +13,8 @@ // limitations under the License. use std::borrow::Cow; +use std::collections::BTreeMap; use std::fmt; -use std::io; -use std::io::BufWriter; use std::io::Write; use std::time::Duration; use std::time::SystemTime; @@ -27,11 +26,10 @@ use log::LevelFilter; use log::Log; use minitrace::prelude::*; use serde_json::Map; -use tracing_appender::non_blocking::NonBlocking; -use tracing_appender::non_blocking::WorkerGuard; -use tracing_appender::rolling::RollingFileAppender; -use tracing_appender::rolling::Rotation; +use crate::loggers::new_file_log_writer; +use crate::loggers::new_otlp_log_writer; +use crate::loggers::MinitraceLogger; use crate::Config; const HEADER_TRACE_PARENT: &str = "traceparent"; @@ -42,8 +40,8 @@ pub struct GlobalLogger { } impl GlobalLogger { - pub fn init(name: &str, cfg: &Config) { - let _guards = init_logging(name, cfg); + pub fn init(name: &str, cfg: &Config, labels: BTreeMap) { + let _guards = init_logging(name, cfg, labels); GlobalInstance::set(Self { _guards }); } } @@ -72,8 +70,16 @@ pub fn inject_span_to_tonic_request(msg: impl tonic::IntoRequest) -> tonic } #[allow(dyn_drop)] -pub fn init_logging(name: &str, cfg: &Config) -> Vec> { +pub fn init_logging( + name: &str, + cfg: &Config, + mut labels: BTreeMap, +) -> Vec> { let mut guards: Vec> = Vec::new(); + // use name as service name if not specified + if !labels.contains_key("service") { + labels.insert("service".to_string(), name.to_string()); + } // Initialize tracing reporter if cfg.tracing.on { @@ -131,28 +137,36 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec); + normal_logger = normal_logger.chain(dispatch); + } - normal_logger = normal_logger.chain( - fern::Dispatch::new() - .level(cfg.file.level.parse().unwrap_or(LevelFilter::Info)) - .format(formatter(&cfg.file.format)) - .chain(Box::new(normal_log_file) as Box), - ); + // Console logger + if cfg.stderr.on { + let dispatch = fern::Dispatch::new() + .level(cfg.stderr.level.parse().unwrap_or(LevelFilter::Info)) + .format(formatter(&cfg.stderr.format)) + .chain(std::io::stderr()); + normal_logger = normal_logger.chain(dispatch) + } + + // OpenTelemetry logger + if cfg.otlp.on { + let mut labels = labels.clone(); + labels.insert("category".to_string(), "system".to_string()); + let logger = new_otlp_log_writer(&cfg.tracing.otlp_endpoint, labels); + let dispatch = fern::Dispatch::new() + .level(cfg.otlp.level.parse().unwrap_or(LevelFilter::Info)) + .format(formatter("json")) + .chain(Box::new(logger) as Box); + normal_logger = normal_logger.chain(dispatch); } // Log to minitrace @@ -172,11 +186,17 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec); + if !cfg.query.dir.is_empty() { + let (query_log_file, flush_guard) 
diff --git a/src/common/tracing/src/minitrace.rs b/src/common/tracing/src/init.rs
similarity index 70%
rename from src/common/tracing/src/minitrace.rs
rename to src/common/tracing/src/init.rs
index 130166ac1f54e..917d6c37afdf1 100644
--- a/src/common/tracing/src/minitrace.rs
+++ b/src/common/tracing/src/init.rs
@@ -13,9 +13,8 @@
 // limitations under the License.
 
 use std::borrow::Cow;
+use std::collections::BTreeMap;
 use std::fmt;
-use std::io;
-use std::io::BufWriter;
 use std::io::Write;
 use std::time::Duration;
 use std::time::SystemTime;
@@ -27,11 +26,10 @@
 use log::LevelFilter;
 use log::Log;
 use minitrace::prelude::*;
 use serde_json::Map;
-use tracing_appender::non_blocking::NonBlocking;
-use tracing_appender::non_blocking::WorkerGuard;
-use tracing_appender::rolling::RollingFileAppender;
-use tracing_appender::rolling::Rotation;
 
+use crate::loggers::new_file_log_writer;
+use crate::loggers::new_otlp_log_writer;
+use crate::loggers::MinitraceLogger;
 use crate::Config;
 
 const HEADER_TRACE_PARENT: &str = "traceparent";
@@ -42,8 +40,8 @@ pub struct GlobalLogger {
 }
 
 impl GlobalLogger {
-    pub fn init(name: &str, cfg: &Config) {
-        let _guards = init_logging(name, cfg);
+    pub fn init(name: &str, cfg: &Config, labels: BTreeMap<String, String>) {
+        let _guards = init_logging(name, cfg, labels);
         GlobalInstance::set(Self { _guards });
     }
 }
@@ -72,8 +70,16 @@ pub fn inject_span_to_tonic_request<T>(msg: impl tonic::IntoRequest<T>) -> tonic::Request<T> {
 }
 
 #[allow(dyn_drop)]
-pub fn init_logging(name: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync + 'static>> {
+pub fn init_logging(
+    name: &str,
+    cfg: &Config,
+    mut labels: BTreeMap<String, String>,
+) -> Vec<Box<dyn Drop + Send + Sync + 'static>> {
     let mut guards: Vec<Box<dyn Drop + Send + Sync + 'static>> = Vec::new();
+    // use name as service name if not specified
+    if !labels.contains_key("service") {
+        labels.insert("service".to_string(), name.to_string());
+    }
 
     // Initialize tracing reporter
     if cfg.tracing.on {
@@ -131,28 +137,36 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync + 'static>> {
-    let (normal_log_file, flush_guard) = new_file_log_writer(&cfg.file.dir, name);
-    guards.push(Box::new(flush_guard));
-    normal_logger = normal_logger.chain(
-        fern::Dispatch::new()
-            .level(cfg.file.level.parse().unwrap_or(LevelFilter::Info))
-            .format(formatter(&cfg.file.format))
-            .chain(Box::new(normal_log_file) as Box<dyn Write>),
-    );
+    // File logger
+    if cfg.file.on {
+        let (normal_log_file, flush_guard) = new_file_log_writer(&cfg.file.dir, name);
+        guards.push(Box::new(flush_guard));
+        let dispatch = fern::Dispatch::new()
+            .level(cfg.file.level.parse().unwrap_or(LevelFilter::Info))
+            .format(formatter(&cfg.file.format))
+            .chain(Box::new(normal_log_file) as Box<dyn Write>);
+        normal_logger = normal_logger.chain(dispatch);
+    }
 
+    // Console logger
+    if cfg.stderr.on {
+        let dispatch = fern::Dispatch::new()
+            .level(cfg.stderr.level.parse().unwrap_or(LevelFilter::Info))
+            .format(formatter(&cfg.stderr.format))
+            .chain(std::io::stderr());
+        normal_logger = normal_logger.chain(dispatch)
+    }
+
+    // OpenTelemetry logger
+    if cfg.otlp.on {
+        let mut labels = labels.clone();
+        labels.insert("category".to_string(), "system".to_string());
+        let logger = new_otlp_log_writer(&cfg.otlp.endpoint, labels);
+        let dispatch = fern::Dispatch::new()
+            .level(cfg.otlp.level.parse().unwrap_or(LevelFilter::Info))
+            .format(formatter("json"))
+            .chain(Box::new(logger) as Box<dyn Log>);
+        normal_logger = normal_logger.chain(dispatch);
+    }
 
     // Log to minitrace
@@ -172,11 +186,17 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync + 'static>> {
     // Log query detail
     if cfg.query.on {
-        let (query_log_file, flush_guard) = new_file_log_writer(&cfg.query.dir, name);
-        guards.push(Box::new(flush_guard));
-        query_logger = query_logger.chain(Box::new(query_log_file) as Box<dyn Write>);
+        if !cfg.query.dir.is_empty() {
+            let (query_log_file, flush_guard) = new_file_log_writer(&cfg.query.dir, name);
+            guards.push(Box::new(flush_guard));
+            query_logger = query_logger.chain(Box::new(query_log_file) as Box<dyn Write>);
+        }
+        if !cfg.query.otlp_endpoint.is_empty() {
+            let mut labels = labels.clone();
+            labels.insert("category".to_string(), "query".to_string());
+            let logger = new_otlp_log_writer(&cfg.query.otlp_endpoint, labels);
+            query_logger = query_logger.chain(Box::new(logger) as Box<dyn Log>);
+        }
     }
 
     let logger = fern::Dispatch::new()
@@ -194,7 +214,7 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync + 'static>> {
-
-struct MinitraceLogger;
-
-impl log::Log for MinitraceLogger {
-    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
-        true
-    }
-
-    fn log(&self, record: &log::Record<'_>) {
-        if record.key_values().count() == 0 {
-            minitrace::Event::add_to_local_parent(record.level().as_str(), || {
-                [("message".into(), format!("{}", record.args()).into())]
-            });
-        } else {
-            minitrace::Event::add_to_local_parent(record.level().as_str(), || {
-                let mut pairs = Vec::with_capacity(record.key_values().count() + 1);
-                pairs.push(("message".into(), format!("{}", record.args()).into()));
-                let mut visitor = KvCollector { fields: &mut pairs };
-                record.key_values().visit(&mut visitor).ok();
-                pairs
-            });
-        }
-
-        struct KvCollector<'a> {
-            fields: &'a mut Vec<(Cow<'static, str>, Cow<'static, str>)>,
-        }
-
-        impl<'a, 'kvs> log::kv::Visitor<'kvs> for KvCollector<'a> {
-            fn visit_pair(
-                &mut self,
-                key: log::kv::Key<'kvs>,
-                value: log::kv::Value<'kvs>,
-            ) -> Result<(), log::kv::Error> {
-                self.fields
-                    .push((key.as_str().to_string().into(), value.to_string().into()));
-                Ok(())
-            }
-        }
-    }
-
-    fn flush(&self) {}
-}
-
-/// Create a `BufWriter` for a rolling file logger.
-///
-/// `BufWriter` collects log segments into a whole before sending to underlying writer.
-/// `NonBlocking` sends log to another thread to execute the write IO to avoid blocking the thread
-/// that calls `log`.
-///
-/// Note that `NonBlocking` will discard logs if there are too many `io::Write::write(NonBlocking)`,
-/// especially when `fern` sends log segments one by one to the `Writer`.
-/// Therefore a `BufWriter` is used to reduce the number of `io::Write::write(NonBlocking)`.
-fn new_file_log_writer(dir: &str, name: impl ToString) -> (BufWriter<NonBlocking>, WorkerGuard) {
-    let rolling = RollingFileAppender::new(Rotation::HOURLY, dir, name.to_string());
-    let (non_blocking, flush_guard) = tracing_appender::non_blocking(rolling);
-    let buffered_non_blocking = io::BufWriter::with_capacity(64 * 1024 * 1024, non_blocking);
-
-    (buffered_non_blocking, flush_guard)
-}
diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs
index 488e00e72e7c3..2362ef8fb143c 100644
--- a/src/common/tracing/src/lib.rs
+++ b/src/common/tracing/src/lib.rs
@@ -16,18 +16,20 @@
 #![allow(clippy::uninlined_format_args)]
 
 mod config;
-mod minitrace;
+mod init;
+mod loggers;
 mod panic_hook;
 
 pub use crate::config::Config;
 pub use crate::config::FileConfig;
+pub use crate::config::OTLPConfig;
 pub use crate::config::QueryLogConfig;
 pub use crate::config::StderrConfig;
 pub use crate::config::TracingConfig;
-pub use crate::minitrace::init_logging;
-pub use crate::minitrace::inject_span_to_tonic_request;
-pub use crate::minitrace::start_trace_for_remote_request;
-pub use crate::minitrace::GlobalLogger;
+pub use crate::init::init_logging;
+pub use crate::init::inject_span_to_tonic_request;
+pub use crate::init::start_trace_for_remote_request;
+pub use crate::init::GlobalLogger;
 pub use crate::panic_hook::log_panic;
 pub use crate::panic_hook::set_panic_hook;
diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs
new file mode 100644
index 0000000000000..41e949da034f6
--- /dev/null
+++ b/src/common/tracing/src/loggers.rs
@@ -0,0 +1,165 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::borrow::Cow;
+use std::collections::BTreeMap;
+use std::io::BufWriter;
+use std::time::Duration;
+use std::time::SystemTime;
+
+use opentelemetry::logs::AnyValue;
+use opentelemetry::logs::Logger as _;
+use opentelemetry::logs::Severity;
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::logs::Logger;
+use tracing_appender::non_blocking::NonBlocking;
+use tracing_appender::non_blocking::WorkerGuard;
+use tracing_appender::rolling::RollingFileAppender;
+use tracing_appender::rolling::Rotation;
+
+/// Create a `BufWriter` for a rolling file logger.
+///
+/// `BufWriter` collects log segments into a whole before sending to underlying writer.
+/// `NonBlocking` sends log to another thread to execute the write IO to avoid blocking the thread
+/// that calls `log`.
+///
+/// Note that `NonBlocking` will discard logs if there are too many `io::Write::write(NonBlocking)`,
+/// especially when `fern` sends log segments one by one to the `Writer`.
+/// Therefore a `BufWriter` is used to reduce the number of `io::Write::write(NonBlocking)`.
+pub(crate) fn new_file_log_writer(
+    dir: &str,
+    name: impl ToString,
+) -> (BufWriter<NonBlocking>, WorkerGuard) {
+    let rolling = RollingFileAppender::new(Rotation::HOURLY, dir, name.to_string());
+    let (non_blocking, flush_guard) = tracing_appender::non_blocking(rolling);
+    let buffered_non_blocking = BufWriter::with_capacity(64 * 1024 * 1024, non_blocking);
+
+    (buffered_non_blocking, flush_guard)
+}
+
+pub(crate) struct MinitraceLogger;
+
+impl log::Log for MinitraceLogger {
+    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
+        true
+    }
+
+    fn log(&self, record: &log::Record<'_>) {
+        if record.key_values().count() == 0 {
+            minitrace::Event::add_to_local_parent(record.level().as_str(), || {
+                [("message".into(), format!("{}", record.args()).into())]
+            });
+        } else {
+            minitrace::Event::add_to_local_parent(record.level().as_str(), || {
+                let mut pairs = Vec::with_capacity(record.key_values().count() + 1);
+                pairs.push(("message".into(), format!("{}", record.args()).into()));
+                let mut visitor = KvCollector { fields: &mut pairs };
+                record.key_values().visit(&mut visitor).ok();
+                pairs
+            });
+        }
+
+        struct KvCollector<'a> {
+            fields: &'a mut Vec<(Cow<'static, str>, Cow<'static, str>)>,
+        }
+
+        impl<'a, 'kvs> log::kv::Visitor<'kvs> for KvCollector<'a> {
+            fn visit_pair(
+                &mut self,
+                key: log::kv::Key<'kvs>,
+                value: log::kv::Value<'kvs>,
+            ) -> Result<(), log::kv::Error> {
+                self.fields
+                    .push((key.as_str().to_string().into(), value.to_string().into()));
+                Ok(())
+            }
+        }
+    }
+
+    fn flush(&self) {}
+}
+
+pub(crate) struct OpenTelemetryOTLPLogWriter {
+    logger: Logger,
+}
+
+pub(crate) fn new_otlp_log_writer(
+    endpoint: &str,
+    labels: BTreeMap<String, String>,
+) -> OpenTelemetryOTLPLogWriter {
+    let kvs = labels
+        .into_iter()
+        .map(|(k, v)| opentelemetry::KeyValue::new(k, v))
+        .collect::<Vec<_>>();
+    let log_config = opentelemetry_sdk::logs::Config {
+        resource: Cow::Owned(opentelemetry::sdk::Resource::new(kvs)),
+    };
+    let export_config = opentelemetry_otlp::ExportConfig {
+        endpoint: endpoint.to_string(),
+        protocol: opentelemetry_otlp::Protocol::Grpc,
+        timeout: Duration::from_secs(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT),
+    };
+    let exporter = opentelemetry_otlp::new_exporter()
+        .tonic()
+        .with_export_config(export_config);
+    let logger = opentelemetry_otlp::new_pipeline()
+        .logging()
+        .with_exporter(exporter)
+        .with_log_config(log_config)
+        .install_batch(opentelemetry_sdk::runtime::Tokio)
+        .expect("install query log otlp pipeline");
+    OpenTelemetryOTLPLogWriter { logger }
+}
+
+impl log::Log for OpenTelemetryOTLPLogWriter {
+    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
+        // we handle level and target filter with fern
+        true
+    }
+
+    fn log(&self, record: &log::Record<'_>) {
+        let builder = opentelemetry::logs::LogRecord::builder()
+            .with_observed_timestamp(SystemTime::now())
+            .with_severity_number(map_severity_to_otel_severity(record.level()))
+            .with_severity_text(record.level().as_str())
+            .with_body(AnyValue::from(record.args().to_string()));
+        self.logger.emit(builder.build())
+    }
+
+    fn flush(&self) {
+        match self.logger.provider() {
+            Some(provider) => {
+                let result = provider.force_flush();
+                for r in result {
+                    if let Err(e) = r {
+                        eprintln!("flush log failed: {}", e);
+                    }
+                }
+            }
+            None => {
+                eprintln!("flush log failed: logger provider is None");
+            }
+        }
+    }
+}
+
+fn map_severity_to_otel_severity(level: log::Level) -> Severity {
+    match level {
+        log::Level::Error => Severity::Error,
+        log::Level::Warn => Severity::Warn,
+        log::Level::Info => Severity::Info,
+        log::Level::Debug => Severity::Debug,
+        log::Level::Trace => Severity::Trace,
+    }
+}
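
A hedged sketch of how `new_otlp_log_writer` slots into a `fern` dispatch, mirroring what `init.rs` does above. This must run inside a Tokio runtime, since the exporter is installed with the `rt-tokio` batch processor; the endpoint, level, and labels are illustrative:

```rust
use std::collections::BTreeMap;

// Crate-internal sketch: build a dispatch that ships WARN-and-above to OTLP.
fn otlp_dispatch(endpoint: &str) -> fern::Dispatch {
    let mut labels = BTreeMap::new();
    labels.insert("category".to_string(), "system".to_string());
    let logger = crate::loggers::new_otlp_log_writer(endpoint, labels);
    fern::Dispatch::new()
        .level(log::LevelFilter::Warn)
        .chain(Box::new(logger) as Box<dyn log::Log>)
}
```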
diff --git a/src/meta/process/src/examples.rs b/src/meta/process/src/examples.rs
index 9321a66093309..67630c668fd0d 100644
--- a/src/meta/process/src/examples.rs
+++ b/src/meta/process/src/examples.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
+
 use clap::Parser;
 use common_meta_raft_store::key_spaces::RaftStoreEntry;
 use common_meta_sled_store::init_sled_db;
@@ -61,7 +63,11 @@ impl Default for RaftConfig {
 async fn upgrade_09() -> anyhow::Result<()> {
     let config = Config::parse();
 
-    let _guards = init_logging("databend-meta-upgrade-09", &LogConfig::default());
+    let _guards = init_logging(
+        "databend-meta-upgrade-09",
+        &LogConfig::default(),
+        BTreeMap::new(),
+    );
 
     eprintln!("config: {}", pretty(&config)?);
 
diff --git a/src/meta/raft-store/tests/it/testing.rs b/src/meta/raft-store/tests/it/testing.rs
index 8af467a00ce7d..67922226e82bd 100644
--- a/src/meta/raft-store/tests/it/testing.rs
+++ b/src/meta/raft-store/tests/it/testing.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::sync::Once;
 
 use common_base::base::GlobalSequence;
@@ -67,7 +68,7 @@ fn setup_test() {
         let t = tempfile::tempdir().expect("create temp dir to sled db");
         common_meta_sled_store::init_temp_sled_db(t);
 
-        let guards = init_logging("meta_unittests", &Config::new_testing());
+        let guards = init_logging("meta_unittests", &Config::new_testing(), BTreeMap::new());
         Box::leak(Box::new(guards));
     });
 }
diff --git a/src/meta/service/src/configs/outer_v0.rs b/src/meta/service/src/configs/outer_v0.rs
index 9f8441a9a83f2..98713ff9df892 100644
--- a/src/meta/service/src/configs/outer_v0.rs
+++ b/src/meta/service/src/configs/outer_v0.rs
@@ -22,6 +22,7 @@ use common_meta_raft_store::config::RaftConfig as InnerRaftConfig;
 use common_meta_types::MetaStartupError;
 use common_tracing::Config as InnerLogConfig;
 use common_tracing::FileConfig as InnerFileLogConfig;
+use common_tracing::OTLPConfig;
 use common_tracing::QueryLogConfig;
 use common_tracing::StderrConfig as InnerStderrLogConfig;
 use common_tracing::TracingConfig;
@@ -589,10 +590,8 @@ impl Into<InnerLogConfig> for LogConfig {
         InnerLogConfig {
             file: self.file.into(),
             stderr: self.stderr.into(),
-            query: QueryLogConfig {
-                on: false,
-                dir: "".to_string(),
-            },
+            otlp: OTLPConfig::default(),
+            query: QueryLogConfig::default(),
             tracing: TracingConfig::default(),
         }
     }
diff --git a/src/meta/service/tests/it/testing.rs b/src/meta/service/tests/it/testing.rs
index 57f9d6fb0de7a..beb630c35a35b 100644
--- a/src/meta/service/tests/it/testing.rs
+++ b/src/meta/service/tests/it/testing.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::collections::BTreeMap; use std::sync::Once; use common_base::base::tokio; @@ -57,7 +58,7 @@ fn setup_test() { let t = tempfile::tempdir().expect("create temp dir to sled db"); common_meta_sled_store::init_temp_sled_db(t); - let guards = init_logging("meta_unittests", &Config::new_testing()); + let guards = init_logging("meta_unittests", &Config::new_testing(), BTreeMap::new()); Box::leak(Box::new(guards)); }); } diff --git a/src/meta/sled-store/tests/it/testing/mod.rs b/src/meta/sled-store/tests/it/testing/mod.rs index 6cea072e8abf8..7e821788f82c7 100644 --- a/src/meta/sled-store/tests/it/testing/mod.rs +++ b/src/meta/sled-store/tests/it/testing/mod.rs @@ -15,6 +15,7 @@ pub mod fake_key_spaces; pub mod fake_state_machine_meta; +use std::collections::BTreeMap; use std::sync::Once; use common_base::base::GlobalSequence; @@ -66,7 +67,7 @@ fn setup_test() { let t = tempfile::tempdir().expect("create temp dir to sled db"); common_meta_sled_store::init_temp_sled_db(t); - let guards = init_logging("meta_unittests", &Config::new_testing()); + let guards = init_logging("meta_unittests", &Config::new_testing(), BTreeMap::new()); Box::leak(Box::new(guards)); }); } diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index e04a4d25dff1c..d9e0bf3141f8c 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -44,6 +44,7 @@ use common_meta_app::tenant::TenantQuota; use common_storage::StorageConfig as InnerStorageConfig; use common_tracing::Config as InnerLogConfig; use common_tracing::FileConfig as InnerFileLogConfig; +use common_tracing::OTLPConfig as InnerOTLPLogConfig; use common_tracing::QueryLogConfig as InnerQueryLogConfig; use common_tracing::StderrConfig as InnerStderrLogConfig; use common_tracing::TracingConfig as InnerTracingConfig; @@ -1792,6 +1793,9 @@ pub struct LogConfig { #[clap(flatten)] pub stderr: StderrLogConfig, + #[clap(flatten)] + pub otlp: OTLPLogConfig, + #[clap(flatten)] pub query: QueryLogConfig, @@ -1838,14 +1842,22 @@ impl TryInto for LogConfig { file.dir = self.dir.to_string(); } + let otlp: InnerOTLPLogConfig = self.otlp.try_into()?; + if otlp.on && otlp.endpoint.is_empty() { + return Err(ErrorCode::InvalidConfig( + "`endpoint` must be set when `otlp.on` is true".to_string(), + )); + } + let mut query: InnerQueryLogConfig = self.query.try_into()?; - if query.dir.is_empty() { + if query.on && query.dir.is_empty() && query.otlp_endpoint.is_empty() { if file.dir.is_empty() { return Err(ErrorCode::InvalidConfig( "`dir` or `file.dir` must be set when `query.dir` is empty".to_string(), )); + } else { + query.dir = format!("{}/query-details", &file.dir); } - query.dir = format!("{}/query-details", &file.dir); } let tracing: InnerTracingConfig = self.tracing.try_into()?; @@ -1853,6 +1865,7 @@ impl TryInto for LogConfig { Ok(InnerLogConfig { file, stderr: self.stderr.try_into()?, + otlp, query, tracing, }) @@ -1866,6 +1879,7 @@ impl From for LogConfig { dir: inner.file.dir.clone(), file: inner.file.into(), stderr: inner.stderr.into(), + otlp: inner.otlp.into(), query: inner.query.into(), tracing: inner.tracing.into(), @@ -1988,22 +2002,76 @@ impl From for StderrLogConfig { } } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)] +#[serde(default)] +pub struct OTLPLogConfig { + #[clap(long = "log-otlp-on", value_name = "VALUE", default_value = "false", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "true")] + #[serde(rename = "on")] + pub otlp_on: bool, + + /// 
Log level
+    #[clap(long = "log-otlp-level", value_name = "VALUE", default_value = "INFO")]
+    #[serde(rename = "level")]
+    pub otlp_level: String,
+
+    /// Log OpenTelemetry OTLP endpoint
+    #[clap(
+        long = "log-otlp-endpoint",
+        value_name = "VALUE",
+        default_value = "http://127.0.0.1:4317"
+    )]
+    #[serde(rename = "endpoint")]
+    pub otlp_endpoint: String,
+}
+
+impl Default for OTLPLogConfig {
+    fn default() -> Self {
+        InnerOTLPLogConfig::default().into()
+    }
+}
+
+impl TryInto<InnerOTLPLogConfig> for OTLPLogConfig {
+    type Error = ErrorCode;
+
+    fn try_into(self) -> Result<InnerOTLPLogConfig> {
+        Ok(InnerOTLPLogConfig {
+            on: self.otlp_on,
+            level: self.otlp_level,
+            endpoint: self.otlp_endpoint,
+        })
+    }
+}
+
+impl From<InnerOTLPLogConfig> for OTLPLogConfig {
+    fn from(inner: InnerOTLPLogConfig) -> Self {
+        Self {
+            otlp_on: inner.on,
+            otlp_level: inner.level,
+            otlp_endpoint: inner.endpoint,
+        }
+    }
+}
+
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)]
 #[serde(default)]
 pub struct QueryLogConfig {
-    #[clap(long = "log-query-on", value_name = "VALUE", default_value = "true", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "true")]
+    #[clap(long = "log-query-on", value_name = "VALUE", default_value = "false", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "true")]
     #[serde(rename = "on")]
     pub log_query_on: bool,
 
     /// Query Log file dir
+    #[clap(long = "log-query-dir", value_name = "VALUE", default_value = "")]
+    #[serde(rename = "dir")]
+    pub log_query_dir: String,
+
+    /// Query Log OpenTelemetry OTLP endpoint
     #[clap(
-        long = "log-query-dir",
+        long = "log-query-otlp-endpoint",
         value_name = "VALUE",
-        default_value = "",
-        help = "Default to <log-dir>/query-details"
+        default_value = ""
     )]
-    #[serde(rename = "dir")]
-    pub log_query_dir: String,
+    #[serde(rename = "otlp_endpoint")]
+    pub log_query_otlp_endpoint: String,
 }
 
 impl Default for QueryLogConfig {
@@ -2019,6 +2087,7 @@ impl TryInto<InnerQueryLogConfig> for QueryLogConfig {
         Ok(InnerQueryLogConfig {
             on: self.log_query_on,
             dir: self.log_query_dir,
+            otlp_endpoint: self.log_query_otlp_endpoint,
         })
     }
 }
@@ -2028,6 +2097,7 @@ impl From<InnerQueryLogConfig> for QueryLogConfig {
         Self {
             log_query_on: inner.on,
             log_query_dir: inner.dir,
+            log_query_otlp_endpoint: inner.otlp_endpoint,
         }
     }
 }
@@ -2035,7 +2105,7 @@ impl From<InnerQueryLogConfig> for QueryLogConfig {
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args)]
 #[serde(default)]
 pub struct TracingConfig {
-    #[clap(long = "log-tracing-on", value_name = "VALUE", default_value = "false", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "false")]
+    #[clap(long = "log-tracing-on", value_name = "VALUE", default_value = "false", action = ArgAction::Set, num_args = 0..=1, require_equals = true, default_missing_value = "true")]
     #[serde(rename = "on")]
     pub tracing_on: bool,
 
@@ -2052,7 +2122,7 @@ pub struct TracingConfig {
     #[clap(
         long = "log-tracing-otlp-endpoint",
         value_name = "VALUE",
-        default_value = "http://localhost:4317"
+        default_value = "http://127.0.0.1:4317"
     )]
     #[serde(rename = "otlp_endpoint")]
     pub tracing_otlp_endpoint: String,
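
The outer-to-inner conversion above gains two validation rules. A hedged sketch of exercising them (`LogConfig` is the outer clap/serde struct defined in this file; the helper name is hypothetical):

```rust
use common_exception::Result;
use common_tracing::Config as InnerLogConfig;

// Converting the outer LogConfig runs the new checks added above: it errors
// if `otlp.on` is set with an empty `otlp.endpoint`, and derives
// `query.dir` as "<file.dir>/query-details" when query logging is enabled
// with neither a dir nor a query OTLP endpoint configured.
fn build_inner(outer: LogConfig) -> Result<InnerLogConfig> {
    outer.try_into()
}
```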
diff --git a/src/query/functions/src/scalars/array.rs b/src/query/functions/src/scalars/array.rs
index 83189facc8464..79bee6d813995 100644
--- a/src/query/functions/src/scalars/array.rs
+++ b/src/query/functions/src/scalars/array.rs
@@ -257,6 +257,38 @@ pub fn register(registry: &mut FunctionRegistry) {
         ),
     );
 
+    registry
+        .register_passthrough_nullable_1_arg::<ArrayType<ArrayType<GenericType<0>>>, ArrayType<GenericType<0>>, _, _>(
+            "array_flatten",
+            |_, _| FunctionDomain::Full,
+            vectorize_1_arg::<ArrayType<ArrayType<GenericType<0>>>, ArrayType<GenericType<0>>>(
+                |a, b| {
+                    let mut builder = ColumnBuilder::with_capacity(&b.generics[0], a.len());
+                    for a in a.iter() {
+                        builder.append_column(&a);
+                    }
+                    builder.build()
+                }
+            ),
+        );
+
+    registry
+        .register_passthrough_nullable_2_arg::<ArrayType<StringType>, StringType, StringType, _, _>(
+            "array_to_string",
+            |_, _, _| FunctionDomain::Full,
+            vectorize_with_builder_2_arg::<ArrayType<StringType>, StringType, StringType>(
+                |lhs, rhs, output, _| {
+                    for (i, d) in lhs.iter().enumerate() {
+                        if i != 0 {
+                            output.put_slice(rhs);
+                        }
+                        output.put_slice(d);
+                    }
+                    output.commit_row();
+                },
+            ),
+        );
+
     registry
         .register_passthrough_nullable_2_arg::(
             "slice",
diff --git a/src/query/functions/tests/it/scalars/testdata/function_list.txt b/src/query/functions/tests/it/scalars/testdata/function_list.txt
index 295fe25e30328..0f74dd0104809 100644
--- a/src/query/functions/tests/it/scalars/testdata/function_list.txt
+++ b/src/query/functions/tests/it/scalars/testdata/function_list.txt
@@ -97,6 +97,8 @@ Functions overloads:
 1 array_distinct(Array(Nothing) NULL) :: Array(Nothing) NULL
 2 array_distinct(Array(T0)) :: Array(T0)
 3 array_distinct(Array(T0) NULL) :: Array(T0) NULL
+0 array_flatten(Array(Array(T0))) :: Array(T0)
+1 array_flatten(Array(Array(T0)) NULL) :: Array(T0) NULL
 0 array_indexof(NULL, NULL) :: NULL
 1 array_indexof(Array(T0), T0) :: UInt64
 2 array_indexof(Array(T0) NULL, T0 NULL) :: UInt64 NULL
@@ -135,6 +137,8 @@ Functions overloads:
 0 array_stddev_pop FACTORY
 0 array_stddev_samp FACTORY
 0 array_sum FACTORY
+0 array_to_string(Array(String), String) :: String
+1 array_to_string(Array(String) NULL, String NULL) :: String NULL
 0 array_unique(Array(Nothing)) :: UInt64
 1 array_unique(Array(Nothing) NULL) :: UInt64 NULL
 2 array_unique(Array(T0)) :: UInt64
diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs
index 2669668dae2ce..1e102cc78cb3d 100644
--- a/src/query/service/src/global_services.rs
+++ b/src/query/service/src/global_services.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::sync::Arc;
 
 use common_base::base::GlobalInstance;
@@ -70,7 +71,12 @@ impl GlobalServices {
         GlobalConfig::init(config.clone())?;
 
         // 2. log init.
-        GlobalLogger::init(&app_name_shuffle, &config.log);
+        let mut log_labels = BTreeMap::new();
+        log_labels.insert("service".to_string(), "databend-query".to_string());
+        log_labels.insert("tenant_id".to_string(), config.query.tenant_id.clone());
+        log_labels.insert("cluster_id".to_string(), config.query.cluster_id.clone());
+        log_labels.insert("node_id".to_string(), config.query.node_id.clone());
+        GlobalLogger::init(&app_name_shuffle, &config.log, log_labels);
 
         // 3. runtime init.
GlobalIORuntime::init(config.storage.num_cpus as usize)?; diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index a3219af6b5302..0e85c450122e3 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -15,6 +15,7 @@ use std::collections::HashSet; use std::sync::Arc; +use common_catalog::plan::DataSourceInfo; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; @@ -212,6 +213,40 @@ impl AccessChecker for PrivilegeAccess { let metadata = metadata.read().clone(); for table in metadata.tables() { + if table.is_source_of_stage() { + match table.table().get_data_source_info() { + DataSourceInfo::StageSource(stage_info) => { + self + .validate_access( + &GrantObject::Stage(stage_info.stage_info.stage_name.clone()), + vec![UserPrivilegeType::Read], + false, + ) + .await?; + } + DataSourceInfo::Parquet2Source(stage_info) => { + self + .validate_access( + &GrantObject::Stage(stage_info.stage_info.stage_name.clone()), + vec![UserPrivilegeType::Read], + false, + ) + .await?; + } + DataSourceInfo::ParquetSource(stage_info) => { + self + .validate_access( + &GrantObject::Stage(stage_info.stage_info.stage_name.clone()), + vec![UserPrivilegeType::Read], + false, + ) + .await?; + } + DataSourceInfo::TableSource(_) | DataSourceInfo::ResultScanSource(_) => {} + } + } + + if table.is_source_of_view() { continue; } @@ -832,7 +867,16 @@ impl AccessChecker for PrivilegeAccess { let from = plan.from.clone(); return self.check(ctx, &from).await; } - + Plan::RemoveStage(plan) => { + let stage_name = &plan.stage.stage_name; + self + .validate_access( + &GrantObject::Stage(stage_name.clone()), + vec![UserPrivilegeType::Write], + false, + ) + .await?; + } Plan::CreateShareEndpoint(_) | Plan::ShowShareEndpoint(_) | Plan::DropShareEndpoint(_) @@ -845,7 +889,6 @@ impl AccessChecker for PrivilegeAccess { | Plan::DropCatalog(_) | Plan::CreateStage(_) | Plan::DropStage(_) - | Plan::RemoveStage(_) | Plan::CreateFileFormat(_) | Plan::DropFileFormat(_) | Plan::ShowFileFormats(_) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index cfbcd9f54a981..a01d9e8c6eab3 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -267,7 +267,7 @@ async fn query_cancel_handler( let http_query_manager = HttpQueryManager::instance(); match http_query_manager.get_query(&query_id).await { Some(query) => { - query.kill().await; + query.kill("http query cancel by handler").await; http_query_manager.remove_query(&query_id).await; StatusCode::OK } diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 6b1971769f640..7eb110e953a39 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -486,16 +486,11 @@ impl HttpQuery { } #[async_backtrace::framed] - pub async fn kill(&self) { + pub async fn kill(&self, reason: &str) { // the query will be removed from the query manager before the session is dropped. 
self.detach().await; - Executor::stop( - &self.state, - Err(ErrorCode::AbortedQuery("killed by http")), - true, - ) - .await; + Executor::stop(&self.state, Err(ErrorCode::AbortedQuery(reason)), true).await; } #[async_backtrace::framed] diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index 7beb5b1b549c8..5efcba8ea1445 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -91,7 +91,7 @@ impl HttpQueryManager { warn!("{msg}, but fail to remove"); } else { warn!("{msg}"); - query.kill().await; + query.kill(&msg).await; }; break; } diff --git a/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs b/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs index 41d804cd12091..e390b5d85703c 100644 --- a/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs +++ b/src/query/service/src/table_functions/infer_schema/infer_schema_table.rs @@ -181,6 +181,14 @@ impl AsyncSource for InferSchemaSource { let (stage_info, path) = resolve_stage_location(&self.ctx, &self.args_parsed.location).await?; + let visibility_checker = self.ctx.get_visibility_checker().await?; + if !visibility_checker.check_stage_read_visibility(&stage_info.stage_name) { + return Err(ErrorCode::PermissionDenied(format!( + "Permission denied, privilege READ is required on stage {} for user {}", + stage_info.stage_name.clone(), + &self.ctx.get_current_user()?.identity(), + ))); + } let files_info = StageFilesInfo { path: path.clone(), ..self.args_parsed.files_info.clone() diff --git a/src/query/service/src/table_functions/inspect_parquet/inspect_parquet_table.rs b/src/query/service/src/table_functions/inspect_parquet/inspect_parquet_table.rs index 1aa86b756a1db..704d0c27172b2 100644 --- a/src/query/service/src/table_functions/inspect_parquet/inspect_parquet_table.rs +++ b/src/query/service/src/table_functions/inspect_parquet/inspect_parquet_table.rs @@ -210,6 +210,14 @@ impl AsyncSource for InspectParquetSource { self.is_finished = true; let uri = self.uri.strip_prefix('@').unwrap().to_string(); let (stage_info, path) = resolve_stage_location(&self.ctx, &uri).await?; + let visibility_checker = self.ctx.get_visibility_checker().await?; + if !visibility_checker.check_stage_read_visibility(&stage_info.stage_name) { + return Err(ErrorCode::PermissionDenied(format!( + "Permission denied, privilege READ is required on stage {} for user {}", + stage_info.stage_name.clone(), + &self.ctx.get_current_user()?.identity(), + ))); + } let operator = init_stage_operator(&stage_info)?; diff --git a/src/query/service/src/table_functions/list_stage/list_stage_table.rs b/src/query/service/src/table_functions/list_stage/list_stage_table.rs index 9f9e5503b8358..c7d9b43cc5fef 100644 --- a/src/query/service/src/table_functions/list_stage/list_stage_table.rs +++ b/src/query/service/src/table_functions/list_stage/list_stage_table.rs @@ -23,6 +23,7 @@ use common_catalog::table::Table; use common_catalog::table_args::TableArgs; use common_catalog::table_context::TableContext; use common_catalog::table_function::TableFunction; +use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::NumberDataType; use common_expression::types::StringType; @@ -184,6 +185,14 @@ impl AsyncSource for ListStagesSource { let (stage_info, path) = resolve_stage_location(&self.ctx, 
&self.args_parsed.location).await?; + let visibility_checker = self.ctx.get_visibility_checker().await?; + if !visibility_checker.check_stage_read_visibility(&stage_info.stage_name) { + return Err(ErrorCode::PermissionDenied(format!( + "Permission denied, privilege READ is required on stage {} for user {}", + stage_info.stage_name.clone(), + &self.ctx.get_current_user()?.identity(), + ))); + } let op = StageTable::get_op(&stage_info)?; let files_info = StageFilesInfo { diff --git a/src/query/service/src/table_functions/table_function_factory.rs b/src/query/service/src/table_functions/table_function_factory.rs index e10f59c0c99b1..8fc9bd4ccf8cf 100644 --- a/src/query/service/src/table_functions/table_function_factory.rs +++ b/src/query/service/src/table_functions/table_function_factory.rs @@ -21,6 +21,7 @@ use common_exception::Result; use common_meta_types::MetaId; use common_storages_fuse::table_functions::FuseColumnTable; use common_storages_fuse::table_functions::FuseEncodingTable; +use common_storages_stream::stream_status_table_func::StreamStatusTable; use itertools::Itertools; use parking_lot::RwLock; @@ -137,6 +138,11 @@ impl TableFunctionFactory { (next_id(), Arc::new(ClusteringInformationTable::create)), ); + creators.insert( + "stream_status".to_string(), + (next_id(), Arc::new(StreamStatusTable::create)), + ); + creators.insert( "sync_crash_me".to_string(), (next_id(), Arc::new(SyncCrashMeTable::create)), diff --git a/src/query/service/tests/it/sql/planner/format/mod.rs b/src/query/service/tests/it/sql/planner/format/mod.rs index 87d15c80dc16c..6379e13610dc0 100644 --- a/src/query/service/tests/it/sql/planner/format/mod.rs +++ b/src/query/service/tests/it/sql/planner/format/mod.rs @@ -85,6 +85,7 @@ fn test_format() { None, false, false, + false, ); let col1 = metadata.add_base_table_column( "col1".to_string(), diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 3de6c37b01d04..a25ab4876bc64 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -28,8 +28,12 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'log' | 'log_dir' | 'null' | '' | | 'log' | 'log_level' | 'null' | '' | | 'log' | 'log_query_enabled' | 'null' | '' | -| 'log' | 'query.dir' | './.databend/logs/query-details' | '' | -| 'log' | 'query.on' | 'true' | '' | +| 'log' | 'otlp.endpoint' | 'http://127.0.0.1:4317' | '' | +| 'log' | 'otlp.level' | 'INFO' | '' | +| 'log' | 'otlp.on' | 'false' | '' | +| 'log' | 'query.dir' | '' | '' | +| 'log' | 'query.on' | 'false' | '' | +| 'log' | 'query.otlp_endpoint' | '' | '' | | 'log' | 'query_enabled' | 'null' | '' | | 'log' | 'stderr.format' | 'text' | '' | | 'log' | 'stderr.level' | 'WARN' | '' | diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index b6b3e29dbda6d..b7f226e61024a 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -148,6 +148,7 @@ impl Binder { None, false, false, + false, ); self.bind_base_table(bind_context, database, table_index) @@ -283,6 +284,7 @@ impl Binder { table_alias_name, false, false, + false, ); let (s_expr, mut new_bind_context) = self.bind_query(&mut new_bind_context, query).await?; @@ -317,6 +319,7 @@ impl Binder { table_alias_name, bind_context.view_info.is_some(), bind_context.planning_agg_index, + 
false, ); let (s_expr, mut bind_context) = self @@ -532,6 +535,7 @@ impl Binder { table_alias_name, false, false, + false, ); let (s_expr, mut bind_context) = self @@ -598,6 +602,7 @@ impl Binder { table_alias_name, false, false, + false, ); let (s_expr, mut bind_context) = self @@ -870,6 +875,7 @@ impl Binder { table_alias_name, false, false, + true, ); let (s_expr, mut bind_context) = self diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index 1c8e65eedd755..4292befe526a6 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -96,6 +96,7 @@ impl Dataframe { None, false, false, + false, ); binder diff --git a/src/query/sql/src/planner/expression_parser.rs b/src/query/sql/src/planner/expression_parser.rs index 553d2d31c2180..69bf699ac2988 100644 --- a/src/query/sql/src/planner/expression_parser.rs +++ b/src/query/sql/src/planner/expression_parser.rs @@ -70,6 +70,7 @@ pub fn bind_one_table(table_meta: Arc) -> Result<(BindContext, Metada None, false, false, + false, ); let columns = metadata.read().columns_by_table_index(table_index); diff --git a/src/query/sql/src/planner/metadata.rs b/src/query/sql/src/planner/metadata.rs index b66e64b8ed726..e0b74afdff771 100644 --- a/src/query/sql/src/planner/metadata.rs +++ b/src/query/sql/src/planner/metadata.rs @@ -262,6 +262,7 @@ impl Metadata { self.agg_indexes.get(table).map(|v| v.as_slice()) } + #[allow(clippy::too_many_arguments)] pub fn add_table( &mut self, catalog: String, @@ -270,6 +271,7 @@ impl Metadata { table_alias_name: Option, source_of_view: bool, source_of_index: bool, + source_of_stage: bool, ) -> IndexType { let table_name = table_meta.name().to_string(); @@ -284,6 +286,7 @@ impl Metadata { alias_name: table_alias_name, source_of_view, source_of_index, + source_of_stage, }; self.tables.push(table_entry); let table_schema = table_meta.schema_with_stream(); @@ -394,6 +397,7 @@ pub struct TableEntry { /// If this table is bound to an index. source_of_index: bool, + source_of_stage: bool, table: Arc, } @@ -426,6 +430,7 @@ impl TableEntry { alias_name, source_of_view: false, source_of_index: false, + source_of_stage: false, } } @@ -464,6 +469,11 @@ impl TableEntry { self.source_of_view } + /// Return true if it is source from stage. + pub fn is_source_of_stage(&self) -> bool { + self.source_of_stage + } + /// Return true if it is bound for an index. pub fn is_source_of_index(&self) -> bool { self.source_of_index diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 90895768afd33..f99d8d99876e0 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -17,7 +17,6 @@ use std::collections::VecDeque; use std::sync::Arc; use std::vec; -use chrono::Local; use common_ast::ast::BinaryOperator; use common_ast::ast::ColumnID; use common_ast::ast::Expr; @@ -348,8 +347,7 @@ impl<'a> TypeChecker<'a> { }, ]) .await?; - self.resolve_scalar_function_call(*span, "assume_not_null", vec![], vec![scalar]) - .await? + self.resolve_scalar_function_call(*span, "assume_not_null", vec![], vec![scalar])? } Expr::InList { @@ -471,8 +469,7 @@ impl<'a> TypeChecker<'a> { self.resolve_scalar_function_call(*span, "and", vec![], vec![ ge_func.clone(), le_func.clone(), - ]) - .await? + ])? 
} else { // Rewrite `expr NOT BETWEEN low AND high` // into `expr < low OR expr > high` @@ -483,8 +480,7 @@ impl<'a> TypeChecker<'a> { .resolve_binary_op(*span, &BinaryOperator::Gt, expr.as_ref(), high.as_ref()) .await?; - self.resolve_scalar_function_call(*span, "or", vec![], vec![lt_func, gt_func]) - .await? + self.resolve_scalar_function_call(*span, "or", vec![], vec![lt_func, gt_func])? } } @@ -696,10 +692,7 @@ impl<'a> TypeChecker<'a> { .await? } - Expr::Literal { span, lit } => { - let box (value, data_type) = self.resolve_literal(lit)?; - Box::new((ConstantExpr { span: *span, value }.into(), data_type)) - } + Expr::Literal { span, lit } => self.resolve_literal(*span, lit)?, Expr::FunctionCall { span, @@ -1238,7 +1231,7 @@ impl<'a> TypeChecker<'a> { #[inline] fn resolve_rows_offset(&self, expr: &Expr) -> Result { if let Expr::Literal { lit, .. } = expr { - let box (value, _) = self.resolve_literal(lit)?; + let box (value, _) = self.resolve_constant_literal(None, lit)?; match value { Scalar::Number(NumberScalar::UInt8(v)) => { return Ok(Scalar::Number(NumberScalar::UInt64(v as u64))); @@ -1260,6 +1253,24 @@ impl<'a> TypeChecker<'a> { .set_span(expr.span())) } + #[inline] + fn resolve_constant_literal( + &self, + span: Span, + literal: &common_ast::ast::Literal, + ) -> Result> { + let box (scalar, data_type) = self.resolve_literal(span, literal)?; + + if let ScalarExpr::ConstantExpr(scalar) = scalar { + Ok(Box::new((scalar.value, data_type))) + } else { + Err( + ErrorCode::SemanticError("Expect to have constant literal".to_string()) + .set_span(span), + ) + } + } + fn resolve_window_rows_frame(&self, frame: WindowFrame) -> Result { let units = match frame.units { WindowFrameUnits::Rows => WindowFuncFrameUnits::Rows, @@ -1693,7 +1704,10 @@ impl<'a> TypeChecker<'a> { // Check aggregate function let params = params .iter() - .map(|literal| self.resolve_literal(literal).map(|box (value, _)| value)) + .map(|literal| { + self.resolve_constant_literal(span, literal) + .map(|box (value, _)| value) + }) .collect::>>()?; self.in_aggregate_function = true; @@ -1827,12 +1841,10 @@ impl<'a> TypeChecker<'a> { }; self.resolve_scalar_function_call(span, &func_name, params, args) - .await } - #[async_backtrace::framed] - pub async fn resolve_scalar_function_call( - &mut self, + pub fn resolve_scalar_function_call( + &self, span: Span, func_name: &str, params: Vec, @@ -1916,19 +1928,16 @@ impl<'a> TypeChecker<'a> { .resolve_binary_op(span, &positive_op, left, right) .await?; self.resolve_scalar_function_call(span, "not", vec![], vec![positive]) - .await } BinaryOperator::SoundsLike => { // rewrite "expr1 SOUNDS LIKE expr2" to "SOUNDEX(expr1) = SOUNDEX(expr2)" let box (left, _) = self.resolve(left).await?; let box (right, _) = self.resolve(right).await?; - let (left, _) = *self - .resolve_scalar_function_call(span, "soundex", vec![], vec![left]) - .await?; - let (right, _) = *self - .resolve_scalar_function_call(span, "soundex", vec![], vec![right]) - .await?; + let (left, _) = + *self.resolve_scalar_function_call(span, "soundex", vec![], vec![left])?; + let (right, _) = + *self.resolve_scalar_function_call(span, "soundex", vec![], vec![right])?; self.resolve_scalar_function_call( span, @@ -1936,7 +1945,6 @@ impl<'a> TypeChecker<'a> { vec![], vec![left, right], ) - .await } BinaryOperator::Like => { // Convert `Like` to compare function , such as `p_type like PROMO%` will be converted to `p_type >= PROMO and p_type < PROMP` @@ -2051,7 +2059,6 @@ impl<'a> TypeChecker<'a> { arg_types.push(interval_type); 
self.resolve_scalar_function_call(span, &func_name, vec![], args) - .await } #[async_recursion::async_recursion] @@ -2210,6 +2217,7 @@ impl<'a> TypeChecker<'a> { "try_to_variant", "greatest", "least", + "stream_has_data", ] } @@ -2580,20 +2588,14 @@ impl<'a> TypeChecker<'a> { .resolve_function(span, "array", vec![], args) .await .ok()?; - Some( - self.resolve_scalar_function_call(span, "array_max", vec![], vec![array]) - .await, - ) + Some(self.resolve_scalar_function_call(span, "array_max", vec![], vec![array])) } ("least", args) => { let (array, _) = *self .resolve_function(span, "array", vec![], args) .await .ok()?; - Some( - self.resolve_scalar_function_call(span, "array_min", vec![], vec![array]) - .await, - ) + Some(self.resolve_scalar_function_call(span, "array_min", vec![], vec![array])) } _ => None, } @@ -2630,14 +2632,14 @@ impl<'a> TypeChecker<'a> { let args = vec![trim_source, trim_scalar]; self.resolve_scalar_function_call(span, func_name, vec![], args) - .await } /// Resolve literal values. pub fn resolve_literal( &self, + span: Span, literal: &common_ast::ast::Literal, - ) -> Result> { + ) -> Result> { let value = match literal { Literal::UInt64(value) => Scalar::Number(NumberScalar::UInt64(*value)), Literal::Decimal256 { @@ -2652,11 +2654,21 @@ impl<'a> TypeChecker<'a> { Literal::String(string) => Scalar::String(string.as_bytes().to_vec()), Literal::Boolean(boolean) => Scalar::Boolean(*boolean), Literal::Null => Scalar::Null, - Literal::CurrentTimestamp => Scalar::Timestamp(Local::now().timestamp_micros()), + Literal::CurrentTimestamp => { + return self.resolve_scalar_function_call( + None, + "current_timestamp", + vec![], + vec![], + ); + } }; let value = shrink_scalar(value); let data_type = value.as_ref().infer_data_type(); - Ok(Box::new((value, data_type))) + Ok(Box::new(( + ScalarExpr::ConstantExpr(ConstantExpr { span, value }), + data_type, + ))) } // TODO(leiysky): use an array builder function instead, since we should allow declaring @@ -2675,7 +2687,6 @@ impl<'a> TypeChecker<'a> { } self.resolve_scalar_function_call(span, "array", vec![], elems) - .await } #[async_recursion::async_recursion] @@ -2688,27 +2699,18 @@ impl<'a> TypeChecker<'a> { let mut keys = Vec::with_capacity(kvs.len()); let mut vals = Vec::with_capacity(kvs.len()); for (key_expr, val_expr) in kvs { - let box (key_arg, _data_type) = self.resolve_literal(key_expr)?; - keys.push( - ConstantExpr { - span, - value: key_arg, - } - .into(), - ); + let box (key_arg, _data_type) = self.resolve_literal(span, key_expr)?; + keys.push(key_arg); let box (val_arg, _data_type) = self.resolve(val_expr).await?; vals.push(val_arg); } - let box (key_arg, _data_type) = self - .resolve_scalar_function_call(span, "array", vec![], keys) - .await?; - let box (val_arg, _data_type) = self - .resolve_scalar_function_call(span, "array", vec![], vals) - .await?; + let box (key_arg, _data_type) = + self.resolve_scalar_function_call(span, "array", vec![], keys)?; + let box (val_arg, _data_type) = + self.resolve_scalar_function_call(span, "array", vec![], vals)?; let args = vec![key_arg, val_arg]; self.resolve_scalar_function_call(span, "map", vec![], args) - .await } #[async_recursion::async_recursion] @@ -2725,7 +2727,6 @@ impl<'a> TypeChecker<'a> { } self.resolve_scalar_function_call(span, "tuple", vec![], args) - .await } #[async_recursion::async_recursion] @@ -2762,7 +2763,6 @@ impl<'a> TypeChecker<'a> { }) .await?; self.resolve_scalar_function_call(span, "and", vec![], vec![new_left, new_right]) - .await } else { let name = 
op.to_func_name(); self.resolve_function(span, name.as_str(), vec![], &[left, right]) @@ -3121,12 +3121,7 @@ impl<'a> TypeChecker<'a> { .into(); continue; } - let box (path_value, _) = self.resolve_literal(&path_lit)?; - let path_scalar: ScalarExpr = ConstantExpr { - span, - value: path_value, - } - .into(); + let box (path_scalar, _) = self.resolve_literal(span, &path_lit)?; if let TableDataType::Array(inner_type) = table_data_type { table_data_type = *inner_type; } diff --git a/src/query/storages/stream/Cargo.toml b/src/query/storages/stream/Cargo.toml index 777ce21a86d3e..99782bc9d00ad 100644 --- a/src/query/storages/stream/Cargo.toml +++ b/src/query/storages/stream/Cargo.toml @@ -19,6 +19,7 @@ common-expression = { path = "../../expression" } common-meta-app = { path = "../../../meta/app" } common-metrics = { path = "../../../common/metrics" } common-pipeline-core = { path = "../../pipeline/core" } +common-pipeline-sources = { path = "../../pipeline/sources" } common-sql = { path = "../../sql" } common-storages-fuse = { path = "../fuse" } diff --git a/src/query/storages/stream/src/lib.rs b/src/query/storages/stream/src/lib.rs index b4e45c6d826f6..db8df38c77d83 100644 --- a/src/query/storages/stream/src/lib.rs +++ b/src/query/storages/stream/src/lib.rs @@ -12,5 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(impl_trait_in_assoc_type)] + pub mod stream_pruner; pub mod stream_table; + +pub mod stream_status_table_func; diff --git a/src/query/storages/stream/src/stream_status_table_func.rs b/src/query/storages/stream/src/stream_status_table_func.rs new file mode 100644 index 0000000000000..0f401fe47f582 --- /dev/null +++ b/src/query/storages/stream/src/stream_status_table_func.rs @@ -0,0 +1,244 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::any::Any;
+use std::sync::Arc;
+
+use common_catalog::plan::DataSourcePlan;
+use common_catalog::plan::PartStatistics;
+use common_catalog::plan::Partitions;
+use common_catalog::plan::PushDownInfo;
+use common_catalog::table::Table;
+use common_catalog::table_args::TableArgs;
+use common_catalog::table_context::TableContext;
+use common_catalog::table_function::TableFunction;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_expression::types::BooleanType;
+use common_expression::DataBlock;
+use common_expression::FromData;
+use common_expression::TableDataType;
+use common_expression::TableField;
+use common_expression::TableSchema;
+use common_expression::TableSchemaRefExt;
+use common_meta_app::schema::TableIdent;
+use common_meta_app::schema::TableInfo;
+use common_meta_app::schema::TableMeta;
+use common_pipeline_core::processors::OutputPort;
+use common_pipeline_core::processors::ProcessorPtr;
+use common_pipeline_core::Pipeline;
+use common_pipeline_sources::AsyncSource;
+use common_pipeline_sources::AsyncSourcer;
+use common_storages_fuse::table_functions::string_literal;
+use common_storages_fuse::table_functions::string_value;
+
+use crate::stream_table::StreamStatus;
+use crate::stream_table::StreamTable;
+
+const STREAM_STATUS: &str = "stream_status";
+
+pub struct StreamStatusTable {
+    table_info: TableInfo,
+    stream_name: String,
+}
+
+impl StreamStatusTable {
+    pub fn create(
+        database_name: &str,
+        table_func_name: &str,
+        table_id: u64,
+        table_args: TableArgs,
+    ) -> Result<Arc<dyn TableFunction>> {
+        let args = table_args.expect_all_positioned(STREAM_STATUS, Some(1))?;
+        let stream_name = string_value(&args[0])?;
+
+        let engine = STREAM_STATUS.to_owned();
+
+        let table_info = TableInfo {
+            ident: TableIdent::new(table_id, 0),
+            desc: format!("'{}'.'{}'", database_name, table_func_name),
+            name: table_func_name.to_string(),
+            meta: TableMeta {
+                schema: schema(),
+                engine,
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        Ok(Arc::new(StreamStatusTable {
+            table_info,
+            stream_name,
+        }))
+    }
+}
+
+#[async_trait::async_trait]
+impl Table for StreamStatusTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_table_info(&self) -> &TableInfo {
+        &self.table_info
+    }
+
+    #[async_backtrace::framed]
+    async fn read_partitions(
+        &self,
+        _ctx: Arc<dyn TableContext>,
+        _push_downs: Option<PushDownInfo>,
+        _dry_run: bool,
+    ) -> Result<(PartStatistics, Partitions)> {
+        Ok((PartStatistics::default(), Partitions::default()))
+    }
+
+    fn table_args(&self) -> Option<TableArgs> {
+        Some(TableArgs::new_positioned(vec![string_literal(
+            self.stream_name.as_str(),
+        )]))
+    }
+
+    fn read_data(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        _plan: &DataSourcePlan,
+        pipeline: &mut Pipeline,
+        _put_cache: bool,
+    ) -> Result<()> {
+        pipeline.add_source(
+            |output| {
+                StreamStatusDataSource::create(ctx.clone(), output, self.stream_name.to_owned())
+            },
+            1,
+        )?;
+
+        Ok(())
+    }
+}
+
+impl TableFunction for StreamStatusTable {
+    fn function_name(&self) -> &str {
+        self.name()
+    }
+
+    fn as_table<'a>(self: Arc<Self>) -> Arc<dyn Table + 'a>
+    where Self: 'a {
+        self
+    }
+}
+
+struct StreamStatusDataSource {
+    ctx: Arc<dyn TableContext>,
+    finish: bool,
+    cat_name: String,
+    db_name: String,
+    stream_name: String,
+}
+
+impl StreamStatusDataSource {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        output: Arc<OutputPort>,
+        stream_name: String,
+    ) -> Result<ProcessorPtr> {
+        let (cat_name, db_name, stream_name) =
+            Self::extract_fully_qualified_stream_name(ctx.as_ref(), stream_name.as_str())?;
+        AsyncSourcer::create(ctx.clone(), output, StreamStatusDataSource {
+            ctx,
+            finish: false,
+            cat_name,
+            db_name,
+            stream_name,
+        })
+    }
+
+    fn extract_fully_qualified_stream_name(
+        ctx: &dyn TableContext,
+        target: &str,
+    ) -> Result<(String, String, String)> {
+        let current_cat_name;
+        let current_db_name;
+        let stream_name_vec: Vec<&str> = target.split('.').collect();
+        let (cat, db, stream) = {
+            match stream_name_vec.len() {
+                1 => {
+                    current_cat_name = ctx.get_current_catalog();
+                    current_db_name = ctx.get_current_database();
+                    (
+                        current_cat_name,
+                        current_db_name,
+                        stream_name_vec[0].to_owned(),
+                    )
+                }
+                2 => {
+                    current_cat_name = ctx.get_current_catalog();
+                    (
+                        current_cat_name,
+                        stream_name_vec[0].to_owned(),
+                        stream_name_vec[1].to_owned(),
+                    )
+                }
+                3 => (
+                    stream_name_vec[0].to_owned(),
+                    stream_name_vec[1].to_owned(),
+                    stream_name_vec[2].to_owned(),
+                ),
+                _ => {
+                    return Err(ErrorCode::BadArguments(
+                        "Invalid stream name. Use the format '[catalog.][database.]stream'",
+                    ));
+                }
+            }
+        };
+        Ok((cat, db, stream))
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for StreamStatusDataSource {
+    const NAME: &'static str = "stream_status";
+
+    #[async_trait::unboxed_simple]
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        if self.finish {
+            return Ok(None);
+        }
+
+        self.finish = true;
+        let tenant_id = self.ctx.get_tenant();
+        let tbl = self
+            .ctx
+            .get_catalog(&self.cat_name)
+            .await?
+            .get_table(tenant_id.as_str(), &self.db_name, &self.stream_name)
+            .await?;
+
+        let tbl = StreamTable::try_from_table(tbl.as_ref())?;
+
+        let has_data = matches!(
+            tbl.check_stream_status(self.ctx.clone()).await?,
+            StreamStatus::MayHaveData
+        );
+
+        Ok(Some(DataBlock::new_from_columns(vec![
+            BooleanType::from_data(vec![has_data]),
+        ])))
+    }
+}
+
+fn schema() -> Arc<TableSchema> {
+    TableSchemaRefExt::create(vec![TableField::new("has_data", TableDataType::Boolean)])
+}
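
For reviewers, the resolution rules in `extract_fully_qualified_stream_name` amount to the following (assuming current catalog `default` and current database `db1`; the values are illustrative):

```rust
// "s"          -> ("default", "db1", "s")
// "db2.s"      -> ("default", "db2", "s")
// "cat.db2.s"  -> ("cat",     "db2", "s")
// "a.b.c.d"    -> ErrorCode::BadArguments
```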
+            stream_name,
+        })
+    }
+
+    /// Split `target` into `(catalog, database, stream)`, falling back to the
+    /// session's current catalog and database for the omitted parts.
+    fn extract_fully_qualified_stream_name(
+        ctx: &dyn TableContext,
+        target: &str,
+    ) -> Result<(String, String, String)> {
+        let current_cat_name;
+        let current_db_name;
+        let stream_name_vec: Vec<&str> = target.split('.').collect();
+        let (cat, db, stream) = {
+            match stream_name_vec.len() {
+                1 => {
+                    current_cat_name = ctx.get_current_catalog();
+                    current_db_name = ctx.get_current_database();
+                    (
+                        current_cat_name,
+                        current_db_name,
+                        stream_name_vec[0].to_owned(),
+                    )
+                }
+                2 => {
+                    current_cat_name = ctx.get_current_catalog();
+                    (
+                        current_cat_name,
+                        stream_name_vec[0].to_owned(),
+                        stream_name_vec[1].to_owned(),
+                    )
+                }
+                3 => (
+                    stream_name_vec[0].to_owned(),
+                    stream_name_vec[1].to_owned(),
+                    stream_name_vec[2].to_owned(),
+                ),
+                _ => {
+                    return Err(ErrorCode::BadArguments(
+                        "Invalid stream name. Use the format '[catalog.][database.]stream'",
+                    ));
+                }
+            }
+        };
+        Ok((cat, db, stream))
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for StreamStatusDataSource {
+    const NAME: &'static str = "stream_status";
+
+    #[async_trait::unboxed_simple]
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        if self.finish {
+            return Ok(None);
+        }
+
+        self.finish = true;
+        let tenant_id = self.ctx.get_tenant();
+        let tbl = self
+            .ctx
+            .get_catalog(&self.cat_name)
+            .await?
+            .get_table(tenant_id.as_str(), &self.db_name, &self.stream_name)
+            .await?;
+
+        let tbl = StreamTable::try_from_table(tbl.as_ref())?;
+
+        let has_data = matches!(
+            tbl.check_stream_status(self.ctx.clone()).await?,
+            StreamStatus::MayHaveData
+        );
+
+        Ok(Some(DataBlock::new_from_columns(vec![
+            BooleanType::from_data(vec![has_data]),
+        ])))
+    }
+}
+
+fn schema() -> Arc<TableSchema> {
+    TableSchemaRefExt::create(vec![TableField::new("has_data", TableDataType::Boolean)])
+}
diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs
index 47e48731e9e4c..7b9052d12495b 100644
--- a/src/query/storages/stream/src/stream_table.rs
+++ b/src/query/storages/stream/src/stream_table.rs
@@ -14,6 +14,7 @@
 use std::any::Any;
 use std::collections::HashSet;
+use std::fmt::Display;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
@@ -59,6 +60,11 @@ pub enum StreamMode {
     AppendOnly,
 }
 
+pub enum StreamStatus {
+    MayHaveData,
+    NoData,
+}
+
 impl FromStr for StreamMode {
     type Err = ErrorCode;
     fn from_str(s: &str) -> Result<Self> {
@@ -72,11 +78,11 @@ impl FromStr for StreamMode {
     }
 }
 
-impl ToString for StreamMode {
-    fn to_string(&self) -> String {
-        match self {
+impl Display for StreamMode {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", match self {
             StreamMode::AppendOnly => MODE_APPEND_ONLY.to_string(),
-        }
+        })
     }
 }
 
@@ -304,6 +310,17 @@ impl StreamTable {
             pruning_stats,
         )
     }
+
+    #[minitrace::trace]
+    pub async fn check_stream_status(&self, ctx: Arc<dyn TableContext>) -> Result<StreamStatus> {
+        let base_table = self.source_table(ctx).await?;
+        let status = if base_table.get_table_info().ident.seq == self.table_version {
+            StreamStatus::NoData
+        } else {
+            StreamStatus::MayHaveData
+        };
+        Ok(status)
+    }
 }
 
 #[async_trait::async_trait]
diff --git a/src/query/users/Cargo.toml b/src/query/users/Cargo.toml
index 2cdb2fa2a7aa4..1a8948f94c1dc 100644
--- a/src/query/users/Cargo.toml
+++ b/src/query/users/Cargo.toml
@@ -31,6 +31,7 @@ async-backtrace = { workspace = true }
 base64 = "0.21"
 chrono = { workspace = true }
 cidr = { version = "0.2.2" }
+enumflags2 = { version = "0.7.7", features = ["serde"] }
 jwt-simple = "0.11"
 log = { workspace = true }
 p256 = "0.13"
diff --git a/src/query/users/src/visibility_checker.rs b/src/query/users/src/visibility_checker.rs
index 237e27c8be84b..02e4f6e666c58 100644
--- a/src/query/users/src/visibility_checker.rs
+++ b/src/query/users/src/visibility_checker.rs
@@ -18,6 +18,8 @@ use common_meta_app::principal::GrantObject;
 use common_meta_app::principal::RoleInfo;
 use common_meta_app::principal::UserGrantSet;
 use common_meta_app::principal::UserInfo;
+use common_meta_app::principal::UserPrivilegeType;
+use enumflags2::BitFlags;
 
 /// GrantObjectVisibilityChecker is used to check whether a user has the privilege to access a
 /// database or table.
@@ -28,7 +30,8 @@ pub struct GrantObjectVisibilityChecker {
     granted_tables: HashSet<(String, String, String)>,
     extra_databases: HashSet<(String, String)>,
     granted_udfs: HashSet<String>,
-    granted_stages: HashSet<String>,
+    granted_write_stages: HashSet<String>,
+    granted_read_stages: HashSet<String>,
 }
 
 impl GrantObjectVisibilityChecker {
@@ -37,7 +40,8 @@ impl GrantObjectVisibilityChecker {
         let mut granted_databases = HashSet::new();
         let mut granted_tables = HashSet::new();
        let mut granted_udfs = HashSet::new();
-        let mut granted_stages = HashSet::new();
+        let mut granted_write_stages = HashSet::new();
+        let mut granted_read_stages = HashSet::new();
         let mut extra_databases = HashSet::new();
 
         let mut grant_sets: Vec<&UserGrantSet> = vec![&user.grants];
@@ -67,7 +71,18 @@ impl GrantObjectVisibilityChecker {
                         granted_udfs.insert(udf.to_string());
                     }
                     GrantObject::Stage(stage) => {
-                        granted_stages.insert(stage.to_string());
+                        if ent
+                            .privileges()
+                            .contains(BitFlags::from(UserPrivilegeType::Write))
+                        {
+                            granted_write_stages.insert(stage.to_string());
+                        }
+                        if ent
+                            .privileges()
+                            .contains(BitFlags::from(UserPrivilegeType::Read))
+                        {
+                            granted_read_stages.insert(stage.to_string());
+                        }
                     }
                 }
             }
@@ -79,7 +94,8 @@ impl GrantObjectVisibilityChecker {
             granted_tables,
             extra_databases,
             granted_udfs,
-            granted_stages,
+            granted_write_stages,
+            granted_read_stages,
         }
     }
 
@@ -88,7 +104,18 @@ impl GrantObjectVisibilityChecker {
             return true;
         }
 
-        if self.granted_stages.contains(stage) {
+        if self.granted_read_stages.contains(stage) || self.granted_write_stages.contains(stage) {
+            return true;
+        }
+        false
+    }
+
+    pub fn check_stage_read_visibility(&self, stage: &str) -> bool {
+        if self.granted_global {
+            return true;
+        }
+
+        if self.granted_read_stages.contains(stage) {
             return true;
         }
         false
diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test b/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test
index 8a12f0a70419a..42a9ca65567a0 100644
--- a/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test
+++ b/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test
@@ -141,7 +141,18 @@ statement error 1006
 CREATE TABLE db2.test5(a Varchar null, y Varchar null) ENGINE=fuse AS SELECT a, b, a FROM db1.test1
 
 statement ok
-create table db2.test55(id Int8, created timestamp DEFAULT CURRENT_TIMESTAMP)
+create table db2.test55(id Int8, created timestamp not null DEFAULT CURRENT_TIMESTAMP)
+
+statement ok
+insert into db2.test55(id) select number % 3 from numbers(1000)
+
+statement ok
+insert into db2.test55(id) select number % 3 from numbers(1000)
+
+query I
+select count(distinct created) > 1 from db2.test55;
+----
+1
 
 statement error 1065
 create table db2.test6(id Int8, created timestamp DEFAULT today() + a)
diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0004_stream_status.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0004_stream_status.test
new file mode 100644
index 0000000000000..006c2171834da
--- /dev/null
+++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0004_stream_status.test
@@ -0,0 +1,86 @@
+## Copyright 2023 Databend Cloud
+##
+## Licensed under the Elastic License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## https://www.elastic.co/licensing/elastic-license
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+
+
+# Although stream_status is not an EE function, it depends on the EE feature change_tracking.
+
+statement ok
+DROP DATABASE IF EXISTS test_stream_status
+
+statement ok
+CREATE DATABASE test_stream_status
+
+statement ok
+USE test_stream_status
+
+statement error 1025
+call system$stream_status('not_exist')
+
+statement error 1025
+select * from stream_status('not_exist')
+
+statement ok
+create table t(a int)
+
+statement ok
+alter table t set options(change_tracking=true)
+
+statement ok
+create stream if not exists s on table t
+
+query I
+select * from stream_status('s')
+----
+0
+
+query I
+select * from stream_status('test_stream_status.s')
+----
+0
+
+query I
+select * from stream_status('default.test_stream_status.s')
+----
+0
+
+query I
+call system$stream_status('s')
+----
+0
+
+query I
+call system$stream_status('test_stream_status.s')
+----
+0
+
+query I
+call system$stream_status('default.test_stream_status.s')
+----
+0
+
+statement ok
+insert into t values(2)
+
+query I
+select * from stream_status('s')
+----
+1
+
+query I
+call system$stream_status('s')
+----
+1
+
+statement ok
+DROP DATABASE IF EXISTS test_stream_status
diff --git a/tests/sqllogictests/suites/query/02_function/02_0061_function_array.test b/tests/sqllogictests/suites/query/02_function/02_0061_function_array.test
index 711651d5668f5..64b6f3d57fdce 100644
--- a/tests/sqllogictests/suites/query/02_function/02_0061_function_array.test
+++ b/tests/sqllogictests/suites/query/02_function/02_0061_function_array.test
@@ -251,5 +251,10 @@ select array_filter(col1, a -> a % 2 = 1), array_filter(col2, b -> b = 'x') from
 statement error 1065
 select array_filter([1, 2], x -> x + 1)
 
+query TT
+select array_flatten( [ [1,2], [3,4] ] ), array_to_string(['open', 'ai'], ' love ');
+----
+[1,2,3,4] open love ai
+
 statement ok
 DROP DATABASE array_func_test
diff --git a/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.result b/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.result
index a32d505cacf9f..42bcf46d6febf 100644
--- a/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.result
+++ b/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.result
@@ -12,3 +12,12 @@ Error: APIError: ResponseError with 1063: Permission denied, privilege [Write] i
 000
 Error: APIError: ResponseError with 1063: Permission denied, privilege [Read] is required on STAGE presign_stage for user 'u1'@'%' with roles [public]
 000
+Error: APIError: ResponseError with 1063: Permission denied, privilege [Write] is required on STAGE s3 for user 'u1'@'%' with roles [public]
+Error: APIError: ResponseError with 1063: Permission denied, privilege [Read] is required on STAGE s3 for user 'u1'@'%' with roles [public]
+Error: APIError: ResponseError with 1063: Permission denied, privilege READ is required on stage s3 for user 'u1'@'%'
+Error: APIError: ResponseError with 1063: Permission denied, privilege READ is required on stage s3 for user 'u1'@'%'
+Error: APIError: ResponseError with 1063: Permission denied, privilege READ is required on stage s3 for user 'u1'@'%'
+2
+1
+1
+1
diff --git a/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.sh b/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.sh
index 53fe535ac87ba..039493a7daf17 100755
--- a/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.sh
+++ b/tests/suites/0_stateless/18_rbac/00_0012_stage_priv.sh
@@ -51,7 +51,7 @@ echo "copy into test_table from @s2 FILE_FORMAT = (type = CSV skip_header = 0) f
 echo "grant Read on stage s2 to 'u1'" | $BENDSQL_CLIENT_CONNECT
 echo "copy into test_table from @s2 FILE_FORMAT = (type = CSV skip_header = 0) force=true;" | $TEST_USER_CONNECT
 
-echo "remove @s2;" | $BENDSQL_CLIENT_CONNECT
+echo "remove @s2;" | $TEST_USER_CONNECT
 echo "remove @s1;" | $BENDSQL_CLIENT_CONNECT
 echo "drop STAGE s2;" | $BENDSQL_CLIENT_CONNECT
 echo "drop STAGE s1;" | $BENDSQL_CLIENT_CONNECT
@@ -71,6 +71,31 @@ curl -s -w "%{http_code}\n" -X PUT -o /dev/null -H Content-Type:application/octe
 echo "revoke Read on stage presign_stage from 'u1'" | $BENDSQL_CLIENT_CONNECT
 curl -s -w "%{http_code}\n" -o /dev/null "`echo "PRESIGN @presign_stage/hello_word.txt" | $TEST_USER_CONNECT`"
 
+echo "drop stage if exists s3" | $BENDSQL_CLIENT_CONNECT
+
+echo "create stage s3;" | $BENDSQL_CLIENT_CONNECT
+echo "remove @s3;" | $TEST_USER_CONNECT
+echo "grant write on stage s3 to u1" | $BENDSQL_CLIENT_CONNECT
+echo "remove @s3;" | $TEST_USER_CONNECT
+echo "copy into '@s3/a b' from (select 2);" | $TEST_USER_CONNECT
+
+echo "grant select on system.* to u1" | $BENDSQL_CLIENT_CONNECT
+
+echo "select * from @s3" | $TEST_USER_CONNECT
+echo "select * from infer_schema(location => '@s3')" | $TEST_USER_CONNECT
+echo "select * from list_stage(location => '@s3')" | $TEST_USER_CONNECT
+echo "select * from inspect_parquet('@s3')" | $TEST_USER_CONNECT
+
+echo "grant read on stage s3 to u1" | $BENDSQL_CLIENT_CONNECT
+
+echo "select * from @s3" | $TEST_USER_CONNECT
+echo "select 1 from infer_schema(location => '@s3')" | $TEST_USER_CONNECT
+echo "select 1 from list_stage(location => '@s3')" | $TEST_USER_CONNECT
+echo "select 1 from inspect_parquet('@s3')" | $TEST_USER_CONNECT
+
 ## Drop table.
 echo "drop stage if exists presign_stage" | $BENDSQL_CLIENT_CONNECT
+echo "drop stage if exists s3" | $BENDSQL_CLIENT_CONNECT
 echo "drop user u1" | $BENDSQL_CLIENT_CONNECT
+
+