Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

todo #1711

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

todo #1711

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/controller/ramenconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
VolumeUnprotectionEnabledForAsyncVolSync = false
)

// FIXME
// FIXME #1710
const NoS3StoreAvailable = "NoS3"

var ControllerType ramendrv1alpha1.ControllerType
Expand Down
18 changes: 15 additions & 3 deletions internal/controller/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,10 +639,22 @@
return numRestoredForVS + numRestoredForVR, fmt.Errorf("failed to restore PV/PVC for VolRep (%w)", err)
}

// Only after both succeed, we mark ClusterDataReady as true
msg := "Restored PVs and PVCs"
if numRestoredForVS+numRestoredForVR == 0 {
objectsRestored, err := v.kubeObjectsRecover(result)
if err != nil {
v.log.Info("Kube objects restore failed")

return numRestoredForVS + numRestoredForVR, fmt.Errorf("failed to restore kube objects (%w)", err)
}

var msg string
// Only after volsync, volrep and kubeObjects succeed, we mark ClusterDataReady as true
if numRestoredForVS+numRestoredForVR == 0 && !objectsRestored {
msg = "Nothing to restore"
} else {
msg = fmt.Sprintf("Restored %d volsync PVs/PVCs and %d volrep PVs/PVCs", numRestoredForVS, numRestoredForVR)
if objectsRestored {
msg = msg + " and kube objects"

Check failure on line 656 in internal/controller/volumereplicationgroup_controller.go

View workflow job for this annotation

GitHub Actions / Golangci Lint (.)

assignOp: replace `msg = msg + " and kube objects"` with `msg += " and kube objects"` (gocritic)
}
}

setVRGClusterDataReadyCondition(&v.instance.Status.Conditions, v.instance.Generation, msg)
Expand Down
174 changes: 125 additions & 49 deletions internal/controller/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,65 +495,96 @@ func (v *VRGInstance) kubeObjectsCaptureStatus(status metav1.ConditionStatus, re
}
}

func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result,
s3StoreProfile ramen.S3StoreProfile, objectStorer ObjectStorer,
) error {
func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result) (bool, error) {
if v.kubeObjectProtectionDisabled("recovery") {
return nil
}
v.log.Info("KubeObjects recovery disabled")

localS3StoreAccessor, err := v.findS3StoreAccessor(s3StoreProfile)
if err != nil {
return err
return false, nil
}

vrg := v.instance
sourceVrgNamespaceName, sourceVrgName := vrg.Namespace, vrg.Name
sourcePathNamePrefix := s3PathNamePrefix(sourceVrgNamespaceName, sourceVrgName)
v.log.Info("Restoring KubeObjects")

sourceVrg := &ramen.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStorer, sourcePathNamePrefix, sourceVrg); err != nil {
v.log.Error(err, "Kube objects capture-to-recover-from identifier get error")
if len(v.instance.Spec.S3Profiles) == 0 {
v.log.Info("No S3 profiles configured")

return nil
}

captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
if captureToRecoverFromIdentifier == nil {
v.log.Info("Kube objects capture-to-recover-from identifier nil")
result.Requeue = true

return nil
return false, fmt.Errorf("no S3Profiles configured")
}

vrg.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
veleroNamespaceName := v.veleroNamespaceName()
labels := util.OwnerLabels(vrg)
log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number, "profile", localS3StoreAccessor.S3ProfileName)
v.log.Info(fmt.Sprintf("Restoring KubeObjects to this managed cluster. ProfileList: %v", v.instance.Spec.S3Profiles))

captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
restored, err := v.kubeObjectsRecoverFromS3(result)
if err != nil {
log.Error(err, "Kube objects capture requests query error")
errMsg := fmt.Sprintf("failed to KubeObjects using profile list (%v)", v.instance.Spec.S3Profiles)
v.log.Info(errMsg)

return err
return restored, fmt.Errorf("%s: %w", errMsg, err)
}

recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
if err != nil {
log.Error(err, "Kube objects recover requests query error")
return restored, nil
}

return err
func (v *VRGInstance) kubeObjectsRecoverFromS3(result *ctrl.Result) (bool, error) {
objectsRestored := true

err := errors.New("s3Profiles empty")
NoS3 := false

for _, s3ProfileName := range v.instance.Spec.S3Profiles {
if s3ProfileName == NoS3StoreAvailable {
v.log.Info("NoS3 available to fetch")

NoS3 = true

continue
}

objectStorer, _, err := v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
v.log.Error(err, "Kube objects recovery object store inaccessible", "profile", s3ProfileName)

continue
}

sourceVrg := &ramen.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStorer, s3PathNamePrefix(v.instance.Namespace, v.instance.Name),
sourceVrg); err != nil {
v.log.Error(err, "Kube objects capture-to-recover-from identifier get error")

// TODO: check if not finding a vrg is an error
return !objectsRestored, nil
}

captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
if captureToRecoverFromIdentifier == nil {
v.log.Info("Kube objects capture-to-recover-from identifier nil")

// TODO: check if vrg doesn't have a capture-to-recover-from is an error
return !objectsRestored, nil
}

v.instance.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number,
"profile", s3ProfileName)

err = v.kubeObjectsRecoveryStartOrResume(result, s3ProfileName, captureToRecoverFromIdentifier, log)
if err != nil {
return !objectsRestored, err
}

return objectsRestored, nil
}

return v.kubeObjectsRecoveryStartOrResume(
result,
s3StoreAccessor{objectStorer, localS3StoreAccessor.S3StoreProfile},
sourceVrgNamespaceName, sourceVrgName, captureToRecoverFromIdentifier,
kubeobjects.RequestsMapKeyedByName(captureRequestsStruct),
kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct),
veleroNamespaceName, labels, log,
)
if NoS3 {
// TODO: check if objectsRestored should be false. Affects tests only.
return objectsRestored, nil
}

result.Requeue = true

return !objectsRestored, err
}

func (v *VRGInstance) findS3StoreAccessor(s3StoreProfile ramen.S3StoreProfile) (s3StoreAccessor, error) {
Expand Down Expand Up @@ -627,16 +658,60 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
}
}

func (v *VRGInstance) getCaptureRequests() (map[string]kubeobjects.Request, error) {
captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
if err != nil {
return nil, fmt.Errorf("kube objects capture requests query error: %v", err)
}

return kubeobjects.RequestsMapKeyedByName(captureRequestsStruct), nil
}

func (v *VRGInstance) getRecoverRequests() (map[string]kubeobjects.Request, error) {
recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
if err != nil {
return nil, fmt.Errorf("kube objects recover requests query error: %v", err)
}

return kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct), nil
}

func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName string,
result *ctrl.Result, s3ProfileName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
captureRequests, recoverRequests map[string]kubeobjects.Request,
veleroNamespaceName string, labels map[string]string, log logr.Logger,
log logr.Logger,
) error {
veleroNamespaceName := v.veleroNamespaceName()
labels := util.OwnerLabels(v.instance)

captureRequests, err := v.getCaptureRequests()
if err != nil {
return err
}

recoverRequests, err := v.getRecoverRequests()
if err != nil {
return err
}

groups := v.recipeElements.RecoverWorkflow
requests := make([]kubeobjects.Request, len(groups))

objectStorer, s3StoreProfile, err := v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
return fmt.Errorf("kube objects recovery object store inaccessible for profile %v: %v", s3ProfileName, err)
}

localS3StoreAccessor, err := v.findS3StoreAccessor(s3StoreProfile)
if err != nil {
return fmt.Errorf("kube objects recovery couldn't build s3StoreAccessor: %v", err)
}

s3StoreAccessor := s3StoreAccessor{objectStorer, localS3StoreAccessor.S3StoreProfile}

for groupNumber, recoverGroup := range groups {
rg := recoverGroup
log1 := log.WithValues("group", groupNumber, "name", rg.BackupName)
Expand All @@ -646,8 +721,8 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
return fmt.Errorf("check hook execution failed during restore %s: %v", rg.Hook.Name, err)
}
} else {
if err := v.executeRecoverGroup(result, s3StoreAccessor, sourceVrgNamespaceName,
sourceVrgName, captureToRecoverFromIdentifier, captureRequests,
if err := v.executeRecoverGroup(result, s3StoreAccessor,
captureToRecoverFromIdentifier, captureRequests,
recoverRequests, veleroNamespaceName, labels, groupNumber, rg,
requests, log1); err != nil {
return err
Expand All @@ -663,12 +738,13 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
}

func (v *VRGInstance) executeRecoverGroup(result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
captureRequests, recoverRequests map[string]kubeobjects.Request,
veleroNamespaceName string, labels map[string]string, groupNumber int,
rg kubeobjects.RecoverSpec, requests []kubeobjects.Request, log1 logr.Logger,
) error {
sourceVrgName := v.instance.Name
sourceVrgNamespaceName := v.instance.Namespace
request, ok, submit, cleanup := v.getRecoverOrProtectRequest(
captureRequests, recoverRequests, s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName,
Expand Down
6 changes: 2 additions & 4 deletions internal/controller/vrg_volrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,9 +2010,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

var objectStore ObjectStorer

var s3StoreProfile ramendrv1alpha1.S3StoreProfile

objectStore, s3StoreProfile, err = v.reconciler.ObjStoreGetter.ObjectStore(
objectStore, _, err = v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
v.log.Error(err, "Kube objects recovery object store inaccessible", "profile", s3ProfileName)
Expand Down Expand Up @@ -2046,7 +2044,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

v.log.Info(fmt.Sprintf("Restored %d PVs and %d PVCs using profile %s", pvCount, pvcCount, s3ProfileName))

return pvCount + pvcCount, v.kubeObjectsRecover(result, s3StoreProfile, objectStore)
return pvCount + pvcCount, nil
}

if NoS3 {
Expand Down
Loading