From a5e97850317d46bfd1de420f6907ba6338e6b9c1 Mon Sep 17 00:00:00 2001 From: Igal Tsoiref Date: Tue, 4 May 2021 12:09:47 +0300 Subject: [PATCH 1/2] TEst --- .../assisted_installer_controller.go | 54 +++++++++++-------- .../assisted_installer_main.go | 16 ++++-- src/utils/utils.go | 6 +++ 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/src/assisted_installer_controller/assisted_installer_controller.go b/src/assisted_installer_controller/assisted_installer_controller.go index 9dd327b53a..62e70aba4b 100644 --- a/src/assisted_installer_controller/assisted_installer_controller.go +++ b/src/assisted_installer_controller/assisted_installer_controller.go @@ -116,8 +116,12 @@ func logHostsStatus(log logrus.FieldLogger, hosts map[string]inventory_client.Ho log.Infof("Hosts status: %v", hostsStatus) } +<<<<<<< Updated upstream func (c *controller) WaitAndUpdateNodesStatus(status *ControllerStatus) { +======= +func (c *controller) WaitAndUpdateNodesStatus(status *ControllerStatus) error { +>>>>>>> Stashed changes c.log.Infof("Waiting till all nodes will join and update status to assisted installer") ignoreStatuses := []string{models.HostStatusDisabled} var hostsInError int @@ -141,7 +145,7 @@ func (c *controller) WaitAndUpdateNodesStatus(status *ControllerStatus) { //if all hosts are in error, mark the failure and finish if hostsInError > 0 && hostsInError == len(hostsInProgressMap) { status.Error() - break + return errors.Errorf("return error") } //if all hosts are successfully installed, finish if len(hostsInProgressMap) == 0 { @@ -182,6 +186,7 @@ func (c *controller) WaitAndUpdateNodesStatus(status *ControllerStatus) { c.updateConfiguringStatusIfNeeded(assistedNodesMap) } c.log.Infof("Done waiting for all the nodes. Nodes in error status: %d\n", hostsInError) + return nil } func (c *controller) HackDNSAddressConflict(wg *sync.WaitGroup) { @@ -318,25 +323,28 @@ func isCsrApproved(csr *certificatesv1.CertificateSigningRequest) bool { return false } -func (c controller) PostInstallConfigs(wg *sync.WaitGroup, status *ControllerStatus) { +func (c controller) PostInstallConfigs(ctx context.Context, wg *sync.WaitGroup, status *ControllerStatus) { defer wg.Done() - for { - time.Sleep(GeneralWaitInterval) + err := utils.WaitForPredicateWithContext(ctx, time.Duration(1<<63 - 1), GeneralWaitInterval, func() bool { ctx := utils.GenerateRequestContext() cluster, err := c.ic.GetCluster(ctx) if err != nil { utils.RequestIDLogger(ctx, c.log).WithError(err).Errorf("Failed to get cluster %s from assisted-service", c.ClusterID) - continue + return false } // waiting till cluster will be installed(3 masters must be installed) if *cluster.Status != models.ClusterStatusFinalizing { - continue + return false } - break + return true + }) + if err != nil { + return } errMessage := "" - err := c.postInstallConfigs() + // TODO veridy if ctx was cancelled + err = c.postInstallConfigs(ctx) if err != nil { errMessage = err.Error() status.Error() @@ -345,19 +353,19 @@ func (c controller) PostInstallConfigs(wg *sync.WaitGroup, status *ControllerSta c.sendCompleteInstallation(success, errMessage) } -func (c controller) postInstallConfigs() error { +func (c controller) postInstallConfigs(ctx context.Context) error { var err error c.log.Infof("Waiting for cluster version operator: %t", c.WaitForClusterVersion) if c.WaitForClusterVersion { - err = c.waitingForClusterVersion() + err = c.waitingForClusterVersion(ctx) if err != nil { return err } } - err = utils.WaitForPredicate(WaitTimeout, GeneralWaitInterval, c.addRouterCAToClusterCA) + err = utils.WaitForPredicateWithContext(ctx, WaitTimeout, GeneralWaitInterval, c.addRouterCAToClusterCA) if err != nil { return errors.Errorf("Timeout while waiting router ca data") } @@ -367,7 +375,7 @@ func (c controller) postInstallConfigs() error { return err } if unpatch && c.HighAvailabilityMode != models.ClusterHighAvailabilityModeNone { - err = utils.WaitForPredicate(WaitTimeout, GeneralWaitInterval, c.unpatchEtcd) + err = utils.WaitForPredicateWithContext(ctx, WaitTimeout, GeneralWaitInterval, c.unpatchEtcd) if err != nil { return errors.Errorf("Timeout while trying to unpatch etcd") } @@ -375,13 +383,13 @@ func (c controller) postInstallConfigs() error { c.log.Infof("Skipping etcd unpatch for cluster version %s", c.ControllerConfig.OpenshiftVersion) } - err = utils.WaitForPredicate(WaitTimeout, GeneralWaitInterval, c.validateConsoleAvailability) + err = utils.WaitForPredicateWithContext(ctx, WaitTimeout, GeneralWaitInterval, c.validateConsoleAvailability) if err != nil { return errors.Errorf("Timeout while waiting for console to become available") } waitTimeout := c.getMaximumOLMTimeout() - err = utils.WaitForPredicate(waitTimeout, GeneralWaitInterval, c.waitForOLMOperators) + err = utils.WaitForPredicateWithContext(ctx, waitTimeout, GeneralWaitInterval, c.waitForOLMOperators) if err != nil { // In case the timeout occur, we have to update the pending OLM operators to failed state, // so the assisted-service can update the cluster state to completed. @@ -394,14 +402,13 @@ func (c controller) postInstallConfigs() error { return nil } -func (c controller) UpdateBMHs(wg *sync.WaitGroup) { +func (c controller) UpdateBMHs(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - for { - time.Sleep(GeneralWaitInterval) + _ = utils.WaitForPredicateWithContext(ctx, time.Duration(1<<63 - 1), GeneralWaitInterval, func() bool { bmhs, err := c.kc.ListBMHs() if err != nil { c.log.WithError(err).Errorf("Failed to list BMH hosts") - continue + return false } c.log.Infof("Number of BMHs is %d", len(bmhs.Items)) @@ -409,7 +416,7 @@ func (c controller) UpdateBMHs(wg *sync.WaitGroup) { machines, err := c.unallocatedMachines(bmhs) if err != nil { c.log.WithError(err).Errorf("Failed to find unallocated machines") - continue + return false } c.log.Infof("Number of unallocated Machines is %d", len(machines.Items)) @@ -417,9 +424,10 @@ func (c controller) UpdateBMHs(wg *sync.WaitGroup) { allUpdated := c.updateBMHs(&bmhs, machines) if allUpdated { c.log.Infof("Updated all the BMH CRs, finished successfully") - return + return true } - } + return false + }) } func (c controller) unallocatedMachines(bmhList metal3v1alpha1.BareMetalHostList) (*mapiv1beta1.MachineList, error) { @@ -766,7 +774,7 @@ func (c controller) validateConsoleAvailability() bool { // // This function would be aligned with the console operator reporting workflow // as part of the deprecation of the old API in MGMT-5188. -func (c controller) waitingForClusterVersion() error { +func (c controller) waitingForClusterVersion(ctx context.Context) error { isClusterVersionAvailable := func() bool { c.log.Infof("Checking cluster version operator availability status") co, err := c.kc.GetClusterVersion("version") @@ -801,7 +809,7 @@ func (c controller) waitingForClusterVersion() error { return false } - err := utils.WaitForPredicate(WaitTimeout, GeneralProgressUpdateInt, isClusterVersionAvailable) + err := utils.WaitForPredicateWithContext(ctx, WaitTimeout, GeneralProgressUpdateInt, isClusterVersionAvailable) if err != nil { return errors.Errorf("Timeout while waiting for cluster version to be available") } diff --git a/src/main/assisted-installer-controller/assisted_installer_main.go b/src/main/assisted-installer-controller/assisted_installer_main.go index 22e14643a2..b5c53b935d 100644 --- a/src/main/assisted-installer-controller/assisted_installer_main.go +++ b/src/main/assisted-installer-controller/assisted_installer_main.go @@ -64,9 +64,11 @@ func main() { ctxApprove, cancelApprove := context.WithCancel(context.Background()) go assistedController.ApproveCsrs(ctxApprove, &wg) wg.Add(1) - go assistedController.PostInstallConfigs(&wg, &status) + + ctxOthers, cancelOthers := context.WithCancel(context.Background()) + go assistedController.PostInstallConfigs(ctxOthers, &wg, &status) wg.Add(1) - go assistedController.UpdateBMHs(&wg) + go assistedController.UpdateBMHs(ctxOthers, &wg) wg.Add(1) go assistedController.HackDNSAddressConflict(&wg) wg.Add(1) @@ -76,13 +78,19 @@ func main() { wgLogs.Add(1) assistedController.SetReadyState() - assistedController.WaitAndUpdateNodesStatus(&status) + err = assistedController.WaitAndUpdateNodesStatus(&status) logger.Infof("Sleeping for 10 minutes to give a chance to approve all csrs") - time.Sleep(10 * time.Minute) + if err == nil { + time.Sleep(10 * time.Minute) + } else { + cancelOthers() + } cancelApprove() + logger.Infof("Waiting for all go routines to finish") wg.Wait() + // TODO verify if canceled and cancel logs without waiting for error if !status.HasError() { //with error the logs are canceled within UploadLogs logger.Infof("closing logs...") diff --git a/src/utils/utils.go b/src/utils/utils.go index 6bfd30ed04..f097be1e6f 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -187,11 +187,17 @@ func GetHostIpsFromInventory(inventory *models.Inventory) ([]string, error) { } func WaitForPredicate(timeout time.Duration, interval time.Duration, predicate func() bool) error { + return WaitForPredicateWithContext(context.TODO(), timeout, interval, predicate) +} + +func WaitForPredicateWithContext(ctx context.Context, timeout time.Duration, interval time.Duration, predicate func() bool) error { timeoutAfter := time.After(timeout) ticker := time.NewTicker(interval) // Keep trying until we're time out or get true for { select { + case <- ctx.Done(): + return errors.Errorf("Cancelled") // Got a timeout! fail with a timeout error case <-timeoutAfter: return errors.New("timed out") From 79e87830dbf4a517fcfd1db81fa90f5d4d15d6a3 Mon Sep 17 00:00:00 2001 From: Igal Tsoiref Date: Tue, 4 May 2021 12:11:52 +0300 Subject: [PATCH 2/2] rebase fix --- .../assisted_installer_controller.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/assisted_installer_controller/assisted_installer_controller.go b/src/assisted_installer_controller/assisted_installer_controller.go index 62e70aba4b..94d87d2c0a 100644 --- a/src/assisted_installer_controller/assisted_installer_controller.go +++ b/src/assisted_installer_controller/assisted_installer_controller.go @@ -116,12 +116,7 @@ func logHostsStatus(log logrus.FieldLogger, hosts map[string]inventory_client.Ho log.Infof("Hosts status: %v", hostsStatus) } -<<<<<<< Updated upstream -func (c *controller) WaitAndUpdateNodesStatus(status *ControllerStatus) { - -======= func (c *controller) WaitAndUpdateNodesStatus(status *ControllerStatus) error { ->>>>>>> Stashed changes c.log.Infof("Waiting till all nodes will join and update status to assisted installer") ignoreStatuses := []string{models.HostStatusDisabled} var hostsInError int