Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backup): delete backup in the backupstore asynchronously #3038

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 162 additions & 21 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -15,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/controller"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,21 +35,35 @@ import (
)

const (
MessageTypeReconcileInfo = "info"
MessageTypeReconcileInfo = "info"
MessageTypeReconcileError = "error"
)

const (
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
WaitForSnapshotMessage = "Waiting for the snapshot %v to be ready"
WaitForEngineMessage = "Waiting for the engine %v to be ready"
WaitForBackupDeletionIsCompleteMessage = "Wait for backup %v to be deleted"
FailedToGetSnapshotMessage = "Failed to get the Snapshot %v"
FailedToDeleteBackupMessage = "Failed to delete the backup %v in the backupstore, err %v"
NoDeletionInProgressRecordMessage = "No deletion in progress record, retry the deletion command"
)

const (
DeletionMinInterval = time.Minute * 1
DeletionMaxInterval = time.Hour * 24
)

type DeletingStatus struct {
State longhorn.BackupState
ErrorMessage string
}

type BackupController struct {
*baseController

// which namespace controller is running with
// Which namespace controller is running with
namespace string
// use as the OwnerID of the controller
// Use as the OwnerID of the controller
controllerID string

kubeClient clientset.Interface
Expand All @@ -63,6 +77,13 @@ type BackupController struct {
cacheSyncs []cache.InformerSynced

proxyConnCounter util.Counter

// Use to track the result of the deletion command.
// Also used to track if controller crashes after the deletion command is triggered.
deletingMapLock sync.Mutex
inProgressDeletingMap map[string]*DeletingStatus

deletingBackoff *flowcontrol.Backoff
ChanYiLin marked this conversation as resolved.
Show resolved Hide resolved
}

func NewBackupController(
Expand Down Expand Up @@ -96,6 +117,11 @@ func NewBackupController(
eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-backup-controller"}),

proxyConnCounter: proxyConnCounter,

deletingMapLock: sync.Mutex{},
inProgressDeletingMap: map[string]*DeletingStatus{},

deletingBackoff: flowcontrol.NewBackOff(DeletionMinInterval, DeletionMaxInterval),
}

var err error
Expand Down Expand Up @@ -269,6 +295,10 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return errors.Wrap(err, "failed to get backup volume name")
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

// Examine DeletionTimestamp to determine if object is under deletion
if !backup.DeletionTimestamp.IsZero() {
if err := bc.handleAttachmentTicketDeletion(backup, backupVolumeName); err != nil {
Expand All @@ -291,16 +321,12 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
log.WithError(err).Warn("Failed to init backup target clients")
return nil // Ignore error to prevent enqueue
}
backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete remote backup")
backupDeleted := bc.handleBackupDeletionInBackupStore(backup, backupVolumeName, backupURL, backupTargetClient)
if !backupDeleted {
return nil
}
ChanYiLin marked this conversation as resolved.
Show resolved Hide resolved

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
if err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential); err != nil {
return errors.Wrap(err, "failed to delete remote backup")
}
}

// Request backup_volume_controller to reconcile BackupVolume immediately if it's the last backup
Expand Down Expand Up @@ -400,10 +426,6 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return err
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

monitor, err := bc.checkMonitor(backup, volume, backupTarget)
if err != nil {
if backup.Status.State == longhorn.BackupStateError {
Expand Down Expand Up @@ -448,8 +470,8 @@ func (bc *BackupController) reconcile(backupName string) (err error) {

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil {
if !strings.Contains(err.Error(), "in progress") {
if err != nil && !types.ErrorIsNotFound(err) {
if !types.ErrorIsInProgress(err) {
log.WithError(err).Error("Error inspecting backup config")
}
return nil // Ignore error to prevent enqueue
Expand Down Expand Up @@ -759,6 +781,18 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
}
}

clusterBackups, err := bc.ds.ListBackupsWithBackupVolumeName(volume.Name)
if err != nil {
return nil, errors.Wrapf(err, "failed to list backups in the cluster")
}
for _, b := range clusterBackups {
if b.Status.State == longhorn.BackupStateDeleting {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForBackupDeletionIsCompleteMessage, b.Name)
return nil, fmt.Errorf("waiting for the backup %v to be deleted before enabling backup monitor", b.Name)
}
}

derekbit marked this conversation as resolved.
Show resolved Hide resolved
ChanYiLin marked this conversation as resolved.
Show resolved Hide resolved
// Enable the backup monitor
monitor, err := bc.enableBackupMonitor(backup, volume, backupTargetClient, biChecksum,
volume.Spec.BackupCompressionMethod, int(concurrentLimit), storageClassName, engineClientProxy)
Expand Down Expand Up @@ -908,5 +942,112 @@ func (bc *BackupController) syncBackupStatusWithSnapshotCreationTimeAndVolumeSiz
func (bc *BackupController) backupInFinalState(backup *longhorn.Backup) bool {
return backup.Status.State == longhorn.BackupStateCompleted ||
backup.Status.State == longhorn.BackupStateError ||
backup.Status.State == longhorn.BackupStateUnknown
backup.Status.State == longhorn.BackupStateUnknown ||
backup.Status.State == longhorn.BackupStateDeleting
}

func (bc *BackupController) startDeletingBackupInBackupStore(backupURL string, backupTargetClient *engineapi.BackupTargetClient) {
bc.deletingMapLock.Lock()
bc.inProgressDeletingMap[backupURL] = &DeletingStatus{
State: longhorn.BackupStateDeleting,
ErrorMessage: "",
}
bc.deletingMapLock.Unlock()

// The backup deletion will be executed asynchronously.
// After triggering the backup deletion, update the state to deleting.
// Then keep checking if the backup is deleted in the backupstore before removing the finalizer.
go func() {
defer func() {
if r := recover(); r != nil {
bc.setInprogressDeletionMap(backupURL, longhorn.BackupStateError, fmt.Sprintf("Recovered from panic: %v", r))
}
}()
err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential)
// If the deletion command fails, update the in-memory map to inform the controller.
if err != nil {
bc.setInprogressDeletionMap(backupURL, longhorn.BackupStateError, fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, err.Error()))
}
}()
}
ChanYiLin marked this conversation as resolved.
Show resolved Hide resolved

func (bc *BackupController) handleBackupDeletionInBackupStore(backup *longhorn.Backup, backupVolumeName string, backupURL string, backupTargetClient *engineapi.BackupTargetClient) bool {
log := getLoggerForBackup(bc.logger, backup)
existingBackup := backup.DeepCopy()

if backup.Status.State != longhorn.BackupStateDeleting {
if bc.deletingBackoff.IsInBackOffSinceUpdate(backup.Name, time.Now()) {
return false
}

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete backup in backupstore")
return false
}
bc.startDeletingBackupInBackupStore(backupURL, backupTargetClient)

// After triggering the asynchronous deletion,
// update the status to Deleting and requeue the backup to check the backupstore in the following reconciliations.
// Controller won't execute this code block if the status is deleting.
backup.Status.State = longhorn.BackupStateDeleting
backup.Status.Messages = map[string]string{}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Debugf("Backup %v update status error", backup.Name)
}
bc.deletingBackoff.Next(backup.Name, time.Now())
bc.enqueueBackup(backup)
return false
}

// For the in progress backup, the inspect command returns nil backupInfo with in progress error.
// We should consider the backup exists even the backupInfo is nil when it is in progress.
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil && !types.ErrorIsNotFound(err) && !types.ErrorIsInProgress(err) {
log.WithError(err).Debugf("Failed to check backup %v in the backupstore", backup.Name)
bc.enqueueBackup(backup)
return false
}
if backupInfo != nil || types.ErrorIsInProgress(err) {
bc.deletingMapLock.Lock()
defer bc.deletingMapLock.Unlock()
// Controller could have crashed if there is no record in the map and the backup still exists in the backupstore.
// We update the status to Error so the controller can retry the deletion command again.
if bc.inProgressDeletingMap[backupURL] == nil {
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = NoDeletionInProgressRecordMessage
} else {
// If the state is Deleting, we early return to check the file in the backupstore in the next reconciliation
// If the state is Error, we update the state and message to inform users.
// With the state being Error, other pending backups can start without considering the deletion lock in the backupstore.
// Controller can also retry the deletion command after the status is Error.
if bc.inProgressDeletingMap[backupURL].State == longhorn.BackupStateDeleting {
return false
}
log.Warnf("Failed to delete backup in the backupstore, %v", bc.inProgressDeletingMap[backupURL].ErrorMessage)
backup.Status.State = longhorn.BackupStateError
backup.Status.Messages[MessageTypeReconcileError] = fmt.Sprintf(FailedToDeleteBackupMessage, backupURL, bc.inProgressDeletingMap[backupURL].ErrorMessage)
}
if reflect.DeepEqual(existingBackup.Status, backup.Status) {
return false
}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil {
log.WithError(err).Errorf("Backup %v update status error", backup.Name)
}
return false
}

// Clean up the deleting backoff
bc.deletingBackoff.DeleteEntry(backup.Name)
bc.deletingMapLock.Lock()
delete(bc.inProgressDeletingMap, backupURL)
bc.deletingMapLock.Unlock()
return true
}
ChanYiLin marked this conversation as resolved.
Show resolved Hide resolved

func (bc *BackupController) setInprogressDeletionMap(backupURL string, state longhorn.BackupState, errMsg string) {
bc.deletingMapLock.Lock()
defer bc.deletingMapLock.Unlock()

bc.inProgressDeletingMap[backupURL].State = state
bc.inProgressDeletingMap[backupURL].ErrorMessage = errMsg
}
2 changes: 1 addition & 1 deletion controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
backupLabelMap := map[string]string{}

backupURL := backupstore.EncodeBackupURL(backupName, backupVolumeName, backupTargetClient.URL)
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil {
if backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential); err != nil && !types.ErrorIsNotFound(err) {
shuo-wu marked this conversation as resolved.
Show resolved Hide resolved
log.WithError(err).WithFields(logrus.Fields{
"backup": backupName,
"backupvolume": backupVolumeName,
Expand Down
4 changes: 1 addition & 3 deletions engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ func parseBackupConfig(output string) (*Backup, error) {
func (btc *BackupTargetClient) BackupGet(backupConfigURL string, credential map[string]string) (*Backup, error) {
output, err := btc.ExecuteEngineBinary("backup", "inspect", backupConfigURL)
if err != nil {
if types.ErrorIsNotFound(err) {
return nil, nil
}
return nil, errors.Wrapf(err, "error getting backup config %s", backupConfigURL)
}
return parseBackupConfig(output)
Expand Down Expand Up @@ -302,6 +299,7 @@ func (btc *BackupTargetClient) BackupConfigMetaGet(url string, credential map[st

// BackupDelete deletes the backup from the remote backup target
func (btc *BackupTargetClient) BackupDelete(backupURL string, credential map[string]string) error {
logrus.Infof("Start deleting backup %s", backupURL)
_, err := btc.ExecuteEngineBinaryWithoutTimeout("backup", "rm", backupURL)
if err != nil {
if types.ErrorIsNotFound(err) {
Expand Down
7 changes: 5 additions & 2 deletions k8s/pkg/apis/longhorn/v1beta2/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
type BackupState string

const (
// non-final state
// Non-final state
BackupStateNew = BackupState("")
BackupStatePending = BackupState("Pending")
BackupStateInProgress = BackupState("InProgress")
// final state
// Final state
BackupStateCompleted = BackupState("Completed")
BackupStateError = BackupState("Error")
BackupStateUnknown = BackupState("Unknown")
// Deleting is also considered as final state
// as it only happens when the backup is being deleting and has deletion timestamp
BackupStateDeleting = BackupState("Deleting")
)

type BackupCompressionMethod string
Expand Down
4 changes: 4 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,10 @@ func ErrorIsNotFound(err error) bool {
return strings.Contains(err.Error(), "cannot find")
}

func ErrorIsInProgress(err error) bool {
return strings.Contains(err.Error(), "in progress")
}

func ErrorIsStopped(err error) bool {
return strings.Contains(err.Error(), "is stopped")
}
Expand Down