diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 81a8aa4f50a3..edb1d1c26743 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1121,6 +1121,21 @@ public class IoTDBConfig { private int loadTsFileSpiltPartitionMaxSize = 10; + /** + * The threshold for splitting statement when loading multiple TsFiles. When the number of TsFiles + * exceeds this threshold, the statement will be split into multiple sub-statements for batch + * execution to limit resource consumption during statement analysis. Default value is 10, which + * means splitting will occur when there are more than 10 files. + */ + private int loadTsFileStatementSplitThreshold = 10; + + /** + * The number of TsFiles that each sub-statement handles when splitting a statement. This + * parameter controls how many files are grouped together in each sub-statement during batch + * execution. Default value is 10, which means each sub-statement handles 10 files. + */ + private int loadTsFileSubStatementBatchSize = 10; + private String[] loadActiveListeningDirs = new String[] { IoTDBConstant.EXT_FOLDER_NAME @@ -4056,6 +4071,46 @@ public void setLoadTsFileSpiltPartitionMaxSize(int loadTsFileSpiltPartitionMaxSi this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize; } + public int getLoadTsFileStatementSplitThreshold() { + return loadTsFileStatementSplitThreshold; + } + + public void setLoadTsFileStatementSplitThreshold(final int loadTsFileStatementSplitThreshold) { + if (loadTsFileStatementSplitThreshold < 0) { + logger.warn( + "Invalid loadTsFileStatementSplitThreshold value: {}. Using default value: 10", + loadTsFileStatementSplitThreshold); + return; + } + if (this.loadTsFileStatementSplitThreshold != loadTsFileStatementSplitThreshold) { + logger.info( + "loadTsFileStatementSplitThreshold changed from {} to {}", + this.loadTsFileStatementSplitThreshold, + loadTsFileStatementSplitThreshold); + } + this.loadTsFileStatementSplitThreshold = loadTsFileStatementSplitThreshold; + } + + public int getLoadTsFileSubStatementBatchSize() { + return loadTsFileSubStatementBatchSize; + } + + public void setLoadTsFileSubStatementBatchSize(final int loadTsFileSubStatementBatchSize) { + if (loadTsFileSubStatementBatchSize <= 0) { + logger.warn( + "Invalid loadTsFileSubStatementBatchSize value: {}. Using default value: 10", + loadTsFileSubStatementBatchSize); + return; + } + if (this.loadTsFileSubStatementBatchSize != loadTsFileSubStatementBatchSize) { + logger.info( + "loadTsFileSubStatementBatchSize changed from {} to {}", + this.loadTsFileSubStatementBatchSize, + loadTsFileSubStatementBatchSize); + } + this.loadTsFileSubStatementBatchSize = loadTsFileSubStatementBatchSize; + } + public String[] getPipeReceiverFileDirs() { return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0) ? new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 3c9e8159d0ff..d32b0b51f576 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2395,6 +2395,18 @@ private void loadLoadTsFileProps(TrimProperties properties) { properties.getProperty( "skip_failed_table_schema_check", String.valueOf(conf.isSkipFailedTableSchemaCheck())))); + + conf.setLoadTsFileStatementSplitThreshold( + Integer.parseInt( + properties.getProperty( + "load_tsfile_statement_split_threshold", + Integer.toString(conf.getLoadTsFileStatementSplitThreshold())))); + + conf.setLoadTsFileSubStatementBatchSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_sub_statement_batch_size", + Integer.toString(conf.getLoadTsFileSubStatementBatchSize())))); } private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException { @@ -2443,6 +2455,18 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE "load_tsfile_split_partition_max_size", Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize())))); + conf.setLoadTsFileStatementSplitThreshold( + Integer.parseInt( + properties.getProperty( + "load_tsfile_statement_split_threshold", + Integer.toString(conf.getLoadTsFileStatementSplitThreshold())))); + + conf.setLoadTsFileSubStatementBatchSize( + Integer.parseInt( + properties.getProperty( + "load_tsfile_sub_statement_batch_size", + Integer.toString(conf.getLoadTsFileSubStatementBatchSize())))); + conf.setSkipFailedTableSchemaCheck( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 5655758f1694..f31988bb5250 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -365,16 +365,30 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - req.getTimeout(), - true); + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + true); + } else { + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + req.getTimeout(), + true); + } } } else { org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s = @@ -396,17 +410,32 @@ private TSExecuteStatementResp executeStatementInternal( queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); - result = - COORDINATOR.executeForTableModel( - s, - relationSqlParser, - clientSession, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - metadata, - req.getTimeout(), - true); + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchTableStatement( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + true); + } else { + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + req.getTimeout(), + true); + } } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -1845,16 +1874,31 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { queryId = SESSION_MANAGER.requestQueryId(); type = s.getType() == null ? null : s.getType().name(); // create and cache dataset - result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - partitionFetcher, - schemaFetcher, - config.getQueryTimeoutThreshold(), - false); + + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchStatement( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); + } else { + result = + COORDINATOR.executeForTreeModel( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + partitionFetcher, + schemaFetcher, + config.getQueryTimeoutThreshold(), + false); + } } } else { @@ -1875,17 +1919,32 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { queryId = SESSION_MANAGER.requestQueryId(); - result = - COORDINATOR.executeForTableModel( - s, - relationSqlParser, - clientSession, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - statement, - metadata, - config.getQueryTimeoutThreshold(), - false); + // Split statement if needed to limit resource consumption during statement analysis + if (s.shouldSplit()) { + result = + executeBatchTableStatement( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + false); + } else { + result = + COORDINATOR.executeForTableModel( + s, + relationSqlParser, + clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + statement, + metadata, + config.getQueryTimeoutThreshold(), + false); + } } results.add(result.status); @@ -3190,4 +3249,180 @@ public void handleClientExit() { PipeDataNodeAgent.receiver().legacy().handleClientExit(); SubscriptionAgent.receiver().handleClientExit(); } + + /** + * Executes tree-model Statement sub-statements in batch. + * + * @param statement the Statement to be executed + * @param queryId the query ID + * @param sessionInfo the session information + * @param statementStr the SQL statement string + * @param partitionFetcher the partition fetcher + * @param schemaFetcher the schema fetcher + * @param timeoutMs the timeout in milliseconds + * @return the execution result + */ + private ExecutionResult executeBatchStatement( + final Statement statement, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final IPartitionFetcher partitionFetcher, + final ISchemaFetcher schemaFetcher, + final long timeoutMs, + final boolean userQuery) { + + ExecutionResult result = null; + final List subStatements = statement.getSubStatements(); + final int totalSubStatements = subStatements.size(); + + LOGGER.info( + "Start batch executing {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); + + for (int i = 0; i < totalSubStatements; i++) { + final Statement subStatement = subStatements.get(i); + + LOGGER.info( + "Executing sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + + result = + COORDINATOR.executeForTreeModel( + subStatement, + queryId, + sessionInfo, + statementStr, + partitionFetcher, + schemaFetcher, + timeoutMs, + userQuery); + + // Exit early if any sub-statement execution fails + if (result != null + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final int completed = i + 1; + final int remaining = totalSubStatements - completed; + final double percentage = (completed * 100.0) / totalSubStatements; + LOGGER.warn( + "Failed to execute sub-statement {}/{} in tree model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", + i + 1, + totalSubStatements, + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); + break; + } + + LOGGER.info( + "Successfully executed sub-statement {}/{} in tree model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } + + if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "Completed batch executing all {} sub-statement(s) in tree model, queryId: {}", + totalSubStatements, + queryId); + } + + return result; + } + + /** + * Executes table-model Statement sub-statements in batch. + * + * @param statement the Statement to be executed + * @param relationSqlParser the relational SQL parser + * @param clientSession the client session + * @param queryId the query ID + * @param sessionInfo the session information + * @param statementStr the SQL statement string + * @param metadata the metadata + * @param timeoutMs the timeout in milliseconds + * @return the execution result + */ + private ExecutionResult executeBatchTableStatement( + final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, + final SqlParser relationSqlParser, + final IClientSession clientSession, + final long queryId, + final SessionInfo sessionInfo, + final String statementStr, + final Metadata metadata, + final long timeoutMs, + final boolean userQuery) { + + ExecutionResult result = null; + List + subStatements = statement.getSubStatements(); + int totalSubStatements = subStatements.size(); + LOGGER.info( + "Start batch executing {} sub-statement(s) in table model, queryId: {}", + totalSubStatements, + queryId); + + for (int i = 0; i < totalSubStatements; i++) { + final org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement subStatement = + subStatements.get(i); + + LOGGER.info( + "Executing sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + + result = + COORDINATOR.executeForTableModel( + subStatement, + relationSqlParser, + clientSession, + queryId, + sessionInfo, + statementStr, + metadata, + timeoutMs, + userQuery); + + // Exit early if any sub-statement execution fails + if (result != null + && result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final int completed = i + 1; + final int remaining = totalSubStatements - completed; + final double percentage = (completed * 100.0) / totalSubStatements; + LOGGER.warn( + "Failed to execute sub-statement {}/{} in table model, queryId: {}, completed: {}, remaining: {}, progress: {}%, error: {}", + i + 1, + totalSubStatements, + queryId, + completed, + remaining, + String.format("%.2f", percentage), + result.status.getMessage()); + break; + } + + LOGGER.info( + "Successfully executed sub-statement {}/{} in table model, queryId: {}", + i + 1, + totalSubStatements, + queryId); + } + + if (result != null && result.status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "Completed batch executing all {} sub-statement(s) in table model, queryId: {}", + totalSubStatements, + queryId); + } + + return result; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 93fc8c7b5833..166f06b85e32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -37,7 +37,7 @@ public class LoadTsFile extends Statement { - private final String filePath; + private String filePath; private int databaseLevel; // For loading to tree-model only private String database; // For loading to table-model only @@ -50,7 +50,7 @@ public class LoadTsFile extends Statement { private boolean isGeneratedByPipe = false; - private final Map loadAttributes; + private Map loadAttributes; private List tsFiles; private List resources; @@ -232,6 +232,63 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin return tsFiles == null || tsFiles.isEmpty(); } + @Override + public boolean shouldSplit() { + final int splitThreshold = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); + return tsFiles.size() > splitThreshold && !isAsyncLoad; + } + + /** + * Splits the current LoadTsFile statement into multiple sub-statements, each handling a batch of + * TsFiles. Used to limit resource consumption during statement analysis, etc. + * + * @return the list of sub-statements + */ + @Override + public List getSubStatements() { + final int batchSize = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize(); + final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // Ceiling division + final List subStatements = new ArrayList<>(totalBatches); + + for (int i = 0; i < tsFiles.size(); i += batchSize) { + final int endIndex = Math.min(i + batchSize, tsFiles.size()); + final List batchFiles = tsFiles.subList(i, endIndex); + + // Use the first file's path for the sub-statement + final String filePath = batchFiles.get(0).getAbsolutePath(); + final Map properties = this.loadAttributes; + + final LoadTsFile subStatement = + new LoadTsFile(getLocation().orElse(null), filePath, properties); + + // Copy all configuration properties + subStatement.databaseLevel = this.databaseLevel; + subStatement.database = this.database; + subStatement.verify = this.verify; + subStatement.deleteAfterLoad = this.deleteAfterLoad; + subStatement.convertOnTypeMismatch = this.convertOnTypeMismatch; + subStatement.tabletConversionThresholdBytes = this.tabletConversionThresholdBytes; + subStatement.autoCreateDatabase = this.autoCreateDatabase; + subStatement.isAsyncLoad = this.isAsyncLoad; + subStatement.isGeneratedByPipe = this.isGeneratedByPipe; + + // Set all files in the batch + subStatement.tsFiles = new ArrayList<>(batchFiles); + subStatement.resources = new ArrayList<>(batchFiles.size()); + subStatement.writePointCountList = new ArrayList<>(batchFiles.size()); + subStatement.isTableModel = new ArrayList<>(batchFiles.size()); + for (int j = 0; j < batchFiles.size(); j++) { + subStatement.isTableModel.add(true); + } + + subStatements.add(subStatement); + } + + return subStatements; + } + @Override public R accept(AstVisitor visitor, C context) { return visitor.visitLoadTsFile(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java index 0352c85f1eb4..7ba19b972a29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Statement.java @@ -21,6 +21,9 @@ import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + public abstract class Statement extends Node { protected Statement(final @Nullable NodeLocation location) { @@ -31,4 +34,26 @@ protected Statement(final @Nullable NodeLocation location) { public R accept(final AstVisitor visitor, final C context) { return visitor.visitStatement(this, context); } + + /** + * Checks whether this statement should be split into multiple sub-statements based on the given + * async requirement. Used to limit resource consumption during statement analysis, etc. + * + * @param requireAsync whether async execution is required + * @return true if the statement should be split, false otherwise. Default implementation returns + * false. + */ + public boolean shouldSplit() { + return false; + } + + /** + * Splits the current statement into multiple sub-statements. Used to limit resource consumption + * during statement analysis, etc. + * + * @return the list of sub-statements. Default implementation returns empty list. + */ + public List getSubStatements() { + return Collections.emptyList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java index 5b31f08ca677..0b2ecff6b5bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor; +import java.util.Collections; import java.util.List; /** @@ -68,4 +69,26 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat public String getPipeLoggingString() { return toString(); } + + /** + * Checks whether this statement should be split into multiple sub-statements based on the given + * async requirement. Used to limit resource consumption during statement analysis, etc. + * + * @param requireAsync whether async execution is required + * @return true if the statement should be split, false otherwise. Default implementation returns + * false. + */ + public boolean shouldSplit() { + return false; + } + + /** + * Splits the current statement into multiple sub-statements. Used to limit resource consumption + * during statement analysis, etc. + * + * @return the list of sub-statements. Default implementation returns empty list. + */ + public List getSubStatements() { + return Collections.emptyList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index d1dff1bb9cf2..a51dcaf09d2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -306,6 +306,54 @@ public boolean reconstructStatementIfMiniFileConverted(final List isMin return tsFiles == null || tsFiles.isEmpty(); } + @Override + public boolean shouldSplit() { + final int splitThreshold = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileStatementSplitThreshold(); + return tsFiles.size() > splitThreshold && !isAsyncLoad; + } + + /** + * Splits the current LoadTsFileStatement into multiple sub-statements, each handling a batch of + * TsFiles. Used to limit resource consumption during statement analysis, etc. + * + * @return the list of sub-statements + */ + @Override + public List getSubStatements() { + final int batchSize = + IoTDBDescriptor.getInstance().getConfig().getLoadTsFileSubStatementBatchSize(); + final int totalBatches = (tsFiles.size() + batchSize - 1) / batchSize; // Ceiling division + final List subStatements = new ArrayList<>(totalBatches); + + for (int i = 0; i < tsFiles.size(); i += batchSize) { + final int endIndex = Math.min(i + batchSize, tsFiles.size()); + final List batchFiles = tsFiles.subList(i, endIndex); + + final LoadTsFileStatement statement = new LoadTsFileStatement(); + statement.databaseLevel = this.databaseLevel; + statement.verifySchema = this.verifySchema; + statement.deleteAfterLoad = this.deleteAfterLoad; + statement.convertOnTypeMismatch = this.convertOnTypeMismatch; + statement.tabletConversionThresholdBytes = this.tabletConversionThresholdBytes; + statement.autoCreateDatabase = this.autoCreateDatabase; + statement.isAsyncLoad = this.isAsyncLoad; + statement.isGeneratedByPipe = this.isGeneratedByPipe; + + statement.tsFiles = new ArrayList<>(batchFiles); + statement.resources = new ArrayList<>(batchFiles.size()); + statement.writePointCountList = new ArrayList<>(batchFiles.size()); + statement.isTableModel = new ArrayList<>(batchFiles.size()); + for (int j = 0; j < batchFiles.size(); j++) { + statement.isTableModel.add(false); + } + + subStatements.add(statement); + } + + return subStatements; + } + @Override public List getPaths() { return Collections.emptyList();