Skip to content

Commit

Permalink
Remove dead code.
Browse files Browse the repository at this point in the history
  • Loading branch information
shk3 committed Dec 27, 2024
1 parent 3ee3f4d commit 015ff05
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class DumpCommand extends BaseCommand<ConfiguredTopic> {
public int run() {
try (AdminClient kafka = KafkaAdminClient.create(kafkaConfig())) {
TopicService topicService = new TopicServiceImpl(kafka, true);
Collection<ConfiguredTopic> existing = topicService.listExisting(true).values()
Collection<ConfiguredTopic> existing = topicService.listExisting().values()
.stream()
.sorted(Comparator.comparing(ConfiguredTopic::getName))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TopicEnforcer(
boolean safemode) {
super(
configuredTopics,
() -> topicService.listExisting(true).values(),
() -> topicService.listExisting().values(),
(t1, t2) -> t1.getName().equals(t2.getName()),
safemode);
checkArgument(
Expand Down Expand Up @@ -88,7 +88,7 @@ List<ConfiguredTopic> topicsWithConfigDrift(
if (this.configured.isEmpty()) {
return Collections.emptyList();
}
Map<String, ConfiguredTopic> existing = this.topicService.listExisting(true);
Map<String, ConfiguredTopic> existing = this.topicService.listExisting();
return Collections.unmodifiableList(
this.configured.stream()
.filter(t -> existing.containsKey(t.getName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ public interface TopicService {
/**
* Load existing topics from a kafka cluster.
*
* @param excludeInternal if true kafka internal topics are excluded
* @return a collection of topics.
*/
Map<String, ConfiguredTopic> listExisting(boolean excludeInternal);
Map<String, ConfiguredTopic> listExisting();

/**
* Create new topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

public class TopicServiceImpl implements TopicService {

private static final ListTopicsOptions INCLUDE_INTERNAL = new ListTopicsOptions().listInternal(true);
private static final ListTopicsOptions EXCLUDE_INTERNAL = new ListTopicsOptions().listInternal(false);
private final AdminClient adminClient;
private final boolean dryRun;
Expand Down Expand Up @@ -94,11 +93,9 @@ static boolean isNonDefault(ConfigEntry e) {
}

@Override
public Map<String, ConfiguredTopic> listExisting(boolean excludeInternal) {
public Map<String, ConfiguredTopic> listExisting() {
try {
Set<String> topics = adminClient
.listTopics(excludeInternal ? EXCLUDE_INTERNAL : INCLUDE_INTERNAL)
.names().get();
Set<String> topics = adminClient.listTopics(EXCLUDE_INTERNAL).names().get();
Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get().values();

List<ConfigResource> resources = topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ public void testPartitionIncrease() {
"d", new ConfiguredTopic("b", 1, (short) 1, Collections.emptyMap())
);

when(service.listExisting(true)).thenReturn(existing);
when(service.listExisting()).thenReturn(existing);
TopicEnforcer enforcer = new TopicEnforcer(service, configured, true);
enforcer.increasePartitions();

// topic 'b' and 'c' should be subject to config alteration
List<ConfiguredTopic> expected = Arrays.asList(configured.get(1), configured.get(2));
verify(service).listExisting(true);
verify(service).listExisting();
verify(service).increasePartitions(expected);
verifyNoMoreInteractions(service);
}
Expand All @@ -91,13 +91,13 @@ public void testAlterConfiguration() {
"d", new ConfiguredTopic("d", 1, (short) 1, Collections.singletonMap("k", "v"))
);

when(service.listExisting(true)).thenReturn(existing);
when(service.listExisting()).thenReturn(existing);
TopicEnforcer enforcer = new TopicEnforcer(service, configured, true);
enforcer.alterConfiguration();

// topic 'a' and 'd' should be subject to config alteration
List<ConfiguredTopic> expected = Arrays.asList(configured.get(0), configured.get(3));
verify(service).listExisting(true);
verify(service).listExisting();
verify(service).alterConfiguration(expected);
verifyNoMoreInteractions(service);
}
Expand All @@ -108,7 +108,7 @@ public void testPartitionIncreaseUnSafeMode() {
new ConfiguredTopic("a", 1000, (short) 3, Collections.emptyMap()));
Map<String, ConfiguredTopic> existing = Collections.singletonMap("a",
new ConfiguredTopic("a", 100, (short) 3, Collections.emptyMap()));
when(service.listExisting(true)).thenReturn(existing);
when(service.listExisting()).thenReturn(existing);
enforcer = new TopicEnforcer(service, configured, false);
Assert.assertEquals("aggressive partition count increase must be allowed in unsafe mode", configured,
enforcer.increasePartitions());
Expand All @@ -121,7 +121,7 @@ public void testConfigUpdateUnSafeMode() {
new ConfiguredTopic("a", 10, (short) 3, risky));
Map<String, ConfiguredTopic> existing = Collections.singletonMap("a",
new ConfiguredTopic("a", 10, (short) 3, Collections.emptyMap()));
when(service.listExisting(true)).thenReturn(existing);
when(service.listExisting()).thenReturn(existing);

// attempt a risk config change, it should go through
enforcer = new TopicEnforcer(service, configured,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testListExisting() {
when(adminClient.describeTopics(topicNames)).thenReturn(describeTopicsResult);
when(adminClient.describeConfigs(any(Collection.class))).thenReturn(describeConfigsResult);

Map<String, ConfiguredTopic> actual = service.listExisting(true);
Map<String, ConfiguredTopic> actual = service.listExisting();
Assert.assertEquals(2, actual.size());
Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b")), actual.keySet());
}
Expand Down

0 comments on commit 015ff05

Please sign in to comment.