diff --git a/Makefile b/Makefile index eda118f825..8e373e82f1 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,9 @@ docs: .PHONY: fmt fmt: - ./gradlew :core:spotlessJavaApply :metadata:spotlessJavaApply :storage:spotlessJavaApply :storage:inkless:spotlessJavaApply + ./gradlew \ + :core:checkstyleMain :core:checkstyleTest :metadata:checkstyleMain :metadata:checkstyleTest :storage:checkstyleMain :storage:checkstyleTest \ + :core:spotlessJavaApply :metadata:spotlessJavaApply :storage:spotlessJavaApply :storage:inkless:spotlessJavaApply .PHONY: test test: diff --git a/core/src/test/java/kafka/server/InklessClusterTest.java b/core/src/test/java/kafka/server/InklessClusterTest.java index 29dbd60e08..17c9e4cf69 100644 --- a/core/src/test/java/kafka/server/InklessClusterTest.java +++ b/core/src/test/java/kafka/server/InklessClusterTest.java @@ -206,10 +206,11 @@ public void produceToDisklessAndClassic() throws Exception { int numRecords = 10; try (Admin admin = AdminClient.create(clientConfigs)) { - final NewTopic disklessTopic = new NewTopic(disklessTopicName, Map.of(0, List.of(0))) - .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); final NewTopic classicTopic = new NewTopic(classicTopicName, Map.of(0, List.of(0))) .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); + // manual assignment is not supported for diskless topics + final NewTopic disklessTopic = new NewTopic(disklessTopicName, 1, (short) 1) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); CreateTopicsResult topics = admin.createTopics(List.of(disklessTopic, classicTopic)); topics.all().get(10, TimeUnit.SECONDS); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 91da41a7d9..5f85c0f18a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -765,7 +765,6 @@ private ApiError createTopic(ControllerRequestContext context, return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Replication factor for diskless topics must be 1 or -1 to use the default value (1)."); } - topic.assignments().clear(); } if (!topic.assignments().isEmpty()) { @@ -779,6 +778,10 @@ private ApiError createTopic(ControllerRequestContext context, "A manual partition assignment was specified, but numPartitions " + "was not set to -1."); } + if (disklessEnabled) { + return new ApiError(INVALID_REQUEST, + "A manual partition assignment cannot be specified for diskless topics."); + } OptionalInt replicationFactor = OptionalInt.empty(); for (CreatableReplicaAssignment assignment : topic.assignments()) { if (newParts.containsKey(assignment.partitionIndex())) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index a114bccffc..4712a8131b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -102,6 +102,7 @@ import org.apache.kafka.server.util.MockRandom; import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -137,6 +138,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS; import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICATION_FACTOR; import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT; +import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST; import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION; import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED; import static org.apache.kafka.common.protocol.Errors.NONE; @@ -306,9 +308,23 @@ CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode) { + return createTestTopic(name, numPartitions, replicationFactor, Map.of(), expectedErrorCode); + } + + CreatableTopicResult createTestTopic(String name, + int numPartitions, + short replicationFactor, + Map configs, + short expectedErrorCode) { CreateTopicsRequestData request = new CreateTopicsRequestData(); CreatableTopic topic = new CreatableTopic().setName(name); topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor); + configs.forEach((key, value) -> topic.configs().add( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName(key) + .setValue(value) + ) + ); request.topics().add(topic); ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); ControllerResult result = @@ -408,6 +424,28 @@ void registerBrokers(Integer... brokerIds) { registerBrokersWithDirs(brokersAndDirs); } + void registerBrokersWithRacks(Object... brokerIdsAndRacks) { + if (brokerIdsAndRacks.length % 2 != 0) { + throw new IllegalArgumentException("uneven number of arguments"); + } + for (int i = 0; i < brokerIdsAndRacks.length / 2; i++) { + int brokerId = (int) brokerIdsAndRacks[i * 2]; + String rackId = (String) brokerIdsAndRacks[i * 2 + 1]; + List logDirs = List.of( + Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA") + ); + RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord(). + setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId). + setRack(rackId).setLogDirs(logDirs); + brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint(). + setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). + setPort((short) 9092 + brokerId). + setName("PLAINTEXT"). + setHost("localhost")); + replay(List.of(new ApiMessageAndVersion(brokerRecord, (short) 3))); + } + } + @SuppressWarnings("unchecked") void registerBrokersWithDirs(Object... brokerIdsAndDirs) { if (brokerIdsAndDirs.length % 2 != 0) { @@ -701,346 +739,6 @@ public void testCreateTopics() { assertEquals(expectedResponse4, result4.response()); } - - @ParameterizedTest - @CsvSource({ - "false,false", - "false,", - "true,false", - }) - public void testNotCreateDisklessTopic(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given a request to create a kafka topic with diskless disabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - - request.topics().add(new CreatableTopic().setName("foo"). - setNumPartitions(-1).setReplicationFactor((short) -1) - .setConfigs(creatableTopicConfigs)); - - // Given all brokers unfenced - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - - // When creating a topic with diskless enabled - ControllerResult result = - replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); - // Then the topic creation should succeed, regardless of the RF - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 3). - setErrorMessage(null).setErrorCode((short) 0). - setTopicId(result.response().topics().find("foo").topicId())); - assertEquals(expectedResponse, withoutConfigs(result.response())); - final List disklessConfigRecords = result.records().stream() - .filter(m -> m.message() instanceof ConfigRecord) - .map(m -> (ConfigRecord) m.message()) - .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) - .toList(); - assertEquals(1, disklessConfigRecords.size()); - // Then always diskless is disabled - assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); - - // Given the topic is registered - ctx.replay(result.records()); - assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}). - setDirectories(new Uuid[] { - Uuid.fromString("TESTBROKER00001DIRAAAA"), - Uuid.fromString("TESTBROKER00002DIRAAAA"), - Uuid.fromString("TESTBROKER00000DIRAAAA") - }). - setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(), - replicationControl.getPartition( - ((TopicRecord) result.records().get(0).message()).topicId(), 0)); - - // When creating a topic with diskless enabled and already exists - ControllerResult result4 = - replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); - CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). - setErrorMessage("Topic 'foo' already exists.")); - assertEquals(expectedResponse4, result4.response()); - } - - @ParameterizedTest - @CsvSource({ - "true,true", - "true,", - "false,true", - }) - public void testCreateDisklessTopic(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given a request to create a kafka topic with diskless enabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - request.topics().add(new CreatableTopic().setName("foo"). - setNumPartitions(-1).setReplicationFactor((short) -1) - .setConfigs(creatableTopicConfigs)); - - // When creating a topic without brokers available - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should fail with BROKER_NOT_AVAILABLE error - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()). - setErrorMessage("No brokers available to create diskless topic.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); - - // Given brokers are registered - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0); - ctx.inControlledShutdownBrokers(0); - - // When creating a topic with diskless enabled - ControllerResult result2 = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should succeed, regardless of fenced brokers - CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); - expectedResponse2.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 1). - setErrorMessage(null).setErrorCode((short) 0). - setTopicId(result2.response().topics().find("foo").topicId())); - CreateTopicsResponseData response = result2.response(); - assertEquals(expectedResponse2, withoutConfigs(response)); - - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - - // When creating a topic with diskless enabled - ControllerResult result3 = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should succeed, regardless of the RF - CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); - expectedResponse3.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 1). - setErrorMessage(null).setErrorCode((short) 0). - setTopicId(result3.response().topics().find("foo").topicId())); - assertEquals(expectedResponse3, withoutConfigs(result3.response())); - final List disklessConfigRecords = result3.records().stream() - .filter(m -> m.message() instanceof ConfigRecord) - .map(m -> (ConfigRecord) m.message()) - .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) - .toList(); - assertEquals(1, disklessConfigRecords.size()); - // Then diskless is always enabled - assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); - - // Given the topic is registered - ctx.replay(result3.records()); - assertEquals( - new PartitionRegistration.Builder().setReplicas(new int[] {0}). - setDirectories(new Uuid[] { - Uuid.fromString("TESTBROKER00000DIRAAAA"), - }). - setIsr(new int[] {0}) - .setLeader(0) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) - .setLeaderEpoch(0) - .setPartitionEpoch(0) - .build(), - replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); - - // When creating a topic with diskless enabled and already exists - ControllerResult result4 = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). - setErrorMessage("Topic 'foo' already exists.")); - assertEquals(expectedResponse4, result4.response()); - } - - @ParameterizedTest - @CsvSource({ - "1, -2, INVALID_REPLICATION_FACTOR", - "1, 0, INVALID_REPLICATION_FACTOR", - "1, 2, INVALID_REPLICATION_FACTOR", - "-2, 1, INVALID_PARTITIONS", - "0, 1, INVALID_PARTITIONS", - }) - public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short replicationFactor, String expectedError) { - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - - CreateTopicsRequestData.CreatableTopicConfigCollection disklessConfig = - new CreateTopicsRequestData.CreatableTopicConfigCollection(); - disklessConfig.add( - new CreateTopicsRequestData.CreatableTopicConfig() - .setName("diskless.enable") - .setValue("true") - ); - - CreateTopicsRequestData request1 = new CreateTopicsRequestData(); - request1.topics().add(new CreatableTopic().setName("baz") - .setNumPartitions(numPartitions).setReplicationFactor(replicationFactor) - .setConfigs(disklessConfig)); - - ControllerResult result1 = - replicationControl.createTopics(requestContext, request1, Set.of("baz")); - assertEquals(Errors.valueOf(expectedError).code(), result1.response().topics().find("baz").errorCode()); - assertEquals(List.of(), result1.records()); - } - - @ParameterizedTest - @CsvSource({ - "true,false", - "true," - // "false,true", // This case is not valid because no internal topic should be explicitly created with diskless enabled. Tested in another case - }) - public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - // Given a setup with diskless defined at the server level - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given an internal kafka topic with diskless enabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - // If the diskless enable config is set, it should be added to the topic configs - creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; - request.topics().add(new CreatableTopic().setName(internalTopic). - setNumPartitions(-1).setReplicationFactor((short) -1) - .setConfigs(creatableTopicConfigs)); - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - // When creating an internal topic with diskless enabled, disable it - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse.topics().add( - new CreatableTopicResult() - .setName(internalTopic) - .setNumPartitions(1) - .setReplicationFactor((short) 3) - .setErrorMessage(null).setErrorCode((short) 0) - .setTopicId(result.response().topics().find(internalTopic).topicId())); - assertEquals(expectedResponse, withoutConfigs(result.response())); - assertTrue(result.response().topics().find(internalTopic) - .configs() - .stream() - .noneMatch(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))); - final List disklessConfigRecords = result.records().stream() - .filter(m -> m.message() instanceof ConfigRecord) - .map(m -> (ConfigRecord) m.message()) - .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) - .toList(); - // Then always diskless is disabled - assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); - } - - @Test - public void testInvalidDisklessTopicCreationForInternalTopics() { - // Given a setup with diskless defined at the server level - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given an internal kafka topic with diskless enabled - final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue("true")); - request.topics().add(new CreatableTopic().setName(internalTopic). - setConfigs(topicConfigs)); - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - // When creating an internal topic with diskless enabled, disable it - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse.topics().add( - new CreatableTopicResult() - .setName(internalTopic) - .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("Internal topics cannot be diskless topics.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); - } - - @ParameterizedTest - @CsvSource({ - "false,true", - "true," - }) - public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - // Given a setup with diskless defined at the server level - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDisklessStorageSystemEnabled(false) - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given an internal kafka topic with diskless enabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - // If the diskless enable config is set, it should be added to the topic configs - topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - final String topicName = "foo"; - request.topics().add(new CreatableTopic().setName(topicName).setConfigs(topicConfigs)); - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - // When creating an internal topic with diskless enabled, disable it - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of(topicName)); - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse.topics().add( - new CreatableTopicResult() - .setName(topicName) - .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("Cannot create diskless topics when the diskless storage system is disabled. Please enable the diskless storage system to create diskless topics.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); - } - @Test public void testCreateTopicsWithMutationQuotaExceeded() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); @@ -2409,65 +2107,6 @@ public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition( assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); } - @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) - public void testReassignPartitionsDiskless(short version) { - MetadataVersion metadataVersion = MetadataVersion.latestTesting(); - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setMetadataVersion(metadataVersion) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replication = ctx.replicationControl; - ctx.registerBrokers(0, 1); - ctx.unfenceBrokers(0, 1); - String topic = "foo"; - ctx.createTestTopic(topic, new int[][] {new int[] {0}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0).topicId(); - - // No change in the replication factor. - ControllerResult alterResult1 = - replication.alterPartitionReassignments( - new AlterPartitionReassignmentsRequestData().setTopics(List.of( - new ReassignableTopic().setName(topic).setPartitions(List.of( - new ReassignablePartition().setPartitionIndex(0). - setReplicas(List.of(1))))))); - assertEquals(new AlterPartitionReassignmentsResponseData(). - setErrorMessage(null).setResponses(List.of( - new ReassignableTopicResponse().setName(topic).setPartitions(List.of( - new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), - alterResult1.response()); - ctx.replay(alterResult1.records()); - ListPartitionReassignmentsResponseData currentReassigning = - new ListPartitionReassignmentsResponseData().setErrorMessage(null). - setTopics(List.of(new OngoingTopicReassignment(). - setName(topic).setPartitions(List.of( - new OngoingPartitionReassignment().setPartitionIndex(0). - setRemovingReplicas(List.of(0)). - setAddingReplicas(List.of(1)). - setReplicas(List.of(1, 0)))))); - assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( - new ListPartitionReassignmentsTopics().setName(topic). - setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); - - // Try to increase the replication factor. - ControllerResult alterResult2 = - replication.alterPartitionReassignments( - new AlterPartitionReassignmentsRequestData().setTopics(List.of( - new ReassignableTopic().setName(topic).setPartitions(List.of( - new ReassignablePartition().setPartitionIndex(0). - setReplicas(List.of(0, 1))))))); - assertEquals(new AlterPartitionReassignmentsResponseData(). - setErrorMessage(null).setResponses(List.of( - new ReassignableTopicResponse().setName(topic).setPartitions(List.of( - new ReassignablePartitionResponse().setPartitionIndex(0) - .setErrorCode(INVALID_REPLICATION_FACTOR.code()) - .setErrorMessage("The replication factor is changed from 1 to 2"))))), - alterResult2.response()); - ctx.replay(alterResult2.records()); - assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( - new ListPartitionReassignmentsTopics().setName(topic). - setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); - } - @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) public void testAlterPartitionShouldRejectFencedBrokers(short version) { @@ -3858,4 +3497,683 @@ void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() { ctx.replay(List.of(new ApiMessageAndVersion(new ClearElrRecord(), CLEAR_ELR_RECORD.highestSupportedVersion()))); assertEquals(partitionEpoch, ctx.replicationControl.getPartition(fooId, 0).partitionEpoch); } + + @Nested + // Tests Diskless single/unmanaged replica approach where a single replica is registered on KRaft but it's effectively ignored. + class DisklessUnmanagedReplicaTests { + @ParameterizedTest + @CsvSource({ + "false,false", + "false,", + "true,false", + }) + public void testNotCreateDisklessTopic(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless disabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + + request.topics().add(new CreatableTopic() + .setName("foo") + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // Given all brokers unfenced + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result = + replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 3) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result.response().topics().find("foo").topicId())); + assertEquals(expectedResponse, withoutConfigs(result.response())); + final List disklessConfigRecords = result.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then always diskless is disabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); + + // Given the topic is registered + ctx.replay(result.records()); + assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}) + .setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00001DIRAAAA"), + Uuid.fromString("TESTBROKER00002DIRAAAA"), + Uuid.fromString("TESTBROKER00000DIRAAAA") + }) + .setIsr(new int[] {1, 2, 0}) + .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result1 = + replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); + CreateTopicsResponseData expectedResponse1 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse1.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse1, result1.response()); + } + + @ParameterizedTest + @CsvSource({ + "true,true", + "true,", + "false,true", + }) + public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + request.topics().add(new CreatableTopic() + .setName("foo") + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // When creating a topic without brokers available + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should fail with BROKER_NOT_AVAILABLE error + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()) + .setErrorMessage("No brokers available to create diskless topic.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + + // Given brokers are registered + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0); + + // When creating a topic with diskless enabled + ControllerResult result2 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of fenced brokers + CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); + expectedResponse2.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result2.response().topics().find("foo").topicId())); + CreateTopicsResponseData response = result2.response(); + assertEquals(expectedResponse2, withoutConfigs(response)); + + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result3 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); + expectedResponse3.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result3.response().topics().find("foo").topicId())); + assertEquals(expectedResponse3, withoutConfigs(result3.response())); + final List disklessConfigRecords = result3.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then diskless is always enabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); + + // Given the topic is registered + ctx.replay(result3.records()); + assertEquals( + new PartitionRegistration.Builder().setReplicas(new int[] {0}) + .setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00000DIRAAAA"), + }) + .setIsr(new int[] {0}) + .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result4 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse4.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse4, result4.response()); + } + + @ParameterizedTest + @CsvSource({ + "true,true", + "true,", + "false,true", + }) + public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + request.topics().add(new CreatableTopic() + .setName("foo") + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // When creating a topic without brokers available + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should fail with BROKER_NOT_AVAILABLE error + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()) + .setErrorMessage("No brokers available to create diskless topic.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + + // Given brokers are registered + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0); + + // When creating a topic with diskless enabled + ControllerResult result2 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of fenced brokers + CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); + expectedResponse2.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result2.response().topics().find("foo").topicId())); + CreateTopicsResponseData response = result2.response(); + assertEquals(expectedResponse2, withoutConfigs(response)); + + // Given all brokers unfenced + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result3 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); + expectedResponse3.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result3.response().topics().find("foo").topicId())); + assertEquals(expectedResponse3, withoutConfigs(result3.response())); + final List disklessConfigRecords = result3.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then diskless is always enabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); + + // Given the topic is registered + ctx.replay(result3.records()); + assertEquals( + new PartitionRegistration.Builder().setReplicas(new int[] {0}) + .setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00000DIRAAAA"), + }) + .setIsr(new int[] {0}) + .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result4 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse4.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse4, result4.response()); + } + + @ParameterizedTest + @CsvSource({ + "1, -2, INVALID_REPLICATION_FACTOR", + "1, 0, INVALID_REPLICATION_FACTOR", + "1, 2, INVALID_REPLICATION_FACTOR", + "-2, 1, INVALID_PARTITIONS", + "0, 1, INVALID_PARTITIONS", + }) + public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short replicationFactor, String expectedError) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + + CreateTopicsRequestData.CreatableTopicConfigCollection disklessConfig = + new CreateTopicsRequestData.CreatableTopicConfigCollection(); + disklessConfig.add( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName("diskless.enable") + .setValue("true") + ); + + CreateTopicsRequestData request1 = new CreateTopicsRequestData(); + request1.topics().add(new CreatableTopic().setName("baz") + .setNumPartitions(numPartitions).setReplicationFactor(replicationFactor) + .setConfigs(disklessConfig)); + + ControllerResult result1 = + replicationControl.createTopics(requestContext, request1, Set.of("baz")); + assertEquals(Errors.valueOf(expectedError).code(), result1.response().topics().find("baz").errorCode()); + assertEquals(List.of(), result1.records()); + } + + @ParameterizedTest + @CsvSource({ + "true,false", + "true," + // This case is not valid because no internal topic should be explicitly created with diskless enabled. + // Tested in testInvalidDisklessTopicCreationForInternalTopics + // "false,true", + }) + public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + // If the diskless enable config is set, it should be added to the topic configs + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; + request.topics().add(new CreatableTopic().setName(internalTopic). + setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(internalTopic) + .setNumPartitions(1) + .setReplicationFactor((short) 3) + .setErrorMessage(null).setErrorCode((short) 0) + .setTopicId(result.response().topics().find(internalTopic).topicId())); + assertEquals(expectedResponse, withoutConfigs(result.response())); + assertTrue(result.response().topics().find(internalTopic) + .configs() + .stream() + .noneMatch(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))); + final List disklessConfigRecords = result.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + // Then always diskless is disabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); + } + + @Test + public void testInvalidDisklessTopicCreationForInternalTopics() { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("true")); + request.topics().add(new CreatableTopic().setName(internalTopic).setConfigs(topicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(internalTopic) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Internal topics cannot be diskless topics.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + } + + @ParameterizedTest + @CsvSource({ + "false,true", + "true," + }) + public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(false) + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + // If the diskless enable config is set, it should be added to the topic configs + topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + final String topicName = "foo"; + request.topics().add(new CreatableTopic().setName(topicName).setConfigs(topicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(topicName)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(topicName) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Cannot create diskless topics when the diskless storage system is disabled. Please enable the diskless storage system to create diskless topics.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + } + + @Test + public void testReassignDisklessPartitions() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1); + ctx.unfenceBrokers(0, 1); + + String topic = "foo"; + ctx.createTestTopic( + topic, + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + + // No change in the replication factor. + ControllerResult alterResult1 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName(topic).setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(1))))))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(List.of( + new ReassignableTopicResponse().setName(topic).setPartitions(List.of( + new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), + alterResult1.response()); + + ctx.replay(alterResult1.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(List.of(new OngoingTopicReassignment().setName(topic).setPartitions(List.of( + new OngoingPartitionReassignment().setPartitionIndex(0) + .setRemovingReplicas(List.of(0)) + .setAddingReplicas(List.of(1)) + .setReplicas(List.of(1, 0)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( + new ListPartitionReassignmentsTopics().setName(topic). + setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); + + // Try to increase the replication factor. + ControllerResult alterResult2 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName(topic).setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0) + .setReplicas(List.of(0, 1))))))); + assertEquals(new AlterPartitionReassignmentsResponseData() + .setErrorMessage(null).setResponses(List.of( + new ReassignableTopicResponse().setName(topic).setPartitions(List.of( + new ReassignablePartitionResponse().setPartitionIndex(0) + .setErrorCode(INVALID_REPLICATION_FACTOR.code()) + .setErrorMessage("The replication factor is changed from 1 to 2"))))), + alterResult2.response()); + ctx.replay(alterResult2.records()); + assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( + new ListPartitionReassignmentsTopics().setName(topic) + .setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); + } + + @Test + public void testNoLeaderElectionOnBrokerFenced() { + // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. + // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained. + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerFenced(0, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader fencing"); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader"); + assertEquals(-1, partition.leader, "Leader should be offline after fencing"); + } + + @Test + public void testNoReplicaChangeOnShutdown() { + // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. + // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained. + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerShutdown(0, true, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader fencing"); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader"); + assertEquals(-1, partition.leader, "Leader should be offline after fencing"); + } + + @Test + void testDisklessMarksLeaderOfflineOnUnregister_noRacks() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + final int numPartitions = 6; + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + numPartitions, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerUnregistered(0, 100, records); + ctx.replay(records); + + // All partitions should remain present and keep the original replica/ISR, + // only the leader should be marked offline. + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); + assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId); + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + } + } + + @Test + void testDisklessMarksLeaderOfflineOnUnregister_withRacks() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + final int numPartitions = 6; + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + numPartitions, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerUnregistered(0, 100, records); + ctx.replay(records); + + // All partitions should remain present and keep the original replica/ISR, + // only the leader should be marked offline. + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); + assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId); + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + } + } + + @Test + void testManualReplicaAssignmentsShouldBeRejected() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Expectation: providing manual replica assignments for a diskless topic should be rejected. + ctx.createTestTopic( + "foo", + new int[][] {new int[] {0, 1}, new int[] {1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + INVALID_REQUEST.code() + ); + } + } + }