From a7adb513e6d963aa4e6eb8f3e571bd19d64dcaea Mon Sep 17 00:00:00 2001 From: andyzhangx Date: Wed, 27 Dec 2023 13:15:33 +0000 Subject: [PATCH] fix: reduce mount lock to avoid volumeID collision issue --- pkg/blob/nodeserver.go | 10 ++++++---- pkg/blob/nodeserver_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/blob/nodeserver.go b/pkg/blob/nodeserver.go index e92ca3bfb..11b2d8657 100644 --- a/pkg/blob/nodeserver.go +++ b/pkg/blob/nodeserver.go @@ -229,10 +229,11 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe return nil, status.Error(codes.InvalidArgument, "Volume capability not provided") } - if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired { + lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath) + if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired { return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID) } - defer d.volumeLocks.Release(volumeID) + defer d.volumeLocks.Release(lockKey) mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() attrib := req.GetVolumeContext() @@ -437,10 +438,11 @@ func (d *Driver) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolume return nil, status.Error(codes.InvalidArgument, "Staging target not provided") } - if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired { + lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath) + if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired { return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID) } - defer d.volumeLocks.Release(volumeID) + defer d.volumeLocks.Release(lockKey) klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath) err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/) diff --git a/pkg/blob/nodeserver_test.go b/pkg/blob/nodeserver_test.go index 8ca86d155..6a4cec9b1 100644 --- a/pkg/blob/nodeserver_test.go +++ b/pkg/blob/nodeserver_test.go @@ -435,8 +435,8 @@ func TestNodeStageVolume(t *testing.T) { VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, } d := NewFakeDriver() - d.volumeLocks.TryAcquire("unit-test") - defer d.volumeLocks.Release("unit-test") + d.volumeLocks.TryAcquire(fmt.Sprintf("%s-%s", "unit-test", "unit-test")) + defer d.volumeLocks.Release(fmt.Sprintf("%s-%s", "unit-test", "unit-test")) _, err := d.NodeStageVolume(context.TODO(), req) expectedErr := status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "unit-test")) if !reflect.DeepEqual(err, expectedErr) { @@ -606,8 +606,8 @@ func TestNodeUnstageVolume(t *testing.T) { VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, } d := NewFakeDriver() - d.volumeLocks.TryAcquire("unit-test") - defer d.volumeLocks.Release("unit-test") + d.volumeLocks.TryAcquire(fmt.Sprintf("%s-%s", "unit-test", "unit-test")) + defer d.volumeLocks.Release(fmt.Sprintf("%s-%s", "unit-test", "unit-test")) _, err := d.NodeStageVolume(context.TODO(), req) expectedErr := status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "unit-test")) if !reflect.DeepEqual(err, expectedErr) {