InklessMetadataView.scala
@@ -22,8 +22,8 @@ import io.aiven.inkless.control_plane.MetadataView
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Node, TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log.LogConfig

-import java.util.Properties
 import java.util.function.Supplier
 import java.util.stream.{Collectors, IntStream}
 import java.{lang, util}
@@ -55,15 +55,13 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
     metadataCache.topicConfig(topicName).getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
   }

-  override def getTopicConfig(topicName: String): Properties = {
-    metadataCache.topicConfig(topicName)
-  }
-
   override def getDisklessTopicPartitions: util.Set[TopicIdPartition] = {
     metadataCache.getAllTopics().stream()
       .filter(isDisklessTopic)
       .flatMap(t => IntStream.range(0, metadataCache.numPartitions(t).get())
         .mapToObj(p => new TopicIdPartition(metadataCache.getTopicId(t), p, t)))
       .collect(Collectors.toSet[TopicIdPartition]())
   }
+
+  override def getTopicConfig(topicName: String): LogConfig = LogConfig.fromProps(getDefaultConfig(), metadataCache.topicConfig(topicName))
 }
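Note on the core change in this file: getTopicConfig now returns a fully resolved LogConfig rather than raw topic Properties, so the defaults-plus-overrides merge happens once, inside the view. A minimal standalone sketch of the merge semantics, assuming LogConfig.fromProps(Map, Properties) keeps the signature used elsewhere in this diff (the sketch class and values are illustrative, not part of this PR):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.storage.internals.log.LogConfig;

public class LogConfigMergeSketch {
    public static void main(String[] args) {
        // Broker-level defaults, as a Map<String, Object>.
        final Map<String, Object> defaults = Map.of(
            TopicConfig.RETENTION_MS_CONFIG, "86400000",      // 1 day
            TopicConfig.RETENTION_BYTES_CONFIG, "1073741824"  // 1 GB
        );
        // Per-topic overrides, as Properties.
        final Properties overrides = new Properties();
        overrides.put(TopicConfig.RETENTION_MS_CONFIG, "604800000"); // 7 days

        final LogConfig merged = LogConfig.fromProps(defaults, overrides);
        System.out.println(merged.retentionMs);   // 604800000: the override wins
        System.out.println(merged.retentionSize); // 1073741824: the default fills the gap
    }
}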
InklessMetadataViewTest.scala
@@ -19,7 +19,7 @@
 package kafka.server.metadata

 import org.apache.kafka.common.config.TopicConfig
-import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.{BeforeEach, Nested, Test}
 import org.junit.jupiter.api.Assertions._
 import org.mockito.Mockito._

@@ -124,4 +124,110 @@ class InklessMetadataViewTest {
     props
   }

+  @Nested
+  class GetTopicConfigTest {
+    @Test
+    def testMergesDefaultConfigsWithTopicOverrides(): Unit = {
+      // Set up default configs
+      val defaultConfig = new util.HashMap[String, Object]()
+      defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000") // 1 day
+      defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824") // 1 GB
+      when(configSupplier.get()).thenReturn(defaultConfig)
+
+      // Set up topic-specific overrides
+      val topicOverrides = new Properties()
+      topicOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 days, overrides the default
+      when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides)
+
+      // Call the method under test
+      val logConfig = metadataView.getTopicConfig("test-topic")
+
+      // Verify the topic override takes precedence
+      assertEquals(604800000L, logConfig.retentionMs)
+      // Verify the default is used when no override exists
+      assertEquals(1073741824L, logConfig.retentionSize)
+    }
+
+    @Test
+    def testTopicOverridesCompletelyReplaceDefaults(): Unit = {
+      // Set up default configs
+      val defaultConfig = new util.HashMap[String, Object]()
+      defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000")
+      defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824")
+      when(configSupplier.get()).thenReturn(defaultConfig)
+
+      // Set up topic-specific overrides for both configs
+      val topicOverrides = new Properties()
+      topicOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "3600000") // 1 hour
+      topicOverrides.put(TopicConfig.RETENTION_BYTES_CONFIG, "536870912") // 512 MB
+      when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides)
+
+      // Call the method under test
+      val logConfig = metadataView.getTopicConfig("test-topic")
+
+      // Verify both values come from the topic overrides
+      assertEquals(3600000L, logConfig.retentionMs)
+      assertEquals(536870912L, logConfig.retentionSize)
+    }
+
+    @Test
+    def testEmptyTopicConfigUsesDefaults(): Unit = {
+      // Set up default configs
+      val defaultConfig = new util.HashMap[String, Object]()
+      defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000")
+      defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824")
+      when(configSupplier.get()).thenReturn(defaultConfig)
+
+      // Set up empty topic overrides
+      val topicOverrides = new Properties()
+      when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides)
+
+      // Call the method under test
+      val logConfig = metadataView.getTopicConfig("test-topic")
+
+      // Verify the default values are used
+      assertEquals(86400000L, logConfig.retentionMs)
+      assertEquals(1073741824L, logConfig.retentionSize)
+    }
+
+    @Test
+    def testNullValuesInDefaultConfigAreFiltered(): Unit = {
+      // Set up default configs with a null value
+      val defaultConfig = new util.HashMap[String, Object]()
+      defaultConfig.put(TopicConfig.RETENTION_MS_CONFIG, "86400000")
+      defaultConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, null) // null value should be filtered out
+      when(configSupplier.get()).thenReturn(defaultConfig)
+
+      // Set up empty topic overrides
+      val topicOverrides = new Properties()
+      when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides)
+
+      // Call the method under test; must not throw, thanks to null filtering
+      val logConfig = metadataView.getTopicConfig("test-topic")
+
+      // Verify the non-null default is applied
+      assertEquals(86400000L, logConfig.retentionMs)
+      // Verify the LogConfig default (-1) is used for the filtered null value
+      assertEquals(-1L, logConfig.retentionSize)
+    }
+
+    @Test
+    def testEmptyDefaultConfigWithTopicOverrides(): Unit = {
+      // Set up empty default configs
+      val defaultConfig = new util.HashMap[String, Object]()
+      when(configSupplier.get()).thenReturn(defaultConfig)
+
+      // Set up topic-specific overrides
+      val topicOverrides = new Properties()
+      topicOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "7200000") // 2 hours
+      when(metadataCache.topicConfig("test-topic")).thenReturn(topicOverrides)
+
+      // Call the method under test
+      val logConfig = metadataView.getTopicConfig("test-topic")
+
+      // Verify the topic override is applied
+      assertEquals(7200000L, logConfig.retentionMs)
+    }
+  }
+
 }
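A note on testNullValuesInDefaultConfigAreFiltered above: java.util.Properties is Hashtable-backed and rejects null values, so null-valued entries in the default config map have to be dropped before they reach the Properties-based merge in LogConfig.fromProps. A sketch of that kind of filtering; the helper below is hypothetical, not the code under test:

import java.util.HashMap;
import java.util.Map;

final class DefaultsFilterSketch {
    // Hypothetical helper: drop null-valued entries so copying the defaults
    // into a Hashtable-backed Properties cannot throw NullPointerException.
    static Map<String, Object> withoutNulls(final Map<String, Object> defaults) {
        final Map<String, Object> filtered = new HashMap<>();
        defaults.forEach((k, v) -> {
            if (v != null) {
                filtered.put(k, v);
            }
        });
        return filtered;
    }

    public static void main(String[] args) {
        final Map<String, Object> defaults = new HashMap<>();
        defaults.put("retention.ms", "86400000");
        defaults.put("retention.bytes", null); // would break a Properties.putAll
        System.out.println(withoutNulls(defaults)); // {retention.ms=86400000}
    }
}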
MetadataView.java
@@ -21,9 +21,9 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.storage.internals.log.LogConfig;

 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;

 public interface MetadataView {
@@ -37,7 +37,7 @@ public interface MetadataView {

     boolean isDisklessTopic(String topicName);

-    Properties getTopicConfig(String topicName);
+    LogConfig getTopicConfig(String topicName);

     Set<TopicIdPartition> getDisklessTopicPartitions();
 }
RetentionEnforcer.java
@@ -94,8 +94,7 @@ private synchronized void runUnsafe() {
         final List<TopicIdPartition> readyPartitions = retentionEnforcementScheduler.getReadyPartitions();
         final Map<String, LogConfig> topicConfigs = new HashMap<>();
         for (final TopicIdPartition partition : readyPartitions) {
-            final LogConfig topicConfig = topicConfigs.computeIfAbsent(partition.topic(),
-                t -> LogConfig.fromProps(metadataView.getDefaultConfig(), metadataView.getTopicConfig(t)));
+            final LogConfig topicConfig = topicConfigs.computeIfAbsent(partition.topic(), metadataView::getTopicConfig);

             // This check must be done here and not at scheduling, because the config may change at any moment.
             if (topicConfig.delete) {
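The one-liner above keeps the per-run memoization intact: topicConfigs.computeIfAbsent still resolves each topic's LogConfig at most once per run, however many of its partitions are ready; only the merge logic moved behind metadataView::getTopicConfig. The caching pattern in isolation (toy types and names, purely illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class PerRunConfigCacheSketch {
    // Resolve each distinct key once, reusing the cached result for repeats.
    static Map<String, Integer> resolveOnce(final List<String> topics,
                                            final Function<String, Integer> expensiveLookup) {
        final Map<String, Integer> cache = new HashMap<>();
        for (final String topic : topics) {
            cache.computeIfAbsent(topic, expensiveLookup);
        }
        return cache;
    }

    public static void main(String[] args) {
        // "a" is looked up once even though it appears twice.
        System.out.println(resolveOnce(List.of("a", "a", "b"), String::length));
    }
}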
AppendHandler.java
@@ -37,21 +37,21 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Collectors;

 import io.aiven.inkless.common.SharedState;

 public class AppendHandler implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(AppendHandler.class);

-    private final SharedState state;
+    private final Function<String, LogConfig> getLogConfig;
     private final Writer writer;

     @DoNotMutate
     @CoverageIgnore
     public AppendHandler(final SharedState state) {
         this(
-            state,
             new Writer(
                 state.time(),
                 state.brokerId(),
@@ -67,14 +67,14 @@ public AppendHandler(final SharedState state) {
                 state.config().produceUploadBackoff(),
                 state.config().produceUploadThreadPoolSize(),
                 state.brokerTopicStats()
-            )
+            ),
+            state.metadata()::getTopicConfig
         );
     }

     // Visible for tests
-    AppendHandler(final SharedState state,
-                  final Writer writer) {
-        this.state = state;
+    AppendHandler(final Writer writer, final Function<String, LogConfig> getLogConfig) {
+        this.getLogConfig = getLogConfig;
         this.writer = writer;
     }

@@ -113,11 +113,9 @@ private boolean requestContainsTransactionalProduce(final Map<TopicIdPartition,
     }

     private Map<String, LogConfig> getLogConfigs(final Set<TopicIdPartition> topicIdPartitions) {
-        final Map<String, Object> defaultTopicConfigs = state.defaultTopicConfigs().get().originals();
         final Map<String, LogConfig> result = new HashMap<>();
         for (final TopicIdPartition tp : topicIdPartitions) {
-            final var overrides = state.metadata().getTopicConfig(tp.topic());
-            result.put(tp.topic(), LogConfig.fromProps(defaultTopicConfigs, overrides));
+            result.put(tp.topic(), getLogConfig.apply(tp.topic()));
         }
         return result;
     }
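Replacing the SharedState field with a Function<String, LogConfig> narrows AppendHandler's dependency to the one thing it actually uses and turns the test seam into a plain lambda. A sketch of the stub this enables on the test side (standalone and illustrative; a real test would also supply a Writer):

import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.storage.internals.log.LogConfig;

final class AppendHandlerStubSketch {
    public static void main(String[] args) {
        // Every topic resolves to an all-defaults LogConfig; no mocks needed.
        final Function<String, LogConfig> getLogConfig = topic -> new LogConfig(Map.of());
        System.out.println(getLogConfig.apply("test-topic").retentionMs);
    }
}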
(test file, name not shown in this view)
@@ -55,7 +55,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -145,8 +144,7 @@ static void tearDownS3() {

     @BeforeEach
     void setup() {
-        when(metadataView.getTopicConfig(anyString())).thenReturn(new Properties());
-        when(defaultTopicConfigs.get()).thenReturn(new LogConfig(Map.of()));
+        when(metadataView.getTopicConfig(anyString())).thenReturn(new LogConfig(Map.of()));

         controlPlane = new InMemoryControlPlane(time);
         controlPlane.configure(Map.of());
RetentionEnforcerTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.storage.internals.log.LogConfig;

 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
@@ -76,81 +77,82 @@ class RetentionEnforcerTest {
     @Nested
     class RetentionSettings {
         @Test
-        void fullDefault() {
+        void fullDefault() throws Exception {
             when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0));
-            when(metadataView.getDefaultConfig()).thenReturn(Map.of());
-            when(metadataView.getTopicConfig(any())).thenReturn(new Properties());
-
-            final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0);
-            enforcer.run();
-
-            verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
-            assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(-1L);
-            assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(604800000L);
+            when(metadataView.getTopicConfig(any())).thenReturn(new LogConfig(Map.of()));
+            try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) {
+                enforcer.run();
+
+                verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
+                assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(-1L);
+                assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(604800000L);
+            }
         }

         @Test
-        void logConfig() {
+        void logConfig() throws Exception {
             when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0));
-            when(metadataView.getDefaultConfig()).thenReturn(Map.of(
+            when(metadataView.getTopicConfig(any())).thenReturn(new LogConfig(Map.of(
                 RETENTION_BYTES_CONFIG, "123",
                 RETENTION_MS_CONFIG, "567"
-            ));
-            when(metadataView.getTopicConfig(any())).thenReturn(new Properties());
+            )));

-            final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0);
-            enforcer.run();
+            try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) {
+                enforcer.run();

-            verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
-            assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123L);
-            assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567L);
+                verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
+                assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123L);
+                assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567L);
+            }
         }

         @Test
-        void definedForTopic() {
+        void definedForTopic() throws Exception {
             when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0));
-            when(metadataView.getDefaultConfig()).thenReturn(Map.of(
+            final var defaultConfigs = Map.of(
                 RETENTION_BYTES_CONFIG, "123",
                 RETENTION_MS_CONFIG, "567"
-            ));
+            );
             final Properties topicConfig = new Properties();
             topicConfig.put(RETENTION_BYTES_CONFIG, "123000");
             topicConfig.put(RETENTION_MS_CONFIG, "567000");
-            when(metadataView.getTopicConfig(any())).thenReturn(topicConfig);
+            when(metadataView.getTopicConfig(any())).thenReturn(LogConfig.fromProps(defaultConfigs, topicConfig));

-            final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0);
-            enforcer.run();
+            try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) {
+                enforcer.run();

-            verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
-            assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123000L);
-            assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567000L);
+                verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
+                assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionBytes).containsExactly(123000L);
+                assertThat(requestCaptor.getValue()).map(EnforceRetentionRequest::retentionMs).containsExactly(567000L);
+            }
         }
     }

     @Test
-    void onlyDeleteTopics() {
+    void onlyDeleteTopics() throws Exception {
         when(retentionEnforcementScheduler.getReadyPartitions()).thenReturn(List.of(T0P0, T1P0, T2P0));
-        when(metadataView.getDefaultConfig()).thenReturn(Map.of(
+        final var defaultConfigs = Map.of(
             RETENTION_BYTES_CONFIG, "123",
             RETENTION_MS_CONFIG, "567"
-        ));
+        );

         final var t0Config = new Properties();
         t0Config.put(CLEANUP_POLICY_CONFIG, "compact");
-        when(metadataView.getTopicConfig(eq(TOPIC_0))).thenReturn(t0Config);
+        when(metadataView.getTopicConfig(eq(TOPIC_0))).thenReturn(LogConfig.fromProps(defaultConfigs, t0Config));
         final var t1Config = new Properties();
         t1Config.put(CLEANUP_POLICY_CONFIG, "delete,compact");
-        when(metadataView.getTopicConfig(eq(TOPIC_1))).thenReturn(t1Config);
+        when(metadataView.getTopicConfig(eq(TOPIC_1))).thenReturn(LogConfig.fromProps(defaultConfigs, t1Config));
         final var t2Config = new Properties();
         t2Config.put(CLEANUP_POLICY_CONFIG, "delete");
-        when(metadataView.getTopicConfig(eq(TOPIC_2))).thenReturn(t2Config);
+        when(metadataView.getTopicConfig(eq(TOPIC_2))).thenReturn(LogConfig.fromProps(defaultConfigs, t2Config));

-        final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0);
-        enforcer.run();
+        try (final var enforcer = new RetentionEnforcer(time, metadataView, controlPlane, retentionEnforcementScheduler, 0)) {
+            enforcer.run();

-        verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
-        assertThat(requestCaptor.getValue())
-            .map(EnforceRetentionRequest::topicId)
-            .containsExactly(TOPIC_ID_1, TOPIC_ID_2);
+            verify(controlPlane).enforceRetention(requestCaptor.capture(), eq(0));
+            assertThat(requestCaptor.getValue())
+                .map(EnforceRetentionRequest::topicId)
+                .containsExactly(TOPIC_ID_1, TOPIC_ID_2);
+        }
     }
 }
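A change visible only through these tests: the enforcer is now created in a try-with-resources block (hence the added throws Exception on each test method), which requires RetentionEnforcer to be AutoCloseable and guarantees close() runs even when an assertion fails. The pattern in miniature (names hypothetical):

final class ClosableRunnerSketch implements AutoCloseable {
    void run() { System.out.println("run"); }

    @Override
    public void close() { System.out.println("closed"); }

    public static void main(String[] args) throws Exception {
        try (final var runner = new ClosableRunnerSketch()) {
            runner.run();
        } // close() is invoked here even if run() throws
    }
}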