Merge changes from 6.1
zhicwu committed Jan 7, 2017
1 parent eebaffb commit b7b29d4
Showing 10 changed files with 189 additions and 141 deletions.
pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java (53 additions, 44 deletions)
@@ -23,8 +23,10 @@
 
 package org.pentaho.di.job;
 
+import com.google.common.base.Strings;
 import org.apache.commons.vfs2.FileName;
 import org.apache.commons.vfs2.FileObject;
+import org.pentaho.di.cluster.ServerCache;
 import org.pentaho.di.cluster.SlaveServer;
 import org.pentaho.di.core.*;
 import org.pentaho.di.core.database.Database;
@@ -1705,7 +1707,6 @@ public String getStatus() {
    */
   public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguration executionConfiguration,
       Repository repository, IMetaStore metaStore) throws KettleException {
-    String carteObjectId;
     SlaveServer slaveServer = executionConfiguration.getRemoteServer();
 
     if (slaveServer == null) {
@@ -1718,58 +1719,64 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguration executionConfiguration,
     // Align logging levels between execution configuration and remote server
     slaveServer.getLogChannel().setLogLevel(executionConfiguration.getLogLevel());
 
+    String carteObjectId = ServerCache.getCachedIdentity(jobMeta, executionConfiguration.getParams(), slaveServer);
+
     FileObject tempFile = null;
     try {
-      // Inject certain internal variables to make it more intuitive.
-      //
-      for (String var : Const.INTERNAL_TRANS_VARIABLES) {
-        executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
-      }
-      for (String var : Const.INTERNAL_JOB_VARIABLES) {
-        executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
-      }
+      if (Strings.isNullOrEmpty(carteObjectId)) {
+        // Inject certain internal variables to make it more intuitive.
+        //
+        for (String var : Const.INTERNAL_TRANS_VARIABLES) {
+          executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
+        }
+        for (String var : Const.INTERNAL_JOB_VARIABLES) {
+          executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
+        }
 
-      Map<String, String> jobParams = new HashMap<String, String>();
-      for (String key : jobMeta.listParameters()) {
-        String value = jobMeta.getParameterValue(key);
-        String defaultValue = jobMeta.getParameterDefault(key);
-        jobParams.put(key,
-            executionConfiguration.getVariables().getOrDefault(key, value == null ? defaultValue : value));
-      }
+        Map<String, String> jobParams = new HashMap<String, String>();
+        for (String key : jobMeta.listParameters()) {
+          String value = jobMeta.getParameterValue(key);
+          String defaultValue = jobMeta.getParameterDefault(key);
+          jobParams.put(key,
+              executionConfiguration.getVariables().getOrDefault(key, value == null ? defaultValue : value));
+        }
 
-      executionConfiguration.getParams().putAll(jobParams);
+        executionConfiguration.getParams().putAll(jobParams);
 
-      if (executionConfiguration.isPassingExport()) {
-        // First export the job... slaveServer.getVariable("MASTER_HOST")
-        //
-        tempFile =
-            KettleVFS.createTempFile("jobExport", ".zip", System.getProperty("java.io.tmpdir"), jobMeta);
+        if (executionConfiguration.isPassingExport()) {
+          // First export the job... slaveServer.getVariable("MASTER_HOST")
+          //
+          tempFile =
+              KettleVFS.createTempFile("jobExport", ".zip", System.getProperty("java.io.tmpdir"), jobMeta);
 
-        TopLevelResource topLevelResource =
-            ResourceUtil.serializeResourceExportInterface(tempFile.getName().toString(), jobMeta, jobMeta, repository,
-                metaStore, executionConfiguration.getXML(), CONFIGURATION_IN_EXPORT_FILENAME);
+          TopLevelResource topLevelResource =
+              ResourceUtil.serializeResourceExportInterface(tempFile.getName().toString(), jobMeta, jobMeta, repository,
+                  metaStore, executionConfiguration.getXML(), CONFIGURATION_IN_EXPORT_FILENAME);
 
-        // Send the zip file over to the slave server...
-        //
-        String result =
-            slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_JOB, topLevelResource
-                .getBaseResourceName());
-        WebResult webResult = WebResult.fromXMLString(result);
-        if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
-          throw new KettleException("There was an error passing the exported job to the remote server: " + Const.CR
-              + webResult.getMessage());
-        }
-        carteObjectId = webResult.getId();
-      } else {
-        String xml = new JobConfiguration(jobMeta, executionConfiguration).getXML();
+          // Send the zip file over to the slave server...
+          //
+          String result =
+              slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_JOB, topLevelResource
+                  .getBaseResourceName());
+          WebResult webResult = WebResult.fromXMLString(result);
+          if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+            throw new KettleException("There was an error passing the exported job to the remote server: " + Const.CR
+                + webResult.getMessage());
+          }
+          carteObjectId = webResult.getId();
+        } else {
+          String xml = new JobConfiguration(jobMeta, executionConfiguration).getXML();
 
-        String reply = slaveServer.sendXML(xml, RegisterJobServlet.CONTEXT_PATH + "/?xml=Y");
-        WebResult webResult = WebResult.fromXMLString(reply);
-        if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
-          throw new KettleException("There was an error posting the job on the remote server: " + Const.CR + webResult
-              .getMessage());
-        }
-        carteObjectId = webResult.getId();
-      }
+          String reply = slaveServer.sendXML(xml, RegisterJobServlet.CONTEXT_PATH + "/?xml=Y");
+          WebResult webResult = WebResult.fromXMLString(reply);
+          if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+            throw new KettleException("There was an error posting the job on the remote server: " + Const.CR + webResult
+                .getMessage());
+          }
+          carteObjectId = webResult.getId();
+        }
+
+        ServerCache.cacheIdentity(jobMeta, executionConfiguration.getParams(), slaveServer, carteObjectId);
+      }
 
       // Start the job
@@ -1779,6 +1786,8 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguration executionConfiguration,
           "UTF-8") + "&xml=Y&id=" + carteObjectId);
       WebResult webResult = WebResult.fromXMLString(reply);
       if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+        ServerCache.invalidate(jobMeta, executionConfiguration.getParams(), slaveServer);
+
         throw new KettleException("There was an error starting the job on the remote server: " + Const.CR + webResult
             .getMessage());
       }
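Note on the Job.java change above: sendToSlaveServer() now asks ServerCache for a previously registered carte object ID before doing any work. On a cache hit it skips variable injection, export and registration entirely and jumps straight to StartJobServlet; after a successful registration it caches the new ID, and a failed start invalidates the entry (last hunk above). ServerCache itself is not part of this commit, so the outline below is only a sketch of the contract implied by the three call sites: the method signatures mirror the diff, while the key scheme and the map-based storage are assumptions.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.job.JobMeta;

// Sketch only: the contract implied by the getCachedIdentity/cacheIdentity/invalidate call sites.
public final class ServerCache {
  // Correlation parameter used by the job entry diffs below; the literal value is a guess.
  public static final String PARAM_ETL_JOB_ID = "ETL_JOB_ID";

  private static final Map<String, String> IDENTITIES = new ConcurrentHashMap<String, String>();

  // Returns the carte object ID of an already-registered job, or null when unknown
  public static String getCachedIdentity(JobMeta meta, Map<String, String> params, SlaveServer server) {
    return IDENTITIES.get(keyOf(meta, params, server));
  }

  public static void cacheIdentity(JobMeta meta, Map<String, String> params, SlaveServer server, String id) {
    IDENTITIES.put(keyOf(meta, params, server), id);
  }

  // Evicts a stale entry so the next submission re-registers the job
  public static void invalidate(JobMeta meta, Map<String, String> params, SlaveServer server) {
    IDENTITIES.remove(keyOf(meta, params, server));
  }

  // Assumed key scheme: job name plus correlation id plus target server
  private static String keyOf(JobMeta meta, Map<String, String> params, SlaveServer server) {
    return meta.getName() + '|' + params.get(PARAM_ETL_JOB_ID) + '@' + server.getName();
  }

  private ServerCache() {
  }
}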
@@ -25,7 +25,7 @@
  */
 public final class RemoteJobEntryLogHelper {
   private static final String UNKNOWN_SERVER = "unknown server";
-  private static final String UNKNOW_OBJECT = "unknown object";
+  private static final String UNKNOWN_OBJECT = "unknown object";
 
   private final LogChannelInterface logger;
 
@@ -38,9 +38,18 @@ public RemoteJobEntryLogHelper(SlaveServer server, String objectId, LogChannelInterface logger) {
     this.logger = logger;
 
     this.serverAddress = server == null || server.getName() == null ? UNKNOWN_SERVER : server.getName();
-    this.objectId = objectId == null ? UNKNOW_OBJECT : objectId;
+    this.objectId = objectId == null ? UNKNOWN_OBJECT : objectId;
 
     this.lastLogEntryNo = 0;
+
+    if (this.logger != null) {
+      this.logger.logBasic(new StringBuilder()
+          .append("=== Log Replayer is ready for [")
+          .append(this.objectId)
+          .append('@')
+          .append(serverAddress)
+          .append("] ===").toString());
+    }
   }
 
   public int getLastLogEntryNo() {
@@ -53,43 +62,36 @@ public void log(String logString, int firstEntryLineNo, int lastEntryLineNo) {
     }
 
     int length = logString.length();
-    int lineDiff = firstEntryLineNo - lastLogEntryNo;
 
-    if (length > 0 && lastLogEntryNo != lastEntryLineNo) {
-      try {
+    if (lastLogEntryNo != lastEntryLineNo) {
+      if (length > 0) {
         logger.logBasic(new StringBuilder()
-            .append("---> Replay logs L")
+            .append("---> Replay logs from #")
             .append(firstEntryLineNo)
-            .append(" ~ L")
+            .append(" to #")
             .append(lastEntryLineNo)
-            .append(" from [")
-            .append(objectId)
-            .append('@')
-            .append(serverAddress)
-            .append("]: ")
+            .append(": ")
             .append(length)
             .append(" bytes <---").toString());
 
-        if (lineDiff != 0) {
-          logger.logError(new StringBuffer()
-              .append("*** Somehow we ")
-              .append(lineDiff > 0 ? "lost " : "got duplicated ")
-              .append(Math.abs(lineDiff))
-              .append(" lines of logs from [")
-              .append(objectId)
-              .append('@')
-              .append(serverAddress)
-              .append("] ***")
-              .toString());
-        }
-
         logger.logBasic(logString);
-      } catch (Throwable t) {
-        // ignore as logging failure is trivial
-        // t.printStackTrace();
       }
-    }
 
-    lastLogEntryNo = lastEntryLineNo;
+      int lineDiff = firstEntryLineNo - lastLogEntryNo;
+
+      if (lineDiff != 0) {
+        logger.logError(new StringBuffer()
+            .append("*** Skip ")
+            .append(lineDiff) // could be negative
+            .append(" lines from #")
+            .append(firstEntryLineNo)
+            .append(" to #")
+            .append(lastLogEntryNo)
+            .append(" ***")
+            .toString());
+      }
+
+      lastLogEntryNo = lastEntryLineNo;
+    }
   }
 }
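RemoteJobEntryLogHelper replays chunks of the remote job's log onto the local log channel. lastLogEntryNo remembers the last line already replayed: a chunk whose lastEntryLineNo matches it is dropped instead of being logged twice, and a mismatch between the chunk's first line and the previous last line is reported as skipped lines. A minimal usage sketch of that behavior; the server, object ID and line ranges are invented for illustration:

import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.logging.LogChannelInterface;

public class LogReplayExample {
  public static void main(String[] args) {
    LogChannelInterface log = new LogChannel("example");
    SlaveServer server = new SlaveServer(); // stand-in; normally the job's remote slave server
    RemoteJobEntryLogHelper helper = new RemoteJobEntryLogHelper(server, "carte-42", log);

    // First poll returns lines 0..120 of the remote log: replayed in full.
    helper.log("...120 lines of remote log...", 0, 120);

    // The same range comes back on the next poll: lastEntryLineNo equals
    // the stored lastLogEntryNo, so nothing is logged again.
    helper.log("...120 lines of remote log...", 0, 120);

    // The next chunk starts at line 150 instead of 120: the text is replayed
    // and the 30-line gap is reported through logError.
    helper.log("...lines 150 to 200...", 150, 200);
  }
}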
@@ -60,6 +60,8 @@
 import java.text.SimpleDateFormat;
 import java.util.*;
 
+import static org.pentaho.di.cluster.ServerCache.PARAM_ETL_JOB_ID;
+
 /**
  * Recursive definition of a Job. This step means that an entire Job has to be executed. It can be the same Job, but
  * just make sure that you don't get an endless loop. Provide an escape routine using JobEval.
@@ -910,11 +912,15 @@ public Result execute(Result result, int nr) throws KettleException {
     jobExecutionConfiguration.setLogLevel(jobLogLevel);
     jobExecutionConfiguration.setPassingExport(passingExport);
     jobExecutionConfiguration.setExpandingRemoteJob(expandingRemoteJob);
+
+    Map<String, String> params = jobExecutionConfiguration.getParams();
     for (String param : namedParam.listParameters()) {
       String defValue = namedParam.getParameterDefault(param);
       String value = namedParam.getParameterValue(param);
-      jobExecutionConfiguration.getParams().put(param, Const.NVL(value, defValue));
+      params.put(param, Const.NVL(value, defValue));
     }
+    params.put(PARAM_ETL_JOB_ID, parentJob.getParameterValue(PARAM_ETL_JOB_ID));
+
     if (parentJob.getJobMeta().isBatchIdPassed()) {
       jobExecutionConfiguration.setPassedBatchId(parentJob.getBatchId());
     }
@@ -65,6 +65,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.pentaho.di.cluster.ServerCache.PARAM_ETL_JOB_ID;
+
 /**
  * This is the job entry that defines a transformation to be run.
  *
@@ -966,6 +968,7 @@ public Result execute(Result result, int nr) throws KettleException {
           transMeta.getParameterDefault(param), transMeta.getVariable(param)));
       params.put(param, value);
     }
+    params.put(PARAM_ETL_JOB_ID, parentJob.getParameterValue(PARAM_ETL_JOB_ID));
 
     if (parentJob.getJobMeta().isBatchIdPassed()) {
       transExecutionConfiguration.setPassedBatchId(parentJob.getPassedBatchId());
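Both job entries above resolve each declared parameter with Const.NVL(value, defValue), i.e. the assigned value when present, otherwise the declared default, and then pin PARAM_ETL_JOB_ID to the parent job's value so every dispatch within one run carries the same correlation id (the id that the ServerCache sketch above folds into its key). In Job.sendToSlaveServer an execution variable of the same name additionally overrides both. A small self-contained illustration of the resolution order; parameter names and values are invented:

import java.util.HashMap;
import java.util.Map;

import org.pentaho.di.core.Const;

public class ParamResolutionExample {
  public static void main(String[] args) {
    // A parameter declared with a default but never assigned a value
    String value = null;          // as returned by getParameterValue("TARGET_DIR")
    String defValue = "/tmp/etl"; // as returned by getParameterDefault("TARGET_DIR")

    // What JobEntryJob/JobEntryTrans put into the dispatch parameters:
    System.out.println(Const.NVL(value, defValue)); // -> /tmp/etl

    // Job.sendToSlaveServer lets an execution variable with the same name
    // override both the assigned value and the default:
    Map<String, String> variables = new HashMap<String, String>();
    variables.put("TARGET_DIR", "/data/etl");
    System.out.println(variables.getOrDefault("TARGET_DIR",
        value == null ? defValue : value)); // -> /data/etl
  }
}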