diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/DumpCommand.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/DumpCommand.java index dae7922..c1887c8 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/DumpCommand.java +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/DumpCommand.java @@ -24,7 +24,7 @@ public class DumpCommand extends BaseCommand { public int run() { try (AdminClient kafka = KafkaAdminClient.create(kafkaConfig())) { TopicService topicService = new TopicServiceImpl(kafka, true); - Collection existing = topicService.listExisting(true).values() + Collection existing = topicService.listExisting().values() .stream() .sorted(Comparator.comparing(ConfiguredTopic::getName)) .collect(Collectors.toList()); diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicEnforcer.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicEnforcer.java index 4a4ce45..9ffa33e 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicEnforcer.java +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicEnforcer.java @@ -54,7 +54,7 @@ public TopicEnforcer( boolean safemode) { super( configuredTopics, - () -> topicService.listExisting(true).values(), + () -> topicService.listExisting().values(), (t1, t2) -> t1.getName().equals(t2.getName()), safemode); checkArgument( @@ -88,7 +88,7 @@ List topicsWithConfigDrift( if (this.configured.isEmpty()) { return Collections.emptyList(); } - Map existing = this.topicService.listExisting(true); + Map existing = this.topicService.listExisting(); return Collections.unmodifiableList( this.configured.stream() .filter(t -> existing.containsKey(t.getName())) diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicService.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicService.java index bd91d43..66bbfcf 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicService.java +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicService.java @@ -19,10 +19,9 @@ public interface TopicService { /** * Load existing topics from a kafka cluster. * - * @param excludeInternal if true kafka internal topics are excluded * @return a collection of topics. */ - Map listExisting(boolean excludeInternal); + Map listExisting(); /** * Create new topics. diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicServiceImpl.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicServiceImpl.java index 3a26871..e018081 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicServiceImpl.java +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/TopicServiceImpl.java @@ -30,7 +30,6 @@ public class TopicServiceImpl implements TopicService { - private static final ListTopicsOptions INCLUDE_INTERNAL = new ListTopicsOptions().listInternal(true); private static final ListTopicsOptions EXCLUDE_INTERNAL = new ListTopicsOptions().listInternal(false); private final AdminClient adminClient; private final boolean dryRun; @@ -94,11 +93,9 @@ static boolean isNonDefault(ConfigEntry e) { } @Override - public Map listExisting(boolean excludeInternal) { + public Map listExisting() { try { - Set topics = adminClient - .listTopics(excludeInternal ? EXCLUDE_INTERNAL : INCLUDE_INTERNAL) - .names().get(); + Set topics = adminClient.listTopics(EXCLUDE_INTERNAL).names().get(); Collection topicDescriptions = adminClient.describeTopics(topics).all().get().values(); List resources = topics diff --git a/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicEnforcerTest.java b/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicEnforcerTest.java index 7841e6b..fc2630e 100644 --- a/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicEnforcerTest.java +++ b/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicEnforcerTest.java @@ -61,13 +61,13 @@ public void testPartitionIncrease() { "d", new ConfiguredTopic("b", 1, (short) 1, Collections.emptyMap()) ); - when(service.listExisting(true)).thenReturn(existing); + when(service.listExisting()).thenReturn(existing); TopicEnforcer enforcer = new TopicEnforcer(service, configured, true); enforcer.increasePartitions(); // topic 'b' and 'c' should be subject to config alteration List expected = Arrays.asList(configured.get(1), configured.get(2)); - verify(service).listExisting(true); + verify(service).listExisting(); verify(service).increasePartitions(expected); verifyNoMoreInteractions(service); } @@ -91,13 +91,13 @@ public void testAlterConfiguration() { "d", new ConfiguredTopic("d", 1, (short) 1, Collections.singletonMap("k", "v")) ); - when(service.listExisting(true)).thenReturn(existing); + when(service.listExisting()).thenReturn(existing); TopicEnforcer enforcer = new TopicEnforcer(service, configured, true); enforcer.alterConfiguration(); // topic 'a' and 'd' should be subject to config alteration List expected = Arrays.asList(configured.get(0), configured.get(3)); - verify(service).listExisting(true); + verify(service).listExisting(); verify(service).alterConfiguration(expected); verifyNoMoreInteractions(service); } @@ -108,7 +108,7 @@ public void testPartitionIncreaseUnSafeMode() { new ConfiguredTopic("a", 1000, (short) 3, Collections.emptyMap())); Map existing = Collections.singletonMap("a", new ConfiguredTopic("a", 100, (short) 3, Collections.emptyMap())); - when(service.listExisting(true)).thenReturn(existing); + when(service.listExisting()).thenReturn(existing); enforcer = new TopicEnforcer(service, configured, false); Assert.assertEquals("aggressive partition count increase must be allowed in unsafe mode", configured, enforcer.increasePartitions()); @@ -121,7 +121,7 @@ public void testConfigUpdateUnSafeMode() { new ConfiguredTopic("a", 10, (short) 3, risky)); Map existing = Collections.singletonMap("a", new ConfiguredTopic("a", 10, (short) 3, Collections.emptyMap())); - when(service.listExisting(true)).thenReturn(existing); + when(service.listExisting()).thenReturn(existing); // attempt a risk config change, it should go through enforcer = new TopicEnforcer(service, configured, diff --git a/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicServiceImplTest.java b/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicServiceImplTest.java index 4a3a69f..0b9514a 100644 --- a/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicServiceImplTest.java +++ b/kafka_topic_enforcer/src/test/java/com/tesla/data/topic/enforcer/TopicServiceImplTest.java @@ -122,7 +122,7 @@ public void testListExisting() { when(adminClient.describeTopics(topicNames)).thenReturn(describeTopicsResult); when(adminClient.describeConfigs(any(Collection.class))).thenReturn(describeConfigsResult); - Map actual = service.listExisting(true); + Map actual = service.listExisting(); Assert.assertEquals(2, actual.size()); Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b")), actual.keySet()); }