Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use cluster identity for using azcopy in volume cloning #1654

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified hack/update-mock.sh
100644 → 100755
Empty file.
43 changes: 31 additions & 12 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
Expand Down Expand Up @@ -196,6 +197,8 @@ var (
supportedFSGroupChangePolicyList = []string{FSGroupChangeNone, string(v1.FSGroupChangeAlways), string(v1.FSGroupChangeOnRootMismatch)}

retriableErrors = []string{accountNotProvisioned, tooManyRequests, shareBeingDeleted, clientThrottled}

defaultAzcopyCopyOptions = []string{"--recursive", "--check-length=false"}
)

// Driver implements all interfaces of CSI drivers
Expand Down Expand Up @@ -250,6 +253,8 @@ type Driver struct {
resizeFileShareFailureCache azcache.Resource
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
// a timed cache storing accounts that should use a SAS token for azcopy-based volume cloning
azcopySasTokenCache azcache.Resource
// sas expiry time for azcopy in volume clone
sasTokenExpirationMinutes int
// azcopy provides an exec mock for unit tests
Expand Down Expand Up @@ -320,6 +325,10 @@ func NewDriver(options *DriverOptions) *Driver {
klog.Fatalf("%v", err)
}

if driver.azcopySasTokenCache, err = azcache.NewTimedCache(15*time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
}

if driver.resizeFileShareFailureCache, err = azcache.NewTimedCache(3*time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
}
Expand Down Expand Up @@ -977,7 +986,7 @@ func (d *Driver) ResizeFileShare(ctx context.Context, subsID, resourceGroup, acc
}

// copyFileShare copies a fileshare in the same storage account
func (d *Driver) copyFileShare(req *csi.CreateVolumeRequest, accountKey string, shareOptions *fileclient.ShareOptions, storageEndpointSuffix string) error {
func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
if shareOptions.Protocol == storage.EnabledProtocolsNFS {
return fmt.Errorf("protocol nfs is not supported for volume cloning")
}
Expand All @@ -994,18 +1003,12 @@ func (d *Driver) copyFileShare(req *csi.CreateVolumeRequest, accountKey string,
return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
}

klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
if genErr != nil {
return genErr
}

timeAfter := time.After(waitForCopyTimeout)
timeTick := time.Tick(waitForCopyInterval)
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSasToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSasToken)
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSASToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSASToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName)
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if jobState == fileutil.AzcopyJobError || jobState == fileutil.AzcopyJobCompleted {
return err
Expand All @@ -1014,14 +1017,30 @@ func (d *Driver) copyFileShare(req *csi.CreateVolumeRequest, accountKey string,
for {
select {
case <-timeTick:
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName)
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
return err
case fileutil.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
out, copyErr := cmd.CombinedOutput()
if accountSASToken == "" && strings.Contains(string(out), authorizationPermissionMismatch) && copyErr != nil {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original output: %v", string(out))
d.azcopySasTokenCache.Set(accountName, "")
var sasToken string
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
return err
}
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
out, copyErr = cmd.CombinedOutput()
}
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstFileShareName, copyErr, string(out))
} else {
Expand Down
92 changes: 86 additions & 6 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ const (
privateEndpoint = "privateendpoint"
snapshotTimeFormat = "2006-01-02T15:04:05.0000000Z07:00"
snapshotsExpand = "snapshots"

azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
azcopyTenantID = "AZCOPY_TENANT_ID"
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
MSI = "MSI"
SPN = "SPN"
authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
)

var (
Expand Down Expand Up @@ -108,7 +117,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
// logging the job status if it's volume cloning
if req.GetVolumeContentSource() != nil {
jobState, percent, err := d.azcopy.GetAzcopyJob(volName)
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
Expand Down Expand Up @@ -571,11 +580,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Errorf(codes.Internal, "failed to create file share(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d), error: %v", validFileShareName, account, sku, subsID, resourceGroup, location, fileShareSize, err)
}
if req.GetVolumeContentSource() != nil {
accountKeyCopy, err := d.GetStorageAccesskey(ctx, accountOptions, req.GetSecrets(), secretName, secretNamespace)
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secret, secretName, secretNamespace, false)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(req, accountKeyCopy, shareOptions, storageEndpointSuffix); err != nil {
if err := d.copyVolume(ctx, req, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secret, shareOptions, accountOptions, storageEndpointSuffix); err != nil {
return nil, err
}
// storeAccountKey is not needed here since copy volume is only using SAS token
Expand Down Expand Up @@ -726,13 +735,13 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
}

// copyVolume copies an Azure file share
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountKey string, shareOptions *fileclient.ShareOptions, storageEndpointSuffix string) error {
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyFileShare(req, accountKey, shareOptions, storageEndpointSuffix)
return d.copyFileShare(ctx, req, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, shareOptions, accountOptions, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
Expand Down Expand Up @@ -1300,6 +1309,77 @@ func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
return nil
}

// authorizeAzcopyWithIdentity builds the environment variables ("KEY=value"
// entries) that make azcopy log in automatically with the identity found in
// the driver's cloud auth config.
//
// Selection order:
//  1. If UseManagedIdentityExtension is set, AZCOPY_AUTO_LOGIN_TYPE=MSI is
//     returned, plus AZCOPY_MSI_CLIENT_ID when UserAssignedIdentityID is set.
//  2. Otherwise, if AADClientSecret is set, the service principal variables
//     (login type SPN, application ID, client secret, tenant ID) are returned.
//     AADClientID and TenantID must both be non-empty in this case.
//
// An empty slice and a non-nil error are returned when the service principal
// configuration is incomplete or when neither identity kind is configured.
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
	azureAuthConfig := d.cloud.Config.AzureAuthConfig

	if azureAuthConfig.UseManagedIdentityExtension {
		authAzcopyEnv := []string{fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI)}
		if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
			klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
			authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
		} else {
			klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
		}
		return authAzcopyEnv, nil
	}

	if len(azureAuthConfig.AADClientSecret) > 0 {
		klog.V(2).Infof("use service principal to authorize azcopy")
		// Validate before building anything so no partial environment is assembled.
		if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
			return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
		}
		authAzcopyEnv := []string{
			fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN),
			fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID),
			fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret),
			fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID),
		}
		// Pass the values as arguments instead of a pre-formatted string, so a
		// '%' inside an ID is never interpreted as a formatting verb. The client
		// secret is deliberately left out of the log line.
		klog.V(2).Infof("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID)
		return authAzcopyEnv, nil
	}

	return []string{}, fmt.Errorf("neither the service principal nor the managed identity has been set")
}

// getAzcopyAuth decides how azcopy should authenticate against accountName and
// returns exactly one of:
//   - a SAS token (first result) with a nil environment, or
//   - an empty token with the identity environment variables (second result)
//     produced by authorizeAzcopyWithIdentity.
//
// A SAS token is generated when any of the following holds:
//  1. secrets is not empty or secretName is set
//  2. no managed identity or service principal could be used
//     (authorizeAzcopyWithIdentity failed or returned nothing)
//  3. accountName is present in azcopySasTokenCache (copyFileShare records an
//     account there after azcopy reports AuthorizationPermissionMismatch)
//  4. parameter useSasToken is true
//
// When a SAS token is needed and accountKey is empty, the key is fetched with
// GetStorageAccesskey first. Errors from the cache lookup, the key lookup, or
// SAS generation are returned to the caller.
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
	hasSecret := len(secrets) > 0 || len(secretName) > 0

	var authAzcopyEnv []string
	if !useSasToken && !hasSecret {
		var err error
		if authAzcopyEnv, err = d.authorizeAzcopyWithIdentity(); err != nil {
			// Best effort: without a usable identity we fall through to the SAS token path.
			klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
		} else if len(authAzcopyEnv) > 0 {
			// search in cache first
			cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
			if err != nil {
				return "", nil, fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %w", accountName, err)
			}
			if cache != nil {
				klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
				useSasToken = true
			}
		}
	}

	// Identity-based auth is only used when nothing forces the SAS token path.
	if !hasSecret && !useSasToken && len(authAzcopyEnv) > 0 {
		return "", authAzcopyEnv, nil
	}

	if accountKey == "" {
		var err error
		if accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
			return "", nil, err
		}
	}
	klog.V(2).Infof("generate sas token for account(%s)", accountName)
	sasToken, err := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
	return sasToken, nil, err
}

// generateSASToken generates a SAS token for the storage account
func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
credential, err := service.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
Expand Down
Loading
Loading