diff --git a/Cargo.toml b/Cargo.toml index 0e1c1725baa8..a8ac8c7c6921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,7 +153,11 @@ arrow-udf-flight = "0.4" clap = { version = "4", features = ["cargo", "derive", "env"] } # Use a forked version which removes the dependencies on dynamo db to reduce # compile time and binary size. -deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] } +deltalake = { version = "0.20.1", features = [ + "s3", + "gcs", + "datafusion", +] } itertools = "0.13.0" jsonbb = "0.1.4" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } diff --git a/proto/catalog.proto b/proto/catalog.proto index 5383104e9c0f..6d54903cc259 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -92,6 +92,9 @@ message StreamSourceInfo { // Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type. // For format and encode options. map format_encode_secret_refs = 16; + + // ref connection for schema registry + optional uint32 connection_id = 17; } message Source { @@ -123,6 +126,8 @@ message Source { uint32 associated_table_id = 12; } string definition = 13; + + // ref connection for connector optional uint32 connection_id = 14; optional uint64 initialized_at_epoch = 15; @@ -156,6 +161,9 @@ message SinkFormatDesc { optional plan_common.EncodeType key_encode = 4; // Secret used for format encode options. map secret_refs = 5; + + // ref connection for schema registry + optional uint32 connection_id = 6; } // the catalog of the sink. There are two kind of schema here. The full schema is all columns @@ -178,6 +186,8 @@ message Sink { uint32 owner = 11; map properties = 12; string definition = 13; + + // ref connection for connector optional uint32 connection_id = 14; optional uint64 initialized_at_epoch = 15; optional uint64 created_at_epoch = 16; @@ -226,6 +236,19 @@ message Subscription { SubscriptionState subscription_state = 19; } +message ConnectionParams { + enum ConnectionType { + CONNECTION_TYPE_UNSPECIFIED = 0; + CONNECTION_TYPE_KAFKA = 1; + CONNECTION_TYPE_ICEBERG = 2; + CONNECTION_TYPE_SCHEMA_REGISTRY = 3; + } + + ConnectionType connection_type = 1; + map properties = 2; + map secret_refs = 3; +} + message Connection { message PrivateLinkService { enum PrivateLinkProvider { @@ -246,6 +269,7 @@ message Connection { string name = 4; oneof info { PrivateLinkService private_link_service = 5 [deprecated = true]; + ConnectionParams connection_params = 7; } uint32 owner = 6; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 6467bd6e1d7e..0d4b7b806cda 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -424,7 +424,8 @@ message CreateConnectionRequest { uint32 database_id = 2; uint32 schema_id = 3; oneof payload { - PrivateLink private_link = 4; + PrivateLink private_link = 4 [deprecated = true]; + catalog.ConnectionParams connection_params = 6; } uint32 owner_id = 5; } diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index 57b614fdf548..8ddc1d1c5515 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -26,3 +26,16 @@ pub use common::{ mod iceberg; pub use iceberg::IcebergCommon; + +// #[derive(Debug, Clone, Deserialize)] +// pub enum ConnectionImpl { +// Kafka(KafkaConnection), +// } + +// macro_rules! impl_connection_enum { +// ($impl:expr, $inner_name:ident, $prop_type_name:ident, $body:expr) => { +// impl ConnectionImpl { +// pub fn get_ +// } +// }; +// } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 23f34eab9741..cc3855d4022c 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -221,6 +221,7 @@ impl SinkFormatDesc { options, key_encode, secret_refs: self.secret_refs.clone(), + connection_id: None, } } } diff --git a/src/ctl/src/cmd_impl/meta/connection.rs b/src/ctl/src/cmd_impl/meta/connection.rs index 5df81d301ea0..c7bd94b58de6 100644 --- a/src/ctl/src/cmd_impl/meta/connection.rs +++ b/src/ctl/src/cmd_impl/meta/connection.rs @@ -35,6 +35,13 @@ pub async fn list_connections(context: &CtlContext) -> anyhow::Result<()> { "PrivateLink: service_name: {}, endpoint_id: {}, dns_entries: {:?}", svc.service_name, svc.endpoint_id, svc.dns_entries, ), + Some(Info::ConnectionParams(params)) => { + format!( + "CONNECTION_PARAMS_{}: {}", + params.get_connection_type().unwrap().as_str_name(), + serde_json::to_string(¶ms.get_properties()).unwrap() + ) + } None => "None".to_string(), } ); diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index 03b2ff4203c5..a938328c46d8 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -30,12 +30,14 @@ impl ConnectionCatalog { pub fn connection_type(&self) -> &str { match &self.info { Info::PrivateLinkService(srv) => srv.get_provider().unwrap().as_str_name(), + Info::ConnectionParams(params) => params.get_connection_type().unwrap().as_str_name(), } } pub fn provider(&self) -> &str { match &self.info { Info::PrivateLinkService(_) => "PRIVATELINK", + Info::ConnectionParams(_) => panic!("ConnectionParams is not supported as provider."), } } } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs index 2af0b29b16f7..cb1243b65532 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_connections.rs @@ -28,6 +28,7 @@ struct RwConnection { type_: String, provider: String, acl: String, + connection_params: String, } #[system_catalog(table, "rw_catalog.rw_connections")] @@ -35,16 +36,31 @@ fn read_rw_connections(reader: &SysCatalogReaderImpl) -> Result { + rw_connection.provider = conn.provider().into(); + } + risingwave_pb::catalog::connection::Info::ConnectionParams(params) => { + rw_connection.connection_params = + serde_json::to_string(¶ms.get_properties()).unwrap(); + } + }; + + rw_connection }) }) .collect()) diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index d7ef3aa10b88..cbfe6979a9ef 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -15,7 +15,10 @@ use std::collections::BTreeMap; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; +use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION}; +use risingwave_pb::catalog::connection_params::ConnectionType; +use risingwave_pb::catalog::ConnectionParams; use risingwave_pb::ddl_service::create_connection_request; use risingwave_sqlparser::ast::CreateConnectionStatement; @@ -24,36 +27,58 @@ use crate::binder::Binder; use crate::error::ErrorCode::ProtocolError; use crate::error::{ErrorCode, Result, RwError}; use crate::handler::HandlerArgs; +use crate::session::SessionImpl; +use crate::utils::resolve_secret_ref_in_with_options; +use crate::WithOptions; pub(crate) const CONNECTION_TYPE_PROP: &str = "type"; #[inline(always)] fn get_connection_property_required( - with_properties: &BTreeMap, + with_properties: &mut BTreeMap, property: &str, ) -> Result { - with_properties - .get(property) - .map(|s| s.to_lowercase()) - .ok_or_else(|| { - RwError::from(ProtocolError(format!( - "Required property \"{property}\" is not provided" - ))) - }) + with_properties.remove(property).ok_or_else(|| { + RwError::from(ProtocolError(format!( + "Required property \"{property}\" is not provided" + ))) + }) } fn resolve_create_connection_payload( - with_properties: &BTreeMap, + with_properties: WithOptions, + session: &SessionImpl, ) -> Result { - let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?; - match connection_type.as_str() { - PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated( + if !with_properties.connection_ref().is_empty() { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Connection reference is not allowed in options in CREATE CONNECTION".to_string(), + ))); + } + + let (mut props, secret_refs) = + resolve_secret_ref_in_with_options(with_properties, session)?.into_parts(); + let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?; + let connection_type = match connection_type.as_str() { + PRIVATELINK_CONNECTION => { + return Err(RwError::from(ErrorCode::Deprecated( "CREATE CONNECTION to Private Link".to_string(), "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_string(), - ))), - _ => Err(RwError::from(ProtocolError(format!( - "Connection type \"{connection_type}\" is not supported" - )))), - } + ))); + } + KAFKA_CONNECTOR => ConnectionType::Kafka, + ICEBERG_CONNECTOR => ConnectionType::Iceberg, + _ => { + return Err(RwError::from(ProtocolError(format!( + "Connection type \"{connection_type}\" is not supported" + )))); + } + }; + Ok(create_connection_request::Payload::ConnectionParams( + ConnectionParams { + connection_type: connection_type as i32, + properties: props.into_iter().collect(), + secret_refs: secret_refs.into_iter().collect(), + }, + )) } pub async fn handle_create_connection( @@ -79,8 +104,7 @@ pub async fn handle_create_connection( } let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; let with_properties = handler_args.with_options.clone().into_connector_props(); - - let create_connection_payload = resolve_create_connection_payload(&with_properties)?; + let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?; let catalog_writer = session.catalog_writer()?; catalog_writer diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9c37799422a5..530ff1f2689a 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::rc::Rc; use std::sync::LazyLock; use anyhow::{anyhow, Context}; use either::Either; use itertools::Itertools; -use maplit::{convert_args, hashmap}; +use maplit::{convert_args, hashmap, hashset}; use pgwire::pg_response::{PgResponse, StatementType}; use rand::Rng; use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; @@ -59,6 +59,7 @@ use risingwave_connector::source::{ POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{EncodeType, FormatType}; @@ -86,7 +87,8 @@ use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext}; use crate::session::SessionImpl; use crate::utils::{ - resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions, + resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option, + resolve_secret_ref_in_with_options, OverwriteOptions, }; use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved}; @@ -308,16 +310,36 @@ pub(crate) async fn bind_columns_from_source( const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; let options_with_secret = match with_properties { - Either::Left(options) => resolve_secret_ref_in_with_options(options.clone(), session)?, + Either::Left(options) => { + let (sec_resolve_props, connection_type) = + resolve_connection_ref_and_secret_ref(options.clone(), session)?; + if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_CONNECTOR + )))); + } + + sec_resolve_props + } Either::Right(options_with_secret) => options_with_secret.clone(), }; let is_kafka: bool = options_with_secret.is_kafka_connector(); - let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options( + + // todo: need to resolve connection ref for schema registry + let (sec_resolve_props, connection_type) = resolve_connection_ref_and_secret_ref( WithOptions::try_from(format_encode.row_options())?, session, - )? - .into_parts(); + )?; + if !ALLOWED_CONNECTION_SCHEMA_REGISTRY.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_SCHEMA_REGISTRY + )))); + } + + let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts(); // Need real secret to access the schema registry let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( format_encode_options.clone(), @@ -542,11 +564,15 @@ fn bind_columns_from_source_for_cdc( session: &SessionImpl, format_encode: &FormatEncodeOptions, ) -> Result<(Option>, StreamSourceInfo)> { - let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options( - WithOptions::try_from(format_encode.row_options())?, - session, - )? - .into_parts(); + let with_options = WithOptions::try_from(format_encode.row_options())?; + if !with_options.connection_ref().is_empty() { + return Err(RwError::from(NotSupported( + "CDC connector does not support connection ref yet".to_string(), + "Explicitly specify the connection in WITH clause".to_string(), + ))); + } + let (format_encode_options, format_encode_secret_refs) = + resolve_secret_ref_in_with_options(with_options, session)?.into_parts(); // Need real secret to access the schema registry let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets( @@ -1049,6 +1075,21 @@ pub(super) fn bind_source_watermark( Ok(watermark_descs) } +static ALLOWED_CONNECTION_CONNECTOR: LazyLock> = LazyLock::new(|| { + hashset! { + PbConnectionType::Kafka, + PbConnectionType::Iceberg, + } +}); + +static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock> = + LazyLock::new(|| { + hashset! { + PbConnectionType::Unspecified, + PbConnectionType::SchemaRegistry, + } + }); + // TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { @@ -1561,7 +1602,21 @@ pub async fn bind_create_source_or_table_with_connector( let mut with_properties = with_properties; resolve_privatelink_in_with_option(&mut with_properties)?; - let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?; + let connector = with_properties.get_connector().unwrap(); + let (with_properties, connection_type) = + resolve_connection_ref_and_secret_ref(with_properties, session)?; + if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) { + return Err(RwError::from(ProtocolError(format!( + "connection type {:?} is not allowed, allowed types: {:?}", + connection_type, ALLOWED_CONNECTION_CONNECTOR + )))); + } + if !connector.eq(connection_type_to_connector(&connection_type)) { + return Err(RwError::from(ProtocolError(format!( + "connector {} and connection type {:?} are not compatible", + connector, connection_type + )))); + } let pk_names = bind_source_pk( &format_encode, @@ -1772,6 +1827,14 @@ fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { } } +fn connection_type_to_connector(connection_type: &PbConnectionType) -> &str { + match connection_type { + PbConnectionType::Kafka => KAFKA_CONNECTOR, + PbConnectionType::Iceberg => ICEBERG_CONNECTOR, + _ => unreachable!(), + } +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 1e5dc489c1a0..e002fbfb121c 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -551,9 +551,9 @@ pub(crate) fn gen_create_table_plan( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (_, secret_refs) = context.with_options().clone().into_parts(); - if !secret_refs.is_empty() { - return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference is not allowed in options when creating table without external source".to_string()).into()); + let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { + return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_string()).into()); } gen_create_table_plan_without_source( diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 27c527969f9b..f2ba78a125b9 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -89,10 +89,10 @@ pub async fn handle_create_as( let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args.clone()); - let (_, secret_refs) = context.with_options().clone().into_parts(); - if !secret_refs.is_empty() { + let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { return Err(crate::error::ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in options for CREATE TABLE AS".to_string(), + "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_string(), ) .into()); } diff --git a/src/frontend/src/handler/create_view.rs b/src/frontend/src/handler/create_view.rs index 5ad0e8956b96..851c3a4fa89d 100644 --- a/src/frontend/src/handler/create_view.rs +++ b/src/frontend/src/handler/create_view.rs @@ -87,10 +87,11 @@ pub async fn handle_create_view( .collect() }; - let (properties, secret_refs) = properties.into_parts(); - if !secret_refs.is_empty() { + let (properties, secret_refs, connection_refs) = properties.into_parts(); + if !secret_refs.is_empty() || !connection_refs.is_empty() { return Err(crate::error::ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in create view options".to_string(), + "Secret reference and Connection reference are not allowed in create view options" + .to_string(), ) .into()); } diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index d8ec8b44d827..ac64f133d21d 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -27,13 +27,14 @@ use risingwave_common::util::addr::HostAddr; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; use risingwave_expr::scalar::like::{i_like_default, like_default}; use risingwave_pb::catalog::connection; +use risingwave_pb::secret::SecretRef; use risingwave_sqlparser::ast::{ display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter, }; use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt}; use crate::binder::{Binder, Relation}; -use crate::catalog::{CatalogError, IndexCatalog}; +use crate::catalog::{CatalogError, IndexCatalog, SecretId}; use crate::error::Result; use crate::handler::HandlerArgs; use crate::session::cursor_manager::SubscriptionCursor; @@ -413,6 +414,9 @@ pub async fn handle_show_object( connection::Info::PrivateLinkService(_) => { PRIVATELINK_CONNECTION.to_string() }, + connection::Info::ConnectionParams(params) => { + params.get_connection_type().unwrap().as_str_name().to_string() + } }; let source_names = schema .get_source_ids_by_connection(c.id) @@ -438,6 +442,18 @@ pub async fn handle_show_object( serde_json::to_string(&sink_names).unwrap(), ) } + connection::Info::ConnectionParams(params) => { + // todo: check secrets are not exposed + // todo: show dep relations + let print_secret_ref = |secret_ref: &SecretRef| -> String { + let secret_name = schema.get_secret_by_id(&SecretId::from(secret_ref.secret_id)).map(|s| s.name.as_str()).unwrap(); + format!("SECRET {} AS {}", secret_name, secret_ref.get_ref_as().unwrap().as_str_name()) + }; + let deref_secrets = params.get_secret_refs().iter().map(|(k, v)| (k.clone(), print_secret_ref(v))); + let mut props = params.get_properties().clone(); + props.extend(deref_secrets); + serde_json::to_string(&props).unwrap() + } }; ShowConnectionRow { name, diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 9d61021dab4f..870680482b2d 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -15,20 +15,22 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; +use risingwave_common::catalog::ConnectionId; use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; pub use risingwave_connector::WithOptionsSecResolved; use risingwave_connector::WithPropertiesExt; +use risingwave_pb::catalog::connection::Info as ConnectionInfo; +use risingwave_pb::catalog::connection_params::PbConnectionType; use risingwave_pb::secret::secret_ref::PbRefAsType; use risingwave_pb::secret::PbSecretRef; use risingwave_sqlparser::ast::{ - CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, - CreateSubscriptionStatement, SecretRef, SecretRefAsType, SqlOption, Statement, Value, + ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, + CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, Statement, Value, }; use super::OverwriteOptions; -use crate::catalog::ConnectionId; use crate::error::{ErrorCode, Result as RwResult, RwError}; use crate::session::SessionImpl; use crate::Binder; @@ -38,11 +40,14 @@ mod options { pub const RETENTION_SECONDS: &str = "retention_seconds"; } +const CONNECTION_REF_KEY: &str = "profile"; + /// Options or properties extracted from the `WITH` clause of DDLs. #[derive(Default, Clone, Debug, PartialEq, Eq, Hash)] pub struct WithOptions { inner: BTreeMap, - secret_ref: BTreeMap, + secret_ref: BTreeMap, + connection_ref: BTreeMap, } impl std::ops::Deref for WithOptions { @@ -65,12 +70,21 @@ impl WithOptions { Self { inner, secret_ref: Default::default(), + connection_ref: Default::default(), } } /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref. - pub fn new(inner: BTreeMap, secret_ref: BTreeMap) -> Self { - Self { inner, secret_ref } + pub fn new( + inner: BTreeMap, + secret_ref: BTreeMap, + connection_ref: BTreeMap, + ) -> Self { + Self { + inner, + secret_ref, + connection_ref, + } } pub fn inner_mut(&mut self) -> &mut BTreeMap { @@ -78,8 +92,14 @@ impl WithOptions { } /// Take the value of the option map and secret refs. - pub fn into_parts(self) -> (BTreeMap, BTreeMap) { - (self.inner, self.secret_ref) + pub fn into_parts( + self, + ) -> ( + BTreeMap, + BTreeMap, + BTreeMap, + ) { + (self.inner, self.secret_ref, self.connection_ref) } /// Convert to connector props, remove the key-value pairs used in the top-level. @@ -95,6 +115,7 @@ impl WithOptions { Self { inner, secret_ref: self.secret_ref, + connection_ref: self.connection_ref, } } @@ -119,6 +140,7 @@ impl WithOptions { Self { inner, secret_ref: self.secret_ref.clone(), + connection_ref: self.connection_ref.clone(), } } @@ -131,23 +153,26 @@ impl WithOptions { false } - pub fn secret_ref(&self) -> &BTreeMap { + pub fn secret_ref(&self) -> &BTreeMap { &self.secret_ref } - pub fn encode_options_to_map(sql_options: &[SqlOption]) -> RwResult> { - let WithOptions { inner, secret_ref } = WithOptions::try_from(sql_options)?; - if secret_ref.is_empty() { - Ok(inner) - } else { - Err(RwError::from(ErrorCode::InvalidParameterValue( - "Secret reference is not allowed in encode options".to_string(), - ))) - } + pub fn secret_ref_mut(&mut self) -> &mut BTreeMap { + &mut self.secret_ref + } + + pub fn connection_ref(&self) -> &BTreeMap { + &self.connection_ref + } + + pub fn connection_ref_mut(&mut self) -> &mut BTreeMap { + &mut self.connection_ref } pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult> { - let WithOptions { inner, secret_ref } = WithOptions::try_from(sql_options)?; + let WithOptions { + inner, secret_ref, .. + } = WithOptions::try_from(sql_options)?; if secret_ref.is_empty() { Ok(inner) } else { @@ -158,12 +183,88 @@ impl WithOptions { } } +pub(crate) fn resolve_connection_ref_and_secret_ref( + with_options: WithOptions, + session: &SessionImpl, +) -> RwResult<(WithOptionsSecResolved, PbConnectionType)> { + let db_name: &str = session.database(); + let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts(); + + let mut connection_params = None; + for connection_ref in connection_refs.values() { + // at most one connection ref in the map + connection_params = { + // get connection params from catalog + let (schema_name, connection_name) = Binder::resolve_schema_qualified_name( + db_name, + connection_ref.connection_name.clone(), + )?; + let connection_catalog = + session.get_connection_by_name(schema_name, &connection_name)?; + if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info { + Some(params.clone()) + } else { + return Err(RwError::from(ErrorCode::InvalidParameterValue( + "Private Link Service has been deprecated. Please create a new connection instead." + .to_string(), + ))); + } + }; + } + + let mut inner_secret_refs = { + let mut resolved_secret_refs = BTreeMap::new(); + for (key, secret_ref) in secret_refs { + let (schema_name, secret_name) = + Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?; + let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?; + let ref_as = match secret_ref.ref_as { + SecretRefAsType::Text => PbRefAsType::Text, + SecretRefAsType::File => PbRefAsType::File, + }; + let pb_secret_ref = PbSecretRef { + secret_id: secret_catalog.id.secret_id(), + ref_as: ref_as.into(), + }; + resolved_secret_refs.insert(key.clone(), pb_secret_ref); + } + resolved_secret_refs + }; + + let mut connection_type = PbConnectionType::Unspecified; + if let Some(connection_params) = connection_params { + connection_type = connection_params.connection_type(); + for (k, v) in connection_params.properties { + if options.insert(k.clone(), v).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated key: {}", + k + )))); + } + } + + for (k, v) in connection_params.secret_refs { + if inner_secret_refs.insert(k.clone(), v).is_some() { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated key: {}", + k + )))); + } + } + } + + Ok(( + WithOptionsSecResolved::new(options, inner_secret_refs), + connection_type, + )) +} + /// Get the secret id from the name. pub(crate) fn resolve_secret_ref_in_with_options( with_options: WithOptions, session: &SessionImpl, ) -> RwResult { - let (options, secret_refs) = with_options.into_parts(); + let (options, secret_refs, _) = with_options.into_parts(); let mut resolved_secret_refs = BTreeMap::new(); let db_name: &str = session.database(); for (key, secret_ref) in secret_refs { @@ -207,17 +308,40 @@ impl TryFrom<&[SqlOption]> for WithOptions { fn try_from(options: &[SqlOption]) -> Result { let mut inner: BTreeMap = BTreeMap::new(); - let mut secret_ref: BTreeMap = BTreeMap::new(); + let mut secret_ref: BTreeMap = BTreeMap::new(); + let mut connection_ref: BTreeMap = BTreeMap::new(); for option in options { let key = option.name.real_value(); - if let Value::Ref(r) = &option.value { - if secret_ref.insert(key.clone(), r.clone()).is_some() || inner.contains_key(&key) { - return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( - "Duplicated option: {}", - key - )))); + match &option.value { + Value::SecretRef(r) => { + if secret_ref.insert(key.clone(), r.clone()).is_some() + || inner.contains_key(&key) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; } - continue; + Value::ConnectionRef(r) => { + if key != CONNECTION_REF_KEY { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "expect 'profile' as the key for connection ref, but got: {}", + key + )))); + } + if connection_ref.insert(key.clone(), r.clone()).is_some() + || inner.contains_key(&key) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "Duplicated option: {}", + key + )))); + } + continue; + } + _ => {} } let value: String = match option.value.clone() { Value::CstyleEscapedString(s) => s.value, @@ -239,7 +363,11 @@ impl TryFrom<&[SqlOption]> for WithOptions { } } - Ok(Self { inner, secret_ref }) + Ok(Self { + inner, + secret_ref, + connection_ref, + }) } } diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index b84a29891eee..6cffc3413f3e 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -24,6 +24,7 @@ mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; mod m20241016_065621_hummock_gc_history; mod m20241025_062548_singleton_vnode_count; +mod m20241103_043732_connection_params; mod utils; pub struct Migrator; @@ -86,6 +87,7 @@ impl MigratorTrait for Migrator { Box::new(m20240911_083152_variable_vnode_count::Migration), Box::new(m20241016_065621_hummock_gc_history::Migration), Box::new(m20241025_062548_singleton_vnode_count::Migration), + Box::new(m20241103_043732_connection_params::Migration), ] } } diff --git a/src/meta/model/migration/src/m20241103_043732_connection_params.rs b/src/meta/model/migration/src/m20241103_043732_connection_params.rs new file mode 100644 index 000000000000..f2e03bafd920 --- /dev/null +++ b/src/meta/model/migration/src/m20241103_043732_connection_params.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Connection::Table) + .add_column(ColumnDef::new(Connection::Params).binary().not_null()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Connection::Table) + .drop_column(Connection::Params) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Connection { + Table, + Params, +} diff --git a/src/meta/model/src/connection.rs b/src/meta/model/src/connection.rs index dce0daa462fc..eb93a1d82fd3 100644 --- a/src/meta/model/src/connection.rs +++ b/src/meta/model/src/connection.rs @@ -18,7 +18,7 @@ use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; use serde::{Deserialize, Serialize}; -use crate::{ConnectionId, PrivateLinkService}; +use crate::{ConnectionId, ConnectionParams, PrivateLinkService}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "connection")] @@ -27,8 +27,9 @@ pub struct Model { pub connection_id: ConnectionId, pub name: String, - // todo: Private link service has been deprecated, consider using a new field for the connection info + // Private link service has been deprecated pub info: PrivateLinkService, + pub params: ConnectionParams, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -69,14 +70,15 @@ impl ActiveModelBehavior for ActiveModel {} impl From for ActiveModel { fn from(conn: PbConnection) -> Self { - let Some(PbInfo::PrivateLinkService(private_link_srv)) = conn.info else { - unreachable!("private link not provided.") + let Some(PbInfo::ConnectionParams(connection_params)) = conn.info else { + unreachable!("private link has been deprecated.") }; Self { connection_id: Set(conn.id as _), name: Set(conn.name), - info: Set(PrivateLinkService::from(&private_link_srv)), + info: Set(PrivateLinkService::default()), + params: Set(ConnectionParams::from(&connection_params)), } } } diff --git a/src/meta/model/src/lib.rs b/src/meta/model/src/lib.rs index 6610484a8918..335169e9b757 100644 --- a/src/meta/model/src/lib.rs +++ b/src/meta/model/src/lib.rs @@ -397,6 +397,7 @@ derive_from_blob!( PrivateLinkService, risingwave_pb::catalog::connection::PbPrivateLinkService ); +derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams); derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo); derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits); diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index e59c4a410014..674515fcf87c 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -24,7 +24,8 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::metrics::MetaMetrics; -use risingwave_pb::catalog::{Comment, CreateType, Secret, Table}; +use risingwave_pb::catalog::connection::Info as ConnectionInfo; +use risingwave_pb::catalog::{Comment, Connection, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_server::DdlService; @@ -730,7 +731,22 @@ impl DdlService for DdlServiceImpl { create_connection_request::Payload::PrivateLink(_) => { panic!("Private Link Connection has been deprecated") } - }; + create_connection_request::Payload::ConnectionParams(params) => { + let pb_connection = Connection { + id: 0, + schema_id: req.schema_id, + database_id: req.database_id, + name: req.name, + info: Some(ConnectionInfo::ConnectionParams(params)), + owner: req.owner_id, + }; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateConnection(pb_connection)) + .await?; + Ok(Response::new(CreateConnectionResponse { version })) + } + } } async fn list_connections( diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 2bdd59ea3912..ada0f027baa7 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -36,6 +36,7 @@ use risingwave_meta_model::{ SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, ViewId, }; +use risingwave_pb::catalog::connection::Info as ConnectionInfo; use risingwave_pb::catalog::subscription::SubscriptionState; use risingwave_pb::catalog::table::PbTableType; use risingwave_pb::catalog::{ @@ -1464,6 +1465,16 @@ impl CatalogController { ensure_object_id(ObjectType::Schema, pb_connection.schema_id as _, &txn).await?; check_connection_name_duplicate(&pb_connection, &txn).await?; + let mut dep_secrets = HashSet::new(); + if let Some(ConnectionInfo::ConnectionParams(params)) = &pb_connection.info { + dep_secrets.extend( + params + .secret_refs + .values() + .map(|secret_ref| secret_ref.secret_id), + ); + } + let conn_obj = Self::create_object( &txn, ObjectType::Connection, @@ -1476,6 +1487,16 @@ impl CatalogController { let connection: connection::ActiveModel = pb_connection.clone().into(); Connection::insert(connection).exec(&txn).await?; + for secret_id in dep_secrets { + ObjectDependency::insert(object_dependency::ActiveModel { + oid: Set(secret_id as _), + used_by: Set(conn_obj.oid), + ..Default::default() + }) + .exec(&txn) + .await?; + } + txn.commit().await?; let version = self diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index c7cf45daad9e..3f89c6e42ef6 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -19,7 +19,7 @@ use risingwave_common::hash::VnodeCount; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model::{ connection, database, function, index, object, schema, secret, sink, source, subscription, - table, view, + table, view, PrivateLinkService, }; use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; @@ -327,15 +327,18 @@ impl From> for PbView { impl From> for PbConnection { fn from(value: ObjectModel) -> Self { + let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() { + PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf()) + } else { + PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf()) + }; Self { id: value.1.oid as _, schema_id: value.1.schema_id.unwrap() as _, database_id: value.1.database_id.unwrap() as _, name: value.0.name, owner: value.1.owner_id as _, - info: Some(PbConnectionInfo::PrivateLinkService( - value.0.info.to_protobuf(), - )), + info: Some(info), } } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index fd336a1d0a5b..0921472aae0e 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -482,6 +482,7 @@ impl DdlController { } async fn create_connection(&self, connection: Connection) -> MetaResult { + // todo: do validation here self.metadata_manager .catalog_controller .create_connection(connection) diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 563dc66be478..c737913d43ad 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -52,8 +52,8 @@ pub use self::query::{ }; pub use self::statement::*; pub use self::value::{ - CstyleEscapedString, DateTimeField, DollarQuotedString, JsonPredicateType, SecretRef, - SecretRefAsType, TrimWhereField, Value, + ConnectionRefValue, CstyleEscapedString, DateTimeField, DollarQuotedString, JsonPredicateType, + SecretRefAsType, SecretRefValue, TrimWhereField, Value, }; pub use crate::ast::ddl::{ AlterIndexOperation, AlterSinkOperation, AlterSourceOperation, AlterSubscriptionOperation, diff --git a/src/sqlparser/src/ast/value.rs b/src/sqlparser/src/ast/value.rs index 2bf8a6fdf3a0..051e0814a3d9 100644 --- a/src/sqlparser/src/ast/value.rs +++ b/src/sqlparser/src/ast/value.rs @@ -60,7 +60,9 @@ pub enum Value { /// `NULL` value Null, /// name of the reference to secret - Ref(SecretRef), + SecretRef(SecretRefValue), + /// name of the reference to connection + ConnectionRef(ConnectionRefValue), } impl fmt::Display for Value { @@ -115,7 +117,8 @@ impl fmt::Display for Value { Ok(()) } Value::Null => write!(f, "NULL"), - Value::Ref(v) => write!(f, "secret {}", v), + Value::SecretRef(v) => write!(f, "secret {}", v), + Value::ConnectionRef(v) => write!(f, "connection {}", v), } } } @@ -240,12 +243,12 @@ impl fmt::Display for JsonPredicateType { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct SecretRef { +pub struct SecretRefValue { pub secret_name: ObjectName, pub ref_as: SecretRefAsType, } -impl fmt::Display for SecretRef { +impl fmt::Display for SecretRefValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.ref_as { SecretRefAsType::Text => write!(f, "{}", self.secret_name), @@ -260,3 +263,15 @@ pub enum SecretRefAsType { Text, File, } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConnectionRefValue { + pub connection_name: ObjectName, +} + +impl fmt::Display for ConnectionRefValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.connection_name) + } +} diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 7d01a2b35cf6..8ee17617c4eb 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3641,11 +3641,15 @@ impl Parser<'_> { } else { SecretRefAsType::Text }; - Ok(Value::Ref(SecretRef { + Ok(Value::SecretRef(SecretRefValue { secret_name, ref_as, })) } + Keyword::CONNECTION => { + let connection_name = self.parse_object_name()?; + Ok(Value::ConnectionRef(ConnectionRefValue { connection_name })) + } _ => self.expected_at(checkpoint, "a concrete value"), }, Token::Number(ref n) => Ok(Value::Number(n.clone())),