Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
Expand Down Expand Up @@ -441,12 +440,13 @@ public void format() throws Exception {
List<Future<?>> futures = new ArrayList<>();
try {
for (ControllerServer controller : controllers.values()) {
futures.add(executorService.submit(() -> formatNode(controller.sharedServer().metaPropsEnsemble(), true)));
futures.add(executorService.submit(() -> formatNode(controller.sharedServer().metaPropsEnsemble())));
}
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
BrokerServer broker = entry.getValue();
futures.add(executorService.submit(() -> formatNode(broker.sharedServer().metaPropsEnsemble(),
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()))));
if (!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id())) {
futures.add(executorService.submit(() -> formatNode(broker.sharedServer().metaPropsEnsemble())));
}
}
for (Future<?> future: futures) {
future.get();
Expand All @@ -460,33 +460,22 @@ public void format() throws Exception {
}

private void formatNode(
MetaPropertiesEnsemble ensemble,
boolean writeMetadataDirectory
MetaPropertiesEnsemble ensemble
) {
try {
final var nodeId = ensemble.nodeId().getAsInt();
Formatter formatter = new Formatter();
formatter.setNodeId(nodeId);
formatter.setClusterId(ensemble.clusterId().get());
if (writeMetadataDirectory) {
formatter.setDirectories(ensemble.logDirProps().keySet());
} else {
formatter.setDirectories(ensemble.logDirProps().keySet().stream().
filter(d -> !ensemble.metadataLogDir().get().equals(d)).
collect(Collectors.toSet()));
}
formatter.setDirectories(ensemble.logDirProps().keySet());
if (formatter.directories().isEmpty()) {
return;
}
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
formatter.setUnstableFeatureVersionsEnabled(true);
formatter.setIgnoreFormatted(false);
formatter.setControllerListenerName(controllerListenerName);
if (writeMetadataDirectory) {
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
} else {
formatter.setMetadataLogDirectory(Optional.empty());
}
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
StringBuilder dynamicVotersBuilder = new StringBuilder();
String prefix = "";
if (standalone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public TestKitNodes build() {
baseDirectory.toFile().getAbsolutePath(),
clusterId,
brokerNodeIds.contains(id),
perServerProperties.getOrDefault(id, Map.of())
perServerProperties.getOrDefault(id, Map.of()),
numDisksPerBroker
);
controllerNodes.put(id, controllerNode);
}
Expand Down Expand Up @@ -346,21 +347,36 @@ private static TestKitNode buildControllerNode(int id,
String baseDirectory,
String clusterId,
boolean combined,
Map<String, String> propertyOverrides) {
Map<String, String> propertyOverrides,
int numDisksPerController) {
List<String> logDataDirectories = combined
? IntStream
.range(0, numDisksPerController)
.mapToObj(i -> String.format("combined_%d_%d", id, i))
.map(logDir -> {
if (Paths.get(logDir).isAbsolute()) {
return logDir;
}
return new File(baseDirectory, logDir).getAbsolutePath();
})
.toList()
: List.of(new File(baseDirectory, String.format("controller_%d", id)).getAbsolutePath());
String metadataDirectory = new File(baseDirectory,
combined ? String.format("combined_%d_0", id) : String.format("controller_%d", id)).getAbsolutePath();
MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);

copier.setMetaLogDir(Optional.of(metadataDirectory));
copier.setLogDirProps(
metadataDirectory,
new MetaProperties.Builder()
.setVersion(MetaPropertiesVersion.V1)
.setClusterId(clusterId)
.setNodeId(id)
.setDirectoryId(copier.generateValidDirectoryId())
.build()
);
for (String logDir : logDataDirectories) {
copier.setLogDirProps(
logDir,
new MetaProperties.Builder()
.setVersion(MetaPropertiesVersion.V1)
.setClusterId(clusterId)
.setNodeId(id)
.setDirectoryId(copier.generateValidDirectoryId())
.build()
);
}

return new TestKitNode() {
private final MetaPropertiesEnsemble ensemble = copier.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@

package org.apache.kafka.common.test;

import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -87,37 +93,56 @@ public void testCreateClusterWithBadPerServerProperties() {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) throws Exception {
@CsvSource({
"true,1,1,2", /* 1 combined node */
"true,5,7,2", /* 5 combined nodes + 2 controllers */
"true,7,5,2", /* 7 combined nodes */
"false,1,1,2", /* 1 broker + 1 controller */
"false,5,7,2", /* 5 brokers + 7 controllers */
"false,7,5,2", /* 7 brokers + 5 controllers */
})
public void testCreateClusterFormatAndCloseWithMultipleLogDirs(boolean combined, int numBrokers, int numControllers, int numDisks) throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(5).
setNumDisksPerBroker(2).
setNumBrokerNodes(numBrokers).
setNumDisksPerBroker(numDisks).
setCombined(combined).
setNumControllerNodes(3).build()).build()) {
setNumControllerNodes(numControllers).build()).build()) {

TestKitNodes nodes = cluster.nodes();
assertEquals(5, nodes.brokerNodes().size());
assertEquals(3, nodes.controllerNodes().size());
assertEquals(numBrokers, nodes.brokerNodes().size());
assertEquals(numControllers, nodes.controllerNodes().size());

Set<String> logDirs = new HashSet<>();
nodes.brokerNodes().forEach((brokerId, node) -> {
assertEquals(2, node.logDataDirectories().size());
Set<String> expected = Set.of(String.format("broker_%d_data0", brokerId), String.format("broker_%d_data1", brokerId));
if (nodes.isCombined(node.id())) {
expected = Set.of(String.format("combined_%d_0", brokerId), String.format("combined_%d_1", brokerId));
}
assertEquals(numDisks, node.logDataDirectories().size());
Set<String> expectedDisks = IntStream.range(0, numDisks)
.mapToObj(i -> {
if (nodes.isCombined(node.id())) {
return String.format("combined_%d_%d", brokerId, i);
} else {
return String.format("broker_%d_data%d", brokerId, i);
}
}).collect(Collectors.toSet());
assertEquals(
expected,
expectedDisks,
node.logDataDirectories().stream()
.map(p -> Paths.get(p).getFileName().toString())
.collect(Collectors.toSet())
);
logDirs.addAll(node.logDataDirectories());
});

nodes.controllerNodes().forEach((controllerId, node) -> {
String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
String expected = nodes.isCombined(node.id()) ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
logDirs.addAll(node.logDataDirectories());
});

cluster.format();
logDirs.forEach(logDir ->
assertTrue(Files.exists(Paths.get(logDir, MetaPropertiesEnsemble.META_PROPERTIES_NAME)))
);
}
}

Expand Down