From a9d5e54b4187fe6f60c3734e990974daa4b33759 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 19 Jan 2026 18:28:56 +0200 Subject: [PATCH 1/3] chore(inkless:build): add checkstyle to fmt make target --- Makefile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index eda118f825..8e373e82f1 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,9 @@ docs: .PHONY: fmt fmt: - ./gradlew :core:spotlessJavaApply :metadata:spotlessJavaApply :storage:spotlessJavaApply :storage:inkless:spotlessJavaApply + ./gradlew \ + :core:checkstyleMain :core:checkstyleTest :metadata:checkstyleMain :metadata:checkstyleTest :storage:checkstyleMain :storage:checkstyleTest \ + :core:spotlessJavaApply :metadata:spotlessJavaApply :storage:spotlessJavaApply :storage:inkless:spotlessJavaApply .PHONY: test test: From 8d1c2131c0de6e0145e9d5da9ea968dfcaa0c448 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 19 Jan 2026 18:28:25 +0200 Subject: [PATCH 2/3] chore(metadata:diskless): move diskless-related tests to nested legacy class For easier testing, moving existing tests to a nested class acknowledging the current approach as legacy. This will allow testing existing and new approach correctly. --- .../ReplicationControlManagerTest.java | 1094 +++++++++++------ 1 file changed, 695 insertions(+), 399 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index a114bccffc..0f738b074c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -102,6 +102,7 @@ import org.apache.kafka.server.util.MockRandom; import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -306,9 +307,23 @@ CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode) { + return createTestTopic(name, numPartitions, replicationFactor, Map.of(), expectedErrorCode); + } + + CreatableTopicResult createTestTopic(String name, + int numPartitions, + short replicationFactor, + Map configs, + short expectedErrorCode) { CreateTopicsRequestData request = new CreateTopicsRequestData(); CreatableTopic topic = new CreatableTopic().setName(name); topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor); + configs.forEach((key, value) -> topic.configs().add( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName(key) + .setValue(value) + ) + ); request.topics().add(topic); ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); ControllerResult result = @@ -408,6 +423,28 @@ void registerBrokers(Integer... brokerIds) { registerBrokersWithDirs(brokersAndDirs); } + void registerBrokersWithRacks(Object... brokerIdsAndRacks) { + if (brokerIdsAndRacks.length % 2 != 0) { + throw new IllegalArgumentException("uneven number of arguments"); + } + for (int i = 0; i < brokerIdsAndRacks.length / 2; i++) { + int brokerId = (int) brokerIdsAndRacks[i * 2]; + String rackId = (String) brokerIdsAndRacks[i * 2 + 1]; + List logDirs = List.of( + Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA") + ); + RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord(). + setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId). + setRack(rackId).setLogDirs(logDirs); + brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint(). + setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). + setPort((short) 9092 + brokerId). + setName("PLAINTEXT"). + setHost("localhost")); + replay(List.of(new ApiMessageAndVersion(brokerRecord, (short) 3))); + } + } + @SuppressWarnings("unchecked") void registerBrokersWithDirs(Object... brokerIdsAndDirs) { if (brokerIdsAndDirs.length % 2 != 0) { @@ -701,346 +738,6 @@ public void testCreateTopics() { assertEquals(expectedResponse4, result4.response()); } - - @ParameterizedTest - @CsvSource({ - "false,false", - "false,", - "true,false", - }) - public void testNotCreateDisklessTopic(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given a request to create a kafka topic with diskless disabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - - request.topics().add(new CreatableTopic().setName("foo"). - setNumPartitions(-1).setReplicationFactor((short) -1) - .setConfigs(creatableTopicConfigs)); - - // Given all brokers unfenced - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - - // When creating a topic with diskless enabled - ControllerResult result = - replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); - // Then the topic creation should succeed, regardless of the RF - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 3). - setErrorMessage(null).setErrorCode((short) 0). - setTopicId(result.response().topics().find("foo").topicId())); - assertEquals(expectedResponse, withoutConfigs(result.response())); - final List disklessConfigRecords = result.records().stream() - .filter(m -> m.message() instanceof ConfigRecord) - .map(m -> (ConfigRecord) m.message()) - .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) - .toList(); - assertEquals(1, disklessConfigRecords.size()); - // Then always diskless is disabled - assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); - - // Given the topic is registered - ctx.replay(result.records()); - assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}). - setDirectories(new Uuid[] { - Uuid.fromString("TESTBROKER00001DIRAAAA"), - Uuid.fromString("TESTBROKER00002DIRAAAA"), - Uuid.fromString("TESTBROKER00000DIRAAAA") - }). - setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(), - replicationControl.getPartition( - ((TopicRecord) result.records().get(0).message()).topicId(), 0)); - - // When creating a topic with diskless enabled and already exists - ControllerResult result4 = - replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); - CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). - setErrorMessage("Topic 'foo' already exists.")); - assertEquals(expectedResponse4, result4.response()); - } - - @ParameterizedTest - @CsvSource({ - "true,true", - "true,", - "false,true", - }) - public void testCreateDisklessTopic(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given a request to create a kafka topic with diskless enabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - request.topics().add(new CreatableTopic().setName("foo"). - setNumPartitions(-1).setReplicationFactor((short) -1) - .setConfigs(creatableTopicConfigs)); - - // When creating a topic without brokers available - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should fail with BROKER_NOT_AVAILABLE error - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()). - setErrorMessage("No brokers available to create diskless topic.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); - - // Given brokers are registered - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0); - ctx.inControlledShutdownBrokers(0); - - // When creating a topic with diskless enabled - ControllerResult result2 = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should succeed, regardless of fenced brokers - CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); - expectedResponse2.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 1). - setErrorMessage(null).setErrorCode((short) 0). - setTopicId(result2.response().topics().find("foo").topicId())); - CreateTopicsResponseData response = result2.response(); - assertEquals(expectedResponse2, withoutConfigs(response)); - - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - - // When creating a topic with diskless enabled - ControllerResult result3 = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should succeed, regardless of the RF - CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); - expectedResponse3.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 1). - setErrorMessage(null).setErrorCode((short) 0). - setTopicId(result3.response().topics().find("foo").topicId())); - assertEquals(expectedResponse3, withoutConfigs(result3.response())); - final List disklessConfigRecords = result3.records().stream() - .filter(m -> m.message() instanceof ConfigRecord) - .map(m -> (ConfigRecord) m.message()) - .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) - .toList(); - assertEquals(1, disklessConfigRecords.size()); - // Then diskless is always enabled - assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); - - // Given the topic is registered - ctx.replay(result3.records()); - assertEquals( - new PartitionRegistration.Builder().setReplicas(new int[] {0}). - setDirectories(new Uuid[] { - Uuid.fromString("TESTBROKER00000DIRAAAA"), - }). - setIsr(new int[] {0}) - .setLeader(0) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) - .setLeaderEpoch(0) - .setPartitionEpoch(0) - .build(), - replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); - - // When creating a topic with diskless enabled and already exists - ControllerResult result4 = - replicationControl.createTopics(requestContext, request, Set.of("foo")); - CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). - setErrorMessage("Topic 'foo' already exists.")); - assertEquals(expectedResponse4, result4.response()); - } - - @ParameterizedTest - @CsvSource({ - "1, -2, INVALID_REPLICATION_FACTOR", - "1, 0, INVALID_REPLICATION_FACTOR", - "1, 2, INVALID_REPLICATION_FACTOR", - "-2, 1, INVALID_PARTITIONS", - "0, 1, INVALID_PARTITIONS", - }) - public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short replicationFactor, String expectedError) { - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - - CreateTopicsRequestData.CreatableTopicConfigCollection disklessConfig = - new CreateTopicsRequestData.CreatableTopicConfigCollection(); - disklessConfig.add( - new CreateTopicsRequestData.CreatableTopicConfig() - .setName("diskless.enable") - .setValue("true") - ); - - CreateTopicsRequestData request1 = new CreateTopicsRequestData(); - request1.topics().add(new CreatableTopic().setName("baz") - .setNumPartitions(numPartitions).setReplicationFactor(replicationFactor) - .setConfigs(disklessConfig)); - - ControllerResult result1 = - replicationControl.createTopics(requestContext, request1, Set.of("baz")); - assertEquals(Errors.valueOf(expectedError).code(), result1.response().topics().find("baz").errorCode()); - assertEquals(List.of(), result1.records()); - } - - @ParameterizedTest - @CsvSource({ - "true,false", - "true," - // "false,true", // This case is not valid because no internal topic should be explicitly created with diskless enabled. Tested in another case - }) - public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - // Given a setup with diskless defined at the server level - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given an internal kafka topic with diskless enabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - // If the diskless enable config is set, it should be added to the topic configs - creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; - request.topics().add(new CreatableTopic().setName(internalTopic). - setNumPartitions(-1).setReplicationFactor((short) -1) - .setConfigs(creatableTopicConfigs)); - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - // When creating an internal topic with diskless enabled, disable it - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse.topics().add( - new CreatableTopicResult() - .setName(internalTopic) - .setNumPartitions(1) - .setReplicationFactor((short) 3) - .setErrorMessage(null).setErrorCode((short) 0) - .setTopicId(result.response().topics().find(internalTopic).topicId())); - assertEquals(expectedResponse, withoutConfigs(result.response())); - assertTrue(result.response().topics().find(internalTopic) - .configs() - .stream() - .noneMatch(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))); - final List disklessConfigRecords = result.records().stream() - .filter(m -> m.message() instanceof ConfigRecord) - .map(m -> (ConfigRecord) m.message()) - .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) - .toList(); - // Then always diskless is disabled - assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); - } - - @Test - public void testInvalidDisklessTopicCreationForInternalTopics() { - // Given a setup with diskless defined at the server level - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given an internal kafka topic with diskless enabled - final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue("true")); - request.topics().add(new CreatableTopic().setName(internalTopic). - setConfigs(topicConfigs)); - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - // When creating an internal topic with diskless enabled, disable it - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse.topics().add( - new CreatableTopicResult() - .setName(internalTopic) - .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("Internal topics cannot be diskless topics.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); - } - - @ParameterizedTest - @CsvSource({ - "false,true", - "true," - }) - public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { - // Given a setup with diskless defined at the server level - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setDisklessStorageSystemEnabled(false) - .setDefaultDisklessEnable(logDisklessEnableServerConfig) - .build(); - ReplicationControlManager replicationControl = ctx.replicationControl; - // Given an internal kafka topic with diskless enabled - CreateTopicsRequestData request = new CreateTopicsRequestData(); - CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); - if (disklessEnableTopicConfig != null) { - // If the diskless enable config is set, it should be added to the topic configs - topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() - .setName(DISKLESS_ENABLE_CONFIG) - .setValue(disklessEnableTopicConfig)); - } - final String topicName = "foo"; - request.topics().add(new CreatableTopic().setName(topicName).setConfigs(topicConfigs)); - // Given all brokers unfenced - ctx.registerBrokers(0, 1, 2); - ctx.unfenceBrokers(0, 1, 2); - // When creating an internal topic with diskless enabled, disable it - ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); - ControllerResult result = - replicationControl.createTopics(requestContext, request, Set.of(topicName)); - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error - expectedResponse.topics().add( - new CreatableTopicResult() - .setName(topicName) - .setErrorCode(Errors.INVALID_REQUEST.code()) - .setErrorMessage("Cannot create diskless topics when the diskless storage system is disabled. Please enable the diskless storage system to create diskless topics.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); - } - @Test public void testCreateTopicsWithMutationQuotaExceeded() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); @@ -2409,65 +2106,6 @@ public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition( assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); } - @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) - public void testReassignPartitionsDiskless(short version) { - MetadataVersion metadataVersion = MetadataVersion.latestTesting(); - ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() - .setMetadataVersion(metadataVersion) - .setDisklessStorageSystemEnabled(true) - .build(); - ReplicationControlManager replication = ctx.replicationControl; - ctx.registerBrokers(0, 1); - ctx.unfenceBrokers(0, 1); - String topic = "foo"; - ctx.createTestTopic(topic, new int[][] {new int[] {0}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0).topicId(); - - // No change in the replication factor. - ControllerResult alterResult1 = - replication.alterPartitionReassignments( - new AlterPartitionReassignmentsRequestData().setTopics(List.of( - new ReassignableTopic().setName(topic).setPartitions(List.of( - new ReassignablePartition().setPartitionIndex(0). - setReplicas(List.of(1))))))); - assertEquals(new AlterPartitionReassignmentsResponseData(). - setErrorMessage(null).setResponses(List.of( - new ReassignableTopicResponse().setName(topic).setPartitions(List.of( - new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), - alterResult1.response()); - ctx.replay(alterResult1.records()); - ListPartitionReassignmentsResponseData currentReassigning = - new ListPartitionReassignmentsResponseData().setErrorMessage(null). - setTopics(List.of(new OngoingTopicReassignment(). - setName(topic).setPartitions(List.of( - new OngoingPartitionReassignment().setPartitionIndex(0). - setRemovingReplicas(List.of(0)). - setAddingReplicas(List.of(1)). - setReplicas(List.of(1, 0)))))); - assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( - new ListPartitionReassignmentsTopics().setName(topic). - setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); - - // Try to increase the replication factor. - ControllerResult alterResult2 = - replication.alterPartitionReassignments( - new AlterPartitionReassignmentsRequestData().setTopics(List.of( - new ReassignableTopic().setName(topic).setPartitions(List.of( - new ReassignablePartition().setPartitionIndex(0). - setReplicas(List.of(0, 1))))))); - assertEquals(new AlterPartitionReassignmentsResponseData(). - setErrorMessage(null).setResponses(List.of( - new ReassignableTopicResponse().setName(topic).setPartitions(List.of( - new ReassignablePartitionResponse().setPartitionIndex(0) - .setErrorCode(INVALID_REPLICATION_FACTOR.code()) - .setErrorMessage("The replication factor is changed from 1 to 2"))))), - alterResult2.response()); - ctx.replay(alterResult2.records()); - assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( - new ListPartitionReassignmentsTopics().setName(topic). - setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); - } - @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) public void testAlterPartitionShouldRejectFencedBrokers(short version) { @@ -3858,4 +3496,662 @@ void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() { ctx.replay(List.of(new ApiMessageAndVersion(new ClearElrRecord(), CLEAR_ELR_RECORD.highestSupportedVersion()))); assertEquals(partitionEpoch, ctx.replicationControl.getPartition(fooId, 0).partitionEpoch); } + + @Nested + // Tests Diskless single/unmanaged replica approach where a single replica is registered on KRaft but it's effectively ignored. + class DisklessUnmanagedReplicaTests { + @ParameterizedTest + @CsvSource({ + "false,false", + "false,", + "true,false", + }) + public void testNotCreateDisklessTopic(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless disabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + + request.topics().add(new CreatableTopic() + .setName("foo") + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // Given all brokers unfenced + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result = + replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 3) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result.response().topics().find("foo").topicId())); + assertEquals(expectedResponse, withoutConfigs(result.response())); + final List disklessConfigRecords = result.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then always diskless is disabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); + + // Given the topic is registered + ctx.replay(result.records()); + assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}) + .setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00001DIRAAAA"), + Uuid.fromString("TESTBROKER00002DIRAAAA"), + Uuid.fromString("TESTBROKER00000DIRAAAA") + }) + .setIsr(new int[] {1, 2, 0}) + .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result1 = + replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); + CreateTopicsResponseData expectedResponse1 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse1.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse1, result1.response()); + } + + @ParameterizedTest + @CsvSource({ + "true,true", + "true,", + "false,true", + }) + public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + request.topics().add(new CreatableTopic() + .setName("foo") + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // When creating a topic without brokers available + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should fail with BROKER_NOT_AVAILABLE error + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()) + .setErrorMessage("No brokers available to create diskless topic.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + + // Given brokers are registered + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0); + + // When creating a topic with diskless enabled + ControllerResult result2 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of fenced brokers + CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); + expectedResponse2.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result2.response().topics().find("foo").topicId())); + CreateTopicsResponseData response = result2.response(); + assertEquals(expectedResponse2, withoutConfigs(response)); + + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result3 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); + expectedResponse3.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result3.response().topics().find("foo").topicId())); + assertEquals(expectedResponse3, withoutConfigs(result3.response())); + final List disklessConfigRecords = result3.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then diskless is always enabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); + + // Given the topic is registered + ctx.replay(result3.records()); + assertEquals( + new PartitionRegistration.Builder().setReplicas(new int[] {0}) + .setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00000DIRAAAA"), + }) + .setIsr(new int[] {0}) + .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result4 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse4.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse4, result4.response()); + } + + @ParameterizedTest + @CsvSource({ + "true,true", + "true,", + "false,true", + }) + public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + request.topics().add(new CreatableTopic() + .setName("foo") + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // When creating a topic without brokers available + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should fail with BROKER_NOT_AVAILABLE error + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()) + .setErrorMessage("No brokers available to create diskless topic.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + + // Given brokers are registered + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0); + + // When creating a topic with diskless enabled + ControllerResult result2 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of fenced brokers + CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); + expectedResponse2.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result2.response().topics().find("foo").topicId())); + CreateTopicsResponseData response = result2.response(); + assertEquals(expectedResponse2, withoutConfigs(response)); + + // Given all brokers unfenced + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result3 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); + expectedResponse3.topics().add(new CreatableTopicResult() + .setName("foo") + .setNumPartitions(1) + .setReplicationFactor((short) 1) + .setErrorMessage(null) + .setErrorCode((short) 0) + .setTopicId(result3.response().topics().find("foo").topicId())); + assertEquals(expectedResponse3, withoutConfigs(result3.response())); + final List disklessConfigRecords = result3.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then diskless is always enabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); + + // Given the topic is registered + ctx.replay(result3.records()); + assertEquals( + new PartitionRegistration.Builder().setReplicas(new int[] {0}) + .setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00000DIRAAAA"), + }) + .setIsr(new int[] {0}) + .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result4 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse4.topics().add(new CreatableTopicResult().setName("foo") + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()) + .setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse4, result4.response()); + } + + @ParameterizedTest + @CsvSource({ + "1, -2, INVALID_REPLICATION_FACTOR", + "1, 0, INVALID_REPLICATION_FACTOR", + "1, 2, INVALID_REPLICATION_FACTOR", + "-2, 1, INVALID_PARTITIONS", + "0, 1, INVALID_PARTITIONS", + }) + public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short replicationFactor, String expectedError) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + + CreateTopicsRequestData.CreatableTopicConfigCollection disklessConfig = + new CreateTopicsRequestData.CreatableTopicConfigCollection(); + disklessConfig.add( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName("diskless.enable") + .setValue("true") + ); + + CreateTopicsRequestData request1 = new CreateTopicsRequestData(); + request1.topics().add(new CreatableTopic().setName("baz") + .setNumPartitions(numPartitions).setReplicationFactor(replicationFactor) + .setConfigs(disklessConfig)); + + ControllerResult result1 = + replicationControl.createTopics(requestContext, request1, Set.of("baz")); + assertEquals(Errors.valueOf(expectedError).code(), result1.response().topics().find("baz").errorCode()); + assertEquals(List.of(), result1.records()); + } + + @ParameterizedTest + @CsvSource({ + "true,false", + "true," + // This case is not valid because no internal topic should be explicitly created with diskless enabled. + // Tested in testInvalidDisklessTopicCreationForInternalTopics + // "false,true", + }) + public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + // If the diskless enable config is set, it should be added to the topic configs + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; + request.topics().add(new CreatableTopic().setName(internalTopic). + setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(internalTopic) + .setNumPartitions(1) + .setReplicationFactor((short) 3) + .setErrorMessage(null).setErrorCode((short) 0) + .setTopicId(result.response().topics().find(internalTopic).topicId())); + assertEquals(expectedResponse, withoutConfigs(result.response())); + assertTrue(result.response().topics().find(internalTopic) + .configs() + .stream() + .noneMatch(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))); + final List disklessConfigRecords = result.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + // Then always diskless is disabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); + } + + @Test + public void testInvalidDisklessTopicCreationForInternalTopics() { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("true")); + request.topics().add(new CreatableTopic().setName(internalTopic).setConfigs(topicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(internalTopic) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Internal topics cannot be diskless topics.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + } + + @ParameterizedTest + @CsvSource({ + "false,true", + "true," + }) + public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(false) + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + // If the diskless enable config is set, it should be added to the topic configs + topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + final String topicName = "foo"; + request.topics().add(new CreatableTopic().setName(topicName).setConfigs(topicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(topicName)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(topicName) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Cannot create diskless topics when the diskless storage system is disabled. Please enable the diskless storage system to create diskless topics.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + } + + @Test + public void testReassignDisklessPartitions() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1); + ctx.unfenceBrokers(0, 1); + + String topic = "foo"; + ctx.createTestTopic(topic, new int[][] {new int[] {0}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // No change in the replication factor. + ControllerResult alterResult1 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName(topic).setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(1))))))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(List.of( + new ReassignableTopicResponse().setName(topic).setPartitions(List.of( + new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), + alterResult1.response()); + + ctx.replay(alterResult1.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(List.of(new OngoingTopicReassignment().setName(topic).setPartitions(List.of( + new OngoingPartitionReassignment().setPartitionIndex(0) + .setRemovingReplicas(List.of(0)) + .setAddingReplicas(List.of(1)) + .setReplicas(List.of(1, 0)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( + new ListPartitionReassignmentsTopics().setName(topic). + setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); + + // Try to increase the replication factor. + ControllerResult alterResult2 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName(topic).setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0) + .setReplicas(List.of(0, 1))))))); + assertEquals(new AlterPartitionReassignmentsResponseData() + .setErrorMessage(null).setResponses(List.of( + new ReassignableTopicResponse().setName(topic).setPartitions(List.of( + new ReassignablePartitionResponse().setPartitionIndex(0) + .setErrorCode(INVALID_REPLICATION_FACTOR.code()) + .setErrorMessage("The replication factor is changed from 1 to 2"))))), + alterResult2.response()); + ctx.replay(alterResult2.records()); + assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( + new ListPartitionReassignmentsTopics().setName(topic) + .setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); + } + + @Test + public void testNoLeaderElectionOnBrokerFenced() { + // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. + // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained. + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + new int[][] { + new int[] {0} + }, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerFenced(0, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader fencing"); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader"); + assertEquals(-1, partition.leader, "Leader should be offline after fencing"); + } + + @Test + public void testNoReplicaChangeOnShutdown() { + // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. + // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained. + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + new int[][] { + new int[] {0} + }, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerShutdown(0, true, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader fencing"); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader"); + assertEquals(-1, partition.leader, "Leader should be offline after fencing"); + } + + @Test + void testDisklessMarksLeaderOfflineOnUnregister_noRacks() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + final int numPartitions = 6; + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + numPartitions, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerUnregistered(0, 100, records); + ctx.replay(records); + + // All partitions should remain present and keep the original replica/ISR, + // only the leader should be marked offline. + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); + assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId); + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + } + } + + @Test + void testDisklessMarksLeaderOfflineOnUnregister_withRacks() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + final int numPartitions = 6; + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + numPartitions, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerUnregistered(0, 100, records); + ctx.replay(records); + + // All partitions should remain present and keep the original replica/ISR, + // only the leader should be marked offline. + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); + assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId); + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + } + } + } + } From 473bb9695dcf6a03b3ff3d7e26eac7b0ea82ccee Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 20 Jan 2026 16:01:09 +0200 Subject: [PATCH 3/3] refactor(metadata:diskless): fail diskless topic creation on replica assignment Currently, diskless topic creation with replica assignment ignores the assignments and creates topic anyway with default partitions and replication factor. This can lead to unexpected/undesired states. Instead fail and provide error message for users to fix this. Existing tests changed is to align the new behavior. --- .../java/kafka/server/InklessClusterTest.java | 5 +-- .../controller/ReplicationControlManager.java | 5 ++- .../ReplicationControlManagerTest.java | 36 +++++++++++++++---- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/core/src/test/java/kafka/server/InklessClusterTest.java b/core/src/test/java/kafka/server/InklessClusterTest.java index 29dbd60e08..17c9e4cf69 100644 --- a/core/src/test/java/kafka/server/InklessClusterTest.java +++ b/core/src/test/java/kafka/server/InklessClusterTest.java @@ -206,10 +206,11 @@ public void produceToDisklessAndClassic() throws Exception { int numRecords = 10; try (Admin admin = AdminClient.create(clientConfigs)) { - final NewTopic disklessTopic = new NewTopic(disklessTopicName, Map.of(0, List.of(0))) - .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); final NewTopic classicTopic = new NewTopic(classicTopicName, Map.of(0, List.of(0))) .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); + // manual assignment is not supported for diskless topics + final NewTopic disklessTopic = new NewTopic(disklessTopicName, 1, (short) 1) + .configs(Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")); CreateTopicsResult topics = admin.createTopics(List.of(disklessTopic, classicTopic)); topics.all().get(10, TimeUnit.SECONDS); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 91da41a7d9..5f85c0f18a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -765,7 +765,6 @@ private ApiError createTopic(ControllerRequestContext context, return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Replication factor for diskless topics must be 1 or -1 to use the default value (1)."); } - topic.assignments().clear(); } if (!topic.assignments().isEmpty()) { @@ -779,6 +778,10 @@ private ApiError createTopic(ControllerRequestContext context, "A manual partition assignment was specified, but numPartitions " + "was not set to -1."); } + if (disklessEnabled) { + return new ApiError(INVALID_REQUEST, + "A manual partition assignment cannot be specified for diskless topics."); + } OptionalInt replicationFactor = OptionalInt.empty(); for (CreatableReplicaAssignment assignment : topic.assignments()) { if (newParts.containsKey(assignment.partitionIndex())) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 0f738b074c..4712a8131b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -138,6 +138,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS; import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICATION_FACTOR; import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT; +import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST; import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION; import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED; import static org.apache.kafka.common.protocol.Errors.NONE; @@ -3977,7 +3978,13 @@ public void testReassignDisklessPartitions() { ctx.unfenceBrokers(0, 1); String topic = "foo"; - ctx.createTestTopic(topic, new int[][] {new int[] {0}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + ctx.createTestTopic( + topic, + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); // No change in the replication factor. ControllerResult alterResult1 = @@ -4036,9 +4043,8 @@ public void testNoLeaderElectionOnBrokerFenced() { CreatableTopicResult createResult = ctx.createTestTopic( "foo", - new int[][] { - new int[] {0} - }, + 1, + (short) 1, Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code() ); @@ -4067,9 +4073,8 @@ public void testNoReplicaChangeOnShutdown() { CreatableTopicResult createResult = ctx.createTestTopic( "foo", - new int[][] { - new int[] {0} - }, + 1, + (short) 1, Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code() ); @@ -4152,6 +4157,23 @@ void testDisklessMarksLeaderOfflineOnUnregister_withRacks() { assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); } } + + @Test + void testManualReplicaAssignmentsShouldBeRejected() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Expectation: providing manual replica assignments for a diskless topic should be rejected. + ctx.createTestTopic( + "foo", + new int[][] {new int[] {0, 1}, new int[] {1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + INVALID_REQUEST.code() + ); + } } }