Skip to content

Commit

Permalink
use cluster identity for azcopy
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Jan 2, 2024
1 parent b0f57ac commit 6d81809
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 36 deletions.
2 changes: 1 addition & 1 deletion deploy/example/cloning/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ outfile
## Create a PVC from an existing PVC
> Make sure application is not writing data to source blob container
```console
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/blob-csi-driver/master/deploy/example/cloning/pvc-blob-cloning.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/blob-csi-driver/master/deploy/example/cloning/pvc-blob-csi-cloning.yaml
```
### Check the Creation Status

Expand Down
Empty file modified hack/update-mock.sh
100644 → 100755
Empty file.
105 changes: 89 additions & 16 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
Expand Down Expand Up @@ -49,6 +50,15 @@ import (
const (
privateEndpoint = "privateendpoint"

azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
azcopyTenantID = "AZCOPY_TENANT_ID"
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
MSI = "MSI"
SPN = "SPN"
authorizationPermissionMismatch = "AuthorizationPermissionMismatch"

waitForCopyInterval = 5 * time.Second
waitForCopyTimeout = 3 * time.Minute
)
Expand All @@ -72,7 +82,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
// logging the job status if it's volume cloning
if req.GetVolumeContentSource() != nil {
jobState, percent, err := d.azcopy.GetAzcopyJob(volName)
jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
Expand Down Expand Up @@ -411,12 +421,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

if req.GetVolumeContentSource() != nil {
if accountKey == "" {
if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
var accountSASToken string
if accountSASToken, err = d.getSASToken(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
return nil, status.Errorf(codes.Internal, "failed to getSASToken on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix); err != nil {
if err := d.copyVolume(ctx, req, accountSASToken, validContainerName, storageEndpointSuffix); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -711,7 +720,7 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
}

// CopyBlobContainer copies a blob container in the same storage account
func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountSasToken, dstContainerName, storageEndpointSuffix string) error {
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
Expand All @@ -725,18 +734,19 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
}

klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
if genErr != nil {
return genErr
var authAzcopyEnv []string
if accountSasToken == "" {
if authAzcopyEnv, err = d.authorizeAzcopyWithIdentity(); err != nil {
return err
}
}

timeAfter := time.After(waitForCopyTimeout)
timeTick := time.Tick(waitForCopyInterval)
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
return err
Expand All @@ -745,14 +755,18 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
for {
select {
case <-timeTick:
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case util.AzcopyJobError, util.AzcopyJobCompleted:
return err
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
out, copyErr := cmd.CombinedOutput()
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
} else {
Expand All @@ -767,18 +781,77 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
}

// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken, dstContainerName, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix)
return d.copyBlobContainer(ctx, req, accountSASToken, dstContainerName, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
}

func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
azureAuthConfig := d.cloud.Config.AzureAuthConfig
var authAzcopyEnv []string
if azureAuthConfig.UseManagedIdentityExtension {
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
} else {
klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
}
return authAzcopyEnv, nil
}
if len(azureAuthConfig.AADClientSecret) > 0 {
klog.V(2).Infof("use service principal to authorize azcopy")
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
}
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))

return authAzcopyEnv, nil
}
return []string{}, fmt.Errorf("service principle or managed identity are both not set")
}

// getSASToken will only generate sas token for azcopy in following conditions:
// 1. secrets is not empty
// 2. driver is not using managed identity and service principal
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
func (d *Driver) getSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, error) {
authAzcopyEnv, _ := d.authorizeAzcopyWithIdentity()
useSasTokenFallBack := false
if len(authAzcopyEnv) > 0 {
out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
if testErr != nil {
return "", fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
}
if strings.Contains(out, authorizationPermissionMismatch) {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
useSasTokenFallBack = true
}
}
if len(secrets) > 0 || len(authAzcopyEnv) == 0 || useSasTokenFallBack {
var err error
if accountKey == "" {
if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
return "", err
}
}
klog.V(2).Infof("generate sas token for account(%s)", accountName)
return generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
}
return "", nil
}

// isValidVolumeCapabilities validates the given VolumeCapability array is valid
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
if len(volCaps) == 0 {
Expand Down
Loading

0 comments on commit 6d81809

Please sign in to comment.