From 6e7b51fb4d7c9e47ee8260d42e9a53f6be1fbbe1 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 26 Dec 2025 16:38:38 +0200 Subject: [PATCH] refactor(inkless): return LogConfig from MetadataView#getTopicConfig Change MetadataView#getTopicConfig to return LogConfig instead of Properties, consolidating the responsibility of merging default and topic-specific configurations into the MetadataView implementation. This simplifies callers by: - Removing redundant LogConfig.fromProps() calls in RetentionEnforcer and AppendHandler - Eliminating the need for callers to access default configs separately - Reducing test setup by removing mock configuration for default configs The InklessMetadataView now handles merging default configs with topic overrides internally, providing a fully-resolved LogConfig to consumers. --- .../server/metadata/InklessMetadataView.scala | 8 +- .../metadata/InklessMetadataViewTest.scala | 108 +++++++++++++++++- .../inkless/control_plane/MetadataView.java | 4 +- .../inkless/delete/RetentionEnforcer.java | 3 +- .../aiven/inkless/produce/AppendHandler.java | 16 ++- .../delete/FileCleanerIntegrationTest.java | 4 +- .../inkless/delete/RetentionEnforcerTest.java | 82 ++++++------- .../merge/FileMergerIntegrationTest.java | 4 +- .../inkless/produce/AppendHandlerTest.java | 60 ++-------- 9 files changed, 173 insertions(+), 116 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala b/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala index aec21b1d19..133518690a 100644 --- a/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala +++ b/core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala @@ -22,8 +22,8 @@ import io.aiven.inkless.control_plane.MetadataView import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.{Node, TopicIdPartition, Uuid} +import org.apache.kafka.storage.internals.log.LogConfig -import java.util.Properties import java.util.function.Supplier import java.util.stream.{Collectors, IntStream} import java.{lang, util} @@ -55,10 +55,6 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf metadataCache.topicConfig(topicName).getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean } - override def getTopicConfig(topicName: String): Properties = { - metadataCache.topicConfig(topicName) - } - override def getDisklessTopicPartitions: util.Set[TopicIdPartition] = { metadataCache.getAllTopics().stream() .filter(isDisklessTopic) @@ -66,4 +62,6 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf .mapToObj(p => new TopicIdPartition(metadataCache.getTopicId(t), p, t))) .collect(Collectors.toSet[TopicIdPartition]()) } + + override def getTopicConfig(topicName: String): LogConfig = LogConfig.fromProps(getDefaultConfig(), metadataCache.topicConfig(topicName)) } diff --git a/core/src/test/scala/kafka/server/metadata/InklessMetadataViewTest.scala b/core/src/test/scala/kafka/server/metadata/InklessMetadataViewTest.scala index 0c243467c7..4b610e58d1 100644 --- a/core/src/test/scala/kafka/server/metadata/InklessMetadataViewTest.scala +++ b/core/src/test/scala/kafka/server/metadata/InklessMetadataViewTest.scala @@ -19,7 +19,7 @@ package kafka.server.metadata import org.apache.kafka.common.config.TopicConfig -import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.{BeforeEach, Nested, Test} import org.junit.jupiter.api.Assertions._ import org.mockito.Mockito._ @@ -124,4 +124,110 @@ class InklessMetadataViewTest { props } + @Nested + class GetTopicConfigTest { + @Test + def testMergesDefaultConfigsWithTopicOverrides(): Unit = { + // Setup default configs + val defaultConfig = new util.HashMap[String, Object]() + defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000") // 1 day + defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824") // 1 GB + when(configSupplier.get()).thenReturn(defaultConfig) + + // Setup topic-specific overrides + val topicOverrides = new Properties() + topicOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 days - overrides default + when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides) + + // Call the method under test + val logConfig = metadataView.getTopicConfig("test-topic") + + // Verify topic override takes precedence + assertEquals(604800000L, logConfig.retentionMs) + // Verify default is used when no override exists + assertEquals(1073741824L, logConfig.retentionSize) + } + + @Test + def testTopicOverridesCompletelyReplaceDefaults(): Unit = { + // Setup default configs + val defaultConfig = new util.HashMap[String, Object]() + defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000") + defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824") + when(configSupplier.get()).thenReturn(defaultConfig) + + // Setup topic-specific overrides for both configs + val topicOverrides = new Properties() + topicOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "3600000") // 1 hour + topicOverrides.put(TopicConfig.RETENTION_BYTES_CONFIG, "536870912") // 512 MB + when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides) + + // Call the method under test + val logConfig = metadataView.getTopicConfig("test-topic") + + // Verify both values are from topic overrides + assertEquals(3600000L, logConfig.retentionMs) + assertEquals(536870912L, logConfig.retentionSize) + } + + @Test + def testEmptyTopicConfigUsesDefaults(): Unit = { + // Setup default configs + val defaultConfig = new util.HashMap[String, Object]() + defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000") + defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824") + when(configSupplier.get()).thenReturn(defaultConfig) + + // Setup empty topic overrides + val topicOverrides = new Properties() + when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides) + + // Call the method under test + val logConfig = metadataView.getTopicConfig("test-topic") + + // Verify default values are used + assertEquals(86400000L, logConfig.retentionMs) + assertEquals(1073741824L, logConfig.retentionSize) + } + + @Test + def testNullValuesInDefaultConfigAreFiltered(): Unit = { + // Setup default configs with null values + val defaultConfig = new util.HashMap[String, Object]() + defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000") + defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, null) // null value should be filtered + when(configSupplier.get()).thenReturn(defaultConfig) + + // Setup empty topic overrides + val topicOverrides = new Properties() + when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides) + + // Call the method under test - should not throw due to null filtering + val logConfig = metadataView.getTopicConfig("test-topic") + + // Verify the non-null default is applied + assertEquals(86400000L, logConfig.retentionMs) + // Verify the LogConfig default (-1) is used for the filtered null value + assertEquals(-1L, logConfig.retentionSize) + } + + @Test + def testEmptyDefaultConfigWithTopicOverrides(): Unit = { + // Setup empty default configs + val defaultConfig = new util.HashMap[String, Object]() + when(configSupplier.get()).thenReturn(defaultConfig) + + // Setup topic-specific overrides + val topicOverrides = new Properties() + topicOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "7200000") // 2 hours + when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides) + + // Call the method under test + val logConfig = metadataView.getTopicConfig("test-topic") + + // Verify topic override is applied + assertEquals(7200000L, logConfig.retentionMs) + } + } + } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java index 34d2529739..e7ae26d4ae 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java @@ -21,9 +21,9 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.storage.internals.log.LogConfig; import java.util.Map; -import java.util.Properties; import java.util.Set; public interface MetadataView { @@ -37,7 +37,7 @@ public interface MetadataView { boolean isDisklessTopic(String topicName); - Properties getTopicConfig(String topicName); + LogConfig getTopicConfig(String topicName); Set getDisklessTopicPartitions(); } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/delete/RetentionEnforcer.java b/storage/inkless/src/main/java/io/aiven/inkless/delete/RetentionEnforcer.java index 0e762bdfdd..cbc3aa03fd 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/delete/RetentionEnforcer.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/delete/RetentionEnforcer.java @@ -94,8 +94,7 @@ private synchronized void runUnsafe() { final List readyPartitions = retentionEnforcementScheduler.getReadyPartitions(); final Map topicConfigs = new HashMap<>(); for (final TopicIdPartition partition : readyPartitions) { - final LogConfig topicConfig = topicConfigs.computeIfAbsent(partition.topic(), - t -> LogConfig.fromProps(metadataView.getDefaultConfig(), metadataView.getTopicConfig(t))); + final LogConfig topicConfig = topicConfigs.computeIfAbsent(partition.topic(), metadataView::getTopicConfig); // This check must be done here and not at scheduling, because the config may change at any moment. if (topicConfig.delete) { diff --git a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java index 53afcff96d..daad8759ea 100644 --- a/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java +++ b/storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import io.aiven.inkless.common.SharedState; @@ -44,14 +45,13 @@ public class AppendHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(AppendHandler.class); - private final SharedState state; + private final Function getLogConfig; private final Writer writer; @DoNotMutate @CoverageIgnore public AppendHandler(final SharedState state) { this( - state, new Writer( state.time(), state.brokerId(), @@ -67,14 +67,14 @@ public AppendHandler(final SharedState state) { state.config().produceUploadBackoff(), state.config().produceUploadThreadPoolSize(), state.brokerTopicStats() - ) + ), + state.metadata()::getTopicConfig ); } // Visible for tests - AppendHandler(final SharedState state, - final Writer writer) { - this.state = state; + AppendHandler(final Writer writer, final Function getLogConfig) { + this.getLogConfig = getLogConfig; this.writer = writer; } @@ -113,11 +113,9 @@ private boolean requestContainsTransactionalProduce(final Map getLogConfigs(final Set topicIdPartitions) { - final Map defaultTopicConfigs = state.defaultTopicConfigs().get().originals(); final Map result = new HashMap<>(); for (final TopicIdPartition tp : topicIdPartitions) { - final var overrides = state.metadata().getTopicConfig(tp.topic()); - result.put(tp.topic(), LogConfig.fromProps(defaultTopicConfigs, overrides)); + result.put(tp.topic(), getLogConfig.apply(tp.topic())); } return result; } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java index 221bd56242..bbdafce219 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/FileCleanerIntegrationTest.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -145,8 +144,7 @@ static void tearDownS3() { @BeforeEach void setup() { - when(metadataView.getTopicConfig(anyString())).thenReturn(new Properties()); - when(defaultTopicConfigs.get()).thenReturn(new LogConfig(Map.of())); + when(metadataView.getTopicConfig(anyString())).thenReturn(new LogConfig(Map.of())); controlPlane = new InMemoryControlPlane(time); controlPlane.configure(Map.of()); diff --git a/storage/inkless/src/test/java/io/aiven/inkless/delete/RetentionEnforcerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/delete/RetentionEnforcerTest.java index d9ff98c49f..6004b59672 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/delete/RetentionEnforcerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/delete/RetentionEnforcerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.storage.internals.log.LogConfig; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -76,81 +77,82 @@ class RetentionEnforcerTest { @Nested class RetentionSettings { @Test - void fullDefault() { + void fullDefault() throws Exception { when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0)); - when(metadataView.getDefaultConfig()).thenReturn(Map.of()); - when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - - final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0); - enforcer.run(); - - verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); - assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(-1L); - assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(604800000L); + when(metadataView.getTopicConfig(any())).thenReturn(new LogConfig(Map.of())); + try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) { + enforcer.run(); + + verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); + assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(-1L); + assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(604800000L); + } } @Test - void logConfig() { + void logConfig() throws Exception { when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0)); - when(metadataView.getDefaultConfig()).thenReturn(Map.of( + when(metadataView.getTopicConfig(any())).thenReturn(new LogConfig(Map.of( RETENTION_BYTES_CONFIG, "123", RETENTION_MS_CONFIG, "567" - )); - when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); + ))); - final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0); - enforcer.run(); + try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) { + enforcer.run(); - verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); - assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123L); - assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567L); + verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); + assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123L); + assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567L); + } } @Test - void definedForTopic() { + void definedForTopic() throws Exception { when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0)); - when(metadataView.getDefaultConfig()).thenReturn(Map.of( + final var defaultConfigs = Map.of( RETENTION_BYTES_CONFIG, "123", RETENTION_MS_CONFIG, "567" - )); + ); final Properties topicConfig = new Properties(); topicConfig.put(RETENTION_BYTES_CONFIG, "123000"); topicConfig.put(RETENTION_MS_CONFIG, "567000"); - when(metadataView.getTopicConfig(any())).thenReturn(topicConfig); + when(metadataView.getTopicConfig(any())).thenReturn(LogConfig.fromProps(defaultConfigs, topicConfig)); - final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0); - enforcer.run(); + try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) { + enforcer.run(); - verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); - assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123000L); - assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567000L); + verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); + assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123000L); + assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567000L); + } } } @Test - void onlyDeleteTopics() { + void onlyDeleteTopics() throws Exception { when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0, T1P0, T2P0)); - when(metadataView.getDefaultConfig()).thenReturn(Map.of( + final var defaultConfigs = Map.of( RETENTION_BYTES_CONFIG, "123", RETENTION_MS_CONFIG, "567" - )); + ); final var t0Config = new Properties(); t0Config.put(CLEANUP_POLICY_CONFIG, "compact"); - when(metadataView.getTopicConfig(eq(TOPIC_0))).thenReturn(t0Config); + when(metadataView.getTopicConfig(eq(TOPIC_0))).thenReturn(LogConfig.fromProps(defaultConfigs, t0Config)); final var t1Config = new Properties(); t1Config.put(CLEANUP_POLICY_CONFIG, "delete,compact"); - when(metadataView.getTopicConfig(eq(TOPIC_1))).thenReturn(t1Config); + when(metadataView.getTopicConfig(eq(TOPIC_1))).thenReturn(LogConfig.fromProps(defaultConfigs, t1Config)); final var t2Config = new Properties(); t2Config.put(CLEANUP_POLICY_CONFIG, "delete"); - when(metadataView.getTopicConfig(eq(TOPIC_2))).thenReturn(t2Config); + when(metadataView.getTopicConfig(eq(TOPIC_2))).thenReturn(LogConfig.fromProps(defaultConfigs, t2Config)); - final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0); - enforcer.run(); + try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) { + enforcer.run(); - verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); - assertThat(requestCaptor.getValue()) - .map(EnforceRetentionRequest::topicId) - .containsExactly(TOPIC_ID_1, TOPIC_ID_2); + verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0)); + assertThat(requestCaptor.getValue()) + .map(EnforceRetentionRequest::topicId) + .containsExactly(TOPIC_ID_1, TOPIC_ID_2); + } } } diff --git a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java index e0d526dcec..2df1a2e26d 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerIntegrationTest.java @@ -57,7 +57,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -156,8 +155,7 @@ static void tearDownS3() { @BeforeEach void setup() { - when(metadataView.getTopicConfig(anyString())).thenReturn(new Properties()); - when(defaultTopicConfigs.get()).thenReturn(new LogConfig(Map.of())); + when(metadataView.getTopicConfig(anyString())).thenReturn(new LogConfig(Map.of())); controlPlane = new InMemoryControlPlane(time); controlPlane.configure(Map.of( diff --git a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java index e1dc6bfbe2..7d05e54bc0 100644 --- a/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java +++ b/storage/inkless/src/test/java/io/aiven/inkless/produce/AppendHandlerTest.java @@ -26,11 +26,8 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.storage.internals.log.LogConfig; -import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -41,14 +38,8 @@ import java.io.IOException; import java.util.Map; -import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import io.aiven.inkless.common.SharedState; -import io.aiven.inkless.config.InklessConfig; -import io.aiven.inkless.control_plane.ControlPlane; -import io.aiven.inkless.control_plane.MetadataView; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -61,22 +52,11 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class AppendHandlerTest { - static final int BROKER_ID = 11; - - static final Supplier DEFAULT_TOPIC_CONFIGS = () -> new LogConfig(Map.of()); + static final Function GET_LOG_CONFIGS = (topicName) -> new LogConfig(Map.of()); - Time time = new MockTime(); RequestLocal requestLocal = RequestLocal.noCaching(); @Mock - InklessConfig inklessConfig; - @Mock - MetadataView metadataView; - @Mock - ControlPlane controlPlane; - @Mock Writer writer; - @Mock - BrokerTopicStats brokerTopicStats; private static final MemoryRecords TRANSACTIONAL_RECORDS = MemoryRecords.withTransactionalRecords( Compression.NONE, @@ -100,11 +80,7 @@ public class AppendHandlerTest { @Test public void rejectTransactionalProduce() throws Exception { - try ( - final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - brokerTopicStats, DEFAULT_TOPIC_CONFIGS); - final AppendHandler interceptor = new AppendHandler(sharedState, writer) - ) { + try (final AppendHandler interceptor = new AppendHandler(writer, GET_LOG_CONFIGS)) { final TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless1"); final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "inkless2"); @@ -126,11 +102,7 @@ topicIdPartition2, new PartitionResponse(Errors.INVALID_REQUEST) @Test public void emptyRequests() throws Exception { - try ( - final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - brokerTopicStats, DEFAULT_TOPIC_CONFIGS); - final AppendHandler interceptor = new AppendHandler(sharedState, writer) - ) { + try (final AppendHandler interceptor = new AppendHandler(writer, GET_LOG_CONFIGS)) { final Map entriesPerPartition = Map.of(); @@ -155,13 +127,7 @@ topicIdPartition, new PartitionResponse(Errors.NONE) CompletableFuture.completedFuture(writeResult) ); - when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try ( - final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - brokerTopicStats, DEFAULT_TOPIC_CONFIGS); - final AppendHandler interceptor = new AppendHandler(sharedState, writer) - ) { - + try (final AppendHandler interceptor = new AppendHandler(writer, GET_LOG_CONFIGS)) { final var result = interceptor.handle(entriesPerPartition, requestLocal).get(); assertThat(result).isEqualTo(writeResult); } @@ -180,25 +146,17 @@ public void writeFutureFailed() throws Exception { CompletableFuture.failedFuture(exception) ); - when(metadataView.getTopicConfig(any())).thenReturn(new Properties()); - try ( - final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - brokerTopicStats, DEFAULT_TOPIC_CONFIGS); - final AppendHandler interceptor = new AppendHandler(sharedState, writer) - ) { + try (final AppendHandler interceptor = new AppendHandler(writer, GET_LOG_CONFIGS)) { assertThatThrownBy(() -> interceptor.handle(entriesPerPartition, requestLocal).get()).hasCause(exception); } } @Test public void close() throws IOException { - try (final SharedState sharedState = SharedState.initialize(time, BROKER_ID, inklessConfig, metadataView, controlPlane, - brokerTopicStats, DEFAULT_TOPIC_CONFIGS)) { - final AppendHandler interceptor = new AppendHandler(sharedState, writer); + final AppendHandler interceptor = new AppendHandler(writer, GET_LOG_CONFIGS); - interceptor.close(); + interceptor.close(); - verify(writer).close(); - } + verify(writer).close(); } }