From 16e3ebc830caa5902d13ea2583b53cd892b23955 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 25 Oct 2024 13:30:38 +0800 Subject: [PATCH] feat(mysql-cdc): support mysql source capture multiple databases (#19038) Signed-off-by: xxchan --- e2e_test/source/cdc/cdc.share_stream.slt | 39 ++++++++++++++++++ .../source/common/DbzConnectorConfig.java | 7 ++++ .../source/common/MySqlValidator.java | 26 ++++++++++-- .../main/resources/validate_sql.properties | 1 + src/frontend/src/handler/create_table.rs | 40 +++++++++---------- 5 files changed, 87 insertions(+), 26 deletions(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index b180a67c2cb2..68a46db42833 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -4,9 +4,17 @@ control substitution on system ok mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;" +system ok +mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS kdb; CREATE DATABASE kdb;" + system ok mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_create.sql +system ok +mysql --protocol=tcp -u root kdb -e " CREATE TABLE kt1 (id int primary key, v1 varchar(32)); + INSERT INTO kt1 VALUES (1,'aaa'),(2,'bbb'); +" + # generate data to mysql system ok mysql --protocol=tcp -u root mytest < e2e_test/source/cdc/mysql_init_data.sql @@ -28,6 +36,7 @@ create source mysql_mytest with ( server.id = '5601' ); + statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source create materialized view mv as select * from mysql_mytest; @@ -70,6 +79,19 @@ SINGLE {STREAM_SCAN} SINGLE {CDC_FILTER} HASH {SOURCE,DML} + +statement ok +create source mysql_source with ( + connector = 'mysql-cdc', + hostname = '${MYSQL_HOST:localhost}', + port = '${MYSQL_TCP_PORT:8306}', + username = 'rwcdc', + password = secret mysql_pwd, + database.name = 'mytest,kdb', + server.id = '5602' +); + + statement ok CREATE TABLE IF NOT EXISTS mysql_all_types( c_boolean boolean, @@ -112,6 +134,10 @@ create table orders_test ( PRIMARY KEY (order_id) ) from mysql_mytest table 'mytest.orders'; + +statement ok +create table kt1 (*) from mysql_source table 'kdb.kt1'; + statement ok create materialized view products_test_cnt as select count(*) as cnt from rw.products_test; @@ -121,6 +147,9 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t system ok mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');" +system ok +mysql --protocol=tcp -u root kdb -e "INSERT INTO kt1 VALUES (3, 'ccc'),(4, 'ddd');" + system ok mysql --protocol=tcp -u root mytest -e "FLUSH LOGS" @@ -146,6 +175,16 @@ select count(*) from orders_no_backfill ---- 0 + +query IT +select * from kt1 order by id; +---- +1 aaa +2 bbb +3 ccc +4 ddd + + # check ingestion results query I SELECT * from products_test_cnt diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index ccf279dde4ee..872d6e900c91 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -174,6 +174,13 @@ public DbzConnectorConfig( dbzProps.putAll(mysqlProps); + if (isCdcSourceJob) { + // remove table filtering for the shared MySQL source, since we + // allow user to ingest tables in different database + LOG.info("Disable table filtering for the shared MySQL source"); + dbzProps.remove("table.include.list"); + } + } else if (source == SourceTypeE.POSTGRES) { var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index cd7b6d29bb41..00a5f9fc03ad 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -15,7 +15,6 @@ package com.risingwave.connector.source.common; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.proto.Data; import java.sql.Connection; import java.sql.DriverManager; @@ -45,9 +44,7 @@ public MySqlValidator( var dbHost = userProps.get(DbzConnectorConfig.HOST); var dbPort = userProps.get(DbzConnectorConfig.PORT); - var dbName = userProps.get(DbzConnectorConfig.DB_NAME); - var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName); - + var jdbcUrl = String.format("jdbc:mysql://%s:%s", dbHost, dbPort); var properties = new Properties(); properties.setProperty("user", userProps.get(DbzConnectorConfig.USER)); properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD)); @@ -72,6 +69,27 @@ public void validateDbConfig() { if ((major > 8) || (major == 8 && minor >= 4)) { throw ValidatorUtils.failedPrecondition("MySQL version should be less than 8.4"); } + + // "database.name" is a comma-separated list of database names + var dbNames = userProps.get(DbzConnectorConfig.DB_NAME); + for (var dbName : dbNames.split(",")) { + // check the existence of the database + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("mysql.check_db_exist"))) { + stmt.setString(1, dbName.trim()); + var res = stmt.executeQuery(); + while (res.next()) { + var ret = res.getInt(1); + if (ret == 0) { + throw ValidatorUtils.invalidArgument( + String.format( + "MySQL database '%s' doesn't exist", dbName.trim())); + } + } + } + } + validateBinlogConfig(); } catch (SQLException e) { throw ValidatorUtils.internalError(e.getMessage()); diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index cb092acb5a26..6d565381d451 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -4,6 +4,7 @@ mysql.bin_row_image=show variables like 'binlog_row_image' mysql.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION mysql.grants=SHOW GRANTS FOR CURRENT_USER() +mysql.check_db_exist=SELECT count(*) FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ? postgres.wal=show wal_level postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?) postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 366e63aec207..aae250c1d47d 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -876,18 +876,17 @@ fn derive_with_options_for_cdc_table( ) -> Result { use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR}; // we should remove the prefix from `full_table_name` - let mut connect_properties = source_with_properties.clone(); + let mut with_options = source_with_properties.clone(); if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) { - let table_name = match connector.as_str() { + match connector.as_str() { MYSQL_CDC_CONNECTOR => { - let db_name = connect_properties.get(DATABASE_NAME_KEY).ok_or_else(|| { - anyhow!("{} not found in source properties", DATABASE_NAME_KEY) + // MySQL doesn't allow '.' in database name and table name, so we can split the + // external table name by '.' to get the table name + let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| { + anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'") })?; - - let prefix = format!("{}.", db_name.as_str()); - external_table_name - .strip_prefix(prefix.as_str()) - .ok_or_else(|| anyhow!("The upstream table name must contain database name prefix, e.g. 'mydb.table'."))? + with_options.insert(DATABASE_NAME_KEY.into(), db_name.into()); + with_options.insert(TABLE_NAME_KEY.into(), table_name.into()); } POSTGRES_CDC_CONNECTOR => { let (schema_name, table_name) = external_table_name @@ -895,9 +894,8 @@ fn derive_with_options_for_cdc_table( .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?; // insert 'schema.name' into connect properties - connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); - - table_name + with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); + with_options.insert(TABLE_NAME_KEY.into(), table_name.into()); } SQL_SERVER_CDC_CONNECTOR => { // SQL Server external table name is in 'databaseName.schemaName.tableName' pattern, @@ -915,9 +913,8 @@ fn derive_with_options_for_cdc_table( })?; // insert 'schema.name' into connect properties - connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); - - table_name + with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); + with_options.insert(TABLE_NAME_KEY.into(), table_name.into()); } _ => { return Err(RwError::from(anyhow!( @@ -926,9 +923,8 @@ fn derive_with_options_for_cdc_table( ))); } }; - connect_properties.insert(TABLE_NAME_KEY.into(), table_name.into()); } - Ok(connect_properties) + Ok(with_options) } #[allow(clippy::too_many_arguments)] @@ -1025,7 +1021,7 @@ pub(super) async fn handle_create_table_plan( )?; source.clone() }; - let connect_properties = derive_with_options_for_cdc_table( + let cdc_with_options = derive_with_options_for_cdc_table( &source.with_properties, cdc_table.external_table_name.clone(), )?; @@ -1033,7 +1029,7 @@ pub(super) async fn handle_create_table_plan( let (columns, pk_names) = derive_schema_for_cdc_table( &column_defs, &constraints, - connect_properties.clone(), + cdc_with_options.clone(), wildcard_idx.is_some(), None, ) @@ -1048,7 +1044,7 @@ pub(super) async fn handle_create_table_plan( column_defs, columns, pk_names, - connect_properties, + cdc_with_options, col_id_gen, on_conflict, with_version_column, @@ -1153,7 +1149,7 @@ struct CdcSchemaChangeArgs { async fn derive_schema_for_cdc_table( column_defs: &Vec, constraints: &Vec, - connect_properties: WithOptionsSecResolved, + cdc_with_options: WithOptionsSecResolved, need_auto_schema_map: bool, schema_change_args: Option, ) -> Result<(Vec, Vec)> { @@ -1167,7 +1163,7 @@ async fn derive_schema_for_cdc_table( "Please define the schema manually".to_owned(), ) })?; - let (options, secret_refs) = connect_properties.into_parts(); + let (options, secret_refs) = cdc_with_options.into_parts(); let config = ExternalTableConfig::try_from_btreemap(options, secret_refs) .context("failed to extract external table config")?;