Skip to content

Commit

Permalink
Load: convert to insert tablet on region replica set changes (#14717) (
Browse files Browse the repository at this point in the history
…#14833)

* Load: convert to insert tablet on region replica set changes (#14717)

(cherry picked from commit 7ac71fb)

* resolve
  • Loading branch information
DanielWang2035 authored Feb 14, 2025
1 parent b359323 commit 8ebfcb9
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,16 @@ private void executeTabletConversion(final Analysis analysis, final LoadAnalyzeE
: null;

if (status == null) {
LOGGER.warn(
"Load: Failed to convert to tablets from statement {}. Status is null.",
loadTsFileStatement);
analysis.setFailStatus(
new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
} else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
LOGGER.warn(
"Load: Failed to convert to tablets from statement {}. Status: {}",
loadTsFileStatement,
status);
analysis.setFailStatus(status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ public static PlanNode deserialize(ByteBuffer buffer) {
InputStream stream = new ByteArrayInputStream(buffer.array());
try {
ReadWriteIOUtils.readShort(stream); // read PlanNodeType
File tsFile = new File(ReadWriteIOUtils.readString(stream));
LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile);
int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
final File tsFile = new File(ReadWriteIOUtils.readString(stream));
final LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile);
final int tsFileDataSize = ReadWriteIOUtils.readInt(stream);
for (int i = 0; i < tsFileDataSize; i++) {
TsFileData tsFileData = TsFileData.deserialize(stream);
pieceNode.addTsFileData(tsFileData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -204,8 +205,7 @@ public void start() {
long startTime = System.nanoTime();
final boolean isFirstPhaseSuccess;
try {
isFirstPhaseSuccess =
firstPhaseWithRetry(node, CONFIG.getLoadTsFileRetryCountOnRegionChange());
isFirstPhaseSuccess = firstPhase(node);
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime);
Expand Down Expand Up @@ -259,9 +259,23 @@ public void start() {
if (isLoadSuccess) {
stateMachine.transitionToFinished();
} else {
final StringBuilder failedTsFiles =
new StringBuilder(
!tsFileNodeList.isEmpty()
? tsFileNodeList.get(0).getTsFileResource().getTsFilePath()
: "");
final ListIterator<Integer> iterator = failedTsFileNodeIndexes.listIterator(1);
while (iterator.hasNext()) {
failedTsFiles
.append(", ")
.append(tsFileNodeList.get(iterator.next()).getTsFileResource().getTsFilePath());
}
final long startTime = System.nanoTime();
try {
// if failed to load some TsFiles, then try to convert the TsFiles to Tablets
LOGGER.info(
"Load TsFile(s) failed, will try to convert to tablets and insert. Failed TsFiles: {}",
failedTsFiles);
convertFailedTsFilesToTabletsAndRetry();
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
Expand All @@ -273,30 +287,7 @@ public void start() {
}
}

/**
 * Runs {@code firstPhase(node)} and re-runs it whenever it throws a
 * {@link RegionReplicaSetChangedException}, until the retry budget is used up.
 *
 * @param node the single-TsFile load node handed to {@code firstPhase}
 * @param retryCountOnRegionChange maximum number of retries; negative values are treated as 0
 * @return the result of {@code firstPhase(node)} when it completes without a
 *     {@link RegionReplicaSetChangedException}; {@code false} once the exception is thrown with
 *     no retries left (the state machine is transitioned to failed with that exception first)
 */
private boolean firstPhaseWithRetry(LoadSingleTsFileNode node, int retryCountOnRegionChange) {
  // Clamp the budget so a negative configuration value means "no retries".
  int remainingRetries = retryCountOnRegionChange < 0 ? 0 : retryCountOnRegionChange;

  for (; ; remainingRetries--) {
    try {
      return firstPhase(node);
    } catch (RegionReplicaSetChangedException e) {
      if (remainingRetries <= 0) {
        // Budget exhausted: mark the load as failed before reporting it.
        stateMachine.transitionToFailed(e);
        LOGGER.warn(
            "Region replica set changed during loading TsFile {} after retry.",
            node.getTsFileResource());
        return false;
      }

      // The remaining budget is logged before the loop update decrements it.
      LOGGER.warn(
          "Region replica set changed during loading TsFile {}, maybe due to region migration, will retry for {} times.",
          node.getTsFileResource(),
          remainingRetries);
    }
  }
}

private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetChangedException {
private boolean firstPhase(LoadSingleTsFileNode node) {
final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block);
try {
new TsFileSplitter(
Expand All @@ -306,8 +297,6 @@ private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetCha
stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
} catch (RegionReplicaSetChangedException e) {
throw e;
} catch (IllegalStateException e) {
stateMachine.transitionToFailed(e);
LOGGER.warn(
Expand Down Expand Up @@ -657,7 +646,7 @@ private boolean addOrSendChunkData(ChunkData chunkData) throws LoadFileException

dataSize -= pieceNode.getDataSize();
block.reduceMemoryUsage(pieceNode.getDataSize());
regionId2ReplicaSetAndNode.put(
regionId2ReplicaSetAndNode.replace(
sortedRegionId,
new Pair<>(
replicaSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,17 +904,36 @@ public TSStatus writeLoadTsFileNode(

LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize());

final DataRegion dataRegion = getDataRegion(dataRegionId);
if (dataRegion == null) {
LOGGER.warn(
"DataRegion {} not found on this DataNode when writing piece node"
+ "of TsFile {} (maybe due to region migration), will skip.",
dataRegionId,
pieceNode.getTsFile());
return RpcUtils.SUCCESS_STATUS;
}

try {
loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
} catch (IOException e) {
LOGGER.error(
LOGGER.warn(
"IO error when writing piece node of TsFile {} to DataRegion {}.",
pieceNode.getTsFile(),
dataRegionId,
e);
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
return status;
} catch (Exception e) {
LOGGER.warn(
"Exception occurred when writing piece node of TsFile {} to DataRegion {}.",
pieceNode.getTsFile(),
dataRegionId,
e);
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
return status;
}

return RpcUtils.SUCCESS_STATUS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
|| result.getCode()
== TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
LOGGER.warn(
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
loadTsFileStatement,
result.getCode());
return Optional.empty();
}
}
Expand Down

0 comments on commit 8ebfcb9

Please sign in to comment.