Skip to content

Commit 91368c4

Browse files
fuweng11healchow
authored andcommitted
[INLONG-8564][Manager] Fix unable to issue tasks when modifying data node info (#8565)
* [INLONG-8564][Manager] Fix modify dataNode information unable to issue task * [INLONG-8564][Manager] Improve the code readability --------- Co-authored-by: healchow <healchow@gmail.com> (cherry picked from commit f8039b6)
1 parent ccf040e commit 91368c4

File tree

5 files changed

+24
-57
lines changed

5 files changed

+24
-57
lines changed

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public Boolean testConnection(DataNodeRequest request) {
107107
}
108108

109109
@Override
110-
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
110+
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator) {
111111
LOGGER.info("do nothing for the data node type ={}", request.getType());
112112
}
113113

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ public interface DataNodeOperator {
8383
* Update related stream source.
8484
*
8585
* @param request data node request
86-
* @param entity data node entity
86+
* @param oldEntity old data node entity
8787
* @param operator operator
8888
*/
89-
void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator);
89+
void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator);
9090

9191
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
2222
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
2323
import org.apache.inlong.manager.common.exceptions.BusinessException;
24+
import org.apache.inlong.manager.common.util.CommonBeanUtils;
2425
import org.apache.inlong.manager.common.util.Preconditions;
2526
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
2627
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
@@ -184,68 +185,35 @@ public List<DataNodeInfo> list(DataNodePageRequest request, UserInfo opInfo) {
184185
@Transactional(rollbackFor = Throwable.class)
185186
public Boolean update(DataNodeRequest request, String operator) {
186187
LOGGER.info("begin to update data node by id: {}", request);
187-
// check whether record existed
188+
// check whether the record existed
188189
DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId());
189190
if (curEntity == null) {
190191
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
191192
String.format("data node record not found by id=%d", request.getId()));
192193
}
193194
userService.checkUser(curEntity.getInCharges(), operator,
194195
"Current user does not have permission to update data node info");
196+
195197
// check whether modify unmodifiable parameters
196198
chkUnmodifiableParams(curEntity, request);
197-
// Check whether the data node name exists with the same name and type
198-
if (request.getName() != null) {
199-
if (StringUtils.isBlank(request.getName())) {
200-
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
201-
"the name changed of data node is blank!");
202-
}
203-
DataNodeEntity existEntity =
204-
dataNodeMapper.selectByUniqueKey(request.getName(), request.getType());
205-
if (existEntity != null && !existEntity.getId().equals(request.getId())) {
206-
throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
207-
String.format("data node already exist for name=%s, type=%s, required id=%s, exist id=%s",
208-
request.getName(), request.getType(), request.getId(), existEntity.getId()));
209-
}
210-
}
199+
200+
// after the update operation, `curEntity` will be updated to the latest info by the MyBatis cache mechanism,
201+
// so we need to get an `oldEntity` by copying `curEntity` before the update operation.
202+
DataNodeEntity oldEntity = CommonBeanUtils.copyProperties(curEntity, DataNodeEntity::new);
211203
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
212204
dataNodeOperator.updateOpt(request, operator);
213-
dataNodeOperator.updateRelatedStreamSource(request, curEntity, operator);
205+
206+
// update the related stream sources if the request and old entity ha
207+
dataNodeOperator.updateRelatedStreamSource(request, oldEntity, operator);
214208
LOGGER.info("success to update data node={}", request);
215209
return true;
216210
}
217211

218212
@Override
219213
@Transactional(rollbackFor = Throwable.class)
220214
public Boolean update(DataNodeRequest request, UserInfo opInfo) {
221-
// check the record existed
222-
DataNodeEntity curEntity = dataNodeMapper.selectById(request.getId());
223-
if (curEntity == null) {
224-
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
225-
String.format("data node record not found by id=%d", request.getId()));
226-
}
227-
userService.checkUser(curEntity.getInCharges(), opInfo.getName(),
228-
"Current user does not have permission to update data node info");
229-
// check whether modify unmodifiable parameters
230-
chkUnmodifiableParams(curEntity, request);
231-
// Check whether the data node name exists with the same name and type
232-
if (request.getName() != null) {
233-
if (StringUtils.isBlank(request.getName())) {
234-
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
235-
"the name changed of data node is blank!");
236-
}
237-
DataNodeEntity existEntity =
238-
dataNodeMapper.selectByUniqueKey(request.getName(), request.getType());
239-
if (existEntity != null && !existEntity.getId().equals(request.getId())) {
240-
throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
241-
String.format("data node already exist for name=%s, type=%s, required id=%s, exist id=%s",
242-
request.getName(), request.getType(), request.getId(), existEntity.getId()));
243-
}
244-
}
245-
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
246-
dataNodeOperator.updateOpt(request, opInfo.getName());
247-
dataNodeOperator.updateRelatedStreamSource(request, curEntity, opInfo.getName());
248-
return true;
215+
String operator = opInfo.getName();
216+
return this.update(request, operator);
249217
}
250218

251219
@Override

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,13 @@ public Boolean testConnection(DataNodeRequest request) {
110110
}
111111

112112
@Override
113-
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
114-
MySQLDataNodeRequest mySQLDataNodeRequest = (MySQLDataNodeRequest) request;
115-
MySQLDataNodeInfo mySQLDataNodeInfo = (MySQLDataNodeInfo) this.getFromEntity(entity);
116-
boolean changed = !Objects.equals(mySQLDataNodeRequest.getUrl(), mySQLDataNodeInfo.getUrl())
117-
|| !Objects.equals(mySQLDataNodeRequest.getBackupUrl(), mySQLDataNodeInfo.getBackupUrl())
118-
|| !Objects.equals(mySQLDataNodeRequest.getUsername(), mySQLDataNodeInfo.getUsername())
119-
|| !Objects.equals(mySQLDataNodeRequest.getToken(), mySQLDataNodeInfo.getToken());
113+
public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity oldEntity, String operator) {
114+
MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request;
115+
MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo) this.getFromEntity(oldEntity);
116+
boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl())
117+
|| !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl())
118+
|| !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername())
119+
|| !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken());
120120
if (changed) {
121121
retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator);
122122
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/user/UserServiceImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,8 @@ public void login(UserLoginRequest req) {
357357
public void checkUser(String inCharges, String user, String errMsg) {
358358
UserEntity userEntity = userMapper.selectByName(user);
359359
boolean isInCharge = Preconditions.inSeparatedString(user, inCharges, InlongConstants.COMMA);
360-
Preconditions.expectTrue(
361-
isInCharge || TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()),
362-
errMsg);
360+
Preconditions.expectTrue(isInCharge
361+
|| TenantUserTypeEnum.TENANT_ADMIN.getCode().equals(userEntity.getAccountType()), errMsg);
363362
}
364363

365364
public void removeInChargeForGroup(String user, String operator) {

0 commit comments

Comments
 (0)