Skip to content

Commit

Permalink
Use guava cache for CassandraMetrics (#536)
Browse files Browse the repository at this point in the history
Closes #535
  • Loading branch information
masokol authored Aug 9, 2023
1 parent e326b2f commit c6bcb16
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,7 @@ public ECChronosInternals(final Config configuration,

myReplicatedTableProvider = new ReplicatedTableProviderImpl(node, session, myTableReferenceFactory);

myCassandraMetrics = CassandraMetrics.builder()
.withJmxProxyFactory(myJmxProxyFactory)
.withReplicatedTableProvider(myReplicatedTableProvider)
.build();
myCassandraMetrics = new CassandraMetrics(myJmxProxyFactory);

if (configuration.getStatisticsConfig().isEnabled())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,71 @@

package com.ericsson.bss.cassandra.ecchronos.core;

import com.ericsson.bss.cassandra.ecchronos.core.utils.ReplicatedTableProvider;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.utils.logging.ThrottlingLogger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Used to fetch metrics from Cassandra through JMX and keep them updated.
*/
public class CassandraMetrics implements Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CassandraMetrics.class);
private static final long DEFAULT_INITIAL_DELAY_IN_MS = 0;
private static final long DEFAULT_UPDATE_DELAY_IN_MS = TimeUnit.SECONDS.toMillis(60);
private static final ThrottlingLogger THROTTLED_LOGGER = new ThrottlingLogger(LOG, 5, TimeUnit.MINUTES);
private static final long DEFAULT_CACHE_EXPIRY_TIME_MINUTES = 30;
private static final long DEFAULT_CACHE_REFRESH_TIME_SECONDS = 30;

private final AtomicReference<ImmutableMap<TableReference, Double>> myPercentRepaired = new AtomicReference<>();
private final AtomicReference<ImmutableMap<TableReference, Long>> myMaxRepairedAt = new AtomicReference<>();
private final ScheduledExecutorService myScheduledExecutorService;
private final ReplicatedTableProvider myReplicatedTableProvider;
private final LoadingCache<TableReference, CassandraMetric> myCache;
private final JmxProxyFactory myJmxProxyFactory;

CassandraMetrics(final Builder builder)
/**
* Creates an instance with the default cache timings by delegating to the three-argument constructor:
* a refresh time of {@code DEFAULT_CACHE_REFRESH_TIME_SECONDS} seconds and
* an expiry time of {@code DEFAULT_CACHE_EXPIRY_TIME_MINUTES} minutes.
*
* @param jmxProxyFactory The factory used to connect to Cassandra through JMX, must not be null.
*/
public CassandraMetrics(final JmxProxyFactory jmxProxyFactory)
{
this(jmxProxyFactory, Duration.ofSeconds(DEFAULT_CACHE_REFRESH_TIME_SECONDS),
Duration.ofMinutes(DEFAULT_CACHE_EXPIRY_TIME_MINUTES));
}
public CassandraMetrics(final JmxProxyFactory jmxProxyFactory, final Duration refreshAfter,
final Duration expireAfter)
{
myReplicatedTableProvider = Preconditions.checkNotNull(builder.myReplicatedTableProvider,
"Replicated table provider must be set");
myJmxProxyFactory = Preconditions.checkNotNull(builder.myJmxProxyFactory,
"JMX proxy factory must be set");
myScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("CassandraMetricsUpdater-%d").build());
myScheduledExecutorService.scheduleAtFixedRate(this::updateMetrics,
builder.myInitialDelayInMs,
builder.myUpdateDelayInMs,
TimeUnit.MILLISECONDS);
myJmxProxyFactory = Preconditions.checkNotNull(jmxProxyFactory, "JMX proxy factory must be set");
myCache = CacheBuilder.newBuilder()
.refreshAfterWrite(Preconditions.checkNotNull(refreshAfter, "Refresh after must be set"))
.expireAfterAccess(Preconditions.checkNotNull(expireAfter, "Expire after must be set"))
.build(new CacheLoader<>()
{
@Override
public CassandraMetric load(final TableReference key) throws Exception
{
return getMetrics(key);
}
});
}

@VisibleForTesting
final void updateMetrics()
final void refreshCache(final TableReference tableReference)
{
try (JmxProxy jmxProxy = myJmxProxyFactory.connect())
{
Map<TableReference, Double> tablesPercentRepaired = new HashMap<>();
Map<TableReference, Long> tablesMaxRepairedAt = new HashMap<>();
for (TableReference tableReference : myReplicatedTableProvider.getAll())
{
long maxRepairedAt = jmxProxy.getMaxRepairedAt(tableReference);
tablesMaxRepairedAt.put(tableReference, maxRepairedAt);
myCache.refresh(tableReference);
}

double percentRepaired = jmxProxy.getPercentRepaired(tableReference);
tablesPercentRepaired.put(tableReference, percentRepaired);
LOG.debug("{}, maxRepairedAt: {}, percentRepaired: {}", tableReference, maxRepairedAt, percentRepaired);
}
myPercentRepaired.set(ImmutableMap.copyOf(tablesPercentRepaired));
myMaxRepairedAt.set(ImmutableMap.copyOf(tablesMaxRepairedAt));
}
catch (IOException e)
private CassandraMetric getMetrics(final TableReference tableReference) throws IOException
{
try (JmxProxy jmxProxy = myJmxProxyFactory.connect())
{
LOG.error("Unable to update Cassandra metrics, future metrics might contain stale data", e);
long maxRepairedAt = jmxProxy.getMaxRepairedAt(tableReference);
double percentRepaired = jmxProxy.getPercentRepaired(tableReference);
LOG.debug("{}, maxRepairedAt: {}, percentRepaired: {}", tableReference, maxRepairedAt, percentRepaired);
return new CassandraMetric(percentRepaired, maxRepairedAt);
}
}

Expand All @@ -91,12 +90,16 @@ final void updateMetrics()
*/
public long getMaxRepairedAt(final TableReference tableReference)
{
ImmutableMap<TableReference, Long> maxRepairedAt = myMaxRepairedAt.get();
if (maxRepairedAt != null && maxRepairedAt.containsKey(tableReference))
try
{
CassandraMetric cassandraMetric = myCache.get(tableReference);
return cassandraMetric.myMaxRepairedAt;
}
catch (ExecutionException e)
{
return maxRepairedAt.get(tableReference);
THROTTLED_LOGGER.error("Failed to get CassandraMetric, future metrics might contain stale values", e);
return 0L;
}
return 0L;
}

/**
Expand All @@ -106,61 +109,37 @@ public long getMaxRepairedAt(final TableReference tableReference)
*/
public double getPercentRepaired(final TableReference tableReference)
{
ImmutableMap<TableReference, Double> percentRepaired = myPercentRepaired.get();
if (percentRepaired != null && percentRepaired.containsKey(tableReference))
try
{
return percentRepaired.get(tableReference);
CassandraMetric cassandraMetric = myCache.get(tableReference);
return cassandraMetric.myPercentRepaired;
}
catch (ExecutionException e)
{
THROTTLED_LOGGER.error("Failed to get CassandraMetric, future metrics might contain stale values", e);
return 0.0d;
}
return 0.0d;
}

/**
* Cleans the cache.
*/
@Override
public final void close()
public void close()
{
myScheduledExecutorService.shutdown();
myPercentRepaired.set(null);
myMaxRepairedAt.set(null);
myCache.invalidateAll();
myCache.cleanUp();
}

public static Builder builder()
private class CassandraMetric
{
return new Builder();
}

public static class Builder
{
private ReplicatedTableProvider myReplicatedTableProvider;
private JmxProxyFactory myJmxProxyFactory;
private long myInitialDelayInMs = DEFAULT_INITIAL_DELAY_IN_MS;
private long myUpdateDelayInMs = DEFAULT_UPDATE_DELAY_IN_MS;

public final Builder withReplicatedTableProvider(final ReplicatedTableProvider replicatedTableProvider)
{
myReplicatedTableProvider = replicatedTableProvider;
return this;
}

public final Builder withJmxProxyFactory(final JmxProxyFactory jmxProxyFactory)
{
myJmxProxyFactory = jmxProxyFactory;
return this;
}

public final Builder withInitialDelay(final long initialDelay, final TimeUnit timeUnit)
{
myInitialDelayInMs = timeUnit.toMillis(initialDelay);
return this;
}

public final Builder withUpdateDelay(final long updateDelay, final TimeUnit timeUnit)
{
myUpdateDelayInMs = timeUnit.toMillis(updateDelay);
return this;
}
private final double myPercentRepaired;
private final long myMaxRepairedAt;

public final CassandraMetrics build()
CassandraMetric(final Double percentRepaired, final Long maxRepairedAt)
{
return new CassandraMetrics(this);
myPercentRepaired = percentRepaired;
myMaxRepairedAt = maxRepairedAt;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package com.ericsson.bss.cassandra.ecchronos.core;

import com.ericsson.bss.cassandra.ecchronos.core.utils.ReplicatedTableProvider;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import org.junit.After;
import org.junit.Before;
Expand All @@ -25,15 +24,14 @@
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.ericsson.bss.cassandra.ecchronos.core.MockTableReferenceFactory.tableReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class TestCassandraMetrics
Expand All @@ -42,22 +40,14 @@ public class TestCassandraMetrics
private JmxProxyFactory myJmxProxyFactoryMock;
@Mock
private JmxProxy myJmxProxyMock;
@Mock
private ReplicatedTableProvider myReplicatedTableProviderMock;

private Set<TableReference> myReplicatedTables = new HashSet<>();
private CassandraMetrics myCassandraMetrics;

@Before
public void setup() throws Exception
{
doReturn(myJmxProxyMock).when(myJmxProxyFactoryMock).connect();
doReturn(myReplicatedTables).when(myReplicatedTableProviderMock).getAll();
myCassandraMetrics = CassandraMetrics.builder()
.withJmxProxyFactory(myJmxProxyFactoryMock)
.withReplicatedTableProvider(myReplicatedTableProviderMock)
.withInitialDelay(60, TimeUnit.SECONDS)
.build();
myCassandraMetrics = new CassandraMetrics(myJmxProxyFactoryMock);
}

@After
Expand Down Expand Up @@ -85,10 +75,9 @@ public void testUpdateMetricsUnableToConnectToJmx() throws IOException

mockTable(tableReference, maxRepairedAt, percentRepaired);

myCassandraMetrics.updateMetrics();

assertThat(myCassandraMetrics.getMaxRepairedAt(tableReference)).isEqualTo(0L);
assertThat(myCassandraMetrics.getPercentRepaired(tableReference)).isEqualTo(0.0d);
verify(myJmxProxyFactoryMock, times(2)).connect();
}

@Test
Expand All @@ -103,46 +92,41 @@ public void testUpdateMetricsTwiceUnableToConnectToJmxSecondTime() throws IOExce

mockTable(tableReference, firstMaxRepairedAt, firstPercentRepaired);

myCassandraMetrics.updateMetrics();

assertThat(myCassandraMetrics.getMaxRepairedAt(tableReference)).isEqualTo(firstMaxRepairedAt);
assertThat(myCassandraMetrics.getPercentRepaired(tableReference)).isEqualTo(firstPercentRepaired);

doThrow(IOException.class).when(myJmxProxyFactoryMock).connect();

mockTable(tableReference, secondMaxRepairedAt, secondPercentRepaired);

myCassandraMetrics.updateMetrics();
myCassandraMetrics.refreshCache(tableReference);

assertThat(myCassandraMetrics.getMaxRepairedAt(tableReference)).isEqualTo(firstMaxRepairedAt);
assertThat(myCassandraMetrics.getPercentRepaired(tableReference)).isEqualTo(firstPercentRepaired);
verify(myJmxProxyFactoryMock, times(2)).connect();
}

@Test
public void testUpdateMetricsForMultipleTables()
public void testUpdateMetricsForMultipleTables() throws IOException
{
TableReference firstTable = tableReference("keyspace", "table");
long firstTableMaxRepairedAt = 1234L;
double firstTablePercentRepaired = 0.5d;
mockTable(firstTable, firstTableMaxRepairedAt, firstTablePercentRepaired);

assertThat(myCassandraMetrics.getMaxRepairedAt(firstTable)).isEqualTo(firstTableMaxRepairedAt);
assertThat(myCassandraMetrics.getPercentRepaired(firstTable)).isEqualTo(firstTablePercentRepaired);

TableReference secondTable = tableReference("keyspace", "table2");
long secondTableMaxRepairedAt = 2345L;
double secondTablePercentRepaired = 1.0d;
mockTable(secondTable, secondTableMaxRepairedAt, secondTablePercentRepaired);

myCassandraMetrics.updateMetrics();

assertThat(myCassandraMetrics.getMaxRepairedAt(firstTable)).isEqualTo(firstTableMaxRepairedAt);
assertThat(myCassandraMetrics.getPercentRepaired(firstTable)).isEqualTo(firstTablePercentRepaired);

assertThat(myCassandraMetrics.getMaxRepairedAt(secondTable)).isEqualTo(secondTableMaxRepairedAt);
assertThat(myCassandraMetrics.getPercentRepaired(secondTable)).isEqualTo(secondTablePercentRepaired);
verify(myJmxProxyFactoryMock, times(2)).connect();
}

/**
* Stubs the JMX proxy mock so that it returns the given metrics when queried for the given table.
*
* @param tableReference The table the stubbed values apply to.
* @param maxRepairedAt The value returned by the mock's getMaxRepairedAt for the table.
* @param percentRepaired The value returned by the mock's getPercentRepaired for the table.
*/
private void mockTable(TableReference tableReference, long maxRepairedAt, double percentRepaired)
{
myReplicatedTables.add(tableReference);
doReturn(maxRepairedAt).when(myJmxProxyMock).getMaxRepairedAt(eq(tableReference));
doReturn(percentRepaired).when(myJmxProxyMock).getPercentRepaired(eq(tableReference));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -122,12 +123,8 @@ public static void init(Boolean remoteRoutingOption) throws IOException
myScheduleManagerImpl = ScheduleManagerImpl.builder().withLockFactory(myLockFactory)
.withRunInterval(100, TimeUnit.MILLISECONDS).build();

myCassandraMetrics = CassandraMetrics.builder()
.withJmxProxyFactory(getJmxProxyFactory())
.withReplicatedTableProvider(new ReplicatedTableProviderImpl(localNode, session, myTableReferenceFactory))
.withInitialDelay(0, TimeUnit.MILLISECONDS)
.withUpdateDelay(5, TimeUnit.SECONDS)
.build();
myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(),
Duration.ofSeconds(5), Duration.ofMinutes(30));

myOnDemandRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder().withJmxProxyFactory(getJmxProxyFactory())
.withTableRepairMetrics(mockTableRepairMetrics).withScheduleManager(myScheduleManagerImpl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -133,12 +134,8 @@ public static void init(Boolean remoteRoutingOption) throws IOException
.withRunInterval(1, TimeUnit.SECONDS)
.build();

myCassandraMetrics = CassandraMetrics.builder()
.withJmxProxyFactory(getJmxProxyFactory())
.withReplicatedTableProvider(new ReplicatedTableProviderImpl(localNode, session, myTableReferenceFactory))
.withInitialDelay(0, TimeUnit.MILLISECONDS)
.withUpdateDelay(CASSANDRA_METRICS_UPDATE_IN_SECONDS, TimeUnit.SECONDS)
.build();
myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(),
Duration.ofSeconds(CASSANDRA_METRICS_UPDATE_IN_SECONDS), Duration.ofMinutes(30));

myRepairSchedulerImpl = RepairSchedulerImpl.builder()
.withJmxProxyFactory(getJmxProxyFactory())
Expand Down

0 comments on commit c6bcb16

Please sign in to comment.