diff --git a/operator/config/crd/bases/forklift.konveyor.io_migrations.yaml b/operator/config/crd/bases/forklift.konveyor.io_migrations.yaml index d76178348..b5d5771d3 100644 --- a/operator/config/crd/bases/forklift.konveyor.io_migrations.yaml +++ b/operator/config/crd/bases/forklift.konveyor.io_migrations.yaml @@ -539,6 +539,8 @@ spec: items: description: Precopy durations properties: + createTaskId: + type: string deltas: items: properties: @@ -554,6 +556,8 @@ spec: end: format: date-time type: string + removeTaskId: + type: string snapshot: type: string start: diff --git a/operator/config/crd/bases/forklift.konveyor.io_plans.yaml b/operator/config/crd/bases/forklift.konveyor.io_plans.yaml index 2ac95b87d..7ddf1136a 100644 --- a/operator/config/crd/bases/forklift.konveyor.io_plans.yaml +++ b/operator/config/crd/bases/forklift.konveyor.io_plans.yaml @@ -1051,6 +1051,8 @@ spec: items: description: Precopy durations properties: + createTaskId: + type: string deltas: items: properties: @@ -1066,6 +1068,8 @@ spec: end: format: date-time type: string + removeTaskId: + type: string snapshot: type: string start: diff --git a/pkg/apis/forklift/v1beta1/plan/vm.go b/pkg/apis/forklift/v1beta1/plan/vm.go index 65d6b37c3..8b3fdf000 100644 --- a/pkg/apis/forklift/v1beta1/plan/vm.go +++ b/pkg/apis/forklift/v1beta1/plan/vm.go @@ -98,10 +98,12 @@ const ( // Precopy durations type Precopy struct { - Start *meta.Time `json:"start,omitempty"` - End *meta.Time `json:"end,omitempty"` - Snapshot string `json:"snapshot,omitempty"` - Deltas []DiskDelta `json:"deltas,omitempty"` + Start *meta.Time `json:"start,omitempty"` + End *meta.Time `json:"end,omitempty"` + Snapshot string `json:"snapshot,omitempty"` + CreateTaskId string `json:"createTaskId,omitempty"` + RemoveTaskId string `json:"removeTaskId,omitempty"` + Deltas []DiskDelta `json:"deltas,omitempty"` } func (r *Precopy) WithDeltas(deltas map[string]string) { diff --git a/pkg/controller/plan/adapter/base/doc.go b/pkg/controller/plan/adapter/base/doc.go index 0900f3397..b57ae58ef 100644 --- a/pkg/controller/plan/adapter/base/doc.go +++ b/pkg/controller/plan/adapter/base/doc.go @@ -104,11 +104,13 @@ type Client interface { // Return whether the source VM is powered off. PoweredOff(vmRef ref.Ref) (bool, error) // Create a snapshot of the source VM. - CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error) + CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) // Remove a snapshot. - RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error + RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) // Check if a snapshot is ready to transfer. - CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) + CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) + // Check if a snapshot is removed. + CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, err error) // Set DataVolume checkpoints. SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error) // Close connections to the provider API. diff --git a/pkg/controller/plan/adapter/ocp/client.go b/pkg/controller/plan/adapter/ocp/client.go index c17cf49a8..afbb24643 100644 --- a/pkg/controller/plan/adapter/ocp/client.go +++ b/pkg/controller/plan/adapter/ocp/client.go @@ -25,7 +25,12 @@ type Client struct { } // CheckSnapshotReady implements base.Client -func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (bool, error) { +func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, string, error) { + return false, "", nil +} + +// CheckSnapshotRemove implements base.Client +func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) { return false, nil } @@ -35,12 +40,12 @@ func (r *Client) Close() { } // CreateSnapshot implements base.Client -func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error) { - return "", nil +func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, string, error) { + return "", "", nil } // Remove a VM snapshot. No-op for this provider. -func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) { +func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) { return } diff --git a/pkg/controller/plan/adapter/openstack/client.go b/pkg/controller/plan/adapter/openstack/client.go index e8e7a89da..9f6558810 100644 --- a/pkg/controller/plan/adapter/openstack/client.go +++ b/pkg/controller/plan/adapter/openstack/client.go @@ -110,22 +110,27 @@ func (r *Client) PoweredOff(vmRef ref.Ref) (off bool, err error) { } // Create a snapshot of the source VM. -func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (imageID string, err error) { +func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) { return } // Check if a snapshot is ready to transfer. -func (r *Client) CheckSnapshotReady(vmRef ref.Ref, imageID string) (ready bool, err error) { +func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) { return } +// CheckSnapshotRemove implements base.Client +func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) { + return false, nil +} + // Set DataVolume checkpoints. func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) error { return nil } // Remove a VM snapshot. No-op for this provider. -func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) { +func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) { return } diff --git a/pkg/controller/plan/adapter/ova/client.go b/pkg/controller/plan/adapter/ova/client.go index c1ef2838d..c0d9d96a6 100644 --- a/pkg/controller/plan/adapter/ova/client.go +++ b/pkg/controller/plan/adapter/ova/client.go @@ -46,12 +46,12 @@ func (r *Client) connect() (err error) { } // Create a VM snapshot and return its ID. No-op for this provider. -func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshot string, err error) { +func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) { return } // Remove a VM snapshot. No-op for this provider. -func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) { +func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) { return } @@ -61,10 +61,15 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc uti } // Check if a snapshot is ready to transfer, to avoid importer restarts. -func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) { +func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) { return } +// CheckSnapshotRemove implements base.Client +func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) { + return false, nil +} + // Set DataVolume checkpoints. func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error) { return diff --git a/pkg/controller/plan/adapter/ovirt/client.go b/pkg/controller/plan/adapter/ovirt/client.go index 4a7a7fec5..5cc174166 100644 --- a/pkg/controller/plan/adapter/ovirt/client.go +++ b/pkg/controller/plan/adapter/ovirt/client.go @@ -40,7 +40,7 @@ type Client struct { } // Create a VM snapshot and return its ID. -func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshot string, err error) { +func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) { _, vmService, err := r.getVM(vmRef) if err != nil { err = liberr.Wrap(err) @@ -70,13 +70,18 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapsh } return } - snapshot = snap.MustSnapshot().MustId() + snapshotId = snap.MustSnapshot().MustId() return } +// CheckSnapshotRemove implements base.Client +func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) { + return false, nil +} + // Check if a snapshot is ready to transfer, to avoid importer restarts. -func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) { - correlationID, err := r.getSnapshotCorrelationID(vmRef, &snapshot) +func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) { + correlationID, err := r.getSnapshotCorrelationID(vmRef, &precopy.Snapshot) if err != nil { err = liberr.Wrap(err) return @@ -105,7 +110,7 @@ func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, } // Remove a VM snapshot. No-op for this provider. -func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) { +func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) { return } diff --git a/pkg/controller/plan/adapter/vsphere/BUILD.bazel b/pkg/controller/plan/adapter/vsphere/BUILD.bazel index c744b9e7c..cd15868d9 100644 --- a/pkg/controller/plan/adapter/vsphere/BUILD.bazel +++ b/pkg/controller/plan/adapter/vsphere/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "//vendor/github.com/vmware/govmomi", "//vendor/github.com/vmware/govmomi/find", "//vendor/github.com/vmware/govmomi/object", + "//vendor/github.com/vmware/govmomi/property", "//vendor/github.com/vmware/govmomi/session", "//vendor/github.com/vmware/govmomi/vim25", "//vendor/github.com/vmware/govmomi/vim25/mo", diff --git a/pkg/controller/plan/adapter/vsphere/client.go b/pkg/controller/plan/adapter/vsphere/client.go index a48af1af5..d59edf4fa 100644 --- a/pkg/controller/plan/adapter/vsphere/client.go +++ b/pkg/controller/plan/adapter/vsphere/client.go @@ -15,6 +15,7 @@ import ( liberr "github.com/konveyor/forklift-controller/pkg/lib/error" "github.com/vmware/govmomi" "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/property" "github.com/vmware/govmomi/session" "github.com/vmware/govmomi/vim25" "github.com/vmware/govmomi/vim25/mo" @@ -29,6 +30,7 @@ import ( const ( snapshotName = "forklift-migration-precopy" snapshotDesc = "Forklift Operator warm migration precopy" + taskType = "Task" ) // vSphere VM Client @@ -39,9 +41,9 @@ type Client struct { } // Create a VM snapshot and return its ID. -func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string, err error) { +func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) { r.Log.V(1).Info("Creating snapshot", "vmRef", vmRef) - vm, err := r.getVM(vmRef, hosts) + vm, err := r.getVM(vmRef, hostsFunc) if err != nil { return } @@ -50,28 +52,15 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string, err = liberr.Wrap(err) return } - res, err := task.WaitForResult(context.TODO(), nil) - if err != nil { - err = liberr.Wrap(err) - return - } - id = res.Result.(types.ManagedObjectReference).Value - r.Log.Info("Created snapshot", "vmRef", vmRef, "id", id) - - return -} - -// Check if a snapshot is ready to transfer. -func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) { - return true, nil + return "", task.Reference().Value, nil } // Remove a VM snapshot. -func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (err error) { +func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (taskId string, err error) { r.Log.V(1).Info("RemoveSnapshot", "vmRef", vmRef, "snapshot", snapshot) - err = r.removeSnapshot(vmRef, snapshot, false, hosts) + taskId, err = r.removeSnapshot(vmRef, snapshot, false, hosts) return } @@ -247,6 +236,86 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshotId string, hosts util. return } +// CheckSnapshotRemove implements base.Client +func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) { + taskInfo, err := r.getTaskById(vmRef, precopy.RemoveTaskId, hosts) + if err != nil { + return false, liberr.Wrap(err) + } + return r.checkTaskStatus(taskInfo) +} + +// Check if a snapshot is removed. +func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) { + taskInfo, err := r.getTaskById(vmRef, precopy.CreateTaskId, hosts) + if err != nil { + return false, snapshotId, liberr.Wrap(err) + } + ready, err = r.checkTaskStatus(taskInfo) + snapshotId = taskInfo.Result.(types.ManagedObjectReference).Value + return +} + +func (r *Client) checkTaskStatus(taskInfo *types.TaskInfo) (ready bool, err error) { + r.Log.Info("Snapshot task", "task", taskInfo.Task.Value, "name", taskInfo.Name, "status", taskInfo.State) + switch taskInfo.State { + case types.TaskInfoStateSuccess: + return true, nil + case types.TaskInfoStateError: + return false, fmt.Errorf(taskInfo.Error.LocalizedMessage) + default: + return false, nil + } +} +func (r *Client) getClientFromVmRef(vmRef ref.Ref, hosts util.HostsFunc) (client *vim25.Client, err error) { + vm := &model.VM{} + err = r.Source.Inventory.Find(vm, vmRef) + if err != nil { + return nil, liberr.Wrap(err, "vm", vmRef.String()) + } + return r.getClient(vm, hosts) +} + +func (r *Client) getTaskById(vmRef ref.Ref, taskId string, hosts util.HostsFunc) (*types.TaskInfo, error) { + r.Log.V(1).Info("Get task by id", "taskId", taskId, "vmRef", vmRef) + + // Get the ESXi client for the haTasks + client, err := r.getClientFromVmRef(vmRef, hosts) + if err != nil { + return nil, err + } + // Create a collector to receive the tasks + pc := property.DefaultCollector(client) + pc, err = pc.Create(context.TODO()) + if err != nil { + return nil, err + } + //nolint:errcheck + defer pc.Destroy(context.TODO()) + + // Retrieve the task from ESXi host + taskRef := types.ManagedObjectReference{ + Type: taskType, + Value: taskId, + } + var content []types.ObjectContent + err = pc.RetrieveOne(context.TODO(), taskRef, []string{"info"}, &content) + if err != nil { + return nil, err + } + if len(content) == 0 { + return nil, fmt.Errorf("task %s not found", taskId) + } + if len(content[0].PropSet) == 0 { + return nil, fmt.Errorf("task %s not found property set", taskId) + } + if content[0].PropSet[0].Val == nil { + return nil, fmt.Errorf("no task value found for task %s", taskId) + } + task := content[0].PropSet[0].Val.(types.TaskInfo) + return &task, nil +} + func (r *Client) getClient(vm *model.VM, hosts util.HostsFunc) (client *vim25.Client, err error) { if coldLocal, vErr := r.Plan.VSphereColdLocal(); vErr == nil && coldLocal { // when virt-v2v runs the migration, forklift-controller should interact only @@ -371,7 +440,7 @@ func nullableHosts() (hosts map[string]*v1beta1.Host, err error) { } // Remove a VM snapshot and optionally its children. -func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, hosts util.HostsFunc) (err error) { +func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, hosts util.HostsFunc) (taskId string, err error) { r.Log.Info("Removing snapshot", "vmRef", vmRef, "snapshot", snapshot, @@ -381,12 +450,11 @@ func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, h if err != nil { return } - _, err = vm.RemoveSnapshot(context.TODO(), snapshot, children, nil) + task, err := vm.RemoveSnapshot(context.TODO(), snapshot, children, nil) if err != nil { - err = liberr.Wrap(err) - return + return "", liberr.Wrap(err) } - return + return task.Reference().Value, nil } // Connect to the vSphere API. diff --git a/pkg/controller/plan/migration.go b/pkg/controller/plan/migration.go index 2535e97c8..69e835aa7 100644 --- a/pkg/controller/plan/migration.go +++ b/pkg/controller/plan/migration.go @@ -108,7 +108,6 @@ const ( TransferCompleted = "Transfer completed." PopulatorPodPrefix = "populate-" DvStatusCheckRetriesAnnotation = "dvStatusCheckRetries" - SnapshotRemovalCheckRetries = "snapshotRemovalCheckRetries" ) var ( @@ -536,7 +535,7 @@ func (r *Migration) removeLastWarmSnapshot(vm *plan.VMStatus) { return } snapshot := vm.Warm.Precopies[n-1].Snapshot - if err := r.provider.RemoveSnapshot(vm.Ref, snapshot, r.kubevirt.loadHosts); err != nil { + if _, err := r.provider.RemoveSnapshot(vm.Ref, snapshot, r.kubevirt.loadHosts); err != nil { r.Log.Error( err, "Failed to clean up warm migration snapshots.", @@ -1014,7 +1013,9 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { break } n := len(vm.Warm.Precopies) - err = r.provider.RemoveSnapshot(vm.Ref, vm.Warm.Precopies[n-1].Snapshot, r.kubevirt.loadHosts) + var taskId string + taskId, err = r.provider.RemoveSnapshot(vm.Ref, vm.Warm.Precopies[n-1].Snapshot, r.kubevirt.loadHosts) + vm.Warm.Precopies[len(vm.Warm.Precopies)-1].RemoveTaskId = taskId if err != nil { step.AddError(err.Error()) err = nil @@ -1027,27 +1028,15 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm))) break } - // FIXME: This is just temporary timeout to unblock the migrations which get stuck on issue https://issues.redhat.com/browse/MTV-1753 - // This should be fixed properly by adding the task manager inside the inventory and monitor the task status - // from the main controller. - var retries int - retriesAnnotation := step.Annotations[SnapshotRemovalCheckRetries] - if retriesAnnotation == "" { - step.Annotations[SnapshotRemovalCheckRetries] = "1" - } else { - retries, err = strconv.Atoi(retriesAnnotation) - if err != nil { - step.AddError(err.Error()) - err = nil - break - } - if retries >= settings.Settings.SnapshotRemovalCheckRetries { - vm.Phase = r.next(vm.Phase) - // Reset for next precopy - step.Annotations[SnapshotRemovalCheckRetries] = "1" - } else { - step.Annotations[SnapshotRemovalCheckRetries] = strconv.Itoa(retries + 1) - } + precopy := vm.Warm.Precopies[len(vm.Warm.Precopies)-1] + ready, err := r.provider.CheckSnapshotRemove(vm.Ref, precopy, r.kubevirt.loadHosts) + if err != nil { + step.AddError(err.Error()) + err = nil + break + } + if ready { + vm.Phase = r.next(vm.Phase) } case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot: step, found := vm.FindStep(r.step(vm)) @@ -1055,8 +1044,8 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm))) break } - var snapshot string - if snapshot, err = r.provider.CreateSnapshot(vm.Ref, r.kubevirt.loadHosts); err != nil { + var snapshot, taskId string + if snapshot, taskId, err = r.provider.CreateSnapshot(vm.Ref, r.kubevirt.loadHosts); err != nil { if errors.As(err, &web.ProviderNotReadyError{}) || errors.As(err, &web.ConflictError{}) { return } @@ -1065,7 +1054,7 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { break } now := meta.Now() - precopy := plan.Precopy{Snapshot: snapshot, Start: &now} + precopy := plan.Precopy{Snapshot: snapshot, CreateTaskId: taskId, Start: &now} vm.Warm.Precopies = append(vm.Warm.Precopies, precopy) r.resetPrecopyTasks(vm, step) vm.Phase = r.next(vm.Phase) @@ -1075,14 +1064,17 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm))) break } - snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot - ready, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot) + precopy := vm.Warm.Precopies[len(vm.Warm.Precopies)-1] + ready, snapshotId, err := r.provider.CheckSnapshotReady(vm.Ref, precopy, r.kubevirt.loadHosts) if err != nil { step.AddError(err.Error()) err = nil break } if ready { + if snapshotId != "" { + vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot = snapshotId + } vm.Phase = r.next(vm.Phase) } case WaitForDataVolumesStatus, WaitForFinalDataVolumesStatus: