Skip to content

Commit c869383

Browse files
committed
[ISSUE #9097]Add new command to check async task status in broker.
1 parent de4e48d commit c869383

File tree

12 files changed

+500
-0
lines changed

12 files changed

+500
-0
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker;
18+
19+
import org.apache.rocketmq.common.AsyncTask;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.UUID;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
28+
public class AdminAsyncTaskManager {
29+
30+
private static final Map<String, AsyncTask> ASYNC_TASK_MAP = new ConcurrentHashMap<>();
31+
32+
private static final Map<String, List<String>> TASK_NAME_TO_IDS_MAP = new ConcurrentHashMap<>();
33+
34+
public static String createTask(String taskName) {
35+
String taskId = UUID.randomUUID().toString();
36+
ASYNC_TASK_MAP.put(taskId, new AsyncTask(taskName, taskId));
37+
TASK_NAME_TO_IDS_MAP.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);
38+
return taskId;
39+
}
40+
41+
public static List<String> getTaskIdsByName(String taskName) {
42+
return TASK_NAME_TO_IDS_MAP.getOrDefault(taskName, Collections.emptyList());
43+
}
44+
45+
public static AsyncTask getTaskStatus(String taskId) {
46+
return ASYNC_TASK_MAP.get(taskId);
47+
}
48+
49+
public static void updateTaskStatus(String taskId, int status, String result) {
50+
AsyncTask task = ASYNC_TASK_MAP.get(taskId);
51+
if (task != null) {
52+
task.setStatus(status);
53+
task.setResult(result);
54+
}
55+
}
56+
57+
public static void removeTask(String taskId) {
58+
AsyncTask task = ASYNC_TASK_MAP.remove(taskId);
59+
if (task != null) {
60+
TASK_NAME_TO_IDS_MAP.computeIfPresent(task.getTaskName(), (k, v) -> {
61+
v.remove(taskId);
62+
return v.isEmpty() ? null : v;
63+
});
64+
}
65+
}
66+
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.rocketmq.auth.authorization.exception.AuthorizationException;
5757
import org.apache.rocketmq.auth.authorization.model.Acl;
5858
import org.apache.rocketmq.auth.authorization.model.Resource;
59+
import org.apache.rocketmq.broker.AdminAsyncTaskManager;
5960
import org.apache.rocketmq.broker.BrokerController;
6061
import org.apache.rocketmq.broker.auth.converter.AclConverter;
6162
import org.apache.rocketmq.broker.auth.converter.UserConverter;
@@ -72,6 +73,7 @@
7273
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
7374
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
7475
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
76+
import org.apache.rocketmq.common.AsyncTask;
7577
import org.apache.rocketmq.common.BoundaryType;
7678
import org.apache.rocketmq.common.BrokerConfig;
7779
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
@@ -81,6 +83,7 @@
8183
import org.apache.rocketmq.common.MixAll;
8284
import org.apache.rocketmq.common.Pair;
8385
import org.apache.rocketmq.common.PlainAccessConfig;
86+
import org.apache.rocketmq.common.TaskStatus;
8487
import org.apache.rocketmq.common.TopicAttributes;
8588
import org.apache.rocketmq.common.TopicConfig;
8689
import org.apache.rocketmq.common.UnlockCallback;
@@ -149,6 +152,8 @@
149152
import org.apache.rocketmq.remoting.protocol.body.TopicList;
150153
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
151154
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
155+
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
156+
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusResponseHeader;
152157
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
153158
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
154159
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
@@ -415,6 +420,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
415420
return this.listAcl(ctx, request);
416421
case RequestCode.POP_ROLLBACK:
417422
return this.transferPopToFsStore(ctx, request);
423+
case RequestCode.CHECK_ASYNC_TASK_STATUS:
424+
return this.checkAsyncTaskStatus(ctx, request);
418425
default:
419426
return getUnknownCmdResponse(ctx, request);
420427
}
@@ -487,11 +494,14 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
487494
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) {
488495
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
489496
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
497+
String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress");
490498
Runnable runnable = () -> {
491499
try {
492500
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request);
501+
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.SUCCESS.getValue(), JSON.toJSONString(checkResult));
493502
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult));
494503
} catch (Exception e) {
504+
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.ERROR.getValue(), e.getMessage());
495505
LOGGER.error("checkRocksdbCqWriteProgress error", e);
496506
}
497507
};
@@ -3597,4 +3607,32 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
35973607
}
35983608
return response;
35993609
}
3610+
3611+
private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
3612+
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class);
3613+
try {
3614+
List<String> taskIds = AdminAsyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
3615+
if (taskIds == null || taskIds.isEmpty()) {
3616+
throw new RemotingCommandException("taskId not found");
3617+
}
3618+
3619+
List<AsyncTask> result = new ArrayList<>();
3620+
for (String taskId : taskIds) {
3621+
AsyncTask taskStatus = AdminAsyncTaskManager.getTaskStatus(taskId);
3622+
result.add(taskStatus);
3623+
3624+
if (taskStatus.getStatus() == TaskStatus.SUCCESS.getValue()) {
3625+
AdminAsyncTaskManager.removeTask(taskId);
3626+
}
3627+
}
3628+
3629+
RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class);
3630+
response.setCode(ResponseCode.SUCCESS);
3631+
response.setBody(JSON.toJSONBytes(result));
3632+
return response;
3633+
} catch (Exception e) {
3634+
LOGGER.error("checkAsyncTaskStatus error", e);
3635+
return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, e.getMessage());
3636+
}
3637+
}
36003638
}

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.rocketmq.client.producer.SendResult;
5656
import org.apache.rocketmq.client.producer.SendStatus;
5757
import org.apache.rocketmq.client.rpchook.NamespaceRpcHook;
58+
import org.apache.rocketmq.common.AsyncTask;
5859
import org.apache.rocketmq.common.BoundaryType;
5960
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
6061
import org.apache.rocketmq.common.MQVersion;
@@ -149,6 +150,7 @@
149150
import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader;
150151
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
151152
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
153+
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
152154
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
153155
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
154156
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
@@ -3601,4 +3603,17 @@ public void exportPopRecord(String brokerAddr, long timeout) throws RemotingConn
36013603
}
36023604
throw new MQBrokerException(response.getCode(), response.getRemark());
36033605
}
3606+
3607+
public List<AsyncTask> checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader,
3608+
long timeoutMillis)
3609+
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
3610+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS,
3611+
requestHeader);
3612+
RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
3613+
assert response != null;
3614+
if (response.getCode() == SUCCESS) {
3615+
return RemotingSerializable.decodeList(response.getBody(), AsyncTask.class);
3616+
}
3617+
throw new MQBrokerException(response.getCode(), response.getRemark());
3618+
}
36043619
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.common;
19+
20+
import java.util.Date;
21+
22+
public class AsyncTask {
23+
24+
private String taskName;
25+
26+
private String taskId;
27+
28+
private int status;
29+
30+
private Date createTime;
31+
32+
private String result;
33+
34+
public AsyncTask(String taskName, String taskId) {
35+
this.taskName = taskName;
36+
this.taskId = taskId;
37+
this.status = TaskStatus.INIT.getValue();
38+
this.createTime = new Date();
39+
this.result = null;
40+
}
41+
42+
public String getTaskName() {
43+
return taskName;
44+
}
45+
46+
public void setTaskName(String taskName) {
47+
this.taskName = taskName;
48+
}
49+
50+
public int getStatus() {
51+
return status;
52+
}
53+
54+
public void setStatus(int status) {
55+
this.status = status;
56+
}
57+
58+
public String getResult() {
59+
return result;
60+
}
61+
62+
public void setResult(String result) {
63+
this.result = result;
64+
}
65+
66+
public Date getCreateTime() {
67+
return createTime;
68+
}
69+
70+
public void setCreateTime(Date createTime) {
71+
this.createTime = createTime;
72+
}
73+
74+
public String getTaskId() {
75+
return taskId;
76+
}
77+
78+
public void setTaskId(String taskId) {
79+
this.taskId = taskId;
80+
}
81+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.common;
19+
20+
public enum TaskStatus {
21+
22+
INIT(0, "Initialized"),
23+
24+
IN_PROGRESS(1, "In Progress"),
25+
26+
ERROR(2, "Error"),
27+
28+
SUCCESS(3, "Success");
29+
30+
private final int value;
31+
32+
private final String desc;
33+
34+
TaskStatus(int value, String desc) {
35+
this.value = value;
36+
this.desc = desc;
37+
}
38+
39+
public int getValue() {
40+
return value;
41+
}
42+
43+
public String getDesc() {
44+
return desc;
45+
}
46+
}

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public class RequestCode {
220220
public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
221221
public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
222222
public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355;
223+
public static final int CHECK_ASYNC_TASK_STATUS = 356;
223224

224225
public static final int LITE_PULL_MESSAGE = 361;
225226
public static final int RECALL_MESSAGE = 370;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.remoting.protocol.header;
19+
20+
import org.apache.commons.lang3.StringUtils;
21+
import org.apache.rocketmq.common.action.Action;
22+
import org.apache.rocketmq.common.action.RocketMQAction;
23+
import org.apache.rocketmq.remoting.CommandCustomHeader;
24+
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
25+
import org.apache.rocketmq.remoting.protocol.RequestCode;
26+
27+
@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, action = Action.GET)
28+
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {
29+
30+
private String taskName;
31+
32+
@Override
33+
public void checkFields() throws RemotingCommandException {
34+
if (StringUtils.isBlank(taskName)) {
35+
throw new RemotingCommandException("taskName cannot be null or blank");
36+
}
37+
}
38+
39+
public String getTaskName() {
40+
return taskName;
41+
}
42+
43+
public void setTaskName(String taskId) {
44+
this.taskName = taskId;
45+
}
46+
}

0 commit comments

Comments
 (0)