Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to batch the exclusions of transaction system processes together #2158

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion controllers/exclude_processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,25 @@ func (e excludeProcesses) reconcile(ctx context.Context, r *FoundationDBClusterR
return &requeue{curError: err, delayedRequeue: true}
}

// transactionSystemExclusionAllowed will keep track if the exclusion is allowed and if the operator is allowed to
// exclude processes from the transaction system. If multiple processes from different processes classes that are part
// of the transaction system should be excluded, the operator will expect that the exclusion is allowed for all
// transaction system processes. The idea here is to reduce the number of recoveries during transaction system
// migrations as the stateless pods are often created much faster than the log pod as the stateless pods don't have
// to wait for the storage provisioning.
transactionSystemExclusionAllowed := true
desiredProcessesMap := desiredProcesses.Map()
for processClass := range fdbProcessesToExcludeByClass {
contextLogger := logger.WithValues("processClass", processClass)
ongoingExclusions := ongoingExclusionsByClass[processClass]
processesToExclude := fdbProcessesToExcludeByClass[processClass]

allowedExclusions, missingProcesses := getAllowedExclusionsAndMissingProcesses(contextLogger, cluster, processClass, desiredProcessesMap[processClass], ongoingExclusions, r.InSimulation)
// TODO (johscheuer): Should we also batch exclusions for storage servers? Those should be rare compared to replacements in the transaction system.
if allowedExclusions <= 0 {
if processClass.IsTransaction() {
transactionSystemExclusionAllowed = false
}
contextLogger.Info("Waiting for missing processes before continuing with the exclusion", "missingProcesses", missingProcesses, "addressesToExclude", processesToExclude, "allowedExclusions", allowedExclusions, "ongoingExclusions", ongoingExclusions)
continue
}
Expand All @@ -134,7 +145,7 @@ func (e excludeProcesses) reconcile(ctx context.Context, r *FoundationDBClusterR
allowedExclusions = len(processesToExclude)
}

// TODO: As a next step we could exclude transaction (log + stateless) processes together and exclude
// TODO (johscheuer): As a next step we could exclude transaction (log + stateless) processes together and exclude
// storage processes with a separate call. This would make sure that no storage checks will block
// the exclusion of transaction processes.

Expand All @@ -149,6 +160,15 @@ func (e excludeProcesses) reconcile(ctx context.Context, r *FoundationDBClusterR
}
}

// In case that there are processes from different transaction process classes, we expect that the operator is allowed
// to exclude processes from all the different process classes. If not the operator will delay the exclusion.
if !transactionSystemExclusionAllowed {
return &requeue{
message: "more exclusions needed but not allowed, have to wait until new processes for the transaction system are up to reduce number of recoveries.",
delayedRequeue: true,
}
}

var coordinatorExcluded bool
for _, excludeProcess := range fdbProcessesToExclude {
excludeString := excludeProcess.String()
Expand Down
91 changes: 91 additions & 0 deletions controllers/exclude_processes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,97 @@ var _ = Describe("exclude_processes", func() {
})
})
})

When("transaction system processes should be excluded", func() {
When("a stateless and a log process should be excluded", func() {
BeforeEach(func() {
var pickedStateless, pickedLog bool

_, processGroupIDs, err := cluster.GetCurrentProcessGroupsAndProcessCounts()
Expect(err).NotTo(HaveOccurred())
cluster.Status.ProcessGroups = append(cluster.Status.ProcessGroups, fdbv1beta2.NewProcessGroupStatus(cluster.GetNextRandomProcessGroupID(fdbv1beta2.ProcessClassLog, processGroupIDs[fdbv1beta2.ProcessClassLog]), fdbv1beta2.ProcessClassLog, nil))
cluster.Status.ProcessGroups[len(cluster.Status.ProcessGroups)-1].ProcessGroupConditions = nil
cluster.Status.ProcessGroups = append(cluster.Status.ProcessGroups, fdbv1beta2.NewProcessGroupStatus(cluster.GetNextRandomProcessGroupID(fdbv1beta2.ProcessClassStateless, processGroupIDs[fdbv1beta2.ProcessClassStateless]), fdbv1beta2.ProcessClassStateless, nil))
cluster.Status.ProcessGroups[len(cluster.Status.ProcessGroups)-1].ProcessGroupConditions = nil

markedForRemoval := make([]fdbv1beta2.ProcessGroupID, 0, 2)
for _, processGroup := range cluster.Status.ProcessGroups {
if !processGroup.ProcessClass.IsTransaction() || processGroup.ProcessClass == fdbv1beta2.ProcessClassClusterController {
continue
}

if pickedLog && pickedStateless {
break
}

if !processGroup.ProcessClass.IsLogProcess() {
if pickedStateless {
continue
}
pickedStateless = true
}

if processGroup.ProcessClass.IsLogProcess() {
if pickedLog {
continue
}
pickedLog = true
}

processGroup.MarkForRemoval()
markedForRemoval = append(markedForRemoval, processGroup.ProcessGroupID)
}

Expect(markedForRemoval).To(HaveLen(2))
})

When("no processes are missing", func() {
It("should exclude the process", func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

Expect(req).To(BeNil())
Expect(adminClient.ExcludedAddresses).To(HaveLen(2))
})
})

When("a transaction process is missing", func() {
var missingProcssGroup *fdbv1beta2.ProcessGroupStatus

BeforeEach(func() {
_, processGroupIDs, err := cluster.GetCurrentProcessGroupsAndProcessCounts()
Expect(err).NotTo(HaveOccurred())

missingProcssGroup = fdbv1beta2.NewProcessGroupStatus(cluster.GetNextRandomProcessGroupID(fdbv1beta2.ProcessClassLog, processGroupIDs[fdbv1beta2.ProcessClassLog]), fdbv1beta2.ProcessClassLog, nil)
cluster.Status.ProcessGroups = append(cluster.Status.ProcessGroups, missingProcssGroup)
// We have to set InSimulation here to false, otherwise the MissingProcess timestamp will be ignored.
clusterReconciler.InSimulation = false
})

It("should not exclude the process", func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

Expect(req).NotTo(BeNil())
Expect(adminClient.ExcludedAddresses).To(BeEmpty())
})

When("the transaction process is missing for more than 10 minutes", func() {
BeforeEach(func() {
missingProcssGroup.ProcessGroupConditions[0].Timestamp = time.Now().Add(-10 * time.Minute).Unix()
})

It("should exclude the process", func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

Expect(req).To(BeNil())
Expect(adminClient.ExcludedAddresses).To(HaveLen(2))
})
})
})
})
})
})
})

Expand Down
Loading