Skip to content

Commit 841bcdb

Browse files
committed
feat(meta): let stream actors in fragment share same stream node
1 parent 16121c8 commit 841bcdb

File tree

27 files changed

+555
-402
lines changed

27 files changed

+555
-402
lines changed

proto/meta.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ message TableFragments {
9797
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
9898
// Use `VnodeCountCompat::vnode_count` to access it.
9999
optional uint32 maybe_vnode_count = 8;
100+
101+
stream_plan.StreamNode nodes = 9;
100102
}
101103
// The id of the streaming job.
102104
uint32 table_id = 1;
@@ -218,7 +220,6 @@ message ListTableFragmentsRequest {
218220
message ListTableFragmentsResponse {
219221
message ActorInfo {
220222
uint32 id = 1;
221-
stream_plan.StreamNode node = 2;
222223
repeated stream_plan.Dispatcher dispatcher = 3;
223224
}
224225
message FragmentInfo {
@@ -258,6 +259,7 @@ message ListFragmentDistributionResponse {
258259
uint32 fragment_type_mask = 6;
259260
uint32 parallelism = 7;
260261
uint32 vnode_count = 8;
262+
stream_plan.StreamNode node = 9;
261263
}
262264
repeated FragmentDistribution distributions = 1;
263265
}

proto/stream_plan.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,7 @@ message StreamActor {
966966

967967
uint32 actor_id = 1;
968968
uint32 fragment_id = 2;
969-
StreamNode nodes = 3;
969+
StreamNode nodes = 3 [deprecated = true];
970970
repeated Dispatcher dispatcher = 4;
971971
// The actors that send messages to this actor.
972972
// Note that upstream actor ids are also stored in the proto of merge nodes.

proto/stream_service.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ message InjectBarrierRequest {
1717
repeated uint32 table_ids_to_sync = 5;
1818
uint32 partial_graph_id = 6;
1919

20+
message FragmentBuildActorInfo {
21+
uint32 fragment_id = 1;
22+
stream_plan.StreamNode node = 2;
23+
repeated BuildActorInfo actors = 3;
24+
}
25+
2026
message BuildActorInfo {
2127
message UpstreamActors {
2228
repeated uint32 actors = 1;
@@ -27,7 +33,7 @@ message InjectBarrierRequest {
2733
}
2834

2935
repeated common.ActorInfo broadcast_info = 8;
30-
repeated BuildActorInfo actors_to_build = 9;
36+
repeated FragmentBuildActorInfo actors_to_build = 9;
3137
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
3238
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
3339
}

src/common/src/util/stream_graph_visitor.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,11 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
1919
use risingwave_pb::stream_plan::{agg_call_state, StreamNode};
2020

2121
/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s recursively.
22-
pub fn visit_stream_node<F>(stream_node: &mut StreamNode, mut f: F)
23-
where
24-
F: FnMut(&mut NodeBody),
25-
{
26-
fn visit_inner<F>(stream_node: &mut StreamNode, f: &mut F)
27-
where
28-
F: FnMut(&mut NodeBody),
29-
{
22+
pub fn visit_stream_node_mut(stream_node: &mut StreamNode, mut f: impl FnMut(&mut NodeBody)) {
23+
visit_stream_node_cont_mut(stream_node, |stream_node| {
3024
f(stream_node.node_body.as_mut().unwrap());
31-
for input in &mut stream_node.input {
32-
visit_inner(input, f);
33-
}
34-
}
35-
36-
visit_inner(stream_node, &mut f)
25+
true
26+
})
3727
}
3828

3929
/// A utility for to accessing the [`StreamNode`] mutably. The returned bool is used to determine whether the access needs to continue.
@@ -56,6 +46,14 @@ where
5646
visit_inner(stream_node, &mut f)
5747
}
5848

49+
/// A utility for visiting the [`NodeBody`] of the [`StreamNode`]s recursively.
50+
pub fn visit_stream_node(stream_node: &StreamNode, mut f: impl FnMut(&NodeBody)) {
51+
visit_stream_node_cont(stream_node, |stream_node| {
52+
f(stream_node.node_body.as_ref().unwrap());
53+
true
54+
})
55+
}
56+
5957
/// A utility for to accessing the [`StreamNode`] immutably. The returned bool is used to determine whether the access needs to continue.
6058
pub fn visit_stream_node_cont<F>(stream_node: &StreamNode, mut f: F)
6159
where
@@ -78,11 +76,12 @@ where
7876

7977
/// A utility for visiting and mutating the [`NodeBody`] of the [`StreamNode`]s in a
8078
/// [`StreamFragment`] recursively.
81-
pub fn visit_fragment<F>(fragment: &mut StreamFragment, f: F)
82-
where
83-
F: FnMut(&mut NodeBody),
84-
{
85-
visit_stream_node(fragment.node.as_mut().unwrap(), f)
79+
pub fn visit_fragment_mut(fragment: &mut StreamFragment, f: impl FnMut(&mut NodeBody)) {
80+
visit_stream_node_mut(fragment.node.as_mut().unwrap(), f)
81+
}
82+
83+
pub fn visit_fragment(fragment: &StreamFragment, f: impl FnMut(&NodeBody)) {
84+
visit_stream_node(fragment.node.as_ref().unwrap(), f)
8685
}
8786

8887
/// Visit the tables of a [`StreamNode`].
@@ -279,7 +278,7 @@ pub fn visit_stream_node_tables_inner<F>(
279278
}
280279
};
281280
if visit_child_recursively {
282-
visit_stream_node(stream_node, visit_body)
281+
visit_stream_node_mut(stream_node, visit_body)
283282
} else {
284283
visit_body(stream_node.node_body.as_mut().unwrap())
285284
}

src/frontend/src/catalog/system_catalog/rw_catalog/rw_actor_infos.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ struct RwActorInfo {
2525
#[primary_key]
2626
actor_id: i32,
2727
fragment_id: i32,
28-
node: JsonbVal,
2928
dispatcher: JsonbVal,
3029
}
3130

@@ -47,7 +46,6 @@ async fn read_rw_actors(reader: &SysCatalogReaderImpl) -> Result<Vec<RwActorInfo
4746
fragment.actors.into_iter().map(move |actor| RwActorInfo {
4847
actor_id: actor.id as _,
4948
fragment_id: fragment_id as _,
50-
node: json!(actor.node).into(),
5149
dispatcher: json!(actor.dispatcher).into(),
5250
})
5351
})

src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use risingwave_common::types::Fields;
15+
use risingwave_common::types::{Fields, JsonbVal};
1616
use risingwave_frontend_macro::system_catalog;
1717
use risingwave_pb::stream_plan::FragmentTypeFlag;
18+
use serde_json::json;
1819

1920
use crate::catalog::system_catalog::SysCatalogReaderImpl;
2021
use crate::error::Result;
@@ -30,6 +31,7 @@ struct RwFragment {
3031
flags: Vec<String>,
3132
parallelism: i32,
3233
max_parallelism: i32,
34+
node: JsonbVal,
3335
}
3436

3537
pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec<FragmentTypeFlag> {
@@ -73,6 +75,7 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen
7375
.collect(),
7476
parallelism: distribution.parallelism as i32,
7577
max_parallelism: distribution.vnode_count as i32,
78+
node: json!(distribution.node).into(),
7679
})
7780
.collect())
7881
}

src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,8 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
112112
"rw_catalog.rw_sink_decouple",
113113
"WITH decoupled_sink_internal_table_ids AS (
114114
SELECT
115-
distinct (node->'sink'->'table'->'id')::int as internal_table_id
116-
FROM rw_catalog.rw_actor_infos actor
117-
JOIN
118-
rw_catalog.rw_fragments fragment
119-
ON actor.fragment_id = fragment.fragment_id
115+
(node->'sink'->'table'->'id')::int as internal_table_id
116+
FROM rw_catalog.rw_fragments
120117
WHERE
121118
'SINK' = any(flags)
122119
AND

src/meta/service/src/stream_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ impl StreamManagerService for StreamServiceImpl {
226226
.into_iter()
227227
.map(|actor| ActorInfo {
228228
id: actor.actor_id,
229-
node: actor.nodes,
230229
dispatcher: actor.dispatcher,
231230
})
232231
.collect_vec(),
@@ -309,6 +308,7 @@ impl StreamManagerService for StreamServiceImpl {
309308
fragment_type_mask: fragment_desc.fragment_type_mask as _,
310309
parallelism: fragment_desc.parallelism as _,
311310
vnode_count: fragment_desc.vnode_count as _,
311+
node: Some(fragment_desc.stream_node.to_protobuf()),
312312
},
313313
)
314314
.collect_vec();

src/meta/src/barrier/checkpoint/control.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,14 +1009,9 @@ impl DatabaseCheckpointControl {
10091009
"must not set previously"
10101010
);
10111011
}
1012-
for stream_actor in info
1013-
.stream_job_fragments
1014-
.fragments
1015-
.values_mut()
1016-
.flat_map(|fragment| fragment.actors.iter_mut())
1017-
{
1012+
for fragment in info.stream_job_fragments.fragments.values_mut() {
10181013
fill_snapshot_backfill_epoch(
1019-
stream_actor.nodes.as_mut().expect("should exist"),
1014+
fragment.nodes.as_mut().expect("should exist"),
10201015
&snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch,
10211016
)?;
10221017
}

src/meta/src/barrier/checkpoint/creating_job/status.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::mem::take;
1818

1919
use risingwave_common::hash::ActorId;
2020
use risingwave_common::util::epoch::Epoch;
21-
use risingwave_meta_model::WorkerId;
2221
use risingwave_pb::hummock::HummockVersionStats;
2322
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
2423
use risingwave_pb::stream_service::barrier_complete_response::{
@@ -28,7 +27,7 @@ use tracing::warn;
2827

2928
use crate::barrier::progress::CreateMviewProgressTracker;
3029
use crate::barrier::{BarrierInfo, BarrierKind, TracedEpoch};
31-
use crate::model::StreamActorWithUpstreams;
30+
use crate::model::StreamJobActorsToCreate;
3231

3332
#[derive(Debug)]
3433
pub(super) struct CreateMviewLogStoreProgressTracker {
@@ -110,7 +109,7 @@ pub(super) enum CreatingStreamingJobStatus {
110109
pending_non_checkpoint_barriers: Vec<u64>,
111110
/// Info of the first barrier: (`actors_to_create`, `mutation`)
112111
/// Take the mutation out when injecting the first barrier
113-
initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActorWithUpstreams>>, Mutation)>,
112+
initial_barrier_info: Option<(StreamJobActorsToCreate, Mutation)>,
114113
},
115114
/// The creating job is consuming log store.
116115
///
@@ -126,7 +125,7 @@ pub(super) enum CreatingStreamingJobStatus {
126125

127126
pub(super) struct CreatingJobInjectBarrierInfo {
128127
pub barrier_info: BarrierInfo,
129-
pub new_actors: Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>>,
128+
pub new_actors: Option<StreamJobActorsToCreate>,
130129
pub mutation: Option<Mutation>,
131130
}
132131

@@ -252,10 +251,7 @@ impl CreatingStreamingJobStatus {
252251
pub(super) fn new_fake_barrier(
253252
prev_epoch_fake_physical_time: &mut u64,
254253
pending_non_checkpoint_barriers: &mut Vec<u64>,
255-
initial_barrier_info: &mut Option<(
256-
HashMap<WorkerId, Vec<StreamActorWithUpstreams>>,
257-
Mutation,
258-
)>,
254+
initial_barrier_info: &mut Option<(StreamJobActorsToCreate, Mutation)>,
259255
is_checkpoint: bool,
260256
) -> CreatingJobInjectBarrierInfo {
261257
{

src/meta/src/barrier/command.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ use risingwave_pb::stream_plan::{
4242
use risingwave_pb::stream_service::BarrierCompleteResponse;
4343
use tracing::warn;
4444

45-
use super::info::{CommandFragmentChanges, InflightStreamingJobInfo};
45+
use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
4646
use crate::barrier::info::BarrierInfo;
4747
use crate::barrier::utils::collect_resp_info;
4848
use crate::barrier::InflightSubscriptionInfo;
4949
use crate::controller::fragment::InflightFragmentInfo;
5050
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
5151
use crate::manager::{StreamingJob, StreamingJobType};
5252
use crate::model::{
53-
ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobFragments,
53+
ActorId, DispatcherId, FragmentId, StreamActorWithUpstreams, StreamJobActorsToCreate,
54+
StreamJobFragments,
5455
};
5556
use crate::stream::{
5657
build_actor_connector_splits, JobReschedulePostUpdates, SplitAssignment, ThrottleConfig,
@@ -121,6 +122,8 @@ impl ReplaceStreamJobPlan {
121122
let fragment_change = CommandFragmentChanges::NewFragment(
122123
self.streaming_job.id().into(),
123124
InflightFragmentInfo {
125+
fragment_id: fragment.fragment_id,
126+
nodes: fragment.nodes.clone().unwrap(),
124127
actors: fragment
125128
.actors
126129
.iter()
@@ -207,6 +210,8 @@ impl CreateStreamingJobCommandInfo {
207210
(
208211
fragment.fragment_id,
209212
InflightFragmentInfo {
213+
fragment_id: fragment.fragment_id,
214+
nodes: fragment.nodes.clone().unwrap(),
210215
actors: fragment
211216
.actors
212217
.iter()
@@ -954,7 +959,10 @@ impl Command {
954959
mutation
955960
}
956961

957-
pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActorWithUpstreams>>> {
962+
pub fn actors_to_create(
963+
&self,
964+
graph_info: &InflightDatabaseInfo,
965+
) -> Option<StreamJobActorsToCreate> {
958966
match self {
959967
Command::CreateStreamingJob { info, job_type } => {
960968
let mut map = match job_type {
@@ -973,13 +981,25 @@ impl Command {
973981
Some(map)
974982
}
975983
Command::RescheduleFragment { reschedules, .. } => {
976-
let mut map: HashMap<WorkerId, Vec<_>> = HashMap::new();
977-
for (actor, status) in reschedules
978-
.values()
979-
.flat_map(|reschedule| reschedule.newly_created_actors.iter())
984+
let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
985+
for (fragment_id, actor, status) in
986+
reschedules.iter().flat_map(|(fragment_id, reschedule)| {
987+
reschedule
988+
.newly_created_actors
989+
.iter()
990+
.map(|(actors, status)| (*fragment_id, actors, status))
991+
})
980992
{
981993
let worker_id = status.location.as_ref().unwrap().worker_node_id as _;
982-
map.entry(worker_id).or_default().push(actor.clone());
994+
map.entry(worker_id)
995+
.or_default()
996+
.entry(fragment_id)
997+
.or_insert_with(|| {
998+
let node = graph_info.fragment(fragment_id).nodes.clone();
999+
(node, vec![])
1000+
})
1001+
.1
1002+
.push(actor.clone());
9831003
}
9841004
Some(map)
9851005
}

src/meta/src/barrier/info.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,16 @@ impl InflightDatabaseInfo {
8282
pub fn contains_job(&self, job_id: TableId) -> bool {
8383
self.jobs.contains_key(&job_id)
8484
}
85+
86+
pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
87+
let job_id = self.fragment_location[&fragment_id];
88+
self.jobs
89+
.get(&job_id)
90+
.expect("should exist")
91+
.fragment_infos
92+
.get(&fragment_id)
93+
.expect("should exist")
94+
}
8595
}
8696

8797
impl InflightDatabaseInfo {

0 commit comments

Comments
 (0)