Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.plugin.task.datax;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
Expand Down Expand Up @@ -95,6 +94,11 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;

/**
* datax channel
*/
private int jobChannel;

/**
* Xms memory
*/
Expand Down Expand Up @@ -206,6 +210,14 @@ public void setJobSpeedRecord(int jobSpeedRecord) {
this.jobSpeedRecord = jobSpeedRecord;
}

public int getJobChannel() {
return jobChannel;
}

public void setJobChannel(int jobChannel) {
this.jobChannel = jobChannel;
}

public int getXms() {
return xms;
}
Expand Down Expand Up @@ -249,23 +261,24 @@ public List<ResourceInfo> getResourceFilesList() {

@Override
public String toString() {
return "DataxParameters{"
+ "customConfig=" + customConfig
+ ", json='" + json + '\''
+ ", dsType='" + dsType + '\''
+ ", dataSource=" + dataSource
+ ", dtType='" + dtType + '\''
+ ", dataTarget=" + dataTarget
+ ", sql='" + sql + '\''
+ ", targetTable='" + targetTable + '\''
+ ", preStatements=" + preStatements
+ ", postStatements=" + postStatements
+ ", jobSpeedByte=" + jobSpeedByte
+ ", jobSpeedRecord=" + jobSpeedRecord
+ ", xms=" + xms
+ ", xmx=" + xmx
+ ", resourceList=" + JSONUtils.toJsonString(resourceList)
+ '}';
return "DataxParameters{" +
"customConfig=" + customConfig +
", json='" + json + '\'' +
", dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
", sql='" + sql + '\'' +
", targetTable='" + targetTable + '\'' +
", preStatements=" + preStatements +
", postStatements=" + postStatements +
", jobSpeedByte=" + jobSpeedByte +
", jobSpeedRecord=" + jobSpeedRecord +
", jobChannel=" + jobChannel +
", xms=" + xms +
", xmx=" + xmx +
", resourceList=" + resourceList +
'}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -309,7 +310,7 @@ private ObjectNode buildDataxJobSettingJson() {

ObjectNode speed = JSONUtils.createObjectNode();

speed.put("channel", DATAX_CHANNEL_COUNT);
speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT));

if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
Expand All @@ -333,7 +334,7 @@ private ObjectNode buildDataxJobSettingJson() {
private ObjectNode buildDataxCoreJson() {

ObjectNode speed = JSONUtils.createObjectNode();
speed.put("channel", DATAX_CHANNEL_COUNT);
speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT));

if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ export default {
datax_job_speed_byte_info: '(0 means unlimited)',
datax_job_speed_record: 'Speed(Record count)',
datax_job_speed_record_info: '(0 means unlimited)',
datax_job_channel: 'datax channel',
datax_job_runtime_memory: 'Runtime Memory Limits',
datax_job_runtime_memory_xms: 'Low Limit Value',
datax_job_runtime_memory_xmx: 'High Limit Value',
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ export default {
datax_job_speed_byte_info: '(KB,0代表不限制)',
datax_job_speed_record: '限流(记录数)',
datax_job_speed_record_info: '(0代表不限制)',
datax_job_channel: '数据管道数',
datax_job_runtime_memory: '运行内存',
datax_job_runtime_memory_xms: '最小内存',
datax_job_runtime_memory_xmx: '最大内存',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,28 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
value: 3000
}
]
const jobChannelOptions: any[] = [
{
label: '1',
value: 1
},
{
label: '3',
value: 3
},
{
label: '5',
value: 5
},
{
label: '10',
value: 10
},
{
label: '15',
value: 15
}
]
const memoryLimitOptions = [
{
label: '1G',
Expand Down Expand Up @@ -264,6 +286,14 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
options: memoryLimitOptions,
value: 1
},
{
type: 'input',
field: 'jobChannel',
name: t('project.node.datax_job_channel'),
span: jobSpeedSpan,
options: jobChannelOptions,
value: 1
},
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ export function formatParams(data: INodeData): {
taskParams.targetTable = data.targetTable
taskParams.jobSpeedByte = data.jobSpeedByte
taskParams.jobSpeedRecord = data.jobSpeedRecord
taskParams.jobChannel = data.jobChannel
taskParams.preStatements = data.preStatements
taskParams.postStatements = data.postStatements
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ interface ITaskParams {
targetTable?: string
jobSpeedByte?: number
jobSpeedRecord?: number
jobChannel?: number
xms?: number
xmx?: number
sparkParameters?: ISparkParameters
Expand Down
Loading