Skip to content

Commit

Permalink
Modify strategy of wait task time limitation & Fix proc id (#12552)
Browse files Browse the repository at this point in the history
  • Loading branch information
liyuheng55555 authored May 23, 2024
1 parent 40934dd commit 0ebac6b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
Expand Down Expand Up @@ -381,13 +382,11 @@ public Map<Integer, TSStatus> resetPeerList(

// TODO: will use 'procedure yield' to refactor later
public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) {
// In some cases the DataNode is still working, but its status is unknown.
// In order to make task continue under this circumstance, some unconditional retries are
// performed here.
int unconditionallyRetry = 0;
while (unconditionallyRetry < 6
|| configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
!= NodeStatus.Unknown) {
final long MAX_DISCONNECTION_TOLERATE_MS = 600_000;
final long INITIAL_DISCONNECTION_TOLERATE_MS = 60_000;
long startTime = System.nanoTime();
long lastReportTime = System.nanoTime();
while (true) {
try (SyncDataNodeInternalServiceClient dataNodeClient =
dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
TRegionMigrateResult report = dataNodeClient.getRegionMaintainResult(taskId);
Expand All @@ -396,20 +395,29 @@ public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNo
}
} catch (Exception ignore) {

} finally {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
unconditionallyRetry++;
long waitTime =
Math.min(
INITIAL_DISCONNECTION_TOLERATE_MS
+ TimeUnit.NANOSECONDS.toMillis(lastReportTime - startTime) / 60,
MAX_DISCONNECTION_TOLERATE_MS);
long disconnectionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastReportTime);
if (disconnectionTime > waitTime) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
LOGGER.warn(
"{} task {} cannot contact to DataNode {}",
"{} task {} cannot get task report from DataNode {}, last report time is {} ago",
REGION_MIGRATE_PROCESS,
taskId,
dataNodeLocation);
dataNodeLocation,
CommonDateTimeUtils.convertMillisecondToDurationStr(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastReportTime)));
TRegionMigrateResult report = new TRegionMigrateResult();
report.setTaskStatus(TRegionMaintainTaskStatus.FAIL);
report.setFailedNodeAndReason(new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ private Flow warnAndRollBackAndNoMoreState(
ConfigNodeProcedureEnv env, RegionMaintainHandler handler, String reason, Exception e)
throws ProcedureException {
if (e != null) {
LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", getRootProcId(), reason, e);
LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", getProcId(), reason, e);
} else {
LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", getRootProcId(), reason);
LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", getProcId(), reason);
}
handler.removeRegionLocation(consensusGroupId, destDataNode);

Expand Down

0 comments on commit 0ebac6b

Please sign in to comment.