diff --git a/pentaho-kettle/lib/pdi-pur-plugin-core-8.0.0.0-1.jar b/pentaho-kettle/lib/pdi-pur-plugin-core-8.0.0.0-1.jar new file mode 100644 index 0000000..4fb9d58 Binary files /dev/null and b/pentaho-kettle/lib/pdi-pur-plugin-core-8.0.0.0-1.jar differ diff --git a/pentaho-kettle/pom.xml b/pentaho-kettle/pom.xml index 7664e34..1f285c4 100644 --- a/pentaho-kettle/pom.xml +++ b/pentaho-kettle/pom.xml @@ -4,7 +4,7 @@ com.github.zhicwu pdi-cluster - 7.1.0.5-SNAPSHOT + 8.0.0.0-SNAPSHOT pentaho-kettle jar @@ -55,6 +55,8 @@ pentaho-kettle pdi-pur-plugin ${pentaho-ce.version} + system + ${basedir}/lib/pdi-pur-plugin-core-${pentaho-ce.version}.jar diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveConnectionManager.java b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveConnectionManager.java index 6adaa63..e1bb453 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveConnectionManager.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveConnectionManager.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -22,10 +22,17 @@ package org.pentaho.di.cluster; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; -import org.apache.commons.httpclient.params.HttpClientParams; -import org.apache.commons.httpclient.params.HttpConnectionManagerParams; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.SocketConfig; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; @@ -62,7 +69,8 @@ public class SlaveConnectionManager { private static SlaveConnectionManager slaveConnectionManager; - private MultiThreadedHttpConnectionManager manager; + private final PoolingHttpClientConnectionManager manager; + private final RequestConfig defaultRequestConfig; private SlaveConnectionManager() { if (needToInitializeSSLContext()) { @@ -74,14 +82,19 @@ private SlaveConnectionManager() { //log.logError( "Default SSL context hasn't been initialized", e ); } } - manager = new MultiThreadedHttpConnectionManager(); - HttpConnectionManagerParams connParams = manager.getParams(); - connParams.setDefaultMaxConnectionsPerHost(KETTLE_HTTPCLIENT_MAX_CONNECTIONS_PER_HOST); - connParams.setMaxTotalConnections(KETTLE_HTTPCLIENT_MAX_CONNECTIONS); - - connParams.setConnectionTimeout(KETTLE_HTTPCLIENT_CONNECTION_TIMEOUT * 1000); - connParams.setLinger(KETTLE_HTTPCLIENT_SOCKET_LINGER); - connParams.setStaleCheckingEnabled(KETTLE_HTTPCLIENT_STALE_CHECKING); + + manager = new PoolingHttpClientConnectionManager(); + manager.setDefaultMaxPerRoute(KETTLE_HTTPCLIENT_MAX_CONNECTIONS_PER_HOST); + manager.setMaxTotal(KETTLE_HTTPCLIENT_MAX_CONNECTIONS); + + manager.setDefaultSocketConfig( + SocketConfig.custom() + .setSoTimeout(KETTLE_HTTPCLIENT_SOCKET_TIMEOUT) + .setSoLinger(KETTLE_HTTPCLIENT_SOCKET_LINGER).build()); + + defaultRequestConfig = RequestConfig.custom() + .setConnectTimeout(KETTLE_HTTPCLIENT_CONNECTION_MANAGER_TIMEOUT * 1000) + .build(); } private static boolean needToInitializeSSLContext() { @@ -96,15 +109,45 @@ public static SlaveConnectionManager getInstance() { } public HttpClient createHttpClient() { - HttpClient client = new HttpClient(manager); - - HttpClientParams clientParams = client.getParams(); + return HttpClients.custom().setConnectionManager(manager) + .setDefaultRequestConfig(defaultRequestConfig) + .build(); + } - clientParams.setConnectionManagerTimeout(KETTLE_HTTPCLIENT_CONNECTION_MANAGER_TIMEOUT * 1000); - clientParams.setSoTimeout(KETTLE_HTTPCLIENT_SOCKET_TIMEOUT * 1000); - client.getHostConfiguration().getParams().setDefaults(clientParams); + public HttpClient createHttpClient(String user, String password) { + CredentialsProvider provider = new BasicCredentialsProvider(); + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(user, password); + provider.setCredentials(AuthScope.ANY, credentials); + + return + HttpClientBuilder + .create() + .setDefaultCredentialsProvider(provider) + .setConnectionManager(manager) + .setDefaultRequestConfig(defaultRequestConfig) + .build(); + } - return client; + public HttpClient createHttpClient(String user, String password, + String proxyHost, int proxyPort, AuthScope authScope) { + HttpHost httpHost = new HttpHost(proxyHost, proxyPort); + + RequestConfig requestConfig = RequestConfig.custom() + .setProxy(httpHost) + .setConnectTimeout(defaultRequestConfig.getConnectTimeout()) + .build(); + + CredentialsProvider provider = new BasicCredentialsProvider(); + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(user, password); + provider.setCredentials(authScope, credentials); + + return + HttpClientBuilder + .create() + .setDefaultCredentialsProvider(provider) + .setDefaultRequestConfig(requestConfig) + .setConnectionManager(manager) + .build(); } public void shutdown() { diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveServer.java b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveServer.java index 21f7b32..fa8b505 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveServer.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/SlaveServer.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -23,13 +23,24 @@ package org.pentaho.di.cluster; import com.google.common.base.Strings; -import org.apache.commons.httpclient.Header; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpState; -import org.apache.commons.httpclient.UsernamePasswordCredentials; -import org.apache.commons.httpclient.auth.AuthScope; -import org.apache.commons.httpclient.methods.*; import org.apache.commons.lang.StringUtils; +import org.apache.http.*; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.AuthCache; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicAuthCache; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; import org.pentaho.di.core.Const; import org.pentaho.di.core.changed.ChangedFlag; import org.pentaho.di.core.encryption.Encr; @@ -60,8 +71,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.pentaho.di.cluster.SlaveConnectionManager.KETTLE_HTTPCLIENT_SOCKET_TIMEOUT; - public class SlaveServer extends ChangedFlag implements Cloneable, SharedObjectInterface, VariableSpace, RepositoryElementInterface, XMLInterface { private static Class PKG = SlaveServer.class; // for i18n purposes, needed by Translator2!! @@ -79,8 +88,6 @@ public class SlaveServer extends ChangedFlag implements Cloneable, SharedObjectI public static final String SSL_MODE_TAG = "sslMode"; - private static final int NOT_FOUND_ERROR = 404; - public static final int KETTLE_CARTE_RETRIES = getNumberOfSlaveServerRetries(); public static final int KETTLE_CARTE_RETRY_BACKOFF_INCREMENTS = getBackoffIncrements(); @@ -546,64 +553,32 @@ public String constructUrl(String serviceAndArguments) throws UnsupportedEncodin } // Method is defined as package-protected in order to be accessible by unit tests - PostMethod buildSendXMLMethod(byte[] content, String service) throws Exception { + HttpPost buildSendXMLMethod(byte[] content, String service) throws Exception { // Prepare HTTP put // String urlString = constructUrl(service); if (log.isDebug()) { log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ConnectingTo", urlString)); } - PostMethod postMethod = new PostMethod(urlString); + HttpPost postMethod = new HttpPost(urlString); // Request content will be retrieved directly from the input stream // - RequestEntity entity = new ByteArrayRequestEntity(content); + HttpEntity entity = new ByteArrayEntity(content); - postMethod.setRequestEntity(entity); - postMethod.setDoAuthentication(true); - postMethod.addRequestHeader(new Header("Content-Type", "text/xml;charset=" + Const.XML_ENCODING)); + postMethod.setEntity(entity); + postMethod.addHeader(new BasicHeader("Content-Type", "text/xml;charset=" + Const.XML_ENCODING)); return postMethod; } public String sendXML(String xml, String service) throws Exception { - PostMethod method = buildSendXMLMethod(xml.getBytes(Const.XML_ENCODING), service); + HttpPost method = buildSendXMLMethod(xml.getBytes(Const.XML_ENCODING), service); // Execute request // try { - int result = getHttpClient().executeMethod(method); - - // The status code - if (log.isDebug()) { - log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseStatus", Integer.toString(result))); - } - - String responseBody = getResponseBodyAsString(method.getResponseBodyAsStream()); - - if (log.isDebug()) { - log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseBody", responseBody)); - } - - if (result >= 400) { - String message; - if (result == NOT_FOUND_ERROR) { - message = String.format("%s%s%s%s", - BaseMessages.getString(PKG, "SlaveServer.Error.404.Title"), - Const.CR, Const.CR, - BaseMessages.getString(PKG, "SlaveServer.Error.404.Message") - ); - } else { - message = String.format("HTTP Status %d - %s - %s", - method.getStatusCode(), - method.getPath(), - method.getStatusText() - ); - } - throw new KettleException(message); - } - - return responseBody; + return executeAuth(method); } finally { // Release current connection to the connection pool once you are done method.releaseConnection(); @@ -614,13 +589,35 @@ public String sendXML(String xml, String service) throws Exception { } } + /** + * Throws if not ok + */ + private void handleStatus(HttpUriRequest method, StatusLine statusLine, int status) throws KettleException { + if (status >= 300) { + String message; + if (status == HttpStatus.SC_NOT_FOUND) { + message = String.format("%s%s%s%s", + BaseMessages.getString(PKG, "SlaveServer.Error.404.Title"), + Const.CR, Const.CR, + BaseMessages.getString(PKG, "SlaveServer.Error.404.Message") + ); + } else { + message = String.format("HTTP Status %d - %s - %s", + status, + method.getURI().toString(), + statusLine.getReasonPhrase()); + } + throw new KettleException(message); + } + } + // Method is defined as package-protected in order to be accessible by unit tests - PostMethod buildSendExportMethod(String type, String load, InputStream is) throws UnsupportedEncodingException { + HttpPost buildSendExportMethod(String type, String load, InputStream is) throws UnsupportedEncodingException { String serviceUrl = RegisterPackageServlet.CONTEXT_PATH; if (type != null && load != null) { serviceUrl += - "/?" + AddExportServlet.PARAMETER_TYPE + "=" + type + "&" + AddExportServlet.PARAMETER_LOAD + "=" - + URLEncoder.encode(load, "UTF-8"); + "/?" + RegisterPackageServlet.PARAMETER_TYPE + "=" + type + + "&" + RegisterPackageServlet.PARAMETER_LOAD + "=" + URLEncoder.encode(load, "UTF-8"); } String urlString = constructUrl(serviceUrl); @@ -628,11 +625,9 @@ PostMethod buildSendExportMethod(String type, String load, InputStream is) throw log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ConnectingTo", urlString)); } - PostMethod method = new PostMethod(urlString); - method.setRequestEntity(new InputStreamRequestEntity(is)); - method.setDoAuthentication(true); - method.addRequestHeader(new Header("Content-Type", "binary/zip")); - method.getParams().setSoTimeout(KETTLE_HTTPCLIENT_SOCKET_TIMEOUT * 1000); + HttpPost method = new HttpPost(urlString); + method.setEntity(new InputStreamEntity(is)); + method.addHeader(new BasicHeader("Content-Type", "binary/zip")); return method; } @@ -648,35 +643,11 @@ PostMethod buildSendExportMethod(String type, String load, InputStream is) throw */ public String sendExport(String filename, String type, String load) throws Exception { // Request content will be retrieved directly from the input stream - // - InputStream is = null; - try { - is = KettleVFS.getInputStream(KettleVFS.getFileObject(filename)); - + try (InputStream is = KettleVFS.getInputStream(KettleVFS.getFileObject(filename))) { // Execute request - // - PostMethod method = buildSendExportMethod(type, load, is); + HttpPost method = buildSendExportMethod(type, load, is); try { - int result = getHttpClient().executeMethod(method); - - // The status code - if (log.isDebug()) { - log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseStatus", Integer.toString(result))); - } - - String responseBody = getResponseBodyAsString(method.getResponseBodyAsStream()); - - // String body = post.getResponseBodyAsString(); - if (log.isDebug()) { - log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseBody", responseBody)); - } - - if (result >= 400) { - throw new KettleException(String.format("HTTP Status %d - %s - %s", method.getStatusCode(), method - .getPath(), method.getStatusText())); - } - - return responseBody; + return executeAuth(method); } finally { // Release current connection to the connection pool once you are done method.releaseConnection(); @@ -685,56 +656,76 @@ public String sendExport(String filename, String type, String load) throws Excep RegisterPackageServlet.CONTEXT_PATH, environmentSubstitute(hostname))); } } - } finally { - try { - if (is != null) { - is.close(); - } - } catch (IOException ignored) { - // nothing to do here... - } } } - public void addProxy(HttpClient client) { - String hostName; - String proxyHost; - String proxyPort; - String nonProxyHosts; + /** + * Executes method with authentication. + * + * @param method + * @return + * @throws IOException + * @throws ClientProtocolException + * @throws KettleException if response not ok + */ + private String executeAuth(HttpUriRequest method) throws IOException, ClientProtocolException, KettleException { + HttpResponse httpResponse = getHttpClient().execute(method, getAuthContext()); + return getResponse(method, httpResponse); + } - lock.readLock().lock(); - try { - hostName = environmentSubstitute(this.hostname); - proxyHost = environmentSubstitute(this.proxyHostname); - proxyPort = environmentSubstitute(this.proxyPort); - nonProxyHosts = environmentSubstitute(this.nonProxyHosts); - } finally { - lock.readLock().unlock(); + private String getResponse(HttpUriRequest method, HttpResponse httpResponse) throws IOException, KettleException { + StatusLine statusLine = httpResponse.getStatusLine(); + int statusCode = statusLine.getStatusCode(); + // The status code + if (log.isDebug()) { + log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseStatus", Integer.toString(statusCode))); } - if (!Utils.isEmpty(proxyHost) && !Utils.isEmpty(proxyPort)) { - // skip applying proxy if non-proxy host matches - if (!Utils.isEmpty(nonProxyHosts) && !Utils.isEmpty(hostName) && hostName.matches(nonProxyHosts)) { - return; - } - client.getHostConfiguration().setProxy(proxyHost, Integer.parseInt(proxyPort)); + String responseBody = getResponseBodyAsString(httpResponse.getEntity().getContent()); + if (log.isDebug()) { + log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseBody", responseBody)); } - } - public void addCredentials(HttpClient client) { - HttpState state = client.getState(); + // throw if not ok + handleStatus(method, statusLine, statusCode); + + return responseBody; + } + private void addCredentials(HttpClientContext context) { + String userName; + String password; + String host; + int port; lock.readLock().lock(); try { - state.setCredentials( - new AuthScope(environmentSubstitute(hostname), Const.toInt(environmentSubstitute(port), 80)), - new UsernamePasswordCredentials(environmentSubstitute(username), Encr - .decryptPasswordOptionallyEncrypted(environmentSubstitute(password)))); + host = environmentSubstitute(hostname); + port = Const.toInt(environmentSubstitute(this.port), 80); + userName = environmentSubstitute(username); + password = Encr.decryptPasswordOptionallyEncrypted(environmentSubstitute(this.password)); } finally { lock.readLock().unlock(); } + CredentialsProvider provider = new BasicCredentialsProvider(); + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(userName, password); + provider.setCredentials(new AuthScope(host, port), credentials); + context.setCredentialsProvider(provider); + + // Generate BASIC scheme object and add it to the local auth cache + HttpHost target = new HttpHost(host, port, "http"); + AuthCache authCache = new BasicAuthCache(); + BasicScheme basicAuth = new BasicScheme(); + authCache.put(target, basicAuth); + context.setAuthCache(authCache); + } - client.getParams().setAuthenticationPreemptive(true); + /** + * @return HttpClientContext with authorization credentials + */ + protected HttpClientContext getAuthContext() { + HttpClientContext context = HttpClientContext.create(); + addCredentials(context); + return context; } /** @@ -816,35 +807,32 @@ String getResponseBodyAsString(InputStream is) throws IOException { } // Method is defined as package-protected in order to be accessible by unit tests - GetMethod buildExecuteServiceMethod(String service, Map headerValues) + HttpGet buildExecuteServiceMethod(String service, Map headerValues) throws UnsupportedEncodingException { - GetMethod method = new GetMethod(constructUrl(service)); + HttpGet method = new HttpGet(constructUrl(service)); for (String key : headerValues.keySet()) { - method.setRequestHeader(key, headerValues.get(key)); + method.setHeader(key, headerValues.get(key)); } - - method.getParams().setSoTimeout(KETTLE_HTTPCLIENT_SOCKET_TIMEOUT * 1000); - return method; } public String execService(String service, Map headerValues) throws Exception { // Prepare HTTP get - // - GetMethod method = buildExecuteServiceMethod(service, headerValues); - + HttpGet method = buildExecuteServiceMethod(service, headerValues); // Execute request - // try { - int result = getHttpClient().executeMethod(method); + HttpResponse httpResponse = getHttpClient().execute(method, getAuthContext()); + StatusLine statusLine = httpResponse.getStatusLine(); + int statusCode = statusLine.getStatusCode(); // The status code if (log.isDebug()) { - log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseStatus", Integer.toString(result))); + log.logDebug( + BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseStatus", Integer.toString(statusCode))); } - String responseBody = method.getResponseBodyAsString(); + String responseBody = getResponseBodyAsString(httpResponse.getEntity().getContent()); if (log.isDetailed()) { log.logDetailed(BaseMessages.getString(PKG, "SlaveServer.DETAILED_FinishedReading", Integer @@ -854,9 +842,9 @@ public String execService(String service, Map headerValues) thro log.logDebug(BaseMessages.getString(PKG, "SlaveServer.DEBUG_ResponseBody", responseBody)); } - if (result >= 400) { - throw new KettleException(String.format("HTTP Status %d - %s - %s", method.getStatusCode(), method.getPath(), - method.getStatusText())); + if (statusCode >= 400) { + throw new KettleException(String.format("HTTP Status %d - %s - %s", statusCode, method.getURI().toString(), + statusLine.getReasonPhrase())); } return responseBody; @@ -871,10 +859,41 @@ public String execService(String service, Map headerValues) thro // Method is defined as package-protected in order to be accessible by unit tests HttpClient getHttpClient() { - HttpClient client = SlaveConnectionManager.getInstance().createHttpClient(); - addCredentials(client); - addProxy(client); - return client; + SlaveConnectionManager connectionManager = SlaveConnectionManager.getInstance(); + String userName; + String password; + + String hostName; + int httpPort; + + String proxyHost; + String proxyPort; + String nonProxyHosts; + + lock.readLock().lock(); + try { + hostName = environmentSubstitute(this.hostname); + httpPort = Const.toInt(environmentSubstitute(this.port), 80); + proxyHost = environmentSubstitute(this.proxyHostname); + proxyPort = environmentSubstitute(this.proxyPort); + nonProxyHosts = environmentSubstitute(this.nonProxyHosts); + userName = environmentSubstitute(username); + password = Encr + .decryptPasswordOptionallyEncrypted(environmentSubstitute(environmentSubstitute(this.password))); + } finally { + lock.readLock().unlock(); + } + if (!Utils.isEmpty(proxyHost) && !Utils.isEmpty(proxyPort)) { + // skip applying proxy if non-proxy host matches + if (!Utils.isEmpty(nonProxyHosts) && !Utils.isEmpty(hostName) && hostName.matches(nonProxyHosts)) { + return connectionManager.createHttpClient(userName, password); + } + AuthScope authScope = new AuthScope(hostName, httpPort); + return connectionManager + .createHttpClient(userName, password, hostName, Integer.parseInt(proxyPort), authScope); + } else { + return connectionManager.createHttpClient(); + } } public SlaveServerStatus getStatus() throws Exception { @@ -905,7 +924,8 @@ public SlaveServerTransStatus getTransStatus(String transName, String carteObjec return SlaveServerTransStatus.fromXML(xml); } - public SlaveServerJobStatus getJobStatus(String jobName, String carteObjectId, int startLogLineNr) throws Exception { + public SlaveServerJobStatus getJobStatus(String jobName, String carteObjectId, int startLogLineNr) + throws Exception { String xml = execService(GetJobStatusServlet.CONTEXT_PATH + "/?name=" + URLEncoder.encode(jobName, "UTF-8") + "&id=" + Const.NVL(carteObjectId, "") + "&xml=Y&from=" + startLogLineNr, true); diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java index 6e257be..0d49610 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/core/database/DatabaseMeta.java @@ -3,7 +3,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -998,7 +998,7 @@ public String getXML() { retval.append(" ").append(Const.CR); - List list = new ArrayList(); + List list = new ArrayList<>(); Set keySet = getAttributes().keySet(); for (Object object : keySet) { list.add((String) object); @@ -1337,7 +1337,7 @@ public static final String getAccessTypeDescLong(int dbaccess) { } public static final DatabaseInterface[] getDatabaseInterfaces() { - List list = new ArrayList(getDatabaseInterfacesMap().values()); + List list = new ArrayList<>(getDatabaseInterfacesMap().values()); return list.toArray(new DatabaseInterface[list.size()]); } @@ -1357,7 +1357,7 @@ private Map doCreate() { PluginRegistry registry = PluginRegistry.getInstance(); List plugins = registry.getPlugins(DatabasePluginType.class); - HashMap tmpAllDatabaseInterfaces = new HashMap(); + HashMap tmpAllDatabaseInterfaces = new HashMap<>(); for (PluginInterface plugin : plugins) { try { DatabaseInterface databaseInterface = (DatabaseInterface) registry.loadClass(plugin); @@ -1519,7 +1519,7 @@ public String getFunctionCount() { * @return an array of remarks Strings */ public String[] checkParameters() { - ArrayList remarks = new ArrayList(); + ArrayList remarks = new ArrayList<>(); if (getDatabaseInterface() == null) { remarks.add(BaseMessages.getString(PKG, "DatabaseMeta.BadInterface")); @@ -1948,7 +1948,7 @@ public String getSQLUnlockTables(String[] tableNames) { * @return a feature list for the chosen database type. */ public List getFeatureSummary() { - List list = new ArrayList(); + List list = new ArrayList<>(); RowMetaAndData r = null; final String par = "Parameter"; final String val = "Value"; @@ -2523,13 +2523,14 @@ public void setForcingIdentifiersToUpperCase(boolean forceUpperCase) { * @return The database object if one was found, null otherwise. */ public static final DatabaseMeta findDatabase(List databases, String dbname) { - if (databases == null) { + if (databases == null || dbname == null) { return null; } + dbname = dbname.trim(); for (int i = 0; i < databases.size(); i++) { DatabaseMeta ci = (DatabaseMeta) databases.get(i); - if (ci.getName().equalsIgnoreCase(dbname)) { + if (ci.getName().trim().equalsIgnoreCase(dbname)) { return ci; } } diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/core/database/util/DatabaseUtil.java b/pentaho-kettle/src/main/java/org/pentaho/di/core/database/util/DatabaseUtil.java index 2224026..9fcf810 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/core/database/util/DatabaseUtil.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/core/database/util/DatabaseUtil.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LogChannelFileWriter.java b/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LogChannelFileWriter.java index ed3beb5..f894115 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LogChannelFileWriter.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LogChannelFileWriter.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -38,6 +38,7 @@ */ public class LogChannelFileWriter { private final AtomicBoolean active; + private final AtomicBoolean finished; private String logChannelId; private FileObject logFile; @@ -64,6 +65,7 @@ public LogChannelFileWriter(String logChannelId, FileObject logFile, boolean app this.pollingInterval = pollingInterval; active = new AtomicBoolean(false); + finished = new AtomicBoolean(false); lastBufferLineNr = KettleLogStore.getLastBufferLineNr(); // it's basic move to create the directory *before* creating log file @@ -125,6 +127,8 @@ public void run() { } } catch (Exception e) { exception = new KettleException("There was an error closing log file file '" + logFile + "'", e); + } finally { + finished.set(true); } } } @@ -147,6 +151,9 @@ public synchronized void flush() { public void stopLogging() { flush(); active.set(false); + while (!finished.get()) { + Thread.yield(); + } } public KettleException getException() { diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LoggingRegistry.java b/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LoggingRegistry.java index 3973bcf..98eba24 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LoggingRegistry.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/core/logging/LoggingRegistry.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/core/row/RowMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/core/row/RowMeta.java index bd064fc..9014956 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/core/row/RowMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/core/row/RowMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java index a9b1645..04532d6 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/Job.java @@ -3,7 +3,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -424,6 +424,7 @@ public void run() { shutdownHeartbeat(heartbeat); ExtensionPointHandler.callExtensionPoint(log, KettleExtensionPoint.JobFinish.id, this); + jobMeta.disposeEmbeddedMetastoreProvider(); fireJobFinishListeners(); } catch (KettleException e) { @@ -655,6 +656,12 @@ private Result execute(final int nr, Result prev_result, final JobEntryCopy jobE JobExecutionExtension extension = new JobExecutionExtension(this, prevResult, jobEntryCopy, true); ExtensionPointHandler.callExtensionPoint(log, KettleExtensionPoint.JobBeforeJobEntryExecution.id, extension); + jobMeta.disposeEmbeddedMetastoreProvider(); + if (jobMeta.getMetastoreLocatorOsgi() != null) { + jobMeta.setEmbeddedMetastoreProviderKey( + jobMeta.getMetastoreLocatorOsgi().setEmbeddedMetastore(jobMeta.getEmbeddedMetaStore())); + } + if (extension.result != null) { prevResult = extension.result; } @@ -687,6 +694,7 @@ private Result execute(final int nr, Result prev_result, final JobEntryCopy jobE cloneJei.setMetaStore(rep.getMetaStore()); } cloneJei.setParentJob(this); + cloneJei.setParentJobMeta(this.getJobMeta()); final long start = System.currentTimeMillis(); cloneJei.getLogChannel().logDetailed("Starting job entry"); @@ -1785,9 +1793,10 @@ public static String sendToSlaveServer(JobMeta jobMeta, JobExecutionConfiguratio // Send the zip file over to the slave server... // - String result = - slaveServer.sendExport(topLevelResource.getArchiveName(), AddExportServlet.TYPE_JOB, topLevelResource - .getBaseResourceName()); + String result = slaveServer.sendExport( + topLevelResource.getArchiveName(), + RegisterPackageServlet.TYPE_JOB, + topLevelResource.getBaseResourceName()); WebResult webResult = WebResult.fromXMLString(result); if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK)) { throw new KettleException("There was an error passing the exported job to the remote server: " + Const.CR diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/JobMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/JobMeta.java index 37e7ca5..007c123 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/job/JobMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/JobMeta.java @@ -3,7 +3,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -61,6 +61,7 @@ import org.pentaho.di.job.entry.JobEntryInterface; import org.pentaho.di.repository.*; import org.pentaho.di.resource.*; +import org.pentaho.di.trans.steps.named.cluster.NamedClusterEmbedManager; import org.pentaho.metastore.api.IMetaStore; import org.w3c.dom.Document; import org.w3c.dom.Node; @@ -112,6 +113,11 @@ public class JobMeta extends AbstractMeta protected List extraLogTables; + /** + * The log channel interface. + */ + protected LogChannelInterface log; + /** * Constant = "SPECIAL" **/ @@ -198,6 +204,8 @@ public void clear() { // setInternalKettleVariables(); Don't clear the internal variables for // ad-hoc jobs, it's ruins the previews // etc. + + log = LogChannel.GENERAL; } /** @@ -300,65 +308,7 @@ public JobEntryCopy getDummy() { * @return 0 if the two jobs are equal, 1 or -1 depending on the values (see description above) */ public int compare(JobMeta j1, JobMeta j2) { - // If we don't have a filename, the jobs comes from a repository - // - if (Utils.isEmpty(j1.getFilename())) { - - if (!Utils.isEmpty(j2.getFilename())) { - return -1; - } - - // First compare names... - if (Utils.isEmpty(j1.getName()) && !Utils.isEmpty(j2.getName())) { - return -1; - } - if (!Utils.isEmpty(j1.getName()) && Utils.isEmpty(j2.getName())) { - return 1; - } - int cmpName = j1.getName().compareTo(j2.getName()); - if (cmpName != 0) { - return cmpName; - } - - // Same name, compare Repository directory... - int cmpDirectory = j1.getRepositoryDirectory().getPath().compareTo(j2.getRepositoryDirectory().getPath()); - if (cmpDirectory != 0) { - return cmpDirectory; - } - - // Same name, same directory, compare versions - if (j1.getObjectRevision() != null && j2.getObjectRevision() == null) { - return 1; - } - if (j1.getObjectRevision() == null && j2.getObjectRevision() != null) { - return -1; - } - if (j1.getObjectRevision() == null && j2.getObjectRevision() == null) { - return 0; - } - return j1.getObjectRevision().getName().compareTo(j2.getObjectRevision().getName()); - - } else { - if (Utils.isEmpty(j2.getFilename())) { - return 1; - } - - // First compare names - // - if (Utils.isEmpty(j1.getName()) && !Utils.isEmpty(j2.getName())) { - return -1; - } - if (!Utils.isEmpty(j1.getName()) && Utils.isEmpty(j2.getName())) { - return 1; - } - int cmpName = j1.getName().compareTo(j2.getName()); - if (cmpName != 0) { - return cmpName; - } - - // Same name, compare filenames... - return j1.getFilename().compareTo(j2.getFilename()); - } + return super.compare(j1, j2); } /** @@ -583,6 +533,9 @@ public String getDefaultExtension() { * @see org.pentaho.di.core.xml.XMLInterface#getXML() */ public String getXML() { + //Clear the embedded named clusters. We will be repopulating from steps that used named clusters + getNamedClusterEmbedManager().clear(); + Props props = null; if (Props.isInitialized()) { props = Props.getInstance(); @@ -1289,6 +1242,8 @@ public void addJobHop(int p, JobHopMeta hi) { public void removeJobEntry(int i) { JobEntryCopy deleted = jobcopies.remove(i); if (deleted != null) { + // give step a chance to cleanup + deleted.setParentJobMeta(null); if (deleted.getEntry() instanceof MissingEntry) { removeMissingEntry((MissingEntry) deleted.getEntry()); } @@ -2357,9 +2312,17 @@ public void setInternalKettleVariables(VariableSpace var) { variables.getVariable(Const.INTERNAL_VARIABLE_JOB_FILENAME_DIRECTORY)); } - variables.setVariable(Const.INTERNAL_VARIABLE_ENTRY_CURRENT_DIRECTORY, variables.getVariable( - repository != null ? Const.INTERNAL_VARIABLE_JOB_REPOSITORY_DIRECTORY - : Const.INTERNAL_VARIABLE_JOB_FILENAME_DIRECTORY)); + updateCurrentDir(); + } + + private void updateCurrentDir() { + String prevCurrentDir = variables.getVariable(Const.INTERNAL_VARIABLE_ENTRY_CURRENT_DIRECTORY); + String currentDir = variables.getVariable( + repository != null + ? Const.INTERNAL_VARIABLE_JOB_REPOSITORY_DIRECTORY + : Const.INTERNAL_VARIABLE_JOB_FILENAME_DIRECTORY); + variables.setVariable(Const.INTERNAL_VARIABLE_ENTRY_CURRENT_DIRECTORY, currentDir); + fireCurrentDirectoryChanged(prevCurrentDir, currentDir); } /** @@ -2641,6 +2604,15 @@ public RepositoryObjectType getRepositoryElementType() { return REPOSITORY_ELEMENT_TYPE; } + /** + * Gets the log channel. + * + * @return the log channel + */ + public LogChannelInterface getLogChannel() { + return log; + } + /** * Create a unique list of job entry interfaces * @@ -2806,4 +2778,12 @@ public void removeMissingEntry(MissingEntry missingEntry) { public boolean hasMissingPlugins() { return missingEntries != null && !missingEntries.isEmpty(); } + + @Override + public NamedClusterEmbedManager getNamedClusterEmbedManager() { + if (namedClusterEmbedManager == null) { + namedClusterEmbedManager = new NamedClusterEmbedManager(this, LogChannel.GENERAL); + } + return namedClusterEmbedManager; + } } diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java index 7315bee..a0dfc79 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/job/entries/job/JobEntryJob.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -29,6 +29,10 @@ import org.pentaho.di.core.exception.KettleDatabaseException; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleXMLException; +import org.pentaho.di.core.extension.ExtensionPointHandler; +import org.pentaho.di.core.extension.KettleExtensionPoint; +import org.pentaho.di.core.listeners.CurrentDirectoryChangedListener; +import org.pentaho.di.core.listeners.impl.EntryCurrentDirectoryChangedListener; import org.pentaho.di.core.logging.LogChannelFileWriter; import org.pentaho.di.core.logging.LogLevel; import org.pentaho.di.core.parameters.DuplicateParamException; @@ -110,10 +114,17 @@ public class JobEntryJob extends JobEntryBase implements Cloneable, JobEntryInte private boolean passingExport; + private String runConfiguration; + public static final LogLevel DEFAULT_LOG_LEVEL = LogLevel.NOTHING; private Job job; + private CurrentDirectoryChangedListener dirListener = new EntryCurrentDirectoryChangedListener( + this::getSpecificationMethod, + this::getDirectory, + this::setDirectory); + public JobEntryJob(String name) { super(name, ""); } @@ -198,6 +209,14 @@ public void setPassingExport(boolean passingExport) { this.passingExport = passingExport; } + public String getRunConfiguration() { + return runConfiguration; + } + + public void setRunConfiguration(String runConfiguration) { + this.runConfiguration = runConfiguration; + } + public String getLogFilename() { String retval = ""; if (setLogfile) { @@ -245,6 +264,10 @@ public String getXML() { // Ignore object reference problems. It simply means that the reference is no longer valid. } } + if (parentJobMeta != null) { + parentJobMeta.getNamedClusterEmbedManager().registerUrl(filename); + } + retval.append(" ").append(XMLHandler.addTagValue("filename", filename)); retval.append(" ").append(XMLHandler.addTagValue("jobname", jobname)); @@ -270,6 +293,7 @@ public String getXML() { retval.append(" ").append(XMLHandler.addTagValue("expand_remote_job", expandingRemoteJob)); retval.append(" ").append(XMLHandler.addTagValue("create_parent_folder", createParentFolder)); retval.append(" ").append(XMLHandler.addTagValue("pass_export", passingExport)); + retval.append(" ").append(XMLHandler.addTagValue("run_configuration", runConfiguration)); if (arguments != null) { for (int i = 0; i < arguments.length; i++) { @@ -355,6 +379,7 @@ public void loadXML(Node entrynode, List databases, List new Trans(meta)).get(); trans.setParent(this); // Pass the socket repository as early as possible... @@ -1675,48 +1684,15 @@ public Object loadReferencedObject(int index, Repository rep, IMetaStore metaSto return getTransMeta(rep, metaStore, space); } - /** - * Creates the appropriate trans. Either - * 1) A {@link TransEngineAdapter} wrapping an {@link Engine} - * if an alternate execution engine has been selected - * 2) A legacy {@link Trans} otherwise. - */ - private Trans createTrans(TransMeta transMeta) throws KettleException { - if (Utils.isEmpty(transMeta.getVariable("engine"))) { - log.logBasic("Using legacy execution engine"); - return new Trans(transMeta); - } - - return PluginRegistry.getInstance().getPlugins(EnginePluginType.class).stream() - .filter(useThisEngine(transMeta)) - .findFirst() - .map(plugin -> (Engine) loadPlugin(plugin)) - .map(engine -> { - log.logBasic("Using execution engine " + engine.getClass().getCanonicalName()); - return (Trans) new TransEngineAdapter(engine, transMeta); - }) - .orElseThrow(() -> new KettleException("Unable to find engine [" + transMeta.getVariable("engine") + "]")); - } - - /** - * Uses a trans variable called "engine" to determine which engine to use. - * Will be replaced when UI engine selection is available. - * - * @return - */ - private Predicate useThisEngine(TransMeta transMeta) { - return plugin -> Arrays.stream(plugin.getIds()) - .filter(id -> id.equals((transMeta.getVariable("engine")))) - .findAny() - .isPresent(); - } - - private Object loadPlugin(PluginInterface plugin) { - try { - return PluginRegistry.getInstance().loadClass(plugin); - } catch (KettlePluginException e) { - throw new RuntimeException(e); + @Override + public void setParentJobMeta(JobMeta parentJobMeta) { + JobMeta previous = getParentJobMeta(); + super.setParentJobMeta(parentJobMeta); + if (parentJobMeta != null) { + parentJobMeta.addCurrentDirectoryChangedListener(currentDirListener); + variables.setParentVariableSpace(parentJobMeta); + } else if (previous != null) { + previous.removeCurrentDirectoryChangedListener(currentDirListener); } } - } diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoriesMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoriesMeta.java index 3de9ef2..08efff9 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoriesMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoriesMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoryMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoryMeta.java index 56b5af3..e022ab7 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoryMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/repository/RepositoryMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceDefinition.java b/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceDefinition.java index cb35b04..6540254 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceDefinition.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceDefinition.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceUtil.java b/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceUtil.java index 133ce28..12b6559 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceUtil.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/resource/ResourceUtil.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/StepWithMappingMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/StepWithMappingMeta.java index a66f0c0..8b53360 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/StepWithMappingMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/StepWithMappingMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java index 9ed25a6..9d50ded 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java @@ -3,7 +3,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/TransMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/TransMeta.java index bcd2f06..b7b8962 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/TransMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/TransMeta.java @@ -3,7 +3,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/append/AppendMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/append/AppendMeta.java deleted file mode 100644 index e602203..0000000 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/append/AppendMeta.java +++ /dev/null @@ -1,230 +0,0 @@ -/*! ****************************************************************************** - * - * Pentaho Data Integration - * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com - * - ******************************************************************************* - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - ******************************************************************************/ - -package org.pentaho.di.trans.steps.append; - -import org.pentaho.di.core.CheckResult; -import org.pentaho.di.core.CheckResultInterface; -import org.pentaho.di.core.database.DatabaseMeta; -import org.pentaho.di.core.exception.KettleException; -import org.pentaho.di.core.exception.KettleStepException; -import org.pentaho.di.core.exception.KettleXMLException; -import org.pentaho.di.core.injection.Injection; -import org.pentaho.di.core.injection.InjectionSupported; -import org.pentaho.di.core.row.RowMetaInterface; -import org.pentaho.di.core.variables.VariableSpace; -import org.pentaho.di.core.xml.XMLHandler; -import org.pentaho.di.i18n.BaseMessages; -import org.pentaho.di.repository.ObjectId; -import org.pentaho.di.repository.Repository; -import org.pentaho.di.trans.Trans; -import org.pentaho.di.trans.TransMeta; -import org.pentaho.di.trans.TransMeta.TransformationType; -import org.pentaho.di.trans.step.*; -import org.pentaho.di.trans.step.errorhandling.Stream; -import org.pentaho.di.trans.step.errorhandling.StreamIcon; -import org.pentaho.di.trans.step.errorhandling.StreamInterface; -import org.pentaho.di.trans.step.errorhandling.StreamInterface.StreamType; -import org.pentaho.di.trans.steps.StreamingSteps; -import org.pentaho.metastore.api.IMetaStore; -import org.w3c.dom.Node; - -import java.util.List; - -/** - * @author Sven Boden - * @since 3-june-2007 - */ -@InjectionSupported(localizationPrefix = "AppendMeta.Injection.") -public class AppendMeta extends BaseStepMeta implements StepMetaInterface { - private static Class PKG = Append.class; // for i18n purposes, needed by Translator2!! - - private StreamingSteps inputSteps; - - @Injection(name = "HEAD_STEP") - public String headStepname; - @Injection(name = "TAIL_STEP") - public String tailStepname; - - public AppendMeta() { - super(); // allocate BaseStepMeta - } - - public void loadXML(Node stepnode, List databases, IMetaStore metaStore) throws KettleXMLException { - readData(stepnode); - } - - public Object clone() { - AppendMeta retval = (AppendMeta) super.clone(); - - return retval; - } - - public String getXML() { - StringBuilder retval = new StringBuilder(); - - List infoStreams = getStepIOMeta().getInfoStreams(); - // retval.append(XMLHandler.addTagValue("head_name", infoStreams.get(0).getStepname())); - // retval.append(XMLHandler.addTagValue("tail_name", infoStreams.get(1).getStepname())); - retval.append(XMLHandler.addTagValue("head_name", - inputSteps == null ? infoStreams.get(0).getStepname() : inputSteps.getStepName())); - retval.append(XMLHandler.addTagValue("tail_name", - inputSteps == null ? infoStreams.get(1).getStepname() : inputSteps.getStepName(1))); - - return retval.toString(); - } - - private void readData(Node stepnode) throws KettleXMLException { - try { - List infoStreams = getStepIOMeta().getInfoStreams(); - StreamInterface headStream = infoStreams.get(0); - StreamInterface tailStream = infoStreams.get(1); - headStream.setSubject(XMLHandler.getTagValue(stepnode, "head_name")); - tailStream.setSubject(XMLHandler.getTagValue(stepnode, "tail_name")); - inputSteps = new StreamingSteps(this); - } catch (Exception e) { - throw new KettleXMLException(BaseMessages.getString(PKG, "AppendMeta.Exception.UnableToLoadStepInfo"), e); - } - } - - public void setDefault() { - } - - public void readRep(Repository rep, IMetaStore metaStore, ObjectId id_step, List databases) throws KettleException { - try { - List infoStreams = getStepIOMeta().getInfoStreams(); - StreamInterface headStream = infoStreams.get(0); - StreamInterface tailStream = infoStreams.get(1); - headStream.setSubject(rep.getStepAttributeString(id_step, "head_name")); - tailStream.setSubject(rep.getStepAttributeString(id_step, "tail_name")); - inputSteps = new StreamingSteps(this); - } catch (Exception e) { - throw new KettleException(BaseMessages.getString( - PKG, "AppendMeta.Exception.UnexpectedErrorReadingStepInfo"), e); - } - } - - public void saveRep(Repository rep, IMetaStore metaStore, ObjectId id_transformation, ObjectId id_step) throws KettleException { - try { - List infoStreams = getStepIOMeta().getInfoStreams(); - StreamInterface headStream = infoStreams.get(0); - StreamInterface tailStream = infoStreams.get(1); - // rep.saveStepAttribute(id_transformation, id_step, "head_name", headStream.getStepname()); - // rep.saveStepAttribute(id_transformation, id_step, "tail_name", tailStream.getStepname()); - rep.saveStepAttribute(id_transformation, id_step, "head_name", - inputSteps == null ? headStream.getStepname() : inputSteps.getStepName()); - rep.saveStepAttribute(id_transformation, id_step, "tail_name", - inputSteps == null ? tailStream.getStepname() : inputSteps.getStepName(1)); - } catch (Exception e) { - throw new KettleException(BaseMessages.getString(PKG, "AppendMeta.Exception.UnableToSaveStepInfo") - + id_step, e); - } - } - - @Override - public void searchInfoAndTargetSteps(List steps) { - for (StreamInterface stream : getStepIOMeta().getInfoStreams()) { - stream.setStepMeta(StepMeta.findStep(steps, (String) stream.getSubject())); - } - } - - public boolean chosesTargetSteps() { - return false; - } - - public String[] getTargetSteps() { - return null; - } - - public void getFields(RowMetaInterface r, String name, RowMetaInterface[] info, StepMeta nextStep, - VariableSpace space, Repository repository, IMetaStore metaStore) throws KettleStepException { - // We don't have any input fields here in "r" as they are all info fields. - // So we just take the info fields. - // - if (info != null) { - if (info.length > 0 && info[0] != null) { - r.mergeRowMeta(info[0]); - } - } - } - - public void check(List remarks, TransMeta transMeta, StepMeta stepMeta, - RowMetaInterface prev, String[] input, String[] output, RowMetaInterface info, VariableSpace space, - Repository repository, IMetaStore metaStore) { - CheckResult cr; - - List infoStreams = getStepIOMeta().getInfoStreams(); - StreamInterface headStream = infoStreams.get(0); - StreamInterface tailStream = infoStreams.get(1); - - if (headStream.getStepname() != null && tailStream.getStepname() != null) { - cr = - new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString( - PKG, "AppendMeta.CheckResult.SourceStepsOK"), stepMeta); - remarks.add(cr); - } else if (headStream.getStepname() == null && tailStream.getStepname() == null) { - cr = - new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, BaseMessages.getString( - PKG, "AppendMeta.CheckResult.SourceStepsMissing"), stepMeta); - remarks.add(cr); - } else { - cr = - new CheckResult(CheckResultInterface.TYPE_RESULT_OK, BaseMessages.getString( - PKG, "AppendMeta.CheckResult.OneSourceStepMissing"), stepMeta); - remarks.add(cr); - } - } - - public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta tr, - Trans trans) { - return new Append(stepMeta, stepDataInterface, cnr, tr, trans); - } - - public StepDataInterface getStepData() { - return new AppendData(); - } - - /** - * Returns the Input/Output metadata for this step. - */ - public StepIOMetaInterface getStepIOMeta() { - if (ioMeta == null) { - - ioMeta = new StepIOMeta(true, true, false, false, false, false); - - ioMeta.addStream(new Stream(StreamType.INFO, null, BaseMessages.getString( - PKG, "AppendMeta.InfoStream.FirstStream.Description"), StreamIcon.INFO, null)); - ioMeta.addStream(new Stream(StreamType.INFO, null, BaseMessages.getString( - PKG, "AppendMeta.InfoStream.SecondStream.Description"), StreamIcon.INFO, null)); - } - - return ioMeta; - } - - @Override - public void resetStepIoMeta() { - } - - public TransformationType[] getSupportedTransformationTypes() { - return new TransformationType[]{TransformationType.Normal,}; - } -} diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelinput/ExcelInput.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelinput/ExcelInput.java index eca9019..43585da 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelinput/ExcelInput.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelinput/ExcelInput.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/exceloutput/ExcelOutput.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/exceloutput/ExcelOutput.java index 36b5c9c..7cfa5e9 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/exceloutput/ExcelOutput.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/exceloutput/ExcelOutput.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStep.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStep.java index cb775eb..3df038d 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStep.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStep.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStepMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStepMeta.java index d26d755..6472098 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStepMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/excelwriter/ExcelWriterStepMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/filterrows/FilterRowsMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/filterrows/FilterRowsMeta.java index 90ec9aa..8330273 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/filterrows/FilterRowsMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/filterrows/FilterRowsMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/javafilter/JavaFilterMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/javafilter/JavaFilterMeta.java index 8b574d9..810cb18 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/javafilter/JavaFilterMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/javafilter/JavaFilterMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -47,6 +47,7 @@ import org.w3c.dom.Node; import java.util.List; +import java.util.Objects; /** * Contains the meta-data for the java filter step: calculates conditions using Janino @@ -114,6 +115,11 @@ public boolean equals(Object obj) { return false; } + @Override + public int hashCode() { + return Objects.hash(getStepIOMeta().getTargetStreams(), condition); + } + public Object clone() { JavaFilterMeta retval = (JavaFilterMeta) super.clone(); return retval; diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/jobexecutor/JobExecutorMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/jobexecutor/JobExecutorMeta.java index 9ef2c1d..5618692 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/jobexecutor/JobExecutorMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/jobexecutor/JobExecutorMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mail/Mail.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mail/Mail.java index b38f89d..395a51f 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mail/Mail.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mail/Mail.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergejoin/MergeJoinMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergejoin/MergeJoinMeta.java index 4287f93..1fcc739 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergejoin/MergeJoinMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergejoin/MergeJoinMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergerows/MergeRowsMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergerows/MergeRowsMeta.java index e3564b4..2e8a694 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergerows/MergeRowsMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/mergerows/MergeRowsMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/tableinput/TableInputMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/tableinput/TableInputMeta.java index 1ef4f5a..693c26c 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/tableinput/TableInputMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/tableinput/TableInputMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -455,9 +455,9 @@ public void analyseImpact(List impact, TransMeta transMeta, Step RowMetaInterface prev, String[] input, String[] output, RowMetaInterface info, Repository repository, IMetaStore metaStore) throws KettleStepException { - if (stepMeta.getName().equalsIgnoreCase("cdc_cust")) { - System.out.println("HERE!"); - } + // if (stepMeta.getName().equalsIgnoreCase("cdc_cust")) { + // System.out.println("HERE!"); + // } // Find the lookupfields... RowMetaInterface out = new RowMeta(); diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/transexecutor/TransExecutorMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/transexecutor/TransExecutorMeta.java index 159aa17..a93a4b9 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/transexecutor/TransExecutorMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/transexecutor/TransExecutorMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java index 52ba00c..b271ed5 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClass.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java index 66a65f5..9f05ad2 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/trans/steps/userdefinedjavaclass/UserDefinedJavaClassMeta.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -478,9 +478,8 @@ public String getXML() { XMLHandler.addTagValue(ElementNames.class_type.name(), def.getClassType().name())); retval.append("\n ").append( XMLHandler.addTagValue(ElementNames.class_name.name(), def.getClassName())); - retval.append("\n ").append(XMLHandler.openTag(ElementNames.class_source.name())); - retval.append(XMLHandler.buildCDATA(def.getSource())).append( - XMLHandler.closeTag(ElementNames.class_source.name())); + retval.append("\n "); + retval.append(XMLHandler.addTagValue(ElementNames.class_source.name(), def.getSource())); retval.append(String.format("\n ", ElementNames.definition.name())); } retval.append(String.format("\n ", ElementNames.definitions.name())); diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/Carte.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/Carte.java index e1c2eac..af80464 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/Carte.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/Carte.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteServlet.java index 63af4ad..64d1843 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java index 50691c7..3edf50a 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java index 3e0063b..e5e8fa8 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetJobStatusServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetStatusServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetStatusServlet.java index ea2ba8d..440b8e7 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetStatusServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetStatusServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java index 3f1bc68..67b6e7a 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/GetTransStatusServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerJobStatus.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerJobStatus.java index d65f135..1924d1a 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerJobStatus.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerJobStatus.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerTransStatus.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerTransStatus.java index a4118ce..28bf183 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerTransStatus.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/SlaveServerTransStatus.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java index 990dcf3..ef46029 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartExecutionTransServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java index cc52c95..a687f47 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/StartJobServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/WebServer.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/WebServer.java index 86fbb4d..bf45e53 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/WebServer.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/WebServer.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * diff --git a/pentaho-platform/lib/DIS-7.1.0.5-70.jar b/pentaho-platform/lib/DIS-8.0.0.0-1.jar similarity index 58% rename from pentaho-platform/lib/DIS-7.1.0.5-70.jar rename to pentaho-platform/lib/DIS-8.0.0.0-1.jar index 965280c..360b651 100644 Binary files a/pentaho-platform/lib/DIS-7.1.0.5-70.jar and b/pentaho-platform/lib/DIS-8.0.0.0-1.jar differ diff --git a/pentaho-platform/lib/kettle5-log4j-plugin-7.1.0.5-70.jar b/pentaho-platform/lib/kettle-log4j-core-8.0.0.0-1.jar similarity index 50% rename from pentaho-platform/lib/kettle5-log4j-plugin-7.1.0.5-70.jar rename to pentaho-platform/lib/kettle-log4j-core-8.0.0.0-1.jar index e509cdb..520e65b 100644 Binary files a/pentaho-platform/lib/kettle5-log4j-plugin-7.1.0.5-70.jar and b/pentaho-platform/lib/kettle-log4j-core-8.0.0.0-1.jar differ diff --git a/pentaho-platform/pom.xml b/pentaho-platform/pom.xml index a6fe2e5..1e42ef9 100644 --- a/pentaho-platform/pom.xml +++ b/pentaho-platform/pom.xml @@ -4,7 +4,7 @@ com.github.zhicwu pdi-cluster - 7.1.0.5-SNAPSHOT + 8.0.0.0-SNAPSHOT pentaho-platform jar @@ -25,14 +25,19 @@ ${pentaho-ce.version} - pentaho-reporting-engine - pentaho-reporting-engine-classic-core - 7.1-SNAPSHOT + pentaho + pentaho-platform-core + ${pentaho-ce.version} - pentaho-reporting-engine - pentaho-reporting-engine-classic-core-platform-plugin - 7.1-SNAPSHOT + pentaho + pentaho-platform-extensions + ${pentaho-ce.version} + + + org.pentaho.reporting.plugin + classic-core-platform-plugin + ${pentaho-ce.version} pentaho @@ -78,10 +83,10 @@ pentaho - kettle5-log4j-plugin + kettle-log4j-plugin ${pentaho-ce.version} system - ${basedir}/lib/kettle5-log4j-plugin-${pentaho-ce.version}.jar + ${basedir}/lib/kettle-log4j-core-${pentaho-ce.version}.jar diff --git a/pentaho-platform/src/main/java/org/pentaho/platform/engine/services/connection/datasource/dbcp/PooledDatasourceHelper.java b/pentaho-platform/src/main/java/org/pentaho/platform/engine/services/connection/datasource/dbcp/PooledDatasourceHelper.java index 8e7b916..a9348b0 100644 --- a/pentaho-platform/src/main/java/org/pentaho/platform/engine/services/connection/datasource/dbcp/PooledDatasourceHelper.java +++ b/pentaho-platform/src/main/java/org/pentaho/platform/engine/services/connection/datasource/dbcp/PooledDatasourceHelper.java @@ -13,7 +13,7 @@ * See the GNU General Public License for more details. * * - * Copyright 2006 - 2017 Pentaho Corporation. All rights reserved. + * Copyright 2006 - 2017 Hitachi Vantara. All rights reserved. */ package org.pentaho.platform.engine.services.connection.datasource.dbcp; diff --git a/pentaho-platform/src/main/java/org/pentaho/platform/plugin/kettle/PdiAction.java b/pentaho-platform/src/main/java/org/pentaho/platform/plugin/kettle/PdiAction.java index 1012e70..6844b0d 100644 --- a/pentaho-platform/src/main/java/org/pentaho/platform/plugin/kettle/PdiAction.java +++ b/pentaho-platform/src/main/java/org/pentaho/platform/plugin/kettle/PdiAction.java @@ -12,7 +12,7 @@ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * -* Copyright (c) 2002-2017 Pentaho Corporation.. All rights reserved. +* Copyright (c) 2002-2017 Hitachi Vantara.. All rights reserved. */ package org.pentaho.platform.plugin.kettle; diff --git a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/ActionAdapterQuartzJob.java b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/ActionAdapterQuartzJob.java index 03eac7d..ce3f677 100644 --- a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/ActionAdapterQuartzJob.java +++ b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/ActionAdapterQuartzJob.java @@ -12,7 +12,7 @@ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * - * Copyright (c) 2002-2016 Pentaho Corporation.. All rights reserved. + * Copyright (c) 2002-2017 Hitachi Vantara.. All rights reserved. */ package org.pentaho.platform.scheduler2.quartz; @@ -21,10 +21,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.pentaho.platform.api.action.IAction; -import org.pentaho.platform.api.action.IPostProcessingAction; -import org.pentaho.platform.api.action.IStreamingAction; -import org.pentaho.platform.api.action.IVarArgsAction; +import org.pentaho.platform.api.action.*; import org.pentaho.platform.api.engine.IPluginManager; import org.pentaho.platform.api.engine.PluginBeanException; import org.pentaho.platform.api.repository.IContentItem; @@ -38,22 +35,24 @@ import org.pentaho.platform.engine.core.system.PentahoSystem; import org.pentaho.platform.engine.security.SecurityHelper; import org.pentaho.platform.engine.services.solution.ActionSequenceCompatibilityFormatter; +import org.pentaho.platform.scheduler2.action.DefaultActionInvoker; import org.pentaho.platform.scheduler2.blockout.BlockoutAction; import org.pentaho.platform.scheduler2.email.Emailer; import org.pentaho.platform.scheduler2.messsages.Messages; +import org.pentaho.platform.util.ActionUtil; +import org.pentaho.platform.util.StringUtil; import org.pentaho.platform.util.beans.ActionHarness; import org.pentaho.platform.util.messages.LocaleHelper; import org.pentaho.platform.util.web.MimeHelper; +import org.pentaho.platform.workitem.WorkItemLifecycleEventUtil; +import org.pentaho.platform.workitem.WorkItemLifecyclePhase; import org.quartz.Job; import org.quartz.*; import java.io.OutputStream; import java.io.Serializable; import java.text.MessageFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; +import java.util.*; import java.util.concurrent.Callable; import static org.pentaho.platform.scheduler2.quartz.QuartzSchedulerHelper.KEY_ETL_JOB_ID; @@ -67,62 +66,9 @@ public class ActionAdapterQuartzJob implements Job { static final Log log = LogFactory.getLog(ActionAdapterQuartzJob.class); - private static final long RETRY_COUNT = 6; - private static final long RETRY_SLEEP_AMOUNT = 10000; - - private String outputFilePath = null; - // Without "final" here it's kind of scary... - private final Object lock = new Object(); - - protected Class resolveClass(JobDataMap jobDataMap) throws PluginBeanException, JobExecutionException { - String actionClass = jobDataMap.getString(QuartzScheduler.RESERVEDMAPKEY_ACTIONCLASS); - String actionId = jobDataMap.getString(QuartzScheduler.RESERVEDMAPKEY_ACTIONID); - - Class clazz = null; - - if (StringUtils.isEmpty(actionId) && StringUtils.isEmpty(actionClass)) { - /* - StringBuilder sb = new StringBuilder(); - for(Object key : jobDataMap.keySet()) { - Object value = jobDataMap.get(key); - sb.append("\t* ").append(key).append('=').append(value).append('/') - .append(value == null ? "" : value.getClass().getName()).append('\n'); - } - log.error(sb.toString()); - */ - throw new LoggingJobExecutionException(Messages.getInstance().getErrorString( - "ActionAdapterQuartzJob.ERROR_0001_REQUIRED_PARAM_MISSING", //$NON-NLS-1$ - QuartzScheduler.RESERVEDMAPKEY_ACTIONCLASS, QuartzScheduler.RESERVEDMAPKEY_ACTIONID)); - } - for (int i = 0; i < RETRY_COUNT; i++) { - try { - if (!StringUtils.isEmpty(actionId)) { - IPluginManager pluginManager = PentahoSystem.get(IPluginManager.class); - clazz = pluginManager.loadClass(actionId); - return clazz; - } else if (!StringUtils.isEmpty(actionClass)) { - clazz = Class.forName(actionClass); - return clazz; - } - } catch (Throwable t) { - try { - Thread.sleep(RETRY_SLEEP_AMOUNT); - } catch (InterruptedException ie) { - log.info(ie.getMessage(), ie); - } - } - } + private IActionInvoker actionInvoker = new DefaultActionInvoker(); // default - // we have failed to locate the class for the actionClass - // and we're giving up waiting for it to become available/registered - // which can typically happen at system startup - throw new LoggingJobExecutionException(Messages.getInstance().getErrorString( - "ActionAdapterQuartzJob.ERROR_0002_FAILED_TO_CREATE_ACTION", //$NON-NLS-1$ - StringUtils.isEmpty(actionId) ? actionClass : actionId)); - } - - @SuppressWarnings("unchecked") public void execute(JobExecutionContext context) throws JobExecutionException { Scheduler scheduler = null; try { @@ -137,349 +83,171 @@ public void execute(JobExecutionContext context) throws JobExecutionException { QuartzSchedulerHelper.Phase.EXECUTION, scheduler, context.getJobDetail(), context); JobDataMap jobDataMap = context.getMergedJobDataMap(); - String actionUser = jobDataMap.getString(QuartzScheduler.RESERVEDMAPKEY_ACTIONUSER); - - Object bean; - Class actionClass = null; - try { - actionClass = resolveClass(jobDataMap); - bean = actionClass.newInstance(); - } catch (Exception e) { - throw new LoggingJobExecutionException(Messages.getInstance().getErrorString( - "ActionAdapterQuartzJob.ERROR_0002_FAILED_TO_CREATE_ACTION", //$NON-NLS-1$ - String.valueOf(context.getJobDetail())), e); //$NON-NLS-1$ - } - - if (!(bean instanceof IAction)) { - throw new LoggingJobExecutionException(Messages.getInstance().getErrorString( - "ActionAdapterQuartzJob.ERROR_0003_ACTION_WRONG_TYPE", actionClass.getName(), //$NON-NLS-1$ - IAction.class.getName())); - } + String actionUser = jobDataMap.getString( QuartzScheduler.RESERVEDMAPKEY_ACTIONUSER ); - final IAction actionBean = (IAction) bean; + final String actionClassName = jobDataMap.getString( QuartzScheduler.RESERVEDMAPKEY_ACTIONCLASS ); + final String actionId = jobDataMap.getString( QuartzScheduler.RESERVEDMAPKEY_ACTIONID ); try { - invokeAction(actionBean, actionUser, context, jobDataMap.getWrappedMap()); - - } catch (Throwable t) { - // ensure that scheduler thread isn't blocked on lock - synchronized (lock) { - lock.notifyAll(); - } + invokeAction( actionClassName, actionId, actionUser, context, jobDataMap.getWrappedMap() ); + } catch ( Throwable t ) { // We should not distinguish between checked and unchecked exceptions here. All job execution failures // should result in a rethrow of a quartz exception - throw new LoggingJobExecutionException(Messages.getInstance().getErrorString( - "ActionAdapterQuartzJob.ERROR_0004_ACTION_FAILED", actionBean //$NON-NLS-1$ - .getClass().getName()), t); + throw new LoggingJobExecutionException( Messages.getInstance().getErrorString( + "ActionAdapterQuartzJob.ERROR_0004_ACTION_FAILED", //$NON-NLS-1$ + getActionIdentifier( null, actionClassName, actionId ) ), t ); } } + private static String getActionIdentifier( final IAction actionBean, final String actionClassName, final String + actionId ) { + if ( actionBean != null ) { + return actionBean.getClass().getName(); + } else if ( !StringUtil.isEmpty( actionClassName ) ) { + return actionClassName; + } else if ( !StringUtil.isEmpty( actionId ) ) { + return actionId; + } + return "?"; //$NON-NLS-1$ + } + + /** + * @deprecated as of 8.0, use {@link #invokeAction(String, String, String, JobExecutionContext, Map)}} instead + */ + @Deprecated protected void invokeAction(final IAction actionBean, final String actionUser, final JobExecutionContext context, final Map params) throws Exception { - final IScheduler scheduler = PentahoSystem.getObjectFactory().get(IScheduler.class, "IScheduler2", null); - final Map jobParams = new HashMap(params); // shallow copy - // remove the scheduling infrastructure properties - params.remove(QuartzScheduler.RESERVEDMAPKEY_ACTIONCLASS); - params.remove(QuartzScheduler.RESERVEDMAPKEY_ACTIONID); - params.remove(QuartzScheduler.RESERVEDMAPKEY_ACTIONUSER); - Object objsp = params.get(QuartzScheduler.RESERVEDMAPKEY_STREAMPROVIDER); - IBackgroundExecutionStreamProvider sp = null; - if (objsp != null && IBackgroundExecutionStreamProvider.class.isAssignableFrom(objsp.getClass())) { - sp = (IBackgroundExecutionStreamProvider) objsp; - } - final IBackgroundExecutionStreamProvider streamProvider = sp; - params.remove(QuartzScheduler.RESERVEDMAPKEY_STREAMPROVIDER); - params.remove(QuartzScheduler.RESERVEDMAPKEY_UIPASSPARAM); - // The scheduled_fire_time is useful only to the blockoutAction see PDI-10171 - if (actionBean instanceof BlockoutAction) { - params.put(IBlockoutManager.SCHEDULED_FIRE_TIME, context.getScheduledFireTime()); - } + } - if (log.isDebugEnabled()) { - log.debug(MessageFormat.format( - "Scheduling system invoking action {0} as user {1} with params [ {2} ]", actionBean //$NON-NLS-1$ - .getClass().getName(), actionUser, QuartzScheduler.prettyPrintMap(params))); + /** + * Invokes the {@link IAction} bean that is created from the provided {@code actionClassName} and {@code actionId} as + * the provided {@code actionUser}. If the {@code IAction} execution fails as-is, the scheduler attempts to re-create + * the job that will try to invoke the {@link IAction} again. + * + * @param actionClassName The class name of the {@link IAction} bean; used as a backup, if the {@code actionId} is not + * available or vald + * @param actionId The bean id of the {@link IAction} requested to be invoked. + * @param actionUser The user invoking the {@link IAction} + * @param context the {@code JobExecutionContext} + * @param params the {@link Map} or parameters needed to invoke the {@link IAction} + * @throws Exception when the {@code IAction} cannot be invoked for some reason. + */ + protected void invokeAction( final String actionClassName, final String actionId, final String actionUser, final + JobExecutionContext context, final Map params ) throws Exception { + + final String workItemUid = ActionUtil.extractUid( params ); + + WorkItemLifecycleEventUtil.publish( workItemUid, params, WorkItemLifecyclePhase.SUBMITTED ); + + // creates an instance of IActionInvoker, which knows how to invoke this IAction - if the IActionInvoker bean is + // not defined through spring, fall back on the default action invoker + final IActionInvoker actionInvoker = Optional.ofNullable( PentahoSystem.get( IActionInvoker.class ) ).orElse( + getActionInvoker() ); + // Instantiate the requested IAction bean + final IAction actionBean = (IAction) ActionUtil.createActionBean( actionClassName, actionId ); + + if ( actionInvoker == null || actionBean == null ) { + final String failureMessage = Messages.getInstance().getErrorString( + "ActionAdapterQuartzJob.ERROR_0002_FAILED_TO_CREATE_ACTION", //$NON-NLS-1$ + getActionIdentifier( null, actionClassName, actionId ), StringUtil.getMapAsPrettyString( params ) ); + WorkItemLifecycleEventUtil.publish( workItemUid, params, WorkItemLifecyclePhase.FAILED, failureMessage ); + throw new LoggingJobExecutionException( failureMessage ); } - Callable actionBeanRunner = new Callable() { - - public Boolean call() throws Exception { - LocaleHelper.setLocaleOverride((Locale) params.get(LocaleHelper.USER_LOCALE_PARAM)); - // sync job params to the action bean - ActionHarness actionHarness = new ActionHarness(actionBean); - boolean updateJob = false; - - final Map actionParams = new HashMap(); - actionParams.putAll(params); - if (streamProvider != null) { - actionParams.put("inputStream", streamProvider.getInputStream()); - } - actionHarness.setValues(actionParams, new ActionSequenceCompatibilityFormatter()); - - if (actionBean instanceof IVarArgsAction) { - actionParams.remove("inputStream"); - actionParams.remove("outputStream"); - ((IVarArgsAction) actionBean).setVarArgs(actionParams); - } - - boolean waitForFileCreated = false; - OutputStream stream = null; - - // FIXME this is rude but I really want an empty file created each time after running a job/trans - Object map = params.get(RESERVEDMAPKEY_PARAMETERS); - boolean isManaged = map instanceof Map ? ((Map) map).containsKey(KEY_ETL_JOB_ID) : false; - - if (!isManaged && streamProvider != null) { - actionParams.remove("inputStream"); - if (actionBean instanceof IStreamingAction) { - streamProvider.setStreamingAction((IStreamingAction) actionBean); - } - - // BISERVER-9414 - validate that output path still exist - SchedulerOutputPathResolver resolver = - new SchedulerOutputPathResolver(streamProvider.getOutputPath(), actionUser); - String outputPath = resolver.resolveOutputFilePath(); - actionParams.put("useJcr", Boolean.TRUE); - actionParams.put("jcrOutputPath", outputPath.substring(0, outputPath.lastIndexOf("/"))); + if ( actionBean instanceof BlockoutAction ) { + params.put( IBlockoutManager.SCHEDULED_FIRE_TIME, context.getScheduledFireTime() ); + } - if (!outputPath.equals(streamProvider.getOutputPath())) { - streamProvider.setOutputFilePath(outputPath); // set fallback path - updateJob = true; // job needs to be deleted and recreated with the new output path - } + // Invoke the action and get the status of the invocation + final IActionInvokeStatus status = actionInvoker.invokeAction( actionBean, actionUser, params ); - stream = streamProvider.getOutputStream(); - if (stream instanceof ISourcesStreamEvents) { - ((ISourcesStreamEvents) stream).addListener(new IStreamListener() { - public void fileCreated(final String filePath) { - synchronized (lock) { - outputFilePath = filePath; - lock.notifyAll(); - } - } - }); - waitForFileCreated = true; - } - actionParams.put("outputStream", stream); - // The lineage_id is only useful for the metadata and not needed at this level see PDI-10171 - actionParams.remove(QuartzScheduler.RESERVEDMAPKEY_LINEAGE_ID); - actionHarness.setValues(actionParams); - } - - actionBean.execute(); - - if (stream != null) { - IOUtils.closeQuietly(stream); - } - - if (waitForFileCreated) { - synchronized (lock) { - if (outputFilePath == null) { - lock.wait(); - } - } - sendEmail(actionParams, params, outputFilePath); - } - if (actionBean instanceof IPostProcessingAction) { - closeContentOutputStreams((IPostProcessingAction) actionBean); - markContentAsGenerated((IPostProcessingAction) actionBean); - } - return updateJob; + // Status may not be available for remote execution, which is expected + if ( status == null ) { + if ( log.isWarnEnabled() ) { + log.warn( Messages.getInstance().getErrorString( + "ActionAdapterQuartzJob.WARN_0002_NO_STATUS", //$NON-NLS-1$ + getActionIdentifier( actionBean, actionClassName, actionId ), StringUtil.getMapAsPrettyString( params ) ) ); } + return; + } - private void closeContentOutputStreams(IPostProcessingAction actionBean) { - for (IContentItem contentItem : actionBean.getActionOutputContents()) { - contentItem.closeOutputStream(); - } - } + final boolean requiresUpdate = status.requiresUpdate(); + final Throwable throwable = status.getThrowable(); + Object objsp = status.getStreamProvider(); + IBackgroundExecutionStreamProvider sp = null; + if ( objsp != null && IBackgroundExecutionStreamProvider.class.isAssignableFrom( objsp.getClass() ) ) { + sp = (IBackgroundExecutionStreamProvider) objsp; + } + final IBackgroundExecutionStreamProvider streamProvider = sp; - private void markContentAsGenerated(IPostProcessingAction actionBean) { - IUnifiedRepository repo = PentahoSystem.get(IUnifiedRepository.class); - String lineageId = (String) params.get(QuartzScheduler.RESERVEDMAPKEY_LINEAGE_ID); - for (IContentItem contentItem : actionBean.getActionOutputContents()) { - RepositoryFile sourceFile = getRepositoryFileSafe(repo, contentItem.getPath()); - // add metadata if we have access and we have file - if (sourceFile != null) { - Map metadata = repo.getFileMetadata(sourceFile.getId()); - metadata.put(QuartzScheduler.RESERVEDMAPKEY_LINEAGE_ID, lineageId); - repo.setFileMetadata(sourceFile.getId(), metadata); - } else { - String fileName = getFSFileNameSafe(contentItem); - log.warn(Messages.getInstance().getString("ActionAdapterQuartzJob.WARN_0001_SKIP_REMOVING_OUTPUT_FILE", fileName)); - } - } - } - private RepositoryFile getRepositoryFileSafe(IUnifiedRepository repo, String path) { - try { - return repo.getFile(path); - } catch (Exception e) { - log.debug(MessageFormat.format("Cannot get repository file \"{0}\": {1}", path, e.getMessage()), e); - return null; - } - } + final Map jobParams = new HashMap( params ); // shallow copy - private String getFSFileNameSafe(IContentItem contentItem) { - if (contentItem instanceof FileContentItem) { - return ((FileContentItem) contentItem).getFile().getName(); - } - return null; - } - }; - - boolean requiresUpdate = false; - if ((actionUser == null) || (actionUser.equals("system session"))) { //$NON-NLS-1$ - // For now, don't try to run quartz jobs as authenticated if the user - // that created the job is a system user. See PPP-2350 - requiresUpdate = SecurityHelper.getInstance().runAsAnonymous(actionBeanRunner); - } else { - try { - requiresUpdate = SecurityHelper.getInstance().runAsUser(actionUser, actionBeanRunner); - } catch (Throwable t) { - Object restartFlag = jobParams.get(QuartzScheduler.RESERVEDMAPKEY_RESTART_FLAG); - if (restartFlag == null) { - final SimpleJobTrigger trigger = new SimpleJobTrigger(new Date(), null, 0, 0); - final Class iaction = (Class) actionBean.getClass(); - // recreate the job in the context of the original creator - SecurityHelper.getInstance().runAsUser(actionUser, new Callable() { - @Override - public Void call() throws Exception { - if (streamProvider != null) { - streamProvider.setStreamingAction(null); // remove generated content - } - QuartzJobKey jobKey = QuartzJobKey.parse(context.getJobDetail().getName()); - String jobName = jobKey.getJobName(); - jobParams.put(QuartzScheduler.RESERVEDMAPKEY_RESTART_FLAG, Boolean.TRUE); - scheduler.createJob(jobName, iaction, jobParams, trigger, streamProvider); - log.warn("New RunOnce job created for " + jobName + " -> possible startup synchronization error"); - return null; + final IScheduler scheduler = PentahoSystem.getObjectFactory().get( IScheduler.class, "IScheduler2", null ); + if ( throwable != null ) { + Object restartFlag = jobParams.get( QuartzScheduler.RESERVEDMAPKEY_RESTART_FLAG ); + if ( restartFlag == null ) { + final SimpleJobTrigger trigger = new SimpleJobTrigger( new Date(), null, 0, 0 ); + final Class iaction = (Class) actionBean.getClass(); + // recreate the job in the context of the original creator + SecurityHelper.getInstance().runAsUser( actionUser, new Callable() { + @Override + public Void call() throws Exception { + if ( streamProvider != null ) { + streamProvider.setStreamingAction( null ); // remove generated content } - }); - } else { - log.warn("RunOnce already created, skipping"); - throw new Exception(t); - } + QuartzJobKey jobKey = QuartzJobKey.parse( context.getJobDetail().getName() ); + String jobName = jobKey.getJobName(); + jobParams.put( QuartzScheduler.RESERVEDMAPKEY_RESTART_FLAG, Boolean.TRUE ); + WorkItemLifecycleEventUtil.publish( workItemUid, params, WorkItemLifecyclePhase.RESTARTED ); + scheduler.createJob( jobName, iaction, jobParams, trigger, streamProvider ); + log.warn( "New RunOnce job created for " + jobName + " -> possible startup synchronization error" ); + return null; + } + } ); + } else { + log.warn( "RunOnce already created, skipping" ); + throw new Exception( throwable ); } } - scheduler.fireJobCompleted(actionBean, actionUser, params, streamProvider); + scheduler.fireJobCompleted( actionBean, actionUser, params, streamProvider ); - if (requiresUpdate) { - log.warn("Output path for job: " + context.getJobDetail().getName() + " has changed. Job requires update"); + if ( requiresUpdate ) { + log.warn( "Output path for job: " + context.getJobDetail().getName() + " has changed. Job requires update" ); try { - final IJobTrigger trigger = scheduler.getJob(context.getJobDetail().getName()).getJobTrigger(); + final IJobTrigger trigger = scheduler.getJob( context.getJobDetail().getName() ).getJobTrigger(); final Class iaction = (Class) actionBean.getClass(); // remove job with outdated/invalid output path - scheduler.removeJob(context.getJobDetail().getName()); + scheduler.removeJob( context.getJobDetail().getName() ); // recreate the job in the context of the original creator - SecurityHelper.getInstance().runAsUser(actionUser, new Callable() { + SecurityHelper.getInstance().runAsUser( actionUser, new Callable() { @Override public Void call() throws Exception { - streamProvider.setStreamingAction(null); // remove generated content - QuartzJobKey jobKey = QuartzJobKey.parse(context.getJobDetail().getName()); + streamProvider.setStreamingAction( null ); // remove generated content + QuartzJobKey jobKey = QuartzJobKey.parse( context.getJobDetail().getName() ); String jobName = jobKey.getJobName(); + WorkItemLifecycleEventUtil.publish( workItemUid, params, WorkItemLifecyclePhase.RESTARTED ); org.pentaho.platform.api.scheduler2.Job j = - scheduler.createJob(jobName, iaction, jobParams, trigger, streamProvider); - log.warn("New Job: " + j.getJobId() + " created"); + scheduler.createJob( jobName, iaction, jobParams, trigger, streamProvider ); + log.warn( "New Job: " + j.getJobId() + " created" ); return null; } - }); - } catch (Exception e) { - log.error(e.getMessage(), e); + } ); + } catch ( Exception e ) { + log.error( e.getMessage(), e ); } } - if (log.isDebugEnabled()) { - log.debug(MessageFormat.format( + if ( log.isDebugEnabled() ) { + log.debug( MessageFormat.format( "Scheduling system successfully invoked action {0} as user {1} with params [ {2} ]", actionBean //$NON-NLS-1$ - .getClass().getName(), actionUser, QuartzScheduler.prettyPrintMap(params))); - } - - } - - private void sendEmail(Map actionParams, Map params, String filePath) { - try { - IUnifiedRepository repo = PentahoSystem.get(IUnifiedRepository.class); - RepositoryFile sourceFile = repo.getFile(filePath); - // add metadata - Map metadata = repo.getFileMetadata(sourceFile.getId()); - String lineageId = (String) params.get(QuartzScheduler.RESERVEDMAPKEY_LINEAGE_ID); - metadata.put(QuartzScheduler.RESERVEDMAPKEY_LINEAGE_ID, lineageId); - repo.setFileMetadata(sourceFile.getId(), metadata); - // send email - SimpleRepositoryFileData data = repo.getDataForRead(sourceFile.getId(), SimpleRepositoryFileData.class); - // if email is setup and we have tos, then do it - Emailer emailer = new Emailer(); - if (!emailer.setup()) { - // email not configured - return; - } - String to = (String) actionParams.get("_SCH_EMAIL_TO"); - String cc = (String) actionParams.get("_SCH_EMAIL_CC"); - String bcc = (String) actionParams.get("_SCH_EMAIL_BCC"); - if ((to == null || "".equals(to)) && (cc == null || "".equals(cc)) - && (bcc == null || "".equals(bcc))) { - // no destination - return; - } - emailer.setTo(to); - emailer.setCc(cc); - emailer.setBcc(bcc); - emailer.setAttachment(data.getInputStream()); - emailer.setAttachmentName("attachment"); - String attachmentName = (String) actionParams.get("_SCH_EMAIL_ATTACHMENT_NAME"); - if (attachmentName != null && !"".equals(attachmentName)) { - String path = filePath; - if (path.endsWith(".*")) { - path = path.replace(".*", ""); - } - String extension = MimeHelper.getExtension(data.getMimeType()); - if (extension == null) { - extension = ".bin"; - } - if (!attachmentName.endsWith(extension)) { - emailer.setAttachmentName(attachmentName + extension); - } else { - emailer.setAttachmentName(attachmentName); - } - } else if (data != null) { - String path = filePath; - if (path.endsWith(".*")) { - path = path.replace(".*", ""); - } - String extension = MimeHelper.getExtension(data.getMimeType()); - if (extension == null) { - extension = ".bin"; - } - path = path.substring(path.lastIndexOf("/") + 1, path.length()); - if (!path.endsWith(extension)) { - emailer.setAttachmentName(path + extension); - } else { - emailer.setAttachmentName(path); - } - } - if (data == null || data.getMimeType() == null || "".equals(data.getMimeType())) { - emailer.setAttachmentMimeType("binary/octet-stream"); - } else { - emailer.setAttachmentMimeType(data.getMimeType()); - } - String subject = (String) actionParams.get("_SCH_EMAIL_SUBJECT"); - if (subject != null && !"".equals(subject)) { - emailer.setSubject(subject); - } else { - emailer.setSubject("Pentaho Scheduler: " + emailer.getAttachmentName()); - } - String message = (String) actionParams.get("_SCH_EMAIL_MESSAGE"); - if (subject != null && !"".equals(subject)) { - emailer.setBody(message); - } - emailer.send(); - } catch (Exception e) { - log.warn(e.getMessage(), e); + .getClass().getName(), actionUser, QuartzScheduler.prettyPrintMap( params ) ) ); } } @@ -498,4 +266,11 @@ public LoggingJobExecutionException(String msg, Throwable t) { } + public IActionInvoker getActionInvoker() { + return actionInvoker; + } + + public void setActionInvoker( IActionInvoker actionInvoker ) { + this.actionInvoker = actionInvoker; + } } diff --git a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzScheduler.java b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzScheduler.java index 459fbb2..8225828 100644 --- a/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzScheduler.java +++ b/pentaho-platform/src/main/java/org/pentaho/platform/scheduler2/quartz/QuartzScheduler.java @@ -12,7 +12,7 @@ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * - * Copyright (c) 2002-2017 Pentaho Corporation.. All rights reserved. + * Copyright (c) 2002-2017 Hitachi Vantara.. All rights reserved. */ package org.pentaho.platform.scheduler2.quartz; @@ -37,6 +37,7 @@ import org.pentaho.platform.scheduler2.recur.*; import org.pentaho.platform.scheduler2.recur.QualifiedDayOfWeek.DayOfWeek; import org.pentaho.platform.scheduler2.recur.QualifiedDayOfWeek.DayOfWeekQualifier; +import org.pentaho.platform.util.ActionUtil; import org.quartz.Calendar; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; @@ -57,19 +58,23 @@ */ public class QuartzScheduler implements IScheduler { - public static final String RESERVEDMAPKEY_ACTIONCLASS = "ActionAdapterQuartzJob-ActionClass"; //$NON-NLS-1$ + public static final String RESERVEDMAPKEY_ACTIONCLASS = ActionUtil.QUARTZ_ACTIONCLASS; - public static final String RESERVEDMAPKEY_ACTIONUSER = "ActionAdapterQuartzJob-ActionUser"; //$NON-NLS-1$ + public static final String RESERVEDMAPKEY_ACTIONUSER = ActionUtil.QUARTZ_ACTIONUSER; - public static final String RESERVEDMAPKEY_ACTIONID = "ActionAdapterQuartzJob-ActionId"; //$NON-NLS-1$ + public static final String RESERVEDMAPKEY_ACTIONID = ActionUtil.QUARTZ_ACTIONID; - public static final String RESERVEDMAPKEY_STREAMPROVIDER = "ActionAdapterQuartzJob-StreamProvider"; //$NON-NLS-1$ + public static final String RESERVEDMAPKEY_STREAMPROVIDER = ActionUtil.QUARTZ_STREAMPROVIDER; - public static final String RESERVEDMAPKEY_UIPASSPARAM = "uiPassParam"; + public static final String RESERVEDMAPKEY_STREAMPROVIDER_INPUTFILE = ActionUtil.QUARTZ_STREAMPROVIDER_INPUT_FILE; - public static final String RESERVEDMAPKEY_LINEAGE_ID = "lineage-id"; + public static final String RESERVEDMAPKEY_UIPASSPARAM = ActionUtil.QUARTZ_UIPASSPARAM; - public static final String RESERVEDMAPKEY_RESTART_FLAG = "ActionAdapterQuartzJob-Restart"; + public static final String RESERVEDMAPKEY_LINEAGE_ID = ActionUtil.QUARTZ_LINEAGE_ID; + + public static final String RESERVEDMAPKEY_RESTART_FLAG = ActionUtil.QUARTZ_RESTART_FLAG; + + public static final String RESERVEDMAPKEY_AUTO_CREATE_UNIQUE_FILENAME = ActionUtil.QUARTZ_AUTO_CREATE_UNIQUE_FILENAME; private static final Log logger = LogFactory.getLog(QuartzScheduler.class); diff --git a/pentaho-platform/src/main/java/org/pentaho/reporting/platform/plugin/connection/PentahoJndiDatasourceConnectionProvider.java b/pentaho-platform/src/main/java/org/pentaho/reporting/platform/plugin/connection/PentahoJndiDatasourceConnectionProvider.java index ff53968..fd965a9 100644 --- a/pentaho-platform/src/main/java/org/pentaho/reporting/platform/plugin/connection/PentahoJndiDatasourceConnectionProvider.java +++ b/pentaho-platform/src/main/java/org/pentaho/reporting/platform/plugin/connection/PentahoJndiDatasourceConnectionProvider.java @@ -12,7 +12,7 @@ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * - * Copyright (c) 2002-2013 Pentaho Corporation.. All rights reserved. + * Copyright (c) 2002-2017 Hitachi Vantara.. All rights reserved. */ package org.pentaho.reporting.platform.plugin.connection; diff --git a/pom.xml b/pom.xml index cb63f3c..3905646 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.github.zhicwu pdi-cluster - 7.1.0.5-SNAPSHOT + 8.0.0.0-SNAPSHOT pom PDI Cluster Instructions and workarounds for building a cluster using Pentaho BA server and Kettle. @@ -17,7 +17,7 @@ - 7.1.0.5-70 + 8.0.0.0-1 UTF-8 ${project.basedir} 3.5.1