Skip to content

Commit

Permalink
CLOUDP-178754: Deletion Protection for Data Federation (#1044)
Browse files Browse the repository at this point in the history
* Support deletion protection for Atlas Data Federation

* Data Federation deletion protection tests

* Compare K8s and Atlas specs on creation

* Changes as per review

* Update ctx usage to use Background

* Check for appropriate DataFederation error codes

* Properly clean-up tests

* Properly handle delete event
  • Loading branch information
roothorp authored Aug 11, 2023
1 parent 77efe10 commit b2f37bd
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 62 deletions.
18 changes: 10 additions & 8 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,16 @@ func main() {
}

if err = (&atlasdatafederation.AtlasDataFederationReconciler{
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(),
Scheme: mgr.GetScheme(),
AtlasDomain: config.AtlasDomain,
GlobalAPISecret: config.GlobalAPISecret,
ResourceWatcher: watch.NewResourceWatcher(),
GlobalPredicates: globalPredicates,
EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"),
Client: mgr.GetClient(),
Log: logger.Named("controllers").Named("AtlasDataFederation").Sugar(),
Scheme: mgr.GetScheme(),
AtlasDomain: config.AtlasDomain,
GlobalAPISecret: config.GlobalAPISecret,
ResourceWatcher: watch.NewResourceWatcher(),
GlobalPredicates: globalPredicates,
EventRecorder: mgr.GetEventRecorderFor("AtlasDataFederation"),
ObjectDeletionProtection: config.ObjectDeletionProtection,
SubObjectDeletionProtection: config.SubObjectDeletionProtection,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AtlasDataFederation")
os.Exit(1)
Expand Down
5 changes: 5 additions & 0 deletions pkg/api/v1/atlasdatafederation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,8 @@ func (c *AtlasDataFederation) WithPrivateEndpoint(endpointID, provider, endpoint
})
return c
}

func (c *AtlasDataFederation) WithAnnotations(annotations map[string]string) *AtlasDataFederation {
c.Annotations = annotations
return c
}
3 changes: 3 additions & 0 deletions pkg/controller/atlas/api_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ const (

// Resource not found
ResourceNotFound = "RESOURCE_NOT_FOUND"

// Instance for the passed {groupId, tenantName} pair does not exist
DataFederationTenantNotFound = "DATA_FEDERATION_TENANT_NOT_FOUND_FOR_NAME"
)
14 changes: 14 additions & 0 deletions pkg/controller/atlasdatafederation/datafederation.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,17 @@ func getMergedSpec(atlasSpec, operatorSpec mdbv1.DataFederationSpec) (mdbv1.Data

return mergedSpec, nil
}

func dataFederationMatchesSpec(log *zap.SugaredLogger, atlasSpec *mongodbatlas.DataFederationInstance, operatorSpec *mdbv1.AtlasDataFederation) (bool, error) {
newAtlasSpec, err := DataFederationFromAtlas(atlasSpec)
if err != nil {
return false, err
}

equal, diff := dataFederationEqual(*newAtlasSpec, operatorSpec.Spec, log)
if !equal {
log.Debugf("DataFederation differs from spec: %s", diff)
}

return equal, nil
}
123 changes: 76 additions & 47 deletions pkg/controller/atlasdatafederation/datafederation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"fmt"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/connectionsecret"

"sigs.k8s.io/controller-runtime/pkg/handler"

"go.mongodb.org/atlas/mongodbatlas"

ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -20,9 +20,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/status"
Expand All @@ -37,13 +35,15 @@ import (
// AtlasDataFederationReconciler reconciles an DataFederation object
type AtlasDataFederationReconciler struct {
watch.ResourceWatcher
Client client.Client
Log *zap.SugaredLogger
Scheme *runtime.Scheme
AtlasDomain string
GlobalAPISecret client.ObjectKey
GlobalPredicates []predicate.Predicate
EventRecorder record.EventRecorder
Client client.Client
Log *zap.SugaredLogger
Scheme *runtime.Scheme
AtlasDomain string
GlobalAPISecret client.ObjectKey
GlobalPredicates []predicate.Predicate
EventRecorder record.EventRecorder
ObjectDeletionProtection bool
SubObjectDeletionProtection bool
}

// +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasdatafederations,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -61,10 +61,10 @@ func (r *AtlasDataFederationReconciler) Reconcile(contextInt context.Context, re
return result.ReconcileResult(), nil
}

if shouldSkip := customresource.ReconciliationShouldBeSkipped(dataFederation); shouldSkip {
if customresource.ReconciliationShouldBeSkipped(dataFederation) {
log.Infow(fmt.Sprintf("-> Skipping AtlasDataFederation reconciliation as annotation %s=%s", customresource.ReconciliationPolicyAnnotation, customresource.ReconciliationPolicySkip), "spec", dataFederation.Spec)
if !dataFederation.GetDeletionTimestamp().IsZero() {
err := r.removeDeletionFinalizer(contextInt, dataFederation)
err := customresource.ManageFinalizer(contextInt, r.Client, dataFederation, customresource.UnsetFinalizer)
if err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to remove finalizer", "error", err)
Expand Down Expand Up @@ -94,7 +94,7 @@ func (r *AtlasDataFederationReconciler) Reconcile(contextInt context.Context, re
if err != nil {
result := workflow.Terminate(workflow.AtlasCredentialsNotProvided, err.Error())
ctx.SetConditionFromResult(status.DataFederationReadyType, result)
if errRm := r.removeDeletionFinalizer(contextInt, dataFederation); errRm != nil {
if errRm := customresource.ManageFinalizer(contextInt, r.Client, dataFederation, customresource.UnsetFinalizer); errRm != nil {
result = workflow.Terminate(workflow.Internal, errRm.Error())
return result.ReconcileResult(), nil
}
Expand All @@ -110,6 +110,26 @@ func (r *AtlasDataFederationReconciler) Reconcile(contextInt context.Context, re
}
ctx.Client = atlasClient

owner, err := customresource.IsOwner(dataFederation, r.ObjectDeletionProtection, customresource.IsResourceManagedByOperator, managedByAtlas(contextInt, atlasClient, project.ID(), log))
if err != nil {
result = workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err))
ctx.SetConditionFromResult(status.DataFederationReadyType, result)
log.Error(result.GetMessage())

return result.ReconcileResult(), nil
}

if !owner {
result = workflow.Terminate(
workflow.AtlasDeletionProtection,
"unable to reconcile DataFederation: it already exists in Atlas, it was not previously managed by the operator, and the deletion protection is enabled.",
)
ctx.SetConditionFromResult(status.DataFederationReadyType, result)
log.Error(result.GetMessage())

return result.ReconcileResult(), nil
}

if result = r.ensureDataFederation(ctx, project, dataFederation); !result.IsOk() {
ctx.SetConditionFromResult(status.DataFederationReadyType, result)
return result.ReconcileResult(), nil
Expand Down Expand Up @@ -142,8 +162,8 @@ func (r *AtlasDataFederationReconciler) Reconcile(contextInt context.Context, re

if !dataFederation.GetDeletionTimestamp().IsZero() {
if customresource.HaveFinalizer(dataFederation, customresource.FinalizerLabel) {
if customresource.ResourceShouldBeLeftInAtlas(dataFederation) {
log.Infof("Not removing AtlasDataFederation from Atlas as the '%s' annotation is set", customresource.ResourcePolicyAnnotation)
if customresource.IsResourceProtected(dataFederation, r.ObjectDeletionProtection) {
log.Info("Not removing AtlasDataFederation from Atlas as per configuration")
} else {
if err = r.deleteDataFederationFromAtlas(contextInt, &atlasClient, dataFederation, project, log); err != nil {
log.Errorf("failed to remove DataFederation from Atlas: %s", err)
Expand All @@ -152,8 +172,8 @@ func (r *AtlasDataFederationReconciler) Reconcile(contextInt context.Context, re
return result.ReconcileResult(), nil
}
}
if err = r.removeDeletionFinalizer(contextInt, dataFederation); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
if err = customresource.ManageFinalizer(contextInt, r.Client, dataFederation, customresource.UnsetFinalizer); err != nil {
result = workflow.Terminate(workflow.AtlasFinalizerNotRemoved, err.Error())
log.Errorw("failed to remove finalizer", "error", err)
return result.ReconcileResult(), nil
}
Expand All @@ -162,21 +182,17 @@ func (r *AtlasDataFederationReconciler) Reconcile(contextInt context.Context, re
}
}

ctx.SetConditionTrue(status.ReadyType)
return workflow.OK().ReconcileResult(), nil
}

func (r *AtlasDataFederationReconciler) removeDeletionFinalizer(context context.Context, df *mdbv1.AtlasDataFederation) error {
err := r.Client.Get(context, kube.ObjectKeyFromObject(df), df)
err = customresource.ApplyLastConfigApplied(contextInt, project, r.Client)
if err != nil {
return fmt.Errorf("cannot get AtlasDeployment while adding finalizer: %w", err)
}
result = workflow.Terminate(workflow.Internal, err.Error())
ctx.SetConditionFromResult(status.DataFederationReadyType, result)
log.Error(result.GetMessage())

customresource.UnsetFinalizer(df, customresource.FinalizerLabel)
if err = r.Client.Update(context, df); err != nil {
return fmt.Errorf("failed to remove deletion finalizer from %s: %w", df.GetName(), err)
return result.ReconcileResult(), nil
}
return nil

ctx.SetConditionTrue(status.ReadyType)
return workflow.OK().ReconcileResult(), nil
}

func (r *AtlasDataFederationReconciler) deleteDataFederationFromAtlas(ctx context.Context, client *mongodbatlas.Client, df *mdbv1.AtlasDataFederation, project *mdbv1.AtlasProject, log *zap.SugaredLogger) error {
Expand All @@ -191,7 +207,7 @@ func (r *AtlasDataFederationReconciler) deleteDataFederationFromAtlas(ctx contex
}

if err != nil {
log.Errorw("Can not delete Atlas data federation", "error", err)
log.Errorw("Can not delete Atlas DataFederation", "error", err)
return err
}

Expand All @@ -206,22 +222,11 @@ func (r *AtlasDataFederationReconciler) readProjectResource(ctx context.Context,
}

func (r *AtlasDataFederationReconciler) SetupWithManager(mgr ctrl.Manager) error {
c, err := controller.New("AtlasDataFederation", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

// Watch for changes to primary resource DataFederation & handle delete separately
err = c.Watch(&source.Kind{Type: &mdbv1.AtlasDataFederation{}}, &watch.EventHandlerWithDelete{Controller: r}, r.GlobalPredicates...)
if err != nil {
return err
}
err = c.Watch(&source.Kind{Type: &mdbv1.AtlasDataFederation{}}, &handler.EnqueueRequestForObject{}, r.GlobalPredicates...)
if err != nil {
return err
}

return nil
return ctrl.NewControllerManagedBy(mgr).
Named("AtlasDataFederation").
Watches(&source.Kind{Type: &mdbv1.AtlasDataFederation{}}, &watch.EventHandlerWithDelete{Controller: r}, builder.WithPredicates(r.GlobalPredicates...)).
For(&mdbv1.AtlasDataFederation{}, builder.WithPredicates(r.GlobalPredicates...)).
Complete(r)
}

// Delete implements a handler for the Delete event
Expand Down Expand Up @@ -260,3 +265,27 @@ func (r *AtlasDataFederationReconciler) Delete(e event.DeleteEvent) error {

return nil
}

func managedByAtlas(ctx context.Context, atlasClient mongodbatlas.Client, projectID string, log *zap.SugaredLogger) customresource.AtlasChecker {
return func(resource mdbv1.AtlasCustomResource) (bool, error) {
dataFederation, ok := resource.(*mdbv1.AtlasDataFederation)
if !ok {
return false, errors.New("failed to match resource type as AtlasDataFederation")
}

atlasDataFederation, _, err := atlasClient.DataFederation.Get(ctx, projectID, dataFederation.Spec.Name)
if err != nil {
var apiError *mongodbatlas.ErrorResponse
if errors.As(err, &apiError) && (apiError.ErrorCode == atlas.DataFederationTenantNotFound || apiError.ErrorCode == atlas.ResourceNotFound) {
return false, nil
}
return false, err
}

isSame, err := dataFederationMatchesSpec(log, atlasDataFederation, dataFederation)
if err != nil {
return true, nil
}
return !isSame, nil
}
}
Loading

0 comments on commit b2f37bd

Please sign in to comment.