From b7b29d4557a04dc8c4d0df4d4b3c3b56e75ff6c6 Mon Sep 17 00:00:00 2001
From: Zhichun Wu
Date: Sat, 7 Jan 2017 14:59:19 +0800
Subject: [PATCH] Merge changes from 6.1

---
 .../src/main/java/org/pentaho/di/job/Job.java      |  97 +++++++++-------
 .../job/entries/RemoteJobEntryLogHelper.java       |  62 +++++-----
 .../di/job/entries/job/JobEntryJob.java            |   8 +-
 .../di/job/entries/trans/JobEntryTrans.java        |   3 +
 .../main/java/org/pentaho/di/trans/Trans.java      | 109 ++++++++++--------
 .../UserDefinedJavaClass.java                      |   2 +-
 .../UserDefinedJavaClassMeta.java                  |   2 +-
 .../org/pentaho/di/www/CarteSingleton.java         |  37 ++++--
 .../pentaho/di/www/GetJobStatusServlet.java        |   5 +-
 .../pentaho/di/www/GetTransStatusServlet.java      |   5 +-
 10 files changed, 189 insertions(+), 141 deletions(-)

diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java
index ef4e2a4..fe1faa4 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java
@@ -23,8 +23,10 @@
 package org.pentaho.di.job;
 
+import com.google.common.base.Strings;
 import org.apache.commons.vfs2.FileName;
 import org.apache.commons.vfs2.FileObject;
+import org.pentaho.di.cluster.ServerCache;
 import org.pentaho.di.cluster.SlaveServer;
 import org.pentaho.di.core.*;
 import org.pentaho.di.core.database.Database;
@@ -1705,7 +1707,6 @@ public String getStatus() {
    */
   public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguration executionConfiguration,
       Repository repository, IMetaStore metaStore) throws KettleException {
-    String carteObjectId;
     SlaveServer slaveServer = executionConfiguration.getRemoteServer();
 
     if (slaveServer == null) {
@@ -1718,58 +1719,64 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio
     // Align logging levels between execution configuration and remote server
     slaveServer.getLogChannel().setLogLevel(executionConfiguration.getLogLevel());
 
+    String carteObjectId = ServerCache.getCachedIdentity(jobMeta, executionConfiguration.getParams(), slaveServer);
+
     FileObject tempFile = null;
     try {
-      // Inject certain internal variables to make it more intuitive.
-      //
-      for (String var : Const.INTERNAL_TRANS_VARIABLES) {
-        executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
-      }
-      for (String var : Const.INTERNAL_JOB_VARIABLES) {
-        executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
-      }
+      if (Strings.isNullOrEmpty(carteObjectId)) {
+        // Inject certain internal variables to make it more intuitive.
+        //
+        for (String var : Const.INTERNAL_TRANS_VARIABLES) {
+          executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
+        }
+        for (String var : Const.INTERNAL_JOB_VARIABLES) {
+          executionConfiguration.getVariables().put(var, jobMeta.getVariable(var));
+        }
 
-      Map<String, String> jobParams = new HashMap<String, String>();
-      for (String key : jobMeta.listParameters()) {
-        String value = jobMeta.getParameterValue(key);
-        String defaultValue = jobMeta.getParameterDefault(key);
-        jobParams.put(key,
-            executionConfiguration.getVariables().getOrDefault(key, value == null ? defaultValue : value));
-      }
+        Map<String, String> jobParams = new HashMap<String, String>();
+        for (String key : jobMeta.listParameters()) {
+          String value = jobMeta.getParameterValue(key);
+          String defaultValue = jobMeta.getParameterDefault(key);
+          jobParams.put(key,
+              executionConfiguration.getVariables().getOrDefault(key, value == null ? defaultValue : value));
+        }
 
-      executionConfiguration.getParams().putAll(jobParams);
+        executionConfiguration.getParams().putAll(jobParams);
 
-      if (executionConfiguration.isPassingExport()) {
-        // First export the job... slaveServer.getVariable("MASTER_HOST")
-        //
-        tempFile =
-            KettleVFS.createTempFile("jobExport", ".zip", System.getProperty("java.io.tmpdir"), jobMeta);
+        if (executionConfiguration.isPassingExport()) {
+          // First export the job... slaveServer.getVariable("MASTER_HOST")
+          //
+          tempFile =
+              KettleVFS.createTempFile("jobExport", ".zip", System.getProperty("java.io.tmpdir"), jobMeta);
 
-        TopLevelResource topLevelResource =
-            ResourceUtil.serializeResourceExportInterface(tempFile.getName().toString(), jobMeta, jobMeta, repository,
-                metaStore, executionConfiguration.getXML(), CONFIGURATION_IN_EXPORT_FILENAME);
+          TopLevelResource topLevelResource =
+              ResourceUtil.serializeResourceExportInterface(tempFile.getName().toString(), jobMeta, jobMeta, repository,
+                  metaStore, executionConfiguration.getXML(), CONFIGURATION_IN_EXPORT_FILENAME);
 
-        // Send the zip file over to the slave server...
-        //
-        String result =
-            slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_JOB, topLevelResource
-                .getBaseResourceName());
-        WebResult webResult = WebResult.fromXMLString(result);
-        if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
-          throw new KettleException("There was an error passing the exported job to the remote server: " + Const.CR
-              + webResult.getMessage());
-        }
-        carteObjectId = webResult.getId();
-      } else {
-        String xml = new JobConfiguration(jobMeta, executionConfiguration).getXML();
+          // Send the zip file over to the slave server...
+          //
+          String result =
+              slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_JOB, topLevelResource
+                  .getBaseResourceName());
+          WebResult webResult = WebResult.fromXMLString(result);
+          if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+            throw new KettleException("There was an error passing the exported job to the remote server: " + Const.CR
+                + webResult.getMessage());
+          }
+          carteObjectId = webResult.getId();
+        } else {
+          String xml = new JobConfiguration(jobMeta, executionConfiguration).getXML();
 
-        String reply = slaveServer.sendXML(xml, RegisterJobServlet.CONTEXT_PATH + "/?xml=Y");
-        WebResult webResult = WebResult.fromXMLString(reply);
-        if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
-          throw new KettleException("There was an error posting the job on the remote server: " + Const.CR + webResult
-              .getMessage());
+          String reply = slaveServer.sendXML(xml, RegisterJobServlet.CONTEXT_PATH + "/?xml=Y");
+          WebResult webResult = WebResult.fromXMLString(reply);
+          if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+            throw new KettleException("There was an error posting the job on the remote server: " + Const.CR + webResult
+                .getMessage());
+          }
+          carteObjectId = webResult.getId();
         }
-        carteObjectId = webResult.getId();
+
+        ServerCache.cacheIdentity(jobMeta, executionConfiguration.getParams(), slaveServer, carteObjectId);
       }
 
       // Start the job
@@ -1779,6 +1786,8 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio
             "UTF-8") + "&xml=Y&id=" + carteObjectId);
         WebResult webResult = WebResult.fromXMLString(reply);
         if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+          ServerCache.invalidate(jobMeta, executionConfiguration.getParams(), slaveServer);
+
           throw new KettleException("There was an error starting the job on the remote server: " + Const.CR + webResult
               .getMessage());
         }
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/RemoteJobEntryLogHelper.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/RemoteJobEntryLogHelper.java
index a5d397f..a2c109e 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/RemoteJobEntryLogHelper.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/RemoteJobEntryLogHelper.java
@@ -25,7 +25,7 @@
  */
 public final class RemoteJobEntryLogHelper {
   private static final String UNKNOWN_SERVER = "unknown server";
-  private static final String UNKNOW_OBJECT = "unknown object";
+  private static final String UNKNOWN_OBJECT = "unknown object";
 
   private final LogChannelInterface logger;
@@ -38,9 +38,18 @@ public RemoteJobEntryLogHelper(SlaveServer server, String objectId, LogChannelIn
     this.logger = logger;
 
     this.serverAddress = server == null || server.getName() == null ? UNKNOWN_SERVER : server.getName();
-    this.objectId = objectId == null ? UNKNOW_OBJECT : objectId;
+    this.objectId = objectId == null ? UNKNOWN_OBJECT : objectId;
     this.lastLogEntryNo = 0;
+
+    if (this.logger != null) {
+      this.logger.logBasic(new StringBuilder()
+          .append("=== Log Replayer is ready for [")
+          .append(this.objectId)
+          .append('@')
+          .append(serverAddress)
+          .append("] ===").toString());
+    }
   }
 
   public int getLastLogEntryNo() {
@@ -53,43 +62,36 @@ public void log(String logString, int firstEntryLineNo, int lastEntryLineNo) {
     }
 
     int length = logString.length();
-    int lineDiff = firstEntryLineNo - lastLogEntryNo;
 
-    if (length > 0 && lastLogEntryNo != lastEntryLineNo) {
-      try {
+    if (lastLogEntryNo != lastEntryLineNo) {
+      if (length > 0) {
         logger.logBasic(new StringBuilder()
-            .append("---> Replay logs L")
+            .append("---> Replay logs from #")
             .append(firstEntryLineNo)
-            .append(" ~ L")
+            .append(" to #")
             .append(lastEntryLineNo)
-            .append(" from [")
-            .append(objectId)
-            .append('@')
-            .append(serverAddress)
-            .append("]: ")
+            .append(": ")
             .append(length)
             .append(" bytes <---").toString());
 
-        if (lineDiff != 0) {
-          logger.logError(new StringBuffer()
-              .append("*** Somehow we ")
-              .append(lineDiff > 0 ? "lost " : "got duplicated ")
-              .append(Math.abs(lineDiff))
-              .append(" lines of logs from [")
-              .append(objectId)
-              .append('@')
-              .append(serverAddress)
-              .append("] ***")
-              .toString());
-        }
-
         logger.logBasic(logString);
-      } catch (Throwable t) {
-        // ignore as logging failure is trivial
-        // t.printStackTrace();
       }
-    }
 
-    lastLogEntryNo = lastEntryLineNo;
+      int lineDiff = firstEntryLineNo - lastLogEntryNo;
+
+      if (lineDiff != 0) {
+        logger.logError(new StringBuffer()
+            .append("*** Skip ")
+            .append(lineDiff) // could be negative
+            .append(" lines from #")
+            .append(firstEntryLineNo)
+            .append(" to #")
+            .append(lastLogEntryNo)
+            .append(" ***")
+            .toString());
+      }
+
+      lastLogEntryNo = lastEntryLineNo;
+    }
   }
 }
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java
index 9ed5272..cb6bb13 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java
@@ -60,6 +60,8 @@
 import java.text.SimpleDateFormat;
 import java.util.*;
 
+import static org.pentaho.di.cluster.ServerCache.PARAM_ETL_JOB_ID;
+
 /**
  * Recursive definition of a Job. This step means that an entire Job has to be executed. It can be the same Job, but
  * just make sure that you don't get an endless loop. Provide an escape routine using JobEval.
@@ -910,11 +912,15 @@ public Result execute(Result result, int nr) throws KettleException {
         jobExecutionConfiguration.setLogLevel(jobLogLevel);
         jobExecutionConfiguration.setPassingExport(passingExport);
         jobExecutionConfiguration.setExpandingRemoteJob(expandingRemoteJob);
+
+        Map<String, String> params = jobExecutionConfiguration.getParams();
         for (String param : namedParam.listParameters()) {
           String defValue = namedParam.getParameterDefault(param);
           String value = namedParam.getParameterValue(param);
-          jobExecutionConfiguration.getParams().put(param, Const.NVL(value, defValue));
+          params.put(param, Const.NVL(value, defValue));
         }
+        params.put(PARAM_ETL_JOB_ID, parentJob.getParameterValue(PARAM_ETL_JOB_ID));
+
         if (parentJob.getJobMeta().isBatchIdPassed()) {
           jobExecutionConfiguration.setPassedBatchId(parentJob.getBatchId());
         }
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/trans/JobEntryTrans.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/trans/JobEntryTrans.java
index a31dd40..d768cb7 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/trans/JobEntryTrans.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/trans/JobEntryTrans.java
@@ -65,6 +65,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.pentaho.di.cluster.ServerCache.PARAM_ETL_JOB_ID;
+
 /**
  * This is the job entry that defines a transformation to be run.
  *
@@ -966,6 +968,7 @@ public Result execute(Result result, int nr) throws KettleException {
             transMeta.getParameterDefault(param), transMeta.getVariable(param)));
         params.put(param, value);
       }
+      params.put(PARAM_ETL_JOB_ID, parentJob.getParameterValue(PARAM_ETL_JOB_ID));
 
       if (parentJob.getJobMeta().isBatchIdPassed()) {
         transExecutionConfiguration.setPassedBatchId(parentJob.getPassedBatchId());
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java
index 6ae29d3..37a7090 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java
@@ -23,9 +23,11 @@
 package org.pentaho.di.trans;
 
+import com.google.common.base.Strings;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.vfs2.FileName;
 import org.apache.commons.vfs2.FileObject;
+import org.pentaho.di.cluster.ServerCache;
 import org.pentaho.di.cluster.SlaveServer;
 import org.pentaho.di.core.*;
 import org.pentaho.di.core.database.Database;
@@ -4121,7 +4123,6 @@ public static final Result getClusteredTransformationResult(LogChannelInterface
    */
   public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfiguration executionConfiguration,
       Repository repository, IMetaStore metaStore) throws KettleException {
-    String carteObjectId;
     SlaveServer slaveServer = executionConfiguration.getRemoteServer();
 
     if (slaveServer == null) {
@@ -4131,68 +4132,74 @@ public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfig
       throw new KettleException("The transformation needs a name to uniquely identify it by on the remote server.");
     }
 
+    String carteObjectId = ServerCache.getCachedIdentity(transMeta, executionConfiguration.getParams(), slaveServer);
+
     FileObject tempFile = null;
     try {
-      // Inject certain internal variables to make it more intuitive.
-      //
-      Map<String, String> vars = new HashMap<String, String>();
+      if (Strings.isNullOrEmpty(carteObjectId)) {
+        // Inject certain internal variables to make it more intuitive.
+        //
+        Map<String, String> vars = new HashMap<String, String>();
 
-      for (String var : Const.INTERNAL_TRANS_VARIABLES) {
-        vars.put(var, transMeta.getVariable(var));
-      }
-      for (String var : Const.INTERNAL_JOB_VARIABLES) {
-        vars.put(var, transMeta.getVariable(var));
-      }
+        for (String var : Const.INTERNAL_TRANS_VARIABLES) {
+          vars.put(var, transMeta.getVariable(var));
+        }
+        for (String var : Const.INTERNAL_JOB_VARIABLES) {
+          vars.put(var, transMeta.getVariable(var));
+        }
 
-      Map<String, String> transParams = new HashMap<String, String>();
-      for (String key : transMeta.listParameters()) {
-        String value = transMeta.getParameterValue(key);
-        String defaultValue = transMeta.getParameterDefault(key);
-        transParams.put(key,
-            executionConfiguration.getVariables().getOrDefault(key, value == null ? defaultValue : value));
-      }
+        Map<String, String> transParams = new HashMap<String, String>();
+        for (String key : transMeta.listParameters()) {
+          String value = transMeta.getParameterValue(key);
+          String defaultValue = transMeta.getParameterDefault(key);
+          transParams.put(key,
+              executionConfiguration.getVariables().getOrDefault(key, value == null ? defaultValue : value));
+        }
 
-      executionConfiguration.getParams().putAll(transParams);
+        executionConfiguration.getParams().putAll(transParams);
 
-      executionConfiguration.getVariables().putAll(vars);
-      slaveServer.injectVariables(executionConfiguration.getVariables());
+        executionConfiguration.getVariables().putAll(vars);
+        slaveServer.injectVariables(executionConfiguration.getVariables());
 
-      slaveServer.getLogChannel().setLogLevel(executionConfiguration.getLogLevel());
+        slaveServer.getLogChannel().setLogLevel(executionConfiguration.getLogLevel());
 
-      if (executionConfiguration.isPassingExport()) {
+        if (executionConfiguration.isPassingExport()) {
 
-        // First export the job...
-        //
-        tempFile =
-            KettleVFS.createTempFile("transExport", ".zip", System.getProperty("java.io.tmpdir"), transMeta);
+          // First export the job...
+          //
+          tempFile =
+              KettleVFS.createTempFile("transExport", ".zip", System.getProperty("java.io.tmpdir"), transMeta);
 
-        TopLevelResource topLevelResource =
-            ResourceUtil.serializeResourceExportInterface(tempFile.getName().toString(), transMeta, transMeta,
-                repository, metaStore, executionConfiguration.getXML(), CONFIGURATION_IN_EXPORT_FILENAME);
+          TopLevelResource topLevelResource =
+              ResourceUtil.serializeResourceExportInterface(tempFile.getName().toString(), transMeta, transMeta,
+                  repository, metaStore, executionConfiguration.getXML(), CONFIGURATION_IN_EXPORT_FILENAME);
 
-        // Send the zip file over to the slave server...
-        //
-        String result =
-            slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_TRANS, topLevelResource
-                .getBaseResourceName());
-        WebResult webResult = WebResult.fromXMLString(result);
-        if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
-          throw new KettleException("There was an error passing the exported transformation to the remote server: "
-              + Const.CR + webResult.getMessage());
-        }
-        carteObjectId = webResult.getId();
-      } else {
+          // Send the zip file over to the slave server...
+          //
+          String result =
+              slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_TRANS, topLevelResource
+                  .getBaseResourceName());
+          WebResult webResult = WebResult.fromXMLString(result);
+          if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+            throw new KettleException("There was an error passing the exported transformation to the remote server: "
+                + Const.CR + webResult.getMessage());
+          }
+          carteObjectId = webResult.getId();
+        } else {
 
-        // Now send it off to the remote server...
-        //
-        String xml = new TransConfiguration(transMeta, executionConfiguration).getXML();
-        String reply = slaveServer.sendXML(xml, RegisterTransServlet.CONTEXT_PATH + "/?xml=Y");
-        WebResult webResult = WebResult.fromXMLString(reply);
-        if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
-          throw new KettleException("There was an error posting the transformation on the remote server: " + Const.CR
-              + webResult.getMessage());
+          // Now send it off to the remote server...
+          //
+          String xml = new TransConfiguration(transMeta, executionConfiguration).getXML();
+          String reply = slaveServer.sendXML(xml, RegisterTransServlet.CONTEXT_PATH + "/?xml=Y");
+          WebResult webResult = WebResult.fromXMLString(reply);
+          if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+            throw new KettleException("There was an error posting the transformation on the remote server: " + Const.CR
+                + webResult.getMessage());
+          }
+          carteObjectId = webResult.getId();
        }
-        carteObjectId = webResult.getId();
+
+        ServerCache.cacheIdentity(transMeta, executionConfiguration.getParams(), slaveServer, carteObjectId);
       }
 
       // Prepare the transformation
@@ -4202,6 +4209,8 @@ public static String sendToSlaveServer(TransMeta transMeta, TransExecutionConfig
           .getName(), "UTF-8") + "&xml=Y&id=" + carteObjectId);
       WebResult webResult = WebResult.fromXMLString(reply);
       if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) {
+        ServerCache.invalidate(transMeta, executionConfiguration.getParams(), slaveServer);
+
         throw new KettleException("There was an error preparing the transformation for excution on the remote server: "
             + Const.CR + webResult.getMessage());
       }
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java
index f0eb55b..52ba00c 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java
@@ -57,7 +57,7 @@ public UserDefinedJavaClass(StepMeta stepMeta, StepDataInterface stepDataInterfa
     if (meta.cookErrors.size() > 0) {
       for (Exception e : meta.cookErrors) {
-        logErrorImpl("Error initializing UserDefinedJavaClass:", e);
+        logErrorImpl("Error initializing UserDefinedJavaClass: " + e.getMessage(), e.getCause());
       }
       setErrorsImpl(meta.cookErrors.size());
       stopAllImpl();
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java
index 3358e59..b86d3ca 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java
@@ -174,7 +174,7 @@ public TransformClassBase newChildInstance(UserDefinedJavaClass parent, UserDefi
       if (log.isDebug()) {
         log.logError("Full debugging stacktrace of UserDefinedJavaClass instanciation exception:", e.getCause());
       }
-      KettleException kettleException = new KettleException(e.getMessage());
+      KettleException kettleException = new KettleException(e.toString(), e);
       kettleException.setStackTrace(new StackTraceElement[]{});
       cookErrors.add(kettleException);
       return null;
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java
index 7de8d16..c0ca1d3 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java
@@ -22,6 +22,7 @@
 package org.pentaho.di.www;
 
+import org.pentaho.di.cluster.ServerCache;
 import org.pentaho.di.cluster.SlaveServer;
 import org.pentaho.di.core.Const;
 import org.pentaho.di.core.KettleEnvironment;
@@ -127,6 +128,14 @@ public static void installPurgeTimer(final SlaveServerConfig config, final LogCh
       objectTimeout = 24 * 60; // 1440 : default is a one day time-out
     }
 
+    if (!ServerCache.RESOURCE_CACHE_DISABLED && objectTimeout <= ServerCache.RESOURCE_EXPIRATION_MINUTE) {
+      log.logBasic(new StringBuilder().append("You may want to increase ")
+          .append(Const.KETTLE_CARTE_OBJECT_TIMEOUT_MINUTES).append(" from ")
+          .append(objectTimeout).append(" minutes to ")
+          .append(ServerCache.RESOURCE_EXPIRATION_MINUTE + 1)
+          .append(" to fully utilize the resource cache.").toString());
+    }
+
     // If we need to time out finished or idle objects, we should create a timer
     // in the background to clean
     //
@@ -147,28 +156,31 @@ public void run() {
           //
           for (CarteObjectEntry entry : transformationMap.getTransformationObjects()) {
             Trans trans = transformationMap.getTransformation(entry);
+            Date logDate = trans == null ? null : trans.getLogDate();
 
             // See if the transformation is finished or stopped.
             //
-            if (trans != null && (trans.isFinished() || trans.isStopped()) && trans.getLogDate() != null) {
+            if (trans != null && (trans.isFinished() || trans.isStopped()) && logDate != null) {
               // check the last log time
               //
               int diffInMinutes =
-                  (int) Math.floor((System.currentTimeMillis() - trans.getLogDate().getTime()) / 60000);
+                  (int) Math.floor((System.currentTimeMillis() - logDate.getTime()) / 60000);
               if (diffInMinutes >= objectTimeout) {
+                String logChannelId = trans.getLogChannelId();
+
                 // Let's remove this from the transformation map...
                 //
                 transformationMap.removeTransformation(entry);
 
                 // Remove the logging information from the log registry & central log store
                 //
-                LoggingRegistry.getInstance().removeIncludingChildren(trans.getLogChannelId());
-                KettleLogStore.discardLines(trans.getLogChannelId(), false);
+                LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);
+                KettleLogStore.discardLines(logChannelId, false);
 
                 // transformationMap.deallocateServerSocketPorts(entry);
 
                 log.logMinimal("Cleaned up transformation "
-                    + entry.getName() + " with id " + entry.getId() + " from " + trans.getLogDate()
+                    + entry.getName() + " with id " + entry.getId() + " from " + logDate
                     + ", diff=" + diffInMinutes);
               }
             }
@@ -178,20 +190,29 @@ public void run() {
           //
           for (CarteObjectEntry entry : jobMap.getJobObjects()) {
             Job job = jobMap.getJob(entry);
+            Date logDate = job == null ? null : job.getLogDate();
 
             // See if the job is finished or stopped.
             //
-            if (job != null && (job.isFinished() || job.isStopped()) && job.getLogDate() != null) {
+            if (job != null && (job.isFinished() || job.isStopped()) && logDate != null) {
               // check the last log time
               //
               int diffInMinutes =
-                  (int) Math.floor((System.currentTimeMillis() - job.getLogDate().getTime()) / 60000);
+                  (int) Math.floor((System.currentTimeMillis() - logDate.getTime()) / 60000);
               if (diffInMinutes >= objectTimeout) {
+                String logChannelId = job.getLogChannelId();
+
                 // Let's remove this from the job map...
                 //
                 jobMap.removeJob(entry);
+
+                // Remove the logging information from the log registry & central log store
+                //
+                LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);
+                KettleLogStore.discardLines(logChannelId, false);
+
                 log.logMinimal("Cleaned up job "
-                    + entry.getName() + " with id " + entry.getId() + " from " + job.getLogDate());
+                    + entry.getName() + " with id " + entry.getId() + " from " + logDate);
               }
             }
           }
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java
index 60886e4..9d109f3 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java
@@ -234,9 +234,8 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
     if (job != null) {
       String status = job.getStatus();
       int lastLineNr = KettleLogStore.getLastBufferLineNr();
-      String logText =
-          KettleLogStore.getAppender().getBuffer(
-              job.getLogChannel().getLogChannelId(), false, startLineNr, lastLineNr).toString();
+      String logText = startLineNr >= lastLineNr ? "" : KettleLogStore.getAppender().getBuffer(
+          job.getLogChannel().getLogChannelId(), false, startLineNr, lastLineNr).toString();
 
       if (useXML) {
         response.setContentType("text/xml");
diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java
index bce0af7..24cc57c 100644
--- a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java
+++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java
@@ -239,9 +239,8 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
     if (trans != null) {
       String status = trans.getStatus();
       int lastLineNr = KettleLogStore.getLastBufferLineNr();
-      String logText =
-          KettleLogStore.getAppender().getBuffer(
-              trans.getLogChannel().getLogChannelId(), false, startLineNr, lastLineNr).toString();
+      String logText = startLineNr >= lastLineNr ? "" : KettleLogStore.getAppender().getBuffer(
+          trans.getLogChannel().getLogChannelId(), false, startLineNr, lastLineNr).toString();
 
       if (useXML) {
         response.setContentType("text/xml");
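
Note for reviewers (not part of the patch): the diff above leans on org.pentaho.di.cluster.ServerCache
(getCachedIdentity / cacheIdentity / invalidate, plus the PARAM_ETL_JOB_ID, RESOURCE_CACHE_DISABLED and
RESOURCE_EXPIRATION_MINUTE members), but that class is not included in this change set. The sketch below is
a minimal reading of what such a class could look like, inferred purely from the call sites; the system
property names, the 180-minute default and the key derivation are assumptions, not the fork's actual
implementation. It builds on Guava, which the patch already depends on via com.google.common.base.Strings.

package org.pentaho.di.cluster;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public final class ServerCache {
  // Parameter that scopes cache entries to the top-level ETL job (assumed semantics).
  public static final String PARAM_ETL_JOB_ID = "ETL_JOB_ID";

  // Both knobs read system properties; the property names here are assumptions.
  public static final boolean RESOURCE_CACHE_DISABLED = Boolean.getBoolean("KETTLE_RESOURCE_CACHE_DISABLED");
  public static final int RESOURCE_EXPIRATION_MINUTE =
      Integer.getInteger("KETTLE_RESOURCE_EXPIRATION_MINUTES", 180);

  // Maps "resource + parameters + slave server" to the carte object id returned on registration.
  private static final Cache<String, String> CACHE = CacheBuilder.newBuilder()
      .expireAfterAccess(RESOURCE_EXPIRATION_MINUTE, TimeUnit.MINUTES).build();

  private ServerCache() {
  }

  // A real implementation would probably hash the full parameter map; only the ETL job id
  // is folded in here to keep the sketch short. JobMeta/TransMeta render as their name.
  private static String buildKey(Object meta, Map<String, String> params, SlaveServer server) {
    String etlJobId = params == null ? null : params.get(PARAM_ETL_JOB_ID);
    return String.valueOf(meta) + '@' + server.getName() + '#' + (etlJobId == null ? "" : etlJobId);
  }

  public static String getCachedIdentity(Object meta, Map<String, String> params, SlaveServer server) {
    return RESOURCE_CACHE_DISABLED ? null : CACHE.getIfPresent(buildKey(meta, params, server));
  }

  public static void cacheIdentity(Object meta, Map<String, String> params, SlaveServer server, String id) {
    if (!RESOURCE_CACHE_DISABLED && id != null && !id.isEmpty()) {
      CACHE.put(buildKey(meta, params, server), id);
    }
  }

  public static void invalidate(Object meta, Map<String, String> params, SlaveServer server) {
    CACHE.invalidate(buildKey(meta, params, server));
  }
}

Read this way, sendToSlaveServer in Job.java and Trans.java becomes: a cache hit skips the export/registration
round trip and goes straight to the start/prepare call; a miss registers the resource and caches the returned
id; and a failed start/prepare invalidates the entry so the next run re-registers. It also explains the warning
added to CarteSingleton.installPurgeTimer: if Carte purges idle objects before a cache entry expires, cached
ids would point at objects that no longer exist on the slave server.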