Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backup): delete backup in the backupstore asynchronously #3038

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 145 additions & 23 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -15,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/controller"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,21 +35,30 @@ import (
)

const (
MessageTypeReconcileInfo = "info"
MessageTypeReconcileInfo = "info"
MessageTypeReconcileError = "error"
)

const (
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
WaitForBackupDeletionIsCompleteMessage = "Wait for backup %v to be deleted"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
FailedToDeleteBackupMessage = "Failed to delete the backup %v in the backupstore, err %v"
NoDeletionInProgressRecordMessage = "No deletion in progress record, retry the deletion command"
)

type DeletingStatus struct {
State longhorn.BackupState
ErrorMessage string
}

type BackupController struct {
*baseController

// which namespace controller is running with
// Which namespace controller is running with
namespace string
// use as the OwnerID of the controller
// Use as the OwnerID of the controller
controllerID string

kubeClient clientset.Interface
Expand All @@ -63,6 +72,13 @@ type BackupController struct {
cacheSyncs []cache.InformerSynced

proxyConnCounter util.Counter

// Use to track the result of the deletion command.
// Also used to track if controller crashes after the deletion command is triggered.
deletingMapLock *sync.Mutex
inProgressDeletingMap map[string]*DeletingStatus

deletingBackoff *flowcontrol.Backoff
}

func NewBackupController(
Expand Down Expand Up @@ -96,6 +112,11 @@ func NewBackupController(
eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-backup-controller"}),

proxyConnCounter: proxyConnCounter,

deletingMapLock: &sync.Mutex{},
inProgressDeletingMap: map[string]*DeletingStatus{},

deletingBackoff: flowcontrol.NewBackOff(time.Minute*1, time.Hour*24),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make time.Minute*1, time.Hour*24 as a constant

}

var err error
Expand Down Expand Up @@ -269,6 +290,10 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return errors.Wrap(err, "failed to get backup volume name")
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

// Examine DeletionTimestamp to determine if object is under deletion
if !backup.DeletionTimestamp.IsZero() {
if err := bc.handleAttachmentTicketDeletion(backup, backupVolumeName); err != nil {
Expand All @@ -280,23 +305,18 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return err
}

if backupTarget.Spec.BackupTargetURL != "" &&
backupVolume != nil && backupVolume.DeletionTimestamp == nil {
if backupTarget.Spec.BackupTargetURL != "" && backupVolume != nil && backupVolume.DeletionTimestamp == nil {
backupTargetClient, err := newBackupTargetClientFromDefaultEngineImage(bc.ds, backupTarget)
if err != nil {
log.WithError(err).Warn("Failed to init backup target clients")
return nil // Ignore error to prevent enqueue
}
backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete remote backup")
backupDeleted := bc.handleBackupDeletionInBackupStore(backup, backupVolumeName, backupURL, backupTargetClient)
if !backupDeleted {
return nil
}
Comment on lines +317 to 319
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR introduces deletingBackoff for handling deletion backoff. Since handleErr() can also achieve backoff after returning an error if the backup has not been deleted yet. Can we remove deletingBackoff and leverage handleErr() instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to do that, it is not an error after all.
Besides, inside the handleErr we even print the logs Failed to sync Longhorn backup
I think that might confuse the users.

We actually use this backoff a lot in our controllers

Copy link
Contributor Author

@ChanYiLin ChanYiLin Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another point is that, we can control the backOff period with this backoff
But with using k8s workqueue, we can't set different backOff time for this case from other error cases.


backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
if err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential); err != nil {
return errors.Wrap(err, "failed to delete remote backup")
}
}

// Request backup_volume_controller to reconcile BackupVolume immediately if it's the last backup
Expand Down Expand Up @@ -406,10 +426,6 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return err
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

monitor, err := bc.checkMonitor(backup, volume, backupTarget)
if err != nil {
if backup.Status.State == longhorn.BackupStateError {
Expand Down Expand Up @@ -454,8 +470,8 @@ func (bc *BackupController) reconcile(backupName string) (err error) {

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil {
if !strings.Contains(err.Error(), "in progress") {
if err != nil && !types.ErrorIsNotFound(err) {
if !types.ErrorIsInProgress(err) {
log.WithError(err).Error("Error inspecting backup config")
}
return nil // Ignore error to prevent enqueue
Expand Down Expand Up @@ -765,6 +781,18 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
}
}

clusterBackups, err := bc.ds.ListBackupsWithBackupVolumeName(volume.Name)
if err != nil {
return nil, errors.Wrapf(err, "failed to list backups in the cluster")
}
for _, b := range clusterBackups {
if b.Status.State == longhorn.BackupStateDeleting {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForBackupDeletionIsCompleteMessage, b.Name)
return nil, fmt.Errorf("waiting for the backup %v to be deleted before enabling backup monitor", b.Name)
}
}

// Enable the backup monitor
monitor, err := bc.enableBackupMonitor(backup, volume, backupTargetClient, biChecksum,
volume.Spec.BackupCompressionMethod, int(concurrentLimit), storageClassName, engineClientProxy)
Expand Down Expand Up @@ -914,5 +942,99 @@ func (bc *BackupController) syncBackupStatusWithSnapshotCreationTimeAndVolumeSiz
func (bc *BackupController) backupInFinalState(backup *longhorn.Backup) bool {
return backup.Status.State == longhorn.BackupStateCompleted ||
backup.Status.State == longhorn.BackupStateError ||
backup.Status.State == longhorn.BackupStateUnknown
backup.Status.State == longhorn.BackupStateUnknown ||
backup.Status.State == longhorn.BackupStateDeleting
}

func (bc *BackupController) startDeletingBackupInBackupStore(backupURL string, backupTargetClient *engineapi.BackupTargetClient) {
bc.deletingMapLock.Lock()
bc.inProgressDeletingMap[backupURL] = &DeletingStatus{
State: longhorn.BackupStateDeleting,
ErrorMessage: "",
}
bc.deletingMapLock.Unlock()

// The backup deletion will be executed asynchronously.
// After triggering the backup deletion, update the state to deleting.
// Then keep checking if the backup is deleted in the backupstore before removing the finalizer.
go func() {
err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential)
// If the deletion command fails, update the in-memory map to inform the controller.
if err != nil {
bc.deletingMapLock.Lock()
bc.inProgressDeletingMap[backupURL].State = longhorn.BackupStateError
bc.inProgressDeletingMap[backupURL].ErrorMessage = fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, err.Error())
bc.deletingMapLock.Unlock()
}
}()
}

func (bc *BackupController) handleBackupDeletionInBackupStore(backup *longhorn.Backup, backupVolumeName string, backupURL string, backupTargetClient *engineapi.BackupTargetClient) bool {
log := getLoggerForBackup(bc.logger, backup)
existingBackup := backup.DeepCopy()

if backup.Status.State != longhorn.BackupStateDeleting {
if bc.deletingBackoff.IsInBackOffSinceUpdate(backup.Name, time.Now()) {
return false
}

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete backup in backupstore")
return false
}
bc.startDeletingBackupInBackupStore(backupURL, backupTargetClient)

// After triggering the asynchronous deletion,
// update the status to Deleting and requeue the backup to check the backupstore in the following reconciliations.
// Controller won't execute this code block if the status is deleting.
backup.Status.State = longhorn.BackupStateDeleting
backup.Status.Messages = map[string]string{}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
}
bc.deletingBackoff.Next(backup.Name, time.Now())
bc.enqueueBackup(backup)
return false
}

// For the in progress backup, the inspect command returns nil backupInfo with in progress error.
// We should consider the backup exists even the backupInfo is nil when it is in progress.
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil && !types.ErrorIsNotFound(err) && !types.ErrorIsInProgress(err) {
log.WithError(err).Debugf("failed to check backup %v in the backupstore", backup.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.WithError(err).Debugf("failed to check backup %v in the backupstore", backup.Name)
log.WithError(err).Debugf("Failed to check backup %v in the backupstore", backup.Name)

bc.enqueueBackup(backup)
return false
}
if backupInfo != nil || types.ErrorIsInProgress(err) {
bc.deletingMapLock.Lock()
defer bc.deletingMapLock.Unlock()
// Controller could have crashed if there is no record in the map and the backup still exists in the backupstore.
// We update the status to Error so the controller can retry the deletion command again.
if bc.inProgressDeletingMap[backupURL] == nil {
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = NoDeletionInProgressRecordMessage
} else {
// If the state is Deleting, we early return to check the file in the backupstore in the next reconciliation
// If the state is Error, we update the state and message to inform users.
// With the state being Error, other pending backups can start without considering the deletion lock in the backupstore.
// Controller can also retry the deletion command after the status is Error.
if bc.inProgressDeletingMap[backupURL].State == longhorn.BackupStateDeleting {
return false
}
log.Warnf("Failed to delete backup in the backupstore, %v", bc.inProgressDeletingMap[backupURL].ErrorMessage)
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, bc.inProgressDeletingMap[backupURL].ErrorMessage)
}
if reflect.DeepEqual(existingBackup.Status, backup.Status) {
return false
}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
log.WithError(err).Errorf("Backup %v update status error", backup.Name)

}
return false
}

// Clean up the deleting backoff
bc.deletingBackoff.DeleteEntry(backup.Name)
return true
}
2 changes: 1 addition & 1 deletion controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
backupLabelMap := map[string]string{}

backupURL := backupstore.EncodeBackupURL(backupName, backupVolumeName, backupTargetClient.URL)
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil {
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil && !types.ErrorIsNotFound(err) {
log.WithError(err).WithFields(logrus.Fields{
"backup": backupName,
"backupvolume": backupVolumeName,
Expand Down
4 changes: 1 addition & 3 deletions engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ func parseBackupConfig(output string) (*Backup, error) {
func (btc *BackupTargetClient) BackupGet(backupConfigURL string, credential map[string]string) (*Backup, error) {
output, err := btc.ExecuteEngineBinary("backup", "inspect", backupConfigURL)
if err != nil {
if types.ErrorIsNotFound(err) {
return nil, nil
}
return nil, errors.Wrapf(err, "error getting backup config %s", backupConfigURL)
}
return parseBackupConfig(output)
Expand Down Expand Up @@ -294,6 +291,7 @@ func (btc *BackupTargetClient) BackupConfigMetaGet(url string, credential map[st

// BackupDelete deletes the backup from the remote backup target
func (btc *BackupTargetClient) BackupDelete(backupURL string, credential map[string]string) error {
logrus.Infof("Start deleting backup %s", backupURL)
_, err := btc.ExecuteEngineBinaryWithoutTimeout("backup", "rm", backupURL)
if err != nil {
if types.ErrorIsNotFound(err) {
Expand Down
7 changes: 5 additions & 2 deletions k8s/pkg/apis/longhorn/v1beta2/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
type BackupState string

const (
// non-final state
// Non-final state
BackupStateNew = BackupState("")
BackupStatePending = BackupState("Pending")
BackupStateInProgress = BackupState("InProgress")
// final state
// Final state
BackupStateCompleted = BackupState("Completed")
BackupStateError = BackupState("Error")
BackupStateUnknown = BackupState("Unknown")
// Deleting is also considered as final state
// as it only happens when the backup is being deleting and has deletion timestamp
BackupStateDeleting = BackupState("Deleting")
)

type BackupCompressionMethod string
Expand Down
4 changes: 4 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,10 @@ func ErrorIsNotFound(err error) bool {
return strings.Contains(err.Error(), "cannot find")
}

func ErrorIsInProgress(err error) bool {
return strings.Contains(err.Error(), "in progress")
}

func ErrorIsStopped(err error) bool {
return strings.Contains(err.Error(), "is stopped")
}
Expand Down