Skip to content

Commit

Permalink
Add a wait phase for snapshot tasks
Browse files Browse the repository at this point in the history
Issue:
The main problem of the MTV-1753 and MTV-1775 is that we are either not
waiting for the VMware task to finish or if we are waiting we are halting
the whole controller process. This causes either performance issues or
even migration failures. So we need to add a mechanism to wait for the
tasks without halting the whole process.

Fix:
My first attempt was in PR kubev2v#1262 which used the event manager. This on
the surface was an easy approach which did not require any additional changes
to the CR. The problem there was that some of the tasks were not
reported to the taskManager. These tasks had a prefix haTask. After some
investigation, I found out that these tasks are directly on the ESXi host
and not sent to the vspehre, so we can't use the taskManager.

This PR adds the taskIds to the status CR so additional wait phases can
monitor the tasks. The main controller will get the ESXi client and
create a property collector to request the specific task from the host.

Ref:
- https://issues.redhat.com/browse/MTV-1753
- https://issues.redhat.com/browse/MTV-1775
- kubev2v#1262
- kubev2v#1265

Signed-off-by: Martin Necas <mnecas@redhat.com>
  • Loading branch information
mnecas committed Dec 13, 2024
1 parent 3929eaf commit 5879406
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ spec:
items:
description: Precopy durations
properties:
createTaskId:
type: string
deltas:
items:
properties:
Expand All @@ -554,6 +556,8 @@ spec:
end:
format: date-time
type: string
removeTaskId:
type: string
snapshot:
type: string
start:
Expand Down
4 changes: 4 additions & 0 deletions operator/config/crd/bases/forklift.konveyor.io_plans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,8 @@ spec:
items:
description: Precopy durations
properties:
createTaskId:
type: string
deltas:
items:
properties:
Expand All @@ -1066,6 +1068,8 @@ spec:
end:
format: date-time
type: string
removeTaskId:
type: string
snapshot:
type: string
start:
Expand Down
10 changes: 6 additions & 4 deletions pkg/apis/forklift/v1beta1/plan/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ const (

// Precopy durations
type Precopy struct {
Start *meta.Time `json:"start,omitempty"`
End *meta.Time `json:"end,omitempty"`
Snapshot string `json:"snapshot,omitempty"`
Deltas []DiskDelta `json:"deltas,omitempty"`
Start *meta.Time `json:"start,omitempty"`
End *meta.Time `json:"end,omitempty"`
Snapshot string `json:"snapshot,omitempty"`
CreateTaskId string `json:"createTaskId,omitempty"`
RemoveTaskId string `json:"removeTaskId,omitempty"`
Deltas []DiskDelta `json:"deltas,omitempty"`
}

func (r *Precopy) WithDeltas(deltas map[string]string) {
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/plan/adapter/base/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ type Client interface {
// Return whether the source VM is powered off.
PoweredOff(vmRef ref.Ref) (bool, error)
// Create a snapshot of the source VM.
CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error)
CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error)
// Remove a snapshot.
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error)
// Check if a snapshot is ready to transfer.
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error)
CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error)
// Check if a snapshot is removed.
CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, err error)
// Set DataVolume checkpoints.
SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error)
// Close connections to the provider API.
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/plan/adapter/ocp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ type Client struct {
}

// CheckSnapshotReady implements base.Client
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (bool, error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, string, error) {
return false, "", nil
}

// CheckSnapshotRemove implements base.Client
func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) {
return false, nil
}

Expand All @@ -35,12 +40,12 @@ func (r *Client) Close() {
}

// CreateSnapshot implements base.Client
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, error) {
return "", nil
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (string, string, error) {
return "", "", nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) {
return
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/plan/adapter/openstack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,27 @@ func (r *Client) PoweredOff(vmRef ref.Ref) (off bool, err error) {
}

// Create a snapshot of the source VM.
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (imageID string, err error) {
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) {
return
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, imageID string) (ready bool, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) {
return
}

// CheckSnapshotRemove implements base.Client
func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) {
return false, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) error {
return nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) {
return
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/controller/plan/adapter/ova/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (r *Client) connect() (err error) {
}

// Create a VM snapshot and return its ID. No-op for this provider.
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshot string, err error) {
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) {
return
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) {
return
}

Expand All @@ -61,10 +61,15 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc uti
}

// Check if a snapshot is ready to transfer, to avoid importer restarts.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) {
return
}

// CheckSnapshotRemove implements base.Client
func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) {
return false, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error) {
return
Expand Down
15 changes: 10 additions & 5 deletions pkg/controller/plan/adapter/ovirt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Client struct {
}

// Create a VM snapshot and return its ID.
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshot string, err error) {
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) {
_, vmService, err := r.getVM(vmRef)
if err != nil {
err = liberr.Wrap(err)
Expand Down Expand Up @@ -70,13 +70,18 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapsh
}
return
}
snapshot = snap.MustSnapshot().MustId()
snapshotId = snap.MustSnapshot().MustId()
return
}

// CheckSnapshotRemove implements base.Client
func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) {
return false, nil
}

// Check if a snapshot is ready to transfer, to avoid importer restarts.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
correlationID, err := r.getSnapshotCorrelationID(vmRef, &snapshot)
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) {
correlationID, err := r.getSnapshotCorrelationID(vmRef, &precopy.Snapshot)
if err != nil {
err = liberr.Wrap(err)
return
Expand Down Expand Up @@ -105,7 +110,7 @@ func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool,
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (removeTaskId string, err error) {
return
}

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/plan/adapter/vsphere/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//vendor/github.com/vmware/govmomi",
"//vendor/github.com/vmware/govmomi/find",
"//vendor/github.com/vmware/govmomi/object",
"//vendor/github.com/vmware/govmomi/property",
"//vendor/github.com/vmware/govmomi/session",
"//vendor/github.com/vmware/govmomi/vim25",
"//vendor/github.com/vmware/govmomi/vim25/mo",
Expand Down
114 changes: 91 additions & 23 deletions pkg/controller/plan/adapter/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
liberr "github.com/konveyor/forklift-controller/pkg/lib/error"
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/property"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/mo"
Expand All @@ -29,6 +30,7 @@ import (
const (
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
taskType = "Task"
)

// vSphere VM Client
Expand All @@ -39,9 +41,9 @@ type Client struct {
}

// Create a VM snapshot and return its ID.
func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string, err error) {
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapshotId string, creationTaskId string, err error) {
r.Log.V(1).Info("Creating snapshot", "vmRef", vmRef)
vm, err := r.getVM(vmRef, hosts)
vm, err := r.getVM(vmRef, hostsFunc)
if err != nil {
return
}
Expand All @@ -50,28 +52,15 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string,
err = liberr.Wrap(err)
return
}
res, err := task.WaitForResult(context.TODO(), nil)
if err != nil {
err = liberr.Wrap(err)
return
}
id = res.Result.(types.ManagedObjectReference).Value
r.Log.Info("Created snapshot", "vmRef", vmRef, "id", id)

return
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
return true, nil
return "", task.Reference().Value, nil
}

// Remove a VM snapshot.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (taskId string, err error) {
r.Log.V(1).Info("RemoveSnapshot",
"vmRef", vmRef,
"snapshot", snapshot)
err = r.removeSnapshot(vmRef, snapshot, false, hosts)
taskId, err = r.removeSnapshot(vmRef, snapshot, false, hosts)
return
}

Expand Down Expand Up @@ -247,6 +236,86 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshotId string, hosts util.
return
}

// CheckSnapshotRemove implements base.Client
func (r *Client) CheckSnapshotRemove(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (bool, error) {
taskInfo, err := r.getTaskById(vmRef, precopy.RemoveTaskId, hosts)
if err != nil {
return false, liberr.Wrap(err)
}
return r.checkTaskStatus(taskInfo)
}

// Check if a snapshot is removed.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, precopy planapi.Precopy, hosts util.HostsFunc) (ready bool, snapshotId string, err error) {
taskInfo, err := r.getTaskById(vmRef, precopy.CreateTaskId, hosts)
if err != nil {
return false, snapshotId, liberr.Wrap(err)
}
ready, err = r.checkTaskStatus(taskInfo)
snapshotId = taskInfo.Result.(types.ManagedObjectReference).Value
return
}

func (r *Client) checkTaskStatus(taskInfo *types.TaskInfo) (ready bool, err error) {
r.Log.Info("Snapshot task", "task", taskInfo.Task.Value, "name", taskInfo.Name, "status", taskInfo.State)
switch taskInfo.State {
case types.TaskInfoStateSuccess:
return true, nil
case types.TaskInfoStateError:
return false, fmt.Errorf(taskInfo.Error.LocalizedMessage)
default:
return false, nil
}
}
func (r *Client) getClientFromVmRef(vmRef ref.Ref, hosts util.HostsFunc) (client *vim25.Client, err error) {
vm := &model.VM{}
err = r.Source.Inventory.Find(vm, vmRef)
if err != nil {
return nil, liberr.Wrap(err, "vm", vmRef.String())
}
return r.getClient(vm, hosts)
}

func (r *Client) getTaskById(vmRef ref.Ref, taskId string, hosts util.HostsFunc) (*types.TaskInfo, error) {
r.Log.V(1).Info("Get task by id", "taskId", taskId, "vmRef", vmRef)

// Get the ESXi client for the haTasks
client, err := r.getClientFromVmRef(vmRef, hosts)
if err != nil {
return nil, err
}
// Create a collector to receive the tasks
pc := property.DefaultCollector(client)
pc, err = pc.Create(context.TODO())
if err != nil {
return nil, err
}
//nolint:errcheck
defer pc.Destroy(context.TODO())

// Retrieve the task from ESXi host
taskRef := types.ManagedObjectReference{
Type: taskType,
Value: taskId,
}
var content []types.ObjectContent
err = pc.RetrieveOne(context.TODO(), taskRef, []string{"info"}, &content)
if err != nil {
return nil, err
}
if len(content) == 0 {
return nil, fmt.Errorf("task %s not found", taskId)
}
if len(content[0].PropSet) == 0 {
return nil, fmt.Errorf("task %s not found property set", taskId)
}
if content[0].PropSet[0].Val == nil {
return nil, fmt.Errorf("no task value found for task %s", taskId)
}
task := content[0].PropSet[0].Val.(types.TaskInfo)
return &task, nil
}

func (r *Client) getClient(vm *model.VM, hosts util.HostsFunc) (client *vim25.Client, err error) {
if coldLocal, vErr := r.Plan.VSphereColdLocal(); vErr == nil && coldLocal {
// when virt-v2v runs the migration, forklift-controller should interact only
Expand Down Expand Up @@ -371,7 +440,7 @@ func nullableHosts() (hosts map[string]*v1beta1.Host, err error) {
}

// Remove a VM snapshot and optionally its children.
func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, hosts util.HostsFunc) (err error) {
func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, hosts util.HostsFunc) (taskId string, err error) {
r.Log.Info("Removing snapshot",
"vmRef", vmRef,
"snapshot", snapshot,
Expand All @@ -381,12 +450,11 @@ func (r *Client) removeSnapshot(vmRef ref.Ref, snapshot string, children bool, h
if err != nil {
return
}
_, err = vm.RemoveSnapshot(context.TODO(), snapshot, children, nil)
task, err := vm.RemoveSnapshot(context.TODO(), snapshot, children, nil)
if err != nil {
err = liberr.Wrap(err)
return
return "", liberr.Wrap(err)
}
return
return task.Reference().Value, nil
}

// Connect to the vSphere API.
Expand Down
Loading

0 comments on commit 5879406

Please sign in to comment.