Skip to content

Commit

Permalink
Periodic storage consistency checks (#1885)
Browse files Browse the repository at this point in the history
* add ability to perform periodic storage checks

* add periodic consistency check spec

* register gauge based on state object
  • Loading branch information
moscicky authored Aug 8, 2024
1 parent 0bf4c0c commit 3b02b2b
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;

import java.util.function.ToDoubleFunction;


public class ConsistencyMetrics {
private final MeterRegistry meterRegistry;

ConsistencyMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;

}

public <T> void registerStorageConsistencyGauge(T stateObject, ToDoubleFunction<T> valueFunction) {
meterRegistry.gauge("storage.consistency", stateObject, valueFunction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class MetricsFacade {
private final OffsetCommitsMetrics offsetCommitsMetrics;
private final MaxRateMetrics maxRateMetrics;
private final BrokerMetrics brokerMetrics;
private final ConsistencyMetrics consistencyMetrics;

public MetricsFacade(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
Expand All @@ -45,6 +46,7 @@ public MetricsFacade(MeterRegistry meterRegistry) {
this.offsetCommitsMetrics = new OffsetCommitsMetrics(meterRegistry);
this.maxRateMetrics = new MaxRateMetrics(meterRegistry);
this.brokerMetrics = new BrokerMetrics(meterRegistry);
this.consistencyMetrics = new ConsistencyMetrics(meterRegistry);
}

public TopicMetrics topics() {
Expand Down Expand Up @@ -107,6 +109,10 @@ public BrokerMetrics broker() {
return brokerMetrics;
}

public ConsistencyMetrics consistency() {
return consistencyMetrics;
}

public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) {
Collection<Meter> meters = Search.in(meterRegistry)
.tags(subscriptionTags(subscription))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.Duration;

@ConfigurationProperties(prefix = "consistency-checker")
public class ConsistencyCheckerProperties {

private int threadPoolSize = 2;
private boolean periodicCheckEnabled = false;
private Duration refreshInterval = Duration.ofMinutes(15);
private Duration initialRefreshDelay = Duration.ofMinutes(2);

public int getThreadPoolSize() {
return threadPoolSize;
Expand All @@ -14,4 +19,30 @@ public int getThreadPoolSize() {
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}


public boolean isPeriodicCheckEnabled() {
return periodicCheckEnabled;
}

public void setPeriodicCheckEnabled(boolean periodicCheckEnabled) {
this.periodicCheckEnabled = periodicCheckEnabled;
}


public Duration getRefreshInterval() {
return refreshInterval;
}

public void setRefreshInterval(Duration refreshInterval) {
this.refreshInterval = refreshInterval;
}

public Duration getInitialRefreshDelay() {
return initialRefreshDelay;
}

public void setInitialRefreshDelay(Duration initialRefreshDelay) {
this.initialRefreshDelay = initialRefreshDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.InconsistentGroup;
Expand All @@ -13,6 +15,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.domain.group.GroupNotExistsException;
import pl.allegro.tech.hermes.domain.group.GroupRepository;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
Expand All @@ -31,6 +34,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand All @@ -39,15 +45,20 @@

@Component
public class DcConsistencyService {
private static final Logger logger = LoggerFactory.getLogger(DcConsistencyService.class);

private final ExecutorService executor;
private final ScheduledExecutorService scheduler;
private final List<DatacenterBoundRepositoryHolder<GroupRepository>> groupRepositories;
private final List<DatacenterBoundRepositoryHolder<TopicRepository>> topicRepositories;
private final List<DatacenterBoundRepositoryHolder<SubscriptionRepository>> subscriptionRepositories;
private final ObjectMapper objectMapper;
private final AtomicBoolean isStorageConsistent = new AtomicBoolean(true);

public DcConsistencyService(RepositoryManager repositoryManager,
ObjectMapper objectMapper,
ConsistencyCheckerProperties properties) {
ConsistencyCheckerProperties properties,
MetricsFacade metricsFacade) {
this.groupRepositories = repositoryManager.getRepositories(GroupRepository.class);
this.topicRepositories = repositoryManager.getRepositories(TopicRepository.class);
this.subscriptionRepositories = repositoryManager.getRepositories(SubscriptionRepository.class);
Expand All @@ -58,11 +69,33 @@ public DcConsistencyService(RepositoryManager repositoryManager,
.setNameFormat("consistency-checker-%d")
.build()
);
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("consistency-checker-scheduler-%d")
.build()
);
if (properties.isPeriodicCheckEnabled()) {
scheduler.scheduleAtFixedRate(this::reportConsistency,
properties.getInitialRefreshDelay().getSeconds(),
properties.getRefreshInterval().getSeconds(),
TimeUnit.SECONDS);
metricsFacade.consistency().registerStorageConsistencyGauge(isStorageConsistent, isConsistent -> isConsistent.get() ? 1 : 0);
}
}

@PreDestroy
public void stop() {
executor.shutdown();
scheduler.shutdown();
}

private void reportConsistency() {
long start = System.currentTimeMillis();
Set<String> groups = listAllGroupNames();
List<InconsistentGroup> inconsistentGroups = listInconsistentGroups(groups);
long durationSeconds = (System.currentTimeMillis() - start) / 1000;
logger.info("Consistency check finished in {}s, number of inconsistent groups: {}", durationSeconds, inconsistentGroups.size());
isStorageConsistent.set(inconsistentGroups.isEmpty());
}

public List<InconsistentGroup> listInconsistentGroups(Set<String> groupNames) {
Expand Down Expand Up @@ -208,4 +241,6 @@ private <T> T resolveFuture(Future<T> future) {
throw new ConsistencyCheckingException("Fetching metadata failed", e);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package pl.allegro.tech.hermes.management.domain.consistency

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import pl.allegro.tech.hermes.api.Group
import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.common.metric.MetricsFacade
import pl.allegro.tech.hermes.management.config.ConsistencyCheckerProperties
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.time.Duration

import static pl.allegro.tech.hermes.test.helper.builder.GroupBuilder.group
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
Expand All @@ -15,6 +20,8 @@ import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
class DcConsistencyServiceSpec extends Specification {

def objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
def meterRegistry = new SimpleMeterRegistry()
def metricsFacade = new MetricsFacade(meterRegistry)

def "should return empty list when given groups are consistent"() {
given:
Expand All @@ -30,8 +37,8 @@ class DcConsistencyServiceSpec extends Specification {
.addGroup(group)
.addTopic(topic)
.addSubscription(subscription)
DcConsistencyService dcConsistencyService = new DcConsistencyService(repositoryManager, objectMapper,
new ConsistencyCheckerProperties())
DcConsistencyService dcConsistencyService = new DcConsistencyService(repositoryManager,
objectMapper, new ConsistencyCheckerProperties(), metricsFacade)

when:
def inconsistentGroups = dcConsistencyService.listInconsistentGroups([group.groupName] as Set)
Expand All @@ -48,8 +55,8 @@ class DcConsistencyServiceSpec extends Specification {
repositoryManager.datacenter("dc2")
.addGroup(group("testGroup").build())
.addGroup(group("testGroup-dc2").build())
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
new ConsistencyCheckerProperties())
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager,
objectMapper, new ConsistencyCheckerProperties(), metricsFacade)

when:
def groups = consistencyService.listInconsistentGroups(["testGroup", "testGroup-dc1", "testGroup-dc2"] as Set)
Expand All @@ -68,7 +75,7 @@ class DcConsistencyServiceSpec extends Specification {
.addGroup(group)
.addTopic(topic(group.groupName, "testTopic").withDescription("dc2").build())
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
new ConsistencyCheckerProperties())
new ConsistencyCheckerProperties(), metricsFacade)

when:
def groups = consistencyService.listInconsistentGroups(["testGroup"] as Set)
Expand All @@ -90,7 +97,7 @@ class DcConsistencyServiceSpec extends Specification {
.addTopic(topic)
.addSubscription(subscription(topic, "testSubscription").withDescription("dc2").build())
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
new ConsistencyCheckerProperties())
new ConsistencyCheckerProperties(), metricsFacade)

when:
def groups = consistencyService.listInconsistentGroups(["testGroup"] as Set)
Expand All @@ -108,12 +115,64 @@ class DcConsistencyServiceSpec extends Specification {
.addGroup(group("testGroup").build())
.addGroup(group("testGroup-dc2").build())
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager, objectMapper,
new ConsistencyCheckerProperties())
new ConsistencyCheckerProperties(), metricsFacade)

when:
def groups = consistencyService.listAllGroupNames()

then:
groups == ["testGroup", "testGroup-dc1", "testGroup-dc2"] as Set
}

def "should report storage as not consistent with periodic check"() {
given: "inconsistent storage state"
MockRepositoryManager repositoryManager = new MockRepositoryManager()
repositoryManager.datacenter("dc1")
.addGroup(group("testGroup").build())
.addGroup(group("testGroup-dc1").build())
repositoryManager.datacenter("dc2")
.addGroup(group("testGroup").build())
.addGroup(group("testGroup-dc2").build())

and: "enabled periodic consistency checks"
def properties = new ConsistencyCheckerProperties()
properties.setPeriodicCheckEnabled(true)
properties.setInitialRefreshDelay(Duration.ofMillis(0))

when: "consistency service is created"
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager,
objectMapper,
properties,
metricsFacade)

then: "storage is reported as not consistent"
new PollingConditions(timeout: 10).eventually {
meterRegistry.get("storage.consistency").gauge().value() == 0.0d
}
}

def "should report storage as consistent with periodic check"() {
given: "consistent storage state"
MockRepositoryManager repositoryManager = new MockRepositoryManager()
repositoryManager.datacenter("dc1")
.addGroup(group("testGroup").build())
repositoryManager.datacenter("dc2")
.addGroup(group("testGroup").build())

and: "enabled periodic consistency checks"
def properties = new ConsistencyCheckerProperties()
properties.setPeriodicCheckEnabled(true)
properties.setInitialRefreshDelay(Duration.ofMillis(0))

when: "consistency service is created"
DcConsistencyService consistencyService = new DcConsistencyService(repositoryManager,
objectMapper,
properties,
metricsFacade)

then: "storage is reported as consistent"
new PollingConditions(timeout: 10).eventually {
meterRegistry.get("storage.consistency").gauge().value() == 1.0d
}
}
}

0 comments on commit 3b02b2b

Please sign in to comment.