Zk kraft migration #100

Merged (8 commits) on Feb 21, 2025
7 changes: 6 additions & 1 deletion api/v1beta1/kafkacluster_types.go
@@ -1102,13 +1102,18 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {

// GetBrokerLabels returns the labels that are applied to broker pods
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
kraftLabels := make(map[string]string, 0)
var kraftLabels map[string]string
if kRaftMode {
kraftLabels = map[string]string{
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()),
}
} else { // in ZK mode -> new labels for backward compatibility for the headless service when going from ZK to KRaft
kraftLabels = map[string]string{
IsControllerNodeKey: fmt.Sprintf("%t", false),
IsBrokerNodeKey: fmt.Sprintf("%t", true),
}
}
return util.MergeLabels(
bConfig.BrokerLabels,
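With the change above, a broker rendered in ZK mode now carries the same node-role label keys as a KRaft broker, which is what keeps the headless service selectors stable when a cluster is later switched from ZK to KRaft. A minimal usage sketch (the cluster name and broker id are made-up inputs; the merged app, broker id and kafka_cr labels still come from util.MergeLabels as before):

bConfig := &v1beta1.BrokerConfig{}
labels := bConfig.GetBrokerLabels("my-kafka", 0, false) // kRaftMode=false, i.e. ZK mode
// In addition to the usual app / broker id / kafka_cr labels, the returned map now also
// contains IsControllerNodeKey="false" and IsBrokerNodeKey="true", so selectors written
// for KRaft mode keep matching ZK-mode pods during the migration.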
10 changes: 6 additions & 4 deletions api/v1beta1/kafkacluster_types_test.go
@@ -445,10 +445,12 @@ func TestGetBrokerLabels(t *testing.T) {
{
testName: "Labels in zookeeper mode",
expectedLabels: map[string]string{
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
"test_label_key": "test_label_value",
AppLabelKey: expectedDefaultLabelApp,
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
KafkaCRLabelKey: expectedKafkaCRName,
IsBrokerNodeKey: "true",
IsControllerNodeKey: "false",
"test_label_key": "test_label_value",
},
brokerConfig: &BrokerConfig{
Roles: nil,
8 changes: 8 additions & 0 deletions controllers/tests/kafkacluster_controller_kafka_test.go
@@ -432,6 +432,14 @@ func expectKafkaBrokerPod(ctx context.Context, kafkaCluster *v1beta1.KafkaCluste
Value: "/kafka-logs,/ephemeral-dir1",
},
))

// when CLUSTER_ID is set as an env var, verify that the cluster ID in the status is taken from it and not randomly generated
for _, env := range kafkaCluster.Spec.Envs {
if env.Name == "CLUSTER_ID" {
Expect(kafkaCluster.Status.ClusterID).To(Equal("test-cluster-id"))
break
}
}
} else {
Expect(container.Env).To(ConsistOf(
// the exact value is not interesting
17 changes: 17 additions & 0 deletions controllers/tests/kafkacluster_controller_test.go
@@ -256,6 +256,23 @@ var _ = Describe("KafkaCluster", func() {
expectCruiseControl(ctx, kafkaClusterKRaft)
})
})
When("configuring Kafka cluster in KRaft mode with CLUSTER_ID env var", func() {
BeforeEach(func() {
loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaCluster.Name)
externalListenerHostName = "test.host.com"

loadBalancerServiceNameKRaft = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaClusterKRaft.Name)
externalListenerHostNameKRaft = "test.host.com"
kafkaClusterKRaft.Spec.Envs = append(kafkaClusterKRaft.Spec.Envs, corev1.EnvVar{
Name: "CLUSTER_ID",
Value: "test-cluster-id",
})
})

It("should reconciles objects properly", func(ctx SpecContext) {
expectKafka(ctx, kafkaClusterKRaft, count)
})
})
When("configuring one ingress envoy controller config inside the external listener without bindings", func() {
BeforeEach(func() {
testExternalListener := kafkaCluster.Spec.ListenersConfig.ExternalListeners[0]
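The two test additions above rely on the same assumption: when CLUSTER_ID is supplied as a broker environment variable, the operator adopts it as the KRaft cluster ID in the KafkaCluster status instead of generating a random one. A sketch of how that would be pinned on a user's CR, mirroring the test setup (the value is illustrative; corev1 is k8s.io/api/core/v1):

cluster.Spec.Envs = append(cluster.Spec.Envs, corev1.EnvVar{
	Name:  "CLUSTER_ID",
	Value: "my-fixed-cluster-id",
})
// After reconciliation, the tests expect cluster.Status.ClusterID to equal "my-fixed-cluster-id"
// rather than a randomly generated ID.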
67 changes: 53 additions & 14 deletions pkg/resources/kafka/configmap.go
@@ -57,9 +57,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
// Cruise Control metrics reporter configuration
r.configCCMetricsReporter(broker, config, clientPass, log)

brokerReadOnlyConfig := getBrokerReadOnlyConfig(broker, r.KafkaCluster, log)

// Kafka Broker configurations
if r.KafkaCluster.Spec.KRaftMode {
configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log)
configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig)
} else {
configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log)
}
@@ -148,23 +150,42 @@ func (r *Reconciler) configCCMetricsReporter(broker v1beta1.Broker, config *prop
}

func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {
if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID))
}
quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger,
brokerReadOnlyConfig *properties.Properties) {
controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)

if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles))
}
// when kRaft is enabled for the cluster, brokers can still be configured to use zookeeper for metadata.
// this is to support the zk to kRaft migration where both zookeeper and kRaft controllers are running in parallel.
if shouldUseKRaftModeForBroker(brokerReadOnlyConfig) {
if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID))
}

if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters))
if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles))
}
} else { // use zk mode for broker.
// when in ZK mode, "broker.id" and "zookeeper.connect" are configured so the broker communicates with ZooKeeper.
// control.plane.listener.name is not set in ZK mode, therefore it defaults to the inter-broker listener.
if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID))
}

if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress(
kafkaCluster.Spec.ZKAddresses, kafkaCluster.Spec.GetZkPath())); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigZooKeeperConnect))
}
}

controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)
if controllerListenerName != "" {
if err := config.Set(kafkautils.KafkaConfigControllerListenerName, controllerListenerName); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerListenerName))
if shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig) {
if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters))
}

if controllerListenerName != "" {
if err := config.Set(kafkautils.KafkaConfigControllerListenerName, controllerListenerName); err != nil {
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerListenerName))
}
}
}

@@ -214,6 +235,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
}
}

// Returns true by default (when no migration config is set) or when MigrationBrokerKRaftMode is set to 'true'.
// This is to support the ZK to KRaft migration.
func shouldUseKRaftModeForBroker(brokerReadOnlyConfig *properties.Properties) bool {
migrationBrokerKRaftMode, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerKRaftMode)
return !found || migrationBrokerKRaftMode.Value() == "true"
}

// Returns true by default (when no migration config is set) or when MigrationBrokerControllerQuorumConfigEnabled is set to 'true'.
// This is to support the ZK to KRaft migration.
func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.Properties) bool {
migrationBrokerControllerQuorumConfigEnabled, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerControllerQuorumConfigEnabled)
return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true"
}

func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
serverPasses map[string]string, extListenerStatuses, intListenerStatuses,
controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {
@@ -547,6 +582,10 @@ func (r Reconciler) generateBrokerConfig(broker v1beta1.Broker, brokerConfig *v1
finalBrokerConfig.Merge(opGenConf)
}

// Remove the migration broker configuration since it is only used as a set of flags to derive other configs
finalBrokerConfig.Delete(kafkautils.MigrationBrokerControllerQuorumConfigEnabled)
finalBrokerConfig.Delete(kafkautils.MigrationBrokerKRaftMode)

finalBrokerConfig.Sort()

return finalBrokerConfig.String()
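Taken together, the two read-only config flags introduced above decide which identity and quorum settings are rendered for a broker while the cluster CR is already in KRaft mode, which is what allows a staged ZK to KRaft migration. A standalone sketch of the gating behavior as implemented in this diff (the real property keys live in kafkautils and are not shown here; this is not the operator's code):

// A missing flag defaults to true, so clusters that never set the migration properties
// behave as plain KRaft clusters.
func flagOrDefaultTrue(readOnlyConfig map[string]string, key string) bool {
	v, found := readOnlyConfig[key]
	return !found || v == "true"
}

// Per the branches in configureBrokerKRaftMode:
//   KRaft-mode flag true or unset -> node.id and process.roles are rendered
//   KRaft-mode flag false         -> broker.id and zookeeper.connect are rendered instead
//   quorum flag true or unset     -> controller.quorum.voters (and the controller listener
//                                    name, when a controller listener exists) are rendered
//   quorum flag false             -> the quorum/controller listener settings are omitted
// Both flags are stripped from the final broker config in generateBrokerConfig, so they never
// end up in the rendered server properties.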