Skip to content

Commit 4208890

Browse files
committed
Move waiting for tasks to separate phase to unblock process
Issue: When Forklift creates a snapshot, it needs to wait for the task to finish. Right now we use task.WaitForResult, which makes the whole process wait for the snapshot creation and blocks other VM migrations. We have the same problem with snapshot removal: if the ESXi host is busy when we start the removal, it can take longer than the reconcile cycle (3s) and the migration can fail because of it. Fix: Instead of using task.WaitForResult, Forklift queries the latest tasks per VM (10 tasks by default). This querying is done in a separate phase from the creation/deletion, so there is a WaitFor phase for each of the object manipulations. We find the specific task for the creation/deletion and check its status. This has the advantage that we get not only the status of the task but also its results, so we can propagate them to the user in case the creation/deletion fails. Ref: - https://issues.redhat.com/browse/MTV-1753 - https://issues.redhat.com/browse/MTV-1775 Signed-off-by: Martin Necas <mnecas@redhat.com>
1 parent 7c33864 commit 4208890

File tree

11 files changed

+136
-73
lines changed

11 files changed

+136
-73
lines changed

operator/roles/forkliftcontroller/defaults/main.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ controller_snapshot_removal_timeout_minuts: 120
3535
controller_snapshot_status_check_rate_seconds: 10
3636
controller_cleanup_retries: 10
3737
controller_dv_status_check_retries: 10
38-
controller_snapshot_removal_check_retries: 20
3938
controller_vsphere_incremental_backup: true
4039
controller_ovirt_warm_migration: true
4140
controller_retain_precopy_importer_pods: false

operator/roles/forkliftcontroller/templates/controller/deployment-controller.yml.j2

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,6 @@ spec:
8585
- name: DV_STATUS_CHECK_RETRIES
8686
value: "{{ controller_dv_status_check_retries }}"
8787
{% endif %}
88-
{% if controller_snapshot_removal_check_retries is number %}
89-
- name: SNAPSHOT_REMOVAL_CHECK_RETRIES
90-
value: "{{ controller_snapshot_removal_check_retries }}"
91-
{% endif %}
9288
{% if controller_max_vm_inflight is number %}
9389
- name: MAX_VM_INFLIGHT
9490
value: "{{ controller_max_vm_inflight }}"

pkg/controller/plan/adapter/base/doc.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ type Client interface {
108108
// Remove a snapshot.
109109
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error
110110
// Check if a snapshot is ready to transfer.
111-
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error)
111+
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error)
112+
// Check if a snapshot is removed.
113+
CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (ready bool, err error)
112114
// Set DataVolume checkpoints.
113115
SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error)
114116
// Close connections to the provider API.

pkg/controller/plan/adapter/ocp/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@ type Client struct {
2525
}
2626

2727
// CheckSnapshotReady implements base.Client
28-
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (bool, error) {
28+
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
29+
return
30+
}
31+
32+
// CheckSnapshotRemoved implements base.Client
33+
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
2934
return false, nil
3035
}
3136

pkg/controller/plan/adapter/openstack/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,15 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (imageI
115115
}
116116

117117
// Check if a snapshot is ready to transfer.
118-
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, imageID string) (ready bool, err error) {
118+
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
119119
return
120120
}
121121

122+
// CheckSnapshotRemoved implements base.Client
123+
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
124+
return false, nil
125+
}
126+
122127
// Set DataVolume checkpoints.
123128
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) error {
124129
return nil

pkg/controller/plan/adapter/ova/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,15 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc uti
6161
}
6262

6363
// Check if a snapshot is ready to transfer, to avoid importer restarts.
64-
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
64+
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
6565
return
6666
}
6767

68+
// CheckSnapshotRemoved implements base.Client
69+
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
70+
return false, nil
71+
}
72+
6873
// Set DataVolume checkpoints.
6974
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error) {
7075
return

pkg/controller/plan/adapter/ovirt/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapsh
7575
}
7676

7777
// Check if a snapshot is ready to transfer, to avoid importer restarts.
78-
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
78+
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
7979
correlationID, err := r.getSnapshotCorrelationID(vmRef, &snapshot)
8080
if err != nil {
8181
err = liberr.Wrap(err)
@@ -104,6 +104,11 @@ func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool,
104104
return
105105
}
106106

107+
// CheckSnapshotRemoved implements base.Client
108+
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
109+
return false, nil
110+
}
111+
107112
// Remove a VM snapshot. No-op for this provider.
108113
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
109114
return

pkg/controller/plan/adapter/vsphere/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ go_library(
3636
"//vendor/github.com/vmware/govmomi/find",
3737
"//vendor/github.com/vmware/govmomi/object",
3838
"//vendor/github.com/vmware/govmomi/session",
39+
"//vendor/github.com/vmware/govmomi/task",
3940
"//vendor/github.com/vmware/govmomi/vim25",
4041
"//vendor/github.com/vmware/govmomi/vim25/mo",
4142
"//vendor/github.com/vmware/govmomi/vim25/soap",

pkg/controller/plan/adapter/vsphere/client.go

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/vmware/govmomi"
1717
"github.com/vmware/govmomi/object"
1818
"github.com/vmware/govmomi/session"
19+
"github.com/vmware/govmomi/task"
1920
"github.com/vmware/govmomi/vim25"
2021
"github.com/vmware/govmomi/vim25/mo"
2122
"github.com/vmware/govmomi/vim25/soap"
@@ -27,8 +28,11 @@ import (
2728
)
2829

2930
const (
30-
snapshotName = "forklift-migration-precopy"
31-
snapshotDesc = "Forklift Operator warm migration precopy"
31+
snapshotName = "forklift-migration-precopy"
32+
snapshotDesc = "Forklift Operator warm migration precopy"
33+
VirtualMachine = "VirtualMachine"
34+
CreateSnapshotTask = "CreateSnapshot_Task"
35+
RemoveSnapshotTask = "RemoveSnapshot_Task"
3236
)
3337

3438
// vSphere VM Client
@@ -39,9 +43,9 @@ type Client struct {
3943
}
4044

4145
// Create a VM snapshot and return its ID.
42-
func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string, err error) {
46+
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (id string, err error) {
4347
r.Log.V(1).Info("Creating snapshot", "vmRef", vmRef)
44-
vm, err := r.getVM(vmRef, hosts)
48+
vm, err := r.getVM(vmRef, hostsFunc)
4549
if err != nil {
4650
return
4751
}
@@ -50,28 +54,82 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string,
5054
err = liberr.Wrap(err)
5155
return
5256
}
53-
res, err := task.WaitForResult(context.TODO(), nil)
57+
return task.Common.Reference().Value, nil
58+
}
59+
60+
// Check if a snapshot is ready to transfer.
61+
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
62+
taskInfo, err := r.getLatestTaskByName(vmRef, CreateSnapshotTask)
5463
if err != nil {
55-
err = liberr.Wrap(err)
56-
return
64+
return false, "", liberr.Wrap(err)
5765
}
58-
id = res.Result.(types.ManagedObjectReference).Value
59-
r.Log.Info("Created snapshot", "vmRef", vmRef, "id", id)
66+
ready, err = r.checkTaskStatus(taskInfo)
67+
if err != nil {
68+
return false, "", liberr.Wrap(err)
69+
}
70+
if ready {
71+
return true, taskInfo.Result.(types.ManagedObjectReference).Value, nil
72+
} else {
73+
// The snapshot is not ready, retry the check
74+
return false, "", nil
75+
}
76+
}
6077

61-
return
78+
// Check if a snapshot is removed.
79+
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (ready bool, err error) {
80+
taskInfo, err := r.getLatestTaskByName(vmRef, RemoveSnapshotTask)
81+
if err != nil {
82+
return false, liberr.Wrap(err)
83+
}
84+
return r.checkTaskStatus(taskInfo)
6285
}
6386

64-
// Check if a snapshot is ready to transfer.
65-
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
66-
return true, nil
87+
func (r *Client) checkTaskStatus(taskInfo *types.TaskInfo) (ready bool, err error) {
88+
r.Log.Info("Snapshot task", "task", taskInfo.Task.Value, "name", taskInfo.Name, "status", taskInfo.State)
89+
switch taskInfo.State {
90+
case types.TaskInfoStateSuccess:
91+
return true, nil
92+
case types.TaskInfoStateError:
93+
return false, fmt.Errorf(taskInfo.Error.LocalizedMessage)
94+
default:
95+
return false, nil
96+
}
97+
}
98+
99+
func (r *Client) getLatestTaskByName(vmRef ref.Ref, taskName string) (*types.TaskInfo, error) {
100+
taskManager := task.NewManager(r.client.Client)
101+
taskCollector, err := taskManager.CreateCollectorForTasks(context.TODO(), types.TaskFilterSpec{
102+
Entity: &types.TaskFilterSpecByEntity{
103+
Entity: types.ManagedObjectReference{
104+
Type: VirtualMachine,
105+
Value: vmRef.ID,
106+
},
107+
Recursion: types.TaskFilterSpecRecursionOptionSelf,
108+
},
109+
})
110+
if err != nil {
111+
return nil, err
112+
}
113+
//nolint:errcheck
114+
defer taskCollector.Destroy(context.Background())
115+
tasks, err := taskCollector.LatestPage(context.TODO())
116+
if err != nil {
117+
return nil, err
118+
}
119+
for _, taskInfo := range tasks {
120+
if taskInfo.Name == taskName {
121+
return &taskInfo, nil
122+
}
123+
}
124+
return nil, fmt.Errorf("no task found with name %s, vmRef %v", taskName, vmRef)
67125
}
68126

69127
// Remove a VM snapshot.
70-
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (err error) {
128+
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
71129
r.Log.V(1).Info("RemoveSnapshot",
72130
"vmRef", vmRef,
73131
"snapshot", snapshot)
74-
err = r.removeSnapshot(vmRef, snapshot, false, hosts)
132+
err = r.removeSnapshot(vmRef, snapshot, false, hostsFunc)
75133
return
76134
}
77135

pkg/controller/plan/migration.go

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ const (
108108
TransferCompleted = "Transfer completed."
109109
PopulatorPodPrefix = "populate-"
110110
DvStatusCheckRetriesAnnotation = "dvStatusCheckRetries"
111-
SnapshotRemovalCheckRetries = "snapshotRemovalCheckRetries"
112111
)
113112

114113
var (
@@ -1027,27 +1026,15 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
10271026
vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm)))
10281027
break
10291028
}
1030-
// FIXME: This is just temporary timeout to unblock the migrations which get stuck on issue https://issues.redhat.com/browse/MTV-1753
1031-
// This should be fixed properly by adding the task manager inside the inventory and monitor the task status
1032-
// from the main controller.
1033-
var retries int
1034-
retriesAnnotation := step.Annotations[SnapshotRemovalCheckRetries]
1035-
if retriesAnnotation == "" {
1036-
step.Annotations[SnapshotRemovalCheckRetries] = "1"
1037-
} else {
1038-
retries, err = strconv.Atoi(retriesAnnotation)
1039-
if err != nil {
1040-
step.AddError(err.Error())
1041-
err = nil
1042-
break
1043-
}
1044-
if retries >= settings.Settings.SnapshotRemovalCheckRetries {
1045-
vm.Phase = r.next(vm.Phase)
1046-
// Reset for next precopy
1047-
step.Annotations[SnapshotRemovalCheckRetries] = "1"
1048-
} else {
1049-
step.Annotations[SnapshotRemovalCheckRetries] = strconv.Itoa(retries + 1)
1050-
}
1029+
snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot
1030+
ready, err := r.provider.CheckSnapshotRemoved(vm.Ref, snapshot)
1031+
if err != nil {
1032+
step.AddError(err.Error())
1033+
err = nil
1034+
break
1035+
}
1036+
if ready {
1037+
vm.Phase = r.next(vm.Phase)
10511038
}
10521039
case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot:
10531040
step, found := vm.FindStep(r.step(vm))
@@ -1076,12 +1063,18 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
10761063
break
10771064
}
10781065
snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot
1079-
ready, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot)
1066+
ready, snapshotId, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot)
10801067
if err != nil {
10811068
step.AddError(err.Error())
10821069
err = nil
10831070
break
10841071
}
1072+
// If the provider does not directly create the snapshot, but we need to wait for the snapshot to be created
1073+
// We start the creation task in CreateSnapshot, set the task ID as a snapshot id which needs to be replaced
1074+
// by the snapshot id after the task finishes.
1075+
if snapshotId != "" {
1076+
vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot = snapshotId
1077+
}
10851078
if ready {
10861079
vm.Phase = r.next(vm.Phase)
10871080
}

pkg/settings/migration.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,25 @@ import (
1212

1313
// Environment variables.
1414
const (
15-
MaxVmInFlight = "MAX_VM_INFLIGHT"
16-
HookRetry = "HOOK_RETRY"
17-
ImporterRetry = "IMPORTER_RETRY"
18-
VirtV2vImage = "VIRT_V2V_IMAGE"
19-
PrecopyInterval = "PRECOPY_INTERVAL"
20-
VirtV2vDontRequestKVM = "VIRT_V2V_DONT_REQUEST_KVM"
21-
SnapshotRemovalTimeout = "SNAPSHOT_REMOVAL_TIMEOUT"
22-
SnapshotStatusCheckRate = "SNAPSHOT_STATUS_CHECK_RATE"
23-
CDIExportTokenTTL = "CDI_EXPORT_TOKEN_TTL"
24-
FileSystemOverhead = "FILESYSTEM_OVERHEAD"
25-
BlockOverhead = "BLOCK_OVERHEAD"
26-
CleanupRetries = "CLEANUP_RETRIES"
27-
DvStatusCheckRetries = "DV_STATUS_CHECK_RETRIES"
28-
SnapshotRemovalCheckRetries = "SNAPSHOT_REMOVAL_CHECK_RETRIES"
29-
OvirtOsConfigMap = "OVIRT_OS_MAP"
30-
VsphereOsConfigMap = "VSPHERE_OS_MAP"
31-
VirtCustomizeConfigMap = "VIRT_CUSTOMIZE_MAP"
32-
VddkJobActiveDeadline = "VDDK_JOB_ACTIVE_DEADLINE"
33-
VirtV2vExtraArgs = "VIRT_V2V_EXTRA_ARGS"
34-
VirtV2vExtraConfConfigMap = "VIRT_V2V_EXTRA_CONF_CONFIG_MAP"
15+
MaxVmInFlight = "MAX_VM_INFLIGHT"
16+
HookRetry = "HOOK_RETRY"
17+
ImporterRetry = "IMPORTER_RETRY"
18+
VirtV2vImage = "VIRT_V2V_IMAGE"
19+
PrecopyInterval = "PRECOPY_INTERVAL"
20+
VirtV2vDontRequestKVM = "VIRT_V2V_DONT_REQUEST_KVM"
21+
SnapshotRemovalTimeout = "SNAPSHOT_REMOVAL_TIMEOUT"
22+
SnapshotStatusCheckRate = "SNAPSHOT_STATUS_CHECK_RATE"
23+
CDIExportTokenTTL = "CDI_EXPORT_TOKEN_TTL"
24+
FileSystemOverhead = "FILESYSTEM_OVERHEAD"
25+
BlockOverhead = "BLOCK_OVERHEAD"
26+
CleanupRetries = "CLEANUP_RETRIES"
27+
DvStatusCheckRetries = "DV_STATUS_CHECK_RETRIES"
28+
OvirtOsConfigMap = "OVIRT_OS_MAP"
29+
VsphereOsConfigMap = "VSPHERE_OS_MAP"
30+
VirtCustomizeConfigMap = "VIRT_CUSTOMIZE_MAP"
31+
VddkJobActiveDeadline = "VDDK_JOB_ACTIVE_DEADLINE"
32+
VirtV2vExtraArgs = "VIRT_V2V_EXTRA_ARGS"
33+
VirtV2vExtraConfConfigMap = "VIRT_V2V_EXTRA_CONF_CONFIG_MAP"
3534
)
3635

3736
// Migration settings
@@ -62,8 +61,6 @@ type Migration struct {
6261
CleanupRetries int
6362
// DvStatusCheckRetries retries
6463
DvStatusCheckRetries int
65-
// SnapshotRemovalCheckRetries retries
66-
SnapshotRemovalCheckRetries int
6764
// oVirt OS config map name
6865
OvirtOsConfigMap string
6966
// vSphere OS config map name
@@ -109,9 +106,6 @@ func (r *Migration) Load() (err error) {
109106
if r.DvStatusCheckRetries, err = getPositiveEnvLimit(DvStatusCheckRetries, 10); err != nil {
110107
return liberr.Wrap(err)
111108
}
112-
if r.SnapshotRemovalCheckRetries, err = getPositiveEnvLimit(SnapshotRemovalCheckRetries, 20); err != nil {
113-
return liberr.Wrap(err)
114-
}
115109
if virtV2vImage, ok := os.LookupEnv(VirtV2vImage); ok {
116110
r.VirtV2vImage = virtV2vImage
117111
} else if Settings.Role.Has(MainRole) {

0 commit comments

Comments
 (0)