Skip to content

Commit

Permalink
feat(backup): delete backup in the backupstore asynchronously
Browse files Browse the repository at this point in the history
ref: longhorn/longhorn 8746

Signed-off-by: Jack Lin <jack.lin@suse.com>
  • Loading branch information
ChanYiLin committed Aug 1, 2024
1 parent 76191be commit c809fd8
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 32 deletions.
175 changes: 149 additions & 26 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/controller"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,21 +36,30 @@ import (
)

const (
MessageTypeReconcileInfo = "info"
MessageTypeReconcileInfo = "info"
MessageTypeReconcileError = "error"
)

const (
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
WaitForBackupDeletionIsCompleteMessage = "Wait for backup %v to be deleted"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
FailedToDeleteBackupMessage = "Failed to delete the backup %v in the backupstore, err %v"
NoDeletionInProgressRecordMessage = "No deletion in progress record, retry the deletion command"
)

type DeletingStatus struct {
State longhorn.BackupState
ErrorMessage string
}

type BackupController struct {
*baseController

// which namespace controller is running with
// Which namespace controller is running with
namespace string
// use as the OwnerID of the controller
// Use as the OwnerID of the controller
controllerID string

kubeClient clientset.Interface
Expand All @@ -63,6 +73,13 @@ type BackupController struct {
cacheSyncs []cache.InformerSynced

proxyConnCounter util.Counter

// Use to track the result of the deletion command.
// Also used to track if controller crashes after the deletion command is triggered.
deletingMapLock *sync.Mutex
inProgressDeletingMap map[string]*DeletingStatus

deletingBackoff *flowcontrol.Backoff
}

func NewBackupController(
Expand Down Expand Up @@ -96,6 +113,11 @@ func NewBackupController(
eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-backup-controller"}),

proxyConnCounter: proxyConnCounter,

deletingMapLock: &sync.Mutex{},
inProgressDeletingMap: map[string]*DeletingStatus{},

deletingBackoff: flowcontrol.NewBackOff(time.Second*10, time.Minute),
}

var err error
Expand Down Expand Up @@ -269,6 +291,10 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return errors.Wrap(err, "failed to get backup volume name")
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

// Examine DeletionTimestamp to determine if object is under deletion
if !backup.DeletionTimestamp.IsZero() {
if err := bc.handleAttachmentTicketDeletion(backup, backupVolumeName); err != nil {
Expand All @@ -280,23 +306,18 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return err
}

if backupTarget.Spec.BackupTargetURL != "" &&
backupVolume != nil && backupVolume.DeletionTimestamp == nil {
backupTargetClient, err := newBackupTargetClientFromDefaultEngineImage(bc.ds, backupTarget)
if err != nil {
log.WithError(err).Warn("Failed to init backup target clients")
return nil // Ignore error to prevent enqueue
}
backupTargetClient, err := newBackupTargetClientFromDefaultEngineImage(bc.ds, backupTarget)
if err != nil {
log.WithError(err).Warn("Failed to init backup target clients")
return nil // Ignore error to prevent enqueue
}
backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete remote backup")
if backupTarget.Spec.BackupTargetURL != "" && backupVolume != nil && backupVolume.DeletionTimestamp == nil {
backupDeleted := bc.handleBackupDeletionInBackupStore(backup, backupVolumeName, backupURL, backupTargetClient)
if !backupDeleted {
return nil
}

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
if err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential); err != nil {
return errors.Wrap(err, "failed to delete remote backup")
}
}

// Request backup_volume_controller to reconcile BackupVolume immediately if it's the last backup
Expand Down Expand Up @@ -406,10 +427,6 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return err
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

monitor, err := bc.checkMonitor(backup, volume, backupTarget)
if err != nil {
if backup.Status.State == longhorn.BackupStateError {
Expand Down Expand Up @@ -454,7 +471,7 @@ func (bc *BackupController) reconcile(backupName string) (err error) {

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil {
if err != nil && !types.ErrorIsNotFound(err) {
if !strings.Contains(err.Error(), "in progress") {
log.WithError(err).Error("Error inspecting backup config")
}
Expand Down Expand Up @@ -765,6 +782,18 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
}
}

clusterBackups, err := bc.ds.ListBackupsWithBackupVolumeName(volume.Name)
if err != nil {
return nil, errors.Wrapf(err, "failed to list backups in the cluster")
}
for _, b := range clusterBackups {
if b.Status.State == longhorn.BackupStateDeleting {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForBackupDeletionIsCompleteMessage, b.Name)
return nil, fmt.Errorf("waiting for the backup %v to be deleted before enabling backup monitor", b.Name)
}
}

// Enable the backup monitor
monitor, err := bc.enableBackupMonitor(backup, volume, backupTargetClient, biChecksum,
volume.Spec.BackupCompressionMethod, int(concurrentLimit), storageClassName, engineClientProxy)
Expand Down Expand Up @@ -914,5 +943,99 @@ func (bc *BackupController) syncBackupStatusWithSnapshotCreationTimeAndVolumeSiz
func (bc *BackupController) backupInFinalState(backup *longhorn.Backup) bool {
return backup.Status.State == longhorn.BackupStateCompleted ||
backup.Status.State == longhorn.BackupStateError ||
backup.Status.State == longhorn.BackupStateUnknown
backup.Status.State == longhorn.BackupStateUnknown ||
backup.Status.State == longhorn.BackupStateDeleting
}

func (bc *BackupController) startDeletingBackupInBackupStore(backupURL string, backupTargetClient *engineapi.BackupTargetClient) {
bc.deletingMapLock.Lock()
bc.inProgressDeletingMap[backupURL] = &DeletingStatus{
State: longhorn.BackupStateDeleting,
ErrorMessage: "",
}
bc.deletingMapLock.Unlock()

// The backup deletion will be executed asynchronously.
// After triggering the backup deletion, update the state to deleting.
// Then keep checking if the backup is deleted in the backupstore before removing the finalizer.
go func() {
err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential)
// If the deletion command fails, update the in-memory map to inform the controller.
if err != nil {
bc.deletingMapLock.Lock()
bc.inProgressDeletingMap[backupURL].State = longhorn.BackupStateError
bc.inProgressDeletingMap[backupURL].ErrorMessage = fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, err.Error())
bc.deletingMapLock.Unlock()
}
return
}()
}

func (bc *BackupController) handleBackupDeletionInBackupStore(backup *longhorn.Backup, backupVolumeName string, backupURL string, backupTargetClient *engineapi.BackupTargetClient) bool {
log := getLoggerForBackup(bc.logger, backup)
existingBackup := backup.DeepCopy()

if backup.Status.State != longhorn.BackupStateDeleting {
if bc.deletingBackoff.IsInBackOffSinceUpdate(backup.Name, time.Now()) {
return false
}

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete backup in backupstore")
return false
}

bc.startDeletingBackupInBackupStore(backupURL, backupTargetClient)

// After triggering the asynchronous deletion,
// update the status to Deleting and requeue the backup to check the backupstore in the following reconciliations.
// Controller won't execute this code block if the status is deleting.
backup.Status.State = longhorn.BackupStateDeleting
backup.Status.Messages = map[string]string{}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
}
bc.deletingBackoff.Next(backup.Name, time.Now())
bc.enqueueBackup(backup)
return false
}

backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil && !types.ErrorIsNotFound(err) {
log.WithError(err).Debugf("failed to check backup %v in the backupstore", backup.Name)
bc.enqueueBackup(backup)
return false
}
if backupInfo != nil {
bc.deletingMapLock.Lock()
defer bc.deletingMapLock.Unlock()
// Controller could have crashed if there is no record in the map and the backup still exists in the backupstore.
// We update the status to Error so the controller can retry the deletion command again.
if bc.inProgressDeletingMap[backupURL] == nil {
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = NoDeletionInProgressRecordMessage
} else {
// If the state is Deleting, we early return to check the file in the backupstore in the next reconcilation

Check failure on line 1018 in controller/backup_controller.go

View workflow job for this annotation

GitHub Actions / codespell

reconcilation ==> reconciliation
// If the state is Error, we update the state and message to inform users.
// With the state being Error, other pending backups can start without considering the deletion lock in the backupstore.
// Controller can also retry the deletion command after the status is Error.
if bc.inProgressDeletingMap[backupURL].State == longhorn.BackupStateDeleting {
return false
}
log.Warnf("Failed to delete backup in the backupstore, %v", bc.inProgressDeletingMap[backupURL].ErrorMessage)
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, bc.inProgressDeletingMap[backupURL].ErrorMessage)
}
if reflect.DeepEqual(existingBackup.Status, backup.Status) {
return false
}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
}
return false
}

// Clean up the deleting backoff
bc.deletingBackoff.DeleteEntry(backup.Name)
return true
}
2 changes: 1 addition & 1 deletion controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
backupLabelMap := map[string]string{}

backupURL := backupstore.EncodeBackupURL(backupName, backupVolumeName, backupTargetClient.URL)
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil {
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil && !types.ErrorIsNotFound(err) {
log.WithError(err).WithFields(logrus.Fields{
"backup": backupName,
"backupvolume": backupVolumeName,
Expand Down
4 changes: 1 addition & 3 deletions engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ func parseBackupConfig(output string) (*Backup, error) {
func (btc *BackupTargetClient) BackupGet(backupConfigURL string, credential map[string]string) (*Backup, error) {
output, err := btc.ExecuteEngineBinary("backup", "inspect", backupConfigURL)
if err != nil {
if types.ErrorIsNotFound(err) {
return nil, nil
}
return nil, errors.Wrapf(err, "error getting backup config %s", backupConfigURL)
}
return parseBackupConfig(output)
Expand Down Expand Up @@ -294,6 +291,7 @@ func (btc *BackupTargetClient) BackupConfigMetaGet(url string, credential map[st

// BackupDelete deletes the backup from the remote backup target
func (btc *BackupTargetClient) BackupDelete(backupURL string, credential map[string]string) error {
logrus.Infof("Start deleting backup %s", backupURL)
_, err := btc.ExecuteEngineBinaryWithoutTimeout("backup", "rm", backupURL)
if err != nil {
if types.ErrorIsNotFound(err) {
Expand Down
7 changes: 5 additions & 2 deletions k8s/pkg/apis/longhorn/v1beta2/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
type BackupState string

const (
// non-final state
// Non-final state
BackupStateNew = BackupState("")
BackupStatePending = BackupState("Pending")
BackupStateInProgress = BackupState("InProgress")
// final state
// Final state
BackupStateCompleted = BackupState("Completed")
BackupStateError = BackupState("Error")
BackupStateUnknown = BackupState("Unknown")
// Deleting is also considered as final state
// as it only happens when the backup is being deleting and has deletion timestamp
BackupStateDeleting = BackupState("Deleting")
)

type BackupCompressionMethod string
Expand Down

0 comments on commit c809fd8

Please sign in to comment.