Skip to content

Commit

Permalink
Add possiblity to tag dynamically provisioned FSx volumes (#85)
Browse files Browse the repository at this point in the history
Included FSx client in order to be able to tag PVCs provisioned by
aws-fsx-csi-driver. Use case needed as we provision a lot of FSx
volumes, and need to have them uniquely tagged.

---------

Co-authored-by: m.bergamasco <m.bergamasco@sportradar.com>
  • Loading branch information
Mberga14 and m.bergamasco authored Nov 14, 2023
1 parent 16811a2 commit 1c9076e
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 9 deletions.
62 changes: 62 additions & 0 deletions aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/efs"
"github.com/aws/aws-sdk-go/service/efs/efsiface"
"github.com/aws/aws-sdk-go/service/fsx"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
Expand All @@ -54,6 +55,11 @@ type EBSClient struct {
ec2iface.EC2API
}

// FSx client
type FSxClient struct {
*fsx.FSx
}

// CustomRetryer for custom retry settings
type CustomRetryer struct {
client.DefaultRetryer
Expand Down Expand Up @@ -89,6 +95,12 @@ func newEC2Client() (*EBSClient, error) {
return &EBSClient{svc}, nil
}

// newFSxClient initializes an AWS client
func newFSxClient() (*FSxClient, error) {
svc := fsx.New(awsSession)
return &FSxClient{svc}, nil
}

func getMetadataRegion() (string, error) {
sess := session.Must(session.NewSession(&aws.Config{}))
svc := ec2metadata.New(sess)
Expand Down Expand Up @@ -189,3 +201,53 @@ func (client *EFSClient) deleteEFSVolumeTags(volumeID string, tags []string, sto
promActionsTotal.With(prometheus.Labels{"status": "success", "storageclass": storageclass}).Inc()
promActionsLegacyTotal.With(prometheus.Labels{"status": "success"}).Inc()
}

func (client *FSxClient) addFSxVolumeTags(volumeID string, tags map[string]string, storageclass string) {
volumeIDs := []*string{&volumeID}
describeFileSystemOutput, err := client.DescribeFileSystems(&fsx.DescribeFileSystemsInput{
FileSystemIds: volumeIDs,
})
if err != nil {
log.WithError(err)
return
}
_, err = client.TagResource(&fsx.TagResourceInput{
ResourceARN: describeFileSystemOutput.FileSystems[0].ResourceARN,
Tags: convertTagsToFSxTags(tags),
})

if err != nil {
log.Errorln("Could not FSx create tags for volumeID:", volumeID, err)
promActionsTotal.With(prometheus.Labels{"status": "error", "storageclass": storageclass}).Inc()
promActionsLegacyTotal.With(prometheus.Labels{"status": "error"}).Inc()
return
}

promActionsTotal.With(prometheus.Labels{"status": "success", "storageclass": storageclass}).Inc()
promActionsLegacyTotal.With(prometheus.Labels{"status": "success"}).Inc()
}

func (client *FSxClient) deleteFSxVolumeTags(volumeID string, tags []*string, storageclass string) {
volumeIDs := []*string{&volumeID}
describeVolumesOutput, err := client.DescribeVolumes(&fsx.DescribeVolumesInput{
VolumeIds: volumeIDs,
})
if err != nil {
log.WithError(err)
return
}
_, err = client.UntagResource(&fsx.UntagResourceInput{
ResourceARN: describeVolumesOutput.Volumes[0].ResourceARN,
TagKeys: tags,
})

if err != nil {
log.Errorln("Could not FSx delete tags for volumeID:", volumeID, err)
promActionsTotal.With(prometheus.Labels{"status": "error", "storageclass": storageclass}).Inc()
promActionsLegacyTotal.With(prometheus.Labels{"status": "error"}).Inc()
return
}

promActionsTotal.With(prometheus.Labels{"status": "success", "storageclass": storageclass}).Inc()
promActionsLegacyTotal.With(prometheus.Labels{"status": "success"}).Inc()
}
64 changes: 55 additions & 9 deletions kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"regexp"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/fsx"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -104,11 +106,12 @@ func watchForPersistentVolumeClaims(ch chan struct{}, watchNamespace string) {

efsClient, _ := newEFSClient()
ec2Client, _ := newEC2Client()
fsxClient, _ := newFSxClient()

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pvc := getPVC(obj)
if !provisionedByAwsEfs(pvc) && !provisionedByAwsEbs(pvc) {
if !provisionedByAwsEfs(pvc) && !provisionedByAwsEbs(pvc) && !provisionedByAwsFsx(pvc) {
return
}
log.WithFields(log.Fields{"namespace": pvc.GetNamespace(), "pvc": pvc.GetName()}).Infoln("New PVC Added to Store")
Expand All @@ -123,6 +126,9 @@ func watchForPersistentVolumeClaims(ch chan struct{}, watchNamespace string) {
if provisionedByAwsEbs(pvc) {
ec2Client.addEBSVolumeTags(volumeID, tags, *pvc.Spec.StorageClassName)
}
if provisionedByAwsFsx(pvc) {
fsxClient.addFSxVolumeTags(volumeID, tags, *pvc.Spec.StorageClassName)
}
},
UpdateFunc: func(old, new interface{}) {

Expand All @@ -132,7 +138,7 @@ func watchForPersistentVolumeClaims(ch chan struct{}, watchNamespace string) {
log.WithFields(log.Fields{"namespace": newPVC.GetNamespace(), "pvc": newPVC.GetName()}).Debugln("ResourceVersion are the same")
return
}
if !provisionedByAwsEfs(newPVC) && !provisionedByAwsEbs(newPVC) {
if !provisionedByAwsEfs(newPVC) && !provisionedByAwsEbs(newPVC) && !provisionedByAwsFsx(newPVC) {
return
}
if newPVC.Spec.VolumeName == "" {
Expand All @@ -156,12 +162,17 @@ func watchForPersistentVolumeClaims(ch chan struct{}, watchNamespace string) {
if provisionedByAwsEbs(newPVC) {
ec2Client.addEBSVolumeTags(volumeID, tags, *newPVC.Spec.StorageClassName)
}
if provisionedByAwsFsx(newPVC) {
fsxClient.addFSxVolumeTags(volumeID, tags, *newPVC.Spec.StorageClassName)
}
}
oldTags := buildTags(oldPVC)
var deletedTags []string
var deletedTagsPtr []*string
for k := range oldTags {
if _, ok := tags[k]; !ok {
deletedTags = append(deletedTags, k)
deletedTagsPtr = append(deletedTagsPtr, &k)
}
}
if len(deletedTags) > 0 {
Expand All @@ -171,6 +182,9 @@ func watchForPersistentVolumeClaims(ch chan struct{}, watchNamespace string) {
if provisionedByAwsEbs(newPVC) {
ec2Client.deleteEBSVolumeTags(volumeID, deletedTags, *oldPVC.Spec.StorageClassName)
}
if provisionedByAwsFsx(newPVC) {
fsxClient.deleteFSxVolumeTags(volumeID, deletedTagsPtr, *oldPVC.Spec.StorageClassName)
}
}
},
})
Expand All @@ -182,6 +196,17 @@ func watchForPersistentVolumeClaims(ch chan struct{}, watchNamespace string) {
informer.Run(ch)
}

func convertTagsToFSxTags(tags map[string]string) []*fsx.Tag {
convertedTags := []*fsx.Tag{}
for tagKey, tagValue := range tags {
convertedTags = append(convertedTags, &fsx.Tag{
Key: aws.String(tagKey),
Value: aws.String(tagValue),
})
}
return convertedTags
}

func parseAWSEBSVolumeID(kubernetesID string) string {
// Pulled from https://github.com/kubernetes/csi-translation-lib/blob/release-1.26/plugins/aws_ebs.go#L244
if !strings.HasPrefix(kubernetesID, "aws://") {
Expand Down Expand Up @@ -379,6 +404,25 @@ func provisionedByAwsEbs(pvc *corev1.PersistentVolumeClaim) bool {
return false
}

func provisionedByAwsFsx(pvc *corev1.PersistentVolumeClaim) bool {
annotations := pvc.GetAnnotations()
if annotations == nil {
return false
}

provisionedBy, ok := getProvisionedBy(annotations)
if !ok {
log.WithFields(log.Fields{"namespace": pvc.GetNamespace(), "pvc": pvc.GetName()}).Debugln("no volume.kubernetes.io/storage-provisioner annotation")
return false
}

if provisionedBy == "fsx.csi.aws.com" {
log.WithFields(log.Fields{"namespace": pvc.GetNamespace(), "pvc": pvc.GetName()}).Debugln("fsx.csi.aws.com volume")
return true
}
return false
}

func processPersistentVolumeClaim(pvc *corev1.PersistentVolumeClaim) (string, map[string]string, error) {
tags := buildTags(pvc)

Expand Down Expand Up @@ -406,14 +450,16 @@ func processPersistentVolumeClaim(pvc *corev1.PersistentVolumeClaim) (string, ma
if pv.Spec.CSI != nil {
volumeID = pv.Spec.CSI.VolumeHandle
} else {
volumeID = parseAWSEBSVolumeID(pv.Spec.AWSElasticBlockStore.VolumeID)
}
volumeID = parseAWSEBSVolumeID(pv.Spec.AWSElasticBlockStore.VolumeID)
}
} else if provisionedBy == "efs.csi.aws.com" {
if pv.Spec.CSI != nil {
volumeID = parseAWSEFSVolumeID(pv.Spec.CSI.VolumeHandle)
}
} else if provisionedBy == "kubernetes.io/aws-ebs" {
volumeID = parseAWSEBSVolumeID(pv.Spec.AWSElasticBlockStore.VolumeID)
} else if provisionedBy == "fsx.csi.aws.com" {
volumeID = pv.Spec.CSI.VolumeHandle
}
log.WithFields(log.Fields{"namespace": pvc.GetNamespace(), "pvc": pvc.GetName(), "volumeID": volumeID}).Debugln("parsed volumeID:", volumeID)
if len(volumeID) == 0 {
Expand Down Expand Up @@ -449,14 +495,14 @@ func getPVC(obj interface{}) *corev1.PersistentVolumeClaim {
pvc := obj.(*corev1.PersistentVolumeClaim)

// https://kubernetes.io/docs/reference/labels-annotations-taints/#volume-beta-kubernetes-io-storage-class-deprecated
// The "volume.beta.kubernetes.io/storage-class" annotation is deprecated but can be used
// to specify the name of StorageClass in PVC. When both storageClassName attribute and
// volume.beta.kubernetes.io/storage-class annotation are specified, the annotation
// The "volume.beta.kubernetes.io/storage-class" annotation is deprecated but can be used
// to specify the name of StorageClass in PVC. When both storageClassName attribute and
// volume.beta.kubernetes.io/storage-class annotation are specified, the annotation
// volume.beta.kubernetes.io/storage-class takes precedence over the storageClassName attribute.
storageClassName, ok := pvc.GetAnnotations()["volume.beta.kubernetes.io/storage-class"]
if ok {
pvc.Spec.StorageClassName = &storageClassName
}

return pvc
}
}
47 changes: 47 additions & 0 deletions kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,53 @@ func Test_provisionedByAwsEfs(t *testing.T) {
}
}

func Test_provisionedByAwsFsx(t *testing.T) {

pvc := &corev1.PersistentVolumeClaim{}
pvc.SetName("my-pvc")
pvc.Spec.StorageClassName = &dummyStorageClassName

tests := []struct {
name string
annotations map[string]string
want bool
}{
{
name: "valid provisioner fsx.csi.aws.com",
annotations: map[string]string{"volume.kubernetes.io/storage-provisioner": "fsx.csi.aws.com"},
want: true,
},
{
name: "invalid provisioner",
annotations: map[string]string{"volume.kubernetes.io/storage-provisioner": "something else"},
want: false,
},
{
name: "valid provisioner fsx.csi.aws.com legacy annotation",
annotations: map[string]string{"volume.beta.kubernetes.io/storage-provisioner": "fsx.csi.aws.com"},
want: true,
},
{
name: "invalid provisioner legacy annotation",
annotations: map[string]string{"volume.beta.kubernetes.io/storage-provisioner": "something else"},
want: false,
},
{
name: "provisioner not set",
annotations: map[string]string{"some annotation": "something else"},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pvc.SetAnnotations(tt.annotations)
if got := provisionedByAwsFsx(pvc); got != tt.want {
t.Errorf("provisionedByAwsEfs() = %v, want %v", got, tt.want)
}
})
}
}

func Test_buildTags(t *testing.T) {

pvc := &corev1.PersistentVolumeClaim{}
Expand Down

0 comments on commit 1c9076e

Please sign in to comment.