Skip to content

Commit

Permalink
use cluster identity for azcopy
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Dec 8, 2023
1 parent 75508df commit ec2f7a0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const (
storageSPNClientIDField = "azurestoragespnclientid"
storageSPNTenantIDField = "azurestoragespntenantid"
storageAuthTypeField = "azurestorageauthtype"
storageIentityClientIDField = "azurestorageidentityclientid"
storageIdentityClientIDField = "azurestorageidentityclientid"
storageIdentityObjectIDField = "azurestorageidentityobjectid"
storageIdentityResourceIDField = "azurestorageidentityresourceid"
msiEndpointField = "msiendpoint"
Expand Down Expand Up @@ -415,7 +415,7 @@ func (d *Driver) GetAuthEnv(ctx context.Context, volumeID, protocol string, attr
case storageAuthTypeField:
azureStorageAuthType = v
authEnv = append(authEnv, "AZURE_STORAGE_AUTH_TYPE="+v)
case storageIentityClientIDField:
case storageIdentityClientIDField:
authEnv = append(authEnv, "AZURE_STORAGE_IDENTITY_CLIENT_ID="+v)
case storageIdentityObjectIDField:
authEnv = append(authEnv, "AZURE_STORAGE_IDENTITY_OBJECT_ID="+v)
Expand Down
34 changes: 25 additions & 9 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
var matchTags, useDataPlaneAPI, getLatestAccountKey bool
var softDeleteBlobs, softDeleteContainers int32
var vnetResourceIDs []string
var storageIdentityClientID string
var err error
// set allowBlobPublicAccess as false by default
allowBlobPublicAccess := pointer.Bool(false)
Expand Down Expand Up @@ -174,7 +175,8 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
containerNameReplaceMap[pvNameMetadata] = v
case serverNameField:
case storageAuthTypeField:
case storageIentityClientIDField:
case storageIdentityClientIDField:
storageIdentityClientID = v
case storageIdentityObjectIDField:
case storageIdentityResourceIDField:
case msiEndpointField:
Expand Down Expand Up @@ -416,7 +418,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
}
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix); err != nil {
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix, storageIdentityClientID); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -711,7 +713,7 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
}

// CopyBlobContainer copies a blob container in the same storage account
func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix, storageIdentityClientID string) error {
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
Expand All @@ -725,10 +727,13 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
}

klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
if genErr != nil {
return genErr
var accountSasToken string
if storageIdentityClientID == "" {
klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, err = generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
if err != nil {
return err
}
}

timeAfter := time.After(waitForCopyTimeout)
Expand All @@ -752,6 +757,17 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
return err
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
if storageIdentityClientID != "" {
klog.V(2).Infof("use msi client id to authorize azcopy")
_, err = exec.Command("export", "AZCOPY_AUTO_LOGIN_TYPE=MSI").CombinedOutput()
if err != nil {
return err
}
_, err = exec.Command("export", fmt.Sprintf("AZCOPY_MSI_CLIENT_ID=%s", storageIdentityClientID)).CombinedOutput()
if err != nil {
return err
}
}
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
Expand All @@ -767,13 +783,13 @@ func (d *Driver) copyBlobContainer(_ context.Context, req *csi.CreateVolumeReque
}

// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix, storageIdentityClientID string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix)
return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix, storageIdentityClientID)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/blob/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func TestCreateVolume(t *testing.T) {
mp[containerNameField] = "unit-test"
mp[mountPermissionsField] = "0750"
mp[storageAuthTypeField] = "msi"
mp[storageIentityClientIDField] = "msi"
mp[storageIdentityClientIDField] = "msi"
mp[storageIdentityObjectIDField] = "msi"
mp[storageIdentityResourceIDField] = "msi"
mp[msiEndpointField] = "msi"
Expand Down Expand Up @@ -1496,7 +1496,7 @@ func TestCopyVolume(t *testing.T) {
ctx := context.Background()

expectedErr := status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
err := d.copyVolume(ctx, req, "", "", "core.windows.net")
err := d.copyVolume(ctx, req, "", "", "core.windows.net", "")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1528,7 +1528,7 @@ func TestCopyVolume(t *testing.T) {
ctx := context.Background()

expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #")
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net")
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net", "")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1560,7 +1560,7 @@ func TestCopyVolume(t *testing.T) {
ctx := context.Background()

expectedErr := fmt.Errorf("srcContainerName() or dstContainerName(dstContainer) is empty")
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net")
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net", "")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1592,7 +1592,7 @@ func TestCopyVolume(t *testing.T) {
ctx := context.Background()

expectedErr := fmt.Errorf("srcContainerName(fileshare) or dstContainerName() is empty")
err := d.copyVolume(ctx, req, "", "", "core.windows.net")
err := d.copyVolume(ctx, req, "", "", "core.windows.net", "")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1636,7 +1636,7 @@ func TestCopyVolume(t *testing.T) {
ctx := context.Background()

var expectedErr error
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net")
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net", "")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -1681,7 +1681,7 @@ func TestCopyVolume(t *testing.T) {
ctx := context.Background()

var expectedErr error
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net")
err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net", "")
if !reflect.DeepEqual(err, expectedErr) {
t.Errorf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit ec2f7a0

Please sign in to comment.