Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLOUPD-193335: Deletion Protection Serverless Private Endpoints #1093

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 53 additions & 42 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,53 +153,67 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
// Allow users to specify M0/M2/M5 deployments without providing TENANT for Normal and Serverless deployments
r.verifyNonTenantCase(deployment)

if result := r.checkDeploymentIsManaged(workflowCtx, context, log, project, deployment); !result.IsOk() {
return result.ReconcileResult(), nil
// convertedDeployment is either serverless or advanced, deployment must be kept unchanged
// convertedDeployment is always a separate copy, to avoid changes on it to go back to k8s
convertedDeployment := deployment.DeepCopy()
if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&convertedDeployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to convert legacy deployment", "error", err)
return result.ReconcileResult(), nil
}
convertedDeployment.Spec.DeploymentSpec = nil
}

deletionRequest, result := r.handleDeletion(workflowCtx, context, log, prevResult, project, deployment)
if deletionRequest {
if result := r.checkDeploymentIsManaged(workflowCtx, context, log, project, convertedDeployment); !result.IsOk() {
return result.ReconcileResult(), nil
}

err = customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
log.Error(result.GetMessage())

deletionRequest, result := r.handleDeletion(workflowCtx, context, log, prevResult, project, deployment)
if deletionRequest {
return result.ReconcileResult(), nil
}

if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&deployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to convert legacy deployment", "error", err)
return result.ReconcileResult(), nil
}
deployment.Spec.DeploymentSpec = nil
}

if err := uniqueKey(&deployment.Spec); err != nil {
if err := uniqueKey(&convertedDeployment.Spec); err != nil {
log.Errorw("failed to validate tags", "error", err)
result := workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
}

handleDeployment := r.selectDeploymentHandler(deployment)
if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() {
handleDeployment := r.selectDeploymentHandler(convertedDeployment)
if result, _ := handleDeployment(context, workflowCtx, project, convertedDeployment, req); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
return r.registerConfigAndReturn(workflowCtx, context, log, deployment, result), nil
helderjs marked this conversation as resolved.
Show resolved Hide resolved
}

if !deployment.IsServerless() {
if result := r.handleAdvancedOptions(workflowCtx, project, deployment); !result.IsOk() {
if !convertedDeployment.IsServerless() {
if result := r.handleAdvancedOptions(workflowCtx, project, convertedDeployment); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
return r.registerConfigAndReturn(workflowCtx, context, log, deployment, result), nil
}
}

return r.registerConfigAndReturn(workflowCtx, context, log, deployment, workflow.OK()), nil
}

func (r *AtlasDeploymentReconciler) registerConfigAndReturn(
workflowCtx *workflow.Context,
context context.Context,
log *zap.SugaredLogger,
deployment *mdbv1.AtlasDeployment, // this must be the original non converted deployment
result workflow.Result) ctrl.Result {
if result.IsOk() || result.IsInProgress() {
josvazg marked this conversation as resolved.
Show resolved Hide resolved
err := customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
alternateResult := workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, alternateResult)
log.Error(result.GetMessage())

return result.ReconcileResult()
}
}
return workflow.OK().ReconcileResult(), nil
return result.ReconcileResult()
}

func (r *AtlasDeploymentReconciler) verifyNonTenantCase(deployment *mdbv1.AtlasDeployment) {
Expand Down Expand Up @@ -231,19 +245,16 @@ func (r *AtlasDeploymentReconciler) checkDeploymentIsManaged(
project *mdbv1.AtlasProject,
deployment *mdbv1.AtlasDeployment,
) workflow.Result {
advancedDeployment := deployment
if deployment.IsLegacyDeployment() {
advancedDeployment = deployment.DeepCopy()
if err := ConvertLegacyDeployment(&advancedDeployment.Spec); err != nil {
result := workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to temporary convert legacy deployment", "error", err)
return result
}
advancedDeployment.Spec.DeploymentSpec = nil
result := workflow.Terminate(workflow.Internal, "ownership check expected a converted deployment, not a legacy one")
workflowCtx.SetConditionFromResult(status.DatabaseUserReadyType, result)
log.Error(result.GetMessage())

return result
}

owner, err := customresource.IsOwner(
advancedDeployment,
deployment,
r.ObjectDeletionProtection,
customresource.IsResourceManagedByOperator,
managedByAtlas(context, workflowCtx.Client, project.ID(), log),
Expand Down Expand Up @@ -277,7 +288,7 @@ func (r *AtlasDeploymentReconciler) handleDeletion(
log *zap.SugaredLogger,
prevResult workflow.Result,
project *mdbv1.AtlasProject,
deployment *mdbv1.AtlasDeployment,
deployment *mdbv1.AtlasDeployment, // this must be the original non converted deployment
) (bool, workflow.Result) {
if deployment.GetDeletionTimestamp().IsZero() {
if !customresource.HaveFinalizer(deployment, customresource.FinalizerLabel) {
Expand Down Expand Up @@ -358,7 +369,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At
}

// handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment)
if c != nil && c.StateName != "" {
workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName))
Expand Down Expand Up @@ -387,7 +398,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

if err := r.ensureBackupScheduleAndPolicy(
context.Background(),
ctx,
workflowCtx, project.ID(),
deployment,
backupEnabled,
Expand All @@ -411,8 +422,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

// handleServerlessInstance ensures the state of the serverless instance using the serverless API
func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec)
func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment)
return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c)
}

Expand Down Expand Up @@ -579,7 +590,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont
return nil
}

type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)
type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)

type atlasClusterType int

Expand Down
25 changes: 23 additions & 2 deletions pkg/controller/atlasdeployment/atlasdeployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDeploymentManaged(t *testing.T) {
},
}
project := testProject(fakeNamespace)
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
deployment := asAdvanced(v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment))
te := newTestDeploymentEnv(t, tc.protected, atlasClient, testK8sClient(), project, deployment)
if tc.managedTag {
customresource.SetAnnotation(te.deployment, customresource.AnnotationLastAppliedConfiguration, "")
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestProtectedAdvancedDeploymentManagedInAtlas(t *testing.T) {
},
},
}
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
deployment := asAdvanced(v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment))
te := newTestDeploymentEnv(t, protected, atlasClient, testK8sClient(), project, deployment)

result := te.reconciler.checkDeploymentIsManaged(te.workflowCtx, te.context, te.log, te.project, te.deployment)
Expand All @@ -146,6 +146,27 @@ func TestProtectedAdvancedDeploymentManagedInAtlas(t *testing.T) {
}
}

func TestLegacyIsManagedInAtlasMustFail(t *testing.T) {
t.Run("Legacy deployment must fail to check if it is managed in Atlas", func(t *testing.T) {
protected := true
project := testProject(fakeNamespace)
inAtlas := differentAdvancedDeployment(fakeNamespace)
atlasClient := mongodbatlas.Client{
AdvancedClusters: &advancedClustersClientMock{
GetFn: func(groupID string, clusterName string) (*mongodbatlas.AdvancedCluster, *mongodbatlas.Response, error) {
return inAtlas, nil, nil
},
},
}
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
te := newTestDeploymentEnv(t, protected, atlasClient, testK8sClient(), project, deployment)

result := te.reconciler.checkDeploymentIsManaged(te.workflowCtx, te.context, te.log, te.project, te.deployment)

assert.Regexp(t, regexp.MustCompile("ownership check expected a converted deployment"), result.GetMessage())
})
}

func TestProtectedServerlessManagedInAtlas(t *testing.T) {
testCases := []struct {
title string
Expand Down
16 changes: 10 additions & 6 deletions pkg/controller/atlasdeployment/serverless_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
serverlessSpec := deployment.Spec.ServerlessSpec
atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
if err != nil {
if resp == nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -28,8 +32,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
if err != nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
}
ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
Name: serverlessSpec.Name,
ProviderSettings: &mongodbatlas.ServerlessProviderSettings{
BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName,
Expand All @@ -53,7 +57,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
convertedDeployment.Tags = &[]*mongodbatlas.Tag{}
}
if !isTagsEqual(*(atlasDeployment.Tags), *(convertedDeployment.Tags)) {
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
Tag: convertedDeployment.Tags,
ServerlessBackupOptions: &mongodbatlas.ServerlessBackupOptions{
ServerlessContinuousBackupEnabled: &serverlessSpec.BackupOptions.ServerlessContinuousBackupEnabled,
Expand All @@ -65,7 +69,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
}
return atlasDeployment, workflow.InProgress(workflow.DeploymentUpdating, "deployment is updating")
}
result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name)
result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, r.SubObjectDeletionProtection)
return atlasDeployment, result

case status.StateCREATING:
Expand Down
103 changes: 101 additions & 2 deletions pkg/controller/atlasdeployment/serverless_private_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package atlasdeployment

import (
"context"
"encoding/json"
"fmt"
"sort"

"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil"

Expand All @@ -14,6 +16,7 @@ import (

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

Expand All @@ -28,10 +31,30 @@ const (
SPEStatusFailed = "FAILED" //stage 2
)

func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result {
if deploymentSpec == nil {
func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
deploymentSpec := deployment.Spec.ServerlessSpec

canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service, protected, groupID, deployment)
if err != nil {
result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err))
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

if !canReconcile {
result := workflow.Terminate(
workflow.AtlasDeletionProtection,
"unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information",
)
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

providerName := GetServerlessProvider(deploymentSpec)
if providerName == provider.ProviderGCP {
if len(deploymentSpec.PrivateEndpoints) == 0 {
Expand All @@ -57,6 +80,82 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string,
return result
}

func canServerlessPrivateEndpointsReconcile(ctx context.Context, service *workflow.Context, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) {
if !protected {
return true, nil
}

latestConfig := &mdbv1.AtlasDeploymentSpec{}
latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration]
if ok {
if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil {
return false, err
}
}

atlasClient := service.Client
existingPE, err := getAllExistingServerlessPE(ctx, atlasClient.ServerlessPrivateEndpoints, groupID, deployment.Spec.ServerlessSpec.Name)
if err != nil {
return false, err
}

if len(existingPE) == 0 {
return true, nil
}

logger := service.Log
prevCfg := prevPEConfig(latestConfig)
if matchingPEs(logger, deployment.Spec.ServerlessSpec.PrivateEndpoints, existingPE) ||
josvazg marked this conversation as resolved.
Show resolved Hide resolved
matchingPEs(logger, prevCfg, existingPE) {
return true, nil
}
return false, nil
}

func sortedK8sPENames(spes []mdbv1.ServerlessPrivateEndpoint) []string {
names := make([]string, 0, len(spes))
for _, spe := range spes {
names = append(names, spe.Name)
}
sort.Strings(names)
return names
}

func sortedAtlasPENames(atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) []string {
names := make([]string, 0, len(atlasPEs))
for _, atlasPE := range atlasPEs {
names = append(names, atlasPE.Comment)
}
sort.Strings(names)
return names
}

func matchingPEs(logger *zap.SugaredLogger, spes []mdbv1.ServerlessPrivateEndpoint, atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) bool {
k8sPENames := sortedK8sPENames(spes)
atlasPENames := sortedAtlasPENames(atlasPEs)
if len(k8sPENames) != len(atlasPEs) {
logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPENames, atlasPENames)
logger.Debugf("Different PE sets lengths Kubernetes wants %d but atlas has %d", len(k8sPENames), len(atlasPEs))
return false
}
for i, k8sName := range k8sPENames {
if atlasPENames[i] != k8sName {
logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPENames, atlasPENames)
logger.Debugf("Different PE at index %d %d but atlas has %d", k8sName, atlasPENames[i])
return false
}
}
logger.Debugf("Kubernetes PEs MATCH Atlas: k8s %v == Atlas %v", k8sPENames, atlasPENames)
return true
}

func prevPEConfig(deploymentSpec *mdbv1.AtlasDeploymentSpec) []mdbv1.ServerlessPrivateEndpoint {
if deploymentSpec.ServerlessSpec == nil || deploymentSpec.ServerlessSpec.PrivateEndpoints == nil {
return []mdbv1.ServerlessPrivateEndpoint{}
}
return deploymentSpec.ServerlessSpec.PrivateEndpoints
}

func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName {
if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless {
return deploymentSpec.ProviderSettings.ProviderName
Expand Down
Loading
Loading