-
Notifications
You must be signed in to change notification settings - Fork 577
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(meta): simplify stream job table/source id assignment #19171
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
0dab5fe
to
016a1e5
Compare
a568646
to
3f64c85
Compare
016a1e5
to
2c7a812
Compare
3f64c85
to
0399933
Compare
2c7a812
to
744d680
Compare
fa620ec
to
9f4c672
Compare
744d680
to
2404113
Compare
9f4c672
to
2cab4a1
Compare
2404113
to
ae16bf3
Compare
ae16bf3
to
6de33ff
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename table -> job
2cab4a1
to
f59bc27
Compare
6de33ff
to
0e24225
Compare
0e24225
to
f33a42a
Compare
d43598b
to
0012e27
Compare
…ub.com/risingwavelabs/risingwave/blob/193e93fd8d9f9dbae717fe6a5b411e7f33382f27/src/meta/src/controller/streaming_job.rs#L247-L251 - stream node: filled in fill_job Signed-off-by: xxchan <xxchan22f@gmail.com>
0012e27
to
613044f
Compare
NodeBody::Source(source_node) => { | ||
match job { | ||
// Note: For table without connector, it has a dumb Source node. | ||
// Note: For table with connector, it's source node has a source id different with the table id (job id), assigned in create_job_catalog. | ||
StreamingJob::Table(source, _table, _table_job_type) => { | ||
if let Some(source_inner) = source_node.source_inner.as_mut() { | ||
if let Some(source) = source { | ||
debug_assert_ne!(source.id, job_id); | ||
source_inner.source_id = source.id; | ||
} | ||
} | ||
} | ||
StreamingJob::Source(source) => { | ||
has_job = true; | ||
if let Some(source_inner) = source_node.source_inner.as_mut() { | ||
debug_assert_eq!(source.id, job_id); | ||
source_inner.source_id = source.id; | ||
} | ||
} | ||
// For other job types, no need to fill the source id, since it refers to an existing source. | ||
_ => {} | ||
} | ||
} | ||
NodeBody::StreamCdcScan(node) => { | ||
if let Some(table_desc) = node.cdc_table_desc.as_mut() { | ||
table_desc.table_id = job_id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This replaces the code in create_streaming_job
match &mut streaming_job { | ||
StreamingJob::Table(src, table, job_type) => { | ||
// If we're creating a table with connector, we should additionally fill its ID first. | ||
fill_table_stream_graph_info(src, table, *job_type, &mut fragment_graph); | ||
} | ||
StreamingJob::Source(src) => { | ||
// set the inner source id of source node. | ||
for fragment in fragment_graph.fragments.values_mut() { | ||
visit_fragment(fragment, |node_body| { | ||
if let NodeBody::Source(source_node) = node_body { | ||
source_node.source_inner.as_mut().unwrap().source_id = src.id; | ||
} | ||
}); | ||
} | ||
} | ||
_ => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do it in fill_job
instead. (They already have some overlaps)
fn extract_replace_table_info(change: ReplaceTablePlan) -> ReplaceTableInfo { | ||
let job_type = change.get_job_type().unwrap_or_default(); | ||
let mut source = change.source; | ||
let mut fragment_graph = change.fragment_graph.unwrap(); | ||
let mut table = change.table.unwrap(); | ||
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = | ||
table.optional_associated_source_id | ||
{ | ||
source.as_mut().unwrap().id = source_id; | ||
fill_table_stream_graph_info(&mut source, &mut table, job_type, &mut fragment_graph); | ||
} | ||
let table_col_index_mapping = change | ||
.table_col_index_mapping |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: extract_replace_table_info
is used in: create/drop sink & table schema change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some work is done in fill_job
, and some is done in frontend generate_stream_graph_for_replace_table
source.as_mut().unwrap().optional_associated_table_id = | ||
Some(OptionalAssociatedTableId::AssociatedTableId(table.id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's strange not to set optional_associated_table_id
here.
Note: for replace table, the Source
catalog here were both created again. It looks error-prone, but do not intend to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Appreciate the work done in this PR👍
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
We fill table_id/source_id in several places, which seems unnecessary. This PR tries to remove unnecessary codes to make it more understandable and maintainable.
We leave it with 2 steps:
create_job_catalog
, we assign id toStreamingJob
StreamFragmentGraph::new
(fill_job
), we traverse the nodes, and set the id to corresponding plan nodes. (Note that previously we traverse the nodes multiple times)Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.