Skip to content

Commit

Permalink
Merge pull request #1761 from andyzhangx/wait-for-azcopy-job-running
Browse files Browse the repository at this point in the history
fix: wait for azcopy job running in volume clone
  • Loading branch information
andyzhangx authored Dec 19, 2024
2 parents b4a82b8 + 76a5a5f commit 77a4c8e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
25 changes: 17 additions & 8 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,17 @@ func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeReq
case util.AzcopyJobError, util.AzcopyJobCompleted:
return err
case util.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) {
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if err != nil {
return false, err
}
if jobState == util.AzcopyJobRunning {
return false, nil
}
return true, nil
})
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
execFunc := func() error {
Expand All @@ -819,13 +829,12 @@ func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeReq
_, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
}
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
return copyErr
err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
}
if err != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, err)
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
return err
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/blob/controllerserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,7 @@ func TestCopyVolume(t *testing.T) {
name: "azcopy job is in progress",
testFunc: func(t *testing.T) {
ctx := context.Background()
accountOptions := azure.AccountOptions{}
d := NewFakeDriver()
mp := map[string]string{}

Expand All @@ -1761,14 +1762,14 @@ func TestCopyVolume(t *testing.T) {

m := util.NewMockEXEC(ctrl)
listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false"
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1)
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil)
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).AnyTimes()
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil).AnyTimes()

d.azcopy.ExecCmd = m
d.waitForAzCopyTimeoutMinutes = 1

expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%")
err := d.copyVolume(ctx, req, "", "sastoken", nil, "dstContainer", "", nil, "core.windows.net")
if !reflect.DeepEqual(err, expectedErr) {
err := d.copyVolume(ctx, req, "", "sastoken", nil, "dstContainer", "", &accountOptions, "core.windows.net")
if !reflect.DeepEqual(err, wait.ErrWaitTimeout) {
t.Errorf("Unexpected error: %v", err)
}
},
Expand Down

0 comments on commit 77a4c8e

Please sign in to comment.