refactor(meta): simplify stream job table/source id assignment #19171

Merged · 3 commits · Nov 4, 2024

Changes from all commits
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -42,3 +42,6 @@ c583e2c6c054764249acf484438c7bf7197765f4
 
 # chore: replace all ProstXxx with PbXxx (#8621)
 6fd8821f2e053957b183d648bea9c95b6703941f
+
+# chore: cleanup v2 naming for sql metastore (#18941)
+9a6a7f9052d5679165ff57cc01417c742c95351c
2 changes: 2 additions & 0 deletions proto/catalog.proto
@@ -95,6 +95,8 @@ message StreamSourceInfo {
 }
 
 message Source {
+  // For a shared source, this is the same as the job id.
+  // For a non-shared source or a table with connector, this is a different oid (not the job id).
   uint32 id = 1;
   uint32 schema_id = 2;
   uint32 database_id = 3;
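As a minimal illustration of the id rule the new comment states (the `expected_source_id` helper below is hypothetical, not RisingWave code): a shared source reuses the stream job's id, while a non-shared source or a table-with-connector source gets a separately allocated oid.

```rust
/// Hypothetical helper for illustration only, not part of the codebase.
/// `job_id` is the stream job's id; `fresh_oid` stands for a newly allocated object id.
fn expected_source_id(is_shared: bool, job_id: u32, fresh_oid: u32) -> u32 {
    if is_shared {
        job_id // shared source: the source id doubles as the job id
    } else {
        fresh_oid // non-shared source / table with connector: its own oid
    }
}

fn main() {
    assert_eq!(expected_source_id(true, 42, 1001), 42);
    assert_eq!(expected_source_id(false, 42, 1001), 1001);
}
```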
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_table_column.rs
@@ -35,7 +35,7 @@ use risingwave_sqlparser::ast::{
 use risingwave_sqlparser::parser::Parser;
 
 use super::create_source::get_json_schema_location;
-use super::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
+use super::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
 use super::util::SourceSchemaCompatExt;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
@@ -192,7 +192,7 @@ pub async fn get_replace_table_plan(
         panic!("unexpected statement type: {:?}", definition);
     };
 
-    let (mut graph, table, source, job_type) = generate_stream_graph_for_table(
+    let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
         session,
         table_name,
         original_catalog,
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
@@ -54,7 +54,7 @@ use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{rewrite_now_to_proctime, ExprImpl, InputRef};
 use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
 use crate::handler::create_mv::parse_column_names;
-use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
+use crate::handler::create_table::{generate_stream_graph_for_replace_table, ColumnIdGenerator};
 use crate::handler::privilege::resolve_query_privileges;
 use crate::handler::util::SourceSchemaCompatExt;
 use crate::handler::HandlerArgs;
@@ -672,7 +672,7 @@ pub(crate) async fn reparse_table_for_sink(
         panic!("unexpected statement type: {:?}", definition);
     };
 
-    let (graph, table, source, _) = generate_stream_graph_for_table(
+    let (graph, table, source, _) = generate_stream_graph_for_replace_table(
         session,
         table_name,
         table_catalog,
18 changes: 12 additions & 6 deletions src/frontend/src/handler/create_table.rs
@@ -35,6 +35,7 @@ use risingwave_connector::source::cdc::external::{
     ExternalTableConfig, ExternalTableImpl, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
 use risingwave_connector::{source, WithOptionsSecResolved};
+use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, Table, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
@@ -1322,7 +1323,7 @@ pub fn check_create_table_with_source(
 }
 
 #[allow(clippy::too_many_arguments)]
-pub async fn generate_stream_graph_for_table(
+pub async fn generate_stream_graph_for_replace_table(
     _session: &Arc<SessionImpl>,
     table_name: ObjectName,
     original_catalog: &Arc<TableCatalog>,
@@ -1341,7 +1342,7 @@
 ) -> Result<(StreamFragmentGraph, Table, Option<PbSource>, TableJobType)> {
     use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 
-    let ((plan, source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
+    let ((plan, mut source, table), job_type) = match (format_encode, cdc_table_info.as_ref()) {
         (Some(format_encode), None) => (
             gen_create_table_plan_with_source(
                 handler_args,
@@ -1441,13 +1442,18 @@
     let graph = build_graph(plan)?;
 
     // Fill the original table ID.
-    let table = Table {
+    let mut table = Table {
         id: original_catalog.id().table_id(),
-        optional_associated_source_id: original_catalog
-            .associated_source_id()
-            .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
         ..table
     };
+    if let Some(source_id) = original_catalog.associated_source_id() {
+        table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(
+            source_id.table_id,
+        ));
+        source.as_mut().unwrap().id = source_id.table_id;
+        source.as_mut().unwrap().optional_associated_table_id =
+            Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
Comment on lines +1454 to +1455
@xxchan (Member, Author) · Nov 4, 2024

It's strange not to set optional_associated_table_id here.

Note: for replace table, the Table and Source catalogs here are both created again. It looks error-prone, but I do not intend to change it.
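The two lines under discussion restore the bidirectional table↔source association. A minimal sketch of that invariant, using the catalog types from this diff (the `assert_mutually_associated` helper is illustrative, not part of the codebase):

```rust
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbSource, PbTable};

/// Illustrative check, not actual RisingWave code: after replacing a table
/// that has a connector, the table and its source must point at each other.
fn assert_mutually_associated(table: &PbTable, source: &PbSource) {
    assert_eq!(
        table.optional_associated_source_id,
        Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id))
    );
    assert_eq!(
        source.optional_associated_table_id,
        Some(OptionalAssociatedTableId::AssociatedTableId(table.id))
    );
}
```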

+    }
 
     Ok((graph, table, source, job_type))
 }
37 changes: 18 additions & 19 deletions src/meta/service/src/ddl_service.rs
@@ -23,9 +23,7 @@ use risingwave_common::types::DataType;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::sink::catalog::SinkId;
 use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
-use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info;
 use risingwave_meta::rpc::metrics::MetaMetrics;
-use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 use risingwave_pb::catalog::{Comment, CreateType, Secret, Table};
 use risingwave_pb::common::worker_node::State;
 use risingwave_pb::common::WorkerType;
@@ -84,27 +82,28 @@ impl DdlServiceImpl {
         }
     }
 
-    fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo {
-        let job_type = change.get_job_type().unwrap_or_default();
-        let mut source = change.source;
-        let mut fragment_graph = change.fragment_graph.unwrap();
-        let mut table = change.table.unwrap();
-        if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) =
-            table.optional_associated_source_id
-        {
-            source.as_mut().unwrap().id = source_id;
-            fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph);
-        }
-        let table_col_index_mapping = change
-            .table_col_index_mapping
Comment on lines -87 to -99
@xxchan (Member, Author)

Note: extract_replace_table_info is used in create/drop sink and table schema change.

@xxchan (Member, Author)

Some work is done in fill_job, and some is done in the frontend's generate_stream_graph_for_replace_table.

+    fn extract_replace_table_info(
+        ReplaceTablePlan {
+            table,
+            fragment_graph,
+            table_col_index_mapping,
+            source,
+            job_type,
+        }: ReplaceTablePlan,
+    ) -> ReplaceTableInfo {
+        let table = table.unwrap();
+        let col_index_mapping = table_col_index_mapping
             .as_ref()
             .map(ColIndexMapping::from_protobuf);
 
-        let stream_job = StreamingJob::Table(source, table, job_type);
         ReplaceTableInfo {
-            streaming_job: stream_job,
-            fragment_graph,
-            col_index_mapping: table_col_index_mapping,
+            streaming_job: StreamingJob::Table(
+                source,
+                table,
+                TableJobType::try_from(job_type).unwrap(),
+            ),
+            fragment_graph: fragment_graph.unwrap(),
+            col_index_mapping,
         }
     }
 }
4 changes: 0 additions & 4 deletions src/meta/src/manager/streaming_job.rs
@@ -328,8 +328,4 @@ impl StreamingJob {
             StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
         }
     }
-
-    pub fn is_source_job(&self) -> bool {
-        matches!(self, StreamingJob::Source(_))
-    }
 }
76 changes: 4 additions & 72 deletions src/meta/src/rpc/ddl_controller.rs
@@ -27,7 +27,7 @@ use risingwave_common::secret::SecretEncryption;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::stream_graph_visitor::{
-    visit_fragment, visit_stream_node, visit_stream_node_cont_mut,
+    visit_stream_node, visit_stream_node_cont_mut,
 };
 use risingwave_common::{bail, hash, must_match};
 use risingwave_connector::error::ConnectorError;
@@ -40,11 +40,9 @@ use risingwave_meta_model::{
     ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId,
     SubscriptionId, TableId, UserId, ViewId,
 };
-use risingwave_pb::catalog::source::OptionalAssociatedTableId;
-use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 use risingwave_pb::catalog::{
-    Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret,
-    Sink, Source, Subscription, Table, View,
+    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
+    Subscription, Table, View,
 };
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::{
@@ -903,7 +901,7 @@ impl DdlController {
     pub async fn create_streaming_job(
         &self,
         mut streaming_job: StreamingJob,
-        mut fragment_graph: StreamFragmentGraphProto,
+        fragment_graph: StreamFragmentGraphProto,
         affected_table_replace_info: Option<ReplaceTableInfo>,
     ) -> MetaResult<NotificationVersion> {
         let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
@@ -918,24 +916,6 @@
             .await?;
         let job_id = streaming_job.id();
 
-        match &mut streaming_job {
-            StreamingJob::Table(src, table, job_type) => {
-                // If we're creating a table with connector, we should additionally fill its ID first.
-                fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph);
-            }
-            StreamingJob::Source(src) => {
-                // set the inner source id of source node.
-                for fragment in fragment_graph.fragments.values_mut() {
-                    visit_fragment(fragment, |node_body| {
-                        if let NodeBody::Source(source_node) = node_body {
-                            source_node.source_inner.as_mut().unwrap().source_id = src.id;
-                        }
-                    });
-                }
-            }
-            _ => {}
Comment on lines -921 to -936
@xxchan (Member, Author) · Nov 4, 2024

Do it in fill_job instead. (They already had some overlap.)
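For context, a rough sketch of the kind of pass that moves into fill_job — the `fill_source_id` name and signature are illustrative assumptions, not the actual fill_job API, though the fragment-visit pattern mirrors the removed code around this comment:

```rust
use risingwave_common::util::stream_graph_visitor::visit_fragment;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::StreamFragmentGraph;

/// Illustrative only (not the real fill_job): stamp `source_id` onto every
/// source node in the fragment graph, as the removed per-call-site code did.
fn fill_source_id(fragment_graph: &mut StreamFragmentGraph, source_id: u32) {
    for fragment in fragment_graph.fragments.values_mut() {
        visit_fragment(fragment, |node_body| {
            if let NodeBody::Source(source_node) = node_body {
                // `source_inner` is `None` for the dummy source in DML nodes; skip those.
                if let Some(inner) = source_node.source_inner.as_mut() {
                    inner.source_id = source_id;
                }
            }
        });
    }
}
```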

-        }
-
         tracing::debug!(
             id = job_id,
             definition = streaming_job.definition(),
@@ -1951,51 +1931,3 @@
             .await
     }
 }
-
-/// Fill in necessary information for `Table` stream graph.
-/// e.g., fill source id for table with connector, fill external table id for CDC table.
-pub fn fill_table_stream_graph_info(
-    source: &mut Option<PbSource>,
-    table: &mut PbTable,
-    table_job_type: TableJobType,
-    fragment_graph: &mut PbStreamFragmentGraph,
-) {
-    let mut source_count = 0;
-    for fragment in fragment_graph.fragments.values_mut() {
-        visit_fragment(fragment, |node_body| {
-            if let NodeBody::Source(source_node) = node_body {
-                if source_node.source_inner.is_none() {
-                    // skip empty source for dml node
-                    return;
-                }
-
-                // If we're creating a table with connector, we should additionally fill its ID first.
-                if let Some(source) = source {
-                    source_node.source_inner.as_mut().unwrap().source_id = source.id;
-                    source_count += 1;
-
-                    assert_eq!(
-                        source_count, 1,
-                        "require exactly 1 external stream source when creating table with a connector"
-                    );
-
-                    // Fill in the correct table id for source.
-                    source.optional_associated_table_id =
-                        Some(OptionalAssociatedTableId::AssociatedTableId(table.id));
-                    // Fill in the correct source id for mview.
-                    table.optional_associated_source_id =
-                        Some(OptionalAssociatedSourceId::AssociatedSourceId(source.id));
-                }
-            }
-
-            // fill table id for cdc backfill
-            if let NodeBody::StreamCdcScan(node) = node_body
-                && table_job_type == TableJobType::SharedCdcSource
-            {
-                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
-                    table_desc.table_id = table.id;
-                }
-            }
-        });
-    }
-}
1 change: 1 addition & 0 deletions src/meta/src/stream/source_manager.rs
@@ -1002,6 +1002,7 @@
 
     /// create and register connector worker for source.
    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
+        tracing::debug!("register_source: {}", source.get_id());
        let mut core = self.core.lock().await;
        if let Entry::Vacant(e) = core.managed_sources.entry(source.get_id() as _) {
            let handle = create_source_worker_handle(source, self.metrics.clone())