From f543ad14bcc29687bf1633acb0d04ebfab4db3a9 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 15:48:44 +0800 Subject: [PATCH 1/3] Fix excessive GC caused by loading too many TsFiles at once When loading multiple TsFiles, all file resources were loaded into memory simultaneously, causing excessive memory consumption and frequent GC pauses. This commit introduces batch execution for multi-file loading scenarios: 1. Split LoadTsFileStatement/LoadTsFile into sub-statements, each handling one TsFile, to avoid loading all file resources at once 2. Refactor duplicate code in ClientRPCServiceImpl by extracting helper methods for both tree model and table model 3. Add progress logging to track the loading status of each file 4. Support both synchronous and asynchronous loading modes Changes: - Added getSubStatement() method to LoadTsFileStatement and LoadTsFile for splitting multi-file statements - Extracted shouldSplitLoadTsFileStatement() and shouldSplitTableLoadTsFile() to determine if splitting is needed - Extracted executeBatchLoadTsFile() and executeBatchTableLoadTsFile() to handle batch execution with progress logging - Applied the optimization to 4 execution paths (tree/table model, sync/async loading) This fix significantly reduces memory pressure and improves system stability when loading large numbers of TsFiles. --- .../thrift/impl/ClientRPCServiceImpl.java | 338 +++++++++++++++--- .../plan/relational/sql/ast/LoadTsFile.java | 41 ++- .../statement/crud/LoadTsFileStatement.java | 31 ++ 3 files changed, 366 insertions(+), 44 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 5655758f1694..976d2b3d16cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -102,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement; @@ -365,16 +366,29 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + // For synchronous multi-file loading, split into sub-statements for batch execution + if (shouldSplitLoadTsFileStatement(s, false)) { + result = + executeBatchLoadTsFile( + (LoadTsFileStatement) s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold()); + } else { + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); + } } } else { org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s = @@ -396,17 +410,31 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - result = - COORDINATOR.executeForTableModel( - s, - relationSqlParser, - clientSession, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - metadata, - req.getTimeout(), - true); + // For synchronous multi-file loading, split into sub-statements for batch execution + if (shouldSplitTableLoadTsFile(s, false)) { + result = + executeBatchTableLoadTsFile( + (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold()); + } else { + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + req.getTimeout(), + true); + } } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -1845,16 +1873,30 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { queryId = SESSION_MANAGER.requestQueryId(); type = s.getType() == null ? null : s.getType().name(); // create and cache dataset - result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + + // For asynchronous multi-file loading, split into sub-statements for batch execution + if (shouldSplitLoadTsFileStatement(s, true)) { + result = + executeBatchLoadTsFile( + (LoadTsFileStatement) s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold()); + } else { + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); + } } } else { @@ -1875,17 +1917,31 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { queryId = SESSION_MANAGER.requestQueryId(); - result = - COORDINATOR.executeForTableModel( - s, - relationSqlParser, - clientSession, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - metadata, - config.getQueryTimeoutThreshold(), - false); + // For asynchronous multi-file loading, split into sub-statements for batch execution + if (shouldSplitTableLoadTsFile(s, true)) { + result = + executeBatchTableLoadTsFile( + (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold()); + } else { + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + false); + } } results.add(result.status); @@ -3190,4 +3246,202 @@ public void handleClientExit() { PipeDataNodeAgent.receiver().legacy().handleClientExit(); SubscriptionAgent.receiver().handleClientExit(); } + + /** + * Determines whether a tree-model LoadTsFileStatement should be split into multiple + * sub-statements for execution. + * + * @param statement the Statement to be executed + * @param requireAsync whether async loading is required + * @return true if the statement should be split for execution, false otherwise + */ + private boolean shouldSplitLoadTsFileStatement(Statement statement, boolean requireAsync) { + if (!(statement instanceof LoadTsFileStatement)) { + return false; + } + LoadTsFileStatement loadStmt = (LoadTsFileStatement) statement; + return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == requireAsync; + } + + /** + * Determines whether a table-model LoadTsFile should be split into multiple sub-statements for + * execution. + * + * @param statement the Statement to be executed + * @param requireAsync whether async loading is required + * @return true if the statement should be split for execution, false otherwise + */ + private boolean shouldSplitTableLoadTsFile( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, + boolean requireAsync) { + if (!(statement + instanceof org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile)) { + return false; + } + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile loadStmt = + (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) statement; + return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == requireAsync; + } + + /** + * Executes tree-model LoadTsFileStatement sub-statements in batch. + * + * @param loadTsFileStatement the LoadTsFileStatement to be executed + * @param queryId the query ID + * @param sessionInfo the session information + * @param statement the SQL statement string + * @param partitionFetcher the partition fetcher + * @param schemaFetcher the schema fetcher + * @param timeoutMs the timeout in milliseconds + * @return the execution result + */ + private ExecutionResult executeBatchLoadTsFile( + LoadTsFileStatement loadTsFileStatement, + long queryId, + SessionInfo sessionInfo, + String statement, + IPartitionFetcher partitionFetcher, + ISchemaFetcher schemaFetcher, + long timeoutMs) { + + ExecutionResult result = null; + List subStatements = loadTsFileStatement.getSubStatement(); + int totalFiles = subStatements.size(); + + LOGGER.info("Start batch loading {} TsFile(s) in tree model, queryId: {}", totalFiles, queryId); + + for (int i = 0; i < totalFiles; i++) { + LoadTsFileStatement subStatement = subStatements.get(i); + LOGGER.info( + "Loading TsFile {}/{} in tree model, file: {}, queryId: {}", + i + 1, + totalFiles, + subStatement.getTsFiles().get(0).getName(), + queryId); + + result = + COORDINATOR.executeForTreeModel( + subStatement, + queryId, + sessionInfo, + statement, + partitionFetcher, + schemaFetcher, + timeoutMs, + false); + + // Exit early if any sub-statement execution fails + if (result != null + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn( + "Failed to load TsFile {}/{} in tree model, file: {}, queryId: {}, error: {}", + i + 1, + totalFiles, + subStatement.getTsFiles().get(0).getName(), + queryId, + result.status.getMessage()); + break; + } + + LOGGER.info( + "Successfully loaded TsFile {}/{} in tree model, file: {}, queryId: {}", + i + 1, + totalFiles, + subStatement.getTsFiles().get(0).getName(), + queryId); + } + + if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "Completed batch loading all {} TsFile(s) in tree model, queryId: {}", + totalFiles, + queryId); + } + + return result; + } + + /** + * Executes table-model LoadTsFile sub-statements in batch. + * + * @param loadTsFile the LoadTsFile to be executed + * @param relationSqlParser the relational SQL parser + * @param clientSession the client session + * @param queryId the query ID + * @param sessionInfo the session information + * @param statement the SQL statement string + * @param metadata the metadata + * @param timeoutMs the timeout in milliseconds + * @return the execution result + */ + private ExecutionResult executeBatchTableLoadTsFile( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile loadTsFile, + SqlParser relationSqlParser, + IClientSession clientSession, + long queryId, + SessionInfo sessionInfo, + String statement, + Metadata metadata, + long timeoutMs) { + + ExecutionResult result = null; + List subStatements = + loadTsFile.getSubStatement(); + int totalFiles = subStatements.size(); + + LOGGER.info( + "Start batch loading {} TsFile(s) in table model, queryId: {}", totalFiles, queryId); + + for (int i = 0; i < totalFiles; i++) { + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile subStatement = + subStatements.get(i); + LOGGER.info( + "Loading TsFile {}/{} in table model, file: {}, queryId: {}", + i + 1, + totalFiles, + subStatement.getTsFiles().get(0).getName(), + queryId); + + result = + COORDINATOR.executeForTableModel( + subStatement, + relationSqlParser, + clientSession, + queryId, + sessionInfo, + statement, + metadata, + timeoutMs, + false); + + // Exit early if any sub-statement execution fails + if (result != null + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn( + "Failed to load TsFile {}/{} in table model, file: {}, queryId: {}, error: {}", + i + 1, + totalFiles, + subStatement.getTsFiles().get(0).getName(), + queryId, + result.status.getMessage()); + break; + } + + LOGGER.info( + "Successfully loaded TsFile {}/{} in table model, file: {}, queryId: {}", + i + 1, + totalFiles, + subStatement.getTsFiles().get(0).getName(), + queryId); + } + + if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "Completed batch loading all {} TsFile(s) in table model, queryId: {}", + totalFiles, + queryId); + } + + return result; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 93fc8c7b5833..ac084c190cef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -37,7 +37,7 @@ public class LoadTsFile extends Statement { - private final String filePath; + private String filePath; private int databaseLevel; // For loading to tree-model only private String database; // For loading to table-model only @@ -50,7 +50,7 @@ public class LoadTsFile extends Statement { private boolean isGeneratedByPipe = false; - private final Map loadAttributes; + private Map loadAttributes; private List tsFiles; private List resources; @@ -232,6 +232,43 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin return tsFiles == null || tsFiles.isEmpty(); } + /** + * Splits the current LoadTsFile statement into multiple sub-statements, each handling one TsFile. + * Used to support batch execution when loading multiple files. + * + * @return the list of sub-statements + */ + public List getSubStatement() { + List subStatements = new ArrayList<>(tsFiles.size()); + for (int i = 0; i < tsFiles.size(); ++i) { + final String filePath = tsFiles.get(i).getAbsolutePath(); + final Map properties = this.loadAttributes; + + LoadTsFile subStatement = new LoadTsFile(getLocation().orElse(null), filePath, properties); + + // Copy all configuration properties + subStatement.databaseLevel = this.databaseLevel; + subStatement.database = this.database; + subStatement.verify = this.verify; + subStatement.deleteAfterLoad = this.deleteAfterLoad; + subStatement.convertOnTypeMismatch = this.convertOnTypeMismatch; + subStatement.tabletConversionThresholdBytes = this.tabletConversionThresholdBytes; + subStatement.autoCreateDatabase = this.autoCreateDatabase; + subStatement.isAsyncLoad = this.isAsyncLoad; + subStatement.isGeneratedByPipe = this.isGeneratedByPipe; + + // Set only the file and resources corresponding to the current index + subStatement.tsFiles = Collections.singletonList(tsFiles.get(i)); + subStatement.resources = new ArrayList<>(1); + subStatement.writePointCountList = new ArrayList<>(1); + subStatement.isTableModel = Collections.singletonList(true); + + subStatements.add(subStatement); + } + + return subStatements; + } + @Override public R accept(AstVisitor visitor, C context) { return visitor.visitLoadTsFile(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index d1dff1bb9cf2..471315c85e40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -306,6 +306,37 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin return tsFiles == null || tsFiles.isEmpty(); } + /** + * Splits the current LoadTsFileStatement into multiple sub-statements, each handling one TsFile. + * Used to support batch execution when loading multiple files. + * + * @return the list of sub-statements + */ + public List getSubStatement() { + List subStatements = new ArrayList<>(tsFiles.size()); + for (int i = 0; i < tsFiles.size(); ++i) { + LoadTsFileStatement statement = new LoadTsFileStatement(); + statement.databaseLevel = this.databaseLevel; + statement.verifySchema = this.verifySchema; + statement.deleteAfterLoad = this.deleteAfterLoad; + statement.convertOnTypeMismatch = this.convertOnTypeMismatch; + statement.tabletConversionThresholdBytes = this.tabletConversionThresholdBytes; + statement.autoCreateDatabase = this.autoCreateDatabase; + statement.isAsyncLoad = this.isAsyncLoad; + statement.isGeneratedByPipe = this.isGeneratedByPipe; + + statement.tsFiles = Collections.singletonList(tsFiles.get(i)); + statement.resources = new ArrayList<>(1); + statement.writePointCountList = new ArrayList<>(1); + statement.isTableModel = new ArrayList<>(1); + statement.isTableModel.add(false); + statement.statementType = StatementType.MULTI_BATCH_INSERT; + subStatements.add(statement); + } + + return subStatements; + } + @Override public List getPaths() { return Collections.emptyList(); From 23192f093bb67a00534de0448868a83dd9ce5115 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Mon, 8 Dec 2025 16:37:34 +0800 Subject: [PATCH 2/3] fix --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 55 ++++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 24 ++ .../thrift/impl/ClientRPCServiceImpl.java | 268 ++++++++++-------- .../plan/relational/sql/ast/LoadTsFile.java | 44 ++- .../plan/relational/sql/ast/Statement.java | 25 ++ .../queryengine/plan/statement/Statement.java | 23 ++ .../statement/crud/LoadTsFileStatement.java | 39 ++- 7 files changed, 335 insertions(+), 143 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 81a8aa4f50a3..edb1d1c26743 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1121,6 +1121,21 @@ public class IoTDBConfig { private int loadTsFileSpiltPartitionMaxSize = 10; + /** + * The threshold for splitting statement when loading multiple TsFiles. When the number of TsFiles + * exceeds this threshold, the statement will be split into multiple sub-statements for batch + * execution to limit resource consumption during statement analysis. Default value is 10, which + * means splitting will occur when there are more than 10 files. + */ + private int loadTsFileStatementSplitThreshold = 10; + + /** + * The number of TsFiles that each sub-statement handles when splitting a statement. This + * parameter controls how many files are grouped together in each sub-statement during batch + * execution. Default value is 10, which means each sub-statement handles 10 files. + */ + private int loadTsFileSubStatementBatchSize = 10; + private String[] loadActiveListeningDirs = new String[] { IoTDBConstant.EXT_FOLDER_NAME @@ -4056,6 +4071,46 @@ public void setLoadTsFileSpiltPartitionMaxSize(int loadTsFileSpiltPartitionMaxSi this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize; } + public int getLoadTsFileStatementSplitThreshold() { + return loadTsFileStatementSplitThreshold; + } + + public void setLoadTsFileStatementSplitThreshold(final int loadTsFileStatementSplitThreshold) { + if (loadTsFileStatementSplitThreshold < 0) { + logger.warn( + "Invalid loadTsFileStatementSplitThreshold value: {}. Using default value: 10", + loadTsFileStatementSplitThreshold); + return; + } + if (this.loadTsFileStatementSplitThreshold != loadTsFileStatementSplitThreshold) { + logger.info( + "loadTsFileStatementSplitThreshold changed from {} to {}", + this.loadTsFileStatementSplitThreshold, + loadTsFileStatementSplitThreshold); + } + this.loadTsFileStatementSplitThreshold = loadTsFileStatementSplitThreshold; + } + + public int getLoadTsFileSubStatementBatchSize() { + return loadTsFileSubStatementBatchSize; + } + + public void setLoadTsFileSubStatementBatchSize(final int loadTsFileSubStatementBatchSize) { + if (loadTsFileSubStatementBatchSize <= 0) { + logger.warn( + "Invalid loadTsFileSubStatementBatchSize value: {}. Using default value: 10", + loadTsFileSubStatementBatchSize); + return; + } + if (this.loadTsFileSubStatementBatchSize != loadTsFileSubStatementBatchSize) { + logger.info( + "loadTsFileSubStatementBatchSize changed from {} to {}", + this.loadTsFileSubStatementBatchSize, + loadTsFileSubStatementBatchSize); + } + this.loadTsFileSubStatementBatchSize = loadTsFileSubStatementBatchSize; + } + public String[] getPipeReceiverFileDirs() { return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0) ? new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 3c9e8159d0ff..d32b0b51f576 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2395,6 +2395,18 @@ private void loadLoadTsFileProps(TrimProperties properties) { properties.getProperty( "skip_failed_table_schema_check", String.valueOf(conf.isSkipFailedTableSchemaCheck())))); + + conf.setLoadTsFileStatementSplitThreshold( + Integer.parseInt( + properties.getProperty( + "load_tsfile_statement_split_threshold", + Integer.toString(conf.getLoadTsFileStatementSplitThreshold())))); + + conf.setLoadTsFileSubStatementBatchSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_sub_statement_batch_size", + Integer.toString(conf.getLoadTsFileSubStatementBatchSize())))); } private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException { @@ -2443,6 +2455,18 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE "load_tsfile_split_partition_max_size", Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize())))); + conf.setLoadTsFileStatementSplitThreshold( + Integer.parseInt( + properties.getProperty( + "load_tsfile_statement_split_threshold", + Integer.toString(conf.getLoadTsFileStatementSplitThreshold())))); + + conf.setLoadTsFileSubStatementBatchSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_sub_statement_batch_size", + Integer.toString(conf.getLoadTsFileSubStatementBatchSize())))); + conf.setSkipFailedTableSchemaCheck( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 976d2b3d16cf..59b5c238a0a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -102,7 +102,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement; @@ -366,11 +365,11 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - // For synchronous multi-file loading, split into sub-statements for batch execution - if (shouldSplitLoadTsFileStatement(s, false)) { + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit(false)) { result = - executeBatchLoadTsFile( - (LoadTsFileStatement) s, + executeBatchStatement( + s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), statement, @@ -410,11 +409,11 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - // For synchronous multi-file loading, split into sub-statements for batch execution - if (shouldSplitTableLoadTsFile(s, false)) { + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit(false)) { result = - executeBatchTableLoadTsFile( - (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) s, + executeBatchTableStatement( + s, relationSqlParser, clientSession, queryId, @@ -1874,11 +1873,11 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { type = s.getType() == null ? null : s.getType().name(); // create and cache dataset - // For asynchronous multi-file loading, split into sub-statements for batch execution - if (shouldSplitLoadTsFileStatement(s, true)) { + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit(true)) { result = - executeBatchLoadTsFile( - (LoadTsFileStatement) s, + executeBatchStatement( + s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), statement, @@ -1917,11 +1916,11 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { queryId = SESSION_MANAGER.requestQueryId(); - // For asynchronous multi-file loading, split into sub-statements for batch execution - if (shouldSplitTableLoadTsFile(s, true)) { + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit(true)) { result = - executeBatchTableLoadTsFile( - (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) s, + executeBatchTableStatement( + s, relationSqlParser, clientSession, queryId, @@ -3248,83 +3247,61 @@ public void handleClientExit() { } /** - * Determines whether a tree-model LoadTsFileStatement should be split into multiple - * sub-statements for execution. + * Executes tree-model Statement sub-statements in batch. * * @param statement the Statement to be executed - * @param requireAsync whether async loading is required - * @return true if the statement should be split for execution, false otherwise - */ - private boolean shouldSplitLoadTsFileStatement(Statement statement, boolean requireAsync) { - if (!(statement instanceof LoadTsFileStatement)) { - return false; - } - LoadTsFileStatement loadStmt = (LoadTsFileStatement) statement; - return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == requireAsync; - } - - /** - * Determines whether a table-model LoadTsFile should be split into multiple sub-statements for - * execution. - * - * @param statement the Statement to be executed - * @param requireAsync whether async loading is required - * @return true if the statement should be split for execution, false otherwise - */ - private boolean shouldSplitTableLoadTsFile( - org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, - boolean requireAsync) { - if (!(statement - instanceof org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile)) { - return false; - } - org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile loadStmt = - (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile) statement; - return loadStmt.getTsFiles().size() > 1 && loadStmt.isAsyncLoad() == requireAsync; - } - - /** - * Executes tree-model LoadTsFileStatement sub-statements in batch. - * - * @param loadTsFileStatement the LoadTsFileStatement to be executed * @param queryId the query ID * @param sessionInfo the session information - * @param statement the SQL statement string + * @param statementStr the SQL statement string * @param partitionFetcher the partition fetcher * @param schemaFetcher the schema fetcher * @param timeoutMs the timeout in milliseconds * @return the execution result */ - private ExecutionResult executeBatchLoadTsFile( - LoadTsFileStatement loadTsFileStatement, + private ExecutionResult executeBatchStatement( + Statement statement, long queryId, SessionInfo sessionInfo, - String statement, + String statementStr, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, long timeoutMs) { ExecutionResult result = null; - List subStatements = loadTsFileStatement.getSubStatement(); - int totalFiles = subStatements.size(); - - LOGGER.info("Start batch loading {} TsFile(s) in tree model, queryId: {}", totalFiles, queryId); + final List subStatements = statement.getSubStatements(); + final int totalSubStatements = subStatements.size(); - for (int i = 0; i < totalFiles; i++) { - LoadTsFileStatement subStatement = subStatements.get(i); - LOGGER.info( - "Loading TsFile {}/{} in tree model, file: {}, queryId: {}", - i + 1, - totalFiles, - subStatement.getTsFiles().get(0).getName(), - queryId); + LOGGER.info( + "Start batch executing {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); + + for (int i = 0; i < totalSubStatements; i++) { + final Statement subStatement = subStatements.get(i); + final List subSubStatements = subStatement.getSubStatements(); + final int batchSize = subSubStatements.isEmpty() ? 1 : subSubStatements.size(); + + if (batchSize == 1) { + LOGGER.info( + "Executing sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } else { + LOGGER.info( + "Executing sub-statement {}/{} in tree model, batch size: {}, queryId: {}", + i + 1, + totalSubStatements, + batchSize, + queryId); + } result = COORDINATOR.executeForTreeModel( subStatement, queryId, sessionInfo, - statement, + statementStr, partitionFetcher, schemaFetcher, timeoutMs, @@ -3333,28 +3310,46 @@ private ExecutionResult executeBatchLoadTsFile( // Exit early if any sub-statement execution fails if (result != null && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn( - "Failed to load TsFile {}/{} in tree model, file: {}, queryId: {}, error: {}", - i + 1, - totalFiles, - subStatement.getTsFiles().get(0).getName(), - queryId, - result.status.getMessage()); + if (batchSize == 1) { + LOGGER.warn( + "Failed to execute sub-statement {}/{} in tree model, queryId: {}, error: {}", + i + 1, + totalSubStatements, + queryId, + result.status.getMessage()); + } else { + LOGGER.warn( + "Failed to execute sub-statement {}/{} in tree model, batch size: {}, queryId: {}, error: {}", + i + 1, + totalSubStatements, + batchSize, + queryId, + result.status.getMessage()); + } break; } - LOGGER.info( - "Successfully loaded TsFile {}/{} in tree model, file: {}, queryId: {}", - i + 1, - totalFiles, - subStatement.getTsFiles().get(0).getName(), - queryId); + processedCount += batchSize; + if (batchSize == 1) { + LOGGER.info( + "Successfully executed sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } else { + LOGGER.info( + "Successfully executed sub-statement {}/{} in tree model, batch size: {}, queryId: {}", + i + 1, + totalSubStatements, + batchSize, + queryId); + } } if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( - "Completed batch loading all {} TsFile(s) in tree model, queryId: {}", - totalFiles, + "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, queryId); } @@ -3362,45 +3357,60 @@ private ExecutionResult executeBatchLoadTsFile( } /** - * Executes table-model LoadTsFile sub-statements in batch. + * Executes table-model Statement sub-statements in batch. * - * @param loadTsFile the LoadTsFile to be executed + * @param statement the Statement to be executed * @param relationSqlParser the relational SQL parser * @param clientSession the client session * @param queryId the query ID * @param sessionInfo the session information - * @param statement the SQL statement string + * @param statementStr the SQL statement string * @param metadata the metadata * @param timeoutMs the timeout in milliseconds * @return the execution result */ - private ExecutionResult executeBatchTableLoadTsFile( - org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile loadTsFile, + private ExecutionResult executeBatchTableStatement( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, SqlParser relationSqlParser, IClientSession clientSession, long queryId, SessionInfo sessionInfo, - String statement, + String statementStr, Metadata metadata, long timeoutMs) { ExecutionResult result = null; - List subStatements = - loadTsFile.getSubStatement(); - int totalFiles = subStatements.size(); + List + subStatements = statement.getSubStatements(); + int totalSubStatements = subStatements.size(); + int processedCount = 0; LOGGER.info( - "Start batch loading {} TsFile(s) in table model, queryId: {}", totalFiles, queryId); + "Start batch executing {} sub-statement(s) in table model, queryId: {}", + totalSubStatements, + queryId); - for (int i = 0; i < totalFiles; i++) { - org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile subStatement = + for (int i = 0; i < totalSubStatements; i++) { + final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement subStatement = subStatements.get(i); - LOGGER.info( - "Loading TsFile {}/{} in table model, file: {}, queryId: {}", - i + 1, - totalFiles, - subStatement.getTsFiles().get(0).getName(), - queryId); + final List + subSubStatements = subStatement.getSubStatements(); + final int batchSize = subSubStatements.isEmpty() ? 1 : subSubStatements.size(); + + if (batchSize == 1) { + LOGGER.info( + "Executing sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } else { + LOGGER.info( + "Executing sub-statement {}/{} in table model, batch size: {}, queryId: {}", + i + 1, + totalSubStatements, + batchSize, + queryId); + } result = COORDINATOR.executeForTableModel( @@ -3409,7 +3419,7 @@ private ExecutionResult executeBatchTableLoadTsFile( clientSession, queryId, sessionInfo, - statement, + statementStr, metadata, timeoutMs, false); @@ -3417,28 +3427,46 @@ private ExecutionResult executeBatchTableLoadTsFile( // Exit early if any sub-statement execution fails if (result != null && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn( - "Failed to load TsFile {}/{} in table model, file: {}, queryId: {}, error: {}", - i + 1, - totalFiles, - subStatement.getTsFiles().get(0).getName(), - queryId, - result.status.getMessage()); + if (batchSize == 1) { + LOGGER.warn( + "Failed to execute sub-statement {}/{} in table model, queryId: {}, error: {}", + i + 1, + totalSubStatements, + queryId, + result.status.getMessage()); + } else { + LOGGER.warn( + "Failed to execute sub-statement {}/{} in table model, batch size: {}, queryId: {}, error: {}", + i + 1, + totalSubStatements, + batchSize, + queryId, + result.status.getMessage()); + } break; } - LOGGER.info( - "Successfully loaded TsFile {}/{} in table model, file: {}, queryId: {}", - i + 1, - totalFiles, - subStatement.getTsFiles().get(0).getName(), - queryId); + processedCount += batchSize; + if (batchSize == 1) { + LOGGER.info( + "Successfully executed sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } else { + LOGGER.info( + "Successfully executed sub-statement {}/{} in table model, batch size: {}, queryId: {}", + i + 1, + totalSubStatements, + batchSize, + queryId); + } } if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( - "Completed batch loading all {} TsFile(s) in table model, queryId: {}", - totalFiles, + "Completed batch executing all {} sub-statement(s) in table model, queryId: {}", + totalSubStatements, queryId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index ac084c190cef..71ecae164526 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -232,19 +232,36 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin return tsFiles == null || tsFiles.isEmpty(); } + @Override + public boolean shouldSplit(final boolean requireAsync) { + final int splitThreshold = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); + return tsFiles.size() > splitThreshold && isAsyncLoad == requireAsync; + } + /** - * Splits the current LoadTsFile statement into multiple sub-statements, each handling one TsFile. - * Used to support batch execution when loading multiple files. + * Splits the current LoadTsFile statement into multiple sub-statements, each handling a batch of + * TsFiles. Used to limit resource consumption during statement analysis, etc. * * @return the list of sub-statements */ - public List getSubStatement() { - List subStatements = new ArrayList<>(tsFiles.size()); - for (int i = 0; i < tsFiles.size(); ++i) { - final String filePath = tsFiles.get(i).getAbsolutePath(); + @Override + public List getSubStatements() { + final int batchSize = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize(); + final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // Ceiling division + final List subStatements = new ArrayList<>(totalBatches); + + for (int i = 0; i < tsFiles.size(); i += batchSize) { + final int endIndex = Math.min(i + batchSize, tsFiles.size()); + final List batchFiles = tsFiles.subList(i, endIndex); + + // Use the first file's path for the sub-statement + final String filePath = batchFiles.get(0).getAbsolutePath(); final Map properties = this.loadAttributes; - LoadTsFile subStatement = new LoadTsFile(getLocation().orElse(null), filePath, properties); + final LoadTsFile subStatement = + new LoadTsFile(getLocation().orElse(null), filePath, properties); // Copy all configuration properties subStatement.databaseLevel = this.databaseLevel; @@ -257,11 +274,14 @@ public List getSubStatement() { subStatement.isAsyncLoad = this.isAsyncLoad; subStatement.isGeneratedByPipe = this.isGeneratedByPipe; - // Set only the file and resources corresponding to the current index - subStatement.tsFiles = Collections.singletonList(tsFiles.get(i)); - subStatement.resources = new ArrayList<>(1); - subStatement.writePointCountList = new ArrayList<>(1); - subStatement.isTableModel = Collections.singletonList(true); + // Set all files in the batch + subStatement.tsFiles = new ArrayList<>(batchFiles); + subStatement.resources = new ArrayList<>(batchFiles.size()); + subStatement.writePointCountList = new ArrayList<>(batchFiles.size()); + subStatement.isTableModel = new ArrayList<>(batchFiles.size()); + for (int j = 0; j < batchFiles.size(); j++) { + subStatement.isTableModel.add(true); + } subStatements.add(subStatement); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java index 0352c85f1eb4..20f56c1c2f5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java @@ -21,6 +21,9 @@ import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + public abstract class Statement extends Node { protected Statement(final @Nullable NodeLocation location) { @@ -31,4 +34,26 @@ protected Statement(final @Nullable NodeLocation location) { public R accept(final AstVisitor visitor, final C context) { return visitor.visitStatement(this, context); } + + /** + * Checks whether this statement should be split into multiple sub-statements based on the given + * async requirement. Used to limit resource consumption during statement analysis, etc. + * + * @param requireAsync whether async execution is required + * @return true if the statement should be split, false otherwise. Default implementation returns + * false. + */ + public boolean shouldSplit(final boolean requireAsync) { + return false; + } + + /** + * Splits the current statement into multiple sub-statements. Used to limit resource consumption + * during statement analysis, etc. + * + * @return the list of sub-statements. Default implementation returns empty list. + */ + public List getSubStatements() { + return Collections.emptyList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java index 5b31f08ca677..ec4e24b515ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor; +import java.util.Collections; import java.util.List; /** @@ -68,4 +69,26 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat public String getPipeLoggingString() { return toString(); } + + /** + * Checks whether this statement should be split into multiple sub-statements based on the given + * async requirement. Used to limit resource consumption during statement analysis, etc. + * + * @param requireAsync whether async execution is required + * @return true if the statement should be split, false otherwise. Default implementation returns + * false. + */ + public boolean shouldSplit(final boolean requireAsync) { + return false; + } + + /** + * Splits the current statement into multiple sub-statements. Used to limit resource consumption + * during statement analysis, etc. + * + * @return the list of sub-statements. Default implementation returns empty list. + */ + public List getSubStatements() { + return Collections.emptyList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 471315c85e40..4d6602983c40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -306,16 +306,31 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin return tsFiles == null || tsFiles.isEmpty(); } + @Override + public boolean shouldSplit(final boolean requireAsync) { + final int splitThreshold = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); + return tsFiles.size() > splitThreshold && isAsyncLoad == requireAsync; + } + /** - * Splits the current LoadTsFileStatement into multiple sub-statements, each handling one TsFile. - * Used to support batch execution when loading multiple files. + * Splits the current LoadTsFileStatement into multiple sub-statements, each handling a batch of + * TsFiles. Used to limit resource consumption during statement analysis, etc. * * @return the list of sub-statements */ - public List getSubStatement() { - List subStatements = new ArrayList<>(tsFiles.size()); - for (int i = 0; i < tsFiles.size(); ++i) { - LoadTsFileStatement statement = new LoadTsFileStatement(); + @Override + public List getSubStatements() { + final int batchSize = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize(); + final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // Ceiling division + final List subStatements = new ArrayList<>(totalBatches); + + for (int i = 0; i < tsFiles.size(); i += batchSize) { + final int endIndex = Math.min(i + batchSize, tsFiles.size()); + final List batchFiles = tsFiles.subList(i, endIndex); + + final LoadTsFileStatement statement = new LoadTsFileStatement(); statement.databaseLevel = this.databaseLevel; statement.verifySchema = this.verifySchema; statement.deleteAfterLoad = this.deleteAfterLoad; @@ -325,11 +340,13 @@ public List getSubStatement() { statement.isAsyncLoad = this.isAsyncLoad; statement.isGeneratedByPipe = this.isGeneratedByPipe; - statement.tsFiles = Collections.singletonList(tsFiles.get(i)); - statement.resources = new ArrayList<>(1); - statement.writePointCountList = new ArrayList<>(1); - statement.isTableModel = new ArrayList<>(1); - statement.isTableModel.add(false); + statement.tsFiles = new ArrayList<>(batchFiles); + statement.resources = new ArrayList<>(batchFiles.size()); + statement.writePointCountList = new ArrayList<>(batchFiles.size()); + statement.isTableModel = new ArrayList<>(batchFiles.size()); + for (int j = 0; j < batchFiles.size(); j++) { + statement.isTableModel.add(false); + } statement.statementType = StatementType.MULTI_BATCH_INSERT; subStatements.add(statement); } From 3e8d9b2123e69792469da7cb1e99033374def0ca Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 9 Dec 2025 11:55:18 +0800 Subject: [PATCH 3/3] update --- .../thrift/impl/ClientRPCServiceImpl.java | 197 +++++++----------- .../plan/relational/sql/ast/LoadTsFile.java | 4 +- .../plan/relational/sql/ast/Statement.java | 2 +- .../queryengine/plan/statement/Statement.java | 2 +- .../statement/crud/LoadTsFileStatement.java | 6 +- 5 files changed, 82 insertions(+), 129 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 59b5c238a0a1..f31988bb5250 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -366,7 +366,7 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // Split statement if needed to limit resource consumption during statement analysis - if (s.shouldSplit(false)) { + if (s.shouldSplit()) { result = executeBatchStatement( s, @@ -375,7 +375,8 @@ private TSExecuteStatementResp executeStatementInternal( statement, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); } else { result = COORDINATOR.executeForTreeModel( @@ -410,7 +411,7 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // Split statement if needed to limit resource consumption during statement analysis - if (s.shouldSplit(false)) { + if (s.shouldSplit()) { result = executeBatchTableStatement( s, @@ -420,7 +421,8 @@ private TSExecuteStatementResp executeStatementInternal( SESSION_MANAGER.getSessionInfo(clientSession), statement, metadata, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); } else { result = COORDINATOR.executeForTableModel( @@ -1874,7 +1876,7 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { // create and cache dataset // Split statement if needed to limit resource consumption during statement analysis - if (s.shouldSplit(true)) { + if (s.shouldSplit()) { result = executeBatchStatement( s, @@ -1883,7 +1885,8 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { statement, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); } else { result = COORDINATOR.executeForTreeModel( @@ -1917,7 +1920,7 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { queryId = SESSION_MANAGER.requestQueryId(); // Split statement if needed to limit resource consumption during statement analysis - if (s.shouldSplit(true)) { + if (s.shouldSplit()) { result = executeBatchTableStatement( s, @@ -1927,7 +1930,8 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { SESSION_MANAGER.getSessionInfo(clientSession), statement, metadata, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); } else { result = COORDINATOR.executeForTableModel( @@ -3259,13 +3263,14 @@ public void handleClientExit() { * @return the execution result */ private ExecutionResult executeBatchStatement( - Statement statement, - long queryId, - SessionInfo sessionInfo, - String statementStr, - IPartitionFetcher partitionFetcher, - ISchemaFetcher schemaFetcher, - long timeoutMs) { + final Statement statement, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final IPartitionFetcher partitionFetcher, + final ISchemaFetcher schemaFetcher, + final long timeoutMs, + final boolean userQuery) { ExecutionResult result = null; final List subStatements = statement.getSubStatements(); @@ -3278,23 +3283,12 @@ private ExecutionResult executeBatchStatement( for (int i = 0; i < totalSubStatements; i++) { final Statement subStatement = subStatements.get(i); - final List subSubStatements = subStatement.getSubStatements(); - final int batchSize = subSubStatements.isEmpty() ? 1 : subSubStatements.size(); - if (batchSize == 1) { - LOGGER.info( - "Executing sub-statement {}/{} in tree model, queryId: {}", - i + 1, - totalSubStatements, - queryId); - } else { - LOGGER.info( - "Executing sub-statement {}/{} in tree model, batch size: {}, queryId: {}", - i + 1, - totalSubStatements, - batchSize, - queryId); - } + LOGGER.info( + "Executing sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); result = COORDINATOR.executeForTreeModel( @@ -3305,45 +3299,31 @@ private ExecutionResult executeBatchStatement( partitionFetcher, schemaFetcher, timeoutMs, - false); + userQuery); // Exit early if any sub-statement execution fails if (result != null && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (batchSize == 1) { - LOGGER.warn( - "Failed to execute sub-statement {}/{} in tree model, queryId: {}, error: {}", - i + 1, - totalSubStatements, - queryId, - result.status.getMessage()); - } else { - LOGGER.warn( - "Failed to execute sub-statement {}/{} in tree model, batch size: {}, queryId: {}, error: {}", - i + 1, - totalSubStatements, - batchSize, - queryId, - result.status.getMessage()); - } - break; - } - - processedCount += batchSize; - if (batchSize == 1) { - LOGGER.info( - "Successfully executed sub-statement {}/{} in tree model, queryId: {}", - i + 1, - totalSubStatements, - queryId); - } else { - LOGGER.info( - "Successfully executed sub-statement {}/{} in tree model, batch size: {}, queryId: {}", + final int completed = i + 1; + final int remaining = totalSubStatements - completed; + final double percentage = (completed * 100.0) / totalSubStatements; + LOGGER.warn( + "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", i + 1, totalSubStatements, - batchSize, - queryId); + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); + break; } + + LOGGER.info( + "Successfully executed sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); } if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -3370,21 +3350,20 @@ private ExecutionResult executeBatchStatement( * @return the execution result */ private ExecutionResult executeBatchTableStatement( - org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, - SqlParser relationSqlParser, - IClientSession clientSession, - long queryId, - SessionInfo sessionInfo, - String statementStr, - Metadata metadata, - long timeoutMs) { + final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, + final SqlParser relationSqlParser, + final IClientSession clientSession, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final Metadata metadata, + final long timeoutMs, + final boolean userQuery) { ExecutionResult result = null; List subStatements = statement.getSubStatements(); int totalSubStatements = subStatements.size(); - int processedCount = 0; - LOGGER.info( "Start batch executing {} sub-statement(s) in table model, queryId: {}", totalSubStatements, @@ -3393,24 +3372,12 @@ private ExecutionResult executeBatchTableStatement( for (int i = 0; i < totalSubStatements; i++) { final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement subStatement = subStatements.get(i); - final List - subSubStatements = subStatement.getSubStatements(); - final int batchSize = subSubStatements.isEmpty() ? 1 : subSubStatements.size(); - if (batchSize == 1) { - LOGGER.info( - "Executing sub-statement {}/{} in table model, queryId: {}", - i + 1, - totalSubStatements, - queryId); - } else { - LOGGER.info( - "Executing sub-statement {}/{} in table model, batch size: {}, queryId: {}", - i + 1, - totalSubStatements, - batchSize, - queryId); - } + LOGGER.info( + "Executing sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); result = COORDINATOR.executeForTableModel( @@ -3422,45 +3389,31 @@ private ExecutionResult executeBatchTableStatement( statementStr, metadata, timeoutMs, - false); + userQuery); // Exit early if any sub-statement execution fails if (result != null && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (batchSize == 1) { - LOGGER.warn( - "Failed to execute sub-statement {}/{} in table model, queryId: {}, error: {}", - i + 1, - totalSubStatements, - queryId, - result.status.getMessage()); - } else { - LOGGER.warn( - "Failed to execute sub-statement {}/{} in table model, batch size: {}, queryId: {}, error: {}", - i + 1, - totalSubStatements, - batchSize, - queryId, - result.status.getMessage()); - } - break; - } - - processedCount += batchSize; - if (batchSize == 1) { - LOGGER.info( - "Successfully executed sub-statement {}/{} in table model, queryId: {}", - i + 1, - totalSubStatements, - queryId); - } else { - LOGGER.info( - "Successfully executed sub-statement {}/{} in table model, batch size: {}, queryId: {}", + final int completed = i + 1; + final int remaining = totalSubStatements - completed; + final double percentage = (completed * 100.0) / totalSubStatements; + LOGGER.warn( + "Failed to execute sub-statement {}/{} in table model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", i + 1, totalSubStatements, - batchSize, - queryId); + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); + break; } + + LOGGER.info( + "Successfully executed sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); } if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 71ecae164526..166f06b85e32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -233,10 +233,10 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin } @Override - public boolean shouldSplit(final boolean requireAsync) { + public boolean shouldSplit() { final int splitThreshold = IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); - return tsFiles.size() > splitThreshold && isAsyncLoad == requireAsync; + return tsFiles.size() > splitThreshold && !isAsyncLoad; } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java index 20f56c1c2f5d..7ba19b972a29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java @@ -43,7 +43,7 @@ public R accept(final AstVisitor visitor, final C context) { * @return true if the statement should be split, false otherwise. Default implementation returns * false. */ - public boolean shouldSplit(final boolean requireAsync) { + public boolean shouldSplit() { return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java index ec4e24b515ab..0b2ecff6b5bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java @@ -78,7 +78,7 @@ public String getPipeLoggingString() { * @return true if the statement should be split, false otherwise. Default implementation returns * false. */ - public boolean shouldSplit(final boolean requireAsync) { + public boolean shouldSplit() { return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 4d6602983c40..a51dcaf09d2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -307,10 +307,10 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin } @Override - public boolean shouldSplit(final boolean requireAsync) { + public boolean shouldSplit() { final int splitThreshold = IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); - return tsFiles.size() > splitThreshold && isAsyncLoad == requireAsync; + return tsFiles.size() > splitThreshold && !isAsyncLoad; } /** @@ -347,7 +347,7 @@ public List getSubStatements() { for (int j = 0; j < batchFiles.size(); j++) { statement.isTableModel.add(false); } - statement.statementType = StatementType.MULTI_BATCH_INSERT; + subStatements.add(statement); }