diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/SqlSourceType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/SqlSourceType.java new file mode 100644 index 000000000000..3c812049bbcf --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/SqlSourceType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.enums; + +public enum SqlSourceType { + /** + * SCRIPT: inline sql text + * FILE: sql from resource center file + */ + SCRIPT, FILE +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index 8864943bac43..c614bfc15e8c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; +import org.apache.dolphinscheduler.plugin.task.api.enums.SqlSourceType; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; @@ -58,6 +59,16 @@ public class SqlParameters extends AbstractParameters { */ private String sql; + /** + * sql source + */ + private SqlSourceType sqlSource; + + /** + * sql resource file path in resource center + */ + private String sqlResource; + /** * sql type * 0 query @@ -139,6 +150,22 @@ public void setSql(String sql) { this.sql = sql; } + public SqlSourceType getSqlSource() { + return sqlSource; + } + + public void setSqlSource(SqlSourceType sqlSource) { + this.sqlSource = sqlSource; + } + + public String getSqlResource() { + return sqlResource; + } + + public void setSqlResource(String sqlResource) { + this.sqlResource = sqlResource; + } + public int getSqlType() { return sqlType; } @@ -213,12 +240,24 @@ public void setGroupId(int groupId) { @Override public boolean checkParameters() { - return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql); + if (datasource == 0 || StringUtils.isEmpty(type)) { + return false; + } + if (StringUtils.isNotEmpty(sql)) { + return true; + } + return StringUtils.isNotEmpty(sqlResource); } @Override public List getResourceFilesList() { - return new ArrayList<>(); + List resourceFiles = new ArrayList<>(); + if (StringUtils.isNotEmpty(sqlResource)) { + ResourceInfo resourceInfo = new ResourceInfo(); + resourceInfo.setResourceName(sqlResource); + resourceFiles.add(resourceInfo); + } + return resourceFiles; } public void dealOutParam(String result) { @@ -272,6 +311,8 @@ public String toString() { + "type='" + type + '\'' + ", datasource=" + datasource + ", sql='" + sql + '\'' + + ", sqlSource='" + sqlSource + '\'' + + ", sqlResource='" + sqlResource + '\'' + ", sqlType=" + sqlType + ", sendEmail=" + sendEmail + ", displayRows=" + displayRows diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java index 8f1ee7656000..c141612f96c0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParametersTest.java @@ -87,5 +87,10 @@ public void testSqlParameters() { sqlParameters.setLocalParams(properties); sqlParameters.dealOutParam(sqlResult); Assertions.assertNotNull(sqlParameters.getVarPool().get(0)); + + // resource files list should contain sqlResource when it is set + sqlParameters.setSql(null); + sqlParameters.setSqlResource("/sql/demo.sql"); + Assertions.assertFalse(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList())); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 33b85e1a66b8..dd976821db40 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -36,11 +36,16 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.lang3.StringUtils; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -48,6 +53,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -128,6 +134,8 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { sqlParameters.getLimit()); try { + ensureSqlContent(); + // get datasource baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType, sqlTaskExecutionContext.getConnectionParams()); @@ -405,6 +413,31 @@ private void printReplacedSql(String content, String formatSql, String rgex, Map log.info("Sql Params are {}", logPrint); } + private void ensureSqlContent() { + if (StringUtils.isNotEmpty(sqlParameters.getSql())) { + return; + } + if (StringUtils.isEmpty(sqlParameters.getSqlResource())) { + return; + } + String resourcePathInStorage = sqlParameters.getSqlResource(); + try { + ResourceContext resourceContext = taskExecutionContext.getResourceContext(); + ResourceContext.ResourceItem resourceItem = + resourceContext.getResourceItem(resourcePathInStorage); + String localPath = resourceItem.getResourceAbsolutePathInLocal(); + log.info("Load sql content from resource file: {}", resourcePathInStorage); + String sqlContent = new String( + Files.readAllBytes(Paths.get(localPath)), + StandardCharsets.UTF_8); + sqlParameters.setSql(sqlContent); + } catch (IOException e) { + log.error("Read sql content from resource file {} error", resourcePathInStorage, e); + throw new TaskException(String.format("Read sql content from resource file %s error", resourcePathInStorage), + e); + } + } + /** * ready to execute SQL and parameter entity Map * @@ -420,19 +453,22 @@ private SqlBinds getSqlAndSqlParamsMap(String sql) { Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - // spell SQL according to the final user-defined variable - if (paramsMap == null) { - sqlBuilder.append(sql); - return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); - } + Map placeholderParamsMap = paramsMap == null + ? Collections.emptyMap() + : ParameterUtils.convert(paramsMap); if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { - String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), - ParameterUtils.convert(paramsMap)); + String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), placeholderParamsMap); log.info("SQL title : {}", title); sqlParameters.setTitle(title); } + // spell SQL according to the final user-defined variable + if (paramsMap == null) { + sqlBuilder.append(sql); + return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); + } + // special characters need to be escaped, ${} needs to be escaped setSqlParamsMap(sql, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId()); // Replace the original value in sql !{...} ,Does not participate in precompilation diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java index 3832d653a527..587019267653 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java @@ -31,11 +31,15 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.util.HashMap; @@ -44,6 +48,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -69,6 +74,47 @@ void setup() { sqlTask = new SqlTask(ctx); } + @Test + void testSqlLoadedFromResourceFileWhenSqlIsEmpty(@TempDir Path tempDir) throws Exception { + Path sqlFile = tempDir.resolve("test.sql"); + String sqlContent = "SELECT 1"; + Files.write(sqlFile, sqlContent.getBytes(StandardCharsets.UTF_8)); + + SqlParameters sqlParameters = new SqlParameters(); + sqlParameters.setType("MYSQL"); + sqlParameters.setDatasource(1); + sqlParameters.setSql(null); + sqlParameters.setSqlResource("/sql/test.sql"); + + DataSourceParameters dataSourceParameters = new DataSourceParameters(); + dataSourceParameters.setType(DbType.MYSQL); + dataSourceParameters.setResourceType(ResourceType.DATASOURCE.name()); + + ResourceParametersHelper resourceParametersHelper = new ResourceParametersHelper(); + resourceParametersHelper.put(ResourceType.DATASOURCE, 1, dataSourceParameters); + + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskParams(JSONUtils.toJsonString(sqlParameters)); + taskExecutionContext.setScheduleTime(System.currentTimeMillis()); + taskExecutionContext.setResourceParametersHelper(resourceParametersHelper); + + ResourceContext resourceContext = new ResourceContext(); + resourceContext.addResourceItem(ResourceContext.ResourceItem.builder() + .resourceAbsolutePathInStorage(sqlParameters.getSqlResource()) + .resourceAbsolutePathInLocal(sqlFile.toString()) + .build()); + taskExecutionContext.setResourceContext(resourceContext); + + SqlTask task = new SqlTask(taskExecutionContext); + + Method ensureSqlContent = SqlTask.class.getDeclaredMethod("ensureSqlContent"); + ensureSqlContent.setAccessible(true); + ensureSqlContent.invoke(task); + + SqlParameters loadedParameters = (SqlParameters) task.getParameters(); + Assertions.assertEquals(sqlContent, loadedParameters.getSql()); + } + @Test void testReplacingSqlWithoutParams() { String querySql = "select 1"; @@ -210,7 +256,6 @@ void testVarPoolSetting() { @Test void testGenerateEmptyRow_WithNonNullResultSet_ReturnsEmptyValuesForAllColumns() throws Exception { - // Arrange ResultSet mockResultSet = mock(ResultSet.class); ResultSetMetaData mockMetaData = mock(ResultSetMetaData.class); @@ -222,10 +267,8 @@ void testGenerateEmptyRow_WithNonNullResultSet_ReturnsEmptyValuesForAllColumns() Method method = SqlTask.class.getDeclaredMethod("generateEmptyRow", ResultSet.class); method.setAccessible(true); - // Act ArrayNode result = (ArrayNode) method.invoke(sqlTask, mockResultSet); - // Assert Assertions.assertNotNull(result); Assertions.assertEquals(1, result.size()); @@ -236,14 +279,11 @@ void testGenerateEmptyRow_WithNonNullResultSet_ReturnsEmptyValuesForAllColumns() @Test void testGenerateEmptyRow_WithNullResultSet_ReturnsErrorObject() throws Exception { - // Arrange Method method = SqlTask.class.getDeclaredMethod("generateEmptyRow", ResultSet.class); method.setAccessible(true); - // Act ArrayNode result = (ArrayNode) method.invoke(sqlTask, (ResultSet) null); - // Assert Assertions.assertNotNull(result); Assertions.assertEquals(1, result.size()); @@ -260,7 +300,7 @@ void testGenerateEmptyRow_WithDuplicateColumns_DeduplicatesLabels() throws Excep when(mockResultSet.getMetaData()).thenReturn(mockMetaData); when(mockMetaData.getColumnCount()).thenReturn(3); when(mockMetaData.getColumnLabel(1)).thenReturn("id"); - when(mockMetaData.getColumnLabel(2)).thenReturn("id"); // duplicate + when(mockMetaData.getColumnLabel(2)).thenReturn("id"); when(mockMetaData.getColumnLabel(3)).thenReturn("name"); Method method = SqlTask.class.getDeclaredMethod("generateEmptyRow", ResultSet.class); @@ -281,7 +321,6 @@ void testResultProcess_NullResultSet_ReturnsEmptyResult() throws Exception { Method resultProcessMethod = SqlTask.class.getDeclaredMethod("resultProcess", ResultSet.class); resultProcessMethod.setAccessible(true); - // Mock a null ResultSet String result = (String) resultProcessMethod.invoke(sqlTask, (ResultSet) null); Assertions.assertNotNull(result); @@ -290,7 +329,6 @@ void testResultProcess_NullResultSet_ReturnsEmptyResult() throws Exception { @Test void testResultProcess_EmptyResultSet_ReturnsEmptyResult() throws Exception { - // Mock a non-null ResultSet that contains no data rows ResultSet mockResultSet = mock(ResultSet.class); ResultSetMetaData mockMetaData = mock(ResultSetMetaData.class); @@ -298,7 +336,7 @@ void testResultProcess_EmptyResultSet_ReturnsEmptyResult() throws Exception { when(mockMetaData.getColumnCount()).thenReturn(2); when(mockMetaData.getColumnLabel(1)).thenReturn("id"); when(mockMetaData.getColumnLabel(2)).thenReturn("name"); - when(mockResultSet.next()).thenReturn(false); // no rows available + when(mockResultSet.next()).thenReturn(false); Method resultProcessMethod = SqlTask.class.getDeclaredMethod("resultProcess", ResultSet.class); resultProcessMethod.setAccessible(true); @@ -306,7 +344,6 @@ void testResultProcess_EmptyResultSet_ReturnsEmptyResult() throws Exception { String result = (String) resultProcessMethod.invoke(sqlTask, mockResultSet); Assertions.assertNotNull(result); - // Verify the result contains empty string values for all columns and is a valid JSON array Assertions.assertTrue(result.contains("\"id\":\"\"")); Assertions.assertTrue(result.contains("\"name\":\"\"")); Assertions.assertTrue(result.startsWith("[{")); @@ -321,17 +358,15 @@ void testResultProcess_DuplicateColumnLabels_ThrowsTaskException() throws Except when(mockRs.getMetaData()).thenReturn(mockMd); when(mockMd.getColumnCount()).thenReturn(2); when(mockMd.getColumnLabel(1)).thenReturn("id"); - when(mockMd.getColumnLabel(2)).thenReturn("id"); // duplicate column name + when(mockMd.getColumnLabel(2)).thenReturn("id"); Method method = SqlTask.class.getDeclaredMethod("resultProcess", ResultSet.class); method.setAccessible(true); - // Assert that InvocationTargetException is thrown InvocationTargetException thrown = Assertions.assertThrows( InvocationTargetException.class, () -> method.invoke(sqlTask, mockRs)); - // Check the actual cause Throwable cause = thrown.getCause(); Assertions.assertNotNull(cause); Assertions.assertInstanceOf(TaskException.class, cause, diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 667153f2156a..f10e7fbdd0c2 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -552,10 +552,14 @@ export default { sql_type_query: 'Query', sql_type_non_query: 'Non Query', sql_statement: 'SQL Statement', + sql_source: 'SQL Source', + sql_source_script: 'Script', + sql_source_file: 'Resource file', pre_sql_statement: 'Pre SQL Statement', post_sql_statement: 'Post SQL Statement', sql_input_placeholder: 'Please enter non-query sql.', sql_empty_tips: 'The sql can not be empty.', + sql_resource_file: 'SQL Resource File', procedure_method: 'SQL Statement', procedure_method_tips: 'Please enter the procedure script', procedure_method_snippet: diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index ef9634095d9f..a3ad17816afb 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -534,10 +534,14 @@ export default { sql_type_query: '查询', sql_type_non_query: '非查询', sql_statement: 'SQL语句', + sql_source: 'SQL来源', + sql_source_script: '脚本', + sql_source_file: '资源文件', pre_sql_statement: '前置SQL语句', post_sql_statement: '后置SQL语句', sql_input_placeholder: '请输入非查询SQL语句', sql_empty_tips: '语句不能为空', + sql_resource_file: 'SQL资源文件', procedure_method: 'SQL语句', procedure_method_tips: '请输入存储脚本', procedure_method_snippet: diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql.ts index 0044fa9cf8a2..1de39efec114 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sql.ts @@ -22,6 +22,9 @@ import type { IJsonItem } from '../types' export function useSql(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() const hiveSpan = computed(() => (model.type === 'HIVE' ? 24 : 0)) + const showScriptEditor = computed( + () => model.sqlSource === 'SCRIPT' || !model.sqlSource + ) return [ { @@ -34,10 +37,27 @@ export function useSql(model: { [field: string]: any }): IJsonItem[] { }, span: hiveSpan }, + { + type: 'radio', + field: 'sqlSource', + name: t('project.node.sql_source'), + options: [ + { + label: t('project.node.sql_source_script'), + value: 'SCRIPT' + }, + { + label: t('project.node.sql_source_file'), + value: 'FILE' + } + ], + span: 24 + }, { type: 'editor', field: 'sql', name: t('project.node.sql_statement'), + if: showScriptEditor, validate: { trigger: ['input', 'trigger'], required: true, @@ -47,6 +67,20 @@ export function useSql(model: { [field: string]: any }): IJsonItem[] { language: 'sql' } }, + { + type: 'tree-select', + field: 'sqlResource', + name: t('project.node.sql_resource_file'), + span: 24, + if: () => model.sqlSource === 'FILE', + props: { + placeholder: t('project.node.resources_tips'), + keyField: 'fullName', + labelField: 'name', + disabledField: 'disable' + }, + slots: {} + }, ...useCustomParams({ model, field: 'localParams', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts index e967c2f09b5c..371ae737fa7d 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts @@ -47,6 +47,8 @@ export function useSql({ type: 'MYSQL', displayRows: 10, sql: '', + sqlSource: 'SCRIPT', + sqlResource: '', sqlType: '0', preStatements: [], postStatements: [],