Skip to content

Commit

Permalink
Support nuking elasticache replication groups (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackproser authored Jul 26, 2022
1 parent cb3d91b commit e617142
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 21 deletions.
131 changes: 114 additions & 17 deletions aws/elasticache.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package aws

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/gruntwork-io/cloud-nuke/config"
Expand Down Expand Up @@ -45,6 +47,95 @@ func shouldIncludeElasticacheCluster(cluster *elasticache.CacheCluster, excludeA
)
}

type CacheClusterType string

const (
Replication CacheClusterType = "replication"
Single CacheClusterType = "single"
)

func determineCacheClusterType(svc *elasticache.ElastiCache, clusterId *string) (*string, CacheClusterType, error) {
// A single cache cluster can either be standalone, in the case where Engine is memcached,
// or a member of a replication group, in the case where Engine is Redis, so we must
// check the current clusterId via both describe methods, otherwise we'll fail to find it
describeParams := &elasticache.DescribeCacheClustersInput{
CacheClusterId: clusterId,
}

cacheClustersOutput, describeErr := svc.DescribeCacheClusters(describeParams)
if describeErr != nil {
if awsErr, ok := describeErr.(awserr.Error); ok {
if awsErr.Code() == elasticache.ErrCodeCacheClusterNotFoundFault {
// It's possible that we're looking at a replication group, in which case we can safely ignore this error
} else {
return nil, Single, describeErr
}
}
}

replicationGroupDescribeParams := &elasticache.DescribeReplicationGroupsInput{
ReplicationGroupId: clusterId,
}

replicationGroupOutput, describeReplicationGroupsErr := svc.DescribeReplicationGroups(replicationGroupDescribeParams)
if describeReplicationGroupsErr != nil {
if awsErr, ok := describeReplicationGroupsErr.(awserr.Error); ok {
if awsErr.Code() == elasticache.ErrCodeReplicationGroupNotFoundFault {
// It's possible that we're looking at a cache cluster, in which case we can safely ignore this error
} else {
return nil, Single, describeReplicationGroupsErr
}
}
}

if len(cacheClustersOutput.CacheClusters) == 1 {
return cacheClustersOutput.CacheClusters[0].CacheClusterId, Single, nil
} else if len(replicationGroupOutput.ReplicationGroups) == 1 {
return replicationGroupOutput.ReplicationGroups[0].ReplicationGroupId, Replication, nil
}

return nil, Single, CouldNotLookupCacheClusterErr{ClusterId: clusterId}
}

func nukeNonReplicationGroupElasticacheCluster(svc *elasticache.ElastiCache, clusterId *string) error {
logging.Logger.Infof("Deleting Elasticache cluster Id: %s which is not a member of a replication group", aws.StringValue(clusterId))
params := elasticache.DeleteCacheClusterInput{
CacheClusterId: clusterId,
}
_, err := svc.DeleteCacheCluster(&params)
if err != nil {
return err
}

return svc.WaitUntilCacheClusterDeleted(&elasticache.DescribeCacheClustersInput{
CacheClusterId: clusterId,
})
}

func nukeReplicationGroupMemberElasticacheCluster(svc *elasticache.ElastiCache, clusterId *string) error {
logging.Logger.Infof("Elasticache cluster Id: %s is a member of a replication group. Therefore, deleting its replication group", aws.StringValue(clusterId))

params := &elasticache.DeleteReplicationGroupInput{
ReplicationGroupId: clusterId,
}
_, err := svc.DeleteReplicationGroup(params)
if err != nil {
return err
}

waitErr := svc.WaitUntilReplicationGroupDeleted(&elasticache.DescribeReplicationGroupsInput{
ReplicationGroupId: clusterId,
})

if waitErr != nil {
return waitErr
}

logging.Logger.Infof("Successfully deleted replication group Id: %s", aws.StringValue(clusterId))

return nil
}

func nukeAllElasticacheClusters(session *session.Session, clusterIds []*string) error {
svc := elasticache.New(session)

Expand All @@ -57,11 +148,22 @@ func nukeAllElasticacheClusters(session *session.Session, clusterIds []*string)

var deletedClusterIds []*string
for _, clusterId := range clusterIds {
params := elasticache.DeleteCacheClusterInput{
CacheClusterId: clusterId,
// We need to look up the cache cluster again to determine if it is a member of a replication group or not,
// because there are two separate codepaths for deleting a cluster. Cache clusters that are not members of a
// replication group can be deleted via DeleteCacheCluster, whereas those that are members require a call to
// DeleteReplicationGroup, which will destroy both the replication group and its member clusters
clusterId, clusterType, describeErr := determineCacheClusterType(svc, clusterId)
if describeErr != nil {
return describeErr
}

var err error
if clusterType == Single {
err = nukeNonReplicationGroupElasticacheCluster(svc, clusterId)
} else if clusterType == Replication {
err = nukeReplicationGroupMemberElasticacheCluster(svc, clusterId)
}

_, err := svc.DeleteCacheCluster(&params)
if err != nil {
logging.Logger.Errorf("[Failed] %s", err)
} else {
Expand All @@ -70,21 +172,16 @@ func nukeAllElasticacheClusters(session *session.Session, clusterIds []*string)
}
}

if len(deletedClusterIds) > 0 {
logging.Logger.Infof("Confirming deletion of %d Elasticache clusters in region %s", len(deletedClusterIds), *session.Config.Region)
logging.Logger.Infof("[OK] %d Elasticache clusters deleted in %s", len(deletedClusterIds), *session.Config.Region)
return nil
}

for _, clusterId := range deletedClusterIds {
params := elasticache.DescribeCacheClustersInput{
CacheClusterId: clusterId,
}
// Custome errors

err := svc.WaitUntilCacheClusterDeleted(&params)
if err != nil {
logging.Logger.Errorf("[Failed] %s", err)
}
}
}
type CouldNotLookupCacheClusterErr struct {
ClusterId *string
}

logging.Logger.Infof("[OK] %d Elasticache clusters deleted in %s", len(deletedClusterIds), *session.Config.Region)
return nil
func (err CouldNotLookupCacheClusterErr) Error() string {
return fmt.Sprintf("Failed to lookup clusterId: %s", aws.StringValue(err.ClusterId))
}
41 changes: 37 additions & 4 deletions aws/elasticache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,40 @@ func createTestElasticacheCluster(t *testing.T, session *session.Session, name s
require.NoError(t, err)
}

func createTestElasticacheReplicationGroup(t *testing.T, session *session.Session, name string) *string {
svc := elasticache.New(session)

params := &elasticache.CreateReplicationGroupInput{
ReplicationGroupDescription: awsgo.String(name),
ReplicationGroupId: awsgo.String(name),
Engine: awsgo.String("Redis"),
CacheNodeType: awsgo.String("cache.r6g.large"),
}

validationErr := params.Validate()
require.NoError(t, validationErr)

output, err := svc.CreateReplicationGroup(params)
require.NoError(t, err)

err = svc.WaitUntilReplicationGroupAvailable(&elasticache.DescribeReplicationGroupsInput{
ReplicationGroupId: output.ReplicationGroup.ReplicationGroupId,
})

require.NoError(t, err)

return output.ReplicationGroup.ReplicationGroupId
}

func TestListElasticacheClusters(t *testing.T) {
t.Parallel()

region, err := getRandomRegion()
require.NoError(t, err)

session, err := session.NewSession(&awsgo.Config{
Region: awsgo.String(region)},
Region: awsgo.String(region),
},
)

require.NoError(t, err)
Expand All @@ -66,7 +92,8 @@ func TestListElasticacheClustersWithConfigFile(t *testing.T) {
require.NoError(t, err)

session, err := session.NewSession(&awsgo.Config{
Region: awsgo.String(region)},
Region: awsgo.String(region),
},
)

require.NoError(t, err)
Expand Down Expand Up @@ -102,15 +129,21 @@ func TestNukeElasticacheClusters(t *testing.T) {
require.NoError(t, err)

session, err := session.NewSession(&awsgo.Config{
Region: awsgo.String(region)},
Region: awsgo.String(region),
},
)

require.NoError(t, err)

clusterId := "cloud-nuke-test-" + strings.ToLower(util.UniqueID())
createTestElasticacheCluster(t, session, clusterId)

err = nukeAllElasticacheClusters(session, []*string{&clusterId})
replicationGroupId := "cloud-nuke-test-" + strings.ToLower(util.UniqueID())
createTestElasticacheReplicationGroup(t, session, replicationGroupId)
// Ensure that nukeAllElasticacheClusters can handle both scenarios for elasticache:
// 1. The elasticache cluster is not the member of a replication group, so it can be deleted directly
// 2. The elasticache cluster is a member of a replication group, so that replication group must be deleted
err = nukeAllElasticacheClusters(session, []*string{&clusterId, &replicationGroupId})
require.NoError(t, err)

clusterIds, err := getAllElasticacheClusters(session, region, time.Now().Add(1*time.Hour), config.Config{})
Expand Down

0 comments on commit e617142

Please sign in to comment.