Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e_test/slow_tests/streaming/rate_limit/fragment.slt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ skipif madsim
system ok
fragment_id=$(PGUSER=root psql -h ${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS} -p ${RISEDEV_RW_FRONTEND_PORT} -d dev -t -c "select fragment_id from rw_fragments where flags @> ARRAY['STREAM_SCAN']")
echo "fragment_id:" $fragment_id
PGUSER=root psql -h ${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS} -p ${RISEDEV_RW_FRONTEND_PORT} -d dev -c "ALTER FRAGMENT $fragment_id SET RATE_LIMIT = default;"
PGUSER=root psql -h ${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS} -p ${RISEDEV_RW_FRONTEND_PORT} -d dev -c "ALTER FRAGMENT $fragment_id SET BACKFILL_RATE_LIMIT = default;"


statement ok
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1355,10 +1355,13 @@ pub async fn handle(
fragment_ids,
operation,
} => match operation {
AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
AlterFragmentOperation::SetBackfillRateLimit { rate_limit }
| AlterFragmentOperation::SetSourceRateLimit { rate_limit }
| AlterFragmentOperation::SetDmlRateLimit { rate_limit }
| AlterFragmentOperation::SetSinkRateLimit { rate_limit } => {
let [fragment_id] = fragment_ids.as_slice() else {
return Err(ErrorCode::InvalidInputSyntax(
"ALTER FRAGMENT ... SET RATE_LIMIT supports exactly one fragment id"
"ALTER FRAGMENT ... SET rate limit supports exactly one fragment id"
.to_owned(),
)
.into());
Expand Down
16 changes: 14 additions & 2 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ pub enum AlterSecretOperation {

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
AlterBackfillRateLimit { rate_limit: i32 },
SetBackfillRateLimit { rate_limit: i32 },
SetSourceRateLimit { rate_limit: i32 },
SetDmlRateLimit { rate_limit: i32 },
SetSinkRateLimit { rate_limit: i32 },
SetParallelism { parallelism: SetVariableValue },
}

Expand Down Expand Up @@ -729,9 +732,18 @@ impl fmt::Display for AlterColumnOperation {
impl fmt::Display for AlterFragmentOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
AlterFragmentOperation::SetBackfillRateLimit { rate_limit } => {
write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
}
AlterFragmentOperation::SetSourceRateLimit { rate_limit } => {
write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
}
AlterFragmentOperation::SetDmlRateLimit { rate_limit } => {
write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
}
AlterFragmentOperation::SetSinkRateLimit { rate_limit } => {
write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
}
AlterFragmentOperation::SetParallelism { parallelism } => {
write!(f, "SET PARALLELISM TO {}", parallelism)
}
Expand Down
33 changes: 11 additions & 22 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3864,36 +3864,25 @@ impl Parser<'_> {
}
let parallelism = self.parse_set_variable()?;
AlterFragmentOperation::SetParallelism { parallelism }
} else if let Some(rate_limit) = self.parse_alter_backfill_rate_limit()? {
AlterFragmentOperation::SetBackfillRateLimit { rate_limit }
} else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? {
AlterFragmentOperation::SetSourceRateLimit { rate_limit }
} else if let Some(rate_limit) = self.parse_alter_dml_rate_limit()? {
AlterFragmentOperation::SetDmlRateLimit { rate_limit }
} else if let Some(rate_limit) = self.parse_alter_sink_rate_limit()? {
AlterFragmentOperation::SetSinkRateLimit { rate_limit }
} else {
let rate_limit = self.parse_alter_fragment_rate_limit()?;
AlterFragmentOperation::AlterBackfillRateLimit { rate_limit }
return self.expected(
"PARALLELISM/BACKFILL_RATE_LIMIT/SOURCE_RATE_LIMIT/DML_RATE_LIMIT/SINK_RATE_LIMIT after SET",
);
};
Ok(Statement::AlterFragment {
fragment_ids,
operation,
})
}

fn parse_alter_fragment_rate_limit(&mut self) -> ModalResult<i32> {
if !self.parse_word("RATE_LIMIT") {
return self.expected("expected RATE_LIMIT after SET");
}
if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
return self.expected("TO or = after RATE_LIMIT");
}
let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
-1
} else {
let s = self.parse_number_value()?;
if let Ok(n) = s.parse::<i32>() {
n
} else {
return self.expected("number or DEFAULT");
}
};
Ok(rate_limit)
}

/// Parse a copy statement
pub fn parse_copy(&mut self) -> ModalResult<Statement> {
let entity = if self.consume_token(&Token::LParen) {
Expand Down
71 changes: 71 additions & 0 deletions src/sqlparser/tests/sqlparser_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4198,3 +4198,74 @@ fn parse_alter_fragment_set_parallelism() {
_ => panic!("unexpected statement kind"),
}
}

#[test]
fn parse_alter_fragment_set_rate_limits() {
// Test BACKFILL_RATE_LIMIT
match verified_stmt("ALTER FRAGMENT 1 SET BACKFILL_RATE_LIMIT TO 100") {
Statement::AlterFragment {
fragment_ids,
operation,
} => {
assert_eq!(fragment_ids, vec![1]);
match operation {
AlterFragmentOperation::SetBackfillRateLimit { rate_limit } => {
assert_eq!(rate_limit, 100);
}
_ => panic!("unexpected alter fragment operation"),
}
}
_ => panic!("unexpected statement kind"),
}

// Test SOURCE_RATE_LIMIT
match verified_stmt("ALTER FRAGMENT 2 SET SOURCE_RATE_LIMIT TO 200") {
Statement::AlterFragment {
fragment_ids,
operation,
} => {
assert_eq!(fragment_ids, vec![2]);
match operation {
AlterFragmentOperation::SetSourceRateLimit { rate_limit } => {
assert_eq!(rate_limit, 200);
}
_ => panic!("unexpected alter fragment operation"),
}
}
_ => panic!("unexpected statement kind"),
}

// Test DML_RATE_LIMIT
match verified_stmt("ALTER FRAGMENT 3 SET DML_RATE_LIMIT TO 300") {
Statement::AlterFragment {
fragment_ids,
operation,
} => {
assert_eq!(fragment_ids, vec![3]);
match operation {
AlterFragmentOperation::SetDmlRateLimit { rate_limit } => {
assert_eq!(rate_limit, 300);
}
_ => panic!("unexpected alter fragment operation"),
}
}
_ => panic!("unexpected statement kind"),
}

// Test SINK_RATE_LIMIT
match verified_stmt("ALTER FRAGMENT 4 SET SINK_RATE_LIMIT TO 400") {
Statement::AlterFragment {
fragment_ids,
operation,
} => {
assert_eq!(fragment_ids, vec![4]);
match operation {
AlterFragmentOperation::SetSinkRateLimit { rate_limit } => {
assert_eq!(rate_limit, 400);
}
_ => panic!("unexpected alter fragment operation"),
}
}
_ => panic!("unexpected statement kind"),
}
}