diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java index 70e774ea8307..34bf879a8821 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.datax; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; @@ -95,6 +94,11 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; + /** + * datax channel + */ + private int jobChannel; + /** * Xms memory */ @@ -206,6 +210,14 @@ public void setJobSpeedRecord(int jobSpeedRecord) { this.jobSpeedRecord = jobSpeedRecord; } + public int getJobChannel() { + return jobChannel; + } + + public void setJobChannel(int jobChannel) { + this.jobChannel = jobChannel; + } + public int getXms() { return xms; } @@ -249,23 +261,24 @@ public List getResourceFilesList() { @Override public String toString() { - return "DataxParameters{" - + "customConfig=" + customConfig - + ", json='" + json + '\'' - + ", dsType='" + dsType + '\'' - + ", dataSource=" + dataSource - + ", dtType='" + dtType + '\'' - + ", dataTarget=" + dataTarget - + ", sql='" + sql + '\'' - + ", targetTable='" + targetTable + '\'' - + ", preStatements=" + preStatements - + ", postStatements=" + postStatements - + ", jobSpeedByte=" + jobSpeedByte - + ", jobSpeedRecord=" + jobSpeedRecord - + ", xms=" + xms - + ", xmx=" + xmx - + ", resourceList=" + JSONUtils.toJsonString(resourceList) - + '}'; + return "DataxParameters{" + + "customConfig=" + customConfig + + ", json='" + json + '\'' + + ", dsType='" + dsType + '\'' + + ", dataSource=" + dataSource + + ", dtType='" + dtType + '\'' + + ", dataTarget=" + dataTarget + + ", sql='" + sql + '\'' + + ", targetTable='" + targetTable + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + ", jobSpeedByte=" + jobSpeedByte + + ", jobSpeedRecord=" + jobSpeedRecord + + ", jobChannel=" + jobChannel + + ", xms=" + xms + + ", xmx=" + xmx + + ", resourceList=" + resourceList + + '}'; } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index fd173eb4ef7c..d9e0bd27fe3f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -55,6 +55,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; @@ -309,7 +310,7 @@ private ObjectNode buildDataxJobSettingJson() { ObjectNode speed = JSONUtils.createObjectNode(); - speed.put("channel", DATAX_CHANNEL_COUNT); + speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT)); if (dataXParameters.getJobSpeedByte() > 0) { speed.put("byte", dataXParameters.getJobSpeedByte()); @@ -333,7 +334,7 @@ private ObjectNode buildDataxJobSettingJson() { private ObjectNode buildDataxCoreJson() { ObjectNode speed = JSONUtils.createObjectNode(); - speed.put("channel", DATAX_CHANNEL_COUNT); + speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT)); if (dataXParameters.getJobSpeedByte() > 0) { speed.put("byte", dataXParameters.getJobSpeedByte()); diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 916893b02a0c..ab30af82cf5e 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -647,6 +647,7 @@ export default { datax_job_speed_byte_info: '(0 means unlimited)', datax_job_speed_record: 'Speed(Record count)', datax_job_speed_record_info: '(0 means unlimited)', + datax_job_channel: 'datax channel', datax_job_runtime_memory: 'Runtime Memory Limits', datax_job_runtime_memory_xms: 'Low Limit Value', datax_job_runtime_memory_xmx: 'High Limit Value', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 46fd12e348fd..4f66d8765996 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -629,6 +629,7 @@ export default { datax_job_speed_byte_info: '(KB,0代表不限制)', datax_job_speed_record: '限流(记录数)', datax_job_speed_record_info: '(0代表不限制)', + datax_job_channel: '数据管道数', datax_job_runtime_memory: '运行内存', datax_job_runtime_memory_xms: '最小内存', datax_job_runtime_memory_xmx: '最大内存', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts index b390d8891a12..cc38bca9b4b4 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts @@ -78,6 +78,28 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { value: 3000 } ] + const jobChannelOptions: any[] = [ + { + label: '1', + value: 1 + }, + { + label: '3', + value: 3 + }, + { + label: '5', + value: 5 + }, + { + label: '10', + value: 10 + }, + { + label: '15', + value: 15 + } + ] const memoryLimitOptions = [ { label: '1G', @@ -264,6 +286,14 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { options: memoryLimitOptions, value: 1 }, + { + type: 'input', + field: 'jobChannel', + name: t('project.node.datax_job_channel'), + span: jobSpeedSpan, + options: jobChannelOptions, + value: 1 + }, ...useCustomParams({ model, field: 'localParams', isSimple: true }) ] } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 8fc67906775c..974da0b44c15 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -274,6 +274,7 @@ export function formatParams(data: INodeData): { taskParams.targetTable = data.targetTable taskParams.jobSpeedByte = data.jobSpeedByte taskParams.jobSpeedRecord = data.jobSpeedRecord + taskParams.jobChannel = data.jobChannel taskParams.preStatements = data.preStatements taskParams.postStatements = data.postStatements } else { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 76dcde23ee2d..b576ca8f1a73 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -340,6 +340,7 @@ interface ITaskParams { targetTable?: string jobSpeedByte?: number jobSpeedRecord?: number + jobChannel?: number xms?: number xmx?: number sparkParameters?: ISparkParameters