Skip to content

Commit

Permalink
Port code from 7.1 back to 6.1
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jun 19, 2017
1 parent ef60057 commit 12ac949
Show file tree
Hide file tree
Showing 12 changed files with 622 additions and 730 deletions.
6 changes: 0 additions & 6 deletions pentaho-kettle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<main.basedir>${project.parent.basedir}</main.basedir>
<servlet.version>3.1.0</servlet.version>
<jersey.version>1.16</jersey.version>
<oshi.version>3.2</oshi.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -52,11 +51,6 @@
<artifactId>pdi-pur-plugin</artifactId>
<version>${pentaho-ce.version}</version>
</dependency>
<dependency>
<groupId>com.github.dblock</groupId>
<artifactId>oshi-core</artifactId>
<version>${oshi.version}</version>
</dependency>
</dependencies>
<licenses>
<license>
Expand Down
325 changes: 205 additions & 120 deletions pentaho-kettle/src/main/java/org/pentaho/di/trans/Trans.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

package org.pentaho.di.trans.steps.userdefinedjavaclass;

import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.hash.Hashing;
import org.codehaus.janino.ClassBodyEvaluator;
import org.codehaus.janino.CompileException;
import org.codehaus.janino.Parser.ParseException;
Expand Down Expand Up @@ -55,10 +59,25 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class UserDefinedJavaClassMeta extends BaseStepMeta implements StepMetaInterface {
private static Class<?> PKG = UserDefinedJavaClassMeta.class; // for i18n purposes, needed by Translator2!!

private static final int KETTLE_CLASS_CACHE_SIZE
= Integer.parseInt(System.getProperty("KETTLE_LISTEN_ANY_LOCAL_ADDRESS", "50"));
private static final int KETTLE_CLASS_CACHE_EXPIRE_MINUTE
= Integer.parseInt(System.getProperty("KETTLE_CLASS_CACHE_EXPIRE_MINUTE", "61"));

private static final Cache<Long, Class> classCache = CacheBuilder.newBuilder()
.maximumSize(KETTLE_CLASS_CACHE_SIZE)
.expireAfterAccess(KETTLE_CLASS_CACHE_EXPIRE_MINUTE, TimeUnit.MINUTES)
.recordStats()
.build();

public enum ElementNames {
class_type, class_name, class_source, definitions, definition, fields, field, field_name, field_type,
field_length, field_precision, clear_result_fields,
Expand Down Expand Up @@ -103,6 +122,21 @@ public Object clone() throws CloneNotSupportedException {
}
}

public static String getCacheStats() {
StringBuilder sb = new StringBuilder(classCache.stats().toString());

try {
Map<Long, Class> map = classCache.asMap();
for (Map.Entry<Long, Class> entry : map.entrySet()) {
sb.append(Const.CR).append(entry.getKey()).append(':').append(entry.getValue());
}
} catch (Exception e) {
// ignore
}

return sb.toString();
}

public UserDefinedJavaClassMeta() {
super();
changed = true;
Expand All @@ -111,6 +145,25 @@ public UserDefinedJavaClassMeta() {
usageParameters = new ArrayList<UsageParameter>();
}

private Class<?> loadOrCookClass(UserDefinedJavaClassDef def) throws Exception {
long hash = Hashing.md5().newHasher()
.putString(def.getClassName(), Charsets.UTF_8)
.putString(def.getSource(), Charsets.UTF_8).hash().asLong();
Class clazz = null;
try {
clazz = classCache.get(hash, new Callable<Class>() {
@Override
public Class call() throws Exception {
return cookClass(def);
}
});
} catch (ExecutionException e) {
throw new KettleStepException(e.getCause());
}

return clazz;
}

private Class<?> cookClass(UserDefinedJavaClassDef def) throws CompileException, ParseException,
ScanException, IOException, RuntimeException, KettleStepException {

Expand Down Expand Up @@ -145,7 +198,7 @@ public void cookClasses() {
for (UserDefinedJavaClassDef def : getDefinitions()) {
if (def.isActive()) {
try {
Class<?> cookedClass = cookClass(def);
Class<?> cookedClass = loadOrCookClass(def);
if (def.isTransformClass()) {
cookedTransformClass = (Class<TransformClassBase>) cookedClass;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private CarteSingleton(SlaveServerConfig config) throws KettleException {
detections = new ArrayList<SlaveServerDetection>();
socketRepository = new SocketRepository(log);

installPurgeTimer(config, log, transformationMap, jobMap);
// installPurgeTimer(config, log, transformationMap, jobMap);

SlaveServer slaveServer = config.getSlaveServer();
if (slaveServer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;

/**
* Get cache status mainly for three different types of resource: transformation, job and data source.
Expand All @@ -35,12 +36,51 @@ public class GetCacheStatusServlet extends BaseHttpServlet implements CartePlugi

private static final String XML_CONTENT_TYPE = "text/xml";

private static final String UDJC_CLASS_NAME = "org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta";
private static final String UDJC_METHOD_NAME = "getCacheStats";
private static final String DEFAULT_CACHE_STATS = "N/A";
private static final String CLASS_CACHE_NAME = "Class Cache: ";
private static final String JOB_CACHE_NAME = "Job Cache: ";
private static final String TRANS_CACHE_NAME = "Trans Cache: ";
private static final String RESOURCE_CACHE_NAME = "Resource Cache: ";

private static final long serialVersionUID = -519824343678414598L;

public static final String CONTEXT_PATH = "/kettle/cache";
public static final String PARAM_NAME = "name";
public static final String PARAM_INVALIDATE = "invalidate";

private String buildCacheStats() {
StringBuilder sb = new StringBuilder();

sb.append(CLASS_CACHE_NAME);
try {
Class clazz = Class.forName(UDJC_CLASS_NAME);
Method method = clazz.getMethod(UDJC_METHOD_NAME);
sb.append(method.invoke(null));
} catch (Exception e) {
sb.append(DEFAULT_CACHE_STATS);
}

sb.append('\r').append('\n').append(JOB_CACHE_NAME);
try {
sb.append(getJobMap().getStats());
} catch (Exception e) {
sb.append(DEFAULT_CACHE_STATS);
}

sb.append('\r').append('\n').append(TRANS_CACHE_NAME);
try {
sb.append(getTransformationMap().getStats());
} catch (Exception e) {
sb.append(DEFAULT_CACHE_STATS);
}

sb.append('\r').append('\n').append(RESOURCE_CACHE_NAME).append(ServerCache.getStats());

return sb.toString();
}

public GetCacheStatusServlet() {
}

Expand Down Expand Up @@ -74,7 +114,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
} else { // just about queries
if (applyToAll) {
// keep the id empty, add information into description
result.setMessage(ServerCache.getStats());
result.setMessage(buildCacheStats());
} else {
String identity = ServerCache.getCachedIdentity(resourceName);
if (!Strings.isNullOrEmpty(identity)) {
Expand Down
Loading

0 comments on commit 12ac949

Please sign in to comment.