From 8ebfcb99883be1f9f61b56d0b7c9056d54b01414 Mon Sep 17 00:00:00 2001 From: Zikun Ma <55695098+DanielWang2035@users.noreply.github.com> Date: Fri, 14 Feb 2025 18:00:45 +0800 Subject: [PATCH] Load: convert to insert tablet on region replica set changes (#14717) (#14833) * Load: convert to insert tablet on region replica set changes (#14717) (cherry picked from commit 7ac71fb2a33fdaae4fe56804ce0a2c48640dbed8) * resolve --- .../plan/analyze/LoadTsFileAnalyzer.java | 7 +++ .../plan/node/load/LoadTsFilePieceNode.java | 6 +-- .../scheduler/load/LoadTsFileScheduler.java | 47 +++++++------------ .../iotdb/db/storageengine/StorageEngine.java | 23 ++++++++- ...tementDataTypeConvertExecutionVisitor.java | 4 ++ 5 files changed, 53 insertions(+), 34 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java index 5604b687068c..30cd8c23389e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java @@ -236,9 +236,16 @@ private void executeTabletConversion(final Analysis analysis, final LoadAnalyzeE : null; if (status == null) { + LOGGER.warn( + "Load: Failed to convert to tablets from statement {}. Status is null.", + loadTsFileStatement); analysis.setFailStatus( new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) { + LOGGER.warn( + "Load: Failed to convert to tablets from statement {}. Status: {}", + loadTsFileStatement, + status); analysis.setFailStatus(status); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java index 5d4f02378fc2..f578148e014b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java @@ -155,9 +155,9 @@ public static PlanNode deserialize(ByteBuffer buffer) { InputStream stream = new ByteArrayInputStream(buffer.array()); try { ReadWriteIOUtils.readShort(stream); // read PlanNodeType - File tsFile = new File(ReadWriteIOUtils.readString(stream)); - LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile); - int tsFileDataSize = ReadWriteIOUtils.readInt(stream); + final File tsFile = new File(ReadWriteIOUtils.readString(stream)); + final LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile); + final int tsFileDataSize = ReadWriteIOUtils.readInt(stream); for (int i = 0; i < tsFileDataSize; i++) { TsFileData tsFileData = TsFileData.deserialize(stream); pieceNode.addTsFileData(tsFileData); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index be3c9345f803..eaa23a59cf2d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -90,6 +90,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -204,8 +205,7 @@ public void start() { long startTime = System.nanoTime(); final boolean isFirstPhaseSuccess; try { - isFirstPhaseSuccess = - firstPhaseWithRetry(node, CONFIG.getLoadTsFileRetryCountOnRegionChange()); + isFirstPhaseSuccess = firstPhase(node); } finally { LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime); @@ -259,9 +259,23 @@ public void start() { if (isLoadSuccess) { stateMachine.transitionToFinished(); } else { + final StringBuilder failedTsFiles = + new StringBuilder( + !tsFileNodeList.isEmpty() + ? tsFileNodeList.get(0).getTsFileResource().getTsFilePath() + : ""); + final ListIterator iterator = failedTsFileNodeIndexes.listIterator(1); + while (iterator.hasNext()) { + failedTsFiles + .append(", ") + .append(tsFileNodeList.get(iterator.next()).getTsFileResource().getTsFilePath()); + } final long startTime = System.nanoTime(); try { // if failed to load some TsFiles, then try to convert the TsFiles to Tablets + LOGGER.info( + "Load TsFile(s) failed, will try to convert to tablets and insert. Failed TsFiles: {}", + failedTsFiles); convertFailedTsFilesToTabletsAndRetry(); } finally { LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( @@ -273,30 +287,7 @@ public void start() { } } - private boolean firstPhaseWithRetry(LoadSingleTsFileNode node, int retryCountOnRegionChange) { - retryCountOnRegionChange = Math.max(0, retryCountOnRegionChange); - while (true) { - try { - return firstPhase(node); - } catch (RegionReplicaSetChangedException e) { - if (retryCountOnRegionChange > 0) { - LOGGER.warn( - "Region replica set changed during loading TsFile {}, maybe due to region migration, will retry for {} times.", - node.getTsFileResource(), - retryCountOnRegionChange); - retryCountOnRegionChange--; - } else { - stateMachine.transitionToFailed(e); - LOGGER.warn( - "Region replica set changed during loading TsFile {} after retry.", - node.getTsFileResource()); - return false; - } - } - } - } - - private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetChangedException { + private boolean firstPhase(LoadSingleTsFileNode node) { final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); try { new TsFileSplitter( @@ -306,8 +297,6 @@ private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetCha stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())); return false; } - } catch (RegionReplicaSetChangedException e) { - throw e; } catch (IllegalStateException e) { stateMachine.transitionToFailed(e); LOGGER.warn( @@ -657,7 +646,7 @@ private boolean addOrSendChunkData(ChunkData chunkData) throws LoadFileException dataSize -= pieceNode.getDataSize(); block.reduceMemoryUsage(pieceNode.getDataSize()); - regionId2ReplicaSetAndNode.put( + regionId2ReplicaSetAndNode.replace( sortedRegionId, new Pair<>( replicaSet, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index cc0b852e3b08..d88703b5b0c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -904,10 +904,20 @@ public TSStatus writeLoadTsFileNode( LoadTsFileRateLimiter.getInstance().acquire(pieceNode.getDataSize()); + final DataRegion dataRegion = getDataRegion(dataRegionId); + if (dataRegion == null) { + LOGGER.warn( + "DataRegion {} not found on this DataNode when writing piece node" + + "of TsFile {} (maybe due to region migration), will skip.", + dataRegionId, + pieceNode.getTsFile()); + return RpcUtils.SUCCESS_STATUS; + } + try { - loadTsFileManager.writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid); + loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid); } catch (IOException e) { - LOGGER.error( + LOGGER.warn( "IO error when writing piece node of TsFile {} to DataRegion {}.", pieceNode.getTsFile(), dataRegionId, @@ -915,6 +925,15 @@ public TSStatus writeLoadTsFileNode( status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); status.setMessage(e.getMessage()); return status; + } catch (Exception e) { + LOGGER.warn( + "Exception occurred when writing piece node of TsFile {} to DataRegion {}.", + pieceNode.getTsFile(), + dataRegionId, + e); + status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); + status.setMessage(e.getMessage()); + return status; } return RpcUtils.SUCCESS_STATUS; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index 222fdb5d93fd..d37316a29080 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -109,6 +109,10 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() || result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}, status code is {}.", + loadTsFileStatement, + result.getCode()); return Optional.empty(); } }