From e617142b504ec82a26277d9a1394b91baa979158 Mon Sep 17 00:00:00 2001 From: Zack Proser Date: Tue, 26 Jul 2022 15:45:35 -0400 Subject: [PATCH] Support nuking elasticache replication groups (#335) --- aws/elasticache.go | 131 ++++++++++++++++++++++++++++++++++------ aws/elasticache_test.go | 41 +++++++++++-- 2 files changed, 151 insertions(+), 21 deletions(-) diff --git a/aws/elasticache.go b/aws/elasticache.go index 7e2bf59b..a0e473e4 100644 --- a/aws/elasticache.go +++ b/aws/elasticache.go @@ -1,9 +1,11 @@ package aws import ( + "fmt" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/elasticache" "github.com/gruntwork-io/cloud-nuke/config" @@ -45,6 +47,95 @@ func shouldIncludeElasticacheCluster(cluster *elasticache.CacheCluster, excludeA ) } +type CacheClusterType string + +const ( + Replication CacheClusterType = "replication" + Single CacheClusterType = "single" +) + +func determineCacheClusterType(svc *elasticache.ElastiCache, clusterId *string) (*string, CacheClusterType, error) { + // A single cache cluster can either be standalone, in the case where Engine is memcached, + // or a member of a replication group, in the case where Engine is Redis, so we must + // check the current clusterId via both describe methods, otherwise we'll fail to find it + describeParams := &elasticache.DescribeCacheClustersInput{ + CacheClusterId: clusterId, + } + + cacheClustersOutput, describeErr := svc.DescribeCacheClusters(describeParams) + if describeErr != nil { + if awsErr, ok := describeErr.(awserr.Error); ok { + if awsErr.Code() == elasticache.ErrCodeCacheClusterNotFoundFault { + // It's possible that we're looking at a replication group, in which case we can safely ignore this error + } else { + return nil, Single, describeErr + } + } + } + + replicationGroupDescribeParams := &elasticache.DescribeReplicationGroupsInput{ + ReplicationGroupId: clusterId, + } + + replicationGroupOutput, describeReplicationGroupsErr := svc.DescribeReplicationGroups(replicationGroupDescribeParams) + if describeReplicationGroupsErr != nil { + if awsErr, ok := describeReplicationGroupsErr.(awserr.Error); ok { + if awsErr.Code() == elasticache.ErrCodeReplicationGroupNotFoundFault { + // It's possible that we're looking at a cache cluster, in which case we can safely ignore this error + } else { + return nil, Single, describeReplicationGroupsErr + } + } + } + + if len(cacheClustersOutput.CacheClusters) == 1 { + return cacheClustersOutput.CacheClusters[0].CacheClusterId, Single, nil + } else if len(replicationGroupOutput.ReplicationGroups) == 1 { + return replicationGroupOutput.ReplicationGroups[0].ReplicationGroupId, Replication, nil + } + + return nil, Single, CouldNotLookupCacheClusterErr{ClusterId: clusterId} +} + +func nukeNonReplicationGroupElasticacheCluster(svc *elasticache.ElastiCache, clusterId *string) error { + logging.Logger.Infof("Deleting Elasticache cluster Id: %s which is not a member of a replication group", aws.StringValue(clusterId)) + params := elasticache.DeleteCacheClusterInput{ + CacheClusterId: clusterId, + } + _, err := svc.DeleteCacheCluster(¶ms) + if err != nil { + return err + } + + return svc.WaitUntilCacheClusterDeleted(&elasticache.DescribeCacheClustersInput{ + CacheClusterId: clusterId, + }) +} + +func nukeReplicationGroupMemberElasticacheCluster(svc *elasticache.ElastiCache, clusterId *string) error { + logging.Logger.Infof("Elasticache cluster Id: %s is a member of a replication group. Therefore, deleting its replication group", aws.StringValue(clusterId)) + + params := &elasticache.DeleteReplicationGroupInput{ + ReplicationGroupId: clusterId, + } + _, err := svc.DeleteReplicationGroup(params) + if err != nil { + return err + } + + waitErr := svc.WaitUntilReplicationGroupDeleted(&elasticache.DescribeReplicationGroupsInput{ + ReplicationGroupId: clusterId, + }) + + if waitErr != nil { + return waitErr + } + + logging.Logger.Infof("Successfully deleted replication group Id: %s", aws.StringValue(clusterId)) + + return nil +} + func nukeAllElasticacheClusters(session *session.Session, clusterIds []*string) error { svc := elasticache.New(session) @@ -57,11 +148,22 @@ func nukeAllElasticacheClusters(session *session.Session, clusterIds []*string) var deletedClusterIds []*string for _, clusterId := range clusterIds { - params := elasticache.DeleteCacheClusterInput{ - CacheClusterId: clusterId, + // We need to look up the cache cluster again to determine if it is a member of a replication group or not, + // because there are two separate codepaths for deleting a cluster. Cache clusters that are not members of a + // replication group can be deleted via DeleteCacheCluster, whereas those that are members require a call to + // DeleteReplicationGroup, which will destroy both the replication group and its member clusters + clusterId, clusterType, describeErr := determineCacheClusterType(svc, clusterId) + if describeErr != nil { + return describeErr + } + + var err error + if clusterType == Single { + err = nukeNonReplicationGroupElasticacheCluster(svc, clusterId) + } else if clusterType == Replication { + err = nukeReplicationGroupMemberElasticacheCluster(svc, clusterId) } - _, err := svc.DeleteCacheCluster(¶ms) if err != nil { logging.Logger.Errorf("[Failed] %s", err) } else { @@ -70,21 +172,16 @@ func nukeAllElasticacheClusters(session *session.Session, clusterIds []*string) } } - if len(deletedClusterIds) > 0 { - logging.Logger.Infof("Confirming deletion of %d Elasticache clusters in region %s", len(deletedClusterIds), *session.Config.Region) + logging.Logger.Infof("[OK] %d Elasticache clusters deleted in %s", len(deletedClusterIds), *session.Config.Region) + return nil +} - for _, clusterId := range deletedClusterIds { - params := elasticache.DescribeCacheClustersInput{ - CacheClusterId: clusterId, - } +// Custome errors - err := svc.WaitUntilCacheClusterDeleted(¶ms) - if err != nil { - logging.Logger.Errorf("[Failed] %s", err) - } - } - } +type CouldNotLookupCacheClusterErr struct { + ClusterId *string +} - logging.Logger.Infof("[OK] %d Elasticache clusters deleted in %s", len(deletedClusterIds), *session.Config.Region) - return nil +func (err CouldNotLookupCacheClusterErr) Error() string { + return fmt.Sprintf("Failed to lookup clusterId: %s", aws.StringValue(err.ClusterId)) } diff --git a/aws/elasticache_test.go b/aws/elasticache_test.go index afcd4323..563fc094 100644 --- a/aws/elasticache_test.go +++ b/aws/elasticache_test.go @@ -35,6 +35,31 @@ func createTestElasticacheCluster(t *testing.T, session *session.Session, name s require.NoError(t, err) } +func createTestElasticacheReplicationGroup(t *testing.T, session *session.Session, name string) *string { + svc := elasticache.New(session) + + params := &elasticache.CreateReplicationGroupInput{ + ReplicationGroupDescription: awsgo.String(name), + ReplicationGroupId: awsgo.String(name), + Engine: awsgo.String("Redis"), + CacheNodeType: awsgo.String("cache.r6g.large"), + } + + validationErr := params.Validate() + require.NoError(t, validationErr) + + output, err := svc.CreateReplicationGroup(params) + require.NoError(t, err) + + err = svc.WaitUntilReplicationGroupAvailable(&elasticache.DescribeReplicationGroupsInput{ + ReplicationGroupId: output.ReplicationGroup.ReplicationGroupId, + }) + + require.NoError(t, err) + + return output.ReplicationGroup.ReplicationGroupId +} + func TestListElasticacheClusters(t *testing.T) { t.Parallel() @@ -42,7 +67,8 @@ func TestListElasticacheClusters(t *testing.T) { require.NoError(t, err) session, err := session.NewSession(&awsgo.Config{ - Region: awsgo.String(region)}, + Region: awsgo.String(region), + }, ) require.NoError(t, err) @@ -66,7 +92,8 @@ func TestListElasticacheClustersWithConfigFile(t *testing.T) { require.NoError(t, err) session, err := session.NewSession(&awsgo.Config{ - Region: awsgo.String(region)}, + Region: awsgo.String(region), + }, ) require.NoError(t, err) @@ -102,7 +129,8 @@ func TestNukeElasticacheClusters(t *testing.T) { require.NoError(t, err) session, err := session.NewSession(&awsgo.Config{ - Region: awsgo.String(region)}, + Region: awsgo.String(region), + }, ) require.NoError(t, err) @@ -110,7 +138,12 @@ func TestNukeElasticacheClusters(t *testing.T) { clusterId := "cloud-nuke-test-" + strings.ToLower(util.UniqueID()) createTestElasticacheCluster(t, session, clusterId) - err = nukeAllElasticacheClusters(session, []*string{&clusterId}) + replicationGroupId := "cloud-nuke-test-" + strings.ToLower(util.UniqueID()) + createTestElasticacheReplicationGroup(t, session, replicationGroupId) + // Ensure that nukeAllElasticacheClusters can handle both scenarios for elasticache: + // 1. The elasticache cluster is not the member of a replication group, so it can be deleted directly + // 2. The elasticache cluster is a member of a replication group, so that replication group must be deleted + err = nukeAllElasticacheClusters(session, []*string{&clusterId, &replicationGroupId}) require.NoError(t, err) clusterIds, err := getAllElasticacheClusters(session, region, time.Now().Add(1*time.Hour), config.Config{})