From 64454cc19daaf752d77d93bfcc1c23d973fc3b25 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Dec 2025 10:01:40 +0000 Subject: [PATCH 1/3] Initial plan From c5ad54bf736c0141e8de0ccc4bd85f2c88633fb5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Dec 2025 10:18:10 +0000 Subject: [PATCH 2/3] Refactor ALTER FRAGMENT RATE_LIMIT to support specific rate limit types - Change AlterFragmentOperation enum to support BACKFILL_RATE_LIMIT, SOURCE_RATE_LIMIT, DML_RATE_LIMIT, and SINK_RATE_LIMIT - Update parser to parse new rate limit types - Update handler to handle new rate limit operations - Update e2e test to use new BACKFILL_RATE_LIMIT syntax - Add unit tests for new rate limit parsing Co-authored-by: kwannoel <47273164+kwannoel@users.noreply.github.com> --- .../streaming/rate_limit/fragment.slt | 2 +- src/frontend/src/handler/mod.rs | 7 +- src/sqlparser/src/ast/ddl.rs | 16 ++++- src/sqlparser/src/parser.rs | 33 +++------ src/sqlparser/tests/sqlparser_common.rs | 71 +++++++++++++++++++ 5 files changed, 102 insertions(+), 27 deletions(-) diff --git a/e2e_test/slow_tests/streaming/rate_limit/fragment.slt b/e2e_test/slow_tests/streaming/rate_limit/fragment.slt index f5e3fb6ea726b..a8b83665ca7e3 100644 --- a/e2e_test/slow_tests/streaming/rate_limit/fragment.slt +++ b/e2e_test/slow_tests/streaming/rate_limit/fragment.slt @@ -38,7 +38,7 @@ skipif madsim system ok fragment_id=$(PGUSER=root psql -h ${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS} -p ${RISEDEV_RW_FRONTEND_PORT} -d dev -t -c "select fragment_id from rw_fragments where flags @> ARRAY['STREAM_SCAN']") echo "fragment_id:" $fragment_id -PGUSER=root psql -h ${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS} -p ${RISEDEV_RW_FRONTEND_PORT} -d dev -c "ALTER FRAGMENT $fragment_id SET RATE_LIMIT = default;" +PGUSER=root psql -h ${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS} -p ${RISEDEV_RW_FRONTEND_PORT} -d dev -c "ALTER FRAGMENT $fragment_id SET BACKFILL_RATE_LIMIT = default;" statement ok diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 0efdf985672d4..e0cb6e359a9af 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -1355,10 +1355,13 @@ pub async fn handle( fragment_ids, operation, } => match operation { - AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => { + AlterFragmentOperation::SetBackfillRateLimit { rate_limit } + | AlterFragmentOperation::SetSourceRateLimit { rate_limit } + | AlterFragmentOperation::SetDmlRateLimit { rate_limit } + | AlterFragmentOperation::SetSinkRateLimit { rate_limit } => { let [fragment_id] = fragment_ids.as_slice() else { return Err(ErrorCode::InvalidInputSyntax( - "ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id" + "ALTER FRAGMENT ... SET rate limit supports exactly one fragment id" .to_owned(), ) .into()); diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 5a09d4a8deddd..42fcee77a6d00 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -304,7 +304,10 @@ pub enum AlterSecretOperation { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum AlterFragmentOperation { - AlterBackfillRateLimit { rate_limit: i32 }, + SetBackfillRateLimit { rate_limit: i32 }, + SetSourceRateLimit { rate_limit: i32 }, + SetDmlRateLimit { rate_limit: i32 }, + SetSinkRateLimit { rate_limit: i32 }, SetParallelism { parallelism: SetVariableValue }, } @@ -729,9 +732,18 @@ impl fmt::Display for AlterColumnOperation { impl fmt::Display for AlterFragmentOperation { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => { + AlterFragmentOperation::SetBackfillRateLimit { rate_limit } => { write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit) } + AlterFragmentOperation::SetSourceRateLimit { rate_limit } => { + write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit) + } + AlterFragmentOperation::SetDmlRateLimit { rate_limit } => { + write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit) + } + AlterFragmentOperation::SetSinkRateLimit { rate_limit } => { + write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit) + } AlterFragmentOperation::SetParallelism { parallelism } => { write!(f, "SET PARALLELISM TO {}", parallelism) } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 51737128906a7..922f08110addd 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3864,9 +3864,18 @@ impl Parser<'_> { } let parallelism = self.parse_set_variable()?; AlterFragmentOperation::SetParallelism { parallelism } + } else if let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? { + AlterFragmentOperation::SetBackfillRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? { + AlterFragmentOperation::SetSourceRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_dml_rate_limit()? { + AlterFragmentOperation::SetDmlRateLimit { rate_limit } + } else if let Some(rate_limit) = self.parse_alter_sink_rate_limit()? { + AlterFragmentOperation::SetSinkRateLimit { rate_limit } } else { - let rate_limit = self.parse_alter_fragment_rate_limit()?; - AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } + return self.expected( + "PARALLELISM/BACKFILL_RATE_LIMIT/SOURCE_RATE_LIMIT/DML_RATE_LIMIT/SINK_RATE_LIMIT after SET", + ); }; Ok(Statement::AlterFragment { fragment_ids, @@ -3874,26 +3883,6 @@ impl Parser<'_> { }) } - fn parse_alter_fragment_rate_limit(&mut self) -> ModalResult { - if !self.parse_word("RATE_LIMIT") { - return self.expected("expected RATE_LIMIT after SET"); - } - if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() { - return self.expected("TO or = after RATE_LIMIT"); - } - let rate_limit = if self.parse_keyword(Keyword::DEFAULT) { - -1 - } else { - let s = self.parse_number_value()?; - if let Ok(n) = s.parse::() { - n - } else { - return self.expected("number or DEFAULT"); - } - }; - Ok(rate_limit) - } - /// Parse a copy statement pub fn parse_copy(&mut self) -> ModalResult { let entity = if self.consume_token(&Token::LParen) { diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index e1f37dc6623f8..7f029ab5b649e 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -4198,3 +4198,74 @@ fn parse_alter_fragment_set_parallelism() { _ => panic!("unexpected statement kind"), } } + +#[test] +fn parse_alter_fragment_set_rate_limits() { + // Test BACKFILL_RATE_LIMIT + match verified_stmt("ALTER FRAGMENT 1 SET BACKFILL_RATE_LIMIT TO 100") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![1]); + match operation { + AlterFragmentOperation::SetBackfillRateLimit { rate_limit } => { + assert_eq!(rate_limit, 100); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } + + // Test SOURCE_RATE_LIMIT + match verified_stmt("ALTER FRAGMENT 2 SET SOURCE_RATE_LIMIT TO 200") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![2]); + match operation { + AlterFragmentOperation::SetSourceRateLimit { rate_limit } => { + assert_eq!(rate_limit, 200); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } + + // Test DML_RATE_LIMIT + match verified_stmt("ALTER FRAGMENT 3 SET DML_RATE_LIMIT TO 300") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![3]); + match operation { + AlterFragmentOperation::SetDmlRateLimit { rate_limit } => { + assert_eq!(rate_limit, 300); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } + + // Test SINK_RATE_LIMIT + match verified_stmt("ALTER FRAGMENT 4 SET SINK_RATE_LIMIT TO 400") { + Statement::AlterFragment { + fragment_ids, + operation, + } => { + assert_eq!(fragment_ids, vec![4]); + match operation { + AlterFragmentOperation::SetSinkRateLimit { rate_limit } => { + assert_eq!(rate_limit, 400); + } + _ => panic!("unexpected alter fragment operation"), + } + } + _ => panic!("unexpected statement kind"), + } +} From fe2324b287607ba940ab8e4d6c0bd10405a2103e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:50:00 +0000 Subject: [PATCH 3/3] Add ThrottleType field to distinguish rate limit types for fragments - Add ThrottleType enum to proto (BACKFILL, SOURCE_RATE_LIMIT, DML, SINK_RATE_LIMIT) - Update ApplyThrottleRequest to include optional throttle_type field - Update meta_client.apply_throttle to accept throttle_type parameter - Update frontend handler to pass specific throttle_type for each fragment rate limit operation - Add update_fragment_rate_limit_by_fragment_id_and_type in catalog controller - Update meta service to use throttle_type to apply rate limit to specific node types only - This allows fragments with multiple rate limit types (e.g. sink + backfill) to be distinguished Co-authored-by: kwannoel <47273164+kwannoel@users.noreply.github.com> --- proto/meta.proto | 10 +++ .../src/handler/alter_streaming_rate_limit.rs | 5 +- src/frontend/src/handler/mod.rs | 60 ++++++++++++++- src/frontend/src/meta_client.rs | 4 +- src/frontend/src/test_utils.rs | 1 + src/meta/service/src/stream_service.rs | 10 ++- src/meta/src/controller/streaming_job.rs | 77 +++++++++++++++++++ src/meta/src/manager/metadata.rs | 16 ++++ src/rpc_client/src/meta_client.rs | 2 + 9 files changed, 177 insertions(+), 8 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 317758355b67e..d06fed47f9b2e 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -357,10 +357,20 @@ enum ThrottleTarget { FRAGMENT = 7; } +enum ThrottleType { + THROTTLE_TYPE_UNSPECIFIED = 0; + BACKFILL = 1; + SOURCE_RATE_LIMIT = 2; + DML = 3; + SINK_RATE_LIMIT = 4; +} + message ApplyThrottleRequest { ThrottleTarget kind = 1; uint32 id = 2; optional uint32 rate = 3; + // For FRAGMENT throttle target, this specifies the type of rate limit to apply + optional ThrottleType throttle_type = 4; } message ApplyThrottleResponse { diff --git a/src/frontend/src/handler/alter_streaming_rate_limit.rs b/src/frontend/src/handler/alter_streaming_rate_limit.rs index 7f0ae2cf37eec..a0014b225ccc6 100644 --- a/src/frontend/src/handler/alter_streaming_rate_limit.rs +++ b/src/frontend/src/handler/alter_streaming_rate_limit.rs @@ -108,7 +108,7 @@ pub async fn handle_alter_streaming_rate_limit( } _ => bail!("Unsupported throttle target: {:?}", kind), }; - handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type).await + handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type, None).await } pub async fn handle_alter_streaming_rate_limit_by_id( @@ -117,6 +117,7 @@ pub async fn handle_alter_streaming_rate_limit_by_id( id: u32, rate_limit: i32, stmt_type: StatementType, + throttle_type: Option, ) -> Result { let meta_client = session.env().meta_client(); @@ -126,7 +127,7 @@ pub async fn handle_alter_streaming_rate_limit_by_id( Some(rate_limit as u32) }; - meta_client.apply_throttle(kind, id, rate_limit).await?; + meta_client.apply_throttle(kind, id, rate_limit, throttle_type).await?; Ok(PgResponse::empty_result(stmt_type)) } diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index e0cb6e359a9af..59a943b8085c2 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -1355,10 +1355,7 @@ pub async fn handle( fragment_ids, operation, } => match operation { - AlterFragmentOperation::SetBackfillRateLimit { rate_limit } - | AlterFragmentOperation::SetSourceRateLimit { rate_limit } - | AlterFragmentOperation::SetDmlRateLimit { rate_limit } - | AlterFragmentOperation::SetSinkRateLimit { rate_limit } => { + AlterFragmentOperation::SetBackfillRateLimit { rate_limit } => { let [fragment_id] = fragment_ids.as_slice() else { return Err(ErrorCode::InvalidInputSyntax( "ALTER FRAGMENT ... SET rate limit supports exactly one fragment id" @@ -1372,6 +1369,61 @@ pub async fn handle( *fragment_id, rate_limit, StatementType::SET_VARIABLE, + Some(risingwave_pb::meta::ThrottleType::Backfill), + ) + .await + } + AlterFragmentOperation::SetSourceRateLimit { rate_limit } => { + let [fragment_id] = fragment_ids.as_slice() else { + return Err(ErrorCode::InvalidInputSyntax( + "ALTER FRAGMENT ... SET rate limit supports exactly one fragment id" + .to_owned(), + ) + .into()); + }; + alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id( + &handler_args.session, + PbThrottleTarget::Fragment, + *fragment_id, + rate_limit, + StatementType::SET_VARIABLE, + Some(risingwave_pb::meta::ThrottleType::SourceRateLimit), + ) + .await + } + AlterFragmentOperation::SetDmlRateLimit { rate_limit } => { + let [fragment_id] = fragment_ids.as_slice() else { + return Err(ErrorCode::InvalidInputSyntax( + "ALTER FRAGMENT ... SET rate limit supports exactly one fragment id" + .to_owned(), + ) + .into()); + }; + alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id( + &handler_args.session, + PbThrottleTarget::Fragment, + *fragment_id, + rate_limit, + StatementType::SET_VARIABLE, + Some(risingwave_pb::meta::ThrottleType::Dml), + ) + .await + } + AlterFragmentOperation::SetSinkRateLimit { rate_limit } => { + let [fragment_id] = fragment_ids.as_slice() else { + return Err(ErrorCode::InvalidInputSyntax( + "ALTER FRAGMENT ... SET rate limit supports exactly one fragment id" + .to_owned(), + ) + .into()); + }; + alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id( + &handler_args.session, + PbThrottleTarget::Fragment, + *fragment_id, + rate_limit, + StatementType::SET_VARIABLE, + Some(risingwave_pb::meta::ThrottleType::SinkRateLimit), ) .await } diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index abe3c418cf39a..6974df78179c5 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -133,6 +133,7 @@ pub trait FrontendMetaClient: Send + Sync { kind: PbThrottleTarget, id: u32, rate_limit: Option, + throttle_type: Option, ) -> Result<()>; async fn alter_fragment_parallelism( @@ -371,9 +372,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl { kind: PbThrottleTarget, id: u32, rate_limit: Option, + throttle_type: Option, ) -> Result<()> { self.0 - .apply_throttle(kind, id, rate_limit) + .apply_throttle(kind, id, rate_limit, throttle_type) .await .map(|_| ()) } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index f73cef7ab3e99..4e77318badb0f 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -1190,6 +1190,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { _kind: PbThrottleTarget, _id: u32, _rate_limit: Option, + _throttle_type: Option, ) -> RpcResult<()> { unimplemented!() } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index ce9f915d8ecc1..92c530cff530c 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -167,8 +167,16 @@ impl StreamManagerService for StreamServiceImpl { .await? } ThrottleTarget::Fragment => { + let throttle_type = request.throttle_type(); + if throttle_type == ThrottleType::Unspecified { + return Err(Status::invalid_argument("unspecified throttle type for fragment")); + } self.metadata_manager - .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate) + .update_fragment_rate_limit_by_fragment_id_and_type( + request.id.into(), + request.rate, + throttle_type, + ) .await? } ThrottleTarget::Unspecified => { diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index c9c7a4f4c246c..ba61262ce9db9 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -2386,6 +2386,83 @@ impl CatalogController { .await } + pub async fn update_fragment_rate_limit_by_fragment_id_and_type( + &self, + fragment_id: FragmentId, + rate_limit: Option, + throttle_type: risingwave_pb::meta::ThrottleType, + ) -> MetaResult>> { + use risingwave_pb::meta::ThrottleType; + let update_rate_limit = |fragment_type_mask: FragmentTypeMask, + stream_node: &mut PbStreamNode| { + let mut found = false; + match throttle_type { + ThrottleType::Backfill => { + if fragment_type_mask.contains_any(FragmentTypeFlag::backfill_rate_limit_fragments()) { + visit_stream_node_mut(stream_node, |node| { + if let PbNodeBody::StreamCdcScan(node) = node { + node.rate_limit = rate_limit; + found = true; + } + if let PbNodeBody::StreamScan(node) = node { + node.rate_limit = rate_limit; + found = true; + } + if let PbNodeBody::SourceBackfill(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + } + ThrottleType::SourceRateLimit => { + if fragment_type_mask.contains_any(FragmentTypeFlag::source_rate_limit_fragments()) { + visit_stream_node_mut(stream_node, |node| { + if let PbNodeBody::Source(source_node) = node { + if let Some(node_inner) = &mut source_node.source_inner { + node_inner.rate_limit = rate_limit; + found = true; + } + } + if let PbNodeBody::StreamFsFetch(fs_fetch_node) = node { + if let Some(node_inner) = &mut fs_fetch_node.node_inner { + node_inner.rate_limit = rate_limit; + found = true; + } + } + }); + } + } + ThrottleType::Dml => { + if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) { + visit_stream_node_mut(stream_node, |node| { + if let PbNodeBody::Dml(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + } + ThrottleType::SinkRateLimit => { + if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) { + visit_stream_node_mut(stream_node, |node| { + if let PbNodeBody::Sink(node) = node { + node.rate_limit = rate_limit; + found = true; + } + }); + } + } + ThrottleType::Unspecified => { + // Do nothing + } + } + found + }; + self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found") + .await + } + /// Note: `FsFetch` created in old versions are not included. /// Since this is only used for debugging, it should be fine. pub async fn list_rate_limits(&self) -> MetaResult> { diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index e1d63f8b0af65..dbfaa7b7f2c1a 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -659,6 +659,22 @@ impl MetadataManager { .collect()) } + pub async fn update_fragment_rate_limit_by_fragment_id_and_type( + &self, + fragment_id: FragmentId, + rate_limit: Option, + throttle_type: risingwave_pb::meta::ThrottleType, + ) -> MetaResult>> { + let fragment_actors = self + .catalog_controller + .update_fragment_rate_limit_by_fragment_id_and_type(fragment_id as _, rate_limit, throttle_type) + .await?; + Ok(fragment_actors + .into_iter() + .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect())) + .collect()) + } + #[await_tree::instrument] pub async fn update_fragment_splits( &self, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index bc6cdaeaba196..cfa44c3e1dcbc 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1237,11 +1237,13 @@ impl MetaClient { kind: PbThrottleTarget, id: u32, rate: Option, + throttle_type: Option, ) -> Result { let request = ApplyThrottleRequest { kind: kind as i32, id, rate, + throttle_type: throttle_type.map(|t| t as i32), }; let resp = self.inner.apply_throttle(request).await?; Ok(resp)