Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Nov 6, 2024
1 parent 23b2011 commit 673bccb
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 62 deletions.
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ message ConnectionParams {
CONNECTION_TYPE_UNSPECIFIED = 0;
CONNECTION_TYPE_KAFKA = 1;
CONNECTION_TYPE_ICEBERG = 2;
CONNECTION_TYPE_SCHEMA_REGISTRY = 3;
}

ConnectionType connection_type = 1;
Expand Down
4 changes: 2 additions & 2 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,9 @@ message CreateConnectionRequest {
uint32 schema_id = 3;
oneof payload {
PrivateLink private_link = 4 [deprecated = true];
catalog.ConnectionParams connection_params = 7;
catalog.ConnectionParams connection_params = 6;
}
uint32 owner_id = 6;
uint32 owner_id = 5;
}

message CreateConnectionResponse {
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ fn resolve_create_connection_payload(
with_properties: WithOptions,
session: &SessionImpl,
) -> Result<create_connection_request::Payload> {
if !with_properties.connection_ref().is_empty() {
return Err(RwError::from(ErrorCode::InvalidParameterValue(
"Connection reference is not allowed in options in CREATE CONNECTION".to_string(),
)));
}

let (mut props, secret_refs) =
resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
Expand Down
88 changes: 75 additions & 13 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;
use std::sync::LazyLock;

use anyhow::{anyhow, Context};
use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use maplit::{convert_args, hashmap, hashset};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
Expand Down Expand Up @@ -59,6 +59,7 @@ use risingwave_connector::source::{
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection_params::PbConnectionType;
use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{EncodeType, FormatType};
Expand Down Expand Up @@ -86,7 +87,8 @@ use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
use crate::session::SessionImpl;
use crate::utils::{
resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options, OverwriteOptions,
resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
resolve_secret_ref_in_with_options, OverwriteOptions,
};
use crate::{bind_data_type, build_graph, OptimizerContext, WithOptions, WithOptionsSecResolved};

Expand Down Expand Up @@ -308,16 +310,36 @@ pub(crate) async fn bind_columns_from_source(
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";

let options_with_secret = match with_properties {
Either::Left(options) => resolve_secret_ref_in_with_options(options.clone(), session)?,
Either::Left(options) => {
let (sec_resolve_props, connection_type) =
resolve_connection_ref_and_secret_ref(options.clone(), session)?;
if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
connection_type, ALLOWED_CONNECTION_CONNECTOR
))));
}

sec_resolve_props
}
Either::Right(options_with_secret) => options_with_secret.clone(),
};

let is_kafka: bool = options_with_secret.is_kafka_connector();
let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options(

// todo: need to resolve connection ref for schema registry
let (sec_resolve_props, connection_type) = resolve_connection_ref_and_secret_ref(
WithOptions::try_from(format_encode.row_options())?,
session,
)?
.into_parts();
)?;
if !ALLOWED_CONNECTION_SCHEMA_REGISTRY.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
connection_type, ALLOWED_CONNECTION_SCHEMA_REGISTRY
))));
}

let (format_encode_options, format_encode_secret_refs) = sec_resolve_props.into_parts();
// Need real secret to access the schema registry
let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
format_encode_options.clone(),
Expand Down Expand Up @@ -542,11 +564,15 @@ fn bind_columns_from_source_for_cdc(
session: &SessionImpl,
format_encode: &FormatEncodeOptions,
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
let (format_encode_options, format_encode_secret_refs) = resolve_secret_ref_in_with_options(
WithOptions::try_from(format_encode.row_options())?,
session,
)?
.into_parts();
let with_options = WithOptions::try_from(format_encode.row_options())?;
if !with_options.connection_ref().is_empty() {
return Err(RwError::from(NotSupported(
"CDC connector does not support connection ref yet".to_string(),
"Explicitly specify the connection in WITH clause".to_string(),
)));
}
let (format_encode_options, format_encode_secret_refs) =
resolve_secret_ref_in_with_options(with_options, session)?.into_parts();

// Need real secret to access the schema registry
let mut format_encode_options_to_consume = LocalSecretManager::global().fill_secrets(
Expand Down Expand Up @@ -1049,6 +1075,20 @@ pub(super) fn bind_source_watermark(
Ok(watermark_descs)
}

static ALLOWED_CONNECTION_CONNECTOR: LazyLock<HashSet<PbConnectionType>> = LazyLock::new(|| {
hashset! {
PbConnectionType::Kafka,
PbConnectionType::Iceberg,
}
});

static ALLOWED_CONNECTION_SCHEMA_REGISTRY: LazyLock<HashSet<PbConnectionType>> =
LazyLock::new(|| {
hashset! {
PbConnectionType::SchemaRegistry,
}
});

// TODO: Better design if we want to support ENCODE KEY where we will have 4 dimensional array
static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
LazyLock::new(|| {
Expand Down Expand Up @@ -1561,7 +1601,21 @@ pub async fn bind_create_source_or_table_with_connector(
let mut with_properties = with_properties;
resolve_privatelink_in_with_option(&mut with_properties)?;

let with_properties = resolve_secret_ref_in_with_options(with_properties, session)?;
let connector = with_properties.get_connector().unwrap();
let (with_properties, connection_type) =
resolve_connection_ref_and_secret_ref(with_properties, session)?;
if !ALLOWED_CONNECTION_CONNECTOR.contains(&connection_type) {
return Err(RwError::from(ProtocolError(format!(
"connection type {:?} is not allowed, allowed types: {:?}",
connection_type, ALLOWED_CONNECTION_CONNECTOR
))));
}
if !connector.eq(connection_type_to_connector(&connection_type)) {
return Err(RwError::from(ProtocolError(format!(
"connector {} and connection type {:?} are not compatible",
connector, connection_type
))));
}

let pk_names = bind_source_pk(
&format_encode,
Expand Down Expand Up @@ -1772,6 +1826,14 @@ fn row_encode_to_prost(row_encode: &Encode) -> EncodeType {
}
}

fn connection_type_to_connector(connection_type: &PbConnectionType) -> &str {
match connection_type {
PbConnectionType::Kafka => KAFKA_CONNECTOR,
PbConnectionType::Iceberg => ICEBERG_CONNECTOR,
_ => unreachable!(),
}
}

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,9 +551,9 @@ pub(crate) fn gen_create_table_plan(
c.column_desc.column_id = col_id_gen.generate(c.name())
}

let (_, secret_refs) = context.with_options().clone().into_parts();
if !secret_refs.is_empty() {
return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference is not allowed in options when creating table without external source".to_string()).into());
let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
if !secret_refs.is_empty() || !connection_refs.is_empty() {
return Err(crate::error::ErrorCode::InvalidParameterValue("Secret reference and Connection reference are not allowed in options when creating table without external source".to_string()).into());
}

gen_create_table_plan_without_source(
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ pub async fn handle_create_as(

let (graph, source, table) = {
let context = OptimizerContext::from_handler_args(handler_args.clone());
let (_, secret_refs) = context.with_options().clone().into_parts();
if !secret_refs.is_empty() {
let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
if !secret_refs.is_empty() || !connection_refs.is_empty() {
return Err(crate::error::ErrorCode::InvalidParameterValue(
"Secret reference is not allowed in options for CREATE TABLE AS".to_string(),
"Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_string(),
)
.into());
}
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/handler/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ pub async fn handle_create_view(
.collect()
};

let (properties, secret_refs) = properties.into_parts();
if !secret_refs.is_empty() {
let (properties, secret_refs, connection_refs) = properties.into_parts();
if !secret_refs.is_empty() || !connection_refs.is_empty() {
return Err(crate::error::ErrorCode::InvalidParameterValue(
"Secret reference is not allowed in create view options".to_string(),
"Secret reference and Connection reference are not allowed in create view options"
.to_string(),
)
.into());
}
Expand Down
12 changes: 10 additions & 2 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ use risingwave_common::util::addr::HostAddr;
use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
use risingwave_expr::scalar::like::{i_like_default, like_default};
use risingwave_pb::catalog::connection;
use risingwave_pb::secret::SecretRef;
use risingwave_sqlparser::ast::{
display_comma_separated, Ident, ObjectName, ShowCreateType, ShowObject, ShowStatementFilter,
};

use super::{fields_to_descriptors, RwPgResponse, RwPgResponseBuilderExt};
use crate::binder::{Binder, Relation};
use crate::catalog::{CatalogError, IndexCatalog};
use crate::catalog::{CatalogError, IndexCatalog, SecretId};
use crate::error::Result;
use crate::handler::HandlerArgs;
use crate::session::cursor_manager::SubscriptionCursor;
Expand Down Expand Up @@ -444,7 +445,14 @@ pub async fn handle_show_object(
connection::Info::ConnectionParams(params) => {
// todo: check secrets are not exposed
// todo: show dep relations
serde_json::to_string(&params.get_properties()).unwrap()
let print_secret_ref = |secret_ref: &SecretRef| -> String {
let secret_name = schema.get_secret_by_id(&SecretId::from(secret_ref.secret_id)).map(|s| s.name.as_str()).unwrap();
format!("SECRET {} AS {}", secret_name, secret_ref.get_ref_as().unwrap().as_str_name())
};
let deref_secrets = params.get_secret_refs().iter().map(|(k, v)| (k.clone(), print_secret_ref(v)));
let mut props = params.get_properties().clone();
props.extend(deref_secrets);
serde_json::to_string(&props).unwrap()
}
};
ShowConnectionRow {
Expand Down
Loading

0 comments on commit 673bccb

Please sign in to comment.