Skip to content

Commit

Permalink
Revert "Move waiting for tasks to separate phase to unblock process"
Browse files Browse the repository at this point in the history
This reverts commit 4208890.
  • Loading branch information
mnecas committed Dec 12, 2024
1 parent 59c31b0 commit 5e767cf
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 136 deletions.
1 change: 1 addition & 0 deletions operator/roles/forkliftcontroller/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ controller_snapshot_removal_timeout_minuts: 120
controller_snapshot_status_check_rate_seconds: 10
controller_cleanup_retries: 10
controller_dv_status_check_retries: 10
controller_snapshot_removal_check_retries: 20
controller_vsphere_incremental_backup: true
controller_ovirt_warm_migration: true
controller_retain_precopy_importer_pods: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ spec:
- name: DV_STATUS_CHECK_RETRIES
value: "{{ controller_dv_status_check_retries }}"
{% endif %}
{% if controller_snapshot_removal_check_retries is number %}
- name: SNAPSHOT_REMOVAL_CHECK_RETRIES
value: "{{ controller_snapshot_removal_check_retries }}"
{% endif %}
{% if controller_max_vm_inflight is number %}
- name: MAX_VM_INFLIGHT
value: "{{ controller_max_vm_inflight }}"
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/plan/adapter/base/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ type Client interface {
// Remove a snapshot.
RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) error
// Check if a snapshot is ready to transfer.
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error)
// Check if a snapshot is removed.
CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (ready bool, err error)
CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error)
// Set DataVolume checkpoints.
SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error)
// Close connections to the provider API.
Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/plan/adapter/ocp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ type Client struct {
}

// CheckSnapshotReady implements base.Client
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/plan/adapter/openstack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,10 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (imageI
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, imageID string) (ready bool, err error) {
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) error {
return nil
Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/plan/adapter/ova/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,10 @@ func (r *Client) GetSnapshotDeltas(vmRef ref.Ref, snapshot string, hostsFunc uti
}

// Check if a snapshot is ready to transfer, to avoid importer restarts.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

// Set DataVolume checkpoints.
func (r *Client) SetCheckpoints(vmRef ref.Ref, precopies []planapi.Precopy, datavolumes []cdi.DataVolume, final bool, hostsFunc util.HostsFunc) (err error) {
return
Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/plan/adapter/ovirt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (snapsh
}

// Check if a snapshot is ready to transfer, to avoid importer restarts.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
correlationID, err := r.getSnapshotCorrelationID(vmRef, &snapshot)
if err != nil {
err = liberr.Wrap(err)
Expand Down Expand Up @@ -104,11 +104,6 @@ func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool,
return
}

// CheckSnapshotRemoved implements base.Client
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (bool, error) {
return false, nil
}

// Remove a VM snapshot. No-op for this provider.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
return
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/plan/adapter/vsphere/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ go_library(
"//vendor/github.com/vmware/govmomi/find",
"//vendor/github.com/vmware/govmomi/object",
"//vendor/github.com/vmware/govmomi/session",
"//vendor/github.com/vmware/govmomi/task",
"//vendor/github.com/vmware/govmomi/vim25",
"//vendor/github.com/vmware/govmomi/vim25/mo",
"//vendor/github.com/vmware/govmomi/vim25/soap",
Expand Down
88 changes: 15 additions & 73 deletions pkg/controller/plan/adapter/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/task"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/soap"
Expand All @@ -28,11 +27,8 @@ import (
)

const (
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
VirtualMachine = "VirtualMachine"
CreateSnapshotTask = "CreateSnapshot_Task"
RemoveSnapshotTask = "RemoveSnapshot_Task"
snapshotName = "forklift-migration-precopy"
snapshotDesc = "Forklift Operator warm migration precopy"
)

// vSphere VM Client
Expand All @@ -43,9 +39,9 @@ type Client struct {
}

// Create a VM snapshot and return its ID.
func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (id string, err error) {
func (r *Client) CreateSnapshot(vmRef ref.Ref, hosts util.HostsFunc) (id string, err error) {
r.Log.V(1).Info("Creating snapshot", "vmRef", vmRef)
vm, err := r.getVM(vmRef, hostsFunc)
vm, err := r.getVM(vmRef, hosts)
if err != nil {
return
}
Expand All @@ -54,82 +50,28 @@ func (r *Client) CreateSnapshot(vmRef ref.Ref, hostsFunc util.HostsFunc) (id str
err = liberr.Wrap(err)
return
}
return task.Common.Reference().Value, nil
}

// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, snapshotId string, err error) {
taskInfo, err := r.getLatestTaskByName(vmRef, CreateSnapshotTask)
res, err := task.WaitForResult(context.TODO(), nil)
if err != nil {
return false, "", liberr.Wrap(err)
}
ready, err = r.checkTaskStatus(taskInfo)
if err != nil {
return false, "", liberr.Wrap(err)
}
if ready {
return true, taskInfo.Result.(types.ManagedObjectReference).Value, nil
} else {
// The snapshot is not ready, retry the check
return false, "", nil
}
}

// Check if a snapshot is removed.
func (r *Client) CheckSnapshotRemoved(vmRef ref.Ref, snapshot string) (ready bool, err error) {
taskInfo, err := r.getLatestTaskByName(vmRef, RemoveSnapshotTask)
if err != nil {
return false, liberr.Wrap(err)
err = liberr.Wrap(err)
return
}
return r.checkTaskStatus(taskInfo)
}
id = res.Result.(types.ManagedObjectReference).Value
r.Log.Info("Created snapshot", "vmRef", vmRef, "id", id)

func (r *Client) checkTaskStatus(taskInfo *types.TaskInfo) (ready bool, err error) {
r.Log.Info("Snapshot task", "task", taskInfo.Task.Value, "name", taskInfo.Name, "status", taskInfo.State)
switch taskInfo.State {
case types.TaskInfoStateSuccess:
return true, nil
case types.TaskInfoStateError:
return false, fmt.Errorf(taskInfo.Error.LocalizedMessage)
default:
return false, nil
}
return
}

func (r *Client) getLatestTaskByName(vmRef ref.Ref, taskName string) (*types.TaskInfo, error) {
taskManager := task.NewManager(r.client.Client)
taskCollector, err := taskManager.CreateCollectorForTasks(context.TODO(), types.TaskFilterSpec{
Entity: &types.TaskFilterSpecByEntity{
Entity: types.ManagedObjectReference{
Type: VirtualMachine,
Value: vmRef.ID,
},
Recursion: types.TaskFilterSpecRecursionOptionSelf,
},
})
if err != nil {
return nil, err
}
//nolint:errcheck
defer taskCollector.Destroy(context.Background())
tasks, err := taskCollector.LatestPage(context.TODO())
if err != nil {
return nil, err
}
for _, taskInfo := range tasks {
if taskInfo.Name == taskName {
return &taskInfo, nil
}
}
return nil, fmt.Errorf("no task found with name %s, vmRef %v", taskName, vmRef)
// Check if a snapshot is ready to transfer.
func (r *Client) CheckSnapshotReady(vmRef ref.Ref, snapshot string) (ready bool, err error) {
return true, nil
}

// Remove a VM snapshot.
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hostsFunc util.HostsFunc) (err error) {
func (r *Client) RemoveSnapshot(vmRef ref.Ref, snapshot string, hosts util.HostsFunc) (err error) {
r.Log.V(1).Info("RemoveSnapshot",
"vmRef", vmRef,
"snapshot", snapshot)
err = r.removeSnapshot(vmRef, snapshot, false, hostsFunc)
err = r.removeSnapshot(vmRef, snapshot, false, hosts)
return
}

Expand Down
39 changes: 23 additions & 16 deletions pkg/controller/plan/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const (
TransferCompleted = "Transfer completed."
PopulatorPodPrefix = "populate-"
DvStatusCheckRetriesAnnotation = "dvStatusCheckRetries"
SnapshotRemovalCheckRetries = "snapshotRemovalCheckRetries"
)

var (
Expand Down Expand Up @@ -1026,15 +1027,27 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
vm.AddError(fmt.Sprintf("Step '%s' not found", r.step(vm)))
break
}
snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot
ready, err := r.provider.CheckSnapshotRemoved(vm.Ref, snapshot)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
if ready {
vm.Phase = r.next(vm.Phase)
// FIXME: This is just temporary timeout to unblock the migrations which get stuck on issue https://issues.redhat.com/browse/MTV-1753
// This should be fixed properly by adding the task manager inside the inventory and monitor the task status
// from the main controller.
var retries int
retriesAnnotation := step.Annotations[SnapshotRemovalCheckRetries]
if retriesAnnotation == "" {
step.Annotations[SnapshotRemovalCheckRetries] = "1"
} else {
retries, err = strconv.Atoi(retriesAnnotation)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
if retries >= settings.Settings.SnapshotRemovalCheckRetries {
vm.Phase = r.next(vm.Phase)
// Reset for next precopy
step.Annotations[SnapshotRemovalCheckRetries] = "1"
} else {
step.Annotations[SnapshotRemovalCheckRetries] = strconv.Itoa(retries + 1)
}
}
case CreateInitialSnapshot, CreateSnapshot, CreateFinalSnapshot:
step, found := vm.FindStep(r.step(vm))
Expand Down Expand Up @@ -1063,18 +1076,12 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) {
break
}
snapshot := vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot
ready, snapshotId, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot)
ready, err := r.provider.CheckSnapshotReady(vm.Ref, snapshot)
if err != nil {
step.AddError(err.Error())
err = nil
break
}
// If the provider does not directly create the snapshot, but we need to wait for the snapshot to be created
// We start the creation task in CreateSnapshot, set the task ID as a snapshot id which needs to be replaced
// by the snapshot id after the task finishes.
if snapshotId != "" {
vm.Warm.Precopies[len(vm.Warm.Precopies)-1].Snapshot = snapshotId
}
if ready {
vm.Phase = r.next(vm.Phase)
}
Expand Down
44 changes: 25 additions & 19 deletions pkg/settings/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,26 @@ import (

// Environment variables.
const (
MaxVmInFlight = "MAX_VM_INFLIGHT"
HookRetry = "HOOK_RETRY"
ImporterRetry = "IMPORTER_RETRY"
VirtV2vImage = "VIRT_V2V_IMAGE"
PrecopyInterval = "PRECOPY_INTERVAL"
VirtV2vDontRequestKVM = "VIRT_V2V_DONT_REQUEST_KVM"
SnapshotRemovalTimeout = "SNAPSHOT_REMOVAL_TIMEOUT"
SnapshotStatusCheckRate = "SNAPSHOT_STATUS_CHECK_RATE"
CDIExportTokenTTL = "CDI_EXPORT_TOKEN_TTL"
FileSystemOverhead = "FILESYSTEM_OVERHEAD"
BlockOverhead = "BLOCK_OVERHEAD"
CleanupRetries = "CLEANUP_RETRIES"
DvStatusCheckRetries = "DV_STATUS_CHECK_RETRIES"
OvirtOsConfigMap = "OVIRT_OS_MAP"
VsphereOsConfigMap = "VSPHERE_OS_MAP"
VirtCustomizeConfigMap = "VIRT_CUSTOMIZE_MAP"
VddkJobActiveDeadline = "VDDK_JOB_ACTIVE_DEADLINE"
VirtV2vExtraArgs = "VIRT_V2V_EXTRA_ARGS"
VirtV2vExtraConfConfigMap = "VIRT_V2V_EXTRA_CONF_CONFIG_MAP"
MaxVmInFlight = "MAX_VM_INFLIGHT"
HookRetry = "HOOK_RETRY"
ImporterRetry = "IMPORTER_RETRY"
VirtV2vImage = "VIRT_V2V_IMAGE"
PrecopyInterval = "PRECOPY_INTERVAL"
VirtV2vDontRequestKVM = "VIRT_V2V_DONT_REQUEST_KVM"
SnapshotRemovalTimeout = "SNAPSHOT_REMOVAL_TIMEOUT"
SnapshotStatusCheckRate = "SNAPSHOT_STATUS_CHECK_RATE"
CDIExportTokenTTL = "CDI_EXPORT_TOKEN_TTL"
FileSystemOverhead = "FILESYSTEM_OVERHEAD"
BlockOverhead = "BLOCK_OVERHEAD"
CleanupRetries = "CLEANUP_RETRIES"
DvStatusCheckRetries = "DV_STATUS_CHECK_RETRIES"
SnapshotRemovalCheckRetries = "SNAPSHOT_REMOVAL_CHECK_RETRIES"
OvirtOsConfigMap = "OVIRT_OS_MAP"
VsphereOsConfigMap = "VSPHERE_OS_MAP"
VirtCustomizeConfigMap = "VIRT_CUSTOMIZE_MAP"
VddkJobActiveDeadline = "VDDK_JOB_ACTIVE_DEADLINE"
VirtV2vExtraArgs = "VIRT_V2V_EXTRA_ARGS"
VirtV2vExtraConfConfigMap = "VIRT_V2V_EXTRA_CONF_CONFIG_MAP"
)

// Migration settings
Expand Down Expand Up @@ -61,6 +62,8 @@ type Migration struct {
CleanupRetries int
// DvStatusCheckRetries retries
DvStatusCheckRetries int
// SnapshotRemovalCheckRetries retries
SnapshotRemovalCheckRetries int
// oVirt OS config map name
OvirtOsConfigMap string
// vSphere OS config map name
Expand Down Expand Up @@ -106,6 +109,9 @@ func (r *Migration) Load() (err error) {
if r.DvStatusCheckRetries, err = getPositiveEnvLimit(DvStatusCheckRetries, 10); err != nil {
return liberr.Wrap(err)
}
if r.SnapshotRemovalCheckRetries, err = getPositiveEnvLimit(SnapshotRemovalCheckRetries, 20); err != nil {
return liberr.Wrap(err)
}
if virtV2vImage, ok := os.LookupEnv(VirtV2vImage); ok {
r.VirtV2vImage = virtV2vImage
} else if Settings.Role.Has(MainRole) {
Expand Down

0 comments on commit 5e767cf

Please sign in to comment.