Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the case where the process group gets removed without the addresses being included #2147

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions controllers/remove_process_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (u removeProcessGroups) reconcile(ctx context.Context, r *FoundationDBClust
removedProcessGroups := r.removeProcessGroups(ctx, logger, cluster, zoneRemovals, zonedRemovals[removals.TerminatingZone])
err = includeProcessGroup(ctx, logger, r, cluster, removedProcessGroups, status, adminClient)
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
// If the inclusion is blocked or another issues happened we will retry in 60 seconds.
return &requeue{curError: err, delayedRequeue: true, delay: 60 * time.Second}
}

return nil
Expand Down Expand Up @@ -214,7 +215,7 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return false, false, nil
}

// Pod is in terminating state so we don't want to block but we also don't want to include it
// Pod is in terminating state so we don't want to block, but we also don't want to include it
canBeIncluded = false
}

Expand All @@ -231,7 +232,7 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return false, false, nil
}

// PVC is in terminating state so we don't want to block but we also don't want to include it
// PVC is in terminating state so we don't want to block, but we also don't want to include it
canBeIncluded = false
} else if len(pvcs.Items) > 1 {
return false, false, fmt.Errorf("multiple PVCs found for cluster %s, processGroupID %s", cluster.Name, processGroup.ProcessGroupID)
Expand All @@ -251,20 +252,27 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return false, false, nil
}

// Service is in terminating state so we don't want to block but we also don't want to include it
// Service is in terminating state so we don't want to block, but we also don't want to include it
canBeIncluded = false
}

return true, canBeIncluded, nil
}

func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus, adminClient fdbadminclient.AdminClient) error {
fdbProcessesToInclude, err := getProcessesToInclude(logger, cluster, removedProcessGroups, status)
fdbProcessesToInclude, newProcessGroups, err := getProcessesToInclude(logger, cluster, removedProcessGroups, status)
if err != nil {
return err
}

if len(fdbProcessesToInclude) == 0 {
// In case that the operator was removing a process group without exclusion.
// We can update the process groups at this stage, as no other processes must be included.
if len(cluster.Status.ProcessGroups) != len(newProcessGroups) {
cluster.Status.ProcessGroups = newProcessGroups
return r.updateOrApply(ctx, cluster)
}

return nil
}

Expand Down Expand Up @@ -293,59 +301,63 @@ func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationD
return err
}

// Reset the SecondsSinceLastRecovered sine the operator just included some processes, which will cause a recovery.
// Reset the SecondsSinceLastRecovered since the operator just included some processes, which will cause a recovery.
status.Cluster.RecoveryState.SecondsSinceLastRecovered = 0.0
// Update the process group list and remove all removed and included process groups.
cluster.Status.ProcessGroups = newProcessGroups
johscheuer marked this conversation as resolved.
Show resolved Hide resolved

return r.updateOrApply(ctx, cluster)
}

func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus) ([]fdbv1beta2.ProcessAddress, error) {
func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus) ([]fdbv1beta2.ProcessAddress, []*fdbv1beta2.ProcessGroupStatus, error) {
fdbProcessesToInclude := make([]fdbv1beta2.ProcessAddress, 0)

if len(removedProcessGroups) == 0 {
return fdbProcessesToInclude, nil
return fdbProcessesToInclude, nil, nil
}

excludedServers, err := fdbstatus.GetExclusions(status)
if err != nil {
return fdbProcessesToInclude, fmt.Errorf("unable to get excluded servers from status, %w", err)
return fdbProcessesToInclude, nil, fmt.Errorf("unable to get excluded servers from status, %w", err)
}
excludedServersMap := make(map[string]fdbv1beta2.None, len(excludedServers))
for _, excludedServer := range excludedServers {
excludedServersMap[excludedServer.String()] = fdbv1beta2.None{}
}

processGroups := cluster.Status.DeepCopy().ProcessGroups
idx := 0
for _, processGroup := range cluster.Status.ProcessGroups {
for _, processGroup := range processGroups {
if processGroup.IsMarkedForRemoval() && removedProcessGroups[processGroup.ProcessGroupID] {
foundInExcludedServerList := false
exclusionString := processGroup.GetExclusionString()
if _, ok := excludedServersMap[exclusionString]; ok {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{StringAddress: exclusionString})
foundInExcludedServerList = true
}

for _, pAddr := range processGroup.Addresses {
if _, ok := excludedServersMap[pAddr]; ok {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP(pAddr)})
foundInExcludedServerList = true
}
}
if !foundInExcludedServerList {

if !foundInExcludedServerList && !processGroup.ExclusionSkipped {
// This means that the process is marked for exclusion and is also removed in the previous step but is missing
// its entry in the excluded servers in the status. This should not throw an error as this will block the
// inclusion for other processes, but we should have a record of this event happening in the logs.
logger.Info("processGroup is included but is missing from excluded server list", "processGroup", processGroup)
logger.Info("processGroup should be included but is missing from excluded server list", "processGroup", processGroup)
}

continue
}
cluster.Status.ProcessGroups[idx] = processGroup

processGroups[idx] = processGroup
idx++
}

// Remove the trailing duplicates.
cluster.Status.ProcessGroups = cluster.Status.ProcessGroups[:idx]

return fdbProcessesToInclude, nil
return fdbProcessesToInclude, processGroups[:idx], nil
}

func (r *FoundationDBClusterReconciler) getProcessGroupsToRemove(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, remainingMap map[string]bool, cordSet map[string]fdbv1beta2.None) (bool, bool, []*fdbv1beta2.ProcessGroupStatus) {
Expand Down
52 changes: 29 additions & 23 deletions controllers/remove_process_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,11 @@ var _ = Describe("remove_process_groups", func() {

When("including no process", func() {
It("should not include any process", func() {
processesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(processesToInclude)).To(Equal(0))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(16))
Expect(processesToInclude).To(BeEmpty())
Expect(newProcessGroups).To(BeEmpty())
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -580,11 +581,12 @@ var _ = Describe("remove_process_groups", func() {
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal("1.1.1.1"))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expect(processesToInclude).To(HaveLen(1))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal("1.1.1.1"))
Expect(newProcessGroups).To(HaveLen(15))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})
})
Expand All @@ -596,10 +598,11 @@ var _ = Describe("remove_process_groups", func() {

When("including no process", func() {
It("should not include any process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(0))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(16))
Expect(processesToInclude).To(BeEmpty())
Expect(newProcessGroups).To(BeEmpty())
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -615,11 +618,12 @@ var _ = Describe("remove_process_groups", func() {
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(removedProcessGroup.GetExclusionString()))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expect(processesToInclude).To(HaveLen(1))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal(removedProcessGroup.GetExclusionString()))
Expect(newProcessGroups).To(HaveLen(15))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -636,12 +640,13 @@ var _ = Describe("remove_process_groups", func() {
removedProcessGroups[removedProcessGroup.ProcessGroupID] = true
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
It("should include two process addresses", func() {
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(2))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(fmt.Sprintf("%s %s", removedProcessGroup.GetExclusionString(), removedProcessGroup.Addresses[0])))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expect(processesToInclude).To(HaveLen(2))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal(fmt.Sprintf("%s %s", removedProcessGroup.GetExclusionString(), removedProcessGroup.Addresses[0])))
Expect(newProcessGroups).To(HaveLen(15))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})

Expand All @@ -663,11 +668,12 @@ var _ = Describe("remove_process_groups", func() {
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
processesToInclude, newProcessGroups, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(removedProcessGroup2.GetExclusionString()))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(14))
Expect(processesToInclude).To(HaveLen(1))
Expect(fdbv1beta2.ProcessAddressesString(processesToInclude, " ")).To(Equal(removedProcessGroup2.GetExclusionString()))
Expect(newProcessGroups).To(HaveLen(14))
Expect(cluster.Status.ProcessGroups).To(HaveLen(16))
})
})
})
Expand Down
4 changes: 0 additions & 4 deletions setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ func StartManager(
clusterReconciler.MaintenanceListWaitDuration = operatorOpts.MaintenanceListWaitDuration
clusterReconciler.MinimumRecoveryTimeForInclusion = operatorOpts.MinimumRecoveryTimeForInclusion
clusterReconciler.MinimumRecoveryTimeForExclusion = operatorOpts.MinimumRecoveryTimeForExclusion
clusterReconciler.MaintenanceListStaleDuration = operatorOpts.MaintenanceListStaleDuration
clusterReconciler.MaintenanceListWaitDuration = operatorOpts.MaintenanceListWaitDuration
clusterReconciler.MinimumRecoveryTimeForInclusion = operatorOpts.MinimumRecoveryTimeForInclusion
clusterReconciler.MinimumRecoveryTimeForExclusion = operatorOpts.MinimumRecoveryTimeForExclusion
clusterReconciler.ClusterLabelKeyForNodeTrigger = strings.Trim(operatorOpts.ClusterLabelKeyForNodeTrigger, "\"")
clusterReconciler.Namespace = operatorOpts.WatchNamespace

Expand Down
Loading