Skip to content

Commit 3629254

Browse files
committed
use cluster identity for azcopy
1 parent ded1639 commit 3629254

File tree

7 files changed

+429
-36
lines changed

7 files changed

+429
-36
lines changed

deploy/example/cloning/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ outfile
2121
## Create a PVC from an existing PVC
2222
> Make sure application is not writing data to source blob container
2323
```console
24-
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/blob-csi-driver/master/deploy/example/cloning/pvc-blob-cloning.yaml
24+
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/blob-csi-driver/master/deploy/example/cloning/pvc-blob-csi-cloning.yaml
2525
```
2626
### Check the Creation Status
2727

hack/update-mock.sh

100644100755
File mode changed.

pkg/blob/controllerserver.go

Lines changed: 89 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"net/url"
23+
"os"
2324
"os/exec"
2425
"strconv"
2526
"strings"
@@ -50,6 +51,15 @@ import (
5051
const (
5152
privateEndpoint = "privateendpoint"
5253

54+
azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
55+
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
56+
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
57+
azcopyTenantID = "AZCOPY_TENANT_ID"
58+
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
59+
MSI = "MSI"
60+
SPN = "SPN"
61+
authorizationPermissionMismatch = "ERROR CODE: AuthorizationPermissionMismatch"
62+
5363
waitForCopyInterval = 5 * time.Second
5464
waitForCopyTimeout = 3 * time.Minute
5565
)
@@ -73,7 +83,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
7383
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
7484
// logging the job status if it's volume cloning
7585
if req.GetVolumeContentSource() != nil {
76-
jobState, percent, err := d.azcopy.GetAzcopyJob(volName)
86+
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
7787
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
7888
}
7989
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
@@ -412,12 +422,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
412422
}
413423

414424
if req.GetVolumeContentSource() != nil {
415-
if accountKey == "" {
416-
if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
417-
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
418-
}
425+
var accountSASToken string
426+
if accountSASToken, err = d.getSASToken(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
427+
return nil, err
419428
}
420-
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix); err != nil {
429+
if err := d.copyVolume(ctx, req, accountSASToken, validContainerName, storageEndpointSuffix); err != nil {
421430
return nil, err
422431
}
423432
} else {
@@ -712,7 +721,7 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
712721
}
713722

714723
// CopyBlobContainer copies a blob container in the same storage account
715-
func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
724+
func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountSasToken, dstContainerName, storageEndpointSuffix string) error {
716725
var sourceVolumeID string
717726
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
718727
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
@@ -726,18 +735,19 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
726735
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
727736
}
728737

729-
klog.V(2).Infof("generate sas token for account(%s)", accountName)
730-
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
731-
if genErr != nil {
732-
return genErr
738+
var authAzcopyEnv []string
739+
if accountSasToken == "" {
740+
if authAzcopyEnv, err = d.authorizeAzcopyWithIdentity(); err != nil {
741+
return err
742+
}
733743
}
734744

735745
timeAfter := time.After(waitForCopyTimeout)
736746
timeTick := time.Tick(waitForCopyInterval)
737747
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
738748
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
739749

740-
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
750+
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
741751
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
742752
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
743753
return err
@@ -746,14 +756,18 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
746756
for {
747757
select {
748758
case <-timeTick:
749-
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
759+
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
750760
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
751761
switch jobState {
752762
case util.AzcopyJobError, util.AzcopyJobCompleted:
753763
return err
754764
case util.AzcopyJobNotFound:
755765
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
756-
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
766+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
767+
if len(authAzcopyEnv) > 0 {
768+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
769+
}
770+
out, copyErr := cmd.CombinedOutput()
757771
if copyErr != nil {
758772
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
759773
} else {
@@ -768,18 +782,77 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
768782
}
769783

770784
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
771-
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
785+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken, dstContainerName, storageEndpointSuffix string) error {
772786
vs := req.VolumeContentSource
773787
switch vs.Type.(type) {
774788
case *csi.VolumeContentSource_Snapshot:
775789
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
776790
case *csi.VolumeContentSource_Volume:
777-
return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix)
791+
return d.copyBlobContainer(ctx, req, accountSASToken, dstContainerName, storageEndpointSuffix)
778792
default:
779793
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
780794
}
781795
}
782796

797+
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
798+
azureAuthConfig := d.cloud.Config.AzureAuthConfig
799+
var authAzcopyEnv []string
800+
if azureAuthConfig.UseManagedIdentityExtension {
801+
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
802+
if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
803+
klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
804+
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
805+
} else {
806+
klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
807+
}
808+
return authAzcopyEnv, nil
809+
}
810+
if len(azureAuthConfig.AADClientSecret) > 0 {
811+
klog.V(2).Infof("use service principal to authorize azcopy")
812+
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
813+
if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
814+
return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
815+
}
816+
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
817+
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
818+
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
819+
klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
820+
821+
return authAzcopyEnv, nil
822+
}
823+
return []string{}, fmt.Errorf("AADClientSecret shouldn't be nil or useManagedIdentityExtension must be set to true")
824+
}
825+
826+
// getSASToken will only generate sas token for azcopy in following conditions:
827+
// 1. secrets is not empty
828+
// 2. driver is not using managed identity and service principal
829+
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
830+
func (d *Driver) getSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, error) {
831+
authAzcopyEnv, _ := d.authorizeAzcopyWithIdentity()
832+
useSasTokenFallBack := false
833+
// test by azcopy list command, if it returns AuthorizationPermissionMismatch error, then it is because sp or identity does not have permission of blob storage contributor, use sas token instead
834+
if len(authAzcopyEnv) > 0 {
835+
out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
836+
if strings.Contains(out, authorizationPermissionMismatch) {
837+
klog.Warningf("azcopy list command failed with AuthorizationPermissionMismatch error(%v), should assign \"Storage Blob Data Contributor\" role to service principal or managed identity, use sas token instead", testErr)
838+
useSasTokenFallBack = true
839+
} else if testErr != nil {
840+
return "", fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
841+
}
842+
}
843+
if len(secrets) > 0 || len(d.cloud.Config.AzureAuthConfig.AADClientSecret) == 0 && !d.cloud.Config.AzureAuthConfig.UseManagedIdentityExtension || useSasTokenFallBack {
844+
var err error
845+
if accountKey == "" {
846+
if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
847+
return "", status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
848+
}
849+
}
850+
klog.V(2).Infof("generate sas token for account(%s)", accountName)
851+
return generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
852+
}
853+
return "", nil
854+
}
855+
783856
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
784857
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
785858
if len(volCaps) == 0 {

0 commit comments

Comments
 (0)