diff --git a/.azure/templates/jobs/system-tests/upgrade_jobs.yaml b/.azure/templates/jobs/system-tests/upgrade_jobs.yaml
index bfe5a495374..fc540be7f80 100644
--- a/.azure/templates/jobs/system-tests/upgrade_jobs.yaml
+++ b/.azure/templates/jobs/system-tests/upgrade_jobs.yaml
@@ -18,3 +18,13 @@ jobs:
timeout: 360
releaseVersion: '${{ parameters.releaseVersion }}'
kafkaVersion: '${{ parameters.kafkaVersion }}'
+
+ - template: '../../steps/system_test_general.yaml'
+ parameters:
+ name: 'strimzi_upgrade_kraft'
+ display_name: 'strimzi-upgrade-kraft-bundle'
+ profile: 'azp_kraft_upgrade'
+ cluster_operator_install_type: 'bundle'
+ timeout: 360
+ releaseVersion: '${{ parameters.releaseVersion }}'
+ kafkaVersion: '${{ parameters.kafkaVersion }}'
\ No newline at end of file
diff --git a/systemtest/pom.xml b/systemtest/pom.xml
index a40daa1087a..b512474f887 100644
--- a/systemtest/pom.xml
+++ b/systemtest/pom.xml
@@ -354,6 +354,14 @@
+ <profile>
+ <id>kraft_upgrade</id>
+ <properties>
+ <skipTests>false</skipTests>
+ <groups>kraftupgrade</groups>
+ </properties>
+ </profile>
+
<id>operators</id>
@@ -526,13 +534,25 @@
+ <profile>
+ <id>azp_kraft_upgrade</id>
+ <properties>
+ <skipTests>false</skipTests>
+ <groups>kraftupgrade</groups>
+ <it.test>
+ !KRaftKafkaUpgradeDowngradeST
+ </it.test>
+ </properties>
+ </profile>
+
<id>azp_kafka_upgrade</id>
<skipTests>false</skipTests>
- <groups>upgrade</groups>
+ <groups>upgrade,kraftupgrade</groups>
- KafkaUpgradeDowngradeST
+ KafkaUpgradeDowngradeST,
+ KRaftKafkaUpgradeDowngradeST
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java b/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java
index d3952623fb3..c19d203608b 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java
@@ -201,7 +201,10 @@ public interface TestConstants {
*/
String USE_KRAFT_MODE = "+UseKRaft";
String DONT_USE_KAFKA_NODE_POOLS = "-KafkaNodePools";
+ // kept for upgrade/downgrade tests in KRaft
+ String USE_KAFKA_NODE_POOLS = "+KafkaNodePools";
String DONT_USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "-UnidirectionalTopicOperator";
+ String USE_UNIDIRECTIONAL_TOPIC_OPERATOR = "+UnidirectionalTopicOperator";
/**
* Default value which allows execution of tests with any tags
@@ -223,6 +226,11 @@ public interface TestConstants {
*/
String UPGRADE = "upgrade";
+ /**
+ * Tag for KRaft to KRaft upgrade/downgrade tests.
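+ * Applied via @Tag(KRAFT_UPGRADE) on the test classes and matched by the Maven "kraftupgrade" group.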
+ */
+ String KRAFT_UPGRADE = "kraftupgrade";
+
/**
* Tag for olm upgrade tests
*/
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java b/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java
index 83607fa7677..ba9929197e9 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaNodePoolResource.java
@@ -4,6 +4,8 @@
*/
package io.strimzi.systemtest.resources.crd;
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
@@ -21,7 +23,9 @@
import io.strimzi.systemtest.utils.kubeUtils.objects.PersistentVolumeClaimUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import static io.strimzi.operator.common.Util.hashStub;
@@ -111,4 +115,21 @@ public static KafkaNodePool convertKafkaResourceToKafkaNodePool(Kafka resource)
return builder.build();
}
+
+ public static LabelSelector getLabelSelector(String clusterName, String poolName, ProcessRoles processRole) {
+ Map<String, String> matchLabels = new HashMap<>();
+ matchLabels.put(Labels.STRIMZI_CLUSTER_LABEL, clusterName);
+ matchLabels.put(Labels.STRIMZI_KIND_LABEL, Kafka.RESOURCE_KIND);
+ matchLabels.put(Labels.STRIMZI_POOL_NAME_LABEL, poolName);
+
+ switch (processRole) {
+ case BROKER -> matchLabels.put(Labels.STRIMZI_BROKER_ROLE_LABEL, "true");
+ case CONTROLLER -> matchLabels.put(Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "true");
+ default -> throw new RuntimeException("No role for KafkaNodePool specified");
+ }
+
+ return new LabelSelectorBuilder()
+ .withMatchLabels(matchLabels)
+ .build();
+ }
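+
+ // Illustrative usage (example names): getLabelSelector("my-cluster", "broker", ProcessRoles.BROKER) selects Pods
+ // labeled strimzi.io/cluster=my-cluster, strimzi.io/kind=Kafka, strimzi.io/pool-name=broker, strimzi.io/broker-role=true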
}
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java b/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java
index e5f125ba417..babf2138ad0 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaNodePoolTemplates.java
@@ -30,6 +30,40 @@ public static KafkaNodePoolBuilder defaultKafkaNodePool(String namespaceName, St
.endSpec();
}
+ public static KafkaNodePoolBuilder kafkaNodePoolWithControllerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
+ return defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas)
+ .editOrNewSpec()
+ .addToRoles(ProcessRoles.CONTROLLER)
+ .endSpec();
+ }
+
+ public static KafkaNodePoolBuilder kafkaNodePoolWithControllerRoleAndPersistentStorage(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
+ return kafkaNodePoolWithControllerRole(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas)
+ .editOrNewSpec()
+ .withNewPersistentClaimStorage()
+ .withSize("1Gi")
+ .withDeleteClaim(true)
+ .endPersistentClaimStorage()
+ .endSpec();
+ }
+
+ public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRole(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
+ return defaultKafkaNodePool(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas)
+ .editOrNewSpec()
+ .addToRoles(ProcessRoles.BROKER)
+ .endSpec();
+ }
+
+ public static KafkaNodePoolBuilder kafkaNodePoolWithBrokerRoleAndPersistentStorage(String namespaceName, String nodePoolName, String kafkaClusterName, int kafkaReplicas) {
+ return kafkaNodePoolWithBrokerRole(namespaceName, nodePoolName, kafkaClusterName, kafkaReplicas)
+ .editOrNewSpec()
+ .withNewPersistentClaimStorage()
+ .withSize("1Gi")
+ .withDeleteClaim(true)
+ .endPersistentClaimStorage()
+ .endSpec();
+ }
+
/**
* Creates a KafkaNodePoolBuilder for a Kafka instance (mirroring its mandatory specification) with roles based
* on the environment setting (TestConstants.USE_KRAFT_MODE) having BROKER role in Zookeeper and Kraft mode alike
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java
index 29f2f928412..78072f927ab 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/UpgradeKafkaVersion.java
@@ -15,12 +15,18 @@ public class UpgradeKafkaVersion {
private String version;
private String logMessageVersion;
private String interBrokerVersion;
+ private String metadataVersion;
- UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) {
+ public UpgradeKafkaVersion(TestKafkaVersion testKafkaVersion) {
this(testKafkaVersion.version(), testKafkaVersion.messageVersion(), testKafkaVersion.protocolVersion());
}
- UpgradeKafkaVersion(String version) {
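+ // logMessageVersion and interBrokerVersion intentionally stay null here, so the test steps
+ // updating those config options are skipped; metadataVersion replaces them in KRaft mode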
+ public UpgradeKafkaVersion(String version, String desiredMetadataVersion) {
+ this.version = version;
+ this.metadataVersion = desiredMetadataVersion;
+ }
+
+ public UpgradeKafkaVersion(String version) {
String shortVersion = version;
if (version != null && !version.equals("")) {
@@ -31,17 +37,18 @@ public class UpgradeKafkaVersion {
this.version = version;
this.logMessageVersion = shortVersion;
this.interBrokerVersion = shortVersion;
+ this.metadataVersion = shortVersion;
}
/**
* Leaving empty, so the original Kafka version in `kafka-persistent.yaml` will be used
* LMFV and IBPV should be null, so the test steps for updating the config will be skipped
*/
- UpgradeKafkaVersion() {
+ public UpgradeKafkaVersion() {
this("", null, null);
}
- UpgradeKafkaVersion(String version, String logMessageVersion, String interBrokerVersion) {
+ public UpgradeKafkaVersion(String version, String logMessageVersion, String interBrokerVersion) {
this.version = version;
this.logMessageVersion = logMessageVersion;
this.interBrokerVersion = interBrokerVersion;
@@ -63,6 +70,10 @@ public String getInterBrokerVersion() {
return this.interBrokerVersion;
}
+ public String getMetadataVersion() {
+ return this.metadataVersion;
+ }
+
public static UpgradeKafkaVersion getKafkaWithVersionFromUrl(String kafkaVersionsUrl, String kafkaVersion) {
if (kafkaVersionsUrl.equals("HEAD")) {
return new UpgradeKafkaVersion(TestKafkaVersion.getSpecificVersion(kafkaVersion));
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java
index 7611b56fdd0..d1145e327b2 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/upgrade/VersionModificationDataLoader.java
@@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.type.CollectionType;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.strimzi.systemtest.Environment;
+import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.utils.TestKafkaVersion;
import io.strimzi.test.TestUtils;
import org.apache.logging.log4j.LogManager;
@@ -33,6 +34,7 @@ public enum ModificationType {
private static final Logger LOGGER = LogManager.getLogger(VersionModificationDataLoader.class);
private OlmVersionModificationData olmUpgradeData;
private List<BundleVersionModificationData> bundleVersionModificationDataList;
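+ // joins to "+UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator"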
+ private static final String KRAFT_UPGRADE_FEATURE_GATES = String.join(",", TestConstants.USE_KRAFT_MODE, TestConstants.USE_KAFKA_NODE_POOLS, TestConstants.USE_UNIDIRECTIONAL_TOPIC_OPERATOR);
public VersionModificationDataLoader(ModificationType upgradeType) {
if (upgradeType == ModificationType.OLM_UPGRADE) {
@@ -106,6 +108,14 @@ public int getBundleUpgradeOrDowngradeDataSize() {
return bundleVersionModificationDataList.size();
}
+ public BundleVersionModificationData buildDataForUpgradeAcrossVersionsForKRaft() {
+ BundleVersionModificationData acrossUpgradeData = buildDataForUpgradeAcrossVersions();
+
+ acrossUpgradeData = updateUpgradeDataWithFeatureGates(acrossUpgradeData, KRAFT_UPGRADE_FEATURE_GATES);
+
+ return acrossUpgradeData;
+ }
+
public BundleVersionModificationData buildDataForUpgradeAcrossVersions() {
List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions();
TestKafkaVersion latestKafkaSupported = sortedVersions.get(sortedVersions.size() - 1);
@@ -133,10 +143,28 @@ public BundleVersionModificationData buildDataForUpgradeAcrossVersions() {
}
public static Stream<Arguments> loadYamlDowngradeData() {
+ return loadYamlDowngradeDataWithFeatureGates(null);
+ }
+
+ public static Stream<Arguments> loadYamlDowngradeDataForKRaft() {
+ return loadYamlDowngradeDataWithFeatureGates(KRAFT_UPGRADE_FEATURE_GATES);
+ }
+
+ public static Stream<Arguments> loadYamlDowngradeDataWithFeatureGates(String featureGates) {
VersionModificationDataLoader dataLoader = new VersionModificationDataLoader(ModificationType.BUNDLE_DOWNGRADE);
List<Arguments> parameters = new LinkedList<>();
+ List<TestKafkaVersion> testKafkaVersions = TestKafkaVersion.getSupportedKafkaVersions();
+ TestKafkaVersion testKafkaVersion = testKafkaVersions.get(0);
+
+ // Generate procedures for the downgrade, targeting the lowest supported Kafka version
+ UpgradeKafkaVersion procedures = new UpgradeKafkaVersion(testKafkaVersion.version());
+
dataLoader.getBundleUpgradeOrDowngradeDataList().forEach(downgradeData -> {
+ downgradeData.setProcedures(procedures);
+
+ downgradeData = updateUpgradeDataWithFeatureGates(downgradeData, featureGates);
+
parameters.add(Arguments.of(downgradeData.getFromVersion(), downgradeData.getToVersion(), downgradeData));
});
@@ -144,6 +172,14 @@ public static Stream loadYamlDowngradeData() {
}
public static Stream<Arguments> loadYamlUpgradeData() {
+ return loadYamlUpgradeDataWithFeatureGates(null);
+ }
+
+ public static Stream<Arguments> loadYamlUpgradeDataForKRaft() {
+ return loadYamlUpgradeDataWithFeatureGates(KRAFT_UPGRADE_FEATURE_GATES);
+ }
+
+ public static Stream<Arguments> loadYamlUpgradeDataWithFeatureGates(String featureGates) {
VersionModificationDataLoader upgradeDataList = new VersionModificationDataLoader(ModificationType.BUNDLE_UPGRADE);
List<Arguments> parameters = new LinkedList<>();
@@ -155,6 +191,9 @@ public static Stream loadYamlUpgradeData() {
upgradeDataList.getBundleUpgradeOrDowngradeDataList().forEach(upgradeData -> {
upgradeData.setProcedures(procedures);
+
+ upgradeData = updateUpgradeDataWithFeatureGates(upgradeData, featureGates);
+
parameters.add(Arguments.of(
upgradeData.getFromVersion(), upgradeData.getToVersion(),
upgradeData.getFeatureGatesBefore(), upgradeData.getFeatureGatesAfter(),
@@ -164,4 +203,28 @@ public static Stream loadYamlUpgradeData() {
return parameters.stream();
}
+
+ private static BundleVersionModificationData updateUpgradeDataWithFeatureGates(BundleVersionModificationData upgradeData, String featureGates) {
+ if (featureGates != null && !featureGates.isEmpty()) {
+ String fgBefore = upgradeData.getFeatureGatesBefore();
+ String fgAfter = upgradeData.getFeatureGatesAfter();
+
+ // in case we want to keep only specific feature gates, we replace the ones parsed from the YAML with the specified values
+ // for example, if the YAML disables UTO but we need it enabled for the KRaft upgrade, we remove the YAML entry and
+ // keep just the specified one
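+ // e.g. fgBefore = "-UnidirectionalTopicOperator" with featureGates = "+UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator"
+ // strips the "-UnidirectionalTopicOperator" entry, leaving fgBefore empty, so the result becomes featureGates alone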
+ for (String fg : featureGates.split(",")) {
+ String fgNameWithoutSign = fg.replace("+", "").replace("-", "");
+
+ fgBefore = fgBefore.replaceFirst("(,?)(\\+|-)" + fgNameWithoutSign, "");
+ fgAfter = fgAfter.replaceFirst("(,?)(\\+|-)" + fgNameWithoutSign, "");
+ }
+
+ upgradeData.setFeatureGatesBefore(fgBefore.isEmpty() ?
+ featureGates : String.join(",", fgBefore, featureGates));
+ upgradeData.setFeatureGatesAfter(fgAfter.isEmpty() ?
+ featureGates : String.join(",", fgAfter, featureGates));
+ }
+
+ return upgradeData;
+ }
}
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java
index d3fa64f6517..ad8992e63cb 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/RollingUpdateUtils.java
@@ -71,8 +71,11 @@ public static boolean componentHasRolled(String namespaceName, LabelSelector sel
* @return The snapshot of the component (StrimziPodSet, Deployment) after rolling update with Uid for every pod
*/
public static Map<String, String> waitTillComponentHasRolled(String namespaceName, LabelSelector selector, Map<String, String> snapshot) {
+ String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
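+ // selectors built from KafkaNodePools carry no strimzi.io/name label, so fall back to the <cluster>-<pool> name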
+ componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;
+
LOGGER.info("Waiting for component matching {} -> {}/{} rolling update", selector, namespaceName, componentName);
TestUtils.waitFor("rolling update of component: " + namespaceName + "/" + componentName,
TestConstants.WAIT_FOR_ROLLING_UPDATE_INTERVAL, ResourceOperation.timeoutForPodsOperation(snapshot.size()), () -> {
@@ -92,6 +95,8 @@ public static Map waitTillComponentHasRolledAndPodsReady(String
String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
+ componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;
+
waitTillComponentHasRolled(namespaceName, selector, snapshot);
LOGGER.info("Waiting for {} Pod(s) of {}/{} to be ready", expectedPods, namespaceName, componentName);
@@ -116,8 +121,10 @@ public static Map waitTillComponentHasRolled(String namespaceNam
* @return The new Snapshot of actually present Pods after the first successful roll
*/
public static Map<String, String> waitTillComponentHasStartedRolling(String namespaceName, LabelSelector selector, Map<String, String> snapshot) {
- String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
+ String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
+ String componentName = selector.getMatchLabels().get(Labels.STRIMZI_CONTROLLER_NAME_LABEL);
+ componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;
LOGGER.info("Waiting for component matching {} -> {}/{} first rolled Pod", selector, namespaceName, componentName);
TestUtils.waitFor("first pod's roll : " + namespaceName + "/" + componentName,
@@ -156,7 +163,9 @@ public static Map waitTillComponentHasStartedRolling(String name
public static void waitForComponentAndPodsReady(String namespaceName, LabelSelector selector, int expectedPods) {
final String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
- final String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
+ String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
+
+ componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;
LOGGER.info("Waiting for {} Pod(s) of {}/{} to be ready", expectedPods, namespaceName, componentName);
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java
index 1dc46cf00ee..8bf3469ddcf 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/TestKafkaVersion.java
@@ -56,6 +56,9 @@ public static List parseKafkaVersionsFromUrl(String url) throw
@JsonProperty("format")
String messageVersion;
+ @JsonProperty("metadata")
+ String metadataVersion;
+
@JsonProperty("zookeeper")
String zookeeperVersion;
@@ -89,6 +92,10 @@ public String messageVersion() {
return messageVersion;
}
+ public String metadataVersion() {
+ return metadataVersion;
+ }
+
public String zookeeperVersion() {
return zookeeperVersion;
}
diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java
index 8ddbbe54897..8af665e87c3 100644
--- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java
+++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java
@@ -4,10 +4,13 @@
*/
package io.strimzi.systemtest.utils.kafkaUtils;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLParser;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.Pod;
import io.strimzi.api.kafka.model.Kafka;
@@ -494,6 +497,55 @@ public static String changeOrRemoveKafkaConfiguration(File file, String version,
}
}
+ public static String changeOrRemoveKafkaInKRaft(File file, String version) {
+ return changeOrRemoveKafkaConfigurationInKRaft(file, version, null);
+ }
+
+ public static String changeOrRemoveKafkaConfigurationInKRaft(File file, String version, String metadataVersion) {
+ YAMLFactory yamlFactory = new YAMLFactory();
+ ObjectMapper mapper = new ObjectMapper();
+ YAMLMapper yamlMapper = new YAMLMapper();
+
+ try {
+ YAMLParser yamlParser = yamlFactory.createParser(file);
+ List<ObjectNode> objects = mapper.readValues(yamlParser, new TypeReference<ObjectNode>() { }).readAll();
+
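+ // the examples file contains multiple YAML documents; the Kafka CR is assumed to be the third one (index 2)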
+ ObjectNode kafkaResourceNode = objects.get(2);
+ ObjectNode kafkaNode = (ObjectNode) kafkaResourceNode.at("/spec/kafka");
+
+ ObjectNode entity = (ObjectNode) kafkaResourceNode.at("/spec/entityOperator");
+ entity.set("topicOperator", mapper.createObjectNode());
+
+ // workaround for current Strimzi upgrade (before we will have release containing metadataVersion in examples + CRDs)
+ boolean metadataVersionFieldSupported = !cmdKubeClient().exec(false, "explain", "kafka.spec.kafka.metadataVersion").err().contains("does not exist");
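+ // i.e. runs "explain kafka.spec.kafka.metadataVersion" through the kube client and treats "does not exist" on stderr as unsupported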
+
+ if (version == null) {
+ kafkaNode.remove("version");
+ kafkaNode.remove("metadataVersion");
+ } else if (!version.equals("")) {
+ kafkaNode.put("version", version);
+
+ if (metadataVersionFieldSupported) {
+ kafkaNode.put("metadataVersion", TestKafkaVersion.getSpecificVersion(version).messageVersion());
+ }
+ }
+
+ if (metadataVersion != null && metadataVersionFieldSupported) {
+ kafkaNode.put("metadataVersion", metadataVersion);
+ }
+
+ StringBuilder output = new StringBuilder();
+
+ for (ObjectNode objectNode : objects) {
+ output.append(yamlMapper.writeValueAsString(objectNode));
+ }
+
+ return output.toString();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
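+
+ // Illustrative call (hypothetical arguments): changeOrRemoveKafkaConfigurationInKRaft(file, "3.6.0", "3.6")
+ // would set .spec.kafka.version and .spec.kafka.metadataVersion and return the whole multi-document YAML as one String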
+
public static String namespacedPlainBootstrapAddress(String clusterName, String namespace) {
return namespacedBootstrapAddress(clusterName, namespace, 9092);
}
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java
index 4a6bcf22de7..73a4a7709ef 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/AbstractUpgradeST.java
@@ -5,6 +5,7 @@
package io.strimzi.systemtest.upgrade;
import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.strimzi.api.kafka.model.Constants;
import io.strimzi.api.kafka.model.Kafka;
@@ -89,6 +90,8 @@ public class AbstractUpgradeST extends AbstractST {
protected final LabelSelector kafkaSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaStatefulSetName(clusterName));
protected final LabelSelector zkSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.zookeeperStatefulSetName(clusterName));
+ protected final LabelSelector eoSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.entityOperatorDeploymentName(clusterName));
+ protected final LabelSelector coSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator")).build();
protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(clusterName, KafkaConnectResources.deploymentName(clusterName));
protected final String topicName = "my-topic";
@@ -98,6 +101,10 @@ public class AbstractUpgradeST extends AbstractST {
protected final int expectedTopicCount = upgradeTopicCount + 3;
protected File kafkaYaml;
+ protected int getExpectedTopicCount() {
+ return expectedTopicCount;
+ }
+
protected void makeSnapshots() {
coPods = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, ResourceManager.getCoDeploymentName());
zkPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, zkSelector);
@@ -109,11 +116,11 @@ protected void makeSnapshots() {
@SuppressWarnings("CyclomaticComplexity")
protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData versionModificationData, ExtensionContext extensionContext) throws IOException {
// Get Kafka configurations
- String operatorVersion = versionModificationData.getToVersion();
- String currentLogMessageFormat = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, ".spec.kafka.config.log\\.message\\.format\\.version");
- String currentInterBrokerProtocol = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, ".spec.kafka.config.inter\\.broker\\.protocol\\.version");
+ String currentLogMessageFormat = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.log\\.message\\.format\\.version");
+ String currentInterBrokerProtocol = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.config.inter\\.broker\\.protocol\\.version");
+
// Get Kafka version
- String kafkaVersionFromCR = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, ".spec.kafka.version");
+ String kafkaVersionFromCR = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version");
kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR;
String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion();
@@ -147,7 +154,7 @@ protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData vers
if (versionModificationData.getProcedures() != null && (!currentLogMessageFormat.isEmpty() || !currentInterBrokerProtocol.isEmpty())) {
if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure) && extensionContext.getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT).contains("upgrade")) {
LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure);
- cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure);
+ cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure);
LOGGER.info("Waiting for Kafka rolling update to finish");
kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, kafkaSelector, 3, kafkaPods);
}
@@ -158,13 +165,13 @@ protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData vers
if (logMessageVersion != null && !logMessageVersion.isEmpty() || interBrokerProtocolVersion != null && !interBrokerProtocolVersion.isEmpty()) {
if (!logMessageVersion.isEmpty()) {
LOGGER.info("Set log message format version to {} (current version is {})", logMessageVersion, currentLogMessageFormat);
- cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/config/log.message.format.version", logMessageVersion);
+ cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/config/log.message.format.version", logMessageVersion);
}
if (!interBrokerProtocolVersion.isEmpty()) {
LOGGER.info("Set inter-broker protocol version to {} (current version is {})", interBrokerProtocolVersion, currentInterBrokerProtocol);
LOGGER.info("Set inter-broker protocol version to " + interBrokerProtocolVersion);
- cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/config/inter.broker.protocol.version", interBrokerProtocolVersion);
+ cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/config/inter.broker.protocol.version", interBrokerProtocolVersion);
}
if ((currentInterBrokerProtocol != null && !currentInterBrokerProtocol.equals(interBrokerProtocolVersion)) ||
@@ -177,26 +184,31 @@ protected void changeKafkaAndLogFormatVersion(CommonVersionModificationData vers
if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure) && extensionContext.getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT).contains("downgrade")) {
LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure);
- cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL, operatorVersion), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure);
+ cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure);
LOGGER.info("Waiting for Kafka rolling update to finish");
kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, kafkaSelector, kafkaPods);
}
}
}
- protected void logPodImages(String clusterName) {
- List<Pod> pods = kubeClient().listPods(KafkaResource.getLabelSelector(clusterName, KafkaResources.zookeeperStatefulSetName(clusterName)));
- for (Pod pod : pods) {
- LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage());
- }
- pods = kubeClient().listPods(KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaStatefulSetName(clusterName)));
- for (Pod pod : pods) {
- LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage());
- }
- pods = kubeClient().listPods(kubeClient().getDeploymentSelectors(KafkaResources.entityOperatorDeploymentName(clusterName)));
- for (Pod pod : pods) {
- LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getImage());
- LOGGER.info("Pod: {}/{} has image {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), pod.getSpec().getContainers().get(1).getImage());
+ protected void logPodImages(String namespaceName) {
+ logPodImages(namespaceName, zkSelector, kafkaSelector, eoSelector, coSelector);
+ }
+
+ protected void logPodImagesWithConnect(String namespaceName) {
+ logPodImages(namespaceName, zkSelector, kafkaSelector, eoSelector, connectLabelSelector, coSelector);
+ }
+
+ protected void logPodImages(String namespaceName, LabelSelector... labelSelectors) {
+ for (LabelSelector labelSelector : labelSelectors) {
+ List<Pod> pods = kubeClient().listPods(namespaceName, labelSelector);
+
+ pods.forEach(pod ->
+ pod.getSpec().getContainers().forEach(container ->
+ LOGGER.info("Pod: {}/{} has image {}",
+ pod.getMetadata().getNamespace(), pod.getMetadata().getName(), container.getImage())
+ )
+ );
}
}
@@ -219,7 +231,7 @@ protected void waitForReadinessOfKafkaCluster() {
DeploymentUtils.waitForDeploymentAndPodsReady(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1);
}
- protected void changeClusterOperator(BundleVersionModificationData versionModificationData, String namespace, ExtensionContext extensionContext) throws IOException {
+ protected void changeClusterOperator(BundleVersionModificationData versionModificationData, String namespace, ExtensionContext extensionContext) throws IOException {
File coDir;
// Modify + apply installation files
LOGGER.info("Update CO from {} to {}", versionModificationData.getFromVersion(), versionModificationData.getToVersion());
@@ -231,13 +243,13 @@ protected void changeClusterOperator(BundleVersionModificationData versionModif
coDir = new File(dir, versionModificationData.getToExamples() + "/install/cluster-operator/");
}
- copyModifyApply(coDir, namespace, extensionContext, versionModificationData.getFeatureGatesAfter());
+ copyModifyApply(coDir, namespace, versionModificationData.getFeatureGatesAfter());
LOGGER.info("Waiting for CO upgrade");
DeploymentUtils.waitTillDepHasRolled(namespace, ResourceManager.getCoDeploymentName(), 1, coPods);
}
- protected void copyModifyApply(File root, String namespace, ExtensionContext extensionContext, final String strimziFeatureGatesValue) {
+ protected void copyModifyApply(File root, String namespace, final String strimziFeatureGatesValue) {
Arrays.stream(Objects.requireNonNull(root.listFiles())).sorted().forEach(f -> {
if (f.getName().matches(".*RoleBinding.*")) {
cmdKubeClient().replaceContent(TestUtils.changeRoleBindingSubject(f, namespace));
@@ -283,30 +295,18 @@ protected void checkAllImages(BundleVersionModificationData versionModificationD
fail("There are no expected images");
}
- Map<String, String> zkSelector = Labels.EMPTY
- .withStrimziKind(Kafka.RESOURCE_KIND)
- .withStrimziCluster(clusterName)
- .withStrimziName(KafkaResources.zookeeperStatefulSetName(clusterName))
- .toMap();
-
- Map<String, String> kafkaSelector = Labels.EMPTY
- .withStrimziKind(Kafka.RESOURCE_KIND)
- .withStrimziCluster(clusterName)
- .withStrimziName(KafkaResources.kafkaStatefulSetName(clusterName))
- .toMap();
-
checkContainerImages(zkSelector, versionModificationData.getZookeeperImage());
checkContainerImages(kafkaSelector, versionModificationData.getKafkaImage());
- checkContainerImages(kubeClient().getDeployment(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName)).getSpec().getSelector().getMatchLabels(), versionModificationData.getTopicOperatorImage());
- checkContainerImages(kubeClient().getDeployment(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName)).getSpec().getSelector().getMatchLabels(), 1, versionModificationData.getUserOperatorImage());
+ checkContainerImages(eoSelector, versionModificationData.getTopicOperatorImage());
+ checkContainerImages(eoSelector, 1, versionModificationData.getUserOperatorImage());
}
- protected void checkContainerImages(Map<String, String> matchLabels, String image) {
- checkContainerImages(matchLabels, 0, image);
+ protected void checkContainerImages(LabelSelector labelSelector, String image) {
+ checkContainerImages(labelSelector, 0, image);
}
- protected void checkContainerImages(Map<String, String> matchLabels, int container, String image) {
- List<Pod> pods1 = kubeClient().listPods(matchLabels);
+ protected void checkContainerImages(LabelSelector labelSelector, int container, String image) {
+ List<Pod> pods1 = kubeClient().listPods(labelSelector);
for (Pod pod : pods1) {
if (!image.equals(pod.getSpec().getContainers().get(container).getImage())) {
LOGGER.debug("Expected image for Pod: {}/{}: {} \nCurrent image: {}", pod.getMetadata().getNamespace(), pod.getMetadata().getName(), image, pod.getSpec().getContainers().get(container).getImage());
@@ -319,8 +319,6 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte
LOGGER.info("Test upgrade of Cluster Operator from version: {} to version: {}", upgradeData.getFromVersion(), upgradeData.getToVersion());
cluster.setNamespace(namespace);
- String operatorVersion = upgradeData.getFromVersion();
-
this.deployCoWithWaitForReadiness(extensionContext, upgradeData, namespace);
this.deployKafkaClusterWithWaitForReadiness(extensionContext, upgradeData, upgradeKafkaVersion);
this.deployKafkaUserWithWaitForReadiness(extensionContext, upgradeData, namespace);
@@ -343,7 +341,7 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte
// Attach clients which will continuously produce/consume messages to/from Kafka brokers during rolling update
// ##############################
// Setup topic, which has 3 replicas and 2 min.isr to see if producer will be able to work during rolling update
- if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, operatorVersion)).contains(testStorage.getTopicName())) {
+ if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)).contains(testStorage.getTopicName())) {
String pathToTopicExamples = upgradeData.getFromExamples().equals("HEAD") ? PATH_TO_KAFKA_TOPIC_CONFIG : upgradeData.getFromExamples() + "/examples/topic/kafka-topic.yaml";
kafkaTopicYaml = new File(dir, pathToTopicExamples);
@@ -353,7 +351,7 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte
.replace("replicas: 1", "replicas: 3") +
" min.insync.replicas: 2");
- ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, operatorVersion), testStorage.getTopicName());
+ ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), testStorage.getTopicName());
}
String producerAdditionConfiguration = "delivery.timeout.ms=20000\nrequest.timeout.ms=20000";
@@ -374,7 +372,6 @@ protected void setupEnvAndUpgradeClusterOperator(ExtensionContext extensionConte
}
makeSnapshots();
- logPodImages(clusterName);
}
protected void verifyProcedure(BundleVersionModificationData upgradeData, String producerName, String consumerName, String namespace) {
@@ -383,7 +380,7 @@ protected void verifyProcedure(BundleVersionModificationData upgradeData, String
// Check that topics weren't deleted/duplicated during upgrade procedures
String listedTopics = cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL));
int additionalTopics = upgradeData.getAdditionalTopics();
- assertThat("KafkaTopic list doesn't have expected size", Long.valueOf(listedTopics.lines().count() - 1).intValue(), is(expectedTopicCount + additionalTopics));
+ assertThat("KafkaTopic list doesn't have expected size", Long.valueOf(listedTopics.lines().count() - 1).intValue(), is(getExpectedTopicCount() + additionalTopics));
assertThat("KafkaTopic " + topicName + " is not in expected Topic list",
listedTopics.contains(topicName), is(true));
for (int x = 0; x < upgradeTopicCount; x++) {
@@ -399,17 +396,8 @@ protected void verifyProcedure(BundleVersionModificationData upgradeData, String
// ##############################
}
}
-
protected String getResourceApiVersion(String resourcePlural) {
- return getResourceApiVersion(resourcePlural, "HEAD");
- }
-
- protected String getResourceApiVersion(String resourcePlural, String coVersion) {
- if (coVersion.equals("HEAD") || TestKafkaVersion.compareDottedVersions(coVersion, "0.22.0") >= 0) {
- return resourcePlural + "." + Constants.V1BETA2 + "." + Constants.RESOURCE_GROUP_NAME;
- } else {
- return resourcePlural + "." + Constants.V1BETA1 + "." + Constants.RESOURCE_GROUP_NAME;
- }
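+ // assumption: every operator version exercised by these tests is >= 0.22.0, so v1beta2 always applies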
+ return resourcePlural + "." + Constants.V1BETA2 + "." + Constants.RESOURCE_GROUP_NAME;
}
protected void deployCoWithWaitForReadiness(final ExtensionContext extensionContext, final BundleVersionModificationData upgradeData,
@@ -425,7 +413,7 @@ protected void deployCoWithWaitForReadiness(final ExtensionContext extensionCont
}
// Modify + apply installation files
- copyModifyApply(coDir, namespaceName, extensionContext, upgradeData.getFeatureGatesBefore());
+ copyModifyApply(coDir, namespaceName, upgradeData.getFeatureGatesBefore());
LOGGER.info("Waiting for Deployment: {}", ResourceManager.getCoDeploymentName());
DeploymentUtils.waitForDeploymentAndPodsReady(namespaceName, ResourceManager.getCoDeploymentName(), 1);
@@ -437,7 +425,7 @@ protected void deployKafkaClusterWithWaitForReadiness(final ExtensionContext ext
final UpgradeKafkaVersion upgradeKafkaVersion) {
LOGGER.info("Deploying Kafka: {} in Namespace: {}", clusterName, kubeClient().getNamespace());
- if (!cmdKubeClient().getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL, upgradeData.getFromVersion())).contains(clusterName)) {
+ if (!cmdKubeClient().getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(clusterName)) {
// Deploy a Kafka cluster
if (upgradeData.getFromExamples().equals("HEAD")) {
resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(clusterName, 3, 3)
@@ -468,14 +456,14 @@ protected void deployKafkaUserWithWaitForReadiness(final ExtensionContext extens
final String namespaceName) {
LOGGER.info("Deploying KafkaUser: {}/{}", kubeClient().getNamespace(), userName);
- if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL, upgradeData.getFromVersion())).contains(userName)) {
+ if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL)).contains(userName)) {
if (upgradeData.getFromVersion().equals("HEAD")) {
resourceManager.createResourceWithWait(extensionContext, KafkaUserTemplates.tlsUser(namespaceName, clusterName, userName).build());
} else {
kafkaUserYaml = new File(dir, upgradeData.getFromExamples() + "/examples/user/kafka-user.yaml");
LOGGER.info("Deploying KafkaUser from: {}", kafkaUserYaml.getPath());
cmdKubeClient().applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization"));
- ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL, upgradeData.getFromVersion()), userName);
+ ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaUser.RESOURCE_PLURAL), userName);
}
}
}
@@ -483,7 +471,7 @@ protected void deployKafkaUserWithWaitForReadiness(final ExtensionContext extens
protected void deployKafkaTopicWithWaitForReadiness(final BundleVersionModificationData upgradeData) {
LOGGER.info("Deploying KafkaTopic: {}/{}", kubeClient().getNamespace(), topicName);
- if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, upgradeData.getFromVersion())).contains(topicName)) {
+ if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL)).contains(topicName)) {
if (upgradeData.getFromVersion().equals("HEAD")) {
kafkaTopicYaml = new File(dir, PATH_TO_PACKAGING_EXAMPLES + "/topic/kafka-topic.yaml");
} else {
@@ -491,7 +479,7 @@ protected void deployKafkaTopicWithWaitForReadiness(final BundleVersionModificat
}
LOGGER.info("Deploying KafkaTopic from: {}", kafkaTopicYaml.getPath());
cmdKubeClient().create(kafkaTopicYaml);
- ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL, upgradeData.getFromVersion()), topicName);
+ ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaTopic.RESOURCE_PLURAL), topicName);
}
}
@@ -500,7 +488,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(final Ext
final UpgradeKafkaVersion upgradeKafkaVersion,
final TestStorage testStorage) {
// setup KafkaConnect + KafkaConnector
- if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL, acrossUpgradeData.getFromVersion())).contains(clusterName)) {
+ if (!cmdKubeClient().getResources(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL)).contains(clusterName)) {
if (acrossUpgradeData.getFromVersion().equals("HEAD")) {
resourceManager.createResourceWithWait(extensionContext, KafkaConnectTemplates.kafkaConnectWithFilePlugin(clusterName, testStorage.getNamespaceName(), 1)
.editMetadata()
@@ -559,7 +547,7 @@ protected void deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(final Ext
LOGGER.info("Deploying KafkaConnect from: {}", kafkaConnectYaml.getPath());
cmdKubeClient().applyContent(TestUtils.toYamlString(kafkaConnect));
- ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL, acrossUpgradeData.getFromVersion()), kafkaConnect.getMetadata().getName());
+ ResourceManager.waitForResourceReadiness(getResourceApiVersion(KafkaConnect.RESOURCE_PLURAL), kafkaConnect.getMetadata().getName());
// in our examples is no sink connector and thus we are using the same as in HEAD verification
resourceManager.createResourceWithWait(extensionContext, KafkaConnectorTemplates.kafkaConnector(clusterName)
@@ -601,7 +589,7 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(final
ClientUtils.waitForProducerClientSuccess(testStorage);
makeSnapshots();
- logPodImages(clusterName);
+ logPodImagesWithConnect(TestConstants.CO_NAMESPACE);
// Verify FileSink KafkaConnector before upgrade
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
@@ -627,4 +615,13 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(final
// Verify that pods are stable
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), clusterName);
}
+
+ protected String downloadExamplesAndGetPath(CommonVersionModificationData versionModificationData) throws IOException {
+ if (versionModificationData.getToUrl().equals("HEAD")) {
+ return PATH_TO_PACKAGING_EXAMPLES;
+ } else {
+ File dir = FileUtils.downloadAndUnzip(versionModificationData.getToUrl());
+ return dir.getAbsolutePath() + "/" + versionModificationData.getToExamples() + "/examples";
+ }
+ }
}
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java
new file mode 100644
index 00000000000..edd9a6c4e0a
--- /dev/null
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/AbstractKRaftUpgradeST.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.systemtest.upgrade.kraft;
+
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.strimzi.api.kafka.model.Kafka;
+import io.strimzi.api.kafka.model.KafkaResources;
+import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
+import io.strimzi.operator.common.Annotations;
+import io.strimzi.systemtest.TestConstants;
+import io.strimzi.systemtest.resources.ResourceManager;
+import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
+import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
+import io.strimzi.systemtest.templates.crd.KafkaTemplates;
+import io.strimzi.systemtest.upgrade.AbstractUpgradeST;
+import io.strimzi.systemtest.upgrade.BundleVersionModificationData;
+import io.strimzi.systemtest.upgrade.CommonVersionModificationData;
+import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion;
+import io.strimzi.systemtest.utils.RollingUpdateUtils;
+import io.strimzi.systemtest.utils.TestKafkaVersion;
+import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils;
+import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils;
+import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils;
+import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
+import io.strimzi.test.TestUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient;
+import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class AbstractKRaftUpgradeST extends AbstractUpgradeST {
+
+ private static final Logger LOGGER = LogManager.getLogger(AbstractKRaftUpgradeST.class);
+
+ protected Map<String, String> brokerPods;
+ protected Map<String, String> controllerPods;
+
+ protected static final String CONTROLLER_NODE_NAME = "controller";
+ protected static final String BROKER_NODE_NAME = "broker";
+
+ protected final LabelSelector controllerSelector = KafkaNodePoolResource.getLabelSelector(clusterName, CONTROLLER_NODE_NAME, ProcessRoles.CONTROLLER);
+ protected final LabelSelector brokerSelector = KafkaNodePoolResource.getLabelSelector(clusterName, BROKER_NODE_NAME, ProcessRoles.BROKER);
+
+ // topics that exist only in Kafka itself are not created as KafkaTopic CRs by the UTO, hence 3 fewer topics than in the regular upgrade
+ protected final int expectedTopicCount = upgradeTopicCount;
+
+ @Override
+ protected int getExpectedTopicCount() {
+ return expectedTopicCount;
+ }
+
+ @Override
+ protected void makeSnapshots() {
+ coPods = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, ResourceManager.getCoDeploymentName());
+ eoPods = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName));
+ controllerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, controllerSelector);
+ brokerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, brokerSelector);
+ connectPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, connectLabelSelector);
+ }
+
+ @Override
+ protected void deployKafkaClusterWithWaitForReadiness(final ExtensionContext extensionContext, final BundleVersionModificationData upgradeData,
+ final UpgradeKafkaVersion upgradeKafkaVersion) {
+ LOGGER.info("Deploying Kafka: {} in Namespace: {}", clusterName, kubeClient().getNamespace());
+
+ if (!cmdKubeClient().getResources(getResourceApiVersion(Kafka.RESOURCE_PLURAL)).contains(clusterName)) {
+ // Deploy a Kafka cluster
+ if (upgradeData.getFromExamples().equals("HEAD")) {
+ resourceManager.createResourceWithWait(extensionContext,
+ KafkaNodePoolTemplates.kafkaNodePoolWithControllerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, CONTROLLER_NODE_NAME, clusterName, 3).build(),
+ KafkaNodePoolTemplates.kafkaNodePoolWithBrokerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, BROKER_NODE_NAME, clusterName, 3).build(),
+ KafkaTemplates.kafkaPersistent(clusterName, 3, 3)
+ .editMetadata()
+ .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled")
+ .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled")
+ .endMetadata()
+ .editSpec()
+ .editKafka()
+ .withVersion(upgradeKafkaVersion.getVersion())
+ .withMetadataVersion(upgradeKafkaVersion.getMetadataVersion())
+ .endKafka()
+ .endSpec()
+ .build());
+ } else {
+ kafkaYaml = new File(dir, upgradeData.getFromExamples() + "/examples/kafka/nodepools/kafka-with-kraft.yaml");
+ LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath());
+ // Change the Kafka version if it's set (null means removing the version from the YAML)
+ if (upgradeKafkaVersion == null) {
+ cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaInKRaft(kafkaYaml, null));
+ } else {
+ cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, upgradeKafkaVersion.getVersion(), upgradeKafkaVersion.getMetadataVersion()));
+ }
+ // Wait for readiness
+ waitForReadinessOfKafkaCluster();
+ }
+ }
+ }
+
+ @Override
+ protected void waitForKafkaClusterRollingUpdate() {
+ LOGGER.info("Waiting for Kafka Pods with controller role to be rolled");
+ controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerPods);
+ LOGGER.info("Waiting for Kafka Pods with broker role to be rolled");
+ brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerPods);
+ LOGGER.info("Waiting for EO Deployment to be rolled");
+ // Check the TO and UO also got upgraded
+ eoPods = DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods);
+ }
+
+ @Override
+ protected void waitForReadinessOfKafkaCluster() {
+ LOGGER.info("Waiting for Kafka Pods with controller role to be ready");
+ RollingUpdateUtils.waitForComponentAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3);
+ LOGGER.info("Waiting for Kafka Pods with broker role to be ready");
+ RollingUpdateUtils.waitForComponentAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3);
+ LOGGER.info("Waiting for EO Deployment");
+ DeploymentUtils.waitForDeploymentAndPodsReady(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1);
+ }
+
+ protected void changeKafkaAndMetadataVersion(CommonVersionModificationData versionModificationData, ExtensionContext extensionContext) throws IOException {
+ changeKafkaAndMetadataVersion(versionModificationData, false, extensionContext);
+ }
+
+ /**
+ * Method for changing Kafka `version` and `metadataVersion` fields in Kafka CR based on the current scenario
+ * @param versionModificationData data structure holding information about the desired steps/versions that should be applied
+ * @param replaceEvenIfMissing current workaround for the situation when `metadataVersion` is not set in the Kafka CR -> the previous version of the operator
+ * doesn't contain this field, so even if we set it in the Kafka CR, it is removed by the operator;
+ * this is needed for correct functionality of the `testUpgradeAcrossVersionsWithUnsupportedKafkaVersion` test
+ * @param extensionContext context of the test
+ * @throws IOException exception during application of YAML files
+ */
+ @SuppressWarnings("CyclomaticComplexity")
+ protected void changeKafkaAndMetadataVersion(CommonVersionModificationData versionModificationData, boolean replaceEvenIfMissing, ExtensionContext extensionContext) throws IOException {
+ // Get Kafka version
+ String kafkaVersionFromCR = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.version");
+ kafkaVersionFromCR = kafkaVersionFromCR.equals("") ? null : kafkaVersionFromCR;
+ // Get Kafka metadata version
+ String currentMetadataVersion = cmdKubeClient().getResourceJsonPath(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, ".spec.kafka.metadataVersion");
+
+ String kafkaVersionFromProcedure = versionModificationData.getProcedures().getVersion();
+
+ // #######################################################################
+ // ################# Update CRs to latest version ###################
+ // #######################################################################
+ String examplesPath = downloadExamplesAndGetPath(versionModificationData);
+
+ applyCustomResourcesFromPath(examplesPath, kafkaVersionFromCR);
+
+ // #######################################################################
+
+ if (versionModificationData.getProcedures() != null && (!currentMetadataVersion.isEmpty() || replaceEvenIfMissing)) {
+
+ if (kafkaVersionFromProcedure != null && !kafkaVersionFromProcedure.isEmpty() && !kafkaVersionFromCR.contains(kafkaVersionFromProcedure)) {
+ LOGGER.info("Set Kafka version to " + kafkaVersionFromProcedure);
+ cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/version", kafkaVersionFromProcedure);
+
+ waitForKafkaControllersAndBrokersFinishRollingUpdate();
+ }
+
+ String metadataVersion = versionModificationData.getProcedures().getMetadataVersion();
+
+ if (metadataVersion != null && !metadataVersion.isEmpty()) {
+ LOGGER.info("Set metadata version to {} (current version is {})", metadataVersion, currentMetadataVersion);
+ cmdKubeClient().patchResource(getResourceApiVersion(Kafka.RESOURCE_PLURAL), clusterName, "/spec/kafka/metadataVersion", metadataVersion);
+
+ makeSnapshots();
+ }
+ }
+ }
+
+ @Override
+ protected void checkAllImages(BundleVersionModificationData versionModificationData, String namespaceName) {
+ if (versionModificationData.getImagesAfterOperations().isEmpty()) {
+ fail("There are no expected images");
+ }
+
+ checkContainerImages(controllerSelector, versionModificationData.getKafkaImage());
+ checkContainerImages(brokerSelector, versionModificationData.getKafkaImage());
+ checkContainerImages(eoSelector, versionModificationData.getTopicOperatorImage());
+ checkContainerImages(eoSelector, 1, versionModificationData.getUserOperatorImage());
+ }
+
+ @Override
+ protected void logPodImages(String namespaceName) {
+ logPodImages(namespaceName, controllerSelector, brokerSelector, eoSelector, coSelector);
+ }
+
+ @Override
+ protected void logPodImagesWithConnect(String namespaceName) {
+ logPodImages(namespaceName, controllerSelector, brokerSelector, eoSelector, connectLabelSelector, coSelector);
+ }
+
+ protected void waitForKafkaControllersAndBrokersFinishRollingUpdate() {
+ LOGGER.info("Waiting for Kafka rolling update to finish");
+ controllerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerPods);
+ brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerPods);
+ }
+
+ protected void applyKafkaCustomResourceFromPath(String examplesPath, String kafkaVersionFromCR) {
+ // Change the Kafka version if it's set (null means removing the version from the YAML)
+ String metadataVersion = kafkaVersionFromCR == null ? null : TestKafkaVersion.getSpecificVersion(kafkaVersionFromCR).metadataVersion();
+
+ kafkaYaml = new File(examplesPath + "/kafka/nodepools/kafka-with-kraft.yaml");
+ LOGGER.info("Deploying Kafka from: {}", kafkaYaml.getPath());
+ cmdKubeClient().applyContent(KafkaUtils.changeOrRemoveKafkaConfigurationInKRaft(kafkaYaml, kafkaVersionFromCR, metadataVersion));
+ }
+
+ protected void applyCustomResourcesFromPath(String examplesPath, String kafkaVersionFromCR) {
+ applyKafkaCustomResourceFromPath(examplesPath, kafkaVersionFromCR);
+
+ kafkaUserYaml = new File(examplesPath + "/user/kafka-user.yaml");
+ LOGGER.info("Deploying KafkaUser from: {}", kafkaUserYaml.getPath());
+ cmdKubeClient().applyContent(KafkaUserUtils.removeKafkaUserPart(kafkaUserYaml, "authorization"));
+
+ kafkaTopicYaml = new File(examplesPath + "/topic/kafka-topic.yaml");
+ LOGGER.info("Deploying KafkaTopic from: {}", kafkaTopicYaml.getPath());
+ cmdKubeClient().applyContent(TestUtils.readFile(kafkaTopicYaml));
+ }
+}
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java
new file mode 100644
index 00000000000..fe7280f897c
--- /dev/null
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftKafkaUpgradeDowngradeST.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.systemtest.upgrade.kraft;
+
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.strimzi.api.kafka.Crds;
+import io.strimzi.api.kafka.model.KafkaBuilder;
+import io.strimzi.api.kafka.model.KafkaResources;
+import io.strimzi.operator.common.Annotations;
+import io.strimzi.systemtest.Environment;
+import io.strimzi.systemtest.TestConstants;
+import io.strimzi.systemtest.annotations.IsolatedTest;
+import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
+import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClientsBuilder;
+import io.strimzi.systemtest.resources.crd.KafkaResource;
+import io.strimzi.systemtest.storage.TestStorage;
+import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
+import io.strimzi.systemtest.templates.crd.KafkaTemplates;
+import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
+import io.strimzi.systemtest.utils.ClientUtils;
+import io.strimzi.systemtest.utils.RollingUpdateUtils;
+import io.strimzi.systemtest.utils.TestKafkaVersion;
+import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils;
+import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static io.strimzi.systemtest.TestConstants.KRAFT_UPGRADE;
+import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * This test class contains tests for Kafka upgrade/downgrade from version X to X +/- 1, running in KRaft mode.
+ * Metadata for the upgrade/downgrade procedures is loaded from kafka-versions.yaml in the root directory of this repository.
+ */
+@Tag(KRAFT_UPGRADE)
+public class KRaftKafkaUpgradeDowngradeST extends AbstractKRaftUpgradeST {
+ private static final Logger LOGGER = LogManager.getLogger(KRaftKafkaUpgradeDowngradeST.class);
+
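+ // Topic and message count for clients that run continuously across every rolling update, to verify availability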
+ private final String continuousTopicName = "continuous-topic";
+ private final int continuousClientsMessageCount = 1000;
+
+ @IsolatedTest
+ void testKafkaClusterUpgrade(ExtensionContext testContext) {
+ List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions();
+
+ String producerName = clusterName + "-producer";
+ String consumerName = clusterName + "-consumer";
+
+ for (int x = 0; x < sortedVersions.size() - 1; x++) {
+ TestKafkaVersion initialVersion = sortedVersions.get(x);
+ TestKafkaVersion newVersion = sortedVersions.get(x + 1);
+
+ // If it is an upgrade test we keep the metadata version as the lower version number
+ String metadataVersion = initialVersion.metadataVersion();
+
+ runVersionChange(initialVersion, newVersion, producerName, consumerName, metadataVersion, 3, 3, testContext);
+ }
+
+ // ##############################
+ // Validate that continuous clients finished successfully
+ // ##############################
+ ClientUtils.waitForClientsSuccess(producerName, consumerName, TestConstants.CO_NAMESPACE, continuousClientsMessageCount);
+ // ##############################
+ }
+
+ @IsolatedTest
+ void testKafkaClusterDowngrade(ExtensionContext testContext) {
+ final TestStorage testStorage = storageMap.get(testContext);
+ List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions();
+
+ String clusterName = testStorage.getClusterName();
+ String producerName = clusterName + "-producer";
+ String consumerName = clusterName + "-consumer";
+
+ for (int x = sortedVersions.size() - 1; x > 0; x--) {
+ TestKafkaVersion initialVersion = sortedVersions.get(x);
+ TestKafkaVersion newVersion = sortedVersions.get(x - 1);
+
+ // If it is a downgrade then we make sure that we are using the lowest metadataVersion from the whole list
+ String metadataVersion = sortedVersions.get(0).metadataVersion();
+ runVersionChange(initialVersion, newVersion, producerName, consumerName, metadataVersion, 3, 3, testContext);
+ }
+
+ // ##############################
+ // Validate that continuous clients finished successfully
+ // ##############################
+ ClientUtils.waitForClientsSuccess(producerName, consumerName, TestConstants.CO_NAMESPACE, continuousClientsMessageCount);
+ // ##############################
+ }
+
+ @IsolatedTest
+ void testUpgradeWithNoMetadataVersionSet(ExtensionContext testContext) {
+ List<TestKafkaVersion> sortedVersions = TestKafkaVersion.getSupportedKafkaVersions();
+
+ String producerName = clusterName + "-producer";
+ String consumerName = clusterName + "-consumer";
+
+ for (int x = 0; x < sortedVersions.size() - 1; x++) {
+ TestKafkaVersion initialVersion = sortedVersions.get(x);
+ TestKafkaVersion newVersion = sortedVersions.get(x + 1);
+
+ runVersionChange(initialVersion, newVersion, producerName, consumerName, null, 3, 3, testContext);
+ }
+
+ // ##############################
+ // Validate that continuous clients finished successfully
+ // ##############################
+ ClientUtils.waitForClientsSuccess(producerName, consumerName, TestConstants.CO_NAMESPACE, continuousClientsMessageCount);
+ // ##############################
+ }
+
+ @BeforeAll
+ void setupEnvironment(final ExtensionContext extensionContext) {
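+ // KRaft upgrade/downgrade tests rely on the UseKRaft, KafkaNodePools, and UnidirectionalTopicOperator feature gates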
+ List<EnvVar> coEnvVars = new ArrayList<>();
+ coEnvVars.add(new EnvVar(Environment.STRIMZI_FEATURE_GATES_ENV, String.join(",",
+ TestConstants.USE_KRAFT_MODE, TestConstants.USE_KAFKA_NODE_POOLS, TestConstants.USE_UNIDIRECTIONAL_TOPIC_OPERATOR), null));
+
+ clusterOperator
+ .defaultInstallation(extensionContext)
+ .withExtraEnvVars(coEnvVars)
+ .createInstallation()
+ .runInstallation();
+ }
+
+ @SuppressWarnings({"checkstyle:MethodLength"})
+ void runVersionChange(TestKafkaVersion initialVersion, TestKafkaVersion newVersion, String producerName, String consumerName, String initMetadataVersion, int controllerReplicas, int brokerReplicas, ExtensionContext testContext) {
+ boolean isUpgrade = initialVersion.isUpgrade(newVersion);
+ Map<String, String> controllerPods;
+ Map<String, String> brokerPods;
+
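+ // metadataVersion follows the Kafka minor version (e.g. 3.5), so equal metadata versions mean the same minor line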
+ boolean sameMinorVersion = initialVersion.metadataVersion().equals(newVersion.metadataVersion());
+
+ if (KafkaResource.kafkaClient().inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName).get() == null) {
+ LOGGER.info("Deploying initial Kafka version {} with metadataVersion={}", initialVersion.version(), initMetadataVersion);
+
+ KafkaBuilder kafka = KafkaTemplates.kafkaPersistent(clusterName, controllerReplicas, brokerReplicas)
+ .editMetadata()
+ .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled")
+ .addToAnnotations(Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled")
+ .endMetadata()
+ .editSpec()
+ .editKafka()
+ .withVersion(initialVersion.version())
+ .withConfig(null)
+ .endKafka()
+ .endSpec();
+
+ // Do not set metadataVersion if it's not passed to the method
+ if (initMetadataVersion != null) {
+ kafka
+ .editSpec()
+ .editKafka()
+ .withMetadataVersion(initMetadataVersion)
+ .endKafka()
+ .endSpec();
+ }
+
+ resourceManager.createResourceWithWait(testContext,
+ KafkaNodePoolTemplates.kafkaNodePoolWithControllerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, CONTROLLER_NODE_NAME, clusterName, controllerReplicas).build(),
+ KafkaNodePoolTemplates.kafkaNodePoolWithBrokerRoleAndPersistentStorage(TestConstants.CO_NAMESPACE, BROKER_NODE_NAME, clusterName, brokerReplicas).build(),
+ kafka.build()
+ );
+
+ // ##############################
+ // Attach clients which will continuously produce/consume messages to/from Kafka brokers during rolling update
+ // ##############################
+ // Set up a topic with 3 replicas and min.insync.replicas=2 to check that the producer keeps working during the rolling update
+ resourceManager.createResourceWithWait(testContext, KafkaTopicTemplates.topic(clusterName, continuousTopicName, 3, 3, 2, TestConstants.CO_NAMESPACE).build());
+ String producerAdditionConfiguration = "delivery.timeout.ms=20000\nrequest.timeout.ms=20000";
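+ // Short delivery/request timeouts make the producer fail fast, so any broker unavailability mid-roll surfaces in the test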
+
+ KafkaClients kafkaBasicClientJob = new KafkaClientsBuilder()
+ .withProducerName(producerName)
+ .withConsumerName(consumerName)
+ .withBootstrapAddress(KafkaResources.plainBootstrapAddress(clusterName))
+ .withTopicName(continuousTopicName)
+ .withMessageCount(continuousClientsMessageCount)
+ .withAdditionalConfig(producerAdditionConfiguration)
+ .withDelayMs(1000)
+ .build();
+
+ resourceManager.createResourceWithWait(testContext, kafkaBasicClientJob.producerStrimzi());
+ resourceManager.createResourceWithWait(testContext, kafkaBasicClientJob.consumerStrimzi());
+ // ##############################
+ }
+
+ LOGGER.info("Deployment of initial Kafka version (" + initialVersion.version() + ") complete");
+
+ String controllerVersionResult = KafkaResource.kafkaClient().inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName).get().getStatus().getKafkaVersion();
+ LOGGER.info("Pre-change Kafka version: " + controllerVersionResult);
+
+ controllerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, controllerSelector);
+ brokerPods = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, brokerSelector);
+
+ LOGGER.info("Updating Kafka CR version field to " + newVersion.version());
+
+ // Change the version in Kafka CR
+ KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, kafka -> {
+ kafka.getSpec().getKafka().setVersion(newVersion.version());
+ }, TestConstants.CO_NAMESPACE);
+
+ LOGGER.info("Waiting for readiness of new Kafka version (" + newVersion.version() + ") to complete");
+
+ // Wait for the controllers' version change roll
+ controllerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, controllerSelector, controllerReplicas, controllerPods);
+ LOGGER.info("1st Controllers roll (image change) is complete");
+
+ // Wait for the brokers' version change roll
+ brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(TestConstants.CO_NAMESPACE, brokerSelector, brokerReplicas, brokerPods);
+ LOGGER.info("1st Brokers roll (image change) is complete");
+
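+ // May be null when the test runs without an explicit metadataVersion in the Kafka CR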
+ String currentMetadataVersion = KafkaResource.kafkaClient().inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName).get().getSpec().getKafka().getMetadataVersion();
+
+ LOGGER.info("Deployment of Kafka (" + newVersion.version() + ") complete");
+
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+
+ String controllerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName();
+ String brokerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, BROKER_NODE_NAME)).get(0).getMetadata().getName();
+
+ // Extract the Kafka version number from the jars in the lib directory
+ controllerVersionResult = KafkaUtils.getVersionFromKafkaPodLibs(controllerPodName);
+ LOGGER.info("Post-change Kafka version query returned: " + controllerVersionResult);
+
+ assertThat("Kafka container had version " + controllerVersionResult + " where " + newVersion.version() +
+ " was expected", controllerVersionResult, is(newVersion.version()));
+
+ // Extract the Kafka version number from the jars in the lib directory
+ String brokerVersionResult = KafkaUtils.getVersionFromKafkaPodLibs(brokerPodName);
+ LOGGER.info("Post-change Kafka version query returned: " + brokerVersionResult);
+
+ assertThat("Kafka container had version " + brokerVersionResult + " where " + newVersion.version() +
+ " was expected", brokerVersionResult, is(newVersion.version()));
+
+ if (isUpgrade && !sameMinorVersion) {
+ LOGGER.info("Updating Kafka config attribute 'metadataVersion' from '{}' to '{}' version", initialVersion.metadataVersion(), newVersion.metadataVersion());
+
+ KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, kafka -> {
+ LOGGER.info("Kafka config before updating '{}'", kafka.getSpec().getKafka().toString());
+
+ kafka.getSpec().getKafka().setMetadataVersion(newVersion.metadataVersion());
+
+ LOGGER.info("Kafka config after updating '{}'", kafka.getSpec().getKafka().toString());
+ }, TestConstants.CO_NAMESPACE);
+
+ LOGGER.info("Metadata version changed, it doesn't require rolling update, so the Pods should be stable");
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+ assertFalse(RollingUpdateUtils.componentHasRolled(TestConstants.CO_NAMESPACE, controllerSelector, controllerPods));
+ assertFalse(RollingUpdateUtils.componentHasRolled(TestConstants.CO_NAMESPACE, brokerSelector, brokerPods));
+ }
+
+ if (!isUpgrade) {
+ LOGGER.info("Verifying that metadataVersion attribute updated correctly to version {}", initMetadataVersion);
+ assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName)
+ .get().getStatus().getKafkaMetadataVersion().contains(initMetadataVersion), is(true));
+ } else {
+ if (currentMetadataVersion != null) {
+ LOGGER.info("Verifying that metadataVersion attribute updated correctly to version {}", newVersion.metadataVersion());
+ assertThat(Crds.kafkaOperation(kubeClient().getClient()).inNamespace(TestConstants.CO_NAMESPACE).withName(clusterName)
+ .get().getStatus().getKafkaMetadataVersion().contains(newVersion.metadataVersion()), is(true));
+ }
+ }
+
+ LOGGER.info("Waiting till Kafka Cluster {}/{} with specified version {} has the same version in status and specification", TestConstants.CO_NAMESPACE, clusterName, newVersion.version());
+ KafkaUtils.waitUntilStatusKafkaVersionMatchesExpectedVersion(clusterName, TestConstants.CO_NAMESPACE, newVersion.version());
+ }
+}
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java
new file mode 100644
index 00000000000..cdebcfc6aa1
--- /dev/null
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziDowngradeST.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.systemtest.upgrade.kraft;
+
+import io.strimzi.systemtest.TestConstants;
+import io.strimzi.systemtest.storage.TestStorage;
+import io.strimzi.systemtest.upgrade.BundleVersionModificationData;
+import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion;
+import io.strimzi.systemtest.upgrade.VersionModificationDataLoader;
+import io.strimzi.systemtest.utils.StUtils;
+import io.strimzi.systemtest.utils.kubeUtils.objects.NamespaceUtils;
+import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.List;
+
+import static io.strimzi.systemtest.TestConstants.CO_NAMESPACE;
+import static io.strimzi.systemtest.TestConstants.INTERNAL_CLIENTS_USED;
+import static io.strimzi.systemtest.TestConstants.KRAFT_UPGRADE;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Class for testing the downgrade process of Strimzi and its components when running in KRaft mode
+ * -> KRaft to KRaft downgrades
+ * Metadata for the following tests is collected from systemtest/src/test/resources/upgrade/BundleDowngrade.yaml
+ */
+@Tag(KRAFT_UPGRADE)
+@Disabled("The tests are currently disabled, as the KRaft to KRaft downgrade (with operator downgrade) is not handled in Strimzi 0.38.0")
+public class KRaftStrimziDowngradeST extends AbstractKRaftUpgradeST {
+ private static final Logger LOGGER = LogManager.getLogger(KRaftStrimziDowngradeST.class);
+ private final List<BundleVersionModificationData> bundleDowngradeMetadata = new VersionModificationDataLoader(VersionModificationDataLoader.ModificationType.BUNDLE_DOWNGRADE).getBundleUpgradeOrDowngradeDataList();
+
+ @ParameterizedTest(name = "testDowngradeStrimziVersion-{0}-{1}")
+ @MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlDowngradeDataForKRaft")
+ @Tag(INTERNAL_CLIENTS_USED)
+ void testDowngradeStrimziVersion(String from, String to, BundleVersionModificationData parameters, ExtensionContext extensionContext) throws Exception {
+ assumeTrue(StUtils.isAllowOnCurrentEnvironment(parameters.getEnvFlakyVariable()));
+ assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(parameters.getEnvMaxK8sVersion()));
+
+ LOGGER.debug("Running downgrade test from version {} to {}", from, to);
+ performDowngrade(parameters, extensionContext);
+ }
+
+ @Test
+ void testDowngradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extensionContext) throws IOException {
+ final TestStorage testStorage = new TestStorage(extensionContext, TestConstants.CO_NAMESPACE);
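+ // Pick the first downgrade path that declares feature gates either before or after the downgrade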
+ final BundleVersionModificationData bundleDowngradeDataWithFeatureGates = bundleDowngradeMetadata.stream()
+ .filter(bundleMetadata -> (bundleMetadata.getFeatureGatesBefore() != null && !bundleMetadata.getFeatureGatesBefore().isEmpty()) ||
+ (bundleMetadata.getFeatureGatesAfter() != null && !bundleMetadata.getFeatureGatesAfter().isEmpty()))
+ .findFirst().orElseThrow();
+ UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(bundleDowngradeDataWithFeatureGates.getDeployKafkaVersion());
+
+ doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(extensionContext, bundleDowngradeDataWithFeatureGates, testStorage, upgradeKafkaVersion);
+ }
+
+ @SuppressWarnings("MethodLength")
+ private void performDowngrade(BundleVersionModificationData downgradeData, ExtensionContext extensionContext) throws IOException {
+ TestStorage testStorage = new TestStorage(extensionContext);
+
+ String lowerMetadataVersion = downgradeData.getProcedures().getMetadataVersion();
+ UpgradeKafkaVersion testUpgradeKafkaVersion = new UpgradeKafkaVersion(downgradeData.getDeployKafkaVersion(), lowerMetadataVersion);
+
+ // Setup env
+ // Downgrade is supported only when the metadataVersion wasn't raised past the target version (the KRaft equivalent of inter.broker.protocol.version/log.message.format.version)
+ // https://strimzi.io/docs/operators/latest/full/deploying.html#con-target-downgrade-version-str
+
+ setupEnvAndUpgradeClusterOperator(extensionContext, downgradeData, testStorage, testUpgradeKafkaVersion, TestConstants.CO_NAMESPACE);
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ // Downgrade CO
+ changeClusterOperator(downgradeData, TestConstants.CO_NAMESPACE, extensionContext);
+
+ // Wait for Kafka cluster rolling update
+ waitForKafkaClusterRollingUpdate();
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ // Downgrade kafka
+ changeKafkaAndMetadataVersion(downgradeData, extensionContext);
+
+ // Verify that pods are stable
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+
+ checkAllImages(downgradeData, TestConstants.CO_NAMESPACE);
+
+ // Verify downgrade
+ verifyProcedure(downgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE);
+ }
+
+ @BeforeEach
+ void setupEnvironment() {
+ cluster.createNamespace(TestConstants.CO_NAMESPACE);
+ StUtils.copyImagePullSecrets(TestConstants.CO_NAMESPACE);
+ }
+
+ @AfterEach
+ void afterEach() {
+ deleteInstalledYamls(coDir, TestConstants.CO_NAMESPACE);
+ NamespaceUtils.deleteNamespaceWithWait(CO_NAMESPACE);
+ }
+}
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java
new file mode 100644
index 00000000000..c05063f5d34
--- /dev/null
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/kraft/KRaftStrimziUpgradeST.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.systemtest.upgrade.kraft;
+
+import io.strimzi.api.kafka.model.KafkaResources;
+import io.strimzi.api.kafka.model.KafkaTopic;
+import io.strimzi.systemtest.TestConstants;
+import io.strimzi.systemtest.annotations.IsolatedTest;
+import io.strimzi.systemtest.resources.ResourceManager;
+import io.strimzi.systemtest.resources.crd.KafkaResource;
+import io.strimzi.systemtest.storage.TestStorage;
+import io.strimzi.systemtest.upgrade.BundleVersionModificationData;
+import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion;
+import io.strimzi.systemtest.upgrade.VersionModificationDataLoader;
+import io.strimzi.systemtest.utils.RollingUpdateUtils;
+import io.strimzi.systemtest.utils.StUtils;
+import io.strimzi.systemtest.utils.TestKafkaVersion;
+import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils;
+import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils;
+import io.strimzi.systemtest.utils.kubeUtils.controllers.DeploymentUtils;
+import io.strimzi.systemtest.utils.kubeUtils.objects.NamespaceUtils;
+import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static io.strimzi.systemtest.TestConstants.INTERNAL_CLIENTS_USED;
+import static io.strimzi.systemtest.TestConstants.KRAFT_UPGRADE;
+import static io.strimzi.test.k8s.KubeClusterResource.cmdKubeClient;
+import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Class for testing the upgrade process of Strimzi and its components when running in KRaft mode
+ * -> KRaft to KRaft upgrades
+ * Metadata for the following tests is collected from systemtest/src/test/resources/upgrade/BundleUpgrade.yaml
+ */
+@Tag(KRAFT_UPGRADE)
+public class KRaftStrimziUpgradeST extends AbstractKRaftUpgradeST {
+
+ private static final Logger LOGGER = LogManager.getLogger(KRaftStrimziUpgradeST.class);
+ private final BundleVersionModificationData acrossUpgradeData = new VersionModificationDataLoader(VersionModificationDataLoader.ModificationType.BUNDLE_UPGRADE).buildDataForUpgradeAcrossVersionsForKRaft();
+
+ @ParameterizedTest(name = "from: {0} (using FG <{2}>) to: {1} (using FG <{3}>) ")
+ @MethodSource("io.strimzi.systemtest.upgrade.VersionModificationDataLoader#loadYamlUpgradeDataForKRaft")
+ @Tag(INTERNAL_CLIENTS_USED)
+ void testUpgradeStrimziVersion(String fromVersion, String toVersion, String fgBefore, String fgAfter, BundleVersionModificationData upgradeData, ExtensionContext extensionContext) throws Exception {
+ assumeTrue(StUtils.isAllowOnCurrentEnvironment(upgradeData.getEnvFlakyVariable()));
+ assumeTrue(StUtils.isAllowedOnCurrentK8sVersion(upgradeData.getEnvMaxK8sVersion()));
+
+ performUpgrade(upgradeData, extensionContext);
+ }
+
+ @IsolatedTest
+ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IOException {
+ UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion());
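+ // Clear the spec version so the operator falls back to its default Kafka version after the upgrade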
+ upgradeKafkaVersion.setVersion(null);
+
+ TestStorage testStorage = new TestStorage(extensionContext);
+
+ // Setup env
+ setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE);
+
+ Map<String, String> controllerSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, controllerSelector);
+ Map<String, String> brokerSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, brokerSelector);
+ Map<String, String> eoSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, eoSelector);
+
+ // Make snapshots of all Pods
+ makeSnapshots();
+
+ // Upgrade CO
+ changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext);
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerSnapshot);
+ RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerSnapshot);
+ DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoSnapshot);
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+ checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE);
+
+ // Verify that Pods are stable
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+ // Verify upgrade
+ verifyProcedure(acrossUpgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE);
+
+ String controllerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, CONTROLLER_NODE_NAME)).get(0).getMetadata().getName();
+ String brokerPodName = kubeClient().listPodsByPrefixInName(TestConstants.CO_NAMESPACE, KafkaResource.getStrimziPodSetName(clusterName, BROKER_NODE_NAME)).get(0).getMetadata().getName();
+
+ assertThat(KafkaUtils.getVersionFromKafkaPodLibs(controllerPodName), containsString(acrossUpgradeData.getProcedures().getVersion()));
+ assertThat(KafkaUtils.getVersionFromKafkaPodLibs(brokerPodName), containsString(acrossUpgradeData.getProcedures().getVersion()));
+ }
+
+ @IsolatedTest
+ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext extensionContext) throws IOException {
+ TestStorage testStorage = new TestStorage(extensionContext);
+ UpgradeKafkaVersion upgradeKafkaVersion = UpgradeKafkaVersion.getKafkaWithVersionFromUrl(acrossUpgradeData.getFromKafkaVersionsUrl(), acrossUpgradeData.getStartingKafkaVersion());
+
+ // Setup env
+ setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE);
+
+ // Make snapshots of all Pods
+ makeSnapshots();
+
+ // Upgrade CO
+ changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext);
+
+ waitForKafkaClusterRollingUpdate();
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ // Upgrade kafka
+ changeKafkaAndMetadataVersion(acrossUpgradeData, true, extensionContext);
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE);
+
+ // Verify that Pods are stable
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+
+ // Verify upgrade
+ verifyProcedure(acrossUpgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE);
+ }
+
+ @IsolatedTest
+ void testUpgradeAcrossVersionsWithNoKafkaVersion(ExtensionContext extensionContext) throws IOException {
+ TestStorage testStorage = new TestStorage(extensionContext);
+
+ // Setup env
+ setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, null, TestConstants.CO_NAMESPACE);
+
+ // Upgrade CO
+ changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext);
+
+ // Wait till first upgrade finished
+ controllerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, controllerSelector, 3, controllerPods);
+ brokerPods = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, brokerSelector, 3, brokerPods);
+ eoPods = DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods);
+
+ LOGGER.info("Rolling to new images has finished!");
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ // Upgrade kafka
+ changeKafkaAndMetadataVersion(acrossUpgradeData, extensionContext);
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE);
+
+ // Verify that Pods are stable
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+
+ // Verify upgrade
+ verifyProcedure(acrossUpgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE);
+ }
+
+ @IsolatedTest
+ void testUpgradeOfKafkaConnectAndKafkaConnector(final ExtensionContext extensionContext) throws IOException {
+ final TestStorage testStorage = new TestStorage(extensionContext, TestConstants.CO_NAMESPACE);
+ final UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion(acrossUpgradeData.getDefaultKafka());
+
+ doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion);
+ }
+
+ private void performUpgrade(BundleVersionModificationData upgradeData, ExtensionContext extensionContext) throws IOException {
+ TestStorage testStorage = new TestStorage(extensionContext);
+
+ // Leave empty so that the original Kafka version from the appropriate Strimzi yaml is used
+ UpgradeKafkaVersion upgradeKafkaVersion = new UpgradeKafkaVersion();
+
+ // Setup env
+ setupEnvAndUpgradeClusterOperator(extensionContext, upgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE);
+
+ // Upgrade CO to HEAD
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ changeClusterOperator(upgradeData, TestConstants.CO_NAMESPACE, extensionContext);
+
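+ // The cluster rolls after the CO upgrade only if the previously deployed Kafka version is still supported by the new operator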
+ if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeData.getDefaultKafkaVersionPerStrimzi())) {
+ waitForKafkaClusterRollingUpdate();
+ }
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ // Upgrade kafka
+ changeKafkaAndMetadataVersion(upgradeData, true, extensionContext);
+
+ logPodImages(TestConstants.CO_NAMESPACE);
+
+ checkAllImages(upgradeData, TestConstants.CO_NAMESPACE);
+
+ // Verify that Pods are stable
+ PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
+
+ // Verify upgrade
+ verifyProcedure(upgradeData, testStorage.getProducerName(), testStorage.getConsumerName(), TestConstants.CO_NAMESPACE);
+ }
+
+ @BeforeEach
+ void setupEnvironment() {
+ cluster.createNamespace(TestConstants.CO_NAMESPACE);
+ StUtils.copyImagePullSecrets(TestConstants.CO_NAMESPACE);
+ }
+
+ protected void afterEachMayOverride(ExtensionContext extensionContext) {
+ // delete all topics created in test
+ cmdKubeClient(TestConstants.CO_NAMESPACE).deleteAllByResource(KafkaTopic.RESOURCE_KIND);
+ KafkaTopicUtils.waitForTopicWithPrefixDeletion(TestConstants.CO_NAMESPACE, topicName);
+
+ ResourceManager.getInstance().deleteResources(extensionContext);
+ NamespaceUtils.deleteNamespaceWithWait(TestConstants.CO_NAMESPACE);
+ }
+}
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KafkaUpgradeDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java
similarity index 99%
rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/KafkaUpgradeDowngradeST.java
rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java
index c8c1528aab8..4951111093a 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/KafkaUpgradeDowngradeST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/KafkaUpgradeDowngradeST.java
@@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.systemtest.upgrade;
+package io.strimzi.systemtest.upgrade.regular;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.KafkaBuilder;
@@ -16,6 +16,7 @@
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
+import io.strimzi.systemtest.upgrade.AbstractUpgradeST;
import io.strimzi.systemtest.utils.ClientUtils;
import io.strimzi.systemtest.utils.RollingUpdateUtils;
import io.strimzi.systemtest.utils.TestKafkaVersion;
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/OlmUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java
similarity index 96%
rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/OlmUpgradeST.java
rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java
index 51c657ab23c..ef9cb9af731 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/OlmUpgradeST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/OlmUpgradeST.java
@@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.systemtest.upgrade;
+package io.strimzi.systemtest.upgrade.regular;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import io.strimzi.api.kafka.model.KafkaResources;
@@ -17,6 +17,9 @@
import io.strimzi.systemtest.resources.operator.configuration.OlmConfiguration;
import io.strimzi.systemtest.resources.operator.configuration.OlmConfigurationBuilder;
import io.strimzi.systemtest.storage.TestStorage;
+import io.strimzi.systemtest.upgrade.AbstractUpgradeST;
+import io.strimzi.systemtest.upgrade.OlmVersionModificationData;
+import io.strimzi.systemtest.upgrade.VersionModificationDataLoader;
import io.strimzi.systemtest.utils.ClientUtils;
import io.strimzi.systemtest.utils.FileUtils;
import io.strimzi.systemtest.utils.RollingUpdateUtils;
@@ -139,9 +142,9 @@ void testStrimziUpgrade(ExtensionContext extensionContext) throws IOException {
// ======== Cluster Operator upgrade ends ========
// ======== Kafka upgrade starts ========
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
changeKafkaAndLogFormatVersion(olmUpgradeData, extensionContext);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
// ======== Kafka upgrade ends ========
// Wait for messages of previously created clients
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziDowngradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java
similarity index 93%
rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziDowngradeST.java
rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java
index e3577aa7d21..c7f03cd7c8c 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziDowngradeST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziDowngradeST.java
@@ -2,11 +2,15 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.systemtest.upgrade;
+package io.strimzi.systemtest.upgrade.regular;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.annotations.KRaftNotSupported;
import io.strimzi.systemtest.storage.TestStorage;
+import io.strimzi.systemtest.upgrade.AbstractUpgradeST;
+import io.strimzi.systemtest.upgrade.BundleVersionModificationData;
+import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion;
+import io.strimzi.systemtest.upgrade.VersionModificationDataLoader;
import io.strimzi.systemtest.utils.StUtils;
import io.strimzi.systemtest.utils.kubeUtils.objects.NamespaceUtils;
import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
@@ -71,12 +75,12 @@ private void performDowngrade(BundleVersionModificationData downgradeData, Exten
// We support downgrade only when you didn't upgrade to new inter.broker.protocol.version and log.message.format.version
// https://strimzi.io/docs/operators/latest/full/deploying.html#con-target-downgrade-version-str
setupEnvAndUpgradeClusterOperator(extensionContext, downgradeData, testStorage, testUpgradeKafkaVersion, TestConstants.CO_NAMESPACE);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
// Downgrade CO
changeClusterOperator(downgradeData, TestConstants.CO_NAMESPACE, extensionContext);
// Wait for Kafka cluster rolling update
waitForKafkaClusterRollingUpdate();
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
// Verify that pods are stable
PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
checkAllImages(downgradeData, TestConstants.CO_NAMESPACE);
diff --git a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziUpgradeST.java b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java
similarity index 92%
rename from systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziUpgradeST.java
rename to systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java
index fb1c7aad43f..dcba8f333b9 100644
--- a/systemtest/src/test/java/io/strimzi/systemtest/upgrade/StrimziUpgradeST.java
+++ b/systemtest/src/test/java/io/strimzi/systemtest/upgrade/regular/StrimziUpgradeST.java
@@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.systemtest.upgrade;
+package io.strimzi.systemtest.upgrade.regular;
import io.strimzi.api.kafka.model.KafkaResources;
import io.strimzi.api.kafka.model.KafkaTopic;
@@ -10,6 +10,10 @@
import io.strimzi.systemtest.annotations.KRaftNotSupported;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.storage.TestStorage;
+import io.strimzi.systemtest.upgrade.AbstractUpgradeST;
+import io.strimzi.systemtest.upgrade.BundleVersionModificationData;
+import io.strimzi.systemtest.upgrade.UpgradeKafkaVersion;
+import io.strimzi.systemtest.upgrade.VersionModificationDataLoader;
import io.strimzi.systemtest.utils.RollingUpdateUtils;
import io.strimzi.systemtest.utils.StUtils;
import io.strimzi.systemtest.utils.TestKafkaVersion;
@@ -74,7 +78,7 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO
setupEnvAndUpgradeClusterOperator(extensionContext, acrossUpgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE);
Map<String, String> zooSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, zkSelector);
- Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, zkSelector);
+ Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(TestConstants.CO_NAMESPACE, kafkaSelector);
Map<String, String> eoSnapshot = DeploymentUtils.depSnapshot(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName));
// Make snapshots of all Pods
@@ -83,13 +87,13 @@ void testUpgradeKafkaWithoutVersion(ExtensionContext extensionContext) throws IO
// Upgrade CO
changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, zkSelector, 3, zooSnapshot);
- RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, zkSelector, 3, kafkaSnapshot);
+ RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(TestConstants.CO_NAMESPACE, kafkaSelector, 3, kafkaSnapshot);
DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoSnapshot);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE);
// Verify that Pods are stable
@@ -112,10 +116,10 @@ void testUpgradeAcrossVersionsWithUnsupportedKafkaVersion(ExtensionContext exten
// Upgrade CO
changeClusterOperator(acrossUpgradeData, TestConstants.CO_NAMESPACE, extensionContext);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
// Upgrade kafka
changeKafkaAndLogFormatVersion(acrossUpgradeData, extensionContext);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE);
// Verify that Pods are stable
PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
@@ -136,10 +140,10 @@ void testUpgradeAcrossVersionsWithNoKafkaVersion(ExtensionContext extensionConte
eoPods = DeploymentUtils.waitTillDepHasRolled(TestConstants.CO_NAMESPACE, KafkaResources.entityOperatorDeploymentName(clusterName), 1, eoPods);
LOGGER.info("Rolling to new images has finished!");
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
// Upgrade kafka
changeKafkaAndLogFormatVersion(acrossUpgradeData, extensionContext);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
checkAllImages(acrossUpgradeData, TestConstants.CO_NAMESPACE);
// Verify that Pods are stable
PodUtils.verifyThatRunningPodsAreStable(TestConstants.CO_NAMESPACE, clusterName);
@@ -164,17 +168,17 @@ private void performUpgrade(BundleVersionModificationData upgradeData, Extension
setupEnvAndUpgradeClusterOperator(extensionContext, upgradeData, testStorage, upgradeKafkaVersion, TestConstants.CO_NAMESPACE);
// Upgrade CO to HEAD
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
changeClusterOperator(upgradeData, TestConstants.CO_NAMESPACE, extensionContext);
if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeData.getDefaultKafkaVersionPerStrimzi())) {
waitForKafkaClusterRollingUpdate();
}
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
// Upgrade kafka
changeKafkaAndLogFormatVersion(upgradeData, extensionContext);
- logPodImages(clusterName);
+ logPodImages(TestConstants.CO_NAMESPACE);
checkAllImages(upgradeData, TestConstants.CO_NAMESPACE);
// Verify that Pods are stable
diff --git a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml
index 5948ba11dcf..06b1757a5ba 100644
--- a/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml
+++ b/systemtest/src/test/resources/upgrade/BundleDowngrade.yaml
@@ -70,3 +70,4 @@
flakyEnvVariable: none
reason: Test is working on all environment used by QE.
featureGatesBefore: "-KafkaNodePools"
+ featureGatesAfter: ""