Skip to content

Commit

Permalink
- table with connector: filled when creating job catalog https://gith…
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 29, 2024
1 parent a568646 commit 0dab5fe
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 96 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ c583e2c6c054764249acf484438c7bf7197765f4

# chore: replace all ProstXxx with PbXxx (#8621)
6fd8821f2e053957b183d648bea9c95b6703941f

# chore: cleanup v2 naming for sql metastore (#18941)
9a6a7f9052d5679165ff57cc01417c742c95351c
37 changes: 19 additions & 18 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_pb::catalog::connection::private_link_service::{
PbPrivateLinkProvider, PrivateLinkProvider,
Expand Down Expand Up @@ -94,27 +93,26 @@ impl DdlServiceImpl {
}
}

fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo {
let job_type = change.get_job_type().unwrap_or_default();
let mut source = change.source;
let mut fragment_graph = change.fragment_graph.unwrap();
let mut table = change.table.unwrap();
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
table.optional_associated_source_id
{
source.as_mut().unwrap().id = source_id;
fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph);
}
let table_col_index_mapping = change
.table_col_index_mapping
fn extract_replace_table_info(
ReplaceTablePlan {
table,
fragment_graph,
table_col_index_mapping,
source,
job_type,
}: ReplaceTablePlan,
) -> ReplaceTableInfo {
let job_type = get_job_type().unwrap_or_default();
let source = source.unwrap();
let table = table.unwrap();
let col_index_mapping = table_col_index_mapping
.as_ref()
.map(ColIndexMapping::from_protobuf);

let stream_job = StreamingJob::Table(source, table, job_type);
ReplaceTableInfo {
streaming_job: stream_job,
fragment_graph,
col_index_mapping: table_col_index_mapping,
streaming_job: StreamingJob::Table(Some(source), table, job_type),
fragment_graph: fragment_graph.unwrap(),
col_index_mapping,
}
}
}
Expand Down Expand Up @@ -337,6 +335,7 @@ impl DdlService for DdlServiceImpl {
let command = DdlCommand::DropStreamingJob(
StreamingJobId::Sink(sink_id as _),
drop_mode,
// FIXME: need to fill source id, etc here?
request
.affected_table_change
.map(Self::extract_replace_table_info),
Expand Down Expand Up @@ -635,6 +634,7 @@ impl DdlService for DdlServiceImpl {
) -> Result<Response<ReplaceTablePlanResponse>, Status> {
let req = request.into_inner().get_plan().cloned()?;

// FIXME: need to fill source id, etc here?
let version = self
.ddl_controller
.run_command(DdlCommand::ReplaceTable(Self::extract_replace_table_info(
Expand Down Expand Up @@ -1018,6 +1018,7 @@ impl DdlService for DdlServiceImpl {
"Start the replace config change")
});
// start the schema change procedure
// FIXME: need to fill source id, etc here?
let replace_res = self
.ddl_controller
.run_command(DdlCommand::ReplaceTable(
Expand Down
66 changes: 0 additions & 66 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -939,24 +939,6 @@ impl DdlController {
.await?;
let job_id = streaming_job.id();

match &mut streaming_job {
StreamingJob::Table(src, table, job_type) => {
// If we're creating a table with connector, we should additionally fill its ID first.
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
}
StreamingJob::Source(src) => {
// set the inner source id of source node.
for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
source_node.source_inner.as_mut().unwrap().source_id = src.id;
}
});
}
}
_ => {}
}

tracing::debug!(
id = job_id,
definition = streaming_job.definition(),
Expand Down Expand Up @@ -1983,51 +1965,3 @@ impl DdlController {
.await
}
}

/// Fill in necessary information for `Table` stream graph.
/// e.g., fill source id for table with connector, fill external table id for CDC table.
pub fn fill_table_stream_graph_info(
source: &mut Option<PbSource>,
table: &mut PbTable,
table_job_type: TableJobType,
fragment_graph: &mut PbStreamFragmentGraph,
) {
let mut source_count = 0;
for fragment in fragment_graph.fragments.values_mut() {
visit_fragment(fragment, |node_body| {
if let NodeBody::Source(source_node) = node_body {
if source_node.source_inner.is_none() {
// skip empty source for dml node
return;
}

// If we're creating a table with connector, we should additionally fill its ID first.
if let Some(source) = source {
source_node.source_inner.as_mut().unwrap().source_id = source.id;
source_count += 1;

assert_eq!(
source_count, 1,
"require exactly 1 external stream source when creating table with a connector"
);

// Fill in the correct table id for source.
source.optional_associated_table_id =
Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
// Fill in the correct source id for mview.
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
}
}

// fill table id for cdc backfill
if let NodeBody::StreamCdcScan(node) = node_body
&& table_job_type == TableJobType::SharedCdcSource
{
if let Some(table_desc) = node.cdc_table_desc.as_mut() {
table_desc.table_id = table.id;
}
}
});
}
}
48 changes: 36 additions & 12 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::WithPropertiesExt;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::TableJobType;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub(super) struct BuildingFragment {
inner: StreamFragment,

/// The ID of the job if it contains the streaming job node.
table_id: Option<u32>,
job_id: Option<u32>,

/// The required column IDs of each upstream table.
/// Will be converted to indices when building the edge connected to the upstream.
Expand All @@ -82,12 +83,12 @@ impl BuildingFragment {
// Fill the information of the internal tables in the fragment.
Self::fill_internal_tables(&mut fragment, job, table_id_gen);

let table_id = Self::fill_job(&mut fragment, job).then(|| job.id());
let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
let upstream_table_columns = Self::extract_upstream_table_columns(&mut fragment);

Self {
inner: fragment,
table_id,
job_id,
upstream_table_columns,
}
}
Expand Down Expand Up @@ -126,17 +127,17 @@ impl BuildingFragment {

/// Fill the information with the job in the fragment.
fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
let table_id = job.id();
let job_id = job.id();
let fragment_id = fragment.fragment_id;
let mut has_table = false;

stream_graph_visitor::visit_fragment(fragment, |node_body| match node_body {
NodeBody::Materialize(materialize_node) => {
materialize_node.table_id = table_id;
materialize_node.table_id = job_id;

// Fill the ID of the `Table`.
let table = materialize_node.table.as_mut().unwrap();
table.id = table_id;
table.id = job_id;
table.database_id = job.database_id();
table.schema_id = job.schema_id();
table.fragment_id = fragment_id;
Expand All @@ -148,19 +149,42 @@ impl BuildingFragment {
has_table = true;
}
NodeBody::Sink(sink_node) => {
sink_node.sink_desc.as_mut().unwrap().id = table_id;
sink_node.sink_desc.as_mut().unwrap().id = job_id;

has_table = true;
}
NodeBody::Dml(dml_node) => {
dml_node.table_id = table_id;
dml_node.table_id = job_id;
dml_node.table_version_id = job.table_version_id().unwrap();
}
NodeBody::Source(_) => {
NodeBody::Source(source_node) => {
// Note: a Table job has a dummy Source node; be careful that `has_table` is not overwritten back to `false` here
if !has_table {
has_table = job.is_source_job();
}
if let Some(source_inner) = source_node.source_inner.as_mut() {
source_inner.source_id = job_id;

if source_inner.with_properties.is_mysql_cdc_connector() {
// Generate a random server id for mysql cdc source if needed
// `server.id` (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication
// group (that is, different from any other server id being used by any master or slave)

let props = &mut source_node.source_inner.as_mut().unwrap().with_properties;
let rand_server_id = rand::thread_rng().gen_range(1..u32::MAX);
props
.entry("server.id".to_string())
.or_insert(rand_server_id.to_string());

// make these two `Source` consistent
props.clone_into(&mut source.with_properties);
}
}
}
NodeBody::StreamCdcScan(node) => {
if let Some(table_desc) = node.cdc_table_desc.as_mut() {
table_desc.table_id = job_id;
}
}
_ => {}
});
Expand Down Expand Up @@ -499,7 +523,7 @@ impl StreamFragmentGraph {
pub fn table_fragment_id(&self) -> FragmentId {
self.fragments
.values()
.filter(|b| b.table_id.is_some())
.filter(|b| b.job_id.is_some())
.map(|b| b.fragment_id)
.exactly_one()
.expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
Expand Down Expand Up @@ -1095,7 +1119,7 @@ impl CompleteStreamFragmentGraph {
let internal_tables = building_fragment.extract_internal_tables();
let BuildingFragment {
inner,
table_id,
job_id,
upstream_table_columns: _,
} = building_fragment;

Expand All @@ -1104,7 +1128,7 @@ impl CompleteStreamFragmentGraph {

let materialized_fragment_id =
if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 {
table_id
job_id
} else {
None
};
Expand Down

0 comments on commit 0dab5fe

Please sign in to comment.