Skip to content

Commit

Permalink
feat: add opt-in to terminate non-empty topics matching props (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgb authored Oct 28, 2024
1 parent 1380a7b commit 68c4379
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.validation.annotation.Validated;

import java.util.Collection;
import java.util.Map;
import java.util.regex.Pattern;

@ConfigurationProperties("app")
Expand Down Expand Up @@ -35,6 +36,14 @@ public class ApplicationProperties {
*/
private Collection<Pattern> blessedTopics;

/**
* Can be used to terminate topics even if the topic contains data
* (destructive operation) if the topic is otherwise considered unused.
* The supplied properties will be matched against topic configuration,
* and all properties must match!
*/
private Map<String, String> nonEmptyTopicsMatchingProps;

public String getFixedRateString() {
return fixedRateString;
}
Expand All @@ -58,4 +67,12 @@ public Collection<Pattern> getBlessedTopics() {
public void setBlessedTopics(Collection<Pattern> blessedTopics) {
this.blessedTopics = blessedTopics;
}

public Map<String, String> getNonEmptyTopicsMatchingProps() {
return nonEmptyTopicsMatchingProps;
}

public void setNonEmptyTopicsMatchingProps(Map<String, String> nonEmptyTopicsMatchingProps) {
this.nonEmptyTopicsMatchingProps = nonEmptyTopicsMatchingProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.statnett.k3a.topicterminator.strategy.AndOperation;
import io.statnett.k3a.topicterminator.strategy.BlessedTopic;
import io.statnett.k3a.topicterminator.strategy.ConsumedTopic;
import io.statnett.k3a.topicterminator.strategy.InternalTopic;
import io.statnett.k3a.topicterminator.strategy.NonEmptyTopic;
import io.statnett.k3a.topicterminator.strategy.ReservedIfTopicNotMatchingProps;
import io.statnett.k3a.topicterminator.strategy.ReservedTopic;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
Expand All @@ -18,6 +20,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -48,14 +51,23 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept
.remove(internalTopics())
.remove(blessedTopics(props.getBlessedTopics()))
.remove(consumedTopics())
.remove(nonEmptyTopics())
.remove(
and(
nonEmptyTopics(),
reservedTopicsNotMatchingProps(props.getNonEmptyTopicsMatchingProps())
)
)
.terminate();
}

private TopicTerminatorChain from(TopicProvider topicProvider) {
return new TopicTerminatorChain(topicProvider);
}

private static AndOperation and(ReservedTopic... reservedTopic) {
return new AndOperation(reservedTopic);
}

private class TopicTerminatorChain {
private final TopicProvider topicProvider;
private final List<ReservedTopic> reservedTopics;
Expand Down Expand Up @@ -129,4 +141,8 @@ private static ConsumedTopic consumedTopics() {
private static NonEmptyTopic nonEmptyTopics() {
return new NonEmptyTopic();
}

private static ReservedIfTopicNotMatchingProps reservedTopicsNotMatchingProps(Map<String, String> props) {
return new ReservedIfTopicNotMatchingProps(props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.statnett.k3a.topicterminator.strategy;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;


public class AndOperation implements ReservedTopic {

private final ReservedTopic[] parts;

public AndOperation(ReservedTopic... parts) {
this.parts = parts;
}

@Override
public Set<String> filter(AdminClient client, Set<String> topicNames) throws ExecutionException, InterruptedException {
if (parts.length == 0) {
return topicNames;
}

HashSet<String> reserved = new HashSet<>();
for (ReservedTopic reservedTopic : parts) {
reserved.addAll(reservedTopic.filter(client, new HashSet<>(topicNames)));
}
topicNames.retainAll(reserved);
return topicNames;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.statnett.k3a.topicterminator.strategy;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;

public class ReservedIfTopicNotMatchingProps implements ReservedTopic {
private final Set<Map.Entry<String, String>> matchingProps;

public ReservedIfTopicNotMatchingProps(Map<String, String> matchingProps) {
if (matchingProps == null) {
this.matchingProps = emptySet();
} else {
this.matchingProps = unmodifiableSet(matchingProps.entrySet());
}
}

@Override
public Set<String> filter(AdminClient client, Set<String> topicNames) throws ExecutionException, InterruptedException {
if (matchingProps.isEmpty()) {
return emptySet();
}

List<ConfigResource> topicResources = topicNames.stream()
.map(t -> new ConfigResource(ConfigResource.Type.TOPIC, t))
.toList();

Map<String, Map<String, String>> topicProps = client.describeConfigs(topicResources).all().get().entrySet().stream()
.collect(Collectors.toMap(
entry -> entry.getKey().name(),
entry -> entry.getValue().entries().stream().collect(
Collectors.toMap(ConfigEntry::name, ConfigEntry::value)
)
)
);

return topicNames.stream()
.filter(t -> topicProps.get(t).entrySet().containsAll(matchingProps))
.collect(Collectors.toSet());
}
}
13 changes: 11 additions & 2 deletions src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ApplicationTest {
public static final String TOPIC_INTERNAL = "_schemas";
public static final String TOPIC_UNUSED = "topic-unused";
public static final String TOPIC_WITH_DATA = "topic-with-data";
public static final String TOPIC_WITH_DATA_COMPACT = "topic-with-data-compact";
public static final String TOPIC_BLESSED_BY_REGEX = "blessed-topic";
public static final String TOPIC_BLESSED_BY_NAME = "topic-foo";

Expand All @@ -54,6 +55,7 @@ public class ApplicationTest {
void testTerminateUnusedTopics() throws Exception {
// Put some data on topic-with-data topic
kafkaTemplate.send(TOPIC_WITH_DATA, "foo").get();
kafkaTemplate.send(TOPIC_WITH_DATA_COMPACT, "key", "value").get();

// Wait until consumer is started and registered in cluster
try (AdminClient client1 = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
Expand All @@ -69,13 +71,13 @@ void testTerminateUnusedTopics() throws Exception {

assertThat(allTopics)
.contains(TOPIC_CONSUMED, TOPIC_INTERNAL, TOPIC_WITH_DATA, TOPIC_BLESSED_BY_REGEX, TOPIC_BLESSED_BY_NAME)
.doesNotContain(TOPIC_UNUSED);
.doesNotContain(TOPIC_UNUSED, TOPIC_WITH_DATA_COMPACT);
}

// Assert delete of topic increases metrics counter
assertThat(meterRegistry.find("topic.deleted.total").counter())
.isNotNull()
.matches(counter -> counter.count() == 1);
.matches(counter -> counter.count() == 2);
}

@TestConfiguration
Expand Down Expand Up @@ -109,6 +111,13 @@ public NewTopic topicWithData() {
.build();
}

@Bean
public NewTopic topicWithDataCompact() {
return TopicBuilder.name(TOPIC_WITH_DATA_COMPACT)
.compact()
.build();
}

@Bean
public NewTopic topicBlessedByRegex() {
return TopicBuilder.name(TOPIC_BLESSED_BY_REGEX)
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/config/application.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
app:
dry-run: false
blessed-topics: ^blessed.*,topic-foo
non-empty-topics-matching-props:
cleanup.policy: compact

0 comments on commit 68c4379

Please sign in to comment.