Skip to content

Commit

Permalink
refactor: move server.id for MySQL CDC from meta to fe
Browse files Browse the repository at this point in the history
motivation is to simplify create stream job process in meta

Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan committed Oct 29, 2024
1 parent 193e93f commit 3f64c85
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 22 deletions.
9 changes: 9 additions & 0 deletions src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::secret::PbSecretRef;

use crate::sink::catalog::SinkFormatDesc;
use crate::source::cdc::external::CdcTableType;
use crate::source::cdc::MYSQL_CDC_CONNECTOR;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
Expand Down Expand Up @@ -104,6 +105,14 @@ pub trait WithPropertiesExt: Get + Sized {
connector == KAFKA_CONNECTOR
}

#[inline(always)]
fn is_mysql_cdc_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
return false;
};
connector == MYSQL_CDC_CONNECTOR
}

#[inline(always)]
fn is_cdc_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use rand::Rng;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
Expand Down Expand Up @@ -896,6 +897,13 @@ fn derive_with_options_for_cdc_table(
})?;
with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
// Generate a random server id for mysql cdc source if needed

// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
with_options
.entry("server.id".to_string())
.or_insert(rand::thread_rng().gen_range(1..u32::MAX).to_string());
}
POSTGRES_CDC_CONNECTOR => {
let (schema_name, table_name) = external_table_name
Expand Down
22 changes: 0 additions & 22 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::time::Duration;

use anyhow::{anyhow, Context};
use itertools::Itertools;
use rand::Rng;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
Expand All @@ -32,10 +31,8 @@ use risingwave_common::util::stream_graph_visitor::{
};
use risingwave_common::{bail, hash, must_match};
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::CdcSourceType;
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
UPSTREAM_SOURCE_KEY,
};
use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved};
use risingwave_meta_model::object::ObjectType;
Expand Down Expand Up @@ -2009,25 +2006,6 @@ pub fn fill_table_stream_graph_info(
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)
if let Some(connector) = source.with_properties.get(UPSTREAM_SOURCE_KEY)
&& matches!(
CdcSourceType::from(connector.as_str()),
CdcSourceType::Mysql
)
{
let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
}

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
Expand Down

0 comments on commit 3f64c85

Please sign in to comment.