Skip to content

Commit

Permalink
feat: ability to bless (not terminate) topics by name/regex
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgb committed Oct 21, 2024
1 parent e7a3482 commit a472611
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

import java.util.Collection;
import java.util.regex.Pattern;

@ConfigurationProperties("app")
@Validated
public class ApplicationProperties {
Expand All @@ -24,6 +27,14 @@ public class ApplicationProperties {
*/
private boolean dryRun;

/**
* Used to specify topics that should retain in the cluster,
* even if the topic is marked for termination by the other rules.
* Can be specified as a list or a comma separated value of topic
* name regular expressions.
*/
private Collection<Pattern> blessedTopics;

public String getFixedRateString() {
return fixedRateString;
}
Expand All @@ -39,4 +50,12 @@ public boolean isDryRun() {
public void setDryRun(boolean dryRun) {
this.dryRun = dryRun;
}

public Collection<Pattern> getBlessedTopics() {
return blessedTopics;
}

public void setBlessedTopics(Collection<Pattern> blessedTopics) {
this.blessedTopics = blessedTopics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.statnett.k3a.topicterminator.strategy.BlessedTopic;
import io.statnett.k3a.topicterminator.strategy.ConsumedTopic;
import io.statnett.k3a.topicterminator.strategy.InternalTopic;
import io.statnett.k3a.topicterminator.strategy.NonEmptyTopic;
Expand Down Expand Up @@ -49,7 +50,8 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept
Collection<ReservedTopic> reservedTopics = List.of(
new ConsumedTopic(),
new InternalTopic(allTopics),
new NonEmptyTopic()
new NonEmptyTopic(),
new BlessedTopic(allTopics, props.getBlessedTopics())
);

for (ReservedTopic reservedTopic : reservedTopics) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.statnett.k3a.topicterminator.strategy;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class BlessedTopic implements ReservedTopic {
private final Set<String> allNames;
private final Collection<Pattern> patterns;

public BlessedTopic(Set<String> allNames, Collection<Pattern> patterns) {
this.allNames = allNames;
this.patterns = patterns;
}

@Override
public Set<String> getNames(AdminClient client) {
if (patterns == null || patterns.isEmpty()) {
return Collections.emptySet();
}
return allNames.stream()
.filter(this::isBlessed)
.collect(Collectors.toSet());
}

private boolean isBlessed(String topicName) {
return patterns.stream()
.anyMatch(pattern -> pattern.matcher(topicName).matches());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class ApplicationTest {
public static final String TOPIC_INTERNAL = "_schemas";
public static final String TOPIC_UNUSED = "topic-unused";
public static final String TOPIC_WITH_DATA = "topic-with-data";
public static final String TOPIC_BLESSED_BY_REGEX = "blessed-topic";
public static final String TOPIC_BLESSED_BY_NAME = "topic-foo";

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
Expand Down Expand Up @@ -66,7 +68,7 @@ void testTerminateUnusedTopics() throws Exception {
Set<String> allTopics = client.listTopics(new ListTopicsOptions().listInternal(true)).names().get();

assertThat(allTopics)
.contains(TOPIC_CONSUMED, TOPIC_INTERNAL, TOPIC_WITH_DATA)
.contains(TOPIC_CONSUMED, TOPIC_INTERNAL, TOPIC_WITH_DATA, TOPIC_BLESSED_BY_REGEX, TOPIC_BLESSED_BY_NAME)
.doesNotContain(TOPIC_UNUSED);
}

Expand Down Expand Up @@ -106,6 +108,18 @@ public NewTopic topicWithData() {
return TopicBuilder.name(TOPIC_WITH_DATA)
.build();
}

@Bean
public NewTopic topicBlessedByRegex() {
return TopicBuilder.name(TOPIC_BLESSED_BY_REGEX)
.build();
}

@Bean
public NewTopic topicBlessedByName() {
return TopicBuilder.name(TOPIC_BLESSED_BY_NAME)
.build();
}
}


Expand Down
1 change: 1 addition & 0 deletions src/test/resources/config/application.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
app:
dry-run: false
blessed-topics: ^blessed.*,topic-foo

0 comments on commit a472611

Please sign in to comment.