Skip to content

Commit

Permalink
[AMBARI-26135] Upgrade net.sf.ehcache to 3.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep318kumar committed Oct 1, 2024
1 parent 7f5b225 commit 1e5d384
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 261 deletions.
2 changes: 1 addition & 1 deletion ambari-metrics-timelineservice/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>2.10.0</version>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,7 @@ public class TimelineMetricConfiguration {

public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";

public static final String INTERNAL_CACHE_HEAP_PERCENT =
"timeline.metrics.internal.cache.%s.heap.percent";
public static final String INTERNAL_CACHE_ENTRY_COUNT = "timeline.metrics.internal.cache.%s.entry.count";

public static final String EXTERNAL_SINK_INTERVAL =
"timeline.metrics.external.sink.%s.%s.interval";
Expand Down Expand Up @@ -628,12 +627,12 @@ public List<ExternalSinkProvider> getExternalSinkProviderList() {
return providerList;
}

public String getInternalCacheHeapPercent(String instanceName) {
String heapPercent = metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName));
if (StringUtils.isEmpty(heapPercent)) {
return "5%";
public Integer getInternalCacheSize(String instanceName) {
String cacheEntryCount = metricsConf.get(String.format(INTERNAL_CACHE_ENTRY_COUNT, instanceName));
if(StringUtils.isEmpty(cacheEntryCount)) {
return 500;
} else {
return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%";
return Integer.parseInt(cacheEntryCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,39 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.EnumSet;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.PersistenceConfiguration;
import net.sf.ehcache.config.SizeOfPolicyConfiguration;
import net.sf.ehcache.event.CacheEventListener;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.ehcache.Cache;
import org.ehcache.CacheManager;
import org.ehcache.config.CacheConfiguration;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.EntryUnit;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventType;
import org.ehcache.event.EventFiring;
import org.ehcache.event.EventOrdering;
import org.ehcache.expiry.Expirations;

public class InternalMetricsCache {
private static final Log LOG = LogFactory.getLog(InternalMetricsCache.class);
private final String instanceName;
private final String maxHeapPercent;
private final Integer internalCacheEntryCount;
private volatile boolean isCacheInitialized = false;
private Cache cache;
static final String TIMELINE_METRIC_CACHE_MANAGER_NAME = "internalMetricsCacheManager";
private Cache<InternalMetricCacheKey, InternalMetricCacheValue> cache;
private final Lock lock = new ReentrantLock();
private static final int LOCK_TIMEOUT_SECONDS = 2;

public InternalMetricsCache(String instanceName, String maxHeapPercent) {
public InternalMetricsCache(String instanceName, Integer internalCacheEntryCount) {
this.instanceName = instanceName;
this.maxHeapPercent = maxHeapPercent;
this.internalCacheEntryCount = internalCacheEntryCount;
initialize();
}

Expand All @@ -63,69 +66,48 @@ private void initialize() {
throw new RuntimeException("Cannot initialize internal cache twice");
}

System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
System.setProperty("net.sf.ehcache.sizeofengine." + TIMELINE_METRIC_CACHE_MANAGER_NAME,
"org.apache.ambari.metrics.core.timeline.source.cache.InternalMetricsCacheSizeOfEngine");

net.sf.ehcache.config.Configuration managerConfig =
new net.sf.ehcache.config.Configuration();
managerConfig.setName(TIMELINE_METRIC_CACHE_MANAGER_NAME);

// Set max heap available to the cache manager
managerConfig.setMaxBytesLocalHeap(maxHeapPercent);

//Create a singleton CacheManager using defaults
CacheManager manager = CacheManager.create(managerConfig);

LOG.info("Creating Metrics Cache with maxHeapPercent => " + maxHeapPercent);
CacheManager manager = CacheManagerBuilder.newCacheManagerBuilder()
.build(true);

// Create a Cache specifying its configuration.
CacheConfiguration cacheConfiguration = new CacheConfiguration()
.name(instanceName)
.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
.sizeOfPolicy(new SizeOfPolicyConfiguration() // Set sizeOf policy to continue on max depth reached - avoid OOM
.maxDepth(10000)
.maxDepthExceededBehavior(SizeOfPolicyConfiguration.MaxDepthExceededBehavior.CONTINUE))
.eternal(true) // infinite time until eviction
.persistence(new PersistenceConfiguration()
.strategy(PersistenceConfiguration.Strategy.NONE.name()));

cache = new Cache(cacheConfiguration);
cache.getCacheEventNotificationService().registerListener(new InternalCacheEvictionListener());
CacheConfiguration<InternalMetricCacheKey, InternalMetricCacheValue> cacheConfig =
CacheConfigurationBuilder.newCacheConfigurationBuilder(
InternalMetricCacheKey.class, InternalMetricCacheValue.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().heap(internalCacheEntryCount, EntryUnit.ENTRIES)
).withExpiry(Expirations.noExpiration()).build();

LOG.info("Registering internal metrics cache with provider: name = " +
cache.getName() + ", guid: " + cache.getGuid());
cache = manager.createCache(instanceName, cacheConfig);
cache.getRuntimeConfiguration().registerCacheEventListener(new InternalCacheEvictionListener(), EventOrdering.ORDERED, EventFiring.SYNCHRONOUS, EnumSet.of(EventType.EVICTED));

manager.addCache(cache);
LOG.info("Registering internal metrics cache with provider: name = " +
instanceName);

isCacheInitialized = true;
}

public InternalMetricCacheValue getInternalMetricCacheValue(InternalMetricCacheKey key) {
Element ele = cache.get(key);
if (ele != null) {
return (InternalMetricCacheValue) ele.getObjectValue();
}
return null;
return cache.get(key);
}

public Collection<TimelineMetrics> evictAll() {
TimelineMetrics metrics = new TimelineMetrics();
try {
if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try{
List keys = cache.getKeys();
for (Object obj : keys) {
try {
Iterator<Cache.Entry<InternalMetricCacheKey, InternalMetricCacheValue>> iterator = cache.iterator();
while (iterator.hasNext()) {
Cache.Entry<InternalMetricCacheKey, InternalMetricCacheValue> entry = iterator.next();
TimelineMetric metric = new TimelineMetric();
InternalMetricCacheKey key = (InternalMetricCacheKey) obj;
InternalMetricCacheKey key = entry.getKey();
metric.setMetricName(key.getMetricName());
metric.setAppId(key.getAppId());
metric.setInstanceId(key.getInstanceId());
metric.setHostName(key.getHostname());
metric.setStartTime(key.getStartTime());
Element ele = cache.get(key);
metric.setMetricValues(((InternalMetricCacheValue) ele.getObjectValue()).getMetricValues());
InternalMetricCacheValue value = cache.get(key);
metric.setMetricValues(value.getMetricValues());
metrics.getMetrics().add(metric);
iterator.remove();
}
cache.removeAll();
} finally {
Expand Down Expand Up @@ -157,14 +139,13 @@ public void putAll(Collection<TimelineMetrics> metrics) {
timelineMetric.getStartTime()
);

Element ele = cache.get(key);
if (ele != null) {
InternalMetricCacheValue value = (InternalMetricCacheValue) ele.getObjectValue();
InternalMetricCacheValue value = cache.get(key);
if (value != null) {
value.addMetricValues(timelineMetric.getMetricValues());
} else {
InternalMetricCacheValue value = new InternalMetricCacheValue();
value = new InternalMetricCacheValue();
value.setMetricValues(timelineMetric.getMetricValues());
cache.put(new Element(key, value));
cache.put(key, value);
}
}
}
Expand All @@ -181,49 +162,14 @@ public void putAll(Collection<TimelineMetrics> metrics) {
}
}

class InternalCacheEvictionListener implements CacheEventListener {

@Override
public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
// expected
}

@Override
public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
// do nothing
}

class InternalCacheEvictionListener implements CacheEventListener<InternalMetricCacheKey, InternalMetricCacheValue> {
@Override
public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
// do nothing
}

@Override
public void notifyElementExpired(Ehcache cache, Element element) {
// do nothing
}

@Override
public void notifyElementEvicted(Ehcache cache, Element element) {
// Bad - Remote endpoint cannot keep up resulting in flooding
InternalMetricCacheKey key = (InternalMetricCacheKey) element.getObjectKey();
LOG.warn("Evicting element from internal metrics cache, metric => " + key
.getMetricName() + ", startTime = " + new Date(key.getStartTime()));
}

@Override
public void notifyRemoveAll(Ehcache cache) {
// expected
}

@Override
public Object clone() throws CloneNotSupportedException {
return null;
}

@Override
public void dispose() {
// do nothing
public void onEvent(CacheEvent<? extends InternalMetricCacheKey, ? extends InternalMetricCacheValue> event) {
if (event.getType() == EventType.EVICTED) {
InternalMetricCacheKey key = event.getKey();
LOG.warn("Evicting element from internal metrics cache, metric => " + key
.getMetricName() + ", startTime = " + new Date(key.getStartTime()));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public InternalMetricsCache getCacheInstance(String instanceName) {
} else {
TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
InternalMetricsCache cache = new InternalMetricsCache(instanceName,
conf.getInternalCacheHeapPercent(instanceName));
conf.getInternalCacheSize(instanceName));

metricsCacheMap.put(instanceName, cache);
return cache;
Expand Down
Loading

0 comments on commit 1e5d384

Please sign in to comment.